import os
import platform
import socket
from datetime import datetime
from typing import Optional
import cv2
import h5py
import numpy as np
import kwave
from .conversion import cast_to_type
from .data import get_date_string
from .dotdictionary import dotdict
[docs]
def get_h5_literals():
literals = dotdict(
{
# data type
"DATA_TYPE_ATT_NAME": "data_type",
"MATRIX_DATA_TYPE_MATLAB": "single",
"MATRIX_DATA_TYPE_C": "float",
"INTEGER_DATA_TYPE_MATLAB": "uint64",
"INTEGER_DATA_TYPE_C": "long",
# real / complex
"DOMAIN_TYPE_ATT_NAME": "domain_type",
"DOMAIN_TYPE_REAL": "real",
"DOMAIN_TYPE_COMPLEX": "complex",
# file descriptors
"FILE_MAJOR_VER_ATT_NAME": "major_version",
"FILE_MINOR_VER_ATT_NAME": "minor_version",
"FILE_DESCR_ATT_NAME": "file_description",
"FILE_CREATION_DATE_ATT_NAME": "creation_date",
"CREATED_BY_ATT_NAME": "created_by",
# file type
"FILE_TYPE_ATT_NAME": "file_type",
"HDF_INPUT_FILE": "input",
"HDF_OUTPUT_FILE": "output",
"HDF_CHECKPOINT_FILE": "checkpoint",
# file version information
"HDF_FILE_MAJOR_VERSION": "1",
"HDF_FILE_MINOR_VERSION": "2",
# compression level
"HDF_COMPRESSION_LEVEL": 0,
}
)
return literals
[docs]
def write_matrix(filename, matrix: np.ndarray, matrix_name: str, compression_level: int = None, auto_chunk: bool = True):
# get literals
h5_literals = get_h5_literals()
assert isinstance(auto_chunk, bool), "auto_chunk must be a boolean."
if compression_level is None:
compression_level = h5_literals.HDF_COMPRESSION_LEVEL
# dims = num_dim(matrix)
dims = len(matrix.shape)
if dims == 3:
matrix = np.transpose(matrix, [2, 1, 0]) # C <=> Fortran ordering
if dims == 2:
matrix = np.transpose(matrix) # C <=> Fortran ordering
# get the size of the input matrix
if dims == 3:
Nx, Ny, Nz = matrix.shape
elif dims == 2:
Ny, Nz = matrix.shape
Nx = 1
else:
Nx, Ny, Nz = 1, 1, 1
# check size of matrix and set chunk size and compression level
if dims == 3:
# set chunk size to Nx * Ny
chunk_size = [Nx, Ny, 1]
elif dims == 2:
# set chunk size to Nx
chunk_size = [Nx, 1, 1]
elif dims <= 1:
# check that the matrix size is greater than 1 MB
one_mb = (1024**2) / 8
if matrix.size > one_mb:
# set chunk size to 1 MB
if Nx > Ny:
chunk_size = [one_mb, 1, 1]
elif Ny > Nz:
chunk_size = [1, one_mb, 1]
else:
chunk_size = [1, 1, one_mb]
else:
# set no compression
compression_level = 0
# set chunk size to grid size
if matrix.size == 1:
chunk_size = (1, 1, 1)
elif Nx > Ny:
chunk_size = (Nx, 1, 1)
elif Ny > Nz:
chunk_size = (1, Ny, 1)
else:
chunk_size = (1, 1, Nz)
else:
# throw error for unknown matrix size
raise ValueError("Input matrix must have 1, 2 or 3 dimensions.")
# check the format of the matrix is either single precision (float in C++)
# or uint64 (unsigned long in C++)
if matrix.dtype == np.float32:
# set data type flags
data_type_matlab = h5_literals.MATRIX_DATA_TYPE_MATLAB
data_type_c = h5_literals.MATRIX_DATA_TYPE_C
elif matrix.dtype == np.uint64:
# set data type flags
data_type_matlab = h5_literals.INTEGER_DATA_TYPE_MATLAB
data_type_c = h5_literals.INTEGER_DATA_TYPE_C
else:
# throw error for unknown data type
raise ValueError("Input matrix must be of type " "single" " or " "uint64" ".")
# check if the input matrix is real or complex, if complex, rearrange the
# data in the C++ format
if np.isreal(matrix).all():
# set file tag
domain_type = "real" # DOMAIN_TYPE_REAL
elif dims == 3:
# set file tag
domain_type = h5_literals.DOMAIN_TYPE_COMPLEX
# rearrange the data so the real and imaginary parts are stored in the
# same matrix
matrix = np.concatenate(matrix.real, matrix.imag, axis=0)
matrix = matrix.reshape((Nx, 2, Ny, Nz))
matrix = np.transpose(matrix, (1, 0, 2, 3))
matrix = matrix.reshape((2 * Nx, Ny, Nz))
# update the size of Nx
Nx = 2 * Nx
elif dims <= 1:
# set file tag
domain_type = h5_literals.DOMAIN_TYPE_COMPLEX
# rearrange the data so the real and imaginary parts are stored in the
# same matrix
nelems = matrix.size
matrix = matrix.reshape((nelems, 1))
matrix = np.concatenate(matrix.real, matrix.imag, axis=0)
matrix = matrix.reshape((nelems, 2, 1, 1))
matrix = np.transpose(matrix, (1, 0, 2, 3))
# update the matrix size
Nx = Nx * (2 - np.array(Nx == 1).astype(float))
Ny = Ny * (2 - np.array(Ny == 1).astype(float))
Nz = Nz * (2 - np.array(Nz == 1).astype(float))
# double store in x-direction if a complex scalar
if Nx == 1 and Ny == 1 and Nz == 1:
Nx = 2 * Nx
# put in correct dimension
matrix = matrix.reshape((Nx, Ny, Nz))
else:
raise NotImplementedError("Currently there is no support for saving 2D complex matrices.")
# allocate a holder for the new matrix within the file
opts = {"dtype": data_type_matlab, "chunks": auto_chunk if auto_chunk is True else tuple(chunk_size)}
if compression_level != 0:
# use compression
opts["compression"] = compression_level
# write the matrix into the file
with h5py.File(filename, "a") as f:
f.create_dataset(f"/{matrix_name}", [Nx, Ny, Nz], data=matrix, **opts)
# set attributes for the matrix (used by k-Wave++)
assign_str_attr(f[f"/{matrix_name}"].attrs, h5_literals.DOMAIN_TYPE_ATT_NAME, domain_type)
assign_str_attr(f[f"/{matrix_name}"].attrs, h5_literals.DATA_TYPE_ATT_NAME, data_type_c)
[docs]
def write_attributes(filename: str, file_description: Optional[str] = None) -> None:
"""
Write attributes to a HDF5 file.
This function writes attributes to a HDF5 file using a deprecated legacy method if legacy is set to True, or a new
typed method if legacy is set to False. The function warns if legacy is set to True and deprecates it. If
file_description is not provided, a default file description will be used.
Args:
filename: The name of the HDF5 file.
file_description: The description of the file. If not provided, a default description
will be used.
"""
# get literals
h5_literals = get_h5_literals()
# get computer infor
comp_info = dotdict(
{
"date": datetime.now().strftime("%d-%b-%Y"),
"computer_name": socket.gethostname(),
"operating_system_type": platform.system(),
"operating_system": platform.system() + " " + platform.release() + " " + platform.version(),
"user_name": os.environ.get("USERNAME"),
"matlab_version": "N/A",
"kwave_version": "1.3",
"kwave_path": "N/A",
}
)
# set file description if not provided by user
if file_description is None:
file_description = (
f"Input data created by {comp_info.user_name} running MATLAB "
f"{comp_info.matlab_version} on {comp_info.operating_system_type}"
)
# set additional file attributes
with h5py.File(filename, "a") as f:
# create a dictionary of attributes
attributes = {
h5_literals.FILE_MAJOR_VER_ATT_NAME: h5_literals.HDF_FILE_MAJOR_VERSION,
h5_literals.FILE_MINOR_VER_ATT_NAME: h5_literals.HDF_FILE_MINOR_VERSION,
h5_literals.CREATED_BY_ATT_NAME: f"k-Wave {kwave.VERSION}",
h5_literals.FILE_DESCR_ATT_NAME: file_description,
h5_literals.FILE_TYPE_ATT_NAME: h5_literals.HDF_INPUT_FILE,
h5_literals.FILE_CREATION_DATE_ATT_NAME: get_date_string(),
}
# loop through the attributes dictionary and assign each attribute to the file
for key, value in attributes.items():
assign_str_attr(f.attrs, key, value)
[docs]
def write_flags(filename):
"""
writeFlags reads the input HDF5 file and derives and writes the
required source and medium flags based on the datasets present in the
file. For example, if the file contains a data set named 'BonA', the
nonlinear_flag will be written as true. Conditional flags are also
written. The source mode flags are written when appropriate if they
are not already present in the file. The default source mode is
'additive'.
List of flags that are always written
ux_source_flag
uy_source_flag
uz_source_flag
sxx_source_flag
sxy_source_flag
sxz_source_flag
syy_source_flag
syz_source_flag
szz_source_flag
p_source_flag
p0_source_flag
transducer_source_flag
nonuniform_grid_flag
nonlinear_flag
absorbing_flag
axisymmetric_flag
elastic_flag
sensor_mask_type
List of conditional flags
u_source_mode
u_source_many
p_source_mode
p_source_many
s_source_mode
s_source_many
Args:
filename:
"""
# h5_literals = get_h5_literals()
with h5py.File(filename, "r") as hf:
names = hf.keys()
v_list = [
("ux_source", "u_source_many"),
("uy_source", "u_source_many"),
("uz_source", "u_source_many"),
("sxx_source", "s_source_many"),
("syy_source", "s_source_many"),
("szz_source", "s_source_many"),
("sxy_source", "s_source_many"),
("sxz_source", "s_source_many"),
("syz_source", "s_source_many"),
("p_source", "p_source_many"),
]
variable_list = {}
for prefix, many_flag_key in v_list:
inp_name = f"{prefix}_input"
flag_name = f"{prefix}_flag"
if inp_name in names:
variable_list[flag_name] = hf[inp_name].shape[1]
variable_list[many_flag_key] = hf[inp_name].shape[0] != 1
else:
variable_list[flag_name] = 0
# --------------------
# u source
# --------------------
# write u_source mode if not already in file (1 is Additive, 0 is Dirichlet)
if any(variable_list[flag] for flag in ["ux_source_flag", "uy_source_flag", "uz_source_flag"]) and "u_source_mode" not in names:
variable_list["u_source_mode"] = 1
# --------------------
# s source
# --------------------
# write s_source mode if not already in file (1 is Additive, 0 is Dirichlet)
if (
any(
variable_list[flag]
for flag in [
"sxx_source_flag",
"syy_source_flag",
"szz_source_flag",
"sxy_source_flag",
"sxz_source_flag",
"syz_source_flag",
]
)
and "s_source_mode" not in names
):
variable_list["s_source_mode"] = 1
# --------------------
# p source
# --------------------
# write p_source mode if not already in file (1 is Additive, 0 is Dirichlet)
if any(variable_list[flag] for flag in ["p_source_flag"]) and "p_source_mode" not in names:
variable_list["p_source_mode"] = 1
# check for p0_source_input and set p0_source_flag
variable_list["p0_source_flag"] = "p0_source_input" in names
# --------------------
# additional flags
# --------------------
# check for transducer_source_input and set transducer_source_flag
variable_list["transducer_source_flag"] = "transducer_source_input" in names
# check for BonA and set nonlinear flag
variable_list["nonlinear_flag"] = "BonA" in names
# check for alpha_coeff and set absorbing flag
variable_list["absorbing_flag"] = "alpha_coeff" in names
# check for lambda and set elastic flag
variable_list["elastic_flag"] = "lambda" in names
# set axisymmetric grid flag to false
variable_list["axisymmetric_flag"] = 0
# set nonuniform grid flag to false
variable_list["nonuniform_grid_flag"] = 0
# check for sensor_mask_index and sensor_mask_corners
if "sensor_mask_index" in names:
variable_list["sensor_mask_type"] = 0
elif "sensor_mask_corners" in names:
variable_list["sensor_mask_type"] = 1
else:
raise ValueError("Either sensor_mask_index or sensor_mask_corners must be defined in the input file")
# --------------------
# write flags to file
# --------------------
# change all the index variables to be in 64-bit unsigned integers (long in C++) and write to file
for key, value in variable_list.items():
# cast matrix to 64-bit unsigned integer
value = np.array(value, dtype=np.uint64)
write_matrix(filename, value, key)
del value
[docs]
def write_grid(filename, grid_size, grid_spacing, pml_size, pml_alpha, Nt, dt, c_ref):
"""
Creates and writes the wavenumber grids and PML variables
required by the k-Wave C++ code to the HDF5 file specified by the
user.
List of parameters that are written:
Nx
Ny
Nz
Nt
dt
dx
dy
dz
c_ref
pml_x_alpha
pml_y_alpha
pml_z_alpha
pml_x_size
pml_y_size
pml_z_size
"""
h5_literals = get_h5_literals()
# =========================================================================
# STORE FLOATS
# =========================================================================
variable_list = {
"dt": dt,
"dx": grid_spacing[0],
"dy": grid_spacing[1],
"dz": grid_spacing[2],
"pml_x_alpha": pml_alpha[0],
"pml_y_alpha": pml_alpha[1],
"pml_z_alpha": pml_alpha[2],
"c_ref": c_ref,
}
# change float variables to be in single precision (float in C++), then add to HDF5 file
for key, value in variable_list.items():
# cast matrix to single precision
value = cast_to_type(value, h5_literals.MATRIX_DATA_TYPE_MATLAB)
write_matrix(filename, value, key)
del value
# =========================================================================
# STORE INTEGERS
# =========================================================================
# integer variables
variable_list = {
"Nx": grid_size[0],
"Ny": grid_size[1],
"Nz": grid_size[2],
"Nt": Nt,
"pml_x_size": pml_size[0],
"pml_y_size": pml_size[1],
"pml_z_size": pml_size[2],
}
# change all the index variables to be in 64-bit unsigned integers (long in C++)
for key, value in variable_list.items():
# cast matrix to 64-bit unsigned integer
value = cast_to_type(value, h5_literals.INTEGER_DATA_TYPE_MATLAB)
write_matrix(filename, value, key)
del value
[docs]
def assign_str_attr(attrs, attr_name, attr_val):
"""
Assigns HDF5 attribute with value as a fixed-length string
Args:
attrs: HDF5 attribute object
attr_name: name of attribute
attr_val: value of attribute
"""
attrs.create(attr_name, attr_val, None, dtype=f"<S{len(attr_val)}")
[docs]
def load_image(path, is_gray):
if is_gray:
img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
else:
img = cv2.imread(path, cv2.IMREAD_COLOR)
raise NotImplementedError
# im = squeeze(double(im(:, :, 1)) + double(im(:, :, 2)) + double(im(:, :, 3)));
img = img.astype(float)
# scale pixel values from 0 -> 1
img = img.max() - img
img = img * (1 / img.max())
return img