import logging
import os
import numpy as np
from scipy.io import savemat
from kwave.kmedium import kWaveMedium
from kwave.kgrid import kWaveGrid
from kwave.options.simulation_options import SimulationOptions
from kwave.utils.data import scale_time
from kwave.utils.dotdictionary import dotdict
from kwave.utils.io import write_attributes, write_matrix
from kwave.utils.matrix import num_dim2
from kwave.utils.tictoc import TicToc
[docs]
def save_to_disk_func(
kgrid: kWaveGrid, medium: kWaveMedium, source,
opt: SimulationOptions, auto_chunk: bool,
values: dotdict, flags: dotdict):
# update command line status
logging.log(logging.INFO, ' precomputation completed in ', scale_time(TicToc.toc()))
TicToc.tic()
logging.log(logging.INFO, ' saving input files to disk...')
# check for a binary sensor mask or cuboid corners
# modified by Farid | disabled temporarily!
# assert self.binary_sensor_mask or self.cuboid_corners, \
# "Optional input ''save_to_disk'' only supported for sensor masks defined as a binary matrix
# or the opposing corners of a rectangle (2D) or cuboid (3D)."
# =========================================================================
# VARIABLE LIST
# =========================================================================
integer_variables = dotdict()
float_variables = dotdict()
grab_integer_variables(integer_variables, kgrid, flags, medium)
grab_pml_size(integer_variables, opt)
grab_float_variables(float_variables, kgrid, opt, values, flags.elastic_code, flags.axisymmetric)
# overwrite z-values for 2D simulations
if kgrid.dim == 2:
integer_variables.Nz = 1
integer_variables.pml_z_size = 0
grab_medium_props(integer_variables, float_variables, medium, flags.elastic_code)
grab_source_props(integer_variables, float_variables, source,
values.u_source_pos_index, values.s_source_pos_index, values.p_source_pos_index,
values.transducer_input_signal, values.delay_mask)
grab_sensor_props(integer_variables, kgrid.dim, values.sensor_mask_index, values.record.cuboid_corners_list)
grab_nonuniform_grid_props(float_variables, kgrid, flags.nonuniform_grid)
# =========================================================================
# DATACAST AND SAVING
# =========================================================================
remove_z_dimension(float_variables, kgrid.dim)
save_file(opt.input_filename, integer_variables, float_variables, opt.hdf_compression_level,
auto_chunk=auto_chunk)
# update command line status
logging.log(logging.INFO, ' completed in ', scale_time(TicToc.toc()))
[docs]
def grab_integer_variables(integer_variables, kgrid, flags, medium):
# integer variables used within the time loop for all codes
variables = dotdict({
'Nx': kgrid.Nx,
'Ny': kgrid.Ny,
'Nz': kgrid.Nz,
'Nt': kgrid.Nt,
'p_source_flag': flags.source_p,
'p0_source_flag': flags.source_p0,
'ux_source_flag': flags.source_ux,
'uy_source_flag': flags.source_uy,
'uz_source_flag': flags.source_uz,
'sxx_source_flag': flags.source_sxx,
'syy_source_flag': flags.source_syy,
'szz_source_flag': flags.source_szz,
'sxy_source_flag': flags.source_sxy,
'sxz_source_flag': flags.source_sxz,
'syz_source_flag': flags.source_syz,
'transducer_source_flag': flags.transducer_source,
'nonuniform_grid_flag': flags.nonuniform_grid,
'nonlinear_flag': medium.is_nonlinear(),
'absorbing_flag': None,
'elastic_flag': flags.elastic_code,
'axisymmetric_flag': flags.axisymmetric,
# create pseudonyms for the sensor flgs
# 0: binary mask indices
# 1: cuboid corners
'sensor_mask_type': flags.cuboid_corners
})
integer_variables.update(variables)
[docs]
def grab_pml_size(integer_variables, opt):
# additional integer variables not used within time loop but stored directly to output file
integer_variables['pml_x_size'] = opt.pml_x_size
integer_variables['pml_y_size'] = opt.pml_y_size
integer_variables['pml_z_size'] = opt.pml_z_size
[docs]
def grab_float_variables(float_variables: dotdict, kgrid, opt, values, is_elastic_code, is_axisymmetric):
# single precision variables not used within time loop but stored directly
# to the output file for all files
variables = dotdict({
'dx': kgrid.dx,
'dy': kgrid.dy,
'dz': kgrid.dz,
'pml_x_alpha': opt.pml_x_alpha,
'pml_y_alpha': opt.pml_y_alpha,
'pml_z_alpha': opt.pml_z_alpha
})
float_variables.update(variables)
if is_elastic_code: # pragma: no cover
grab_elastic_code_variables(float_variables, kgrid, values)
elif is_axisymmetric:
grab_axisymmetric_variables(float_variables, values)
else:
# single precision variables used within the time loop
float_variables['dt'] = values.dt
float_variables['c0'] = values.c0
float_variables['c_ref'] = values.c_ref
float_variables['rho0'] = values.rho0
float_variables['rho0_sgx'] = values.rho0_sgx
float_variables['rho0_sgy'] = values.rho0_sgy
float_variables['rho0_sgz'] = values.rho0_sgz
[docs]
def grab_elastic_code_variables(float_variables, kgrid, values): # pragma: no cover
# single precision variables used within the time loop
float_variables['dt'] = None
float_variables['c_ref'] = None
float_variables['lambda'] = None
float_variables['mu'] = None
float_variables['rho0_sgx'] = None
float_variables['rho0_sgy'] = None
float_variables['rho0_sgz'] = None
float_variables['mu_sgxy'] = None
float_variables['mu_sgxz'] = None
float_variables['mu_sgyz'] = None
# create shift variables used for calculating u_non_staggered and I outputs
x_shift_neg = np.fft.ifftshift(np.exp(-1j * kgrid.k_vec.x * kgrid.dx / 2))
y_shift_neg = np.fft.ifftshift(np.exp(-1j * kgrid.k_vec.y * kgrid.dy / 2)).T
z_shift_neg = np.transpose(np.fft.ifftshift(np.exp(-1j * kgrid.k_vec.z * kgrid.dz / 2)), (1, 2, 0))
# create reduced variables for use with real-to-complex FFT
Nz = kgrid.Nz if kgrid.dim != 2 else 1
Nx_r = kgrid.Nx // 2 + 1
Ny_r = kgrid.Ny // 2 + 1
Nz_r = Nz // 2 + 1
ddx_k_shift_pos = values.ddx_k_shift_pos
ddx_k_shift_neg = values.ddx_k_shift_neg
float_variables['ddx_k_shift_pos_r'] = ddx_k_shift_pos[:Nx_r]
float_variables['ddy_k_shift_pos'] = None
float_variables['ddz_k_shift_pos'] = None
float_variables['ddx_k_shift_neg_r'] = ddx_k_shift_neg[:Nx_r]
float_variables['ddy_k_shift_neg'] = None
float_variables['ddz_k_shift_neg'] = None
float_variables['x_shift_neg_r'] = x_shift_neg[:Nx_r]
float_variables['y_shift_neg_r'] = y_shift_neg[:Ny_r]
float_variables['z_shift_neg_r'] = z_shift_neg[:Nz_r]
del x_shift_neg
float_variables['pml_x'] = None
float_variables['pml_y'] = None
float_variables['pml_z'] = None
float_variables['pml_x_sgx'] = None
float_variables['pml_y_sgy'] = None
float_variables['pml_z_sgz'] = None
float_variables['mpml_x_sgx'] = None
float_variables['mpml_y_sgy'] = None
float_variables['mpml_z_sgz'] = None
float_variables['mpml_x'] = None
float_variables['mpml_y'] = None
float_variables['mpml_z'] = None
[docs]
def grab_axisymmetric_variables(float_variables, values):
# single precision variables used within the time loop
float_variables['dt'] = values.dt
float_variables['c0'] = values.c0
float_variables['c_ref'] = values.c_ref
float_variables['rho0'] = values.rho0
float_variables['rho0_sgx'] = values.rho0_sgx
float_variables['rho0_sgy'] = values.rho0_sgy
[docs]
def grab_medium_props(integer_variables, float_variables, medium, is_elastic_code):
# =========================================================================
# VARIABLES USED IN NONLINEAR SIMULATIONS
# =========================================================================
if medium.is_nonlinear():
float_variables['BonA'] = medium.BonA
# =========================================================================
# VARIABLES USED IN ABSORBING SIMULATIONS
# =========================================================================
# set absorbing flag
if medium.absorbing:
integer_variables.absorbing_flag = 2 if medium.stokes else 1
else:
integer_variables.absorbing_flag = 0
if medium.absorbing:
if is_elastic_code: # pragma: no cover
# add to the variable list
float_variables['chi'] = None
float_variables['eta'] = None
float_variables['eta_sgxy'] = None
float_variables['eta_sgxz'] = None
float_variables['eta_sgyz'] = None
else:
float_variables['alpha_coeff'] = medium.alpha_coeff
float_variables['alpha_power'] = medium.alpha_power
[docs]
def grab_source_props(integer_variables, float_variables, source,
u_source_pos_index, s_source_pos_index, p_source_pos_index,
transducer_input_signal, delay_mask):
# =========================================================================
# SOURCE VARIABLES
# =========================================================================
# source modes and indicies
# - these are only defined if the source flgs are > 0
# - the source mode describes whether the source will be added or replaced
# - the source indicies describe which grid points act as the source
# - the u_source_index is reused for any of the u sources and the transducer source
grab_velocity_source_props(integer_variables, source, u_source_pos_index)
grab_stress_source_props(integer_variables, source, s_source_pos_index)
grab_pressure_source_props(integer_variables, source, p_source_pos_index, u_source_pos_index)
grab_time_varying_source_props(integer_variables, float_variables, source, transducer_input_signal, delay_mask)
[docs]
def grab_velocity_source_props(integer_variables, source, u_source_pos_index):
# velocity source
if any(integer_variables.get(k) for k in ['ux_source_flag', 'uy_source_flag', 'uz_source_flag']):
integer_variables['u_source_mode'] = {
'dirichlet': 0,
'additive-no-correction': 1,
'additive': 2,
}[source.u_mode]
if integer_variables.ux_source_flag:
u_source_many = num_dim2(source.ux) > 1
elif integer_variables.uy_source_flag:
u_source_many = num_dim2(source.uy) > 1
elif integer_variables.uz_source_flag:
u_source_many = num_dim2(source.uz) > 1
integer_variables['u_source_many'] = u_source_many
integer_variables.u_source_index = u_source_pos_index
[docs]
def grab_stress_source_props(integer_variables, source, s_source_pos_index):
# stress source
if integer_variables.sxx_source_flag or integer_variables.syy_source_flag or integer_variables.szz_source_flag \
or integer_variables.sxy_source_flag or integer_variables.sxz_source_flag or integer_variables.syz_source_flag:
integer_variables.s_source_mode = source.s_mode != 'dirichlet'
if integer_variables.sxx_source_flag:
s_source_many = num_dim2(source.sxx) > 1
elif integer_variables.syy_source_flag:
s_source_many = num_dim2(source.syy) > 1
elif integer_variables.szz_source_flag:
s_source_many = num_dim2(source.szz) > 1
elif integer_variables.sxy_source_flag:
s_source_many = num_dim2(source.sxy) > 1
elif integer_variables.sxz_source_flag:
s_source_many = num_dim2(source.sxz) > 1
elif integer_variables.syz_source_flag:
s_source_many = num_dim2(source.syz) > 1
integer_variables.s_source_many = s_source_many
integer_variables.s_source_index = s_source_pos_index
[docs]
def grab_pressure_source_props(integer_variables, source, p_source_pos_index, u_source_pos_index):
# pressure source
if integer_variables.p_source_flag:
integer_variables.p_source_mode = {
'dirichlet': 0,
'additive-no-correction': 1,
'additive': 2,
}[source.p_mode]
integer_variables.p_source_many = num_dim2(source.p) > 1
integer_variables.p_source_index = p_source_pos_index
# transducer source
if integer_variables.transducer_source_flag:
integer_variables.u_source_index = u_source_pos_index
[docs]
def grab_time_varying_source_props(integer_variables, float_variables, source, transducer_input_signal, delay_mask):
# time varying source variables
# - these are only defined if the source flgs are > 0
# - these are the actual source values
# - these are indexed as (position_index, time_index)
if integer_variables.ux_source_flag:
float_variables.ux_source_input = source.ux
if integer_variables.uy_source_flag:
float_variables.uy_source_input = source.uy
if integer_variables.uz_source_flag:
float_variables.uz_source_input = source.uz
if integer_variables.sxx_source_flag:
float_variables.sxx_source_input = source.sxx
if integer_variables.syy_source_flag:
float_variables.syy_source_input = source.syy
if integer_variables.szz_source_flag:
float_variables.szz_source_input = source.szz
if integer_variables.sxy_source_flag:
float_variables.sxy_source_input = source.sxy
if integer_variables.sxz_source_flag:
float_variables.sxz_source_input = source.sxz
if integer_variables.syz_source_flag:
float_variables.syz_source_input = source.syz
if integer_variables.p_source_flag:
float_variables.p_source_input = source.p
if integer_variables.transducer_source_flag:
float_variables.transducer_source_input = transducer_input_signal
integer_variables.delay_mask = delay_mask
# initial pressure source variable
# - this is only defined if the p0 source flag is 1
# - this defines the initial pressure everywhere (there is no indicies)
if integer_variables.p0_source_flag:
float_variables.p0_source_input = source.p0
[docs]
def grab_sensor_props(integer_variables, kgrid_dim, sensor_mask_index, cuboid_corners_list):
# =========================================================================
# SENSOR VARIABLES
# =========================================================================
if integer_variables.sensor_mask_type == 0:
# mask is defined as a list of grid indices
integer_variables.sensor_mask_index = sensor_mask_index
elif integer_variables.sensor_mask_type == 1:
cuboid_corners_list = cuboid_corners_list
# mask is defined as a list of cuboid corners
if kgrid_dim == 2:
sensor_mask_corners = np.ones((6, cuboid_corners_list.shape[1]))
sensor_mask_corners[0, :] = cuboid_corners_list[0, :]
sensor_mask_corners[1, :] = cuboid_corners_list[1, :]
sensor_mask_corners[3, :] = cuboid_corners_list[2, :]
sensor_mask_corners[4, :] = cuboid_corners_list[3, :]
else:
sensor_mask_corners = cuboid_corners_list
integer_variables.sensor_mask_corners = sensor_mask_corners
else:
raise NotImplementedError('unknown option for sensor_mask_type')
[docs]
def remove_z_dimension(float_variables, kgrid_dim):
# remove z-dimension variables for saving 2D files
if kgrid_dim == 2:
for k in list(float_variables.keys()):
if 'z' in k:
del float_variables[k]
[docs]
def enforce_filename_standards(filepath):
# check for HDF5 filename extension
filename_ext = os.path.splitext(filepath)[1]
# use .h5 as default if no extension is given
if len(filename_ext) == 0:
filename_ext = '.h5'
filepath = filepath + '.h5'
return filepath, filename_ext
[docs]
def save_file(filepath, integer_variables, float_variables, hdf_compression_level, auto_chunk):
filepath, filename_ext = enforce_filename_standards(filepath)
# save file
if filename_ext == '.h5':
save_h5_file(filepath, integer_variables, float_variables, hdf_compression_level, auto_chunk)
elif filename_ext == '.mat':
save_mat_file(filepath, integer_variables, float_variables)
else:
# throw error for unknown filetype
raise NotImplementedError('unknown file extension for ''save_to_disk'' filename')
[docs]
def save_h5_file(filepath, integer_variables, float_variables, hdf_compression_level, auto_chunk):
# ----------------
# SAVE HDF5 FILE
# ----------------
# check if file exists, and delete if it does (the hdf5 library will
# give an error if the file already exists)
if os.path.exists(filepath):
os.remove(filepath)
# change all the variables to be in single precision (float in C++),
# then add to HDF5 File
for key, value in float_variables.items():
# cast matrix to single precision
value = np.array(value, dtype=np.float32)
write_matrix(filepath, value, key, hdf_compression_level, auto_chunk)
del value
# change all the index variables to be in 64-bit unsigned integers
# (long in C++), then add to HDF5 file
for key, value in integer_variables.items():
# cast matrix to 64-bit unsigned integer
value = np.array(value, dtype=np.uint64)
write_matrix(filepath, value, key, hdf_compression_level, auto_chunk)
del value
# set additional file attributes
write_attributes(filepath)
[docs]
def save_mat_file(filepath, integer_variables, float_variables):
# ----------------
# SAVE .MAT FILE
# ----------------
# change all the variables to be in single precision (float in C++)
for key, value in float_variables.items():
float_variables[key] = np.array(value, dtype=np.float32)
for key, value in integer_variables.items():
integer_variables[key] = np.array(value, dtype=np.uint64)
# save the input variables to disk as a MATLAB binary file
float_variables = dict(**float_variables, **integer_variables)
savemat(filepath, float_variables)