Source code for kwave.kWaveSimulation

import logging
from dataclasses import dataclass

import numpy as np

from kwave.data import Vector
from kwave.kWaveSimulation_helper import (
    display_simulation_params,
    set_sound_speed_ref,
    expand_grid_matrices,
    create_absorption_variables,
    scale_source_terms_func,
)
from kwave.kgrid import kWaveGrid
from kwave.kmedium import kWaveMedium
from kwave.ksensor import kSensor
from kwave.ksource import kSource
from kwave.ktransducer import NotATransducer
from kwave.options.simulation_options import SimulationOptions, SimulationType
from kwave.recorder import Recorder
from kwave.utils.checks import check_stability
from kwave.utils.colormap import get_color_map
from kwave.utils.conversion import cast_to_type, cart2grid
from kwave.utils.data import get_smallest_possible_type, get_date_string
from kwave.utils.dotdictionary import dotdict
from kwave.utils.filters import smooth
from kwave.utils.matlab import matlab_find, matlab_mask
from kwave.utils.matrix import num_dim2


[docs] @dataclass class kWaveSimulation(object):
[docs] def __init__( self, kgrid: kWaveGrid, source: kSource, sensor: NotATransducer, medium: kWaveMedium, simulation_options: SimulationOptions ): self.precision = None self.kgrid = kgrid self.medium = medium self.source = source self.sensor = sensor self.options = simulation_options # ========================================================================= # FLAGS WHICH DEPEND ON USER INPUTS (THESE SHOULD NOT BE MODIFIED) # ========================================================================= # flags which control the characteristics of the sensor #: Whether time reversal simulation is enabled # check if performing time reversal, and replace inputs to explicitly use a # source with a dirichlet boundary condition if self.sensor.time_reversal_boundary_data is not None: # define a new source structure source = {"p_mask": self.sensor.p_mask, "p": np.flip(self.sensor.time_reversal_boundary_data, 2), "p_mode": "dirichlet"} # define a new sensor structure Nx, Ny, Nz = self.kgrid.Nx, self.kgrid.Ny, self.kgrid.Nz sensor = kSensor(mask=np.ones((Nx, Ny, max(1, Nz))), record=["p_final"]) # set time reversal flag self.userarg_time_rev = True else: # set time reversal flag self.userarg_time_rev = False #: Whether sensor.mask should be re-ordered. #: True if sensor.mask is Cartesian with nearest neighbour interpolation which is calculated using a binary mask #: and thus must be re-ordered self.reorder_data = False #: Whether the sensor.mask is binary self.binary_sensor_mask = True # check if the sensor mask is defined as a list of cuboid corners if self.sensor.mask is not None and self.sensor.mask.shape[0] == (2 * self.kgrid.dim): self.userarg_cuboid_corners = True else: self.userarg_cuboid_corners = False #: If tse sensor is an object of the kWaveTransducer class self.transducer_sensor = False self.record = Recorder() # transducer source flags #: transducer is object of kWaveTransducer class self.transducer_source = False #: Apply receive elevation focus on the transducer self.transducer_receive_elevation_focus = False # general self.COLOR_MAP = get_color_map() #: default color map self.ESTIMATE_SIM_TIME_STEPS = 50 #: time steps used to estimate simulation time self.HIGHEST_PRIME_FACTOR_WARNING = 7 #: largest prime factor before warning self.KSPACE_CFL = 0.3 #: default CFL value used if kgrid.t_array is set to 'auto' self.PSTD_CFL = 0.1 #: default CFL value used if kgrid.t_array is set to 'auto' # source types self.SOURCE_S_MODE_DEF = "additive" #: source mode for stress sources self.SOURCE_P_MODE_DEF = "additive" #: source mode for pressure sources self.SOURCE_U_MODE_DEF = "additive" #: source mode for velocity sources # filenames self.STREAM_TO_DISK_FILENAME = "temp_sensor_data.bin" #: default disk stream filename self.LOG_NAME = ["k-Wave-Log-", get_date_string()] #: default log filename self.calling_func_name = None logging.log(logging.INFO, f" start time: {get_date_string()}") self.c_ref, self.c_ref_compression, self.c_ref_shear = [None] * 3 self.transducer_input_signal = None #: Indexing variable corresponding to the location of all the pressure source elements self.p_source_pos_index = None #: Indexing variable corresponding to the location of all the velocity source elements self.u_source_pos_index = None #: Indexing variable corresponding to the location of all the stress source elements self.s_source_pos_index = None #: Delay mask that accounts for the beamforming delays and elevation focussing self.delay_mask = None self.absorb_nabla1 = None #: absorbing fractional Laplacian operator self.absorb_tau = None #: absorbing fractional Laplacian coefficient self.absorb_nabla2 = None #: dispersive fractional Laplacian operator self.absorb_eta = None #: dispersive fractional Laplacian coefficient self.dt = None #: Alias to kgrid.dt self.rho0 = None #: Alias to medium.density self.c0 = None #: Alias to medium.sound_speed self.index_data_type = None
@property def equation_of_state(self): """ Returns: Set equation of state variable """ if self.medium.absorbing: if self.medium.stokes: return "stokes" else: return "absorbing" else: return "loseless" @property def use_sensor(self): """ Returns: False if no output of any kind is required """ return self.sensor is not None @property def blank_sensor(self): """ Returns True if sensor.mask is not defined but _max_all or _final variables are still recorded """ fields = ["p", "p_max", "p_min", "p_rms", "u", "u_non_staggered", "u_split_field", "u_max", "u_min", "u_rms", "I", "I_avg"] if not (isinstance(self.sensor, NotATransducer) or any(self.record.is_set(fields)) or self.time_rev): return True return False @property def kelvin_voigt_model(self): """ Returns: Whether the simulation is elastic with absorption """ return False @property def nonuniform_grid(self): """ Returns: True if the computational grid is non-uniform """ return self.kgrid.nonuniform @property def time_rev(self): """ Returns: True for time reversal simulaions using sensor.time_reversal_boundary_data """ if self.sensor is not None and not isinstance(self.sensor, NotATransducer): if not self.options.simulation_type.is_elastic_simulation() and self.sensor.time_reversal_boundary_data is not None: return True else: return self.userarg_time_rev @property def elastic_time_rev(self): """ Returns: True if using time reversal with the elastic code """ return False @property def compute_directivity(self): """ Returns: True if directivity calculations in 2D are used by setting sensor.directivity_angle """ if self.sensor is not None and not isinstance(self.sensor, NotATransducer): if self.kgrid.dim == 2: # check for sensor directivity input and set flag directivity = self.sensor.directivity if directivity is not None and directivity.angle is not None: return True return False @property def cuboid_corners(self): """ Returns: Whether the sensor.mask is a list of cuboid corners """ if self.sensor is not None and not isinstance(self.sensor, NotATransducer): if not self.blank_sensor and self.sensor.mask.shape[0] == 2 * self.kgrid.dim: return True return self.userarg_cuboid_corners ############## # flags which control the types of source used ############## @property def source_p0(self): # initial pressure """ Returns: Whether initial pressure source is present (default=False) """ flag = False # default if not isinstance(self.source, NotATransducer) and self.source.p0 is not None: # set flag flag = True return flag @property def source_p0_elastic(self): # initial pressure in the elastic code """ Returns: Whether initial pressure source is present in the elastic code (default=False) """ # Not clear where this flag is set return False @property def source_p(self): """ Returns: Whether time-varying pressure source is present (default=False) """ flag = False if not isinstance(self.source, NotATransducer) and self.source.p is not None: # set source flag to the length of the source, this allows source.p # to be shorter than kgrid.Nt flag = len(self.source.p[0]) return flag @property def source_p_labelled(self): # time-varying pressure with labelled source mask """ Returns: True/False if labelled/binary source mask, respectively. """ flag = False if not isinstance(self.source, NotATransducer) and self.source.p is not None: # check if the mask is binary or labelled p_unique = np.unique(self.source.p_mask) flag = not (p_unique.size <= 2 and p_unique.sum() == 1) return flag @property def source_ux(self) -> bool: """ Returns: Whether time-varying particle velocity source is used in X-direction """ flag = False if not isinstance(self.source, NotATransducer) and self.source.ux is not None: # set source flgs to the length of the sources, this allows the # inputs to be defined independently and be of any length flag = len(self.source.ux[0]) return flag @property def source_uy(self) -> bool: """ Returns: Whether time-varying particle velocity source is used in Y-direction """ flag = False if not isinstance(self.source, NotATransducer) and self.source.uy is not None: # set source flgs to the length of the sources, this allows the # inputs to be defined independently and be of any length flag = len(self.source.uy[0]) return flag @property def source_uz(self) -> bool: """ Returns: Whether time-varying particle velocity source is used in Z-direction """ flag = False if not isinstance(self.source, NotATransducer) and self.source.uz is not None: # set source flgs to the length of the sources, this allows the # inputs to be defined independently and be of any length flag = len(self.source.uz[0]) return flag @property def source_u_labelled(self): """ Returns: Whether time-varying velocity source with labelled source mask is present (default=False) """ flag = False if not isinstance(self.source, NotATransducer) and self.source.u_mask is not None: # check if the mask is binary or labelled u_unique = np.unique(self.source.u_mask) if u_unique.size <= 2 and u_unique.sum() == 1: # binary source mask flag = False else: # labelled source mask flag = True return flag @property def source_sxx(self): """ Returns: Whether time-varying stress source in X->X direction is present (default=False) """ flag = False if not isinstance(self.source, NotATransducer) and self.source.sxx is not None: flag = len(self.source.sxx[0]) return flag @property def source_syy(self): """ Returns: Whether time-varying stress source in Y->Y direction is present (default=False) """ flag = False if not isinstance(self.source, NotATransducer) and self.source.syy is not None: flag = len(self.source.syy[0]) return flag @property def source_szz(self): """ Returns: Whether time-varying stress source in Z->Z direction is present (default=False) """ flag = False if not isinstance(self.source, NotATransducer) and self.source.szz is not None: flag = len(self.source.szz[0]) return flag @property def source_sxy(self): """ Returns: Whether time-varying stress source in X->Y direction is present (default=False) """ flag = False if not isinstance(self.source, NotATransducer) and self.source.sxy is not None: flag = len(self.source.sxy[0]) return flag @property def source_sxz(self): """ Returns: Whether time-varying stress source in X->Z direction is present (default=False) """ flag = False if not isinstance(self.source, NotATransducer) and self.source.sxz is not None: flag = len(self.source.sxz[0]) return flag @property def source_syz(self): """ Returns: Whether time-varying stress source in Y->Z direction is present (default=False) """ flag = False if not isinstance(self.source, NotATransducer) and self.source.syz is not None: flag = len(self.source.syz[0]) return flag @property def source_s_labelled(self): """ Returns: Whether time-varying stress source with labelled source mask is present (default=False) """ flag = False if not isinstance(self.source, NotATransducer) and self.source.s_mask is not None: # check if the mask is binary or labelled s_unique = np.unique(self.source.s_mask) if s_unique.size <= 2 and s_unique.sum() == 1: # binary source mask flag = False else: # labelled source mask flag = True return flag @property def use_w_source_correction_p(self): """ Returns: Whether to use the w source correction instead of the k-space source correction for pressure sources """ flag = False if not isinstance(self.source, NotATransducer): if self.source.p is not None and self.source.p_frequency_ref is not None: flag = True return flag @property def use_w_source_correction_u(self): """ Returns: Whether to use the w source correction instead of the k-space source correction for velocity sources """ flag = False if not isinstance(self.source, NotATransducer): if any([(getattr(self.source, k) is not None) for k in ["ux", "uy", "uz", "u_mask"]]): if self.source.u_frequency_ref is not None: flag = True return flag
[docs] def input_checking(self, calling_func_name) -> None: """ Check the input fields for correctness and validness Args: calling_func_name: Name of the script that calls this function Returns: None """ self.calling_func_name = calling_func_name k_dim = self.kgrid.dim self.check_calling_func_name_and_dim(calling_func_name, k_dim) # run subscript to check optional inputs self.options = SimulationOptions.option_factory(self.kgrid, self.options) opt = self.options # TODO(Walter): clean this up with getters in simulation options pml size pml_x_size, pml_y_size, pml_z_size = opt.pml_x_size, opt.pml_y_size, opt.pml_z_size pml_size = Vector([pml_x_size, pml_y_size, pml_z_size]) is_elastic_code = opt.simulation_type.is_elastic_simulation() self.print_start_status(is_elastic_code=is_elastic_code) self.set_index_data_type() user_medium_density_input = self.check_medium(self.medium, self.kgrid.k, simulation_type=opt.simulation_type) # select the reference sound speed used in the k-space operator self.c_ref, self.c_ref_compression, self.c_ref_shear = set_sound_speed_ref(self.medium, opt.simulation_type) self.check_source(k_dim, self.kgrid.Nt) self.check_sensor(k_dim) self.check_kgrid_time() self.precision = self.select_precision(opt) self.check_input_combinations(opt, user_medium_density_input, k_dim, pml_size, self.kgrid.N) # run subscript to display time step, max supported frequency etc. display_simulation_params(self.kgrid, self.medium, is_elastic_code) self.smooth_and_enlarge(self.source, k_dim, Vector(self.kgrid.N), opt) self.create_sensor_variables() self.create_absorption_vars() self.assign_pseudonyms(self.medium, self.kgrid) self.scale_source_terms(opt.scale_source_terms) self.create_pml_indices( kgrid_dim=self.kgrid.dim, kgrid_N=Vector(self.kgrid.N), pml_size=pml_size, pml_inside=opt.pml_inside, is_axisymmetric=opt.simulation_type.is_axisymmetric(), )
[docs] @staticmethod def check_calling_func_name_and_dim(calling_func_name, kgrid_dim) -> None: """ Check correct function has been called for the dimensionality of kgrid Args: calling_func_name: Name of the script that makes calls to kWaveSimulation kgrid_dim: Dimensionality of the kWaveGrid Returns: None """ assert not calling_func_name.startswith(("pstdElastic", "kspaceElastic")), "Elastic simulation is not supported." if calling_func_name == "kspaceFirstOrder1D": assert kgrid_dim == 1, f"kgrid has the wrong dimensionality for {calling_func_name}." elif calling_func_name in ["kspaceFirstOrder2D", "pstdElastic2D", "kspaceElastic2D", "kspaceFirstOrderAS"]: assert kgrid_dim == 2, f"kgrid has the wrong dimensionality for {calling_func_name}." elif calling_func_name in ["kspaceFirstOrder3D", "pstdElastic3D", "kspaceElastic3D"]: assert kgrid_dim == 3, f"kgrid has the wrong dimensionality for {calling_func_name}."
[docs] @staticmethod def print_start_status(is_elastic_code: bool) -> None: """ Update command-line status with the start time Args: is_elastic_code: is the simulation elastic Returns: None """ if is_elastic_code: # pragma: no cover logging.log(logging.INFO, "Running k-Wave elastic simulation...") else: logging.log(logging.INFO, "Running k-Wave simulation...") logging.log(logging.INFO, f" start time: {get_date_string()}")
[docs] def set_index_data_type(self) -> None: """ Pre-calculate the data type needed to store the matrix indices given the total number of grid points: indexing variables will be created using this data type to save memory Returns: None """ total_grid_points = self.kgrid.total_grid_points self.index_data_type = get_smallest_possible_type(total_grid_points, "uint", default="double")
[docs] @staticmethod def check_medium(medium, kgrid_k, simulation_type: SimulationType) -> bool: """ Check the properties of the medium structure for correctness and validity Args: medium: kWaveMedium instance kgrid_k: kWaveGrid.k matrix is_elastic: Whether the simulation is elastic is_axisymmetric: Whether the simulation is axisymmetric Returns: Medium Density """ # if using the fluid code, allow the density field to be blank if the medium is homogeneous if (not simulation_type.is_elastic_simulation()) and medium.density is None and medium.sound_speed.size == 1: user_medium_density_input = False medium.density = 1 else: medium.ensure_defined("density") user_medium_density_input = True # check medium absorption inputs for the fluid code is_absorbing = any(medium.is_defined("alpha_coeff", "alpha_power")) is_stokes = simulation_type.is_axisymmetric() or medium.alpha_mode == "stokes" medium.set_absorbing(is_absorbing, is_stokes) if is_absorbing: medium.check_fields(kgrid_k.shape) return user_medium_density_input
[docs] def check_sensor(self, kgrid_dim) -> None: """ Check the Sensor properties for correctness and validity Args: k_dim: kWaveGrid dimensionality Returns: None """ # ========================================================================= # CHECK SENSOR STRUCTURE INPUTS # ========================================================================= # check sensor fields if self.sensor is not None: # check the sensor input is valid # TODO FARID move this check as a type checking assert isinstance( self.sensor, (kSensor, NotATransducer) ), "sensor must be defined as an object of the kSensor or kWaveTransducer class." # check if sensor is a transducer, otherwise check input fields if not isinstance(self.sensor, NotATransducer): if kgrid_dim == 2: # check for sensor directivity input and set flag directivity = self.sensor.directivity if directivity is not None and self.sensor.directivity.angle is not None: # make sure the sensor mask is not blank assert self.sensor.mask is not None, "The mask must be defined for the sensor" # check sensor.directivity.pattern and sensor.mask have the same size assert ( directivity.angle.shape == self.sensor.mask.shape ), "sensor.directivity.angle and sensor.mask must be the same size." # check if directivity size input exists, otherwise make it # a constant times kgrid.dx if directivity.size is None: directivity.set_default_size(self.kgrid) # find the unique directivity angles # assign the wavenumber vectors directivity.set_unique_angles(self.sensor.mask) directivity.set_wavenumbers(self.kgrid) # check for time reversal inputs and set flags if not self.options.simulation_type.is_elastic_simulation() and self.sensor.time_reversal_boundary_data is not None: self.record.p = False # check for sensor.record and set usage flgs - if no flgs are # given, the time history of the acoustic pressure is recorded by # default if self.sensor.record is not None: # check for time reversal data if self.time_rev: logging.log(logging.WARN, "sensor.record is not used for time reversal reconstructions") # check the input is a cell array assert isinstance(self.sensor.record, list), 'sensor.record must be given as a list, e.g. ["p", "u"]' # check the sensor record flgs self.record.set_flags_from_list(self.sensor.record, self.options.simulation_type.is_elastic_simulation()) # enforce the sensor.mask field unless just recording the max_all # and _final variables fields = ["p", "p_max", "p_min", "p_rms", "u", "u_non_staggered", "u_split_field", "u_max", "u_min", "u_rms", "I", "I_avg"] if any(self.record.is_set(fields)): assert self.sensor.mask is not None # check if sensor mask is a binary grid, a set of cuboid corners, # or a set of Cartesian interpolation points if not self.blank_sensor: if (kgrid_dim == 3 and num_dim2(self.sensor.mask) == 3) or ( kgrid_dim != 3 and (self.sensor.mask.shape == self.kgrid.k.shape) ): # check the grid is binary assert self.sensor.mask.sum() == ( self.sensor.mask.size - (self.sensor.mask == 0).sum() ), "sensor.mask must be a binary grid (numeric values must be 0 or 1)." # check the grid is not empty assert self.sensor.mask.sum() != 0, "sensor.mask must be a binary grid with at least one element set to 1." elif self.sensor.mask.shape[0] == 2 * kgrid_dim: # make sure the points are integers assert np.all(self.sensor.mask % 1 == 0), "sensor.mask cuboid corner indices must be integers." # store a copy of the cuboid corners self.record.cuboid_corners_list = self.sensor.mask # check the list makes sense if np.any(self.sensor.mask[self.kgrid.dim :, :] - self.sensor.mask[: self.kgrid.dim, :] < 0): if kgrid_dim == 1: raise ValueError("sensor.mask cuboid corners must be defined " "as [x1, x2; ...]." " where x2 => x1, etc.") elif kgrid_dim == 2: raise ValueError( "sensor.mask cuboid corners must be defined " "as [x1, y1, x2, y2; ...]." " where x2 => x1, etc." ) elif kgrid_dim == 3: raise ValueError( "sensor.mask cuboid corners must be defined" " as [x1, y1, z1, x2, y2, z2; ...]." " where x2 => x1, etc." ) # check the list are within bounds if np.any(self.sensor.mask < 1): raise ValueError("sensor.mask cuboid corners must be within the grid.") else: if kgrid_dim == 1: if np.any(self.sensor.mask > self.kgrid.Nx): raise ValueError("sensor.mask cuboid corners must be within the grid.") elif kgrid_dim == 2: if np.any(self.sensor.mask[[0, 2], :] > self.kgrid.Nx) or np.any( self.sensor.mask[[1, 3], :] > self.kgrid.Ny ): raise ValueError("sensor.mask cuboid corners must be within the grid.") elif kgrid_dim == 3: if ( np.any(self.sensor.mask[[0, 3], :] > self.kgrid.Nx) or np.any(self.sensor.mask[[1, 4], :] > self.kgrid.Ny) or np.any(self.sensor.mask[[2, 5], :] > self.kgrid.Nz) ): raise ValueError("sensor.mask cuboid corners must be within the grid.") # create a binary mask for display from the list of corners # TODO FARID mask should be option_factory in sensor not here self.sensor.mask = np.zeros_like(self.kgrid.k, dtype=bool) cuboid_corners_list = self.record.cuboid_corners_list for cuboid_index in range(cuboid_corners_list.shape[1]): if self.kgrid.dim == 1: self.sensor.mask[cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[1, cuboid_index]] = 1 if self.kgrid.dim == 2: self.sensor.mask[ cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[2, cuboid_index], cuboid_corners_list[1, cuboid_index] : cuboid_corners_list[3, cuboid_index], ] = 1 if self.kgrid.dim == 3: self.sensor.mask[ cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[3, cuboid_index], cuboid_corners_list[1, cuboid_index] : cuboid_corners_list[4, cuboid_index], cuboid_corners_list[2, cuboid_index] : cuboid_corners_list[5, cuboid_index], ] = 1 else: # check the Cartesian sensor mask is the correct size # (1 x N, 2 x N, 3 x N) assert ( self.sensor.mask.shape[0] == kgrid_dim and num_dim2(self.sensor.mask) <= 2 ), f"Cartesian sensor.mask for a {kgrid_dim}D simulation must be given as a {kgrid_dim} by N array." # set Cartesian mask flag (this is modified in # createStorageVariables if the interpolation setting is # set to nearest) self.binary_sensor_mask = False # extract Cartesian data from sensor mask if kgrid_dim == 1: # align sensor data as a column vector to be the # same as kgrid.x_vec so that calls to interp1 # return data in the correct dimension self.sensor_x = np.reshape((self.sensor.mask, (-1, 1))) # add sensor_x to the record structure for use with # the _extractSensorData subfunction self.record.sensor_x = self.sensor_x "record.sensor_x = sensor_x;" elif kgrid_dim == 2: self.sensor_x = self.sensor.mask[0, :] self.sensor_y = self.sensor.mask[1, :] elif kgrid_dim == 3: self.sensor_x = self.sensor.mask[0, :] self.sensor_y = self.sensor.mask[1, :] self.sensor_z = self.sensor.mask[2, :] # compute an equivalent sensor mask using nearest neighbour # interpolation, if flgs.time_rev = false and # cartesian_interp = 'linear' then this is only used for # display, if flgs.time_rev = true or cartesian_interp = # 'nearest' this grid is used as the sensor.mask self.sensor.mask, self.order_index, self.reorder_index = cart2grid( self.kgrid, self.sensor.mask, self.options.simulation_type.is_axisymmetric() ) # if in time reversal mode, reorder the p0 input data in # the order of the binary sensor_mask if self.time_rev: raise NotImplementedError """ # append the reordering data new_col_pos = length(sensor.time_reversal_boundary_data(1, :)) + 1; sensor.time_reversal_boundary_data(:, new_col_pos) = order_index; # reorder p0 based on the order_index sensor.time_reversal_boundary_data = sort_rows(sensor.time_reversal_boundary_data, new_col_pos); # remove the reordering data sensor.time_reversal_boundary_data = sensor.time_reversal_boundary_data(:, 1:new_col_pos - 1); """ else: # set transducer sensor flag self.transducer_sensor = True self.record.p = False # check to see if there is an elevation focus if not np.isinf(self.sensor.elevation_focus_distance): # set flag self.transducer_receive_elevation_focus = True # get the elevation mask that is used to extract the correct values # from the sensor data buffer for averaging self.transducer_receive_mask = self.sensor.elevation_beamforming_mask # check for directivity inputs with time reversal if kgrid_dim == 2 and self.use_sensor and self.compute_directivity and self.time_rev: logging.log(logging.WARN, "sensor directivity fields are not used for time reversal.")
[docs] def check_source(self, k_dim, k_Nt) -> None: """ Check the source properties for correctness and validity Args: kgrid_dim: kWaveGrid dimension k_Nt: Number of time steps in kWaveGrid Returns: None """ # ========================================================================= # CHECK SOURCE STRUCTURE INPUTS # ========================================================================= # check source inputs if not isinstance(self.source, (kSource, NotATransducer)): # allow an invalid or empty source input if computing time reversal, # otherwise return error assert self.time_rev, "source must be defined as an object of the kSource or kWaveTransducer classes." elif not isinstance(self.source, NotATransducer): # -------------------------- # SOURCE IS NOT A TRANSDUCER # -------------------------- """ check allowable source types Depending on the kgrid dimensionality and the simulation type, following fields are allowed & might be use: kgrid.dim == 1: non-elastic code: ['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'u_mask', 'u_mode', 'u_frequency_ref'] kgrid.dim == 2: non-elastic code: ['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'uy', 'u_mask', 'u_mode', 'u_frequency_ref'] elastic code: ['p0', 'sxx', 'syy', 'sxy', 's_mask', 's_mode', 'ux', 'uy', 'u_mask', 'u_mode'] kgrid.dim == 3: non-elastic code: ['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'uy', 'uz', 'u_mask', 'u_mode', 'u_frequency_ref'] elastic code: ['p0', 'sxx', 'syy', 'szz', 'sxy', 'sxz', 'syz', 's_mask', 's_mode', 'ux', 'uy', 'uz', 'u_mask', 'u_mode'] """ self.source.validate(self.kgrid) # check for a time varying pressure source input if self.source.p is not None: # check the source mode input is valid if self.source.p_mode is None: self.source.p_mode = self.SOURCE_P_MODE_DEF if self.source_p > k_Nt: logging.log(logging.WARN, " source.p has more time points than kgrid.Nt, remaining time points will not be used.") # create an indexing variable corresponding to the location of all the source elements self.p_source_pos_index = matlab_find(self.source.p_mask) # check if the mask is binary or labelled p_unique = np.unique(self.source.p_mask) # create a second indexing variable if p_unique.size <= 2 and p_unique.sum() == 1: # set signal index to all elements self.p_source_sig_index = ":" else: # set signal index to the labels (this allows one input signal # to be used for each source label) self.p_source_sig_index = self.source.p_mask(self.source.p_mask != 0) # convert the data type depending on the number of indices self.p_source_pos_index = cast_to_type(self.p_source_pos_index, self.index_data_type) if self.source_p_labelled: self.p_source_sig_index = cast_to_type(self.p_source_sig_index, self.index_data_type) # check for time varying velocity source input and set source flag if any([(getattr(self.source, k) is not None) for k in ["ux", "uy", "uz", "u_mask"]]): # check the source mode input is valid if self.source.u_mode is None: self.source.u_mode = self.SOURCE_U_MODE_DEF # create an indexing variable corresponding to the location of all # the source elements self.u_source_pos_index = matlab_find(self.source.u_mask) # check if the mask is binary or labelled u_unique = np.unique(self.source.u_mask) # create a second indexing variable if u_unique.size <= 2 and u_unique.sum() == 1: # set signal index to all elements self.u_source_sig_index = ":" else: # set signal index to the labels (this allows one input signal # to be used for each source label) self.u_source_sig_index = self.source.u_mask[self.source.u_mask != 0] # convert the data type depending on the number of indices self.u_source_pos_index = cast_to_type(self.u_source_pos_index, self.index_data_type) if self.source_u_labelled: self.u_source_sig_index = cast_to_type(self.u_source_sig_index, self.index_data_type) # check for time varying stress source input and set source flag if any([(getattr(self.source, k) is not None) for k in ["sxx", "syy", "szz", "sxy", "sxz", "syz", "s_mask"]]): # create an indexing variable corresponding to the location of all # the source elements raise NotImplementedError "s_source_pos_index = find(source.s_mask != 0);" # check if the mask is binary or labelled "s_unique = unique(source.s_mask);" # create a second indexing variable if eng.eval("numel(s_unique) <= 2 && sum(s_unique) == 1"): # noqa: F821 # set signal index to all elements eng.workspace["s_source_sig_index"] = ":" # noqa: F821 else: # set signal index to the labels (this allows one input signal # to be used for each source label) s_source_sig_index = source.s_mask(source.s_mask != 0) # noqa f"s_source_pos_index = {self.index_data_type}(s_source_pos_index);" if self.source_s_labelled: f"s_source_sig_index = {self.index_data_type}(s_source_sig_index);" else: # ---------------------- # SOURCE IS A TRANSDUCER # ---------------------- # if the sensor is a transducer, check that the simulation is in 3D assert k_dim == 3, "Transducer inputs are only compatible with 3D simulations." # get the input signal - this is appended with zeros if required to # account for the beamforming delays (this will throw an error if the # input signal is not defined) self.transducer_input_signal = self.source.input_signal # get the delay mask that accounts for the beamforming delays and # elevation focussing; this is used so that a single time series can be # applied to the complete transducer mask with different delays delay_mask = self.source.delay_mask() # set source flag - this should be the length of signal minus the # maximum delay self.transducer_source = self.transducer_input_signal.size - delay_mask.max() # get the active elements mask active_elements_mask = self.source.active_elements_mask # get the apodization mask if not set to 'Rectangular' and convert to a # linear array if self.source.transmit_apodization == "Rectangular": self.transducer_transmit_apodization = 1 else: self.transducer_transmit_apodization = self.source.transmit_apodization_mask self.transducer_transmit_apodization = self.transducer_transmit_apodization[active_elements_mask != 0] # create indexing variable corresponding to the active elements # and convert the data type depending on the number of indices self.u_source_pos_index = matlab_find(active_elements_mask).astype(self.index_data_type) # convert the delay mask to an indexing variable (this doesn't need to # be modified if the grid is expanded) which tells each point in the # source mask which point in the input_signal should be used delay_mask = matlab_mask(delay_mask, active_elements_mask != 0) # compatibility # convert the data type depending on the maximum value of the delay # mask and the length of the source smallest_type = get_smallest_possible_type(delay_mask.max(), "uint") if smallest_type is not None: delay_mask = delay_mask.astype(smallest_type) # move forward by 1 as a delay of 0 corresponds to the first point in the input signal self.delay_mask = delay_mask + 1 # clean up unused variables del active_elements_mask
[docs] def check_kgrid_time(self) -> None: """ Check time-related kWaveGrid inputs Returns: None """ # check kgrid for t_array existance, and create if not defined if isinstance(self.kgrid.t_array, str) and self.kgrid.t_array == "auto": # check for time reversal mode if self.time_rev: raise ValueError("kgrid.t_array (Nt and dt) must be defined explicitly in time reversal mode.") # check for time varying sources if (not self.source_p0_elastic) and ( self.source_p or self.source_ux or self.source_uy or self.source_uz or self.source_sxx or self.source_syy or self.source_szz or self.source_sxy or self.source_sxz or self.source_syz ): raise ValueError("kgrid.t_array (Nt and dt) must be defined explicitly when using a time-varying source.") # create the time array using the compressional sound speed self.kgrid.makeTime(self.medium.sound_speed, self.KSPACE_CFL) # check kgrid.t_array for stability given medium properties if not self.options.simulation_type.is_elastic_simulation(): # calculate the largest timestep for which the model is stable dt_stability_limit = check_stability(self.kgrid, self.medium) # give a warning if the timestep is larger than stability limit allows if self.kgrid.dt > dt_stability_limit: logging.log(logging.WARN, " time step may be too large for a stable simulation.")
[docs] @staticmethod def select_precision(opt: SimulationOptions): """ Select the minimal precision for storing the data Args: opt: SimulationOptions instance Returns: Minimal precision for variable allocation """ # set storage variable type based on data_cast - this enables the # output variables to be directly created in the data_cast format, # rather than creating them in double precision and then casting them if opt.data_cast == "off": precision = "double" elif opt.data_cast == "single": precision = "single" elif opt.data_cast == "gsingle": precision = "single" elif opt.data_cast == "gdouble": precision = "double" elif opt.data_cast == "gpuArray": raise NotImplementedError("gpuArray is not supported in Python-version") elif opt.data_cast == "kWaveGPUsingle": precision = "single" elif opt.data_cast == "kWaveGPUdouble": precision = "double" else: raise ValueError("'Unknown ''DataCast'' option'") return precision
[docs] def check_input_combinations(self, opt: SimulationOptions, user_medium_density_input: bool, k_dim, pml_size, kgrid_N) -> None: """ Check the input combinations for correctness and validity Args: opt: SimulationOptions instance user_medium_density_input: Medium Density k_dim: kWaveGrid dimensionality pml_size: Size of the PML kgrid_N: kWaveGrid size in each direction Returns: None """ # ========================================================================= # CHECK FOR VALID INPUT COMBINATIONS # ========================================================================= # enforce density input if velocity sources or output are being used if not user_medium_density_input and ( self.source_ux or self.source_uy or self.source_uz or self.record.u or self.record.u_max or self.record.u_rms ): raise ValueError( "medium.density must be explicitly defined " "if velocity inputs or outputs are used, even in homogeneous media." ) # TODO(walter): move to check medium # enforce density input if nonlinear equations are being used if not user_medium_density_input and self.medium.is_nonlinear(): raise ValueError("medium.density must be explicitly defined if medium.BonA is specified.") # check sensor compatability options for flgs.compute_directivity if self.use_sensor and k_dim == 2 and self.compute_directivity and not self.binary_sensor_mask and opt.cartesian_interp == "linear": raise ValueError( "sensor directivity fields are only compatible " "with binary sensor masks or " "CartInterp" " set to " "nearest" "." ) # check for split velocity output if self.record.u_split_field and not self.binary_sensor_mask: raise ValueError("The option sensor.record = {" "u_split_field" "} is only compatible " "with a binary sensor mask.") # check input options for data streaming ***** if opt.stream_to_disk: if not self.use_sensor or self.time_rev: raise ValueError( "The optional input " "StreamToDisk" " is currently only compatible " "with forward simulations using a non-zero sensor mask." ) elif self.sensor.record is not None and self.sensor.record.ismember(self.record.flags[1:]).any(): raise ValueError( "The optional input " "StreamToDisk" " is currently only compatible " "with sensor.record = {" "p" "} (the default)." ) is_axisymmetric = self.options.simulation_type.is_axisymmetric() # make sure the PML size is smaller than the grid if PMLInside is true if opt.pml_inside and ( (k_dim == 1 and (pml_size.x * 2 > self.kgrid.Nx)) or (k_dim == 2 and not is_axisymmetric and ((pml_size.x * 2 > kgrid_N[0]) or (pml_size.y * 2 > kgrid_N[1]))) or (k_dim == 2 and is_axisymmetric and ((pml_size.x * 2 > kgrid_N[0]) or (pml_size.y > kgrid_N[1]))) or (k_dim == 3 and ((pml_size.x * 2 > kgrid_N[0]) or (pml_size.x * 2 > kgrid_N[1]) or (pml_size.z * 2 > kgrid_N[2]))) ): raise ValueError("The size of the PML must be smaller than the size of the grid.") # make sure the PML is inside if using a non-uniform grid if self.nonuniform_grid and not opt.pml_inside: raise ValueError("''PMLInside'' must be true for simulations using non-uniform grids.") # check for compatible input options if saving to disk # modified by Farid | disabled temporarily! # if k_dim == 3 and isinstance(self.options.save_to_disk, str) and # (not self.use_sensor or not self.binary_sensor_mask or self.time_rev): # raise ValueError('The optional input ''SaveToDisk'' is currently only compatible # with forward simulations using a non-zero binary sensor mask.') # check the record start time is within range record_start_index = self.sensor.record_start_index if self.use_sensor and ((record_start_index > self.kgrid.Nt) or (record_start_index < 1)): raise ValueError("sensor.record_start_index must be between 1 and the number of time steps.") # ensure 'WSWA' symmetry if using axisymmetric code with 'SaveToDisk' if is_axisymmetric and self.options.radial_symmetry != "WSWA" and isinstance(self.options.save_to_disk, str): # display a warning only if using WSWS symmetry (not WSWA-FFT) if self.options.radial_symmetry.startswith("WSWS"): logging.log( logging.WARN, " Optional input " "RadialSymmetry" " changed to " "WSWA" " for compatability with " "SaveToDisk" "." ) # update setting self.options.radial_symmetry = "WSWA" # ensure p0 smoothing is switched off if p0 is empty if not self.source_p0: self.options.smooth_p0 = False # start log if required if opt.create_log: raise NotImplementedError(f"diary({self.LOG_NAME}.txt');") # update command line status if self.time_rev: logging.log(logging.INFO, " time reversal mode") # cleanup unused variables for k in list(self.__dict__.keys()): if k.endswith("_DEF"): delattr(self, k)
[docs] def smooth_and_enlarge(self, source, k_dim, kgrid_N, opt: SimulationOptions) -> None: """ Smooth and enlarge grids Args: source: kWaveSource instance k_dim: kWaveGrid dimensionality kgrid_N: kWaveGrid size in each direction opt: SimulationOptions Returns: None """ # smooth the initial pressure distribution p0 if required, and then restore # the maximum magnitude # NOTE 1: if p0 has any values at the edge of the domain, the smoothing # may cause part of p0 to wrap to the other side of the domain # NOTE 2: p0 is smoothed before the grid is expanded to ensure that p0 is # exactly zero within the PML # NOTE 3: for the axisymmetric code, p0 is smoothed assuming WS origin # symmetry if self.source_p0 and self.options.smooth_p0: # update command line status logging.log(logging.INFO, " smoothing p0 distribution...") if self.options.simulation_type.is_axisymmetric(): if self.options.radial_symmetry in ["WSWA-FFT", "WSWA"]: # create a new kWave grid object with expanded radial grid kgrid_exp = kWaveGrid([kgrid_N.x, kgrid_N.y * 4], [self.kgrid.dx, self.kgrid.dy]) # mirror p0 in radial dimension using WSWA symmetry self.source.p0 = self.source.p0.astype(float) p0_exp = np.zeros((kgrid_exp.Nx, kgrid_exp.Ny)) p0_exp[:, kgrid_N.y * 0 + 0 : kgrid_N.y * 1] = self.source.p0 p0_exp[:, kgrid_N.y * 1 + 1 : kgrid_N.y * 2] = -np.fliplr(self.source.p0[:, 1:]) p0_exp[:, kgrid_N.y * 2 + 0 : kgrid_N.y * 3] = -self.source.p0 p0_exp[:, kgrid_N.y * 3 + 1 : kgrid_N.y * 4] = np.fliplr(self.source.p0[:, 1:]) elif self.options.radial_symmetry in ["WSWS-FFT", "WSWS"]: # create a new kWave grid object with expanded radial grid kgrid_exp = kWaveGrid([kgrid_N.x, kgrid_N.y * 2 - 2], [self.kgrid.dx, self.kgrid.dy]) # mirror p0 in radial dimension using WSWS symmetry p0_exp = np.zeros((kgrid_exp.Nx, kgrid_exp.Ny)) p0_exp[:, 1 : kgrid_N.y] = source.p0 p0_exp[:, kgrid_N.y + 0 : kgrid_N.y * 2 - 2] = np.fliplr(source.p0[:, 1:-1]) # smooth p0 p0_exp = smooth(p0_exp, True) # trim back to original size source.p0 = p0_exp[:, 0 : self.kgrid.Ny] # clean up unused variables del kgrid_exp del p0_exp else: source.p0 = smooth(source.p0, True) # expand the computational grid if the PML is set to be outside the input # grid defined by the user if opt.pml_inside is False: expand_results = expand_grid_matrices( self.kgrid, self.medium, self.source, self.sensor, self.options, dotdict( { "p_source_pos_index": self.p_source_pos_index, "u_source_pos_index": self.u_source_pos_index, "s_source_pos_index": self.s_source_pos_index, } ), dotdict( { "axisymmetric": self.options.simulation_type.is_axisymmetric(), "use_sensor": self.use_sensor, "blank_sensor": self.blank_sensor, "cuboid_corners": self.cuboid_corners, "source_p0": self.source_p0, "source_p": self.source_p, "source_ux": self.source_ux, "source_uy": self.source_uy, "source_uz": self.source_uz, "transducer_source": self.transducer_source, "source_sxx": self.source_sxx, "source_syy": self.source_syy, "source_szz": self.source_szz, "source_sxy": self.source_sxy, "source_sxz": self.source_sxz, "source_syz": self.source_syz, } ), ) self.kgrid, self.index_data_type, self.p_source_pos_index, self.u_source_pos_index, self.s_source_pos_index = expand_results # get maximum prime factors if self.options.simulation_type.is_axisymmetric(): prime_facs = self.kgrid.highest_prime_factors(self.options.radial_symmetry[:4]) else: prime_facs = self.kgrid.highest_prime_factors() # give warning for bad dimension sizes if prime_facs.max() > self.HIGHEST_PRIME_FACTOR_WARNING: prime_facs = prime_facs[prime_facs != 0] logging.log(logging.WARN, f"Highest prime factors in each dimension are {prime_facs}") logging.log(logging.WARN, "Use dimension sizes with lower prime factors to improve speed") del prime_facs # smooth the sound speed distribution if required if opt.smooth_c0 and num_dim2(self.medium.sound_speed) == k_dim and self.medium.sound_speed.size > 1: logging.log(logging.INFO, " smoothing sound speed distribution...") self.medium.sound_speed = smooth(self.medium.sound_speed) # smooth the ambient density distribution if required if opt.smooth_rho0 and num_dim2(self.medium.density) == k_dim and self.medium.density.size > 1: logging.log(logging.INFO, "smoothing density distribution...") self.medium.density = smooth(self.medium.density)
[docs] def create_sensor_variables(self) -> None: """ Create the sensor related variables Returns: None """ # define the output variables and mask indices if using the sensor if self.use_sensor: if not self.blank_sensor or self.options.save_to_disk: if self.cuboid_corners: # create empty list of sensor indices self.sensor_mask_index = [] # loop through the list of cuboid corners, and extract the # sensor mask indices for each cube for cuboid_index in range(self.record.cuboid_corners_list.shape[1]): # create empty binary mask temp_mask = np.zeros_like(self.kgrid.k, dtype=bool) if self.kgrid.dim == 1: self.sensor.mask[ self.record.cuboid_corners_list[0, cuboid_index] : self.record.cuboid_corners_list[1, cuboid_index] ] = 1 if self.kgrid.dim == 2: self.sensor.mask[ self.record.cuboid_corners_list[0, cuboid_index] : self.record.cuboid_corners_list[2, cuboid_index], self.record.cuboid_corners_list[1, cuboid_index] : self.record.cuboid_corners_list[3, cuboid_index], ] = 1 if self.kgrid.dim == 3: self.sensor.mask[ self.record.cuboid_corners_list[0, cuboid_index] : self.record.cuboid_corners_list[3, cuboid_index], self.record.cuboid_corners_list[1, cuboid_index] : self.record.cuboid_corners_list[4, cuboid_index], self.record.cuboid_corners_list[2, cuboid_index] : self.record.cuboid_corners_list[5, cuboid_index], ] = 1 # extract mask indices self.sensor_mask_index.append(matlab_find(temp_mask)) self.sensor_mask_index = np.array(self.sensor_mask_index) # cleanup unused variables del temp_mask else: # create mask indices (this works for both normal sensor and # transducer inputs) self.sensor_mask_index = np.where(self.sensor.mask.flatten(order="F") != 0)[0] + 1 # +1 due to matlab indexing self.sensor_mask_index = np.expand_dims(self.sensor_mask_index, -1) # compatibility, n => [n, 1] # convert the data type depending on the number of indices (this saves # memory) self.sensor_mask_index = cast_to_type(self.sensor_mask_index, self.index_data_type) else: # set the sensor mask index variable to be empty self.sensor_mask_index = []
[docs] def create_absorption_vars(self) -> None: """ Create absorption variables for the fluid code based on the expanded and smoothed values of the medium parameters (if not saving to disk) Returns: None """ if not self.options.simulation_type.is_elastic_simulation() and not self.options.save_to_disk: self.absorb_nabla1, self.absorb_nabla2, self.absorb_tau, self.absorb_eta = create_absorption_variables( self.kgrid, self.medium, self.equation_of_state )
[docs] def assign_pseudonyms(self, medium: kWaveMedium, kgrid: kWaveGrid) -> None: """ Shorten commonly used field names (these act only as pointers provided that the values aren't modified) (done after enlarging and smoothing the grids) Args: medium: kWaveMedium instance kgrid: kWaveGrid instance Returns: None """ self.dt = float(kgrid.dt) self.rho0 = medium.density self.c0 = medium.sound_speed
[docs] def scale_source_terms(self, is_scale_source_terms) -> None: """ Scale the source terms based on the expanded and smoothed values of the medium parameters Args: is_scale_source_terms: Should the source terms be scaled Returns: None """ if not is_scale_source_terms: return try: p_source_pos_index = self.p_source_pos_index except AttributeError: p_source_pos_index = None try: s_source_pos_index = self.s_source_pos_index except AttributeError: s_source_pos_index = None try: u_source_pos_index = self.u_source_pos_index except AttributeError: u_source_pos_index = None self.transducer_input_signal = scale_source_terms_func( self.c0, self.dt, self.kgrid, self.source, p_source_pos_index, s_source_pos_index, u_source_pos_index, self.transducer_input_signal, dotdict( { "nonuniform_grid": self.nonuniform_grid, "source_ux": self.source_ux, "source_uy": self.source_uy, "source_uz": self.source_uz, "transducer_source": self.transducer_source, "source_p": self.source_p, "source_p0": self.source_p0, "use_w_source_correction_p": self.use_w_source_correction_p, "use_w_source_correction_u": self.use_w_source_correction_u, "source_sxx": self.source_sxx, "source_syy": self.source_syy, "source_szz": self.source_szz, "source_sxy": self.source_sxy, "source_sxz": self.source_sxz, "source_syz": self.source_syz, } ), )
[docs] def create_pml_indices(self, kgrid_dim, kgrid_N: Vector, pml_size, pml_inside, is_axisymmetric): """ Define index variables to remove the PML from the display if the optional input 'PlotPML' is set to false Args: kgrid_dim: kWaveGrid dimensinality kgrid_N: kWaveGrid size in each direction pml_size: Size of the PML pml_inside: Whether the PML is inside the grid defined by the user is_axisymmetric: Whether the simulation is axisymmetric """ # comment by Farid: PlotPML is always False in Python version, # therefore if statement removed if kgrid_dim == 1: self.x1 = pml_size.x + 1.0 self.x2 = kgrid_N.x - pml_size.x elif kgrid_dim == 2: self.x1 = pml_size.x + 1.0 self.x2 = kgrid_N.x - pml_size.x if is_axisymmetric: self.y1 = 1.0 else: self.y1 = pml_size.y + 1.0 self.y2 = kgrid_N.y - pml_size.y elif kgrid_dim == 3: self.x1 = pml_size.x + 1.0 self.x2 = kgrid_N.x - pml_size.x self.y1 = pml_size.y + 1.0 self.y2 = kgrid_N.y - pml_size.y self.z1 = pml_size.z + 1.0 self.z2 = kgrid_N.z - pml_size.z # define index variables to allow original grid size to be maintained for # the _final and _all output variables if 'PMLInside' is set to false # if self.record is None: # self.record = Recorder() self.record.set_index_variables(self.kgrid, pml_size, pml_inside, is_axisymmetric)