Source code for kwave.kWaveSimulation_helper.save_to_disk_func

import logging
import os

import numpy as np
from scipy.io import savemat

from kwave.kgrid import kWaveGrid
from kwave.kmedium import kWaveMedium
from kwave.options.simulation_options import SimulationOptions
from kwave.utils.data import scale_time
from kwave.utils.dotdictionary import dotdict
from kwave.utils.io import write_attributes, write_matrix
from kwave.utils.matrix import num_dim2
from kwave.utils.tictoc import TicToc


[docs] def save_to_disk_func( kgrid: kWaveGrid, medium: kWaveMedium, source, opt: SimulationOptions, auto_chunk: bool, values: dotdict, flags: dotdict ): # update command line status logging.log(logging.INFO, f" precomputation completed in {scale_time(TicToc.toc())}") TicToc.tic() logging.log(logging.INFO, " saving input files to disk...") # check for a binary sensor mask or cuboid corners # modified by Farid | disabled temporarily! # assert self.binary_sensor_mask or self.cuboid_corners, \ # "Optional input ''save_to_disk'' only supported for sensor masks defined as a binary matrix # or the opposing corners of a rectangle (2D) or cuboid (3D)." # ========================================================================= # VARIABLE LIST # ========================================================================= integer_variables = dotdict() float_variables = dotdict() grab_integer_variables(integer_variables, kgrid, flags, medium) grab_pml_size(integer_variables, opt) grab_float_variables(float_variables, kgrid, opt, values, flags.elastic_code, flags.axisymmetric) # overwrite z-values for 2D simulations if kgrid.dim == 2: integer_variables.Nz = 1 integer_variables.pml_z_size = 0 grab_medium_props(integer_variables, float_variables, medium, flags.elastic_code) grab_source_props( integer_variables, float_variables, source, values.u_source_pos_index, values.s_source_pos_index, values.p_source_pos_index, values.transducer_input_signal, values.delay_mask, ) grab_sensor_props(integer_variables, kgrid.dim, values.sensor_mask_index, values.record.cuboid_corners_list) grab_nonuniform_grid_props(float_variables, kgrid, flags.nonuniform_grid) # ========================================================================= # DATACAST AND SAVING # ========================================================================= remove_z_dimension(float_variables, kgrid.dim) save_file(opt.input_filename, integer_variables, float_variables, opt.hdf_compression_level, auto_chunk=auto_chunk) # update command line status logging.log(logging.INFO, f" completed in {scale_time(TicToc.toc())}")
[docs] def grab_integer_variables(integer_variables, kgrid, flags, medium): # integer variables used within the time loop for all codes variables = dotdict( { "Nx": kgrid.Nx, "Ny": kgrid.Ny, "Nz": kgrid.Nz, "Nt": kgrid.Nt, "p_source_flag": flags.source_p, "p0_source_flag": flags.source_p0, "ux_source_flag": flags.source_ux, "uy_source_flag": flags.source_uy, "uz_source_flag": flags.source_uz, "sxx_source_flag": flags.source_sxx, "syy_source_flag": flags.source_syy, "szz_source_flag": flags.source_szz, "sxy_source_flag": flags.source_sxy, "sxz_source_flag": flags.source_sxz, "syz_source_flag": flags.source_syz, "transducer_source_flag": flags.transducer_source, "nonuniform_grid_flag": flags.nonuniform_grid, "nonlinear_flag": medium.is_nonlinear(), "absorbing_flag": None, "elastic_flag": flags.elastic_code, "axisymmetric_flag": flags.axisymmetric, # create pseudonyms for the sensor flgs # 0: binary mask indices # 1: cuboid corners "sensor_mask_type": flags.cuboid_corners, } ) integer_variables.update(variables)
[docs] def grab_pml_size(integer_variables, opt): # additional integer variables not used within time loop but stored directly to output file integer_variables["pml_x_size"] = opt.pml_x_size integer_variables["pml_y_size"] = opt.pml_y_size integer_variables["pml_z_size"] = opt.pml_z_size
[docs] def grab_float_variables(float_variables: dotdict, kgrid, opt, values, is_elastic_code, is_axisymmetric): # single precision variables not used within time loop but stored directly # to the output file for all files variables = dotdict( { "dx": kgrid.dx, "dy": kgrid.dy, "dz": kgrid.dz, "pml_x_alpha": opt.pml_x_alpha, "pml_y_alpha": opt.pml_y_alpha, "pml_z_alpha": opt.pml_z_alpha, } ) float_variables.update(variables) if is_elastic_code: # pragma: no cover grab_elastic_code_variables(float_variables, kgrid, values) elif is_axisymmetric: grab_axisymmetric_variables(float_variables, values) else: # single precision variables used within the time loop float_variables["dt"] = values.dt float_variables["c0"] = values.c0 float_variables["c_ref"] = values.c_ref float_variables["rho0"] = values.rho0 float_variables["rho0_sgx"] = values.rho0_sgx float_variables["rho0_sgy"] = values.rho0_sgy float_variables["rho0_sgz"] = values.rho0_sgz
[docs] def grab_elastic_code_variables(float_variables, kgrid, values): # pragma: no cover # single precision variables used within the time loop float_variables["dt"] = None float_variables["c_ref"] = None float_variables["lambda"] = None float_variables["mu"] = None float_variables["rho0_sgx"] = None float_variables["rho0_sgy"] = None float_variables["rho0_sgz"] = None float_variables["mu_sgxy"] = None float_variables["mu_sgxz"] = None float_variables["mu_sgyz"] = None # create shift variables used for calculating u_non_staggered and I outputs x_shift_neg = np.fft.ifftshift(np.exp(-1j * kgrid.k_vec.x * kgrid.dx / 2)) y_shift_neg = np.fft.ifftshift(np.exp(-1j * kgrid.k_vec.y * kgrid.dy / 2)).T z_shift_neg = np.transpose(np.fft.ifftshift(np.exp(-1j * kgrid.k_vec.z * kgrid.dz / 2)), (1, 2, 0)) # create reduced variables for use with real-to-complex FFT Nz = kgrid.Nz if kgrid.dim != 2 else 1 Nx_r = kgrid.Nx // 2 + 1 Ny_r = kgrid.Ny // 2 + 1 Nz_r = Nz // 2 + 1 ddx_k_shift_pos = values.ddx_k_shift_pos ddx_k_shift_neg = values.ddx_k_shift_neg float_variables["ddx_k_shift_pos_r"] = ddx_k_shift_pos[:Nx_r] float_variables["ddy_k_shift_pos"] = None float_variables["ddz_k_shift_pos"] = None float_variables["ddx_k_shift_neg_r"] = ddx_k_shift_neg[:Nx_r] float_variables["ddy_k_shift_neg"] = None float_variables["ddz_k_shift_neg"] = None float_variables["x_shift_neg_r"] = x_shift_neg[:Nx_r] float_variables["y_shift_neg_r"] = y_shift_neg[:Ny_r] float_variables["z_shift_neg_r"] = z_shift_neg[:Nz_r] del x_shift_neg float_variables["pml_x"] = None float_variables["pml_y"] = None float_variables["pml_z"] = None float_variables["pml_x_sgx"] = None float_variables["pml_y_sgy"] = None float_variables["pml_z_sgz"] = None float_variables["mpml_x_sgx"] = None float_variables["mpml_y_sgy"] = None float_variables["mpml_z_sgz"] = None float_variables["mpml_x"] = None float_variables["mpml_y"] = None float_variables["mpml_z"] = None
[docs] def grab_axisymmetric_variables(float_variables, values): # single precision variables used within the time loop float_variables["dt"] = values.dt float_variables["c0"] = values.c0 float_variables["c_ref"] = values.c_ref float_variables["rho0"] = values.rho0 float_variables["rho0_sgx"] = values.rho0_sgx float_variables["rho0_sgy"] = values.rho0_sgy
[docs] def grab_medium_props(integer_variables, float_variables, medium, is_elastic_code): # ========================================================================= # VARIABLES USED IN NONLINEAR SIMULATIONS # ========================================================================= if medium.is_nonlinear(): float_variables["BonA"] = medium.BonA # ========================================================================= # VARIABLES USED IN ABSORBING SIMULATIONS # ========================================================================= # set absorbing flag if medium.absorbing: integer_variables.absorbing_flag = 2 if medium.stokes else 1 else: integer_variables.absorbing_flag = 0 if medium.absorbing: if is_elastic_code: # pragma: no cover # add to the variable list float_variables["chi"] = None float_variables["eta"] = None float_variables["eta_sgxy"] = None float_variables["eta_sgxz"] = None float_variables["eta_sgyz"] = None else: float_variables["alpha_coeff"] = medium.alpha_coeff float_variables["alpha_power"] = medium.alpha_power
[docs] def grab_source_props( integer_variables, float_variables, source, u_source_pos_index, s_source_pos_index, p_source_pos_index, transducer_input_signal, delay_mask, ): # ========================================================================= # SOURCE VARIABLES # ========================================================================= # source modes and indices # - these are only defined if the source flgs are > 0 # - the source mode describes whether the source will be added or replaced # - the source indices describe which grid points act as the source # - the u_source_index is reused for any of the u sources and the transducer source grab_velocity_source_props(integer_variables, source, u_source_pos_index) grab_stress_source_props(integer_variables, source, s_source_pos_index) grab_pressure_source_props(integer_variables, source, p_source_pos_index, u_source_pos_index) grab_time_varying_source_props(integer_variables, float_variables, source, transducer_input_signal, delay_mask)
[docs] def grab_velocity_source_props(integer_variables, source, u_source_pos_index): # velocity source if any(integer_variables.get(k) for k in ["ux_source_flag", "uy_source_flag", "uz_source_flag"]): integer_variables["u_source_mode"] = { "dirichlet": 0, "additive-no-correction": 1, "additive": 2, }[source.u_mode] if integer_variables.ux_source_flag: u_source_many = num_dim2(source.ux) > 1 elif integer_variables.uy_source_flag: u_source_many = num_dim2(source.uy) > 1 elif integer_variables.uz_source_flag: u_source_many = num_dim2(source.uz) > 1 integer_variables["u_source_many"] = u_source_many integer_variables.u_source_index = u_source_pos_index
[docs] def grab_stress_source_props(integer_variables, source, s_source_pos_index): # stress source if ( integer_variables.sxx_source_flag or integer_variables.syy_source_flag or integer_variables.szz_source_flag or integer_variables.sxy_source_flag or integer_variables.sxz_source_flag or integer_variables.syz_source_flag ): integer_variables.s_source_mode = source.s_mode != "dirichlet" if integer_variables.sxx_source_flag: s_source_many = num_dim2(source.sxx) > 1 elif integer_variables.syy_source_flag: s_source_many = num_dim2(source.syy) > 1 elif integer_variables.szz_source_flag: s_source_many = num_dim2(source.szz) > 1 elif integer_variables.sxy_source_flag: s_source_many = num_dim2(source.sxy) > 1 elif integer_variables.sxz_source_flag: s_source_many = num_dim2(source.sxz) > 1 elif integer_variables.syz_source_flag: s_source_many = num_dim2(source.syz) > 1 integer_variables.s_source_many = s_source_many integer_variables.s_source_index = s_source_pos_index
[docs] def grab_pressure_source_props(integer_variables, source, p_source_pos_index, u_source_pos_index): # pressure source if integer_variables.p_source_flag: integer_variables.p_source_mode = { "dirichlet": 0, "additive-no-correction": 1, "additive": 2, }[source.p_mode] integer_variables.p_source_many = num_dim2(source.p) > 1 integer_variables.p_source_index = p_source_pos_index # transducer source if integer_variables.transducer_source_flag: integer_variables.u_source_index = u_source_pos_index
[docs] def grab_time_varying_source_props(integer_variables, float_variables, source, transducer_input_signal, delay_mask): # time varying source variables # - these are only defined if the source flgs are > 0 # - these are the actual source values # - these are indexed as (position_index, time_index) if integer_variables.ux_source_flag: float_variables.ux_source_input = source.ux if integer_variables.uy_source_flag: float_variables.uy_source_input = source.uy if integer_variables.uz_source_flag: float_variables.uz_source_input = source.uz if integer_variables.sxx_source_flag: float_variables.sxx_source_input = source.sxx if integer_variables.syy_source_flag: float_variables.syy_source_input = source.syy if integer_variables.szz_source_flag: float_variables.szz_source_input = source.szz if integer_variables.sxy_source_flag: float_variables.sxy_source_input = source.sxy if integer_variables.sxz_source_flag: float_variables.sxz_source_input = source.sxz if integer_variables.syz_source_flag: float_variables.syz_source_input = source.syz if integer_variables.p_source_flag: float_variables.p_source_input = source.p if integer_variables.transducer_source_flag: float_variables.transducer_source_input = transducer_input_signal integer_variables.delay_mask = delay_mask # initial pressure source variable # - this is only defined if the p0 source flag is 1 # - this defines the initial pressure everywhere (there is no indices) if integer_variables.p0_source_flag: float_variables.p0_source_input = source.p0
[docs] def grab_sensor_props(integer_variables, kgrid_dim, sensor_mask_index, cuboid_corners_list): # ========================================================================= # SENSOR VARIABLES # ========================================================================= if integer_variables.sensor_mask_type == 0: # mask is defined as a list of grid indices integer_variables.sensor_mask_index = sensor_mask_index elif integer_variables.sensor_mask_type == 1: cuboid_corners_list = cuboid_corners_list # mask is defined as a list of cuboid corners if kgrid_dim == 2: sensor_mask_corners = np.ones((6, cuboid_corners_list.shape[1])) sensor_mask_corners[0, :] = cuboid_corners_list[0, :] sensor_mask_corners[1, :] = cuboid_corners_list[1, :] sensor_mask_corners[3, :] = cuboid_corners_list[2, :] sensor_mask_corners[4, :] = cuboid_corners_list[3, :] else: sensor_mask_corners = cuboid_corners_list integer_variables.sensor_mask_corners = sensor_mask_corners else: raise NotImplementedError("unknown option for sensor_mask_type")
[docs] def grab_nonuniform_grid_props(float_variables, kgrid, is_nonuniform_grid): # ========================================================================= # VARIABLES USED FOR NONUNIFORM GRIDS # ========================================================================= # set nonuniform flag and variables # - these are only defined if nonuniform_grid_flag is 1 # - these are applied using the bsxfun formulation if not is_nonuniform_grid: return dxudxn = kgrid.dudn.x if np.array(dxudxn).size == 1: dxudxn = np.ones((kgrid.Nx, 1)) float_variables["dxudxn"] = dxudxn dyudyn = kgrid.dudn.y if np.array(dyudyn).size == 1: dyudyn = np.ones((1, kgrid.Ny)) float_variables["dyudyn"] = dyudyn dzudzn = kgrid.dudn.z if np.array(dzudzn).size == 1: dzudzn = np.ones((1, 1, kgrid.Nz)) float_variables["dzudzn"] = dzudzn dxudxn_sgx = kgrid.dudn_sg.x if np.array(dxudxn).size == 1: dxudxn_sgx = np.ones((kgrid.Nx, 1)) float_variables["dxudxn_sgx"] = dxudxn_sgx dyudyn_sgy = kgrid.dudn_sg.y if np.array(dyudyn).size == 1: dyudyn_sgy = np.ones((1, kgrid.Ny)) float_variables["dyudyn_sgy"] = dyudyn_sgy dzudzn_sgz = kgrid.dudn_sg.z if np.array(dzudzn).size == 1: dzudzn_sgz = np.ones((1, 1, kgrid.Nz)) float_variables["dzudzn_sgz"] = dzudzn_sgz
[docs] def remove_z_dimension(float_variables, kgrid_dim): # remove z-dimension variables for saving 2D files if kgrid_dim == 2: for k in list(float_variables.keys()): if "z" in k: del float_variables[k]
[docs] def enforce_filename_standards(filepath): # check for HDF5 filename extension filename_ext = os.path.splitext(filepath)[1] # use .h5 as default if no extension is given if len(filename_ext) == 0: filename_ext = ".h5" filepath = filepath + ".h5" return filepath, filename_ext
[docs] def save_file(filepath, integer_variables, float_variables, hdf_compression_level, auto_chunk): filepath, filename_ext = enforce_filename_standards(filepath) # save file if filename_ext == ".h5": save_h5_file(filepath, integer_variables, float_variables, hdf_compression_level, auto_chunk) elif filename_ext == ".mat": save_mat_file(filepath, integer_variables, float_variables) else: # throw error for unknown filetype raise NotImplementedError("unknown file extension for " "save_to_disk" " filename")
[docs] def save_h5_file(filepath, integer_variables, float_variables, hdf_compression_level, auto_chunk): # ---------------- # SAVE HDF5 FILE # ---------------- # check if file exists, and delete if it does (the hdf5 library will # give an error if the file already exists) if os.path.exists(filepath): os.remove(filepath) # change all the variables to be in single precision (float in C++), # then add to HDF5 File for key, value in float_variables.items(): # cast matrix to single precision value = np.array(value, dtype=np.float32) write_matrix(filepath, value, key, hdf_compression_level, auto_chunk) del value # change all the index variables to be in 64-bit unsigned integers # (long in C++), then add to HDF5 file for key, value in integer_variables.items(): # cast matrix to 64-bit unsigned integer value = np.array(value, dtype=np.uint64) write_matrix(filepath, value, key, hdf_compression_level, auto_chunk) del value # set additional file attributes write_attributes(filepath)
[docs] def save_mat_file(filepath, integer_variables, float_variables): # ---------------- # SAVE .MAT FILE # ---------------- # change all the variables to be in single precision (float in C++) for key, value in float_variables.items(): float_variables[key] = np.array(value, dtype=np.float32) for key, value in integer_variables.items(): integer_variables[key] = np.array(value, dtype=np.uint64) # save the input variables to disk as a MATLAB binary file float_variables = dict(**float_variables, **integer_variables) savemat(filepath, float_variables)