Source code for disdrodb.utils.manipulations

# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Include functions helping for DISDRODB product manipulations."""

import numpy as np
import xarray as xr
from scipy.interpolate import PchipInterpolator

from disdrodb.constants import DIAMETER_DIMENSION, VELOCITY_DIMENSION
from disdrodb.utils.xarray import unstack_datarray_dimension


def define_diameter_datarray(bounds, dim="diameter_bin_center"):
    """Define a diameter DataArray from diameter bin edges.

    Parameters
    ----------
    bounds : array-like
        Diameter bin edges (N + 1 values for N bins), assumed sorted in increasing order.
    dim : str, optional
        Name of the output dimension. The default is ``"diameter_bin_center"``.

    Returns
    -------
    xarray.DataArray
        DataArray holding the bin centers along ``dim``, with the
        ``diameter_bin_width``, ``diameter_bin_lower`` and ``diameter_bin_upper``
        coordinates and ``dim`` itself as dimension coordinate.
    """
    # Coerce to a plain numpy array: a list cannot be subtracted element-wise, and
    # slicing an indexed DataArray would trigger label alignment instead of a
    # positional difference.
    bounds = np.asarray(bounds)
    diameters_bin_lower = bounds[:-1]
    diameters_bin_upper = bounds[1:]
    diameters_bin_width = diameters_bin_upper - diameters_bin_lower
    diameters_bin_center = diameters_bin_lower + diameters_bin_width / 2
    da = xr.DataArray(
        diameters_bin_center,
        dims=dim,
        coords={
            "diameter_bin_width": (dim, diameters_bin_width),
            "diameter_bin_lower": (dim, diameters_bin_lower),
            "diameter_bin_upper": (dim, diameters_bin_upper),
            dim: (dim, diameters_bin_center),
        },
    )
    return da
def define_velocity_datarray(bounds, dim="velocity_bin_center"):
    """Define a velocity DataArray from velocity bin edges.

    Parameters
    ----------
    bounds : array-like
        Velocity bin edges (N + 1 values for N bins), assumed sorted in increasing order.
    dim : str, optional
        Name of the output dimension. The default is ``"velocity_bin_center"``.

    Returns
    -------
    xarray.DataArray
        DataArray holding the bin centers along ``dim``, with the
        ``velocity_bin_width``, ``velocity_bin_lower`` and ``velocity_bin_upper``
        coordinates and ``dim`` itself as dimension coordinate.
    """
    # Coerce to a plain numpy array: a list cannot be subtracted element-wise, and
    # slicing an indexed DataArray would trigger label alignment instead of a
    # positional difference.
    bounds = np.asarray(bounds)
    velocity_bin_lower = bounds[:-1]
    velocity_bin_upper = bounds[1:]
    velocity_bin_width = velocity_bin_upper - velocity_bin_lower
    velocity_bin_center = velocity_bin_lower + velocity_bin_width / 2
    da = xr.DataArray(
        velocity_bin_center,
        dims=dim,
        coords={
            "velocity_bin_width": (dim, velocity_bin_width),
            "velocity_bin_lower": (dim, velocity_bin_lower),
            "velocity_bin_upper": (dim, velocity_bin_upper),
            dim: (dim, velocity_bin_center),
        },
    )
    return da
def define_diameter_array(diameter_min=0, diameter_max=10, diameter_spacing=0.05):
    """
    Build a regular grid of diameter bins.

    Parameters
    ----------
    diameter_min : float, optional
        First bin edge, in mm. The default value is 0 mm.
    diameter_max : float, optional
        Last bin edge, in mm. The default value is 10 mm.
    diameter_spacing : float, optional
        Width of each diameter bin, in mm. The default value is 0.05 mm.

    Returns
    -------
    xarray.DataArray
        Bin centers, carrying the bin width, lower bound, upper bound and center
        as coordinates.
    """
    # Pad the stop value by half a step so that ``diameter_max`` is produced as the
    # last edge despite floating-point round-off in ``np.arange``.
    stop = diameter_max + diameter_spacing / 2
    edges = np.arange(diameter_min, stop, step=diameter_spacing)
    return define_diameter_datarray(edges)
