Source code for kwave.kgrid

import math
import sys
from dataclasses import dataclass

import numpy as np

from kwave.data import FlexibleVector, Vector
from kwave.enums import DiscreteCosine, DiscreteSine
from kwave.utils import matlab
from kwave.utils.math import largest_prime_factor


[docs] @dataclass class kWaveGrid(object): """ kWaveGrid is the grid class used across the k-Wave Toolbox. An object of the kWaveGrid class contains the grid coordinates and wavenumber matrices used within the simulation and reconstruction functions in k-Wave. The grid matrices are indexed as: (x, 1) in 1D; (x, y) in 2D; and (x, y, z) in 3D. The grid is assumed to be a regularly spaced Cartesian grid, with grid spacing given by dx, dy, dz (typically the grid spacing in each direction is constant). """ # default CFL number CFL_DEFAULT = 0.3 # machine precision MACHINE_PRECISION = 100 * sys.float_info.epsilon
[docs] def __init__(self, N, spacing): """ Args: N: grid size in each dimension [grid points] spacing: grid point spacing in each direction [m] """ N, spacing = np.atleast_1d(N), np.atleast_1d(spacing) # if inputs are lists assert N.ndim == 1 and spacing.ndim == 1 # ensure no multidimensional lists assert (1 <= N.size <= 3) and (1 <= spacing.size <= 3) # ensure valid dimensionality assert N.size == spacing.size, "Size list N and spacing list do not have the same size." self.N = N.astype(int) #: grid size in each dimension [grid points] self.spacing = spacing #: grid point spacing in each direction [m] self.dim = self.N.size #: Number of dimensions (1, 2 or 3) self.nonuniform = False #: flag that indicates grid non-uniformity self.dt = "auto" #: size of time step [s] self.Nt = "auto" #: number of time steps [s] # originally there was [xn_vec, yn_vec, zn_vec] self.n_vec = FlexibleVector([0] * self.dim) #: position vectors for the grid points in [0, 1] # originally there was [xn_vec_sgx, yn_vec_sgy, zn_vec_sgz] self.n_vec_sg = FlexibleVector([0] * self.dim) #: position vectors for the staggered grid points in [0, 1] # originally there was [dxudxn, dyudyn, dzudzn] self.dudn = FlexibleVector([0] * self.dim) #: transformation gradients between uniform and staggered grids # originally there was [dxudxn_sgx, dyudyn_sgy, dzudzn_sgz] self.dudn_sg = FlexibleVector([0] * self.dim) #: transformation gradients between uniform and staggered grids # assign the grid parameters for the x spatial direction # originally kx_vec self.k_vec = FlexibleVector([self.makeDim(self.Nx, self.dx)]) #: Nx x 1 vector of wavenumber components in the x-direction [rad/m] if self.dim == 1: # define the scalar wavenumber based on the wavenumber components self.k = abs(self.k_vec.x) #: scalar wavenumber if self.dim >= 2: # assign the grid parameters for the x and y spatial directions # Ny x 1 vector of wavenumber components in the y-direction [rad/m] self.k_vec = self.k_vec.append(self.makeDim(self.Ny, self.dy)) if self.dim == 2: # define the wavenumber based on the wavenumber components self.k = np.zeros((self.Nx, self.Ny)) self.k = np.reshape(self.k_vec.x, (-1, 1)) ** 2 + self.k self.k = np.reshape(self.k_vec.y, (1, -1)) ** 2 + self.k self.k = np.sqrt(self.k) #: scalar wavenumber if self.dim == 3: # assign the grid parameters for the x, y, and z spatial directions # Nz x 1 vector of wavenumber components in the z-direction [rad/m] self.k_vec = self.k_vec.append(self.makeDim(self.Nz, self.dz)) # define the wavenumber based on the wavenumber components self.k = np.zeros((self.Nx, self.Ny, self.Nz)) self.k = np.reshape(self.k_vec.x, (-1, 1, 1)) ** 2 + self.k self.k = np.reshape(self.k_vec.y, (1, -1, 1)) ** 2 + self.k self.k = np.reshape(self.k_vec.z, (1, 1, -1)) ** 2 + self.k self.k = np.sqrt(self.k) #: scalar wavenumber
@property def t_array(self): """ time array [s] """ # TODO (walter): I would change this functionality to return a time array even if Nt or dt are not yet set # (e.g. if they are still 'auto') if self.Nt == "auto" or self.dt == "auto": return "auto" else: t_array = np.arange(0, self.Nt) * self.dt # TODO: adding this extra dimension seems unnecessary # This leads to an extra squeeze when plotting e.g. in example "array as sensor" on lines 110 and 111 return np.expand_dims(t_array, axis=0) @t_array.setter def t_array(self, t_array): # check for 'auto' input if isinstance(t_array, str) and t_array == "auto": # set values to auto self.Nt = "auto" self.dt = "auto" else: # extract property values Nt_temp = t_array.size dt_temp = t_array[1] - t_array[0] # check the time array begins at zero assert t_array[0] == 0, "t_array must begin at zero." # check the time array is evenly spaced assert (t_array[1:] - t_array[0:-1] - dt_temp).max() < self.MACHINE_PRECISION, "t_array must be evenly spaced." # check the time steps are increasing assert dt_temp > 0, "t_array must be monotonically increasing." # assign values self.Nt = Nt_temp self.dt = dt_temp
[docs] def setTime(self, Nt, dt) -> None: """ Set Nt and dt based on user input Args: Nt: dt: Returns: None """ # check the value for Nt assert ( isinstance(Nt, int) or np.issubdtype(Nt, np.int64) or np.issubdtype(Nt, np.int32) ) and Nt > 0, "Nt must be a positive integer." # check the value for dt assert dt > 0, "dt must be positive." # assign values self.Nt = Nt self.dt = dt
@property def Nx(self): """ grid size in x-direction [grid points] """ return self.N[0] @property def Ny(self): """ grid size in y-direction [grid points] """ return self.N[1] if self.N.size >= 2 else 0 @property def Nz(self): """ grid size in z-direction [grid points] """ return self.N[2] if self.N.size == 3 else 0 @property def dx(self): """ grid point spacing in x-direction [m] """ return self.spacing[0] @property def dy(self): """ grid point spacing in y-direction [m] """ return self.spacing[1] if self.spacing.size >= 2 else 0 @property def dz(self): """ grid point spacing in z-direction [m] """ return self.spacing[2] if self.spacing.size == 3 else 0 @property def x_vec(self): """ Nx x 1 vector of the grid coordinates in the x-direction [m] """ # calculate x_vec based on kx_vec return self.size[0] * self.k_vec.x * self.dx / (2 * np.pi) @property def y_vec(self): """ Ny x 1 vector of the grid coordinates in the y-direction [m] """ # calculate y_vec based on ky_vec if self.dim < 2: return np.nan return self.size[1] * self.k_vec.y * self.dy / (2 * np.pi) @property def z_vec(self): """ Nz x 1 vector of the grid coordinates in the z-direction [m] """ # calculate z_vec based on kz_vec if self.dim < 3: return np.nan return self.size[2] * self.k_vec.z * self.dz / (2 * np.pi) @property def x(self): """ Nx x Ny x Nz grid containing repeated copies of the grid coordinates in the x-direction [m] """ return self.size[0] * self.kx * self.dx / (2 * math.pi) @property def y(self): """ Nx x Ny x Nz grid containing repeated copies of the grid coordinates in the y-direction [m] """ if self.dim < 2: return 0 return self.size[1] * self.ky * self.dy / (2 * math.pi) @property def z(self): """ Nx x Ny x Nz grid containing repeated copies of the grid coordinates in the z-direction [m] """ if self.dim < 3: return 0 return self.size[2] * self.kz * self.dz / (2 * math.pi) @property def xn(self): """ 3D plaid non-uniform spatial grids Returns: plaid xn matrix """ if self.dim == 1: return self.n_vec.x if self.nonuniform else 0 elif self.dim == 2: return np.tile(self.n_vec.x, (1, self.Ny)) if self.nonuniform else 0 else: return np.tile(self.n_vec.x, (1, self.Ny, self.Nz)) if self.nonuniform else 0 @property def yn(self): """ 3D plaid non-uniform spatial grids Returns: plaid yn matrix """ if self.dim < 2: return np.nan n_vec_y = np.array(self.n_vec.y).T if self.dim == 2: return np.tile(n_vec_y, (self.Nx, 1)) if self.nonuniform else 0 else: return np.tile(n_vec_y, (self.Nx, 1, self.Nz)) if self.nonuniform else 0 @property def zn(self): """ 3D plaid non-uniform spatial grids Returns: plaid zn matrix """ if self.dim < 3: return np.nan n_vec_z = np.atleast_1d(np.squeeze(self.n_vec.z))[None, None, :] return np.tile(n_vec_z, (self.Nx, self.Ny, 1)) if self.nonuniform else 0 @property def size(self): """ Size of grid in the all directions [m] """ return Vector(self.N * self.spacing) @property def total_grid_points(self) -> np.ndarray: """ Total number of grid points (equal to Nx * Ny * Nz) """ return np.prod(self.N) @property def kx(self): """ Nx x Ny x Nz grid containing repeated copies of the wavenumber components in the x-direction [rad/m] Returns: plaid xn matrix """ if self.dim == 1: return self.k_vec.x elif self.dim == 2: return np.tile(self.k_vec.x, (1, self.Ny)) else: return np.tile(self.k_vec.x[:, :, None], (1, self.Ny, self.Nz)) @property def ky(self): """ Nx x Ny x Nz grid containing repeated copies of the wavenumber components in the y-direction [rad/m] Returns: plaid yn matrix """ if self.dim == 2: return np.tile(self.k_vec.y.T, (self.Nx, 1)) elif self.dim == 3: return np.tile(self.k_vec.y[None, :, :], (self.Nx, 1, self.Nz)) return np.nan @property def kz(self): """ Nx x Ny x Nz grid containing repeated copies of the wavenumber components in the z-direction [rad/m] Returns: plaid zn matrix """ if self.dim == 3: return np.tile(self.k_vec.z.T[None, :, :], (self.Nx, self.Ny, 1)) else: return np.nan @property def x_size(self): """ Size of grid in the x-direction [m] """ return self.Nx * self.dx @property def y_size(self): """ Size of grid in the y-direction [m] """ return self.Ny * self.dy @property def z_size(self): """ Size of grid in the z-direction [m] """ return self.Nz * self.dz @property def k_max(self): # added by us, not the same as kWave k_max (see k_max_all for KwaveGrid.k_max) """ Maximum supported spatial frequency in the 3 directions [rad/m] Returns: Vector of 3 elements each in [rad/m]. Value for higher dimensions set to NaN """ # kx_max = np.abs(self.k_vec.x).max() ky_max = np.abs(self.k_vec.y).max() if self.dim >= 2 else np.nan kz_max = np.abs(self.k_vec.z).max() if self.dim == 3 else np.nan return Vector([kx_max, ky_max, kz_max]) @property def k_max_all(self): """ Maximum supported spatial frequency in all directions [rad/m] Originally k_max in kWave.kWaveGrid! Returns: Scalar in [rad/m] """ # return np.nanmin(self.k_max) ######################################## # functions that can only be accessed by class members ########################################
[docs] @staticmethod # TODO (walter): convert this name to snake case def makeDim(num_points, spacing): """ Create the grid parameters for a single spatial direction Args: num_points: spacing: Returns: """ # define the discretisation of the spatial dimension such that there is always a DC component if num_points % 2 == 0: # grid dimension has an even number of points nx = np.arange(-num_points / 2, num_points / 2) / num_points else: # grid dimension has an odd number of points nx = np.arange(-(num_points - 1) / 2, (num_points - 1) / 2 + 1) / num_points nx = np.array(nx).T # force middle value to be zero in case 1/Nx is a recurring # number and the series doesn't give exactly zero nx[int(num_points // 2)] = 0 # define the wavenumber vector components res = (2 * math.pi / spacing) * nx return res[:, None]
[docs] def highest_prime_factors(self, axisymmetric=None) -> np.ndarray: """ calculate the highest prime factors Args: axisymmetric: Axisymmetric code or None Returns: Vector of three elements """ # import statement place here in order to avoid circular dependencies if axisymmetric is not None: if axisymmetric == "WSWA": prime_facs = [largest_prime_factor(self.Nx), largest_prime_factor(self.Ny * 4), largest_prime_factor(self.Nz)] elif axisymmetric == "WSWS": prime_facs = [largest_prime_factor(self.Nx), largest_prime_factor(self.Ny * 2 - 2), largest_prime_factor(self.Nz)] else: raise ValueError("Unknown axisymmetric symmetry.") else: prime_facs = [largest_prime_factor(self.Nx), largest_prime_factor(self.Ny), largest_prime_factor(self.Nz)] return np.array(prime_facs)
# TODO (walter): convert this name to snake case
[docs] def makeTime(self, c, cfl=CFL_DEFAULT, t_end=None): """ Compute Nt and dt based on the cfl number and grid size, where the number of time-steps is chosen based on the time it takes to travel from one corner of the grid to the geometrically opposite corner. Note, if c is given as a matrix, the calculation for dt is based on the maximum value, and the calculation for t_end based on the minimum value. Args: c: sound speed cfl: convergence condition by Courant–Friedrichs–Lewy t_end: final time step Returns: Nothing """ # if c is a matrix, find the minimum and maximum values c = np.array(c) c_min, c_max = np.min(c), np.max(c) # check for user define t_end, otherwise set the simulation # length based on the size of the grid diagonal and the maximum # sound speed in the medium if t_end is None: t_end = np.linalg.norm(self.size, ord=2) / c_min # extract the smallest grid spacing min_grid_dim = np.min(self.spacing) # assign time step based on CFL stability criterion self.dt = cfl * min_grid_dim / c_max # assign number of time steps based on t_end self.Nt = int(np.floor(t_end / self.dt) + 1) # catch case where dt is a recurring number if (np.floor(t_end / self.dt) != np.ceil(t_end / self.dt)) and (matlab.rem(t_end, self.dt) == 0): self.Nt = self.Nt + 1 return self.t_array, self.dt
################################################## #### #### FUNCTIONS BELOW WERE NOT TESTED FOR CORRECTNESS! #### ##################################################
[docs] def kx_vec_dtt(self, dtt_type): """ Compute the DTT wavenumber vector in the x-direction Args: dtt_type: Returns: """ kx_vec_dtt, M = self.makeDTTDim(self.Nx, self.dx, dtt_type) return kx_vec_dtt, M
[docs] def ky_vec_dtt(self, dtt_type): """ Compute the DTT wavenumber vector in the y-direction Args: dtt_type: Returns: """ ky_vec_dtt, M = self.makeDTTDim(self.Ny, self.dy, dtt_type) return ky_vec_dtt, M
[docs] def kz_vec_dtt(self, dtt_type): """ Compute the DTT wavenumber vector in the z-direction Args: dtt_type: Returns: """ kz_vec_dtt, M = self.makeDTTDim(self.Nz, self.dz, dtt_type) return kz_vec_dtt, M
[docs] @staticmethod # TODO (walter): convert this name to snake case def makeDTTDim(Nx, dx, dtt_type): """ Create the DTT grid parameters for a single spatial direction Args: Nx: dx: dtt_type: Returns: """ # compute the implied period of the input function if dtt_type == DiscreteCosine.TYPE_1: M = 2 * (Nx - 1) elif dtt_type == DiscreteSine.TYPE_1: M = 2 * (Nx + 1) else: M = 2 * Nx # calculate the wavenumbers if dtt_type == DiscreteCosine.TYPE_1: # whole-wavenumber DTT # WSWS / DCT-I n = np.arange(0, M // 2 + 1).T kx_vec = 2 * math.pi * n / (M * dx) elif dtt_type == DiscreteCosine.TYPE_2: # whole-wavenumber DTT # HSHS / DCT-II n = np.arange(0, M // 2).T kx_vec = 2 * math.pi * n / (M * dx) elif dtt_type == DiscreteSine.TYPE_1: # whole-wavenumber DTT # WAWA / DST-I n = np.arange(1, M // 2).T kx_vec = 2 * math.pi * n / (M * dx) elif dtt_type == DiscreteSine.TYPE_2: # whole-wavenumber DTT # HAHA / DST-II n = np.arange(1, M // 2 + 1).T kx_vec = 2 * math.pi * n / (M * dx) elif dtt_type in [DiscreteCosine.TYPE_3, DiscreteCosine.TYPE_4, DiscreteSine.TYPE_3, DiscreteSine.TYPE_4]: # half-wavenumber DTTs # WSWA / DCT-III # HSHA / DCT-IV # WAWS / DST-III # HAHS / DST-IV n = np.arange(0, M // 2).T kx_vec = 2 * math.pi * (n + 0.5) / (M * dx) else: raise ValueError return kx_vec, M
######################################## # functions for non-uniform grids ######################################## # TODO (walter): convert this name to snake case
[docs] def setNUGrid(self, dim, n_vec, dudn, n_vec_sg, dudn_sg): """ Function to set non-uniform grid parameters in specified dimension Args: dim: n_vec: dudn: n_vec_sg: dudn_sg: Returns: """ # check the dimension to set the nonuniform grid is appropriate assert dim <= self.dim, f"Cannot set nonuniform parameters for dimension {dim} of {self.dim}-dimensional grid." # force non-uniform grid spacing to be column vectors, and the # gradients to be in the correct direction for use with bsxfun n_vec = np.reshape(n_vec, (-1, 1), order="F") n_vec_sg = np.reshape(n_vec_sg, (-1, 1), order="F") if dim == 1: dudn = np.reshape(dudn, (-1, 1), order="F") dudn_sg = np.reshape(dudn_sg, (-1, 1), order="F") elif dim == 2: dudn = np.reshape(dudn, (1, -1), order="F") dudn_sg = np.reshape(dudn_sg, (1, -1), order="F") elif dim == 3: dudn = np.reshape(dudn, (1, 1, -1), order="F") dudn_sg = np.reshape(dudn_sg, (1, 1, -1), order="F") self.n_vec.assign_dim(self.dim, n_vec) self.n_vec_sg.assign_dim(self.dim, n_vec_sg) self.dudn.assign_dim(self.dim, dudn) self.dudn_sg.assign_dim(self.dim, dudn_sg) # set non-uniform flag self.nonuniform = True
[docs] def k_dtt(self, dtt_type): # Not tested for correctness! """ compute the individual wavenumber vectors, where dtt_type is the type of discrete trigonometric transform, which corresponds to the assumed input symmetry of the input function, where: 1. DCT-I WSWS 2. DCT-II HSHS 3. DCT-III WSWA 4. DCT-IV HSHA 5. DST-I WAWA 6. DST-II HAHA 7. DST-III WAWS 8. DST-IV HAHS Args: dtt_type: Returns: """ # check dtt_type is a scalar or a vector the same size self.dim dtt_type = np.array(dtt_type) assert dtt_type.size in [1, self.dim], f"dtt_type must be a scalar, or {self.dim}D vector" if self.dim == 1: k, M = self.kx_vec_dtt(dtt_type[0]) return k, M elif self.dim == 2: # assign the grid parameters for the x and y spatial directions kx_vec_dtt, Mx = self.kx_vec_dtt(dtt_type[0]) ky_vec_dtt, My = self.ky_vec_dtt(dtt_type[-1]) # define the wavenumber based on the wavenumber components k = np.zeros((self.Nx, self.Ny)) # assert len(kx_vec_dtt.shape) == 3 k += np.reshape(kx_vec_dtt, (-1, 1)) ** 2 k += np.reshape(ky_vec_dtt, (1, -1)) ** 2 k = np.sqrt(k) # define product of implied period M = Mx * My return k, M elif self.dim == 3: # assign the grid parameters for the x, y, and z spatial directions kx_vec_dtt, Mx = self.kx_vec_dtt(dtt_type[0]) ky_vec_dtt, My = self.ky_vec_dtt(dtt_type[len(dtt_type) // 2]) kz_vec_dtt, Mz = self.kz_vec_dtt(dtt_type[-1]) # define the wavenumber based on the wavenumber components k = np.zeros((self.Nx, self.Ny, self.Nz)) k = np.reshape(kx_vec_dtt, (-1, 1, 1)) ** 2 + k k = np.reshape(ky_vec_dtt, (1, -1, 1)) ** 2 + k k = np.reshape(kz_vec_dtt, (1, 1, -1)) ** 2 + k k = np.sqrt(k) # define product of implied period M = Mx * My * Mz return k, M