# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Include functions helping for DISDRODB product manipulations."""
import numpy as np
import xarray as xr
from scipy.interpolate import PchipInterpolator
from disdrodb.constants import DIAMETER_DIMENSION, VELOCITY_DIMENSION
from disdrodb.utils.xarray import unstack_datarray_dimension
[docs]
def define_diameter_datarray(bounds, dim="diameter_bin_center"):
"""Define diameter DataArray."""
bounds = np.asarray(bounds, dtype=float)
diameters_bin_lower = bounds[:-1]
diameters_bin_upper = bounds[1:]
diameters_bin_width = diameters_bin_upper - diameters_bin_lower
diameters_bin_center = diameters_bin_lower + diameters_bin_width / 2
da = xr.DataArray(
diameters_bin_center,
dims=dim,
coords={
"diameter_bin_width": (dim, diameters_bin_width),
"diameter_bin_lower": (dim, diameters_bin_lower),
"diameter_bin_upper": (dim, diameters_bin_upper),
dim: (dim, diameters_bin_center),
},
)
return da
[docs]
def define_velocity_datarray(bounds, dim="velocity_bin_center"):
"""Define velocity DataArray."""
velocitys_bin_lower = bounds[:-1]
velocitys_bin_upper = bounds[1:]
velocitys_bin_width = velocitys_bin_upper - velocitys_bin_lower
velocitys_bin_center = velocitys_bin_lower + velocitys_bin_width / 2
da = xr.DataArray(
velocitys_bin_center,
dims=dim,
coords={
"velocity_bin_width": (dim, velocitys_bin_width),
"velocity_bin_lower": (dim, velocitys_bin_lower),
"velocity_bin_upper": (dim, velocitys_bin_upper),
dim: (dim, velocitys_bin_center),
},
)
return da
[docs]
def define_diameter_array(diameter_min=0, diameter_max=10, diameter_spacing=0.05):
"""
Define an array of diameters and their corresponding bin properties.
Parameters
----------
diameter_min : float, optional
The minimum diameter value. The default value is 0 mm.
diameter_max : float, optional
The maximum diameter value. The default value is 10 mm.
diameter_spacing : float, optional
The spacing between diameter values. The default value is 0.05 mm.
Returns
-------
xarray.DataArray
A DataArray containing the center of each diameter bin, with coordinates for
the bin width, lower bound, upper bound, and center.
"""
diameters_bounds = np.arange(diameter_min, diameter_max + diameter_spacing / 2, step=diameter_spacing)
return define_diameter_datarray(diameters_bounds)
[docs]
def define_velocity_array(velocity_min=0, velocity_max=10, velocity_spacing=0.05):
"""
Define an array of velocities and their corresponding bin properties.
Parameters
----------
velocity_min : float, optional
The minimum velocity value. The default value is 0 mm.
velocity_max : float, optional
The maximum velocity value. The default value is 10 mm.
velocity_spacing : float, optional
The spacing between velocity values. The default value is 0.05 mm.
Returns
-------
xarray.DataArray
A DataArray containing the center of each velocity bin, with coordinates for
the bin width, lower bound, upper bound, and center.
"""
velocitys_bounds = np.arange(velocity_min, velocity_max + velocity_spacing / 2, step=velocity_spacing)
return define_velocity_datarray(velocitys_bounds)
[docs]
def filter_diameter_bins(ds, minimum_diameter=None, maximum_diameter=None):
"""
Filter the dataset to include only diameter bins within specified bounds.
Parameters
----------
ds : xarray.Dataset
The dataset containing diameter bin data.
minimum_diameter : float, optional
The minimum diameter to be included, in millimeters.
Defaults to the minimum value in `ds["diameter_bin_lower"]`.
maximum_diameter : float, optional
The maximum diameter to be included, in millimeters.
Defaults to the maximum value in `ds["diameter_bin_upper"]`.
Returns
-------
xarray.Dataset
The filtered dataset containing only the specified diameter bins.
"""
# Put data into memory
ds["diameter_bin_lower"] = ds["diameter_bin_lower"].compute()
ds["diameter_bin_upper"] = ds["diameter_bin_upper"].compute()
# Initialize default arguments
if minimum_diameter is None:
minimum_diameter = ds["diameter_bin_lower"].min().item()
if maximum_diameter is None:
maximum_diameter = ds["diameter_bin_upper"].max().item()
# Select bins which overlap the specified diameters
valid_indices = np.logical_and(
ds["diameter_bin_upper"] > minimum_diameter,
ds["diameter_bin_lower"] < maximum_diameter,
)
ds = ds.isel({DIAMETER_DIMENSION: valid_indices})
if ds.sizes[DIAMETER_DIMENSION] == 0:
msg = f"Filtering using {minimum_diameter=} removes all diameter bins."
raise ValueError(msg)
return ds
[docs]
def filter_velocity_bins(ds, minimum_velocity=None, maximum_velocity=None):
"""
Filter the dataset to include only velocity bins within specified bounds.
Parameters
----------
ds : xarray.Dataset
The dataset containing velocity bin data.
minimum_velocity : float, optional
The minimum velocity to include in the filter, in meters per second.
Defaults to the minimum value in `ds["velocity_bin_lower"]`.
maximum_velocity : float, optional
The maximum velocity to include in the filter, in meters per second.
Defaults to the maximum value in `ds["velocity_bin_upper"]`.
Returns
-------
xarray.Dataset
The filtered dataset containing only the specified velocity bins.
"""
# Put data into memory
ds["velocity_bin_lower"] = ds["velocity_bin_lower"].compute()
ds["velocity_bin_upper"] = ds["velocity_bin_upper"].compute()
# Initialize default arguments
if minimum_velocity is None:
minimum_velocity = ds["velocity_bin_lower"].min().item()
if maximum_velocity is None:
maximum_velocity = ds["velocity_bin_upper"].max().item()
# Select bins which overlap the specified velocities
valid_indices = np.logical_and(
ds["velocity_bin_upper"] > minimum_velocity,
ds["velocity_bin_lower"] < maximum_velocity,
)
ds = ds.isel({VELOCITY_DIMENSION: valid_indices})
if ds.sizes[VELOCITY_DIMENSION] == 0:
msg = f"Filtering using {minimum_velocity=} removes all velocity bins."
raise ValueError(msg)
return ds
[docs]
def get_diameter_bin_edges(ds):
"""Retrieve diameter bin edges."""
bin_edges = np.append(ds["diameter_bin_lower"].to_numpy(), ds["diameter_bin_upper"].to_numpy()[-1])
return bin_edges
[docs]
def get_velocity_bin_edges(ds):
"""Retrieve velocity bin edges."""
bin_edges = np.append(ds["velocity_bin_lower"].to_numpy(), ds["velocity_bin_upper"].to_numpy()[-1])
return bin_edges
[docs]
def convert_from_decibel(x):
"""Convert dB to unit."""
return np.power(10.0, 0.1 * x) # x/10
[docs]
def convert_to_decibel(x):
"""Convert unit to dB."""
return 10 * np.log10(x)
[docs]
def unstack_radar_variables(ds):
"""Unstack radar variables."""
from disdrodb.scattering import RADAR_VARIABLES
for var in RADAR_VARIABLES:
if var in ds:
ds_unstack = unstack_datarray_dimension(ds[var], dim="frequency", prefix="", suffix="_")
ds.update(ds_unstack)
ds = ds.drop_vars(var)
if "frequency" in ds.dims:
ds = ds.drop_dims("frequency")
return ds
[docs]
def get_diameter_coords_dict_from_bin_edges(diameter_bin_edges):
"""Get dictionary with all relevant diameter coordinates."""
if np.size(diameter_bin_edges) < 2:
raise ValueError("Expecting at least 2 values defining bin edges.")
diameter_bin_center = diameter_bin_edges[:-1] + np.diff(diameter_bin_edges) / 2
diameter_bin_width = np.diff(diameter_bin_edges)
diameter_bin_lower = diameter_bin_edges[:-1]
diameter_bin_upper = diameter_bin_edges[1:]
coords_dict = {
"diameter_bin_center": (DIAMETER_DIMENSION, diameter_bin_center),
"diameter_bin_width": (DIAMETER_DIMENSION, diameter_bin_width),
"diameter_bin_lower": (DIAMETER_DIMENSION, diameter_bin_lower),
"diameter_bin_upper": (DIAMETER_DIMENSION, diameter_bin_upper),
}
return coords_dict
####---------------------------------------------------------------------------.
#### Resampling low-level functions
def _prepare_resampling_inputs(y_src, d_src, d_dst, dD_src, dD_dst):
"""Sanitize and align source and destination arrays for conservative remapping."""
# Ensure numpy array input
y_src = np.asarray(y_src, dtype=float)
d_src = np.asarray(d_src, dtype=float)
d_dst = np.asarray(d_dst, dtype=float)
dD_src = np.asarray(dD_src, dtype=float)
dD_dst = np.asarray(dD_dst, dtype=float)
# Ensure only positive values
y_src = np.where(np.isfinite(y_src), y_src, 0.0)
y_src = np.clip(y_src, 0.0, None)
# Identify bins with invalid values
valid_src = np.isfinite(d_src) & np.isfinite(dD_src) & (dD_src > 0)
valid_dst = np.isfinite(d_dst) & np.isfinite(dD_dst) & (dD_dst > 0)
# If not valid values, return None flag --> zero array returned
if np.count_nonzero(valid_src) == 0 or np.count_nonzero(valid_dst) == 0:
return None
# Keep only bin with valid values
y_src = y_src[valid_src]
d_src = d_src[valid_src]
dD_src = dD_src[valid_src]
# Sort by diameter
order = np.argsort(d_src)
y_src = y_src[order]
d_src = d_src[order]
dD_src = dD_src[order]
# Return dictionary with relevant info quality-checked
return {
"y_src": y_src,
"d_src": d_src,
"d_dst": d_dst,
"dD_src": dD_src,
"dD_dst": dD_dst,
"valid_dst": valid_dst,
"d_dst_valid": d_dst[valid_dst],
"dD_dst_valid": dD_dst[valid_dst],
}
def _assemble_resampled_output(y_dst_valid, d_dst, valid_dst):
"""Populate invalid destination bins with zero after remapping."""
y_dst = np.zeros_like(d_dst, dtype=float)
y_dst[valid_dst] = y_dst_valid
return np.clip(y_dst, 0.0, None)
def _conservative_counts_remapping(n_src, d_src, d_dst, dD_src, dD_dst):
"""First-order conservative remapping of bin-integrated counts between diameter grids.
Preserves total counts and redistributes them by geometric overlap.
"""
# Prepare input data
prepared = _prepare_resampling_inputs(n_src, d_src, d_dst, dD_src, dD_dst)
if prepared is None:
return np.zeros_like(d_dst, dtype=float)
n_src = prepared["y_src"]
d_src = prepared["d_src"]
dD_src = prepared["dD_src"]
d_dst_valid = prepared["d_dst_valid"]
dD_dst_valid = prepared["dD_dst_valid"]
# Source edges
src_left = d_src - 0.5 * dD_src
src_right = d_src + 0.5 * dD_src
# Destination edges
dst_left = d_dst_valid - 0.5 * dD_dst_valid
dst_right = d_dst_valid + 0.5 * dD_dst_valid
# Overlap matrix (Ns, N)
overlap = np.minimum(src_right[:, None], dst_right[None, :]) - np.maximum(src_left[:, None], dst_left[None, :])
overlap = np.clip(overlap, 0.0, None)
# Redistributes only fraction of the bin count that overlaps each destination bin
n_dst_valid = (n_src[:, None] * overlap / dD_src[:, None]).sum(axis=0)
# Return resampled counts
return _assemble_resampled_output(n_dst_valid, prepared["d_dst"], prepared["valid_dst"])
def _conservative_density_remapping(y_src, d_src, d_dst, dD_src, dD_dst):
"""First-order conservative remapping of bin densities between diameter grids.
Preserves total number concentration.
"""
# Prepare input data
prepared = _prepare_resampling_inputs(y_src, d_src, d_dst, dD_src, dD_dst)
if prepared is None:
return np.zeros_like(d_dst, dtype=float)
y_src = prepared["y_src"]
d_src = prepared["d_src"]
dD_src = prepared["dD_src"]
d_dst_valid = prepared["d_dst_valid"]
dD_dst_valid = prepared["dD_dst_valid"]
# Source edges
src_left = d_src - 0.5 * dD_src
src_right = d_src + 0.5 * dD_src
# Destination edges
dst_left = d_dst_valid - 0.5 * dD_dst_valid
dst_right = d_dst_valid + 0.5 * dD_dst_valid
# Overlap matrix (Ns, Nd)
overlap = np.minimum(src_right[:, None], dst_right[None, :]) - np.maximum(src_left[:, None], dst_left[None, :])
overlap = np.clip(overlap, 0.0, None)
# Integrate density over the overlap length to get destination bin counts
n_dst = (y_src[:, None] * overlap).sum(axis=0)
# Divide by destination bin width to go back to density
y_dst_valid = n_dst / dD_dst_valid
# Return resampled counts
return _assemble_resampled_output(y_dst_valid, prepared["d_dst"], prepared["valid_dst"])
def _log_pchip_conservative_density_remapping(y_src, d_src, d_dst, dD_src, dD_dst):
"""Smooth remapping in log-space, scaled to conserve global integrated number."""
prepared = _prepare_resampling_inputs(y_src, d_src, d_dst, dD_src, dD_dst)
if prepared is None:
return np.zeros_like(d_dst, dtype=float)
y_src = prepared["y_src"]
d_src = prepared["d_src"]
dD_src = prepared["dD_src"]
d_dst_valid = prepared["d_dst_valid"]
dD_dst_valid = prepared["dD_dst_valid"]
n_src_total = np.sum(y_src * dD_src)
if n_src_total <= 0:
return np.zeros_like(d_dst, dtype=float)
positive = y_src > 0
if np.count_nonzero(positive) < 2:
y_dst_valid = _conservative_density_remapping(y_src, d_src, d_dst_valid, dD_src, dD_dst_valid)
return _assemble_resampled_output(y_dst_valid, prepared["d_dst"], prepared["valid_dst"])
log_interp = PchipInterpolator(
d_src[positive],
np.log(y_src[positive]),
extrapolate=False,
)
y_dst_valid = np.exp(log_interp(d_dst_valid))
y_dst_valid = np.where(np.isfinite(y_dst_valid), y_dst_valid, 0.0)
# Keep signal only where destination bins overlap support of positive source bins.
src_left = d_src - 0.5 * dD_src
src_right = d_src + 0.5 * dD_src
support_left = np.min(src_left[positive])
support_right = np.max(src_right[positive])
dst_left = d_dst_valid - 0.5 * dD_dst_valid
dst_right = d_dst_valid + 0.5 * dD_dst_valid
overlaps_support = (dst_right > support_left) & (dst_left < support_right)
# Fill boundary half-bins that overlap support but lie outside interpolation center range.
first_center = d_src[positive][0]
last_center = d_src[positive][-1]
first_value = y_src[positive][0]
last_value = y_src[positive][-1]
left_boundary = overlaps_support & (d_dst_valid < first_center)
right_boundary = overlaps_support & (d_dst_valid > last_center)
y_dst_valid[left_boundary] = first_value
y_dst_valid[right_boundary] = last_value
y_dst_valid[~overlaps_support] = 0.0
n_dst_total = np.sum(y_dst_valid * dD_dst_valid)
if n_dst_total > 0:
y_dst_valid *= n_src_total / n_dst_total
return _assemble_resampled_output(y_dst_valid, prepared["d_dst"], prepared["valid_dst"])
def _log_pchip_conservative_counts_remapping(n_src, d_src, d_dst, dD_src, dD_dst):
"""Smooth conservative remapping of bin-integrated counts via count density."""
prepared = _prepare_resampling_inputs(n_src, d_src, d_dst, dD_src, dD_dst)
if prepared is None:
return np.zeros_like(d_dst, dtype=float)
# Smooth interpolation method should be applied to an intensive quantity (not extensive)
# --> Convert counts to density
y_src = prepared["y_src"] / prepared["dD_src"]
# Resample density
y_dst_valid = _log_pchip_conservative_density_remapping(
y_src,
prepared["d_src"],
prepared["d_dst_valid"],
prepared["dD_src"],
prepared["dD_dst_valid"],
)
# Convert back to counts
n_dst_valid = y_dst_valid * prepared["dD_dst_valid"]
return _assemble_resampled_output(n_dst_valid, prepared["d_dst"], prepared["valid_dst"])
[docs]
def resample_counts(
da_counts,
d_src,
d_dst,
dim,
new_dim,
dD_src,
dD_dst,
method="constant",
):
"""Conservatively resample bin-integrated counts between diameter grids.
Parameters
----------
da_counts : xr.DataArray
Bin-integrated counts defined on ``dim``.
d_src : xr.DataArray
Source diameter centers associated with ``da_counts``.
d_dst : xr.DataArray
Destination diameter centers associated with ``new_dim``.
dim : str
Source diameter dimension.
new_dim : str
Destination diameter dimension.
dD_src : xr.DataArray
Source bin widths defined on ``dim``.
dD_dst : xr.DataArray
Destination bin widths defined on ``new_dim``.
method : {"constant", "log_pchip"} or callable, optional
Remapping strategy used within ``xr.apply_ufunc``.
- ``"constant"``: first-order conservative remapping in count space.
Use this for coarsening and when strict robustness is preferred.
- ``"log_pchip"``: smooth conservative remapping obtained by applying
log-PCHIP to the implied count density ``counts / dD`` and then
converting back to counts. Use this mainly for refinement to finer bins.
If coarsening is detected (destination has fewer bins than source), the
method is forced to ``"constant"`` to avoid shape artifacts.
If callable, it must have signature
``f(n_src, d_src, d_dst, dD_src, dD_dst)`` and return remapped destination counts.
Returns
-------
xr.DataArray
Resampled counts on ``new_dim``.
Notes
-----
The remapping preserves total counts over the remapped support and assumes
counts are piecewise-uniform inside each source bin.
"""
if len(d_dst) < len(d_src) and method != "constant": # coarsening
print("Resampling method set to 'constant' for coarsening.")
method = "constant"
da_counts = da_counts.where(da_counts > 0, 0)
methods = {
"constant": _conservative_counts_remapping,
"log_pchip": _log_pchip_conservative_counts_remapping,
}
if callable(method):
resampling_func = method
else:
if method not in methods:
valid_methods = ", ".join(methods)
msg = f"Unknown {method!r}. Valid options are: {valid_methods}."
raise ValueError(msg)
resampling_func = methods[method]
da_counts_new = xr.apply_ufunc(
resampling_func,
da_counts,
d_src,
d_dst,
dD_src,
dD_dst,
input_core_dims=[[dim], [dim], [new_dim], [dim], [new_dim]],
output_core_dims=[[new_dim]],
vectorize=True,
dask="parallelized",
output_dtypes=[float],
)
name = da_counts.name
da_counts_new = da_counts_new.assign_coords({new_dim: d_dst})
da_counts_new.name = name if name is not None else "counts"
return da_counts_new
[docs]
def resample_density(
da_density,
d_src,
d_dst,
dim,
new_dim,
dD_src,
dD_dst,
method="log_pchip",
):
"""Conservatively resample a density between diameter grids.
The remapping preserves integrated quantity (``sum(density * bin_width)``)
over the remapped support.
Parameters
----------
da_density : xr.DataArray
Density defined per unit diameter.
d_src : xr.DataArray
Source diameter centers (can be 2D: time, D).
d_dst : xr.DataArray
Destination diameter centers (1D).
dim : str
Source diameter dimension.
new_dim : str
Destination dimension name.
dD_src : xr.DataArray
Source bin widths (same dim as dim).
dD_dst : xr.DataArray
Destination bin widths (same dim as new_dim).
method : str or callable
Remapping strategy used within ``xr.apply_ufunc``.
If str, available methods are:
- ``"constant"``: first-order conservative remapping (piecewise
constant inside each source bin). Use this for coarsening and when
strict robustness is preferred.
- ``"log_pchip"``: conservative smooth remapping using PCHIP in
log-density space. Use this mainly for refinement to finer bins.
If coarsening is detected (destination has fewer bins than source), the
method is forced to ``"constant"`` to avoid shape artifacts.
If callable, it must have signature
``f(y_src, d_src, d_dst, dD_src, dD_dst)`` and return remapped destination density.
Returns
-------
xr.DataArray
Remapped density conserving integrated amount.
Notes
-----
For ``drop_number`` counts already integrated per bin, use
:func:`resample_drop_counts` instead of this function.
"""
if len(d_dst) < len(d_src) and method != "constant": # coarsening
print("Resampling method set to 'constant' for coarsening.")
method = "constant"
da_density = da_density.where(da_density > 0, 0)
methods = {
"constant": _conservative_density_remapping,
"log_pchip": _log_pchip_conservative_density_remapping,
}
if callable(method):
resampling_func = method
else:
if method not in methods:
valid_methods = ", ".join(methods)
msg = f"Unknown {method!r}. Valid options are: {valid_methods}."
raise ValueError(msg)
resampling_func = methods[method]
da_density_new = xr.apply_ufunc(
resampling_func,
da_density,
d_src,
d_dst,
dD_src,
dD_dst,
input_core_dims=[[dim], [dim], [new_dim], [dim], [new_dim]],
output_core_dims=[[new_dim]],
vectorize=True,
dask="parallelized",
output_dtypes=[float],
)
# Assign coordinate
name = da_density.name
da_density_new = da_density_new.assign_coords({new_dim: d_dst})
da_density_new.name = name if name is not None else "drop_number_concentration"
return da_density_new
####---------------------------------------------------------------------------.
#### Resampling wrappers for n(D) and N(D)
[docs]
def resample_drop_counts(drop_counts, diameter_bin_edges, method="constant"):
"""Conservatively resample bin-integrated drop counts to new diameter bins.
Parameters
----------
drop_counts : xr.DataArray
Bin-integrated counts (for example ``drop_counts`` or ``drop_number``) defined on
``diameter_bin_center`` and carrying ``diameter_bin_width`` coordinates.
Additional dimensions (for example ``time`` or ``velocity_bin_center``)
are supported and vectorized.
diameter_bin_edges : array-like
Destination diameter bin edges. The destination bins do not need to be
aligned with source bins.
method : {"constant", "log_pchip"} or callable, optional
Remapping method passed to :func:`resample_counts`.
- ``"constant"``: first-order conservative remapping in count space.
Recommended for coarsening and robust for any grid change.
- ``"log_pchip"``: smooth conservative remapping based on the implied
count density. Recommended for refinement/interpolation to finer bins
when the source spectrum is sufficiently resolved.
If coarsening is requested, :func:`resample_counts` automatically
switches to ``"constant"``.
Returns
-------
xr.DataArray
Resampled counts on destination ``diameter_bin_center`` with updated
``diameter_bin_width``, ``diameter_bin_lower`` and
``diameter_bin_upper`` coordinates.
"""
da_dst_d_bin_centers = define_diameter_datarray(diameter_bin_edges, dim="d_new")
da_resampled = resample_counts(
da_counts=drop_counts,
d_src=drop_counts["diameter_bin_center"],
d_dst=da_dst_d_bin_centers,
dim="diameter_bin_center",
new_dim="d_new",
dD_src=drop_counts["diameter_bin_width"],
dD_dst=da_dst_d_bin_centers["diameter_bin_width"],
method=method,
)
da_resampled = da_resampled.rename({"d_new": "diameter_bin_center"})
da_resampled.name = drop_counts.name if drop_counts.name is not None else "drop_counts"
return da_resampled
[docs]
def resample_drop_number_concentration(
drop_number_concentration,
diameter_bin_edges,
method="log_pchip",
):
"""Resample drop number concentration ``N(D)`` to new diameter bins.
The remapping is conservative with respect to integrated number
concentration (that is, ``sum(N(D) * dD)``).
Parameters
----------
drop_number_concentration : xr.DataArray
Drop number concentration defined per unit diameter on
``diameter_bin_center`` and carrying ``diameter_bin_width``.
diameter_bin_edges : array-like
Destination diameter bin edges.
method : {"constant", "log_pchip"} or callable, optional
Remapping method passed to :func:`resample_density`.
- ``"constant"``: first-order conservative remapping. Recommended for
coarsening and robust for any grid change.
- ``"log_pchip"``: smooth conservative remapping in log-space.
Recommended for refinement/interpolation to finer bins when positive
source spectra are sufficiently resolved.
If coarsening is requested, :func:`resample_density` automatically
switches to ``"constant"``.
Returns
-------
xr.DataArray
Resampled ``N(D)`` on destination ``diameter_bin_center`` with updated
diameter-bin coordinates.
"""
da_dst_d_bin_centers = define_diameter_datarray(diameter_bin_edges, dim="d_new")
da_resampled = resample_density(
da_density=drop_number_concentration,
d_src=drop_number_concentration["diameter_bin_center"],
d_dst=da_dst_d_bin_centers,
dim="diameter_bin_center",
new_dim="d_new",
dD_src=drop_number_concentration["diameter_bin_width"],
dD_dst=da_dst_d_bin_centers["diameter_bin_width"],
method=method,
)
da_resampled = da_resampled.rename({"d_new": "diameter_bin_center"})
return da_resampled
####---------------------------------------------------------------------------.
#### Double Moment Normalization utilities
[docs]
def remap_to_diameter(
da,
d_src,
d_dst,
dim,
new_dim,
method="linear",
):
"""Remap DataArray from source to destination diameter coordinate.
Parameters
----------
da : xr.DataArray
DataArray with dimension `dim` and typically another dim (e.g., time).
d_src : xr.DataArray
Source diameter coordinate (can be 2D, e.g., D/Dm (time, D)).
Must share dimensions with da.
d_dst : xr.DataArray
1D target coordinate.
dim : str
Original diameter dimension.
new_dim : str
Name of output diameter dimension.
method : {"linear", "pchip"}
Interpolation method used for remapping.
Returns
-------
xr.DataArray
"""
if method not in {"linear", "pchip"}:
msg = f"Unknown {method!r}. Valid options are: linear, pchip."
raise ValueError(msg)
def _interp_1d_linear(x_new, x_old, y_old):
valid = np.isfinite(x_old) & np.isfinite(y_old)
if np.count_nonzero(valid) == 0:
return np.full_like(x_new, np.nan, dtype=float)
x_old = np.asarray(x_old[valid], dtype=float)
y_old = np.asarray(y_old[valid], dtype=float)
order = np.argsort(x_old)
x_old = x_old[order]
y_old = y_old[order]
# np.interp expects increasing xp. Duplicate xp values are collapsed.
x_old_unique, unique_idx = np.unique(x_old, return_index=True)
y_old_unique = y_old[unique_idx]
return np.interp(x_new, x_old_unique, y_old_unique, left=np.nan, right=np.nan)
def _interp_1d_pchip(x_new, x_old, y_old):
valid = np.isfinite(x_old) & np.isfinite(y_old)
if np.count_nonzero(valid) < 2:
return _interp_1d_linear(x_new, x_old, y_old)
x_old = np.asarray(x_old[valid], dtype=float)
y_old = np.asarray(y_old[valid], dtype=float)
order = np.argsort(x_old)
x_old = x_old[order]
y_old = y_old[order]
# PCHIP requires strictly increasing x values.
x_old_unique, unique_idx = np.unique(x_old, return_index=True)
y_old_unique = y_old[unique_idx]
if x_old_unique.size < 2:
return _interp_1d_linear(x_new, x_old_unique, y_old_unique)
pchip = PchipInterpolator(x_old_unique, y_old_unique, extrapolate=False)
return pchip(x_new)
interp_func = _interp_1d_linear if method == "linear" else _interp_1d_pchip
da_out = xr.apply_ufunc(
interp_func,
d_dst,
d_src,
da,
input_core_dims=[[new_dim], [dim], [dim]],
output_core_dims=[[new_dim]],
vectorize=True,
dask="parallelized",
output_dtypes=[float],
)
da_out = da_out.assign_coords({new_dim: d_dst})
return da_out
[docs]
def compute_normalized_dsd_datarray(
ds,
Nc="Nw",
Dc="Dm",
d_min=0,
d_max=6,
d_step=0.001,
method="linear",
):
"""Compute normalized DSD and remap to regular D/Dc dimension."""
# Compute Normalized DSD and normalized diameter
ds["N(D)/Nc"] = ds["drop_number_concentration"] / ds[Nc]
ds["D/Dc"] = ds["diameter_bin_center"] / ds[Dc]
ds["dD/Dc"] = ds["diameter_bin_width"] / ds[Dc]
ds["D/Dc"] = ds["D/Dc"].transpose("diameter_bin_center", "time")
ds["N(D)/Nc"] = ds["N(D)/Nc"].transpose("diameter_bin_center", "time")
# Define normalized diameter coordinate
da_normalized_diameter = define_diameter_datarray(np.arange(d_min, d_max, d_step), dim="D/Dc")
# Map N(D)/Nc value for each D/Dc to regular D/Dc array
da_dsd_norm = remap_to_diameter(
da=ds["N(D)/Nc"],
d_src=ds["D/Dc"],
d_dst=da_normalized_diameter,
dim="diameter_bin_center",
new_dim="D/Dc",
method=method,
)
da_dsd_norm = da_dsd_norm.assign_coords({"diameter_bin_width": da_normalized_diameter["diameter_bin_width"]})
da_dsd_norm.name = "N(D)/Nc"
return da_dsd_norm
# def interpolate_drop_number_concentration(drop_number_concentration, diameter_bin_edges, method="linear"):
# """Interpolate drop number concentration N(D) DataArray to high resolution diameter bins.
# This should be done only for visualization purposes as it change the distribution moments.
# """
# diameters_bin_center = diameter_bin_edges[:-1] + np.diff(diameter_bin_edges) / 2
# da = drop_number_concentration.interp(coords={"diameter_bin_center": diameters_bin_center}, method=method)
# coords_dict = get_diameter_coords_dict_from_bin_edges(diameter_bin_edges)
# da = da.assign_coords(coords_dict)
# return da