def define_velocity_array(velocity_min=0, velocity_max=10, velocity_spacing=0.05):
    """
    Define an array of velocities and their corresponding bin properties.

    Parameters
    ----------
    velocity_min : float, optional
        The minimum velocity value (first bin edge). The default value is 0 m/s.
    velocity_max : float, optional
        The maximum velocity value (last bin edge). The default value is 10 m/s.
    velocity_spacing : float, optional
        The spacing between velocity bin edges. The default value is 0.05 m/s.

    Returns
    -------
    xarray.DataArray
        A DataArray containing the center of each velocity bin, with coordinates
        for the bin width, lower bound, upper bound, and center.
    """
    # Half-step padding keeps ``velocity_max`` as the last edge despite
    # floating-point round-off in ``np.arange``.
    velocity_bounds = np.arange(velocity_min, velocity_max + velocity_spacing / 2, step=velocity_spacing)
    return define_velocity_datarray(velocity_bounds)
def filter_diameter_bins(ds, minimum_diameter=None, maximum_diameter=None):
    """
    Filter the dataset to include only diameter bins within specified bounds.

    A bin is kept when it overlaps the open interval
    ``(minimum_diameter, maximum_diameter)``.

    Parameters
    ----------
    ds : xarray.Dataset
        The dataset containing diameter bin data. It is not modified.
    minimum_diameter : float, optional
        The minimum diameter to be included, in millimeters.
        Defaults to the minimum value in `ds["diameter_bin_lower"]`.
    maximum_diameter : float, optional
        The maximum diameter to be included, in millimeters.
        Defaults to the maximum value in `ds["diameter_bin_upper"]`.

    Returns
    -------
    xarray.Dataset
        The filtered dataset containing only the specified diameter bins,
        with the bin bounds loaded into memory.

    Raises
    ------
    ValueError
        If no diameter bin overlaps the requested range.
    """
    # Work on a shallow copy so that loading the bounds into memory does not
    # replace variables in the dataset owned by the caller.
    ds = ds.copy(deep=False)
    ds["diameter_bin_lower"] = ds["diameter_bin_lower"].compute()
    ds["diameter_bin_upper"] = ds["diameter_bin_upper"].compute()

    # Initialize default arguments
    if minimum_diameter is None:
        minimum_diameter = ds["diameter_bin_lower"].min().item()
    if maximum_diameter is None:
        maximum_diameter = ds["diameter_bin_upper"].max().item()

    # Select bins which overlap the specified diameters
    valid_indices = np.logical_and(
        ds["diameter_bin_upper"] > minimum_diameter,
        ds["diameter_bin_lower"] < maximum_diameter,
    )
    ds = ds.isel({DIAMETER_DIMENSION: valid_indices})
    if ds.sizes[DIAMETER_DIMENSION] == 0:
        # Report both bounds: either one may be responsible for the empty selection.
        msg = f"Filtering using {minimum_diameter=} and {maximum_diameter=} removes all diameter bins."
        raise ValueError(msg)
    return ds
def filter_velocity_bins(ds, minimum_velocity=None, maximum_velocity=None):
    """
    Filter the dataset to include only velocity bins within specified bounds.

    A bin is kept when it overlaps the open interval
    ``(minimum_velocity, maximum_velocity)``.

    Parameters
    ----------
    ds : xarray.Dataset
        The dataset containing velocity bin data. It is not modified.
    minimum_velocity : float, optional
        The minimum velocity to include in the filter, in meters per second.
        Defaults to the minimum value in `ds["velocity_bin_lower"]`.
    maximum_velocity : float, optional
        The maximum velocity to include in the filter, in meters per second.
        Defaults to the maximum value in `ds["velocity_bin_upper"]`.

    Returns
    -------
    xarray.Dataset
        The filtered dataset containing only the specified velocity bins,
        with the bin bounds loaded into memory.

    Raises
    ------
    ValueError
        If no velocity bin overlaps the requested range.
    """
    # Work on a shallow copy so that loading the bounds into memory does not
    # replace variables in the dataset owned by the caller.
    ds = ds.copy(deep=False)
    ds["velocity_bin_lower"] = ds["velocity_bin_lower"].compute()
    ds["velocity_bin_upper"] = ds["velocity_bin_upper"].compute()

    # Initialize default arguments
    if minimum_velocity is None:
        minimum_velocity = ds["velocity_bin_lower"].min().item()
    if maximum_velocity is None:
        maximum_velocity = ds["velocity_bin_upper"].max().item()

    # Select bins which overlap the specified velocities
    valid_indices = np.logical_and(
        ds["velocity_bin_upper"] > minimum_velocity,
        ds["velocity_bin_lower"] < maximum_velocity,
    )
    ds = ds.isel({VELOCITY_DIMENSION: valid_indices})
    if ds.sizes[VELOCITY_DIMENSION] == 0:
        # Report both bounds: either one may be responsible for the empty selection.
        msg = f"Filtering using {minimum_velocity=} and {maximum_velocity=} removes all velocity bins."
        raise ValueError(msg)
    return ds
