import logging
import warnings
from dataclasses import dataclass
import numpy as np
from deprecated import deprecated
from kwave.data import Vector
from kwave.kgrid import kWaveGrid
from kwave.kmedium import kWaveMedium
from kwave.ksensor import kSensor
from kwave.ksource import kSource
from kwave.ktransducer import NotATransducer
from kwave.kWaveSimulation_helper import (
create_absorption_variables,
display_simulation_params,
expand_grid_matrices,
scale_source_terms_func,
set_sound_speed_ref,
)
from kwave.options.simulation_options import SimulationOptions, SimulationType
from kwave.recorder import Recorder
from kwave.utils.checks import check_stability
from kwave.utils.colormap import get_color_map
from kwave.utils.conversion import cart2grid, cast_to_type
from kwave.utils.data import get_date_string, get_smallest_possible_type
from kwave.utils.dotdictionary import dotdict
from kwave.utils.filters import smooth
from kwave.utils.matlab import matlab_find, matlab_mask
from kwave.utils.matrix import num_dim2
[docs]
@dataclass
class kWaveSimulation(object):
[docs]
def __init__(
self, kgrid: kWaveGrid, source: kSource, sensor: NotATransducer, medium: kWaveMedium, simulation_options: SimulationOptions
):
self.precision = None
self.kgrid = kgrid
self.medium = medium
self.source = source
self.sensor = sensor
self.options = simulation_options
# =========================================================================
# FLAGS WHICH DEPEND ON USER INPUTS (THESE SHOULD NOT BE MODIFIED)
# =========================================================================
# flags which control the characteristics of the sensor
#: Whether the sensor sensor mask is defined by cuboid corners
#: Whether time reversal simulation is enabled
# check if the sensor mask is defined as a list of cuboid corners
if self.sensor.mask is not None and self.sensor.mask.shape[0] == (2 * self.kgrid.dim):
self.userarg_cuboid_corners = True
else:
self.userarg_cuboid_corners = False
# check if performing time reversal, and replace inputs to explicitly use a
# source with a dirichlet boundary condition
if self.sensor.time_reversal_boundary_data is not None:
warnings.warn(
"Time reversal simulation is deprecated. Use TimeReversal class instead. See examples/pr_2D_TR_line_sensor/pr_2D_TR_line_sensor.py for an example."
)
# define a new source structure
self.source = kSource()
self.source.p_mask = self.sensor.mask
self.source.p = np.flip(self.sensor.time_reversal_boundary_data, 1)
self.source.p_mode = "dirichlet"
# define a new sensor structure
Nx, Ny, Nz = self.kgrid.Nx, self.kgrid.Ny, self.kgrid.Nz
self.sensor = kSensor(mask=np.ones((Nx, Ny, max(1, Nz))), record=["p_final"])
# set time reversal flag
self.userarg_time_rev = True
self.record = Recorder()
self.record.p = False
else:
# set time reversal flag
self.userarg_time_rev = False
#: Whether sensor.mask should be re-ordered.
#: True if sensor.mask is Cartesian with nearest neighbour interpolation which is calculated using a binary mask
#: and thus must be re-ordered
self.reorder_data = False
#: Whether the sensor.mask is binary
self.binary_sensor_mask = True
#: If tse sensor is an object of the kWaveTransducer class
self.transducer_sensor = False
self.record = Recorder()
# transducer source flags
#: transducer is object of kWaveTransducer class
self.transducer_source = False
#: Apply receive elevation focus on the transducer
self.transducer_receive_elevation_focus = False
# general
self.COLOR_MAP = get_color_map() #: default color map
self.ESTIMATE_SIM_TIME_STEPS = 50 #: time steps used to estimate simulation time
self.HIGHEST_PRIME_FACTOR_WARNING = 7 #: largest prime factor before warning
self.KSPACE_CFL = 0.3 #: default CFL value used if kgrid.t_array is set to 'auto'
self.PSTD_CFL = 0.1 #: default CFL value used if kgrid.t_array is set to 'auto'
# source types
self.SOURCE_S_MODE_DEF = "additive" #: source mode for stress sources
self.SOURCE_P_MODE_DEF = "additive" #: source mode for pressure sources
self.SOURCE_U_MODE_DEF = "additive" #: source mode for velocity sources
# filenames
self.STREAM_TO_DISK_FILENAME = "temp_sensor_data.bin" #: default disk stream filename
self.LOG_NAME = ["k-Wave-Log-", get_date_string()] #: default log filename
self.calling_func_name = None
logging.log(logging.INFO, f" start time: {get_date_string()}")
self.c_ref, self.c_ref_compression, self.c_ref_shear = [None] * 3
self.transducer_input_signal = None
#: Indexing variable corresponding to the location of all the pressure source elements
self.p_source_pos_index = None
#: Indexing variable corresponding to the location of all the velocity source elements
self.u_source_pos_index = None
#: Indexing variable corresponding to the location of all the stress source elements
self.s_source_pos_index = None
#: Delay mask that accounts for the beamforming delays and elevation focussing
self.delay_mask = None
self.absorb_nabla1 = None #: absorbing fractional Laplacian operator
self.absorb_tau = None #: absorbing fractional Laplacian coefficient
self.absorb_nabla2 = None #: dispersive fractional Laplacian operator
self.absorb_eta = None #: dispersive fractional Laplacian coefficient
self.dt = None #: Alias to kgrid.dt
self.rho0 = None #: Alias to medium.density
self.c0 = None #: Alias to medium.sound_speed
self.index_data_type = None
@property
def equation_of_state(self):
"""
Select the active equation-of-state label based on the medium's absorption and Stokes settings.
Returns:
equation (str): One of:
- 'stokes' when the medium is absorbing and uses the Stokes model.
- 'absorbing' when the medium is absorbing and not using the Stokes model.
- 'lossless' when the medium is not absorbing.
"""
if self.medium.absorbing:
if self.medium.stokes:
return "stokes"
else:
return "absorbing"
else:
return "lossless"
@property
def use_sensor(self):
"""
Indicates whether a sensor is defined for the simulation.
Returns:
True if a sensor object is present, False otherwise.
"""
return self.sensor is not None
@property
def blank_sensor(self):
"""
Returns
True if sensor.mask is not defined but _max_all or _final variables are still recorded
"""
fields = ["p", "p_max", "p_min", "p_rms", "u", "u_non_staggered", "u_split_field", "u_max", "u_min", "u_rms", "I", "I_avg"]
if not (isinstance(self.sensor, NotATransducer) or any(self.record.is_set(fields)) or self.time_rev):
return True
return False
@property
def kelvin_voigt_model(self):
"""
Returns:
Whether the simulation is elastic with absorption
"""
return False
@property
def nonuniform_grid(self):
"""
Returns:
True if the computational grid is non-uniform
"""
return self.kgrid.nonuniform
@property
@deprecated(version="0.4.1", reason="Use TimeReversal class instead")
def time_rev(self) -> bool:
if self.sensor and not isinstance(self.sensor, NotATransducer):
return not self.options.simulation_type.is_elastic_simulation() and self.sensor.time_reversal_boundary_data is not None
return self.userarg_time_rev
@property
@deprecated(version="0.4.1", reason="Use TimeReversal class instead")
def elastic_time_rev(self):
"""
Returns:
True if using time reversal with the elastic code
"""
return False
@property
def compute_directivity(self):
"""
Returns:
True if directivity calculations in 2D are used by setting sensor.directivity_angle
"""
if self.sensor is not None and not isinstance(self.sensor, NotATransducer):
if self.kgrid.dim == 2:
# check for sensor directivity input and set flag
directivity = self.sensor.directivity
if directivity is not None and directivity.angle is not None:
return True
return False
@property
def cuboid_corners(self):
"""
Returns:
Whether the sensor.mask is a list of cuboid corners
"""
if self.sensor is not None and not isinstance(self.sensor, NotATransducer):
if not self.blank_sensor and self.sensor.mask.shape[0] == 2 * self.kgrid.dim:
return True
return self.userarg_cuboid_corners
##############
# flags which control the types of source used
##############
@property
def source_p0(self): # initial pressure
"""
Returns:
Whether initial pressure source is present (default=False)
"""
flag = False # default
if not isinstance(self.source, NotATransducer) and self.source.p0 is not None:
# set flag
flag = True
return flag
@property
def source_p0_elastic(self): # initial pressure in the elastic code
"""
Returns:
Whether initial pressure source is present in the elastic code (default=False)
"""
# Not clear where this flag is set
return False
@property
def source_p(self):
"""
Returns:
Whether time-varying pressure source is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.p is not None:
# set source flag to the length of the source, this allows source.p
# to be shorter than kgrid.Nt
flag = len(self.source.p[0])
return flag
@property
def source_p_labelled(self): # time-varying pressure with labelled source mask
"""
Returns:
True/False if labelled/binary source mask, respectively.
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.p is not None:
# check if the mask is binary or labelled
p_unique = np.unique(self.source.p_mask)
flag = not (p_unique.size <= 2 and p_unique.sum() == 1)
return flag
@property
def source_ux(self) -> bool:
"""
Returns:
Whether time-varying particle velocity source is used in X-direction
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.ux is not None:
# set source flgs to the length of the sources, this allows the
# inputs to be defined independently and be of any length
flag = len(self.source.ux[0])
return flag
@property
def source_uy(self) -> bool:
"""
Returns:
Whether time-varying particle velocity source is used in Y-direction
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.uy is not None:
# set source flgs to the length of the sources, this allows the
# inputs to be defined independently and be of any length
flag = len(self.source.uy[0])
return flag
@property
def source_uz(self) -> bool:
"""
Returns:
Whether time-varying particle velocity source is used in Z-direction
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.uz is not None:
# set source flgs to the length of the sources, this allows the
# inputs to be defined independently and be of any length
flag = len(self.source.uz[0])
return flag
@property
def source_u_labelled(self):
"""
Returns:
Whether time-varying velocity source with labelled source mask is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.u_mask is not None:
# check if the mask is binary or labelled
u_unique = np.unique(self.source.u_mask)
if u_unique.size <= 2 and u_unique.sum() == 1:
# binary source mask
flag = False
else:
# labelled source mask
flag = True
return flag
@property
def source_sxx(self):
"""
Returns:
Whether time-varying stress source in X->X direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.sxx is not None:
flag = len(self.source.sxx[0])
return flag
@property
def source_syy(self):
"""
Returns:
Whether time-varying stress source in Y->Y direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.syy is not None:
flag = len(self.source.syy[0])
return flag
@property
def source_szz(self):
"""
Returns:
Whether time-varying stress source in Z->Z direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.szz is not None:
flag = len(self.source.szz[0])
return flag
@property
def source_sxy(self):
"""
Returns:
Whether time-varying stress source in X->Y direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.sxy is not None:
flag = len(self.source.sxy[0])
return flag
@property
def source_sxz(self):
"""
Returns:
Whether time-varying stress source in X->Z direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.sxz is not None:
flag = len(self.source.sxz[0])
return flag
@property
def source_syz(self):
"""
Returns:
Whether time-varying stress source in Y->Z direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.syz is not None:
flag = len(self.source.syz[0])
return flag
@property
def source_s_labelled(self):
"""
Returns:
Whether time-varying stress source with labelled source mask is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.s_mask is not None:
# check if the mask is binary or labelled
s_unique = np.unique(self.source.s_mask)
if s_unique.size <= 2 and s_unique.sum() == 1:
# binary source mask
flag = False
else:
# labelled source mask
flag = True
return flag
@property
def use_w_source_correction_p(self):
"""
Returns:
Whether to use the w source correction instead of the k-space source correction for pressure sources
"""
flag = False
if not isinstance(self.source, NotATransducer):
if self.source.p is not None and self.source.p_frequency_ref is not None:
flag = True
return flag
@property
def use_w_source_correction_u(self):
"""
Returns:
Whether to use the w source correction instead of the k-space source correction for velocity sources
"""
flag = False
if not isinstance(self.source, NotATransducer):
if any([(getattr(self.source, k) is not None) for k in ["ux", "uy", "uz", "u_mask"]]):
if self.source.u_frequency_ref is not None:
flag = True
return flag
[docs]
@staticmethod
def check_calling_func_name_and_dim(calling_func_name, kgrid_dim) -> None:
"""
Check correct function has been called for the dimensionality of kgrid
Args:
calling_func_name: Name of the script that makes calls to kWaveSimulation
kgrid_dim: Dimensionality of the kWaveGrid
Returns:
None
"""
assert not calling_func_name.startswith(("pstdElastic", "kspaceElastic")), "Elastic simulation is not supported."
if calling_func_name == "kspaceFirstOrder1D":
assert kgrid_dim == 1, f"kgrid has the wrong dimensionality for {calling_func_name}."
elif calling_func_name in ["kspaceFirstOrder2D", "pstdElastic2D", "kspaceElastic2D", "kspaceFirstOrderAS"]:
assert kgrid_dim == 2, f"kgrid has the wrong dimensionality for {calling_func_name}."
elif calling_func_name in ["kspaceFirstOrder3D", "pstdElastic3D", "kspaceElastic3D"]:
assert kgrid_dim == 3, f"kgrid has the wrong dimensionality for {calling_func_name}."
[docs]
@staticmethod
def print_start_status(is_elastic_code: bool) -> None:
"""
Update command-line status with the start time
Args:
is_elastic_code: is the simulation elastic
Returns:
None
"""
if is_elastic_code: # pragma: no cover
logging.log(logging.INFO, "Running k-Wave elastic simulation...")
else:
logging.log(logging.INFO, "Running k-Wave simulation...")
logging.log(logging.INFO, f" start time: {get_date_string()}")
[docs]
def set_index_data_type(self) -> None:
"""
Pre-calculate the data type needed to store the matrix indices given the
total number of grid points: indexing variables will be created using this data type to save memory
Returns:
None
"""
total_grid_points = self.kgrid.total_grid_points
self.index_data_type = get_smallest_possible_type(total_grid_points, "uint", default="double")
[docs]
@staticmethod
def check_medium(medium, kgrid_k, simulation_type: SimulationType) -> bool:
"""
Check the properties of the medium structure for correctness and validity
Args:
medium: kWaveMedium instance
kgrid_k: kWaveGrid.k matrix
is_elastic: Whether the simulation is elastic
is_axisymmetric: Whether the simulation is axisymmetric
Returns:
Medium Density
"""
# if using the fluid code, allow the density field to be blank if the medium is homogeneous
if (not simulation_type.is_elastic_simulation()) and medium.density is None and medium.sound_speed.size == 1:
user_medium_density_input = False
medium.density = 1
else:
medium.ensure_defined("density")
user_medium_density_input = True
# check medium absorption inputs for the fluid code
is_absorbing = any(medium.is_defined("alpha_coeff", "alpha_power"))
is_stokes = simulation_type.is_axisymmetric() or medium.alpha_mode == "stokes"
medium.set_absorbing(is_absorbing, is_stokes)
if is_absorbing:
medium.check_fields(kgrid_k.shape)
return user_medium_density_input
[docs]
def check_sensor(self, kgrid_dim) -> None:
"""
Check the Sensor properties for correctness and validity
Args:
k_dim: kWaveGrid dimensionality
Returns:
None
"""
# =========================================================================
# CHECK SENSOR STRUCTURE INPUTS
# =========================================================================
# check sensor fields
if self.sensor is not None:
# check the sensor input is valid
# TODO FARID move this check as a type checking
assert isinstance(
self.sensor, (kSensor, NotATransducer)
), "sensor must be defined as an object of the kSensor or kWaveTransducer class."
# check if sensor is a transducer, otherwise check input fields
if isinstance(self.sensor, kSensor):
if kgrid_dim == 2:
# check for sensor directivity input and set flag
directivity = self.sensor.directivity
if directivity is not None and self.sensor.directivity.angle is not None:
# make sure the sensor mask is not blank
assert self.sensor.mask is not None, "The mask must be defined for the sensor"
# check sensor.directivity.pattern and sensor.mask have the same size
assert (
directivity.angle.shape == self.sensor.mask.shape
), "sensor.directivity.angle and sensor.mask must be the same size."
# check if directivity size input exists, otherwise make it
# a constant times kgrid.dx
if directivity.size is None:
directivity.set_default_size(self.kgrid)
# find the unique directivity angles
# assign the wavenumber vectors
directivity.set_unique_angles(self.sensor.mask)
directivity.set_wavenumbers(self.kgrid)
# check for time reversal inputs and set flags
if not self.options.simulation_type.is_elastic_simulation() and self.sensor.time_reversal_boundary_data is not None:
self.record.p = False
# check for sensor.record and set usage flgs - if no flgs are
# given, the time history of the acoustic pressure is recorded by
# default
if self.sensor.record is not None:
# check for time reversal data
if self.time_rev:
logging.log(logging.WARN, "sensor.record is not used for time reversal reconstructions")
# check the input is a cell array
assert isinstance(self.sensor.record, list), 'sensor.record must be given as a list, e.g. ["p", "u"]'
# check the sensor record flgs
self.record.set_flags_from_list(self.sensor.record, self.options.simulation_type.is_elastic_simulation())
# enforce the sensor.mask field unless just recording the max_all
# and _final variables
fields = ["p", "p_max", "p_min", "p_rms", "u", "u_non_staggered", "u_split_field", "u_max", "u_min", "u_rms", "I", "I_avg"]
if any(self.record.is_set(fields)):
assert self.sensor.mask is not None
# check if sensor mask is a binary grid, a set of cuboid corners,
# or a set of Cartesian interpolation points
if not self.blank_sensor:
if self._is_binary_sensor_mask(kgrid_dim):
# check the grid is binary
assert self.sensor.mask.sum() == (
self.sensor.mask.size - (self.sensor.mask == 0).sum()
), "sensor.mask must be a binary grid (numeric values must be 0 or 1)."
# check the grid is not empty
assert self.sensor.mask.sum() != 0, "sensor.mask must be a binary grid with at least one element set to 1."
elif self._is_cuboid_corners_mask(kgrid_dim):
# make sure the points are integers
assert np.all(self.sensor.mask % 1 == 0), "sensor.mask cuboid corner indices must be integers."
# store a copy of the cuboid corners
self.record.cuboid_corners_list = self.sensor.mask
# check the list makes sense
if np.any(self.sensor.mask[self.kgrid.dim :, :] - self.sensor.mask[: self.kgrid.dim, :] < 0):
if kgrid_dim == 1:
raise ValueError("sensor.mask cuboid corners must be defined " "as [x1, x2; ...]." " where x2 => x1, etc.")
elif kgrid_dim == 2:
raise ValueError(
"sensor.mask cuboid corners must be defined " "as [x1, y1, x2, y2; ...]." " where x2 => x1, etc."
)
elif kgrid_dim == 3:
raise ValueError(
"sensor.mask cuboid corners must be defined"
" as [x1, y1, z1, x2, y2, z2; ...]."
" where x2 => x1, etc."
)
# check the list are within bounds
if np.any(self.sensor.mask < 1):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
else:
if kgrid_dim == 1:
if np.any(self.sensor.mask > self.kgrid.Nx):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
elif kgrid_dim == 2:
if np.any(self.sensor.mask[[0, 2], :] > self.kgrid.Nx) or np.any(
self.sensor.mask[[1, 3], :] > self.kgrid.Ny
):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
elif kgrid_dim == 3:
if (
np.any(self.sensor.mask[[0, 3], :] > self.kgrid.Nx)
or np.any(self.sensor.mask[[1, 4], :] > self.kgrid.Ny)
or np.any(self.sensor.mask[[2, 5], :] > self.kgrid.Nz)
):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
# create a binary mask for display from the list of corners
# TODO FARID mask should be option_factory in sensor not here
self.sensor.mask = np.zeros_like(self.kgrid.k, dtype=bool)
cuboid_corners_list = self.record.cuboid_corners_list
for cuboid_index in range(cuboid_corners_list.shape[1]):
if self.kgrid.dim == 1:
self.sensor.mask[cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[1, cuboid_index]] = 1
if self.kgrid.dim == 2:
self.sensor.mask[
cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[2, cuboid_index],
cuboid_corners_list[1, cuboid_index] : cuboid_corners_list[3, cuboid_index],
] = 1
if self.kgrid.dim == 3:
self.sensor.mask[
cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[3, cuboid_index],
cuboid_corners_list[1, cuboid_index] : cuboid_corners_list[4, cuboid_index],
cuboid_corners_list[2, cuboid_index] : cuboid_corners_list[5, cuboid_index],
] = 1
else:
# check the Cartesian sensor mask is the correct size
# (1 x N, 2 x N, 3 x N)
assert (
self.sensor.mask.shape[0] == kgrid_dim and num_dim2(self.sensor.mask) <= 2
), f"Cartesian sensor.mask for a {kgrid_dim}D simulation must be given as a {kgrid_dim} by N array."
# set Cartesian mask flag (this is modified in
# createStorageVariables if the interpolation setting is
# set to nearest)
self.binary_sensor_mask = False
# extract Cartesian data from sensor mask
if kgrid_dim == 1:
# align sensor data as a column vector to be the
# same as kgrid.x_vec so that calls to interp1
# return data in the correct dimension
self.sensor_x = np.reshape((self.sensor.mask, (-1, 1)))
# add sensor_x to the record structure for use with
# the _extractSensorData subfunction
self.record.sensor_x = self.sensor_x
"record.sensor_x = sensor_x;"
elif kgrid_dim == 2:
self.sensor_x = self.sensor.mask[0, :]
self.sensor_y = self.sensor.mask[1, :]
elif kgrid_dim == 3:
self.sensor_x = self.sensor.mask[0, :]
self.sensor_y = self.sensor.mask[1, :]
self.sensor_z = self.sensor.mask[2, :]
# compute an equivalent sensor mask using nearest neighbour
# interpolation, if flgs.time_rev = false and
# cartesian_interp = 'linear' then this is only used for
# display, if flgs.time_rev = true or cartesian_interp =
# 'nearest' this grid is used as the sensor.mask
self.sensor.mask, self.order_index, self.reorder_index = cart2grid(
self.kgrid, self.sensor.mask, self.options.simulation_type.is_axisymmetric()
)
# if in time reversal mode, reorder the p0 input data in
# the order of the binary sensor_mask
if self.time_rev:
raise NotImplementedError
"""
# append the reordering data
new_col_pos = length(sensor.time_reversal_boundary_data(1, :)) + 1;
sensor.time_reversal_boundary_data(:, new_col_pos) = order_index;
# reorder p0 based on the order_index
sensor.time_reversal_boundary_data = sort_rows(sensor.time_reversal_boundary_data, new_col_pos);
# remove the reordering data
sensor.time_reversal_boundary_data = sensor.time_reversal_boundary_data(:, 1:new_col_pos - 1);
"""
else:
# set transducer sensor flag
self.transducer_sensor = True
self.record.p = False
# check to see if there is an elevation focus
if not np.isinf(self.sensor.elevation_focus_distance):
# set flag
self.transducer_receive_elevation_focus = True
# get the elevation mask that is used to extract the correct values
# from the sensor data buffer for averaging
self.transducer_receive_mask = self.sensor.elevation_beamforming_mask
# check for directivity inputs with time reversal
if kgrid_dim == 2 and self.use_sensor and self.compute_directivity and self.time_rev:
logging.log(logging.WARN, "sensor directivity fields are not used for time reversal.")
[docs]
def check_source(self, k_dim, k_Nt) -> None:
"""
Check the source properties for correctness and validity
Args:
kgrid_dim: kWaveGrid dimension
k_Nt: Number of time steps in kWaveGrid
Returns:
None
"""
# =========================================================================
# CHECK SOURCE STRUCTURE INPUTS
# =========================================================================
# check source inputs
if not isinstance(self.source, (kSource, NotATransducer)):
# allow an invalid or empty source input if computing time reversal,
# otherwise return error
assert self.time_rev, "source must be defined as an object of the kSource or kWaveTransducer classes."
elif not isinstance(self.source, NotATransducer):
# --------------------------
# SOURCE IS NOT A TRANSDUCER
# --------------------------
"""
check allowable source types
Depending on the kgrid dimensionality and the simulation type,
following fields are allowed & might be use:
kgrid.dim == 1:
non-elastic code:
['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'u_mask', 'u_mode', 'u_frequency_ref']
kgrid.dim == 2:
non-elastic code:
['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'uy', 'u_mask', 'u_mode', 'u_frequency_ref']
elastic code:
['p0', 'sxx', 'syy', 'sxy', 's_mask', 's_mode', 'ux', 'uy', 'u_mask', 'u_mode']
kgrid.dim == 3:
non-elastic code:
['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'uy', 'uz', 'u_mask', 'u_mode', 'u_frequency_ref']
elastic code:
['p0', 'sxx', 'syy', 'szz', 'sxy', 'sxz', 'syz', 's_mask', 's_mode', 'ux', 'uy', 'uz', 'u_mask', 'u_mode']
"""
self.source.validate(self.kgrid)
# check for a time varying pressure source input
if self.source.p is not None:
# check the source mode input is valid
if self.source.p_mode is None:
self.source.p_mode = self.SOURCE_P_MODE_DEF
if self.source_p > k_Nt:
logging.log(logging.WARN, " source.p has more time points than kgrid.Nt, remaining time points will not be used.")
# create an indexing variable corresponding to the location of all the source elements
self.p_source_pos_index = matlab_find(self.source.p_mask)
# check if the mask is binary or labelled
p_unique = np.unique(self.source.p_mask)
# create a second indexing variable
if p_unique.size <= 2 and p_unique.sum() == 1:
# set signal index to all elements
self.p_source_sig_index = ":"
else:
# set signal index to the labels (this allows one input signal
# to be used for each source label)
self.p_source_sig_index = self.source.p_mask(self.source.p_mask != 0)
# convert the data type depending on the number of indices
self.p_source_pos_index = cast_to_type(self.p_source_pos_index, self.index_data_type)
if self.source_p_labelled:
self.p_source_sig_index = cast_to_type(self.p_source_sig_index, self.index_data_type)
# check for time varying velocity source input and set source flag
if any([(getattr(self.source, k) is not None) for k in ["ux", "uy", "uz", "u_mask"]]):
# check the source mode input is valid
if self.source.u_mode is None:
self.source.u_mode = self.SOURCE_U_MODE_DEF
# create an indexing variable corresponding to the location of all
# the source elements
self.u_source_pos_index = matlab_find(self.source.u_mask)
# check if the mask is binary or labelled
u_unique = np.unique(self.source.u_mask)
# create a second indexing variable
if u_unique.size <= 2 and u_unique.sum() == 1:
# set signal index to all elements
self.u_source_sig_index = ":"
else:
# set signal index to the labels (this allows one input signal
# to be used for each source label)
self.u_source_sig_index = self.source.u_mask[self.source.u_mask != 0]
# convert the data type depending on the number of indices
self.u_source_pos_index = cast_to_type(self.u_source_pos_index, self.index_data_type)
if self.source_u_labelled:
self.u_source_sig_index = cast_to_type(self.u_source_sig_index, self.index_data_type)
# check for time varying stress source input and set source flag
if any([(getattr(self.source, k) is not None) for k in ["sxx", "syy", "szz", "sxy", "sxz", "syz", "s_mask"]]):
# create an indexing variable corresponding to the location of all
# the source elements
raise NotImplementedError
"s_source_pos_index = find(source.s_mask != 0);"
# check if the mask is binary or labelled
"s_unique = unique(source.s_mask);"
# create a second indexing variable
if eng.eval("numel(s_unique) <= 2 && sum(s_unique) == 1"): # noqa: F821
# set signal index to all elements
eng.workspace["s_source_sig_index"] = ":" # noqa: F821
else:
# set signal index to the labels (this allows one input signal
# to be used for each source label)
s_source_sig_index = source.s_mask(source.s_mask != 0) # noqa
f"s_source_pos_index = {self.index_data_type}(s_source_pos_index);"
if self.source_s_labelled:
f"s_source_sig_index = {self.index_data_type}(s_source_sig_index);"
else:
# ----------------------
# SOURCE IS A TRANSDUCER
# ----------------------
# if the sensor is a transducer, check that the simulation is in 3D
assert k_dim == 3, "Transducer inputs are only compatible with 3D simulations."
# get the input signal - this is appended with zeros if required to
# account for the beamforming delays (this will throw an error if the
# input signal is not defined)
self.transducer_input_signal = self.source.input_signal
# get the delay mask that accounts for the beamforming delays and
# elevation focussing; this is used so that a single time series can be
# applied to the complete transducer mask with different delays
delay_mask = self.source.delay_mask()
# set source flag - this should be the length of signal minus the
# maximum delay
self.transducer_source = self.transducer_input_signal.size - delay_mask.max()
# get the active elements mask
active_elements_mask = self.source.active_elements_mask
# get the apodization mask if not set to 'Rectangular' and convert to a
# linear array
if self.source.transmit_apodization == "Rectangular":
self.transducer_transmit_apodization = 1
else:
self.transducer_transmit_apodization = self.source.transmit_apodization_mask
self.transducer_transmit_apodization = self.transducer_transmit_apodization[active_elements_mask != 0]
# create indexing variable corresponding to the active elements
# and convert the data type depending on the number of indices
self.u_source_pos_index = matlab_find(active_elements_mask).astype(self.index_data_type)
# convert the delay mask to an indexing variable (this doesn't need to
# be modified if the grid is expanded) which tells each point in the
# source mask which point in the input_signal should be used
delay_mask = matlab_mask(delay_mask, active_elements_mask != 0) # compatibility
# convert the data type depending on the maximum value of the delay
# mask and the length of the source
smallest_type = get_smallest_possible_type(delay_mask.max(), "uint")
if smallest_type is not None:
delay_mask = delay_mask.astype(smallest_type)
# move forward by 1 as a delay of 0 corresponds to the first point in the input signal
self.delay_mask = delay_mask + 1
# clean up unused variables
del active_elements_mask
[docs]
def check_kgrid_time(self) -> None:
"""
Check time-related kWaveGrid inputs
Returns:
None
"""
# check kgrid for t_array existence, and create if not defined
if isinstance(self.kgrid.t_array, str) and self.kgrid.t_array == "auto":
# check for time reversal mode
if self.time_rev:
raise ValueError("kgrid.t_array (Nt and dt) must be defined explicitly in time reversal mode.")
# check for time varying sources
if (not self.source_p0_elastic) and (
self.source_p
or self.source_ux
or self.source_uy
or self.source_uz
or self.source_sxx
or self.source_syy
or self.source_szz
or self.source_sxy
or self.source_sxz
or self.source_syz
):
raise ValueError("kgrid.t_array (Nt and dt) must be defined explicitly when using a time-varying source.")
# create the time array using the compressional sound speed
self.kgrid.makeTime(self.medium.sound_speed, self.KSPACE_CFL)
# check kgrid.t_array for stability given medium properties
if not self.options.simulation_type.is_elastic_simulation():
# calculate the largest timestep for which the model is stable
dt_stability_limit = check_stability(self.kgrid, self.medium)
# give a warning if the timestep is larger than stability limit allows
if self.kgrid.dt > dt_stability_limit:
logging.log(logging.WARN, " time step may be too large for a stable simulation.")
[docs]
@staticmethod
def select_precision(opt: SimulationOptions):
"""
Select the minimal precision for storing the data
Args:
opt: SimulationOptions instance
Returns:
Minimal precision for variable allocation
"""
# set storage variable type based on data_cast - this enables the
# output variables to be directly created in the data_cast format,
# rather than creating them in double precision and then casting them
if opt.data_cast == "off":
precision = "double"
elif opt.data_cast == "single":
precision = "single"
elif opt.data_cast == "gsingle":
precision = "single"
elif opt.data_cast == "gdouble":
precision = "double"
elif opt.data_cast == "gpuArray":
raise NotImplementedError("gpuArray is not supported in Python-version")
elif opt.data_cast == "kWaveGPUsingle":
precision = "single"
elif opt.data_cast == "kWaveGPUdouble":
precision = "double"
else:
raise ValueError("'Unknown ''DataCast'' option'")
return precision
[docs]
def smooth_and_enlarge(self, source, k_dim, kgrid_N, opt: SimulationOptions) -> None:
"""
Smooth and enlarge grids
Args:
source: kWaveSource instance
k_dim: kWaveGrid dimensionality
kgrid_N: kWaveGrid size in each direction
opt: SimulationOptions
Returns:
None
"""
# smooth the initial pressure distribution p0 if required, and then restore
# the maximum magnitude
# NOTE 1: if p0 has any values at the edge of the domain, the smoothing
# may cause part of p0 to wrap to the other side of the domain
# NOTE 2: p0 is smoothed before the grid is expanded to ensure that p0 is
# exactly zero within the PML
# NOTE 3: for the axisymmetric code, p0 is smoothed assuming WS origin
# symmetry
if self.source_p0 and self.options.smooth_p0:
# update command line status
logging.log(logging.INFO, " smoothing p0 distribution...")
if self.options.simulation_type.is_axisymmetric():
if self.options.radial_symmetry in ["WSWA-FFT", "WSWA"]:
# create a new kWave grid object with expanded radial grid
kgrid_exp = kWaveGrid([kgrid_N.x, kgrid_N.y * 4], [self.kgrid.dx, self.kgrid.dy])
# mirror p0 in radial dimension using WSWA symmetry
self.source.p0 = self.source.p0.astype(float)
p0_exp = np.zeros((kgrid_exp.Nx, kgrid_exp.Ny))
p0_exp[:, kgrid_N.y * 0 + 0 : kgrid_N.y * 1] = self.source.p0
p0_exp[:, kgrid_N.y * 1 + 1 : kgrid_N.y * 2] = -np.fliplr(self.source.p0[:, 1:])
p0_exp[:, kgrid_N.y * 2 + 0 : kgrid_N.y * 3] = -self.source.p0
p0_exp[:, kgrid_N.y * 3 + 1 : kgrid_N.y * 4] = np.fliplr(self.source.p0[:, 1:])
elif self.options.radial_symmetry in ["WSWS-FFT", "WSWS"]:
# create a new kWave grid object with expanded radial grid
kgrid_exp = kWaveGrid([kgrid_N.x, kgrid_N.y * 2 - 2], [self.kgrid.dx, self.kgrid.dy])
# mirror p0 in radial dimension using WSWS symmetry
p0_exp = np.zeros((kgrid_exp.Nx, kgrid_exp.Ny))
p0_exp[:, 1 : kgrid_N.y] = source.p0
p0_exp[:, kgrid_N.y + 0 : kgrid_N.y * 2 - 2] = np.fliplr(source.p0[:, 1:-1])
# smooth p0
p0_exp = smooth(p0_exp, True)
# trim back to original size
source.p0 = p0_exp[:, 0 : self.kgrid.Ny]
# clean up unused variables
del kgrid_exp
del p0_exp
else:
source.p0 = smooth(source.p0, True)
# expand the computational grid if the PML is set to be outside the input
# grid defined by the user
if opt.pml_inside is False:
expand_results = expand_grid_matrices(
self.kgrid,
self.medium,
self.source,
self.sensor,
self.options,
dotdict(
{
"p_source_pos_index": self.p_source_pos_index,
"u_source_pos_index": self.u_source_pos_index,
"s_source_pos_index": self.s_source_pos_index,
}
),
dotdict(
{
"axisymmetric": self.options.simulation_type.is_axisymmetric(),
"use_sensor": self.use_sensor,
"blank_sensor": self.blank_sensor,
"cuboid_corners": self.cuboid_corners,
"source_p0": self.source_p0,
"source_p": self.source_p,
"source_ux": self.source_ux,
"source_uy": self.source_uy,
"source_uz": self.source_uz,
"transducer_source": self.transducer_source,
"source_sxx": self.source_sxx,
"source_syy": self.source_syy,
"source_szz": self.source_szz,
"source_sxy": self.source_sxy,
"source_sxz": self.source_sxz,
"source_syz": self.source_syz,
}
),
)
self.kgrid, self.index_data_type, self.p_source_pos_index, self.u_source_pos_index, self.s_source_pos_index = expand_results
# get maximum prime factors
if self.options.simulation_type.is_axisymmetric():
prime_facs = self.kgrid.highest_prime_factors(self.options.radial_symmetry[:4])
else:
prime_facs = self.kgrid.highest_prime_factors()
# give warning for bad dimension sizes
if prime_facs.max() > self.HIGHEST_PRIME_FACTOR_WARNING:
prime_facs = prime_facs[prime_facs != 0]
logging.log(logging.WARN, f"Highest prime factors in each dimension are {prime_facs}")
logging.log(logging.WARN, "Use dimension sizes with lower prime factors to improve speed")
del prime_facs
# smooth the sound speed distribution if required
if opt.smooth_c0 and num_dim2(self.medium.sound_speed) == k_dim and self.medium.sound_speed.size > 1:
logging.log(logging.INFO, " smoothing sound speed distribution...")
self.medium.sound_speed = smooth(self.medium.sound_speed)
# smooth the ambient density distribution if required
if opt.smooth_rho0 and num_dim2(self.medium.density) == k_dim and self.medium.density.size > 1:
logging.log(logging.INFO, "smoothing density distribution...")
self.medium.density = smooth(self.medium.density)
[docs]
def create_sensor_variables(self) -> None:
"""
Create the sensor related variables
Returns:
None
"""
# define the output variables and mask indices if using the sensor
if self.use_sensor:
if not self.blank_sensor or self.options.save_to_disk:
if self.cuboid_corners:
# create empty list of sensor indices
self.sensor_mask_index = []
# loop through the list of cuboid corners, and extract the
# sensor mask indices for each cube
for cuboid_index in range(self.record.cuboid_corners_list.shape[1]):
# create empty binary mask
temp_mask = np.zeros_like(self.kgrid.k, dtype=bool)
if self.kgrid.dim == 1:
self.sensor.mask[
self.record.cuboid_corners_list[0, cuboid_index] : self.record.cuboid_corners_list[1, cuboid_index]
] = 1
if self.kgrid.dim == 2:
self.sensor.mask[
self.record.cuboid_corners_list[0, cuboid_index] : self.record.cuboid_corners_list[2, cuboid_index],
self.record.cuboid_corners_list[1, cuboid_index] : self.record.cuboid_corners_list[3, cuboid_index],
] = 1
if self.kgrid.dim == 3:
self.sensor.mask[
self.record.cuboid_corners_list[0, cuboid_index] : self.record.cuboid_corners_list[3, cuboid_index],
self.record.cuboid_corners_list[1, cuboid_index] : self.record.cuboid_corners_list[4, cuboid_index],
self.record.cuboid_corners_list[2, cuboid_index] : self.record.cuboid_corners_list[5, cuboid_index],
] = 1
# extract mask indices
self.sensor_mask_index.append(matlab_find(temp_mask))
self.sensor_mask_index = np.array(self.sensor_mask_index)
# cleanup unused variables
del temp_mask
else:
# create mask indices (this works for both normal sensor and
# transducer inputs)
self.sensor_mask_index = np.where(self.sensor.mask.flatten(order="F") != 0)[0] + 1 # +1 due to matlab indexing
self.sensor_mask_index = np.expand_dims(self.sensor_mask_index, -1) # compatibility, n => [n, 1]
# convert the data type depending on the number of indices (this saves
# memory)
self.sensor_mask_index = cast_to_type(self.sensor_mask_index, self.index_data_type)
else:
# set the sensor mask index variable to be empty
self.sensor_mask_index = []
[docs]
def create_absorption_vars(self) -> None:
"""
Create absorption variables for the fluid code based on
the expanded and smoothed values of the medium parameters (if not saving to disk)
Returns:
None
"""
if not self.options.simulation_type.is_elastic_simulation() and not self.options.save_to_disk:
self.absorb_nabla1, self.absorb_nabla2, self.absorb_tau, self.absorb_eta = create_absorption_variables(
self.kgrid, self.medium, self.equation_of_state
)
[docs]
def assign_pseudonyms(self, medium: kWaveMedium, kgrid: kWaveGrid) -> None:
"""
Shorten commonly used field names (these act only as pointers provided that the values aren't modified)
(done after enlarging and smoothing the grids)
Args:
medium: kWaveMedium instance
kgrid: kWaveGrid instance
Returns:
None
"""
self.dt = float(kgrid.dt)
self.rho0 = medium.density
self.c0 = medium.sound_speed
[docs]
def scale_source_terms(self, is_scale_source_terms) -> None:
"""
Scale the source terms based on the expanded and smoothed values of the medium parameters
Args:
is_scale_source_terms: Should the source terms be scaled
Returns:
None
"""
if not is_scale_source_terms:
return
try:
p_source_pos_index = self.p_source_pos_index
except AttributeError:
p_source_pos_index = None
try:
s_source_pos_index = self.s_source_pos_index
except AttributeError:
s_source_pos_index = None
try:
u_source_pos_index = self.u_source_pos_index
except AttributeError:
u_source_pos_index = None
self.transducer_input_signal = scale_source_terms_func(
self.c0,
self.dt,
self.kgrid,
self.source,
p_source_pos_index,
s_source_pos_index,
u_source_pos_index,
self.transducer_input_signal,
dotdict(
{
"nonuniform_grid": self.nonuniform_grid,
"source_ux": self.source_ux,
"source_uy": self.source_uy,
"source_uz": self.source_uz,
"transducer_source": self.transducer_source,
"source_p": self.source_p,
"source_p0": self.source_p0,
"use_w_source_correction_p": self.use_w_source_correction_p,
"use_w_source_correction_u": self.use_w_source_correction_u,
"source_sxx": self.source_sxx,
"source_syy": self.source_syy,
"source_szz": self.source_szz,
"source_sxy": self.source_sxy,
"source_sxz": self.source_sxz,
"source_syz": self.source_syz,
}
),
)
[docs]
def create_pml_indices(self, kgrid_dim, kgrid_N: Vector, pml_size, pml_inside, is_axisymmetric):
"""
Define index variables to remove the PML from the display if the optional
input 'PlotPML' is set to false
Args:
kgrid_dim: kWaveGrid dimensionality
kgrid_N: kWaveGrid size in each direction
pml_size: Size of the PML
pml_inside: Whether the PML is inside the grid defined by the user
is_axisymmetric: Whether the simulation is axisymmetric
"""
# comment by Farid: PlotPML is always False in Python version,
# therefore if statement removed
if kgrid_dim == 1:
self.x1 = pml_size.x + 1.0
self.x2 = kgrid_N.x - pml_size.x
elif kgrid_dim == 2:
self.x1 = pml_size.x + 1.0
self.x2 = kgrid_N.x - pml_size.x
if is_axisymmetric:
self.y1 = 1.0
else:
self.y1 = pml_size.y + 1.0
self.y2 = kgrid_N.y - pml_size.y
elif kgrid_dim == 3:
self.x1 = pml_size.x + 1.0
self.x2 = kgrid_N.x - pml_size.x
self.y1 = pml_size.y + 1.0
self.y2 = kgrid_N.y - pml_size.y
self.z1 = pml_size.z + 1.0
self.z2 = kgrid_N.z - pml_size.z
# define index variables to allow original grid size to be maintained for
# the _final and _all output variables if 'PMLInside' is set to false
# if self.record is None:
# self.record = Recorder()
self.record.set_index_variables(self.kgrid, pml_size, pml_inside, is_axisymmetric)
[docs]
@deprecated(version="0.4.1", reason="Use TimeReversal class instead")
def check_time_reversal(self) -> bool:
return self.time_rev
def _is_binary_sensor_mask(self, kgrid_dim: int) -> bool:
"""Check if sensor mask is a binary grid matching kgrid dimensions (with or without PML)."""
mask_shape = self.sensor.mask.shape
grid_shape = self.kgrid.k.shape
if mask_shape == grid_shape:
return True
pml = self.options.pml_size
if pml is None:
return False
pml = np.broadcast_to(np.atleast_1d(pml), (kgrid_dim,))
return mask_shape == tuple(g + 2 * int(p) for g, p in zip(grid_shape, pml))
def _is_cuboid_corners_mask(self, kgrid_dim: int) -> bool:
"""
Check if the sensor mask is defined as cuboid corners.
For 2D: shape should be (4, N) for [x1, y1, x2, y2; ...]
For 3D: shape should be (6, N) for [x1, y1, z1, x2, y2, z2; ...]
Args:
kgrid_dim: Dimensionality of the kWaveGrid (2 or 3)
Returns:
bool: True if the sensor mask is defined as cuboid corners
"""
if kgrid_dim == 2:
return self.sensor.mask.shape[0] == 4 # [x1, y1, x2, y2]
elif kgrid_dim == 3:
return self.sensor.mask.shape[0] == 6 # [x1, y1, z1, x2, y2, z2]
return False # Not valid for 1D or other dimensions