Source code for kwave.options.simulation_options

from __future__ import annotations

import os
from dataclasses import dataclass, field
from enum import Enum
from tempfile import gettempdir
from typing import TYPE_CHECKING, List, Optional

import numpy as np

if TYPE_CHECKING:
    # Found here: https://adamj.eu/tech/2021/05/13/python-type-hints-how-to-fix-circular-imports/
    from kwave.kgrid import kWaveGrid
from kwave.utils.data import get_date_string
from kwave.utils.io import get_h5_literals
from kwave.utils.pml import get_optimal_pml_size


class SimulationType(Enum):
    """
    Enum for the simulation type

    In the original matlab code, simulation type was determined
        by looking at the calling function name and the user args.

    Rules from the original matlab code:
        AXISYMMETRIC => if the calling function name started with 'kspaceFirstOrderAS'
                            or if the userarg_axisymmetric is set to true
        ELASTIC => if the calling function name started with 'pstdElastic' or 'kspaceElastic'
        ELASTIC_WITH_KSPACE_CORRECTION => if the calling function name started with 'kspaceElastic'
    """

    FLUID = 1
    AXISYMMETRIC = 2
    ELASTIC = 3
    ELASTIC_WITH_KSPACE_CORRECTION = 4

    def is_elastic_simulation(self):
        return self in [SimulationType.ELASTIC, SimulationType.ELASTIC_WITH_KSPACE_CORRECTION]

    def is_axisymmetric(self):
        return self == SimulationType.AXISYMMETRIC