def get_diameter_bin_edges(ds):
    """Return the diameter bin edges: every lower bound followed by the last upper bound."""
    lower_bounds = ds["diameter_bin_lower"].to_numpy()
    closing_edge = ds["diameter_bin_upper"].to_numpy()[-1]
    return np.append(lower_bounds, closing_edge)
def get_velocity_bin_edges(ds):
    """Return the velocity bin edges: every lower bound followed by the last upper bound."""
    lower_bounds = ds["velocity_bin_lower"].to_numpy()
    closing_edge = ds["velocity_bin_upper"].to_numpy()[-1]
    return np.append(lower_bounds, closing_edge)
def convert_from_decibel(x):
    """Convert a value in decibels (dB) to linear units, i.e. ``10 ** (x / 10)``."""
    exponent = 0.1 * x
    return np.power(10.0, exponent)
def convert_to_decibel(x):
    """Convert a value in linear units to decibels (dB), i.e. ``10 * log10(x)``."""
    log_value = np.log10(x)
    return log_value * 10
def unstack_radar_variables(ds):
    """Unstack radar variables along the ``frequency`` dimension.

    For every variable listed in ``disdrodb.scattering.RADAR_VARIABLES`` that is
    present in ``ds``, the variables returned by ``unstack_datarray_dimension``
    (called with ``dim="frequency"``, ``prefix=""``, ``suffix="_"``) are added to
    the dataset and the stacked variable is dropped. The ``frequency`` dimension
    is dropped at the end if it is still present.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset possibly containing radar variables with a ``frequency``
        dimension. It is not modified.

    Returns
    -------
    xarray.Dataset
        Dataset with the radar variables unstacked.
    """
    from disdrodb.scattering import RADAR_VARIABLES

    # ``Dataset.update`` works in place: operate on a shallow copy so that the
    # unstacked variables are not added to the caller's dataset.
    ds = ds.copy(deep=False)
    for var in RADAR_VARIABLES:
        if var in ds:
            ds_unstack = unstack_datarray_dimension(ds[var], dim="frequency", prefix="", suffix="_")
            ds.update(ds_unstack)
            ds = ds.drop_vars(var)
    if "frequency" in ds.dims:
        ds = ds.drop_dims("frequency")
    return ds
def get_diameter_coords_dict_from_bin_edges(diameter_bin_edges):
    """Build the diameter coordinates (center, width, lower, upper) from bin edges.

    Parameters
    ----------
    diameter_bin_edges : array-like
        Diameter bin edges (N + 1 values for N bins).

    Returns
    -------
    dict
        Mapping from coordinate name to a ``(DIAMETER_DIMENSION, values)`` tuple,
        suitable for ``assign_coords``.

    Raises
    ------
    ValueError
        If fewer than two edges are provided.
    """
    if np.size(diameter_bin_edges) < 2:
        raise ValueError("Expecting at least 2 values defining bin edges.")
    lower_edges = diameter_bin_edges[:-1]
    upper_edges = diameter_bin_edges[1:]
    widths = np.diff(diameter_bin_edges)
    centers = lower_edges + widths / 2
    return {
        "diameter_bin_center": (DIAMETER_DIMENSION, centers),
        "diameter_bin_width": (DIAMETER_DIMENSION, widths),
        "diameter_bin_lower": (DIAMETER_DIMENSION, lower_edges),
        "diameter_bin_upper": (DIAMETER_DIMENSION, upper_edges),
    }
