Source code for kwave.ksource

from dataclasses import dataclass
import logging

import numpy as np

from kwave.kgrid import kWaveGrid
from kwave.utils.matrix import num_dim2


[docs] @dataclass class kSource(object): _p0 = None #: time varying pressure at each of the source positions given by source.p_mask p = None #: binary matrix specifying the positions of the time varying pressure source distribution p_mask = None #: optional input to control whether the input pressure is injected as a mass source or enforced # as a dirichlet boundary condition; valid inputs are 'additive' (the default) or 'dirichlet' p_mode = None #: Pressure reference frequency p_frequency_ref = None #: time varying particle velocity in the x-direction at each of the source positions given by source.u_mask ux = None #: time varying particle velocity in the y-direction at each of the source positions given by source.u_mask uy = None #: time varying particle velocity in the z-direction at each of the source positions given by source.u_mask uz = None #: binary matrix specifying the positions of the time varying particle velocity distribution u_mask = None #: optional input to control whether the input velocity is applied as a force source or enforced as a dirichlet # boundary condition; valid inputs are 'additive' (the default) or 'dirichlet' u_mode = None #: Velocity reference frequency u_frequency_ref = None sxx = None #: Stress source in x -> x direction syy = None #: Stress source in y -> y direction szz = None #: Stress source in z -> z direction sxy = None #: Stress source in x -> y direction sxz = None #: Stress source in x -> z direction syz = None #: Stress source in y -> z direction s_mask = None #: Stress source mask s_mode = None #: Stress source mode
[docs] def is_p0_empty(self) -> bool: """ Check if the `p0` field is set and not empty """ return self.p0 is None or len(self.p0) == 0 or (np.sum(self.p0 != 0) == 0)
@property def p0(self): """ Initial pressure within the acoustic medium """ return self._p0 @p0.setter def p0(self, val): # check size and contents if len(val) == 0 or np.sum(val != 0) == 0: # if the initial pressure is empty, remove field self._p0 = None else: self._p0 = val
[docs] def validate(self, kgrid: kWaveGrid) -> None: """ Validate the object fields for correctness Args: kgrid: Instance of `~kwave.kgrid.kWaveGrid` class Returns: None """ if self.p0 is not None: if self.p0.shape != kgrid.k.shape: # throw an error if p0 is not the correct size raise ValueError('source.p0 must be the same size as the computational grid.') # if using the elastic code, reformulate source.p0 in terms of the # stress source terms using the fact that source.p = [0.5 0.5] / # (2*CFL) is the same as source.p0 = 1 # if self.elastic_code: # raise NotImplementedError # check for a time varying pressure source input if self.p is not None: # force p_mask to be given if p is given assert self.p_mask is not None # check mask is the correct size # noinspection PyTypeChecker if (num_dim2(self.p_mask) != kgrid.dim) or (self.p_mask.shape != kgrid.k.shape): raise ValueError('source.p_mask must be the same size as the computational grid.') # check mask is not empty assert np.sum(self.p_mask) != 0, 'source.p_mask must be a binary grid with at least one element set to 1.' # don't allow both source.p0 and source.p in the same simulation # USERS: please contact us via http://www.k-wave.org/forum if this # is a problem assert self.p0 is None, "source.p0 and source.p can't be defined in the same simulation." # check the source mode input is valid if self.p_mode is not None: assert self.p_mode in ['additive', 'dirichlet', 'additive-no-correction'], \ "source.p_mode must be set to ''additive'', ''additive-no-correction'', or ''dirichlet''." # check if a reference frequency is defined if self.p_frequency_ref is not None: # check frequency is a scalar, positive number assert np.isscalar(self.p_frequency_ref) and self.p_frequency_ref > 0 # check frequency is within range assert self.p_frequency_ref <= kgrid.k_max_all * np.min(self.medium.sound_speed / 2 * np.pi), \ 'source.p_frequency_ref is higher than the maximum frequency supported by the spatial grid.' # change source mode to no include k-space correction self.p_mode = 'additive-no-correction' if len(self.p[0]) > kgrid.Nt: logging.log(logging.WARN, ' source.p has more time points than kgrid.Nt, remaining time points will not be used.') # check if the mask is binary or labelled p_unique = np.unique(self.p_mask) # create a second indexing variable if p_unique.size <= 2 and p_unique.sum() == 1: # if more than one time series is given, check the number of time # series given matches the number of source elements, or the number # of labelled sources if self.p.shape[0] > 1 and (len(self.p[:, 0]) != self.p_mask.sum()): raise ValueError('The number of time series in source.p ' 'must match the number of source elements in source.p_mask.') else: # check the source labels are monotonic, and start from 1 if (sum(p_unique[1:] - p_unique[:-1]) != len(p_unique) - 1) or (not any(p_unique == 1)): raise ValueError( 'If using a labelled source.p_mask, ' 'the source labels must be monotonically increasing and start from 1.') # make sure the correct number of input signals are given if np.size(self.p, 1) != (np.size(p_unique) - 1): raise ValueError('The number of time series in source.p ' 'must match the number of labelled source elements in source.p_mask.') # check for time varying velocity source input and set source flag if any([(getattr(self, k) is not None) for k in ['ux', 'uy', 'uz', 'u_mask']]): # force u_mask to be given assert self.u_mask is not None # check mask is the correct size assert num_dim2(self.u_mask) == kgrid.dim and self.u_mask.shape == kgrid.k.shape, \ 'source.u_mask must be the same size as the computational grid.' # check mask is not empty assert np.array(self.u_mask).sum() != 0, \ 'source.u_mask must be a binary grid with at least one element set to 1.' # check the source mode input is valid if self.u_mode is not None: assert self.u_mode in ['additive', 'dirichlet', 'additive-no-correction'], \ "source.u_mode must be set to ''additive'', ''additive-no-correction'', or ''dirichlet''." # check if a reference frequency is defined if self.u_frequency_ref is not None: # check frequency is a scalar, positive number u_frequency_ref = self.u_frequency_ref assert np.isscalar(u_frequency_ref) and u_frequency_ref > 0 # check frequency is within range assert self.u_frequency_ref <= (kgrid.k_max_all * np.min(self.medium.sound_speed) / 2 * np.pi), \ "source.u_frequency_ref is higher than the maximum frequency supported by the spatial grid." # change source mode to no include k-space correction self.u_mode = 'additive-no-correction' if self.ux is not None: if self.flag_ux > kgrid.Nt: logging.log(logging.WARN, ' source.ux has more time points than kgrid.Nt, ' 'remaining time points will not be used.') if self.uy is not None: if self.flag_uy > kgrid.Nt: logging.log(logging.WARN, ' source.uy has more time points than kgrid.Nt, ' 'remaining time points will not be used.') if self.uz is not None: if self.flag_uz > kgrid.Nt: logging.log(logging.WARN, ' source.uz has more time points than kgrid.Nt, ' 'remaining time points will not be used.') # check if the mask is binary or labelled u_unique = np.unique(self.u_mask) # create a second indexing variable if u_unique.size <= 2 and u_unique.sum() == 1: # if more than one time series is given, check the number of time # series given matches the number of source elements ux_size = self.ux[:, 0].size uy_size = self.uy[:, 0].size if (self.uy is not None) else None uz_size = self.uz[:, 0].size if (self.uz is not None) else None u_sum = np.sum(self.u_mask) if (self.flag_ux and (ux_size > 1)) or \ (self.flag_uy and (uy_size > 1)) or \ (self.flag_uz and (uz_size > 1)): if (self.flag_ux and (ux_size != u_sum)) and \ (self.flag_uy and (uy_size != u_sum)) or \ (self.flag_uz and (uz_size != u_sum)): raise ValueError('The number of time series in source.ux (etc) ' 'must match the number of source elements in source.u_mask.') # if more than one time series is given, check the number of time # series given matches the number of source elements if (self.flag_ux and (ux_size > 1)) or \ (self.flag_uy and (uy_size > 1)) or (self.flag_uz and (uz_size > 1)): if (self.flag_ux and (ux_size != u_sum)) or \ (self.flag_uy and (uy_size != u_sum)) or (self.flag_uz and (uz_size != u_sum)): raise ValueError('The number of time series in source.ux (etc) ' 'must match the number of source elements in source.u_mask.') else: raise NotImplementedError # check the source labels are monotonic, and start from 1 # if (sum(u_unique(2:end) - u_unique(1:end-1)) != (numel(u_unique) - 1)) or (~any(u_unique == 1)) if eng.eval('(sum(u_unique(2:end) - ' 'u_unique(1:end-1)) ~= ' '(numel(u_unique) - 1)) ' '|| ' '(~any(u_unique == 1))'): raise ValueError('If using a labelled source.u_mask, ' 'the source labels must be monotonically increasing and start from 1.') # if more than one time series is given, check the number of time # series given matches the number of source elements # if (flgs.source_ux and (size(source.ux, 1) != (numel(u_unique) - 1))) or # (flgs.source_uy and (size(source.uy, 1) != (numel(u_unique) - 1))) or # (flgs.source_uz and (size(source.uz, 1) != (numel(u_unique) - 1))) if eng.eval('(flgs.source_ux && (size(source.ux, 1) ~= (numel(u_unique) - 1))) ' '|| (flgs.source_uy && (size(source.uy, 1) ~= (numel(u_unique) - 1))) ' '|| ' '(flgs.source_uz && (size(source.uz, 1) ~= (numel(u_unique) - 1)))'): raise ValueError('The number of time series in source.ux (etc) ' 'must match the number of labelled source elements in source.u_mask.') # check for time varying stress source input and set source flag if any([(getattr(self, k) is not None) for k in ['sxx', 'syy', 'szz', 'sxy', 'sxz', 'syz', 's_mask']]): # force s_mask to be given enforce_fields(self, 's_mask') # check mask is the correct size # if (numDim(source.s_mask) != kgrid.dim) or (all(size(source.s_mask) != size(kgrid.k))) if eng.eval('(numDim(source.s_mask) ~= kgrid.dim) || (all(size(source.s_mask) ~= size(kgrid.k)))'): raise ValueError('source.s_mask must be the same size as the computational grid.') # check mask is not empty assert np.array(eng.getfield(source, 's_mask')) != 0, \ "source.s_mask must be a binary grid with at least one element set to 1." # check the source mode input is valid if eng.isfield(source, 's_mode'): assert eng.getfield(source, 's_mode') in ['additive', 'dirichlet'], \ "source.s_mode must be set to ''additive'' or ''dirichlet''." else: eng.setfield(source, 's_mode', self.SOURCE_S_MODE_DEF) # set source flgs to the length of the sources, this allows the # inputs to be defined independently and be of any length if self.sxx is not None and self_sxx > k_Nt: logging.log(logging.WARN, ' source.sxx has more time points than kgrid.Nt,' ' remaining time points will not be used.') if self.syy is not None and self_syy > k_Nt: logging.log(logging.WARN, ' source.syy has more time points than kgrid.Nt,' ' remaining time points will not be used.') if self.szz is not None and self_szz > k_Nt: logging.log(logging.WARN, ' source.szz has more time points than kgrid.Nt,' ' remaining time points will not be used.') if self.sxy is not None and self_sxy > k_Nt: logging.log(logging.WARN, ' source.sxy has more time points than kgrid.Nt,' ' remaining time points will not be used.') if self.sxz is not None and self_sxz > k_Nt: logging.log(logging.WARN, ' source.sxz has more time points than kgrid.Nt,' ' remaining time points will not be used.') if self.syz is not None and self_syz > k_Nt: logging.log(logging.WARN, ' source.syz has more time points than kgrid.Nt,' ' remaining time points will not be used.') # create an indexing variable corresponding to the location of all # the source elements raise NotImplementedError # check if the mask is binary or labelled 's_unique = unique(source.s_mask);' # create a second indexing variable if eng.eval('numel(s_unique) <= 2 && sum(s_unique) == 1'): s_mask = eng.getfield(source, 's_mask') s_mask_sum = np.array(s_mask).sum() # if more than one time series is given, check the number of time # series given matches the number of source elements if (self.source_sxx and (eng.eval('length(source.sxx(:,1)) > 1))'))) or \ (self.source_syy and (eng.eval('length(source.syy(:,1)) > 1))'))) or \ (self.source_szz and (eng.eval('length(source.szz(:,1)) > 1))'))) or \ (self.source_sxy and (eng.eval('length(source.sxy(:,1)) > 1))'))) or \ (self.source_sxz and (eng.eval('length(source.sxz(:,1)) > 1))'))) or \ (self.source_syz and (eng.eval('length(source.syz(:,1)) > 1))'))): if (self.source_sxx and (eng.eval('length(source.sxx(:,1))') != s_mask_sum)) or \ (self.source_syy and (eng.eval('length(source.syy(:,1))') != s_mask_sum)) or \ (self.source_szz and (eng.eval('length(source.szz(:,1))') != s_mask_sum)) or \ (self.source_sxy and (eng.eval('length(source.sxy(:,1))') != s_mask_sum)) or \ (self.source_sxz and (eng.eval('length(source.sxz(:,1))') != s_mask_sum)) or \ (self.source_syz and (eng.eval('length(source.syz(:,1))') != s_mask_sum)): raise ValueError('The number of time series in source.sxx (etc) ' 'must match the number of source elements in source.s_mask.') else: # check the source labels are monotonic, and start from 1 # if (sum(s_unique(2:end) - s_unique(1:end-1)) != (numel(s_unique) - 1)) or (~any(s_unique == 1)) if eng.eval('(sum(s_unique(2:end) - s_unique(1:end-1)) ~= ' '(numel(s_unique) - 1)) || (~any(s_unique == 1))'): raise ValueError('If using a labelled source.s_mask, ' 'the source labels must be monotonically increasing and start from 1.') numel_s_unique = eng.eval('numel(s_unique) - 1;') # if more than one time series is given, check the number of time # series given matches the number of source elements if (self.source_sxx and (eng.eval('size(source.sxx, 1)') != numel_s_unique)) or \ (self.source_syy and (eng.eval('size(source.syy, 1)') != numel_s_unique)) or \ (self.source_szz and (eng.eval('size(source.szz, 1)') != numel_s_unique)) or \ (self.source_sxy and (eng.eval('size(source.sxy, 1)') != numel_s_unique)) or \ (self.source_sxz and (eng.eval('size(source.sxz, 1)') != numel_s_unique)) or \ (self.source_syz and (eng.eval('size(source.syz, 1)') != numel_s_unique)): raise ValueError('The number of time series in source.sxx (etc) ' 'must match the number of labelled source elements in source.u_mask.')
@property def flag_ux(self): """ Get the length of the sources in X-direction, this allows the inputs to be defined independently and be of any length Returns: Length of the sources """ return len(self.ux[0]) if self.ux is not None else 0 @property def flag_uy(self): """ Get the length of the sources in X-direction, this allows the inputs to be defined independently and be of any length Returns: Length of the sources """ return len(self.uy[0]) if self.uy is not None else 0 @property def flag_uz(self): """ Get the length of the sources in X-direction, this allows the inputs to be defined independently and be of any length Returns: Length of the sources """ return len(self.uz[0]) if self.uz is not None else 0