import logging
from math import floor
import numpy as np
import scipy
from numpy.fft import ifftshift, fft, ifft
from beartype import beartype as typechecker
from beartype.typing import Union, List, Optional, Tuple
from jaxtyping import Int, Bool
from .conversion import freq2wavenumber
from .data import scale_SI
from .mapgen import ndgrid
from .math import sinc, gaussian
from .matlab import matlab_mask, unflatten_matlab_mask, rem
from .matrix import broadcast_axis, num_dim
import kwave.utils.typing as kt
[docs]
def add_noise(signal: np.ndarray, snr: float, mode="rms"):
"""
Add Gaussian noise to a signal.
Args:
signal: input signal
snr: desired signal snr (signal-to-noise ratio) in decibels after adding noise
mode: 'rms' (default) or 'peak'
Returns:
Signal with augmented with noise. This behaviour differs from the k-Wave MATLAB implementation in that the SNR is nor returned.
"""
if mode == "rms":
reference = np.sqrt(np.mean(signal**2))
elif mode == "peak":
reference = np.max(signal)
else:
raise ValueError(f"Unknown parameter '{mode}' for input mode.")
# calculate the standard deviation of the Gaussian noise
std_dev = reference / (10 ** (snr / 20))
# calculate noise
noise = std_dev * np.random.randn(*signal.shape)
# check the snr
noise_rms = np.sqrt(np.mean(noise**2))
snr = 20.0 * np.log10(reference / noise_rms)
# add noise to the recorded sensor data
signal = signal + noise
return signal
[docs]
@typechecker
def get_win(
N: Union[int, np.ndarray, Tuple[int, int], Tuple[int, int, int], List[Int[kt.ScalarLike, ""]]],
# TODO: replace and refactor for scipy.signal.get_window
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.get_window.html#scipy.signal.get_window
type_: str, # TODO change this to enum in the future
plot_win: bool = False,
param: Optional[float] = None,
rotation: bool = False,
symmetric: Union[bool, Bool[np.ndarray, "N"]] = True,
square: bool = False,
):
"""
A frequency domain windowing function of specified type and dimensions.
Args:
N: Number of samples, [Nx] for 1D, [Nx, Ny] for 2D, [Nx, Ny, Nz] for 3D.
type_: Window type. Supported values: 'Bartlett', 'Bartlett-Hanning', 'Blackman', 'Blackman-Harris',
'Blackman-Nuttall', 'Cosine', 'Flattop', 'Gaussian', 'HalfBand',
'Hamming', 'Hanning', 'Kaiser', 'Lanczos', 'Nuttall',
'Rectangular', 'Triangular', 'Tukey'.
plot_win: Boolean to display the window (default = False).
param: Control parameter for Tukey, Blackman, Gaussian, and Kaiser windows: taper ratio (Tukey),
alpha (Blackman, Kaiser), standard deviation (Gaussian)
(default = 0.5, 0.16, 3 respectively).
rotation: Boolean to create windows via rotation or outer product (default = False).
symmetric: Boolean to make the window symmetrical (default = True).
Can also be a vector defining the symmetry in each matrix dimension.
square: Boolean to force the window to be square (default = False).
Returns:
A tuple of (win, cg) where win is the window and cg is the coherent gain of the window.
"""
def cosine_series(n: int, N: int, coeffs: List[float]) -> np.ndarray:
"""
Sub-function to calculate a summed filter cosine series.
Args:
n: An integer representing the current index in the series.
N: An integer representing the total number of terms in the series.
coeffs: A list of floats representing the coefficients of the cosine terms.
Returns:
A numpy ndarray containing the calculated series.
"""
series = coeffs[0]
for index in range(1, len(coeffs)):
series = series + (-1) ** index * coeffs[index] * np.cos(index * 2 * np.pi * n / (N - 1))
return series.T
# Check if N is either `int` or `list of ints`
# assert isinstance(N, int) or isinstance(N, list) or isinstance(N, np.ndarray)
N = np.array(N, dtype=int)
N = N if np.size(N) > 1 else int(N)
# Check if symmetric is either `bool` or `list of bools`
# assert isinstance(symmetric, int) or isinstance(symmetric, list)
symmetric = np.array(symmetric, dtype=bool)
# Set default value for `param` if type is one of the special ones
assert not plot_win, NotImplementedError("Plotting is not implemented.")
if type_ == "Tukey":
if param is None:
param = 0.5
param = np.clip(param, a_min=0, a_max=1)
elif type_ == "Blackman":
if param is None:
param = 0.16
param = np.clip(param, a_min=0, a_max=1)
elif type_ == "Gaussian":
if param is None:
param = 0.5
param = np.clip(param, a_min=0, a_max=0.5)
elif type_ == "Kaiser":
if param is None:
param = 3
param = np.clip(param, a_min=0, a_max=100)
# if a non-symmetrical window is required, enlarge the window size (note,
# this expands each dimension individually if symmetric is a vector)
N = N + 1 * (1 - symmetric.astype(int))
# if a square window is required, replace grid sizes with smallest size and
# store a copy of the original size
if square and (N.size != 1):
N_orig = np.copy(N)
L = min(N)
N[:] = L
# create the window
if N.size == 1:
# TODO: what should this behaviour be if N is a list of ints? make windows of multiple lengths?
n = np.arange(0, N)
# TODO: find failure cases in test suite when N is zero.
# assert np.all(N) > 1, 'Signal length N must be greater than 1'
if type_ == "Bartlett":
win = (2 / (N - 1) * ((N - 1) / 2 - abs(n - (N - 1) / 2))).T
elif type_ == "Bartlett-Hanning":
win = (0.62 - 0.48 * abs(n / (N - 1) - 1 / 2) - 0.38 * np.cos(2 * np.pi * n / (N - 1))).T
elif type_ == "Blackman":
win = cosine_series(n, N, [(1 - param) / 2, 0.5, param / 2])
elif type_ == "Blackman-Harris":
win = cosine_series(n, N, [0.35875, 0.48829, 0.14128, 0.01168])
elif type_ == "Blackman-Nuttall":
win = cosine_series(n, N, [0.3635819, 0.4891775, 0.1365995, 0.0106411])
elif type_ == "Cosine":
win = (np.cos(np.pi * n / (N - 1) - np.pi / 2)).T
elif type_ == "Flattop":
win = cosine_series(n, N, [0.21557895, 0.41663158, 0.277263158, 0.083578947, 0.006947368])
elif type_ == "Gaussian":
win = (np.exp(-0.5 * ((n - (N - 1) / 2) / (param * (N - 1) / 2)) ** 2)).T
elif type_ == "HalfBand":
win = np.ones(N)
# why not to just round? => because rounding 0.5 introduces unexpected behaviour
# round(0.5) should be 1 but it is 0
ramp_length = round(N / 4 + 1e-8)
ramp = (
1 / 2
+ 9 / 16 * np.cos(np.pi * np.arange(1, ramp_length + 1) / (2 * ramp_length))
- 1 / 16 * np.cos(3 * np.pi * np.arange(1, ramp_length + 1) / (2 * ramp_length))
)
if ramp_length > 0:
win[0:ramp_length] = np.flip(ramp)
win[-ramp_length:] = ramp
elif type_ == "Hamming":
win = (0.54 - 0.46 * np.cos(2 * np.pi * n / (N - 1))).T
elif type_ == "Hanning":
win = (0.5 - 0.5 * np.cos(2 * np.pi * n / (N - 1))).T
elif type_ == "Kaiser":
part_1 = scipy.special.iv(0, np.pi * param * np.sqrt(1 - (2 * n / (N - 1) - 1) ** 2))
part_2 = scipy.special.iv(0, np.pi * param)
win = part_1 / part_2
elif type_ == "Lanczos":
win = 2 * np.pi * n / (N - 1) - np.pi
win = sinc(win + 1e-12).T
elif type_ == "Nuttall":
win = cosine_series(n, N, [0.3635819, 0.4891775, 0.1365995, 0.0106411])
elif type_ == "Rectangular":
win = np.ones(N)
elif type_ == "Triangular":
win = (2 / N * (N / 2 - abs(n - (N - 1) / 2))).T
elif type_ == "Tukey":
win = np.ones((N, 1))
index = np.arange(0, (N - 1) * param / 2 + 1e-8)
param = param * N
win[0 : len(index)] = 0.5 * (1 + np.cos(2 * np.pi / param * (index - param / 2)))[:, None]
win[np.arange(-1, -len(index) - 1, -1)] = win[0 : len(index)]
win = win.squeeze(axis=-1)
else:
raise ValueError(f"Unknown window type: {type_}")
# trim the window if required
if not symmetric:
N -= 1
win = win[0:N]
win = np.expand_dims(win, axis=-1)
# calculate the coherent gain
cg = win.sum() / N
elif N.size == 2:
# create the 2D window
if rotation:
# create the window in one dimension using getWin recursively
L = max(N)
win_lin, _ = get_win(int(L), type_, param=param)
win_lin = np.squeeze(win_lin)
# create the reference axis
radius = (L - 1) / 2
ll = np.linspace(-radius, radius, L)
# create the 2D window using rotation
xx = np.linspace(-radius, radius, N[0])
yy = np.linspace(-radius, radius, N[1])
[x, y] = ndgrid(xx, yy)
r = np.sqrt(x**2 + y**2)
r[r > radius] = radius
interp_func = scipy.interpolate.interp1d(ll, win_lin)
win = interp_func(r)
win[r <= radius] = interp_func(r[r <= radius])
else:
# create the window in each dimension using getWin recursively
win_x, _ = get_win(int(N[0]), type_, param=param)
win_y, _ = get_win(int(N[1]), type_, param=param)
# create the 2D window using the outer product
win = (win_y * win_x.T).T
# trim the window if required
N = N - 1 * (1 - np.array(symmetric).astype(int))
win = win[0 : N[0], 0 : N[1]]
# calculate the coherent gain
cg = win.sum() / np.prod(N)
elif N.size == 3:
# create the 3D window
if rotation:
# create the window in one dimension using getWin recursively
L = N.max()
win_lin, _ = get_win(int(L), type_, param=param)
# create the reference axis
radius = (L - 1) / 2
ll = np.linspace(-radius, radius, L)
# create the 3D window using rotation
xx = np.linspace(-radius, radius, N[0])
yy = np.linspace(-radius, radius, N[1])
zz = np.linspace(-radius, radius, N[2])
[x, y, z] = ndgrid(xx, yy, zz)
r = np.sqrt(x**2 + y**2 + z**2)
r[r > radius] = radius
win_lin = np.squeeze(win_lin)
interp_func = scipy.interpolate.interp1d(ll, win_lin)
win = interp_func(r)
win[r <= radius] = interp_func(r[r <= radius])
else:
# create the window in each dimension using getWin recursively
win_x, _ = get_win(int(N[0]), type_, param=param)
win_y, _ = get_win(int(N[1]), type_, param=param)
win_z, _ = get_win(int(N[2]), type_, param=param)
# create the 2D window using the outer product
win_2D = win_x * win_z.T
# create the 3D window
win = np.zeros((N[0], N[1], N[2]))
for index in range(0, N[1]):
win[:, index, :] = win_2D[:, :] * win_y[index]
# trim the window if required
N = N - 1 * (1 - np.array(symmetric).astype(int))
win = win[0 : N[0], 0 : N[1], 0 : N[2]]
# calculate the coherent gain
cg = win.sum() / np.prod(N)
else:
raise ValueError("Invalid input for N, only 1-, 2-, and 3-D windows are supported.")
# enlarge the window if required
if square and (N.size != 1):
L = N[0]
win_sq = win
win = np.zeros(N_orig)
if N.size == 2:
index1 = round((N[0] - L) / 2)
index2 = round((N[1] - L) / 2)
win[index1 : (index1 + L), index2 : (index2 + L)] = win_sq
elif N.size == 3:
index1 = floor((N_orig[0] - L) / 2)
index2 = floor((N_orig[1] - L) / 2)
index3 = floor((N_orig[2] - L) / 2)
win[index1 : index1 + L, index2 : index2 + L, index3 : index3 + L] = win_sq
return win, cg
[docs]
def tone_burst(sample_freq, signal_freq, num_cycles, envelope="Gaussian", plot_signal=False, signal_length=0, signal_offset=0):
"""
Create an enveloped single frequency tone burst.
Args:
sample_freq: sampling frequency in Hz
signal_freq: frequency of the tone burst signal in Hz
num_cycles: number of sinusoidal oscillations
envelope: Envelope used to taper the tone burst. Valid inputs are:
- 'Gaussian' (the default)
- 'Rectangular'
- [num_ring_up_cycles, num_ring_down_cycles]
The last option generates a continuous wave signal with a cosine taper of the specified length at the beginning and end.
plot: Boolean controlling whether the created tone burst is plotted.
signal_length: Signal length in number of samples. If longer than the tone burst length, the signal is appended with zeros.
signal_offset: Signal offset before the tone burst starts in number of samples.
If an array is given, a matrix of tone bursts is created where each row corresponds to
a tone burst for each value of the 'SignalOffset'.
Returns:
created tone burst
"""
assert isinstance(signal_offset, int) or isinstance(signal_offset, np.ndarray), "signal_offset must be integer or array of integers"
assert isinstance(signal_length, int), "signal_length must be integer"
# calculate the temporal spacing
dt = 1 / sample_freq # [s]
# create the tone burst
tone_length = num_cycles / signal_freq # [s]
# We want to include the endpoint but only if it's divisible by the step-size.
# Modulo operator is not stable, so multiple conditions included.
# if ( (tone_length % dt) < 1e-18 or (np.abs(tone_length % dt - dt) < 1e-18) ):
if rem(tone_length, dt) < 1e-18:
tone_t = np.linspace(0, tone_length, int(tone_length / dt) + 1)
else:
tone_t = np.arange(0, tone_length, dt)
tone_burst = np.sin(2 * np.pi * signal_freq * tone_t)
tone_index = np.round(signal_offset)
# check for ring up and ring down input
if isinstance(envelope, list) or isinstance(envelope, np.ndarray): # and envelope.size == 2:
# assign the inputs
num_ring_up_cycles, num_ring_down_cycles = envelope
# check signal is long enough for ring up and down
assert num_cycles >= (
num_ring_up_cycles + num_ring_down_cycles
), "Input num_cycles must be longer than num_ring_up_cycles + num_ring_down_cycles."
# get period
period = 1 / signal_freq
# create x-axis for ramp between 0 and pi
up_ramp_length_points = round(num_ring_up_cycles * period / dt)
down_ramp_length_points = round(num_ring_down_cycles * period / dt)
up_ramp_axis = np.arange(0, np.pi + 1e-8, np.pi / (up_ramp_length_points - 1))
down_ramp_axis = np.arange(0, np.pi + 1e-8, np.pi / (down_ramp_length_points - 1))
# create ramp using a shifted cosine
up_ramp = (-np.cos(up_ramp_axis) + 1) * 0.5
down_ramp = (np.cos(down_ramp_axis) + 1) * 0.5
# apply the ramps
tone_burst[0:up_ramp_length_points] = tone_burst[0:up_ramp_length_points] * up_ramp
tone_burst[-down_ramp_length_points:] = tone_burst[-down_ramp_length_points:] * down_ramp
else:
# create the envelope
if envelope == "Gaussian":
x_lim = 3
window_x = np.arange(-x_lim, x_lim + 1e-8, 2 * x_lim / (len(tone_burst) - 1))
window = gaussian(window_x, 1, 0, 1)
elif envelope == "Rectangular":
window = np.ones_like(tone_burst)
elif envelope == "RingUpDown":
raise NotImplementedError("RingUpDown not yet implemented")
else:
raise ValueError(f"Unknown envelope {envelope}.")
# apply the envelope
tone_burst = tone_burst * window
# force the ends to be zero by applying a second window
if envelope == "Gaussian":
tone_burst = tone_burst * np.squeeze(get_win(len(tone_burst), type_="Tukey", param=0.05)[0])
# calculate the expected FWHM in the frequency domain
# t_var = tone_length/(2*x_lim)
# w_var = 1/(4*pi^2*t_var)
# fw = 2 * sqrt(2 * log(2) * w_var)
# Convert tone_index and signal_offset to numpy arrays
signal_offset = np.array(signal_offset)
# Determine the length of the signal array
signal_length = max(signal_length, signal_offset.max() + len(tone_burst))
# Create the signal array with the correct size
signal = np.zeros((tone_index.size, signal_length))
# Add the tone burst to the signal array
# Add the tone burst to the signal array
tone_index = np.atleast_1d(tone_index)
if tone_index.size == 1:
signal[:, int(tone_index) : int(tone_index) + len(tone_burst)] = tone_burst.T
else:
for offset, tone_idx in enumerate(tone_index):
signal[offset, int(tone_idx) : int(tone_idx) + len(tone_burst)] = tone_burst.T
# plot the signal if required
if plot_signal:
raise NotImplementedError
return signal
[docs]
def reorder_sensor_data(kgrid, sensor, sensor_data: np.ndarray) -> np.ndarray:
"""
Reorders the sensor data based on the coordinates of the sensor points.
Args:
kgrid: The k-Wave grid object.
sensor: The k-Wave sensor object.
sensor_data: The sensor data to be reordered.
Returns:
np.ndarray of the reordered sensor data.
Raises:
ValueError: If the simulation is not 2D or the sensor is not defined as a binary mask.
"""
# check simulation is 2D
if kgrid.dim != 2:
raise ValueError("The simulation must be 2D.")
# check sensor.mask is a binary mask
if sensor.mask.dtype != bool and set(np.unique(sensor.mask).tolist()) != {0, 1}:
raise ValueError("The sensor must be defined as a binary mask.")
# find the coordinates of the sensor points
x_sensor = matlab_mask(kgrid.x, sensor.mask == 1)
x_sensor = np.squeeze(x_sensor)
y_sensor = matlab_mask(kgrid.y, sensor.mask == 1)
y_sensor = np.squeeze(y_sensor)
# find the angle of each sensor point (from the centre)
angle = np.arctan2(-x_sensor, -y_sensor)
angle[angle < 0] = 2 * np.pi + angle[angle < 0]
# sort the sensor points in order of increasing angle
indices_new = np.argsort(angle, kind="stable")
# reorder the measure time series so that adjacent time series correspond
# to adjacent sensor points.
reordered_sensor_data = sensor_data[indices_new]
return reordered_sensor_data
[docs]
def reorder_binary_sensor_data(sensor_data: np.ndarray, reorder_index: np.ndarray):
"""
Args:
sensor_data: N x K
reorder_index: N
Returns:
reordered sensor data
"""
reorder_index = np.squeeze(reorder_index)
assert sensor_data.ndim == 2
assert reorder_index.ndim == 1
return sensor_data[reorder_index.argsort()]
[docs]
def calc_max_freq(max_spat_freq, c):
filter_cutoff_freq = max_spat_freq * c / (2 * np.pi)
return filter_cutoff_freq
[docs]
def get_alpha_filter(kgrid, medium, filter_cutoff, taper_ratio=0.5):
"""
get_alpha_filter uses get_win to create a Tukey window via rotation to
pass to the medium.alpha_filter. This parameter is used to regularise time
reversal image reconstruction when absorption compensation is included.
Args:
kgrid: simulation grid
medium: simulation medium
filter_cutoff: Any of the filter_cutoff inputs may be set to 'max' to set the cutoff frequency to
the maximum frequency supported by the grid
taper_ratio: The taper_ratio input is used to control the width of the transition region between
the passband and stopband. The default value is 0.5, which corresponds to
a transition region of 50% of the filter width.
Returns:
alpha_filter
"""
dim = num_dim(kgrid.k)
logging.log(logging.INFO, f" taper ratio: {taper_ratio}")
# extract the maximum sound speed
c = max(medium.sound_speed)
assert len(filter_cutoff) == dim, f"Input filter_cutoff must have {dim} elements for a {dim}D grid"
# parse cutoff freqs
filter_size = []
for idx, freq in enumerate(filter_cutoff):
if freq == "max":
filter_cutoff[idx] = calc_max_freq(kgrid.k_max[idx], c)
filter_size_local = kgrid.N[idx]
else:
filter_size_local, filter_cutoff[idx] = freq2wavenumber(kgrid.N[idx], kgrid.k_max[idx], filter_cutoff[idx], c, kgrid.k[idx])
filter_size.append(filter_size_local)
# create the alpha_filter
filter_sec, _ = get_win(filter_size, "Tukey", param=taper_ratio, rotation=True)
# enlarge the alpha_filter to the size of the grid
alpha_filter = np.zeros(kgrid.N)
indexes = [round((kgrid.N[idx] - filter_size[idx]) / 2) for idx in range(len(filter_size))]
if dim == 1:
alpha_filter[indexes[0] : indexes[0] + filter_size[0]] = np.squeeze(filter_sec)
elif dim == 2:
alpha_filter[indexes[0] : indexes[0] + filter_size[0], indexes[1] : indexes[1] + filter_size[1]] = filter_sec
elif dim == 3:
alpha_filter[
indexes[0] : indexes[0] + filter_size[0], indexes[1] : indexes[1] + filter_size[1], indexes[2] : indexes[2] + filter_size[2]
] = filter_sec
def dim_string(cutoff_vals):
return "".join([(str(scale_SI(co)[0]) + " Hz by ") for co in cutoff_vals])
# update the command line status
logging.log(logging.INFO, " filter cutoff: " + dim_string(filter_cutoff)[:-4] + ".")
return alpha_filter
[docs]
def get_wave_number(Nx, dx, dim):
if Nx % 2 == 0:
# even
nx = np.arange(start=-Nx / 2, stop=Nx / 2) / Nx
else:
nx = np.arange(start=-(Nx - 1) / 2, stop=(Nx - 1) / 2 + 1) / Nx
kx = ifftshift((2 * np.pi / dx) * nx)
return kx
[docs]
def gradient_spect(f: np.ndarray, dn: List[float], dim: Optional[Union[int, List[int]]] = None, deriv_order: int = 1) -> np.ndarray:
"""
gradient_spect calculates the gradient of an n-dimensional input matrix using the Fourier collocation spectral method.
The gradient for singleton dimensions is returned as 0.
Args:
f: A numpy ndarray representing the input matrix.
dn: A list of floats representing the grid spacings in each dimension.
dim: An optional integer or list of integers representing the dimensions along which to calculate the gradient.
deriv_order: An integer representing the order of the derivative to calculate. Default is 1.
Returns:
A numpy ndarray containing the calculated gradient.
"""
# get size of the input function
sz = f.shape
# check if input is 1D or user defined input dimension is given
if dim or len(sz) == 1:
# check if a single dn value is given, if not, extract the required value
if not (isinstance(dn, int) or isinstance(dn, float)):
dn = dn[dim]
# get the grid size along the specified dimension, or the longest dimension if 1D
if max(sz) == np.prod(sz):
dim = np.argmax(sz)
Nx = sz[dim]
else:
Nx = sz[dim]
# get the wave number
kx = get_wave_number(Nx, dn, dim)
# calculate derivative and assign output
grads = np.real(ifft((1j * kx) ** deriv_order * fft(f, axis=dim), axis=dim))
else:
# logging.log(logging.WARN, "This implementation is not tested.")
# get the wave number
# kx = get_wave_number(sz(dim), dn[dim], dim)
assert len(dn) == len(sz), ValueError(f"{len(sz)} values for dn must be specified for a {len(sz)}-dimensional input matrix.")
grads = []
# calculate the gradient for each non-singleton dimension
for dim in range(num_dim(f)):
# get the wave number
kx = get_wave_number(sz[dim], dn[dim], dim)
# calculate derivative and assign output
# TODO: replace this with numpy broadcasting
kx = broadcast_axis(kx, num_dim(f), dim)
grads.append(np.real(ifft((1j * kx) ** deriv_order * fft(f, axis=dim), axis=dim)))
return grads
[docs]
def unmask_sensor_data(kgrid, sensor, sensor_data: np.ndarray) -> np.ndarray:
# create an empty matrix
if kgrid.k == 1:
unmasked_sensor_data = np.zeros((kgrid.Nx, 1))
elif kgrid.k == 2:
unmasked_sensor_data = np.zeros((kgrid.Nx, kgrid.Ny))
elif kgrid.k == 3:
unmasked_sensor_data = np.zeros((kgrid.Nx, kgrid.Ny, kgrid.Nz))
else:
raise NotImplementedError
# reorder input data
flat_sensor_mask = (sensor.mask != 0).flatten("F")
assignment_mask = unflatten_matlab_mask(unmasked_sensor_data, np.where(flat_sensor_mask)[0])
# unmasked_sensor_data.flatten('F')[flat_sensor_mask] = sensor_data.flatten()
unmasked_sensor_data[assignment_mask] = sensor_data.flatten()
# unmasked_sensor_data[unflatten_matlab_mask(unmasked_sensor_data, sensor.mask != 0)] = sensor_data
return unmasked_sensor_data
[docs]
def create_cw_signals(t_array: np.ndarray, freq: float, amp: np.ndarray, phase: np.ndarray, ramp_length: int = 4) -> np.ndarray:
"""
Generate a series of continuous wave (CW) signals based on the 1D or 2D input matrices `amp` and `phase`, where each signal
is given by:
amp[i, j] .* sin(2 * pi * freq * t_array + phase[i, j]);
To avoid startup transients, a cosine tapered up-ramp is applied to the beginning of the signal. By default, the length
of this ramp is four periods of the wave. The up-ramp can be turned off by setting the `ramp_length` to 0.
Examples:
# define sampling parameters
f = 5e6
T = 1/f
Fs = 100e6
dt = 1/Fs
t_array = np.arange(0, 10*T, dt)
# define amplitude and phase
amp = get_win(9, 'Gaussian')
phase = np.arange(0, 2*pi, 9).T
# create signals and plot
cw_signal = create_cw_signals(t_array, f, amp, phase)
Args:
t_array: A numpy ndarray representing the time values.
freq: A float representing the frequency of the signals.
amp: A numpy ndarray representing the amplitudes of the signals.
phase: A numpy ndarray representing the phases of the signals.
ramp_length: An optional integer representing the length of the cosine up-ramp in periods of the wave. Default is 4.
Returns:
A numpy ndarray containing the generated CW signals.
"""
if len(phase) == 1:
phase = phase * np.ones(amp.shape)
if amp.ndim > 1:
N1, N2 = amp.shape
else:
N1, N2 = amp.shape[0], 1
# create input signals
cw_signal = np.zeros((N1, N2, len(t_array)))
# create signal
for index1 in range(N1):
for index2 in range(N2):
if amp.ndim > 1:
cw_signal[index1, index2, :] = amp[index1, index2] * np.sin(2 * np.pi * freq * t_array + phase[index1, index2])
else:
cw_signal[index1, index2, :] = amp[index1] * np.sin(2 * np.pi * freq * t_array + phase[index1])
# apply ramp to avoid startup transients
if ramp_length != 0:
# get period and time step (assuming dt is constant)
period = 1 / freq
dt = t_array[1] - t_array[0]
# create x-axis for ramp between 0 and pi
ramp_length_points = round(ramp_length * period / dt)
ramp_axis = np.linspace(0, np.pi, ramp_length_points)
# create ramp using a shifted cosine
ramp = (-np.cos(ramp_axis) + 1) * 0.5
ramp = np.expand_dims(ramp, axis=(0, 1))
# apply ramp to all signals simultaneously
cw_signal[:, :, :ramp_length_points] = ramp * cw_signal[:, :, :ramp_length_points]
# remove singleton dimensions if cw_signal has more than two dimensions
if cw_signal.ndim > 2:
cw_signal = np.squeeze(cw_signal)
# if only a single amplitude and phase is given, force time to be the
# second dimensions
if amp.ndim == 1:
cw_signal = np.reshape(cw_signal, (N1, -1))
return cw_signal