def resample_drop_number_concentration(
    drop_number_concentration,
    diameter_bin_edges,
    method="log_pchip",
):
    """Resample a drop number concentration N(D) DataArray onto new diameter bins.

    Parameters
    ----------
    drop_number_concentration : xarray.DataArray
        N(D) with a ``diameter_bin_center`` dimension and a ``diameter_bin_width``
        coordinate.
    diameter_bin_edges : array-like
        Edges of the destination diameter bins.
    method : str or callable, optional
        Remapping strategy forwarded to ``resample_density``.

    Returns
    -------
    xarray.DataArray
        Resampled N(D) along a ``diameter_bin_center`` dimension.
    """
    # A temporary dimension name avoids a clash with the source diameter dimension
    # while both grids coexist inside ``resample_density``.
    tmp_dim = "d_new"
    da_target = define_diameter_datarray(diameter_bin_edges, dim=tmp_dim)
    resampled = resample_density(
        da_density=drop_number_concentration,
        d_src=drop_number_concentration["diameter_bin_center"],
        d_dst=da_target,
        dim="diameter_bin_center",
        new_dim=tmp_dim,
        dD_src=drop_number_concentration["diameter_bin_width"],
        dD_dst=da_target["diameter_bin_width"],
        method=method,
    )
    return resampled.rename({tmp_dim: "diameter_bin_center"})
# def interpolate_drop_number_concentration(drop_number_concentration, diameter_bin_edges, method="linear"):
#     """Interpolate drop number concentration N(D) DataArray to high resolution diameter bins.
#     This should be done only for visualization purposes as it changes the distribution moments.
#     """
#     diameters_bin_center = diameter_bin_edges[:-1] + np.diff(diameter_bin_edges) / 2
#     da = drop_number_concentration.interp(coords={"diameter_bin_center": diameters_bin_center}, method=method)
#     coords_dict = get_diameter_coords_dict_from_bin_edges(diameter_bin_edges)
#     da = da.assign_coords(coords_dict)
#     return da


def _conservative_remapping(y_src, d_src, d_dst, dD_src, dD_dst):
    """First-order conservative remapping of a density between diameter bins.

    The source density is treated as constant within each source bin. Each
    destination bin receives the number obtained by integrating the source
    density over its overlap with every source bin, divided by its own width.

    Parameters
    ----------
    y_src : array-like
        Source density per unit diameter, one value per source bin.
    d_src, dD_src : array-like
        Source bin centers and widths.
    d_dst, dD_dst : array-like
        Destination bin centers and widths.

    Returns
    -------
    numpy.ndarray
        Destination density per unit diameter.

    Notes
    -----
    Source bins with a non-finite center, width or density, or a non-positive
    width, are ignored, so that a single invalid source bin does not turn every
    destination value into NaN.
    """
    y_src = np.asarray(y_src, dtype=float)
    d_src = np.asarray(d_src, dtype=float)
    d_dst = np.asarray(d_dst, dtype=float)
    dD_src = np.asarray(dD_src, dtype=float)
    dD_dst = np.asarray(dD_dst, dtype=float)

    valid = np.isfinite(y_src) & np.isfinite(d_src) & np.isfinite(dD_src) & (dD_src > 0)
    y_src = y_src[valid]
    d_src = d_src[valid]
    dD_src = dD_src[valid]

    # Source edges
    src_left = d_src - 0.5 * dD_src
    src_right = d_src + 0.5 * dD_src
    # Destination edges
    dst_left = d_dst - 0.5 * dD_dst
    dst_right = d_dst + 0.5 * dD_dst

    # Overlap length matrix (Ns, Nd); disjoint bins get 0.
    overlap = np.minimum(src_right[:, None], dst_right[None, :]) - np.maximum(src_left[:, None], dst_left[None, :])
    overlap = np.clip(overlap, 0.0, None)

    # Integrated number in destination bins (zero if no valid source bin remains).
    n_dst = (y_src[:, None] * overlap).sum(axis=0)
    # Convert back to density
    return n_dst / dD_dst


