Source code for mixsea.helpers

import pathlib

import numpy as np
from scipy import fftpack, signal
from scipy.interpolate import interp1d


def denan(data):
    ii = np.squeeze(np.argwhere(~np.isnan(data)))
    return data[ii], ii


[docs] def calc_shear(u, depth): """Calculate vertical shear from velocity data. Parameters ---------- u : array-like Velocity data. depth : array-like Depth vector. Returns ------- array-like Vertical shear at depths of `depth`. """ f = interp1d( depth[0:-1] + np.diff(depth[:2]) / 2, np.diff(u) / np.diff(depth), bounds_error=False, ) return f(depth)
def extrapolate_data(data): """Extrapolate over NaNs near the bottom.""" out = data.copy() tmp = np.squeeze(np.where(~np.isnan(data))) max_tmp = np.max(tmp) if np.isfinite(max_tmp) & np.isnan(data[-1]): maxind = max_tmp out[maxind + 1 :] = data[maxind] return out
[docs] def psd( g, dx, axis=1, ffttype="p", detrend=True, window="hamming", tser_window=None, tser_overlap=None, ): """ Compute power spectral density. Adapted from Jen MacKinnon. Parameters ---------- g : array-like Real or complex input data of size [M * 1], [1 * M] or [M * N] dx : float Distance or time between entries in g axis : int, optional Axis along which to fft: 0 = columns, 1 = rows (default) ffttype : str, optional Flag for fft type - 'p' for periodic boundary conditions = pure fft (default) - 'c' or 's' if the input is a sum of cosines or sines only in which case the input is extended evenly or oddly before ffting. - 't' for time series, or any other non-exactly periodic series in which the data should be windowed and filtered before computing the periodogram. In this case you may also specify: * tser_window: an integer that gives the length of each window the series should be broken up into for filtering and computing the periodogram. Default is length(g). * tser_overlap: an integer equal to the lengh of points overlap between sucessive windows. Default = tser_window/2, which means 50% overlap. detrend : bool, optional Detrend along dim by removing a linear fit using scipy.detrend(). Defaults to True. window : str, optional Window type. Default 'hamming'. See scipy.signal.get_window() for window options. Returns ------- Pcw : array-like Clockwise power spectral density Pccw : array-like Counter-clockwise power spectral density Ptot : array-like Total = ccw + cw psd one-sided spectra omega: array-like Vector of wavenumbers. For plotting use frequency f = omega/(2*pi). """ M0 = g.shape g = np.atleast_2d(g) if detrend: g = signal.detrend(g) # FFT and welch act on rows by default. If we want to calculate # column-wise, simply transpose the input matrix. if axis == 0 & len(M0) > 1: g = np.transpose(g) M = g.shape # If sin or cos transform needed, appropriately extend time series if ffttype == "s": g_ext = np.concatenate( (g, -1 * np.flip(g[:, 1 : int(M[1]) - 1], axis=1)), axis=1 ) g = g_ext M = g.shape if ffttype == "c": g_ext = np.concatenate((g, np.flip(g[:, 1 : int(M[1]) - 1], axis=1)), axis=1) g = g_ext M = g.shape # Setup frequency vectors df = 1 / (M[1] * dx) domega = 2 * np.pi * df # full frequency output length M f_full = np.linspace(0, (M[1] - 1) * df, num=M[1], endpoint=True) if np.remainder(M[1], 2) == 0: # even -> length M/2+1 f, step = np.linspace( 0, M[1] * df / 2, num=int(M[1] / 2 + 1), endpoint=True, retstep=True ) else: # odd -> length (M+1)/2 f, step = np.linspace( 0, (M[1] - 1) * df / 2, num=int((M[1] + 1) / 2), endpoint=True, retstep=True ) assert step - df < df / 1000 omega = 2 * np.pi * f # Compute power spectra using fft if ffttype in ["p", "c", "s"]: P0 = fftpack.fft(g, axis=-1) # Normalize by wavenumber in RADIANS Pxx = P0 * np.conj(P0) / M[1] / (M[1] * domega) if ffttype == "t": if tser_window is None: # default one segment tser_window = np.array(np.fix(M[1])) if tser_overlap is None: # default 50% overlap tser_overlap = np.fix(tser_window / 2) f0, Pxx0 = signal.welch( g, fs=1 / dx, axis=-1, nperseg=tser_window, noverlap=tser_overlap, window=window, return_onesided=False, ) # One side of f0 is shifted to negative values - shift everything to # positive. f0 = np.fft.fftshift(f0) + np.absolute(f0).max() # Interpolate to f_full frequency vector Pxx0 = interp1d(f0, Pxx0, bounds_error=False, axis=-1)(f_full) Pxx = Pxx0.copy() Pxx = Pxx / 2 / np.pi # normalize by radial wavenumber/frequency # Separate into cw and ccw spectra if np.remainder(M[1], 2) == 0: # even lengths, divide 0 and nyquist freq between ccw and cw Pccw = np.concatenate( ( np.reshape(0.5 * Pxx[:, 0], (-1, 1)), Pxx[:, 1 : int((M[1]) / 2)], np.reshape(0.5 * Pxx[:, int(M[1] / 2) + 1], (-1, 1)), ), axis=1, ) Pcw = np.concatenate( ( np.reshape(0.5 * Pxx[:, 0], (-1, 1)), np.flip(Pxx[:, int(M[1] / 2 + 1) : M[1]], axis=1), np.reshape(0.5 * Pxx[:, int(M[1] / 2) + 2], (-1, 1)), ), axis=1, ) Ptot = Pccw + Pcw else: # odd lengths, divide 0 freq between ccw and cw Pccw = np.concatenate( (np.reshape(0.5 * Pxx[:, 0], (-1, 1)), Pxx[:, 1 : int((M[1] + 1) / 2)]), axis=1, ) Pcw = np.concatenate( ( np.reshape(0.5 * Pxx[:, 0], (-1, 1)), np.flip(Pxx[:, int(((M[1] + 3) / 2)) - 1 : M[1]], axis=1), ), axis=1, ) Ptot = Pccw + Pcw Ptot = np.squeeze(Ptot) Pcw = np.squeeze(Pcw) Pccw = np.squeeze(Pccw) # transpose back if axis=0 if axis == 0 & len(M0) > 1: Ptot = Ptot.transpose() Pcw = Pcw.transpose() Pccw = Pccw.transpose() return Pcw, Pccw, Ptot, omega
def read_testfile(filename): """ Read ascii file with test data The ascii file is expected to have one header line with variable names, preceded by a #. The variable names are used as keys in the output dict. Parameters ---------- filename : str or pathlib.Path Path to testfile Returns ------- dict Dictionary with test data """ data = np.loadtxt(filename, comments="#", delimiter=",") # read variable names from comment line and divide data into a dict with open(filename) as f: vars = f.readline().strip() v = [x.strip() for x in vars[2:].split(",")] return {vi: data[:, i] for i, vi in enumerate(v)}
[docs] def read_ctd_testfile(path=None): """Read CTD profile from test suite. Parameters ---------- path : pathlib.Path or str, optional Provide path to data file. Returns ------- dict """ if path is None: cwd = pathlib.Path.cwd() ctdfile = cwd.parent / "tests" / "data" / "ctd_test_data.csv" else: ctdfile = path ctd = read_testfile(ctdfile) return ctd
[docs] def read_ladcp_testfile(path=None): """Read LADCP profile from test suite. Parameters ---------- path : pathlib.Path or str, optional Provide path to data file. Returns ------- dict """ if path is None: cwd = pathlib.Path.cwd() ladcpfile = cwd.parent / "tests" / "data" / "ladcp_test_data.csv" else: ladcpfile = path ladcp = read_testfile(ladcpfile) return ladcp