import logging
import os
import numpy as np
from scipy.io import savemat
from kwave.kmedium import kWaveMedium
from kwave.kgrid import kWaveGrid
from kwave.options.simulation_options import SimulationOptions
from kwave.utils.data import scale_time
from kwave.utils.dotdictionary import dotdict
from kwave.utils.io import write_attributes, write_matrix
from kwave.utils.matrix import num_dim2
from kwave.utils.tictoc import TicToc
[docs]
def save_to_disk_func(
kgrid: kWaveGrid, medium: kWaveMedium, source, opt: SimulationOptions, auto_chunk: bool, values: dotdict, flags: dotdict
):
# update command line status
logging.log(logging.INFO, " precomputation completed in ", scale_time(TicToc.toc()))
TicToc.tic()
logging.log(logging.INFO, " saving input files to disk...")
# check for a binary sensor mask or cuboid corners
# modified by Farid | disabled temporarily!
# assert self.binary_sensor_mask or self.cuboid_corners, \
# "Optional input ''save_to_disk'' only supported for sensor masks defined as a binary matrix
# or the opposing corners of a rectangle (2D) or cuboid (3D)."
# =========================================================================
# VARIABLE LIST
# =========================================================================
integer_variables = dotdict()
float_variables = dotdict()
grab_integer_variables(integer_variables, kgrid, flags, medium)
grab_pml_size(integer_variables, opt)
grab_float_variables(float_variables, kgrid, opt, values, flags.elastic_code, flags.axisymmetric)
# overwrite z-values for 2D simulations
if kgrid.dim == 2:
integer_variables.Nz = 1
integer_variables.pml_z_size = 0
grab_medium_props(integer_variables, float_variables, medium, flags.elastic_code)
grab_source_props(
integer_variables,
float_variables,
source,
values.u_source_pos_index,
values.s_source_pos_index,
values.p_source_pos_index,
values.transducer_input_signal,
values.delay_mask,
)
grab_sensor_props(integer_variables, kgrid.dim, values.sensor_mask_index, values.record.cuboid_corners_list)
grab_nonuniform_grid_props(float_variables, kgrid, flags.nonuniform_grid)
# =========================================================================
# DATACAST AND SAVING
# =========================================================================
remove_z_dimension(float_variables, kgrid.dim)
save_file(opt.input_filename, integer_variables, float_variables, opt.hdf_compression_level, auto_chunk=auto_chunk)
# update command line status
logging.log(logging.INFO, " completed in ", scale_time(TicToc.toc()))
[docs]
def grab_integer_variables(integer_variables, kgrid, flags, medium):
# integer variables used within the time loop for all codes
variables = dotdict(
{
"Nx": kgrid.Nx,
"Ny": kgrid.Ny,
"Nz": kgrid.Nz,
"Nt": kgrid.Nt,
"p_source_flag": flags.source_p,
"p0_source_flag": flags.source_p0,
"ux_source_flag": flags.source_ux,
"uy_source_flag": flags.source_uy,
"uz_source_flag": flags.source_uz,
"sxx_source_flag": flags.source_sxx,
"syy_source_flag": flags.source_syy,
"szz_source_flag": flags.source_szz,
"sxy_source_flag": flags.source_sxy,
"sxz_source_flag": flags.source_sxz,
"syz_source_flag": flags.source_syz,
"transducer_source_flag": flags.transducer_source,
"nonuniform_grid_flag": flags.nonuniform_grid,
"nonlinear_flag": medium.is_nonlinear(),
"absorbing_flag": None,
"elastic_flag": flags.elastic_code,
"axisymmetric_flag": flags.axisymmetric,
# create pseudonyms for the sensor flgs
# 0: binary mask indices
# 1: cuboid corners
"sensor_mask_type": flags.cuboid_corners,
}
)
integer_variables.update(variables)
[docs]
def grab_pml_size(integer_variables, opt):
# additional integer variables not used within time loop but stored directly to output file
integer_variables["pml_x_size"] = opt.pml_x_size
integer_variables["pml_y_size"] = opt.pml_y_size
integer_variables["pml_z_size"] = opt.pml_z_size
[docs]
def grab_float_variables(float_variables: dotdict, kgrid, opt, values, is_elastic_code, is_axisymmetric):
# single precision variables not used within time loop but stored directly
# to the output file for all files
variables = dotdict(
{
"dx": kgrid.dx,
"dy": kgrid.dy,
"dz": kgrid.dz,
"pml_x_alpha": opt.pml_x_alpha,
"pml_y_alpha": opt.pml_y_alpha,
"pml_z_alpha": opt.pml_z_alpha,
}
)
float_variables.update(variables)
if is_elastic_code: # pragma: no cover
grab_elastic_code_variables(float_variables, kgrid, values)
elif is_axisymmetric:
grab_axisymmetric_variables(float_variables, values)
else:
# single precision variables used within the time loop
float_variables["dt"] = values.dt
float_variables["c0"] = values.c0
float_variables["c_ref"] = values.c_ref
float_variables["rho0"] = values.rho0
float_variables["rho0_sgx"] = values.rho0_sgx
float_variables["rho0_sgy"] = values.rho0_sgy
float_variables["rho0_sgz"] = values.rho0_sgz
[docs]
def grab_elastic_code_variables(float_variables, kgrid, values): # pragma: no cover
# single precision variables used within the time loop
float_variables["dt"] = None
float_variables["c_ref"] = None
float_variables["lambda"] = None
float_variables["mu"] = None
float_variables["rho0_sgx"] = None
float_variables["rho0_sgy"] = None
float_variables["rho0_sgz"] = None
float_variables["mu_sgxy"] = None
float_variables["mu_sgxz"] = None
float_variables["mu_sgyz"] = None
# create shift variables used for calculating u_non_staggered and I outputs
x_shift_neg = np.fft.ifftshift(np.exp(-1j * kgrid.k_vec.x * kgrid.dx / 2))
y_shift_neg = np.fft.ifftshift(np.exp(-1j * kgrid.k_vec.y * kgrid.dy / 2)).T
z_shift_neg = np.transpose(np.fft.ifftshift(np.exp(-1j * kgrid.k_vec.z * kgrid.dz / 2)), (1, 2, 0))
# create reduced variables for use with real-to-complex FFT
Nz = kgrid.Nz if kgrid.dim != 2 else 1
Nx_r = kgrid.Nx // 2 + 1
Ny_r = kgrid.Ny // 2 + 1
Nz_r = Nz // 2 + 1
ddx_k_shift_pos = values.ddx_k_shift_pos
ddx_k_shift_neg = values.ddx_k_shift_neg
float_variables["ddx_k_shift_pos_r"] = ddx_k_shift_pos[:Nx_r]
float_variables["ddy_k_shift_pos"] = None
float_variables["ddz_k_shift_pos"] = None
float_variables["ddx_k_shift_neg_r"] = ddx_k_shift_neg[:Nx_r]
float_variables["ddy_k_shift_neg"] = None
float_variables["ddz_k_shift_neg"] = None
float_variables["x_shift_neg_r"] = x_shift_neg[:Nx_r]
float_variables["y_shift_neg_r"] = y_shift_neg[:Ny_r]
float_variables["z_shift_neg_r"] = z_shift_neg[:Nz_r]
del x_shift_neg
float_variables["pml_x"] = None
float_variables["pml_y"] = None
float_variables["pml_z"] = None
float_variables["pml_x_sgx"] = None
float_variables["pml_y_sgy"] = None
float_variables["pml_z_sgz"] = None
float_variables["mpml_x_sgx"] = None
float_variables["mpml_y_sgy"] = None
float_variables["mpml_z_sgz"] = None
float_variables["mpml_x"] = None
float_variables["mpml_y"] = None
float_variables["mpml_z"] = None
[docs]
def grab_axisymmetric_variables(float_variables, values):
# single precision variables used within the time loop
float_variables["dt"] = values.dt
float_variables["c0"] = values.c0
float_variables["c_ref"] = values.c_ref
float_variables["rho0"] = values.rho0
float_variables["rho0_sgx"] = values.rho0_sgx
float_variables["rho0_sgy"] = values.rho0_sgy
[docs]
def grab_medium_props(integer_variables, float_variables, medium, is_elastic_code):
# =========================================================================
# VARIABLES USED IN NONLINEAR SIMULATIONS
# =========================================================================
if medium.is_nonlinear():
float_variables["BonA"] = medium.BonA
# =========================================================================
# VARIABLES USED IN ABSORBING SIMULATIONS
# =========================================================================
# set absorbing flag
if medium.absorbing:
integer_variables.absorbing_flag = 2 if medium.stokes else 1
else:
integer_variables.absorbing_flag = 0
if medium.absorbing:
if is_elastic_code: # pragma: no cover
# add to the variable list
float_variables["chi"] = None
float_variables["eta"] = None
float_variables["eta_sgxy"] = None
float_variables["eta_sgxz"] = None
float_variables["eta_sgyz"] = None
else:
float_variables["alpha_coeff"] = medium.alpha_coeff
float_variables["alpha_power"] = medium.alpha_power
[docs]
def grab_source_props(
integer_variables,
float_variables,
source,
u_source_pos_index,
s_source_pos_index,
p_source_pos_index,
transducer_input_signal,
delay_mask,
):
# =========================================================================
# SOURCE VARIABLES
# =========================================================================
# source modes and indicies
# - these are only defined if the source flgs are > 0
# - the source mode describes whether the source will be added or replaced
# - the source indicies describe which grid points act as the source
# - the u_source_index is reused for any of the u sources and the transducer source
grab_velocity_source_props(integer_variables, source, u_source_pos_index)
grab_stress_source_props(integer_variables, source, s_source_pos_index)
grab_pressure_source_props(integer_variables, source, p_source_pos_index, u_source_pos_index)
grab_time_varying_source_props(integer_variables, float_variables, source, transducer_input_signal, delay_mask)
[docs]
def grab_velocity_source_props(integer_variables, source, u_source_pos_index):
# velocity source
if any(integer_variables.get(k) for k in ["ux_source_flag", "uy_source_flag", "uz_source_flag"]):
integer_variables["u_source_mode"] = {
"dirichlet": 0,
"additive-no-correction": 1,
"additive": 2,
}[source.u_mode]
if integer_variables.ux_source_flag:
u_source_many = num_dim2(source.ux) > 1
elif integer_variables.uy_source_flag:
u_source_many = num_dim2(source.uy) > 1
elif integer_variables.uz_source_flag:
u_source_many = num_dim2(source.uz) > 1
integer_variables["u_source_many"] = u_source_many
integer_variables.u_source_index = u_source_pos_index
[docs]
def grab_stress_source_props(integer_variables, source, s_source_pos_index):
# stress source
if (
integer_variables.sxx_source_flag
or integer_variables.syy_source_flag
or integer_variables.szz_source_flag
or integer_variables.sxy_source_flag
or integer_variables.sxz_source_flag
or integer_variables.syz_source_flag
):
integer_variables.s_source_mode = source.s_mode != "dirichlet"
if integer_variables.sxx_source_flag:
s_source_many = num_dim2(source.sxx) > 1
elif integer_variables.syy_source_flag:
s_source_many = num_dim2(source.syy) > 1
elif integer_variables.szz_source_flag:
s_source_many = num_dim2(source.szz) > 1
elif integer_variables.sxy_source_flag:
s_source_many = num_dim2(source.sxy) > 1
elif integer_variables.sxz_source_flag:
s_source_many = num_dim2(source.sxz) > 1
elif integer_variables.syz_source_flag:
s_source_many = num_dim2(source.syz) > 1
integer_variables.s_source_many = s_source_many
integer_variables.s_source_index = s_source_pos_index
[docs]
def grab_pressure_source_props(integer_variables, source, p_source_pos_index, u_source_pos_index):
# pressure source
if integer_variables.p_source_flag:
integer_variables.p_source_mode = {
"dirichlet": 0,
"additive-no-correction": 1,
"additive": 2,
}[source.p_mode]
integer_variables.p_source_many = num_dim2(source.p) > 1
integer_variables.p_source_index = p_source_pos_index
# transducer source
if integer_variables.transducer_source_flag:
integer_variables.u_source_index = u_source_pos_index
[docs]
def grab_time_varying_source_props(integer_variables, float_variables, source, transducer_input_signal, delay_mask):
# time varying source variables
# - these are only defined if the source flgs are > 0
# - these are the actual source values
# - these are indexed as (position_index, time_index)
if integer_variables.ux_source_flag:
float_variables.ux_source_input = source.ux
if integer_variables.uy_source_flag:
float_variables.uy_source_input = source.uy
if integer_variables.uz_source_flag:
float_variables.uz_source_input = source.uz
if integer_variables.sxx_source_flag:
float_variables.sxx_source_input = source.sxx
if integer_variables.syy_source_flag:
float_variables.syy_source_input = source.syy
if integer_variables.szz_source_flag:
float_variables.szz_source_input = source.szz
if integer_variables.sxy_source_flag:
float_variables.sxy_source_input = source.sxy
if integer_variables.sxz_source_flag:
float_variables.sxz_source_input = source.sxz
if integer_variables.syz_source_flag:
float_variables.syz_source_input = source.syz
if integer_variables.p_source_flag:
float_variables.p_source_input = source.p
if integer_variables.transducer_source_flag:
float_variables.transducer_source_input = transducer_input_signal
integer_variables.delay_mask = delay_mask
# initial pressure source variable
# - this is only defined if the p0 source flag is 1
# - this defines the initial pressure everywhere (there is no indicies)
if integer_variables.p0_source_flag:
float_variables.p0_source_input = source.p0
[docs]
def grab_sensor_props(integer_variables, kgrid_dim, sensor_mask_index, cuboid_corners_list):
# =========================================================================
# SENSOR VARIABLES
# =========================================================================
if integer_variables.sensor_mask_type == 0:
# mask is defined as a list of grid indices
integer_variables.sensor_mask_index = sensor_mask_index
elif integer_variables.sensor_mask_type == 1:
cuboid_corners_list = cuboid_corners_list
# mask is defined as a list of cuboid corners
if kgrid_dim == 2:
sensor_mask_corners = np.ones((6, cuboid_corners_list.shape[1]))
sensor_mask_corners[0, :] = cuboid_corners_list[0, :]
sensor_mask_corners[1, :] = cuboid_corners_list[1, :]
sensor_mask_corners[3, :] = cuboid_corners_list[2, :]
sensor_mask_corners[4, :] = cuboid_corners_list[3, :]
else:
sensor_mask_corners = cuboid_corners_list
integer_variables.sensor_mask_corners = sensor_mask_corners
else:
raise NotImplementedError("unknown option for sensor_mask_type")
[docs]
def remove_z_dimension(float_variables, kgrid_dim):
# remove z-dimension variables for saving 2D files
if kgrid_dim == 2:
for k in list(float_variables.keys()):
if "z" in k:
del float_variables[k]
[docs]
def enforce_filename_standards(filepath):
# check for HDF5 filename extension
filename_ext = os.path.splitext(filepath)[1]
# use .h5 as default if no extension is given
if len(filename_ext) == 0:
filename_ext = ".h5"
filepath = filepath + ".h5"
return filepath, filename_ext
[docs]
def save_file(filepath, integer_variables, float_variables, hdf_compression_level, auto_chunk):
filepath, filename_ext = enforce_filename_standards(filepath)
# save file
if filename_ext == ".h5":
save_h5_file(filepath, integer_variables, float_variables, hdf_compression_level, auto_chunk)
elif filename_ext == ".mat":
save_mat_file(filepath, integer_variables, float_variables)
else:
# throw error for unknown filetype
raise NotImplementedError("unknown file extension for " "save_to_disk" " filename")
[docs]
def save_h5_file(filepath, integer_variables, float_variables, hdf_compression_level, auto_chunk):
# ----------------
# SAVE HDF5 FILE
# ----------------
# check if file exists, and delete if it does (the hdf5 library will
# give an error if the file already exists)
if os.path.exists(filepath):
os.remove(filepath)
# change all the variables to be in single precision (float in C++),
# then add to HDF5 File
for key, value in float_variables.items():
# cast matrix to single precision
value = np.array(value, dtype=np.float32)
write_matrix(filepath, value, key, hdf_compression_level, auto_chunk)
del value
# change all the index variables to be in 64-bit unsigned integers
# (long in C++), then add to HDF5 file
for key, value in integer_variables.items():
# cast matrix to 64-bit unsigned integer
value = np.array(value, dtype=np.uint64)
write_matrix(filepath, value, key, hdf_compression_level, auto_chunk)
del value
# set additional file attributes
write_attributes(filepath)
[docs]
def save_mat_file(filepath, integer_variables, float_variables):
# ----------------
# SAVE .MAT FILE
# ----------------
# change all the variables to be in single precision (float in C++)
for key, value in float_variables.items():
float_variables[key] = np.array(value, dtype=np.float32)
for key, value in integer_variables.items():
integer_variables[key] = np.array(value, dtype=np.uint64)
# save the input variables to disk as a MATLAB binary file
float_variables = dict(**float_variables, **integer_variables)
savemat(filepath, float_variables)