def _log_pchip_conservative_remapping(y_src, d_src, d_dst, dD_src, dD_dst):
    """Smooth remapping in log-space, scaled to conserve the global integrated number.

    The logarithm of the positive source densities is interpolated with PCHIP at
    the destination centers. Destination bins overlapping the support of the
    positive source bins but lying outside the interpolation range receive the
    nearest positive source value; bins outside that support are set to 0. The
    result is finally rescaled so that ``sum(y_dst * dD_dst)`` equals
    ``sum(y_src * dD_src)``.

    Parameters
    ----------
    y_src : array-like
        Source density per unit diameter, one value per source bin.
    d_src, dD_src : array-like
        Source bin centers and widths.
    d_dst, dD_dst : array-like
        Destination bin centers and widths.

    Returns
    -------
    numpy.ndarray
        Non-negative destination density per unit diameter. Falls back to
        ``_conservative_remapping`` when fewer than two distinct positive source
        bins are available, and returns zeros when there is nothing to remap.
    """
    y_src = np.asarray(y_src, dtype=float)
    d_src = np.asarray(d_src, dtype=float)
    d_dst = np.asarray(d_dst, dtype=float)
    dD_src = np.asarray(dD_src, dtype=float)
    dD_dst = np.asarray(dD_dst, dtype=float)

    # Missing or negative densities carry no drops.
    y_src = np.where(np.isfinite(y_src), y_src, 0.0)
    y_src = np.clip(y_src, 0.0, None)

    valid_src = np.isfinite(d_src) & np.isfinite(dD_src) & (dD_src > 0)
    if np.count_nonzero(valid_src) == 0:
        return np.zeros_like(d_dst, dtype=float)
    y_src = y_src[valid_src]
    d_src = d_src[valid_src]
    dD_src = dD_src[valid_src]

    # Stable sort keeps the input order among duplicated centers (deterministic output).
    order = np.argsort(d_src, kind="stable")
    y_src = y_src[order]
    d_src = d_src[order]
    dD_src = dD_src[order]

    n_src_total = np.sum(y_src * dD_src)
    if n_src_total <= 0:
        return np.zeros_like(d_dst, dtype=float)

    positive = y_src > 0
    if np.count_nonzero(positive) < 2:
        return _conservative_remapping(y_src, d_src, d_dst, dD_src, dD_dst)

    d_pos = d_src[positive]
    y_pos = y_src[positive]

    # PchipInterpolator requires strictly increasing x values: keep one value per
    # duplicated center.
    d_pos_unique, unique_idx = np.unique(d_pos, return_index=True)
    if d_pos_unique.size < 2:
        return _conservative_remapping(y_src, d_src, d_dst, dD_src, dD_dst)

    log_interp = PchipInterpolator(
        d_pos_unique,
        np.log(y_pos[unique_idx]),
        extrapolate=False,
    )
    # Outside the interpolation range PCHIP returns NaN, mapped to 0 here.
    y_dst = np.exp(log_interp(d_dst))
    y_dst = np.where(np.isfinite(y_dst), y_dst, 0.0)

    # Keep signal only where destination bins overlap support of positive source bins.
    src_left = d_src - 0.5 * dD_src
    src_right = d_src + 0.5 * dD_src
    support_left = np.min(src_left[positive])
    support_right = np.max(src_right[positive])

    dst_left = d_dst - 0.5 * dD_dst
    dst_right = d_dst + 0.5 * dD_dst
    overlaps_support = (dst_right > support_left) & (dst_left < support_right)

    # Fill boundary half-bins that overlap support but lie outside interpolation center range.
    first_center = d_pos[0]
    last_center = d_pos[-1]
    first_value = y_pos[0]
    last_value = y_pos[-1]
    left_boundary = overlaps_support & (d_dst < first_center)
    right_boundary = overlaps_support & (d_dst > last_center)
    y_dst[left_boundary] = first_value
    y_dst[right_boundary] = last_value
    y_dst[~overlaps_support] = 0.0

    # Rescale so that the total integrated number is conserved.
    n_dst_total = np.sum(y_dst * dD_dst)
    if n_dst_total > 0:
        y_dst *= n_src_total / n_dst_total

    return np.clip(y_dst, 0.0, None)
