import logging
from dataclasses import dataclass
import numpy as np
from kwave.data import Vector
from kwave.kWaveSimulation_helper import (
display_simulation_params,
set_sound_speed_ref,
expand_grid_matrices,
create_absorption_variables,
scale_source_terms_func,
)
from kwave.kgrid import kWaveGrid
from kwave.kmedium import kWaveMedium
from kwave.ksensor import kSensor
from kwave.ksource import kSource
from kwave.ktransducer import NotATransducer
from kwave.options.simulation_options import SimulationOptions, SimulationType
from kwave.recorder import Recorder
from kwave.utils.checks import check_stability
from kwave.utils.colormap import get_color_map
from kwave.utils.conversion import cast_to_type, cart2grid
from kwave.utils.data import get_smallest_possible_type, get_date_string
from kwave.utils.dotdictionary import dotdict
from kwave.utils.filters import smooth
from kwave.utils.matlab import matlab_find, matlab_mask
from kwave.utils.matrix import num_dim2
[docs]
@dataclass
class kWaveSimulation(object):
[docs]
def __init__(
self, kgrid: kWaveGrid, source: kSource, sensor: NotATransducer, medium: kWaveMedium, simulation_options: SimulationOptions
):
self.precision = None
self.kgrid = kgrid
self.medium = medium
self.source = source
self.sensor = sensor
self.options = simulation_options
# =========================================================================
# FLAGS WHICH DEPEND ON USER INPUTS (THESE SHOULD NOT BE MODIFIED)
# =========================================================================
# flags which control the characteristics of the sensor
#: Whether the sensor sensor mask is defined by cuboid corners
#: Whether time reversal simulation is enabled
# check if the sensor mask is defined as a list of cuboid corners
if self.sensor.mask is not None and self.sensor.mask.shape[0] == (2 * self.kgrid.dim):
self.userarg_cuboid_corners = True
else:
self.userarg_cuboid_corners = False
# check if performing time reversal, and replace inputs to explicitly use a
# source with a dirichlet boundary condition
if self.sensor.time_reversal_boundary_data is not None:
# define a new source structure
self.source = kSource()
self.source.p_mask = self.sensor.mask
self.source.p = np.flip(self.sensor.time_reversal_boundary_data, 1)
self.source.p_mode = "dirichlet"
# define a new sensor structure
Nx, Ny, Nz = self.kgrid.Nx, self.kgrid.Ny, self.kgrid.Nz
self.sensor = kSensor(mask=np.ones((Nx, Ny, max(1, Nz))), record=["p_final"])
# set time reversal flag
self.userarg_time_rev = True
self.record = Recorder()
self.record.p = False
else:
# set time reversal flag
self.userarg_time_rev = False
#: Whether sensor.mask should be re-ordered.
#: True if sensor.mask is Cartesian with nearest neighbour interpolation which is calculated using a binary mask
#: and thus must be re-ordered
self.reorder_data = False
#: Whether the sensor.mask is binary
self.binary_sensor_mask = True
#: If tse sensor is an object of the kWaveTransducer class
self.transducer_sensor = False
self.record = Recorder()
# transducer source flags
#: transducer is object of kWaveTransducer class
self.transducer_source = False
#: Apply receive elevation focus on the transducer
self.transducer_receive_elevation_focus = False
# general
self.COLOR_MAP = get_color_map() #: default color map
self.ESTIMATE_SIM_TIME_STEPS = 50 #: time steps used to estimate simulation time
self.HIGHEST_PRIME_FACTOR_WARNING = 7 #: largest prime factor before warning
self.KSPACE_CFL = 0.3 #: default CFL value used if kgrid.t_array is set to 'auto'
self.PSTD_CFL = 0.1 #: default CFL value used if kgrid.t_array is set to 'auto'
# source types
self.SOURCE_S_MODE_DEF = "additive" #: source mode for stress sources
self.SOURCE_P_MODE_DEF = "additive" #: source mode for pressure sources
self.SOURCE_U_MODE_DEF = "additive" #: source mode for velocity sources
# filenames
self.STREAM_TO_DISK_FILENAME = "temp_sensor_data.bin" #: default disk stream filename
self.LOG_NAME = ["k-Wave-Log-", get_date_string()] #: default log filename
self.calling_func_name = None
logging.log(logging.INFO, f" start time: {get_date_string()}")
self.c_ref, self.c_ref_compression, self.c_ref_shear = [None] * 3
self.transducer_input_signal = None
#: Indexing variable corresponding to the location of all the pressure source elements
self.p_source_pos_index = None
#: Indexing variable corresponding to the location of all the velocity source elements
self.u_source_pos_index = None
#: Indexing variable corresponding to the location of all the stress source elements
self.s_source_pos_index = None
#: Delay mask that accounts for the beamforming delays and elevation focussing
self.delay_mask = None
self.absorb_nabla1 = None #: absorbing fractional Laplacian operator
self.absorb_tau = None #: absorbing fractional Laplacian coefficient
self.absorb_nabla2 = None #: dispersive fractional Laplacian operator
self.absorb_eta = None #: dispersive fractional Laplacian coefficient
self.dt = None #: Alias to kgrid.dt
self.rho0 = None #: Alias to medium.density
self.c0 = None #: Alias to medium.sound_speed
self.index_data_type = None
@property
def equation_of_state(self):
"""
Returns:
Set equation of state variable
"""
if self.medium.absorbing:
if self.medium.stokes:
return "stokes"
else:
return "absorbing"
else:
return "loseless"
@property
def use_sensor(self):
"""
Returns:
False if no output of any kind is required
"""
return self.sensor is not None
@property
def blank_sensor(self):
"""
Returns
True if sensor.mask is not defined but _max_all or _final variables are still recorded
"""
fields = ["p", "p_max", "p_min", "p_rms", "u", "u_non_staggered", "u_split_field", "u_max", "u_min", "u_rms", "I", "I_avg"]
if not (isinstance(self.sensor, NotATransducer) or any(self.record.is_set(fields)) or self.time_rev):
return True
return False
@property
def kelvin_voigt_model(self):
"""
Returns:
Whether the simulation is elastic with absorption
"""
return False
@property
def nonuniform_grid(self):
"""
Returns:
True if the computational grid is non-uniform
"""
return self.kgrid.nonuniform
@property
def time_rev(self):
"""
Returns:
True for time reversal simulaions using sensor.time_reversal_boundary_data
"""
if self.sensor is not None and not isinstance(self.sensor, NotATransducer):
if not self.options.simulation_type.is_elastic_simulation() and self.sensor.time_reversal_boundary_data is not None:
return True
else:
return self.userarg_time_rev
@property
def elastic_time_rev(self):
"""
Returns:
True if using time reversal with the elastic code
"""
return False
@property
def compute_directivity(self):
"""
Returns:
True if directivity calculations in 2D are used by setting sensor.directivity_angle
"""
if self.sensor is not None and not isinstance(self.sensor, NotATransducer):
if self.kgrid.dim == 2:
# check for sensor directivity input and set flag
directivity = self.sensor.directivity
if directivity is not None and directivity.angle is not None:
return True
return False
@property
def cuboid_corners(self):
"""
Returns:
Whether the sensor.mask is a list of cuboid corners
"""
if self.sensor is not None and not isinstance(self.sensor, NotATransducer):
if not self.blank_sensor and self.sensor.mask.shape[0] == 2 * self.kgrid.dim:
return True
return self.userarg_cuboid_corners
##############
# flags which control the types of source used
##############
@property
def source_p0(self): # initial pressure
"""
Returns:
Whether initial pressure source is present (default=False)
"""
flag = False # default
if not isinstance(self.source, NotATransducer) and self.source.p0 is not None:
# set flag
flag = True
return flag
@property
def source_p0_elastic(self): # initial pressure in the elastic code
"""
Returns:
Whether initial pressure source is present in the elastic code (default=False)
"""
# Not clear where this flag is set
return False
@property
def source_p(self):
"""
Returns:
Whether time-varying pressure source is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.p is not None:
# set source flag to the length of the source, this allows source.p
# to be shorter than kgrid.Nt
flag = len(self.source.p[0])
return flag
@property
def source_p_labelled(self): # time-varying pressure with labelled source mask
"""
Returns:
True/False if labelled/binary source mask, respectively.
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.p is not None:
# check if the mask is binary or labelled
p_unique = np.unique(self.source.p_mask)
flag = not (p_unique.size <= 2 and p_unique.sum() == 1)
return flag
@property
def source_ux(self) -> bool:
"""
Returns:
Whether time-varying particle velocity source is used in X-direction
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.ux is not None:
# set source flgs to the length of the sources, this allows the
# inputs to be defined independently and be of any length
flag = len(self.source.ux[0])
return flag
@property
def source_uy(self) -> bool:
"""
Returns:
Whether time-varying particle velocity source is used in Y-direction
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.uy is not None:
# set source flgs to the length of the sources, this allows the
# inputs to be defined independently and be of any length
flag = len(self.source.uy[0])
return flag
@property
def source_uz(self) -> bool:
"""
Returns:
Whether time-varying particle velocity source is used in Z-direction
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.uz is not None:
# set source flgs to the length of the sources, this allows the
# inputs to be defined independently and be of any length
flag = len(self.source.uz[0])
return flag
@property
def source_u_labelled(self):
"""
Returns:
Whether time-varying velocity source with labelled source mask is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.u_mask is not None:
# check if the mask is binary or labelled
u_unique = np.unique(self.source.u_mask)
if u_unique.size <= 2 and u_unique.sum() == 1:
# binary source mask
flag = False
else:
# labelled source mask
flag = True
return flag
@property
def source_sxx(self):
"""
Returns:
Whether time-varying stress source in X->X direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.sxx is not None:
flag = len(self.source.sxx[0])
return flag
@property
def source_syy(self):
"""
Returns:
Whether time-varying stress source in Y->Y direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.syy is not None:
flag = len(self.source.syy[0])
return flag
@property
def source_szz(self):
"""
Returns:
Whether time-varying stress source in Z->Z direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.szz is not None:
flag = len(self.source.szz[0])
return flag
@property
def source_sxy(self):
"""
Returns:
Whether time-varying stress source in X->Y direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.sxy is not None:
flag = len(self.source.sxy[0])
return flag
@property
def source_sxz(self):
"""
Returns:
Whether time-varying stress source in X->Z direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.sxz is not None:
flag = len(self.source.sxz[0])
return flag
@property
def source_syz(self):
"""
Returns:
Whether time-varying stress source in Y->Z direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.syz is not None:
flag = len(self.source.syz[0])
return flag
@property
def source_s_labelled(self):
"""
Returns:
Whether time-varying stress source with labelled source mask is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.s_mask is not None:
# check if the mask is binary or labelled
s_unique = np.unique(self.source.s_mask)
if s_unique.size <= 2 and s_unique.sum() == 1:
# binary source mask
flag = False
else:
# labelled source mask
flag = True
return flag
@property
def use_w_source_correction_p(self):
"""
Returns:
Whether to use the w source correction instead of the k-space source correction for pressure sources
"""
flag = False
if not isinstance(self.source, NotATransducer):
if self.source.p is not None and self.source.p_frequency_ref is not None:
flag = True
return flag
@property
def use_w_source_correction_u(self):
"""
Returns:
Whether to use the w source correction instead of the k-space source correction for velocity sources
"""
flag = False
if not isinstance(self.source, NotATransducer):
if any([(getattr(self.source, k) is not None) for k in ["ux", "uy", "uz", "u_mask"]]):
if self.source.u_frequency_ref is not None:
flag = True
return flag
[docs]
@staticmethod
def check_calling_func_name_and_dim(calling_func_name, kgrid_dim) -> None:
"""
Check correct function has been called for the dimensionality of kgrid
Args:
calling_func_name: Name of the script that makes calls to kWaveSimulation
kgrid_dim: Dimensionality of the kWaveGrid
Returns:
None
"""
assert not calling_func_name.startswith(("pstdElastic", "kspaceElastic")), "Elastic simulation is not supported."
if calling_func_name == "kspaceFirstOrder1D":
assert kgrid_dim == 1, f"kgrid has the wrong dimensionality for {calling_func_name}."
elif calling_func_name in ["kspaceFirstOrder2D", "pstdElastic2D", "kspaceElastic2D", "kspaceFirstOrderAS"]:
assert kgrid_dim == 2, f"kgrid has the wrong dimensionality for {calling_func_name}."
elif calling_func_name in ["kspaceFirstOrder3D", "pstdElastic3D", "kspaceElastic3D"]:
assert kgrid_dim == 3, f"kgrid has the wrong dimensionality for {calling_func_name}."
[docs]
@staticmethod
def print_start_status(is_elastic_code: bool) -> None:
"""
Update command-line status with the start time
Args:
is_elastic_code: is the simulation elastic
Returns:
None
"""
if is_elastic_code: # pragma: no cover
logging.log(logging.INFO, "Running k-Wave elastic simulation...")
else:
logging.log(logging.INFO, "Running k-Wave simulation...")
logging.log(logging.INFO, f" start time: {get_date_string()}")
[docs]
def set_index_data_type(self) -> None:
"""
Pre-calculate the data type needed to store the matrix indices given the
total number of grid points: indexing variables will be created using this data type to save memory
Returns:
None
"""
total_grid_points = self.kgrid.total_grid_points
self.index_data_type = get_smallest_possible_type(total_grid_points, "uint", default="double")
[docs]
@staticmethod
def check_medium(medium, kgrid_k, simulation_type: SimulationType) -> bool:
"""
Check the properties of the medium structure for correctness and validity
Args:
medium: kWaveMedium instance
kgrid_k: kWaveGrid.k matrix
is_elastic: Whether the simulation is elastic
is_axisymmetric: Whether the simulation is axisymmetric
Returns:
Medium Density
"""
# if using the fluid code, allow the density field to be blank if the medium is homogeneous
if (not simulation_type.is_elastic_simulation()) and medium.density is None and medium.sound_speed.size == 1:
user_medium_density_input = False
medium.density = 1
else:
medium.ensure_defined("density")
user_medium_density_input = True
# check medium absorption inputs for the fluid code
is_absorbing = any(medium.is_defined("alpha_coeff", "alpha_power"))
is_stokes = simulation_type.is_axisymmetric() or medium.alpha_mode == "stokes"
medium.set_absorbing(is_absorbing, is_stokes)
if is_absorbing:
medium.check_fields(kgrid_k.shape)
return user_medium_density_input
[docs]
def check_sensor(self, kgrid_dim) -> None:
"""
Check the Sensor properties for correctness and validity
Args:
k_dim: kWaveGrid dimensionality
Returns:
None
"""
# =========================================================================
# CHECK SENSOR STRUCTURE INPUTS
# =========================================================================
# check sensor fields
if self.sensor is not None:
# check the sensor input is valid
# TODO FARID move this check as a type checking
assert isinstance(
self.sensor, (kSensor, NotATransducer)
), "sensor must be defined as an object of the kSensor or kWaveTransducer class."
# check if sensor is a transducer, otherwise check input fields
if not isinstance(self.sensor, NotATransducer):
if kgrid_dim == 2:
# check for sensor directivity input and set flag
directivity = self.sensor.directivity
if directivity is not None and self.sensor.directivity.angle is not None:
# make sure the sensor mask is not blank
assert self.sensor.mask is not None, "The mask must be defined for the sensor"
# check sensor.directivity.pattern and sensor.mask have the same size
assert (
directivity.angle.shape == self.sensor.mask.shape
), "sensor.directivity.angle and sensor.mask must be the same size."
# check if directivity size input exists, otherwise make it
# a constant times kgrid.dx
if directivity.size is None:
directivity.set_default_size(self.kgrid)
# find the unique directivity angles
# assign the wavenumber vectors
directivity.set_unique_angles(self.sensor.mask)
directivity.set_wavenumbers(self.kgrid)
# check for time reversal inputs and set flags
if not self.options.simulation_type.is_elastic_simulation() and self.sensor.time_reversal_boundary_data is not None:
self.record.p = False
# check for sensor.record and set usage flgs - if no flgs are
# given, the time history of the acoustic pressure is recorded by
# default
if self.sensor.record is not None:
# check for time reversal data
if self.time_rev:
logging.log(logging.WARN, "sensor.record is not used for time reversal reconstructions")
# check the input is a cell array
assert isinstance(self.sensor.record, list), 'sensor.record must be given as a list, e.g. ["p", "u"]'
# check the sensor record flgs
self.record.set_flags_from_list(self.sensor.record, self.options.simulation_type.is_elastic_simulation())
# enforce the sensor.mask field unless just recording the max_all
# and _final variables
fields = ["p", "p_max", "p_min", "p_rms", "u", "u_non_staggered", "u_split_field", "u_max", "u_min", "u_rms", "I", "I_avg"]
if any(self.record.is_set(fields)):
assert self.sensor.mask is not None
# check if sensor mask is a binary grid, a set of cuboid corners,
# or a set of Cartesian interpolation points
if not self.blank_sensor:
if (kgrid_dim == 3 and num_dim2(self.sensor.mask) == 3) or (
kgrid_dim != 3 and (self.sensor.mask.shape == self.kgrid.k.shape)
):
# check the grid is binary
assert self.sensor.mask.sum() == (
self.sensor.mask.size - (self.sensor.mask == 0).sum()
), "sensor.mask must be a binary grid (numeric values must be 0 or 1)."
# check the grid is not empty
assert self.sensor.mask.sum() != 0, "sensor.mask must be a binary grid with at least one element set to 1."
elif self.sensor.mask.shape[0] == 2 * kgrid_dim:
# make sure the points are integers
assert np.all(self.sensor.mask % 1 == 0), "sensor.mask cuboid corner indices must be integers."
# store a copy of the cuboid corners
self.record.cuboid_corners_list = self.sensor.mask
# check the list makes sense
if np.any(self.sensor.mask[self.kgrid.dim :, :] - self.sensor.mask[: self.kgrid.dim, :] < 0):
if kgrid_dim == 1:
raise ValueError("sensor.mask cuboid corners must be defined " "as [x1, x2; ...]." " where x2 => x1, etc.")
elif kgrid_dim == 2:
raise ValueError(
"sensor.mask cuboid corners must be defined " "as [x1, y1, x2, y2; ...]." " where x2 => x1, etc."
)
elif kgrid_dim == 3:
raise ValueError(
"sensor.mask cuboid corners must be defined"
" as [x1, y1, z1, x2, y2, z2; ...]."
" where x2 => x1, etc."
)
# check the list are within bounds
if np.any(self.sensor.mask < 1):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
else:
if kgrid_dim == 1:
if np.any(self.sensor.mask > self.kgrid.Nx):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
elif kgrid_dim == 2:
if np.any(self.sensor.mask[[0, 2], :] > self.kgrid.Nx) or np.any(
self.sensor.mask[[1, 3], :] > self.kgrid.Ny
):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
elif kgrid_dim == 3:
if (
np.any(self.sensor.mask[[0, 3], :] > self.kgrid.Nx)
or np.any(self.sensor.mask[[1, 4], :] > self.kgrid.Ny)
or np.any(self.sensor.mask[[2, 5], :] > self.kgrid.Nz)
):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
# create a binary mask for display from the list of corners
# TODO FARID mask should be option_factory in sensor not here
self.sensor.mask = np.zeros_like(self.kgrid.k, dtype=bool)
cuboid_corners_list = self.record.cuboid_corners_list
for cuboid_index in range(cuboid_corners_list.shape[1]):
if self.kgrid.dim == 1:
self.sensor.mask[cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[1, cuboid_index]] = 1
if self.kgrid.dim == 2:
self.sensor.mask[
cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[2, cuboid_index],
cuboid_corners_list[1, cuboid_index] : cuboid_corners_list[3, cuboid_index],
] = 1
if self.kgrid.dim == 3:
self.sensor.mask[
cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[3, cuboid_index],
cuboid_corners_list[1, cuboid_index] : cuboid_corners_list[4, cuboid_index],
cuboid_corners_list[2, cuboid_index] : cuboid_corners_list[5, cuboid_index],
] = 1
else:
# check the Cartesian sensor mask is the correct size
# (1 x N, 2 x N, 3 x N)
assert (
self.sensor.mask.shape[0] == kgrid_dim and num_dim2(self.sensor.mask) <= 2
), f"Cartesian sensor.mask for a {kgrid_dim}D simulation must be given as a {kgrid_dim} by N array."
# set Cartesian mask flag (this is modified in
# createStorageVariables if the interpolation setting is
# set to nearest)
self.binary_sensor_mask = False
# extract Cartesian data from sensor mask
if kgrid_dim == 1:
# align sensor data as a column vector to be the
# same as kgrid.x_vec so that calls to interp1
# return data in the correct dimension
self.sensor_x = np.reshape((self.sensor.mask, (-1, 1)))
# add sensor_x to the record structure for use with
# the _extractSensorData subfunction
self.record.sensor_x = self.sensor_x
"record.sensor_x = sensor_x;"
elif kgrid_dim == 2:
self.sensor_x = self.sensor.mask[0, :]
self.sensor_y = self.sensor.mask[1, :]
elif kgrid_dim == 3:
self.sensor_x = self.sensor.mask[0, :]
self.sensor_y = self.sensor.mask[1, :]
self.sensor_z = self.sensor.mask[2, :]
# compute an equivalent sensor mask using nearest neighbour
# interpolation, if flgs.time_rev = false and
# cartesian_interp = 'linear' then this is only used for
# display, if flgs.time_rev = true or cartesian_interp =
# 'nearest' this grid is used as the sensor.mask
self.sensor.mask, self.order_index, self.reorder_index = cart2grid(
self.kgrid, self.sensor.mask, self.options.simulation_type.is_axisymmetric()
)
# if in time reversal mode, reorder the p0 input data in
# the order of the binary sensor_mask
if self.time_rev:
raise NotImplementedError
"""
# append the reordering data
new_col_pos = length(sensor.time_reversal_boundary_data(1, :)) + 1;
sensor.time_reversal_boundary_data(:, new_col_pos) = order_index;
# reorder p0 based on the order_index
sensor.time_reversal_boundary_data = sort_rows(sensor.time_reversal_boundary_data, new_col_pos);
# remove the reordering data
sensor.time_reversal_boundary_data = sensor.time_reversal_boundary_data(:, 1:new_col_pos - 1);
"""
else:
# set transducer sensor flag
self.transducer_sensor = True
self.record.p = False
# check to see if there is an elevation focus
if not np.isinf(self.sensor.elevation_focus_distance):
# set flag
self.transducer_receive_elevation_focus = True
# get the elevation mask that is used to extract the correct values
# from the sensor data buffer for averaging
self.transducer_receive_mask = self.sensor.elevation_beamforming_mask
# check for directivity inputs with time reversal
if kgrid_dim == 2 and self.use_sensor and self.compute_directivity and self.time_rev:
logging.log(logging.WARN, "sensor directivity fields are not used for time reversal.")
[docs]
def check_source(self, k_dim, k_Nt) -> None:
"""
Check the source properties for correctness and validity
Args:
kgrid_dim: kWaveGrid dimension
k_Nt: Number of time steps in kWaveGrid
Returns:
None
"""
# =========================================================================
# CHECK SOURCE STRUCTURE INPUTS
# =========================================================================
# check source inputs
if not isinstance(self.source, (kSource, NotATransducer)):
# allow an invalid or empty source input if computing time reversal,
# otherwise return error
assert self.time_rev, "source must be defined as an object of the kSource or kWaveTransducer classes."
elif not isinstance(self.source, NotATransducer):
# --------------------------
# SOURCE IS NOT A TRANSDUCER
# --------------------------
"""
check allowable source types
Depending on the kgrid dimensionality and the simulation type,
following fields are allowed & might be use:
kgrid.dim == 1:
non-elastic code:
['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'u_mask', 'u_mode', 'u_frequency_ref']
kgrid.dim == 2:
non-elastic code:
['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'uy', 'u_mask', 'u_mode', 'u_frequency_ref']
elastic code:
['p0', 'sxx', 'syy', 'sxy', 's_mask', 's_mode', 'ux', 'uy', 'u_mask', 'u_mode']
kgrid.dim == 3:
non-elastic code:
['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'uy', 'uz', 'u_mask', 'u_mode', 'u_frequency_ref']
elastic code:
['p0', 'sxx', 'syy', 'szz', 'sxy', 'sxz', 'syz', 's_mask', 's_mode', 'ux', 'uy', 'uz', 'u_mask', 'u_mode']
"""
self.source.validate(self.kgrid)
# check for a time varying pressure source input
if self.source.p is not None:
# check the source mode input is valid
if self.source.p_mode is None:
self.source.p_mode = self.SOURCE_P_MODE_DEF
if self.source_p > k_Nt:
logging.log(logging.WARN, " source.p has more time points than kgrid.Nt, remaining time points will not be used.")
# create an indexing variable corresponding to the location of all the source elements
self.p_source_pos_index = matlab_find(self.source.p_mask)
# check if the mask is binary or labelled
p_unique = np.unique(self.source.p_mask)
# create a second indexing variable
if p_unique.size <= 2 and p_unique.sum() == 1:
# set signal index to all elements
self.p_source_sig_index = ":"
else:
# set signal index to the labels (this allows one input signal
# to be used for each source label)
self.p_source_sig_index = self.source.p_mask(self.source.p_mask != 0)
# convert the data type depending on the number of indices
self.p_source_pos_index = cast_to_type(self.p_source_pos_index, self.index_data_type)
if self.source_p_labelled:
self.p_source_sig_index = cast_to_type(self.p_source_sig_index, self.index_data_type)
# check for time varying velocity source input and set source flag
if any([(getattr(self.source, k) is not None) for k in ["ux", "uy", "uz", "u_mask"]]):
# check the source mode input is valid
if self.source.u_mode is None:
self.source.u_mode = self.SOURCE_U_MODE_DEF
# create an indexing variable corresponding to the location of all
# the source elements
self.u_source_pos_index = matlab_find(self.source.u_mask)
# check if the mask is binary or labelled
u_unique = np.unique(self.source.u_mask)
# create a second indexing variable
if u_unique.size <= 2 and u_unique.sum() == 1:
# set signal index to all elements
self.u_source_sig_index = ":"
else:
# set signal index to the labels (this allows one input signal
# to be used for each source label)
self.u_source_sig_index = self.source.u_mask[self.source.u_mask != 0]
# convert the data type depending on the number of indices
self.u_source_pos_index = cast_to_type(self.u_source_pos_index, self.index_data_type)
if self.source_u_labelled:
self.u_source_sig_index = cast_to_type(self.u_source_sig_index, self.index_data_type)
# check for time varying stress source input and set source flag
if any([(getattr(self.source, k) is not None) for k in ["sxx", "syy", "szz", "sxy", "sxz", "syz", "s_mask"]]):
# create an indexing variable corresponding to the location of all
# the source elements
raise NotImplementedError
"s_source_pos_index = find(source.s_mask != 0);"
# check if the mask is binary or labelled
"s_unique = unique(source.s_mask);"
# create a second indexing variable
if eng.eval("numel(s_unique) <= 2 && sum(s_unique) == 1"): # noqa: F821
# set signal index to all elements
eng.workspace["s_source_sig_index"] = ":" # noqa: F821
else:
# set signal index to the labels (this allows one input signal
# to be used for each source label)
s_source_sig_index = source.s_mask(source.s_mask != 0) # noqa
f"s_source_pos_index = {self.index_data_type}(s_source_pos_index);"
if self.source_s_labelled:
f"s_source_sig_index = {self.index_data_type}(s_source_sig_index);"
else:
# ----------------------
# SOURCE IS A TRANSDUCER
# ----------------------
# if the sensor is a transducer, check that the simulation is in 3D
assert k_dim == 3, "Transducer inputs are only compatible with 3D simulations."
# get the input signal - this is appended with zeros if required to
# account for the beamforming delays (this will throw an error if the
# input signal is not defined)
self.transducer_input_signal = self.source.input_signal
# get the delay mask that accounts for the beamforming delays and
# elevation focussing; this is used so that a single time series can be
# applied to the complete transducer mask with different delays
delay_mask = self.source.delay_mask()
# set source flag - this should be the length of signal minus the
# maximum delay
self.transducer_source = self.transducer_input_signal.size - delay_mask.max()
# get the active elements mask
active_elements_mask = self.source.active_elements_mask
# get the apodization mask if not set to 'Rectangular' and convert to a
# linear array
if self.source.transmit_apodization == "Rectangular":
self.transducer_transmit_apodization = 1
else:
self.transducer_transmit_apodization = self.source.transmit_apodization_mask
self.transducer_transmit_apodization = self.transducer_transmit_apodization[active_elements_mask != 0]
# create indexing variable corresponding to the active elements
# and convert the data type depending on the number of indices
self.u_source_pos_index = matlab_find(active_elements_mask).astype(self.index_data_type)
# convert the delay mask to an indexing variable (this doesn't need to
# be modified if the grid is expanded) which tells each point in the
# source mask which point in the input_signal should be used
delay_mask = matlab_mask(delay_mask, active_elements_mask != 0) # compatibility
# convert the data type depending on the maximum value of the delay
# mask and the length of the source
smallest_type = get_smallest_possible_type(delay_mask.max(), "uint")
if smallest_type is not None:
delay_mask = delay_mask.astype(smallest_type)
# move forward by 1 as a delay of 0 corresponds to the first point in the input signal
self.delay_mask = delay_mask + 1
# clean up unused variables
del active_elements_mask
[docs]
def check_kgrid_time(self) -> None:
"""
Check time-related kWaveGrid inputs
Returns:
None
"""
# check kgrid for t_array existance, and create if not defined
if isinstance(self.kgrid.t_array, str) and self.kgrid.t_array == "auto":
# check for time reversal mode
if self.time_rev:
raise ValueError("kgrid.t_array (Nt and dt) must be defined explicitly in time reversal mode.")
# check for time varying sources
if (not self.source_p0_elastic) and (
self.source_p
or self.source_ux
or self.source_uy
or self.source_uz
or self.source_sxx
or self.source_syy
or self.source_szz
or self.source_sxy
or self.source_sxz
or self.source_syz
):
raise ValueError("kgrid.t_array (Nt and dt) must be defined explicitly when using a time-varying source.")
# create the time array using the compressional sound speed
self.kgrid.makeTime(self.medium.sound_speed, self.KSPACE_CFL)
# check kgrid.t_array for stability given medium properties
if not self.options.simulation_type.is_elastic_simulation():
# calculate the largest timestep for which the model is stable
dt_stability_limit = check_stability(self.kgrid, self.medium)
# give a warning if the timestep is larger than stability limit allows
if self.kgrid.dt > dt_stability_limit:
logging.log(logging.WARN, " time step may be too large for a stable simulation.")
[docs]
@staticmethod
def select_precision(opt: SimulationOptions):
"""
Select the minimal precision for storing the data
Args:
opt: SimulationOptions instance
Returns:
Minimal precision for variable allocation
"""
# set storage variable type based on data_cast - this enables the
# output variables to be directly created in the data_cast format,
# rather than creating them in double precision and then casting them
if opt.data_cast == "off":
precision = "double"
elif opt.data_cast == "single":
precision = "single"
elif opt.data_cast == "gsingle":
precision = "single"
elif opt.data_cast == "gdouble":
precision = "double"
elif opt.data_cast == "gpuArray":
raise NotImplementedError("gpuArray is not supported in Python-version")
elif opt.data_cast == "kWaveGPUsingle":
precision = "single"
elif opt.data_cast == "kWaveGPUdouble":
precision = "double"
else:
raise ValueError("'Unknown ''DataCast'' option'")
return precision
[docs]
def smooth_and_enlarge(self, source, k_dim, kgrid_N, opt: SimulationOptions) -> None:
"""
Smooth and enlarge grids
Args:
source: kWaveSource instance
k_dim: kWaveGrid dimensionality
kgrid_N: kWaveGrid size in each direction
opt: SimulationOptions
Returns:
None
"""
# smooth the initial pressure distribution p0 if required, and then restore
# the maximum magnitude
# NOTE 1: if p0 has any values at the edge of the domain, the smoothing
# may cause part of p0 to wrap to the other side of the domain
# NOTE 2: p0 is smoothed before the grid is expanded to ensure that p0 is
# exactly zero within the PML
# NOTE 3: for the axisymmetric code, p0 is smoothed assuming WS origin
# symmetry
if self.source_p0 and self.options.smooth_p0:
# update command line status
logging.log(logging.INFO, " smoothing p0 distribution...")
if self.options.simulation_type.is_axisymmetric():
if self.options.radial_symmetry in ["WSWA-FFT", "WSWA"]:
# create a new kWave grid object with expanded radial grid
kgrid_exp = kWaveGrid([kgrid_N.x, kgrid_N.y * 4], [self.kgrid.dx, self.kgrid.dy])
# mirror p0 in radial dimension using WSWA symmetry
self.source.p0 = self.source.p0.astype(float)
p0_exp = np.zeros((kgrid_exp.Nx, kgrid_exp.Ny))
p0_exp[:, kgrid_N.y * 0 + 0 : kgrid_N.y * 1] = self.source.p0
p0_exp[:, kgrid_N.y * 1 + 1 : kgrid_N.y * 2] = -np.fliplr(self.source.p0[:, 1:])
p0_exp[:, kgrid_N.y * 2 + 0 : kgrid_N.y * 3] = -self.source.p0
p0_exp[:, kgrid_N.y * 3 + 1 : kgrid_N.y * 4] = np.fliplr(self.source.p0[:, 1:])
elif self.options.radial_symmetry in ["WSWS-FFT", "WSWS"]:
# create a new kWave grid object with expanded radial grid
kgrid_exp = kWaveGrid([kgrid_N.x, kgrid_N.y * 2 - 2], [self.kgrid.dx, self.kgrid.dy])
# mirror p0 in radial dimension using WSWS symmetry
p0_exp = np.zeros((kgrid_exp.Nx, kgrid_exp.Ny))
p0_exp[:, 1 : kgrid_N.y] = source.p0
p0_exp[:, kgrid_N.y + 0 : kgrid_N.y * 2 - 2] = np.fliplr(source.p0[:, 1:-1])
# smooth p0
p0_exp = smooth(p0_exp, True)
# trim back to original size
source.p0 = p0_exp[:, 0 : self.kgrid.Ny]
# clean up unused variables
del kgrid_exp
del p0_exp
else:
source.p0 = smooth(source.p0, True)
# expand the computational grid if the PML is set to be outside the input
# grid defined by the user
if opt.pml_inside is False:
expand_results = expand_grid_matrices(
self.kgrid,
self.medium,
self.source,
self.sensor,
self.options,
dotdict(
{
"p_source_pos_index": self.p_source_pos_index,
"u_source_pos_index": self.u_source_pos_index,
"s_source_pos_index": self.s_source_pos_index,
}
),
dotdict(
{
"axisymmetric": self.options.simulation_type.is_axisymmetric(),
"use_sensor": self.use_sensor,
"blank_sensor": self.blank_sensor,
"cuboid_corners": self.cuboid_corners,
"source_p0": self.source_p0,
"source_p": self.source_p,
"source_ux": self.source_ux,
"source_uy": self.source_uy,
"source_uz": self.source_uz,
"transducer_source": self.transducer_source,
"source_sxx": self.source_sxx,
"source_syy": self.source_syy,
"source_szz": self.source_szz,
"source_sxy": self.source_sxy,
"source_sxz": self.source_sxz,
"source_syz": self.source_syz,
}
),
)
self.kgrid, self.index_data_type, self.p_source_pos_index, self.u_source_pos_index, self.s_source_pos_index = expand_results
# get maximum prime factors
if self.options.simulation_type.is_axisymmetric():
prime_facs = self.kgrid.highest_prime_factors(self.options.radial_symmetry[:4])
else:
prime_facs = self.kgrid.highest_prime_factors()
# give warning for bad dimension sizes
if prime_facs.max() > self.HIGHEST_PRIME_FACTOR_WARNING:
prime_facs = prime_facs[prime_facs != 0]
logging.log(logging.WARN, f"Highest prime factors in each dimension are {prime_facs}")
logging.log(logging.WARN, "Use dimension sizes with lower prime factors to improve speed")
del prime_facs
# smooth the sound speed distribution if required
if opt.smooth_c0 and num_dim2(self.medium.sound_speed) == k_dim and self.medium.sound_speed.size > 1:
logging.log(logging.INFO, " smoothing sound speed distribution...")
self.medium.sound_speed = smooth(self.medium.sound_speed)
# smooth the ambient density distribution if required
if opt.smooth_rho0 and num_dim2(self.medium.density) == k_dim and self.medium.density.size > 1:
logging.log(logging.INFO, "smoothing density distribution...")
self.medium.density = smooth(self.medium.density)
[docs]
def create_sensor_variables(self) -> None:
"""
Create the sensor related variables
Returns:
None
"""
# define the output variables and mask indices if using the sensor
if self.use_sensor:
if not self.blank_sensor or self.options.save_to_disk:
if self.cuboid_corners:
# create empty list of sensor indices
self.sensor_mask_index = []
# loop through the list of cuboid corners, and extract the
# sensor mask indices for each cube
for cuboid_index in range(self.record.cuboid_corners_list.shape[1]):
# create empty binary mask
temp_mask = np.zeros_like(self.kgrid.k, dtype=bool)
if self.kgrid.dim == 1:
self.sensor.mask[
self.record.cuboid_corners_list[0, cuboid_index] : self.record.cuboid_corners_list[1, cuboid_index]
] = 1
if self.kgrid.dim == 2:
self.sensor.mask[
self.record.cuboid_corners_list[0, cuboid_index] : self.record.cuboid_corners_list[2, cuboid_index],
self.record.cuboid_corners_list[1, cuboid_index] : self.record.cuboid_corners_list[3, cuboid_index],
] = 1
if self.kgrid.dim == 3:
self.sensor.mask[
self.record.cuboid_corners_list[0, cuboid_index] : self.record.cuboid_corners_list[3, cuboid_index],
self.record.cuboid_corners_list[1, cuboid_index] : self.record.cuboid_corners_list[4, cuboid_index],
self.record.cuboid_corners_list[2, cuboid_index] : self.record.cuboid_corners_list[5, cuboid_index],
] = 1
# extract mask indices
self.sensor_mask_index.append(matlab_find(temp_mask))
self.sensor_mask_index = np.array(self.sensor_mask_index)
# cleanup unused variables
del temp_mask
else:
# create mask indices (this works for both normal sensor and
# transducer inputs)
self.sensor_mask_index = np.where(self.sensor.mask.flatten(order="F") != 0)[0] + 1 # +1 due to matlab indexing
self.sensor_mask_index = np.expand_dims(self.sensor_mask_index, -1) # compatibility, n => [n, 1]
# convert the data type depending on the number of indices (this saves
# memory)
self.sensor_mask_index = cast_to_type(self.sensor_mask_index, self.index_data_type)
else:
# set the sensor mask index variable to be empty
self.sensor_mask_index = []
[docs]
def create_absorption_vars(self) -> None:
"""
Create absorption variables for the fluid code based on
the expanded and smoothed values of the medium parameters (if not saving to disk)
Returns:
None
"""
if not self.options.simulation_type.is_elastic_simulation() and not self.options.save_to_disk:
self.absorb_nabla1, self.absorb_nabla2, self.absorb_tau, self.absorb_eta = create_absorption_variables(
self.kgrid, self.medium, self.equation_of_state
)
[docs]
def assign_pseudonyms(self, medium: kWaveMedium, kgrid: kWaveGrid) -> None:
"""
Shorten commonly used field names (these act only as pointers provided that the values aren't modified)
(done after enlarging and smoothing the grids)
Args:
medium: kWaveMedium instance
kgrid: kWaveGrid instance
Returns:
None
"""
self.dt = float(kgrid.dt)
self.rho0 = medium.density
self.c0 = medium.sound_speed
[docs]
def scale_source_terms(self, is_scale_source_terms) -> None:
"""
Scale the source terms based on the expanded and smoothed values of the medium parameters
Args:
is_scale_source_terms: Should the source terms be scaled
Returns:
None
"""
if not is_scale_source_terms:
return
try:
p_source_pos_index = self.p_source_pos_index
except AttributeError:
p_source_pos_index = None
try:
s_source_pos_index = self.s_source_pos_index
except AttributeError:
s_source_pos_index = None
try:
u_source_pos_index = self.u_source_pos_index
except AttributeError:
u_source_pos_index = None
self.transducer_input_signal = scale_source_terms_func(
self.c0,
self.dt,
self.kgrid,
self.source,
p_source_pos_index,
s_source_pos_index,
u_source_pos_index,
self.transducer_input_signal,
dotdict(
{
"nonuniform_grid": self.nonuniform_grid,
"source_ux": self.source_ux,
"source_uy": self.source_uy,
"source_uz": self.source_uz,
"transducer_source": self.transducer_source,
"source_p": self.source_p,
"source_p0": self.source_p0,
"use_w_source_correction_p": self.use_w_source_correction_p,
"use_w_source_correction_u": self.use_w_source_correction_u,
"source_sxx": self.source_sxx,
"source_syy": self.source_syy,
"source_szz": self.source_szz,
"source_sxy": self.source_sxy,
"source_sxz": self.source_sxz,
"source_syz": self.source_syz,
}
),
)
[docs]
def create_pml_indices(self, kgrid_dim, kgrid_N: Vector, pml_size, pml_inside, is_axisymmetric):
"""
Define index variables to remove the PML from the display if the optional
input 'PlotPML' is set to false
Args:
kgrid_dim: kWaveGrid dimensinality
kgrid_N: kWaveGrid size in each direction
pml_size: Size of the PML
pml_inside: Whether the PML is inside the grid defined by the user
is_axisymmetric: Whether the simulation is axisymmetric
"""
# comment by Farid: PlotPML is always False in Python version,
# therefore if statement removed
if kgrid_dim == 1:
self.x1 = pml_size.x + 1.0
self.x2 = kgrid_N.x - pml_size.x
elif kgrid_dim == 2:
self.x1 = pml_size.x + 1.0
self.x2 = kgrid_N.x - pml_size.x
if is_axisymmetric:
self.y1 = 1.0
else:
self.y1 = pml_size.y + 1.0
self.y2 = kgrid_N.y - pml_size.y
elif kgrid_dim == 3:
self.x1 = pml_size.x + 1.0
self.x2 = kgrid_N.x - pml_size.x
self.y1 = pml_size.y + 1.0
self.y2 = kgrid_N.y - pml_size.y
self.z1 = pml_size.z + 1.0
self.z2 = kgrid_N.z - pml_size.z
# define index variables to allow original grid size to be maintained for
# the _final and _all output variables if 'PMLInside' is set to false
# if self.record is None:
# self.record = Recorder()
self.record.set_index_variables(self.kgrid, pml_size, pml_inside, is_axisymmetric)