[docs] @dataclass class SimulationOptions(object): """ Args: axisymmetric: Flag that indicates whether axisymmetric simulation is used cart_interp: Interpolation mode used to extract the pressure when a Cartesian sensor mask is given. If set to 'nearest' and more than one Cartesian point maps to the same grid point, duplicated data points are discarded and sensor_data will be returned with less points than that specified by sensor.mask (default = 'linear'). pml_inside: put the PML inside the grid defined by the user pml_alpha: Absorption within the perfectly matched layer in Nepers per grid point (default = 2). save_to_disk: save the input data to a HDF5 file save_to_disk_exit: exit the simulation after saving the HDF5 file scale_source_terms: apply the source scaling term to time varying sources smooth_c0: smooth the sound speed distribution smooth_rho0: smooth the density distribution smooth_p0: smooth the initial pressure distribution use_kspace: use the k-space correction use_sg: use a staggered grid use_fd: Use finite difference gradients instead of spectral (in 1D) pml_auto: automatically choose the PML size to give small prime factors create_log: create a log using diary use_finite_difference: use finite difference gradients instead of spectral (in 1D) stream_to_disk: String containing a filename (including pathname if required). If set, after the precomputation phase, the input variables used in the time loop are saved the specified location in HDF5 format. The simulation then exits. The saved variables can be used to run simulations using the C++ code. data_recast: recast the sensor data back to double precision cartesian_interp: interpolation mode for Cartesian sensor mask hdf_compression_level: zip compression level for HDF5 input files data_cast: data cast pml_search_range: search range used when automatically determining PML size radial_symmetry: radial symmetry used in axisymmetric code multi_axial_PML_ratio: MPML settings pml_x_alpha: PML Alpha for x-axis pml_y_alpha: PML Alpha for y-axis pml_z_alpha: PML Alpha for z-axis pml_x_size: PML Size for x-axis pml_y_size: PML Size for y-axis pml_z_size: PML Size for z-axis """ simulation_type: SimulationType = SimulationType.FLUID cart_interp: str = "linear" pml_inside: bool = True pml_alpha: float = 2.0 save_to_disk: bool = False save_to_disk_exit: bool = False scale_source_terms: bool = True smooth_c0: bool = False smooth_rho0: bool = False smooth_p0: bool = True use_kspace: bool = True use_sg: bool = True use_fd: Optional[int] = None pml_auto: bool = False create_log: bool = False use_finite_difference: bool = False stream_to_disk: bool = False data_recast: Optional[bool] = False cartesian_interp: str = "linear" hdf_compression_level: Optional[int] = None data_cast: str = "off" pml_search_range: List[int] = field(default_factory=lambda: [10, 40]) radial_symmetry: str = "WSWA-FFT" multi_axial_PML_ratio: float = 0.1 data_path: Optional[str] = field(default_factory=lambda: gettempdir()) input_filename: Optional[str] = field(default_factory=lambda: f"{get_date_string()}_kwave_input.h5") output_filename: Optional[str] = field(default_factory=lambda: f"{get_date_string()}_kwave_output.h5") pml_x_alpha: Optional[float] = None pml_y_alpha: Optional[float] = None pml_z_alpha: Optional[float] = None pml_size: Optional[List[int]] = None pml_x_size: Optional[int] = None pml_y_size: Optional[int] = None pml_z_size: Optional[int] = None def __post_init__(self): assert self.cartesian_interp in [ "linear", "nearest", ], "Optional input ''cartesian_interp'' must be set to ''linear'' or ''nearest''." assert isinstance(self.data_cast, str), "Optional input ''data_cast'' must be a string." assert self.data_cast in ["off", "double", "single"], "Invalid input for ''data_cast''." if self.data_cast == "double": self.data_cast = "off" # load the HDF5 literals (for the default compression level) h5_literals = get_h5_literals() self.hdf_compression_level = h5_literals.HDF_COMPRESSION_LEVEL # check value is an integer between 0 and 9 assert ( isinstance(self.hdf_compression_level, int) and 0 <= self.hdf_compression_level <= 9 ), "Optional input ''hdf_compression_level'' must be an integer between 0 and 9." assert ( np.isscalar(self.multi_axial_PML_ratio) and self.multi_axial_PML_ratio >= 0 ), "Optional input ''multi_axial_PML_ratio'' must be a single positive value." assert np.isscalar(self.stream_to_disk) or isinstance( self.stream_to_disk, bool ), "Optional input ''stream_to_disk'' must be a single scalar or Boolean value." boolean_inputs = { "use_sg": self.use_sg, "data_recast": self.data_recast, "save_to_disk_exit": self.save_to_disk_exit, "use_kspace": self.use_kspace, "save_to_disk": self.save_to_disk, "pml_inside": self.pml_inside, "create_log": self.create_log, "scale_source_terms": self.scale_source_terms, } for key, val in boolean_inputs.items(): assert isinstance(val, bool), f"Optional input ''{key}'' must be Boolean." assert self.radial_symmetry in [ "WSWA", "WSWS", "WSWA-FFT", "WSWS-FFT", ], "Optional input ''RadialSymmetry'' must be set to ''WSWA'', ''WSWS'', ''WSWA-FFT'', ''WSWS-FFT''." # automatically assign the PML size to give small prime factors if self.pml_auto and self.pml_inside: raise NotImplementedError("''pml_size'' set to ''auto'' is only supported with ''pml_inside'' set to false.") if self.pml_size is not None: # TODO(walter): remove auto option in exchange for pml_auto=True if isinstance(self.pml_size, int): self.pml_size = np.array([self.pml_size]) if not isinstance(self.pml_size, (list, np.ndarray)): raise ValueError("Optional input ''PMLSize'' must be a integer array of 1, 2 or 3 dimensions.") # Check if each member variable is None, and set it to self.pml_alpha if it is self.pml_x_alpha = self.pml_alpha if self.pml_x_alpha is None else self.pml_x_alpha self.pml_y_alpha = self.pml_alpha if self.pml_y_alpha is None else self.pml_y_alpha self.pml_z_alpha = self.pml_alpha if self.pml_z_alpha is None else self.pml_z_alpha # add pathname to input and output filenames self.input_filename = os.path.join(self.data_path, self.input_filename) self.output_filename = os.path.join(self.data_path, self.output_filename) assert self.use_fd is None or ( np.issubdtype(self.use_fd, np.number) and self.use_fd in [2, 4] ), "Optional input ''UseFD'' can only be set to 2, 4."
[docs] @staticmethod def option_factory(kgrid: "kWaveGrid", options: SimulationOptions): """ Initialize the Simulation Options Args: kgrid: kWaveGrid instance elastic_code: Flag that indicates whether elastic simulation is used **kwargs: Dictionary that holds following optional simulation properties: * cart_interp: Interpolation mode used to extract the pressure when a Cartesian sensor mask is given. If set to 'nearest', duplicated data points are discarded and sensor_data will be returned with fewer points than specified by sensor.mask (default = 'linear'). * create_log: Boolean controlling whether the command line output is saved using the diary function with a date and time stamped filename (default = false). * data_cast: String input of the data type that variables are cast to before computation. For example, setting to 'single' will speed up the computation time (due to the improved efficiency of fftn and ifftn for this data type) at the expense of a loss in precision (default = 'off'). * data_recast: Boolean controlling whether the output data is cast back to double precision. If set to false, sensor_data will be returned in the data format set using the 'data_cast' option. * hdf_compression_level: Compression level used for writing the input HDF5 file when using 'save_to_disk' or kspaceFirstOrder3DC. Can be set to an integer between 0 (no compression, the default) and 9 (maximum compression). The compression is lossless. Increasing the compression level will reduce the file size if there are portions of the medium that are homogeneous, but will also increase the time to create the HDF5 file. * multi_axial_pml_ratio: MPML settings * pml_alpha: Absorption within the perfectly matched layer in Nepers per grid point (default = 2). * pml_inside: Boolean controlling whether the perfectly matched layer is inside or outside the grid. If set to false, the input grids are enlarged by pml_size before running the simulation (default = true). * pml_range: Search range used when automatically determining PML size. Tuple of two elements * pml_size: Size of the perfectly matched layer in grid points. By default, the PML is added evenly to all sides of the grid, however, both pml_size and pml_alpha can be given as three element arrays to specify the x, y, and z properties, respectively. To remove the PML, set the appropriate pml_alpha to zero rather than forcing the PML to be of zero size (default = 10). * radial_symmetry: Radial symmetry used in axisymmetric code * stream_to_disk: Boolean controlling whether sensor_data is periodically saved to disk to avoid storing the complete matrix in memory. StreamToDisk may also be given as an integer which specifies the number of time steps that are taken before the data is saved to disk (default = 200). * save_to_disk: String containing a filename (including pathname if required). If set, after the precomputation phase, the input variables used in the time loop are saved the specified location in HDF5 format. The simulation then exits. The saved variables can be used to run simulations using the C++ code. * save_to_disk_exit: Exit the simulation after saving the HDF5 file * scale_source_terms: Apply the source scaling term to time varying sources * use_fd: Use finite difference gradients instead of spectral (in 1D) * use_k_space: use the k-space correction * use_sg: Use a staggered grid Returns: SimulationOptions instance """ STREAM_TO_DISK_STEPS_DEF = 200 # number of steps before streaming to disk if options.pml_size is not None and not isinstance(options.pml_size, bool): if len(options.pml_size) > kgrid.dim: if kgrid.dim > 1: raise ValueError(f"Optional input ''pml_size'' must be a 1 or {kgrid.dim} element numerical array.") else: raise ValueError("Optional input ''pml_size'' must be a single numerical value.") if kgrid.dim == 1: options.pml_x_size = options.pml_size if options.pml_size else 20 options.plot_scale = [-1.1, 1.1] elif kgrid.dim == 2: if options.pml_size is not None: if len(options.pml_size) == kgrid.dim: options.pml_x_size, options.pml_y_size = np.asarray(options.pml_size, dtype=int).ravel() else: options.pml_x_size, options.pml_y_size = (options.pml_size[0], options.pml_size[0]) else: options.pml_x_size, options.pml_y_size = (20, 20) options.plot_scale = [-1, 1] elif kgrid.dim == 3: if (options.pml_size is not None) and (len(options.pml_size) == kgrid.dim): options.pml_x_size, options.pml_y_size, options.pml_z_size = np.asarray(options.pml_size).ravel() else: if options.pml_size is None: options.pml_x_size = 10 options.pml_y_size = 10 options.pml_z_size = 10 else: options.pml_x_size = options.pml_size[0] options.pml_y_size = options.pml_x_size options.pml_z_size = options.pml_x_size options.plot_scale = [-1, 1] # replace defaults with user defined values if provided and check inputs if (val := options.pml_alpha) is not None and not isinstance(options.pml_alpha, str): # check input is correct size val = np.atleast_1d(val) if val.size > kgrid.dim: if kgrid.dim > 1: raise ValueError(f"Optional input ''pml_alpha'' must be a 1 or {kgrid.dim} element numerical array.") else: raise ValueError("Optional input ''pml_alpha'' must be a single numerical value.") # assign input based on number of dimensions if kgrid.dim == 1: options.pml_x_alpha = val elif kgrid.dim == 2: options.pml_x_alpha = val[0] options.pml_y_alpha = val[-1] elif kgrid.dim == 3: options.pml_x_alpha = val[0] options.pml_y_alpha = val[len(val) // 2] options.pml_z_alpha = val[-1] if options.save_to_disk_exit: assert kgrid.dim != 1, "Optional input ''save_to_disk'' is not compatible with 1D simulations." if options.stream_to_disk: assert ( not options.simulation_type.is_elastic_simulation() and kgrid.dim == 3 ), "Optional input ''stream_to_disk'' is currently only compatible with 3D fluid simulations." # if given as a Boolean, replace with the default number of time steps if isinstance(options.stream_to_disk, bool) and options.stream_to_disk: options.stream_to_disk = STREAM_TO_DISK_STEPS_DEF if options.save_to_disk or options.save_to_disk_exit: assert kgrid.dim != 1, "Optional input ''save_to_disk'' is not compatible with 1D simulations." if options.use_fd: # input only supported in 1D fluid code assert kgrid.dim == 1 and not options.simulation_type.is_elastic_simulation(), "Optional input ''use_fd'' only supported in 1D." # get optimal pml size if options.simulation_type.is_axisymmetric() or options.pml_auto: if options.simulation_type.is_axisymmetric(): pml_size_temp = get_optimal_pml_size(kgrid, options.pml_search_range, options.radial_symmetry[:4]) else: pml_size_temp = get_optimal_pml_size(kgrid, options.pml_search_range) # assign to individual variables if kgrid.dim == 1: options.pml_x_size = int(pml_size_temp[0]) elif kgrid.dim == 2: options.pml_x_size = int(pml_size_temp[0]) options.pml_y_size = int(pml_size_temp[1]) elif kgrid.dim == 3: options.pml_x_size = int(pml_size_temp[0]) options.pml_y_size = int(pml_size_temp[1]) options.pml_z_size = int(pml_size_temp[2]) # cleanup unused variables del pml_size_temp return options