def resample_density(
    da_density,
    d_src,
    d_dst,
    dim,
    new_dim,
    dD_src,
    dD_dst,
    method="log_pchip",
):
    """Conservative resampling of density.

    Parameters
    ----------
    da_density : xr.DataArray
        Density defined per unit diameter.
    d_src : xr.DataArray
        Source diameter centers (can be 2D: time, D).
    d_dst : xr.DataArray
        Destination diameter centers (1D).
    dim : str
        Source diameter dimension.
    new_dim : str
        Destination dimension name.
    dD_src : xr.DataArray
        Source bin widths (same dim as dim).
    dD_dst : xr.DataArray
        Destination bin widths (same dim as new_dim).
    method : str or callable
        Remapping strategy used within ``xr.apply_ufunc``.
        If str, available methods are:

        - ``"constant"``: first-order conservative remapping (piecewise constant in source bins).
        - ``"log_pchip"``: conservative, smooth remapping using PCHIP in log-density space.

        When the destination grid has fewer bins than the source grid (coarsening),
        string methods other than ``"constant"`` are replaced by ``"constant"``
        and a ``UserWarning`` is emitted. A callable is always used as given.

        If callable, it must have signature
        ``f(y_src, d_src, d_dst, dD_src, dD_dst)`` and return remapped destination density.

    Returns
    -------
    xr.DataArray
        Remapped density conserving total number.

    Raises
    ------
    ValueError
        If ``method`` is neither callable nor one of the available method names.
    """
    import warnings

    methods = {
        "constant": _conservative_remapping,
        "log_pchip": _log_pchip_conservative_remapping,
    }
    # Validate first, so that an invalid name is reported even when coarsening.
    if not callable(method) and method not in methods:
        valid_methods = ", ".join(methods)
        msg = f"Unknown {method!r}. Valid options are: {valid_methods}."
        raise ValueError(msg)

    def _n_bins(values, name):
        # ``len`` would return the size of the first dimension, which is not the
        # diameter dimension for a 2D (time, D) source coordinate.
        if isinstance(values, xr.DataArray):
            return values.sizes[name]
        return np.shape(values)[-1]

    if isinstance(method, str) and method != "constant" and _n_bins(d_dst, new_dim) < _n_bins(d_src, dim):
        warnings.warn("Resampling method set to 'constant' for coarsening.", UserWarning, stacklevel=2)
        method = "constant"

    # Non-positive and missing densities carry no drops.
    da_density = da_density.where(da_density > 0, 0)

    resampling_func = method if callable(method) else methods[method]

    da_density_new = xr.apply_ufunc(
        resampling_func,
        da_density,
        d_src,
        d_dst,
        dD_src,
        dD_dst,
        input_core_dims=[[dim], [dim], [new_dim], [dim], [new_dim]],
        output_core_dims=[[new_dim]],
        vectorize=True,
        dask="parallelized",
        output_dtypes=[float],
    )

    # Assign coordinate
    name = da_density.name
    da_density_new = da_density_new.assign_coords({new_dim: d_dst})
    da_density_new.name = name if name is not None else "drop_number_concentration"
    return da_density_new
def remap_to_diameter(
    da,
    d_src,
    d_dst,
    dim,
    new_dim,
    method="linear",
):
    """Interpolate a DataArray from a source diameter coordinate onto a target one.

    Parameters
    ----------
    da : xr.DataArray
        Values along dimension `dim`, usually alongside another dimension (e.g. time).
    d_src : xr.DataArray
        Source diameter coordinate; may be 2D (e.g. D/Dm over (time, D)).
        It must share dimensions with ``da``.
    d_dst : xr.DataArray
        1D destination coordinate.
    dim : str
        Source diameter dimension.
    new_dim : str
        Destination diameter dimension.
    method : {"linear", "pchip"}
        Interpolation scheme. Points outside the source range are set to NaN.

    Returns
    -------
    xr.DataArray
        Values along ``new_dim``, with ``d_dst`` assigned as coordinate.

    Raises
    ------
    ValueError
        If ``method`` is not ``"linear"`` or ``"pchip"``.
    """
    if method not in {"linear", "pchip"}:
        msg = f"Unknown {method!r}. Valid options are: linear, pchip."
        raise ValueError(msg)

    def _clean_sorted_unique(xp, fp):
        # Drop non-finite pairs, sort by abscissa and collapse duplicated abscissae
        # (both np.interp and PCHIP need increasing x values).
        keep = np.isfinite(xp) & np.isfinite(fp)
        xp = np.asarray(xp[keep], dtype=float)
        fp = np.asarray(fp[keep], dtype=float)
        sort_idx = np.argsort(xp)
        xp_unique, first_idx = np.unique(xp[sort_idx], return_index=True)
        return xp_unique, fp[sort_idx][first_idx]

    def _linear(x_new, x_old, y_old):
        if not np.any(np.isfinite(x_old) & np.isfinite(y_old)):
            return np.full_like(x_new, np.nan, dtype=float)
        xp, fp = _clean_sorted_unique(x_old, y_old)
        return np.interp(x_new, xp, fp, left=np.nan, right=np.nan)

    def _pchip(x_new, x_old, y_old):
        xp, fp = _clean_sorted_unique(x_old, y_old)
        # PCHIP needs two distinct points; otherwise defer to linear interpolation.
        if xp.size < 2:
            return _linear(x_new, x_old, y_old)
        return PchipInterpolator(xp, fp, extrapolate=False)(x_new)

    interp_func = _linear if method == "linear" else _pchip
    remapped = xr.apply_ufunc(
        interp_func,
        d_dst,
        d_src,
        da,
        input_core_dims=[[new_dim], [dim], [dim]],
        output_core_dims=[[new_dim]],
        vectorize=True,
        dask="parallelized",
        output_dtypes=[float],
    )
    return remapped.assign_coords({new_dim: d_dst})
def compute_normalized_dsd_datarray(
    ds,
    Nc="Nw",
    Dc="Dm",
    d_min=0,
    d_max=6,
    d_step=0.001,
    method="linear",
):
    """Compute the normalized DSD N(D)/Nc and remap it to a regular D/Dc grid.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset with ``drop_number_concentration`` along ``diameter_bin_center``
        and the ``Nc`` and ``Dc`` scaling variables. It is not modified.
    Nc : str, optional
        Name of the concentration scaling variable. The default is ``"Nw"``.
    Dc : str, optional
        Name of the diameter scaling variable. The default is ``"Dm"``.
    d_min, d_max : float, optional
        First and last edge of the regular normalized diameter grid
        (``d_max`` is included). The defaults are 0 and 6.
    d_step : float, optional
        Spacing of the normalized diameter grid edges. The default is 0.001.
    method : {"linear", "pchip"}, optional
        Interpolation method forwarded to ``remap_to_diameter``.

    Returns
    -------
    xarray.DataArray
        ``N(D)/Nc`` along the ``D/Dc`` dimension, with a ``diameter_bin_width``
        coordinate giving the normalized bin widths.
    """
    # Normalized DSD and normalized diameter, kept local so that ``ds`` is untouched.
    # The diameter dimension is moved first; remaining dimensions keep their order.
    normalized_dsd = (ds["drop_number_concentration"] / ds[Nc]).transpose("diameter_bin_center", ...)
    normalized_diameter = (ds["diameter_bin_center"] / ds[Dc]).transpose("diameter_bin_center", ...)

    # Regular normalized diameter grid. The half-step padding makes ``d_max`` the last
    # edge regardless of floating-point round-off in ``np.arange``.
    bin_edges = np.arange(d_min, d_max + d_step / 2, d_step)
    da_normalized_diameter = define_diameter_datarray(bin_edges, dim="D/Dc")

    # Map N(D)/Nc value for each D/Dc to regular D/Dc array
    da_dsd_norm = remap_to_diameter(
        da=normalized_dsd,
        d_src=normalized_diameter,
        d_dst=da_normalized_diameter,
        dim="diameter_bin_center",
        new_dim="D/Dc",
        method=method,
    )
    da_dsd_norm = da_dsd_norm.assign_coords({"diameter_bin_width": da_normalized_diameter["diameter_bin_width"]})
    da_dsd_norm.name = "N(D)/Nc"
    return da_dsd_norm