# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2023 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Routines for PSD fitting."""
import numpy as np
import scipy.stats as ss
import xarray as xr
from scipy.integrate import quad
from scipy.optimize import minimize
from scipy.special import gamma, gammainc, gammaln # Regularized lower incomplete gamma function
from disdrodb.constants import DIAMETER_DIMENSION
from disdrodb.l2.empirical_dsd import (
get_median_volume_drop_diameter,
get_moment,
get_normalized_intercept_parameter_from_moments,
get_total_number_concentration,
)
from disdrodb.psd.models import ExponentialPSD, GammaPSD, LognormalPSD, NormalizedGammaPSD
from disdrodb.utils.manipulations import get_diameter_bin_edges
from disdrodb.utils.warnings import suppress_warnings
# gamma(>171) return inf !
####--------------------------------------------------------------------------------------.
#### Notes
## Variable requirements for fitting PSD Models
# - drop_number_concentration and diameter coordinates
# - Always recompute other parameters to ensure not use model parameters of L2M
# ML: None
# MOM: moments
# --> get_moment(drop_number_concentration, diameter, diameter_bin_width, moment)
# GS: fall_velocity if target optimization is R (rain)
# - NormalizedGamma: "Nw", "D50"
# --> get_normalized_intercept_parameter_from_moments(moment_3, moment_4)
# --> get_median_volume_drop_diameter(drop_number_concentration, diameter, diameter_bin_width):
# --> get_mean_volume_drop_diameter(moment_3, moment_4) (Dm)
# - LogNormal,Exponential, Gamma: Nt
# --> get_total_number_concentration(drop_number_concentration, diameter_bin_width)
####--------------------------------------------------------------------------------------.
#### Goodness of fit (GOF)
[docs]
def compute_gof_stats(obs, pred, dim=DIAMETER_DIMENSION):
"""
Compute various goodness-of-fit (GoF) statistics between obs and predicted values.
Parameters
----------
obs: xarray.DataArray
Observations DataArray with at least dimension ``dim``.
pred: xarray.DataArray
Predictions DataArray with at least dimension ``dim``.
dim: str
DataArray dimension over which to compute GOF statistics.
The default is DIAMETER_DIMENSION.
Returns
-------
ds: xarray.Dataset
Dataset containing the computed GoF statistics.
"""
from disdrodb.l2.empirical_dsd import get_mode_diameter
# Retrieve diameter and diameter bin width
diameter = obs["diameter_bin_center"]
diameter_bin_width = obs["diameter_bin_width"]
# Compute errors
error = obs - pred
# Compute max obs and pred
obs_max = obs.max(dim=dim, skipna=False)
pred_max = pred.max(dim=dim, skipna=False)
# Compute NaN mask
mask_nan = np.logical_or(np.isnan(obs_max), np.isnan(pred_max))
# Compute GOF statistics
with suppress_warnings():
# Compute Pearson Correlation
pearson_r = xr.corr(obs, pred, dim=dim)
# Compute Mean Absolute Error (MAE)
mae = np.abs(error).mean(dim=dim, skipna=False)
# Compute maximum absolute error
max_error = np.abs(error).max(dim=dim, skipna=False)
relative_max_error = xr.where(max_error == 0, 0, xr.where(obs_max == 0, np.nan, max_error / obs_max))
# Compute deviation of N(D) at distribution mode
mode_deviation = obs_max - pred_max
mode_relative_deviation = xr.where(
mode_deviation == 0,
0,
xr.where(obs_max == 0, np.nan, mode_deviation / obs_max),
)
# Compute diameter difference of the distribution mode
diameter_mode_pred = get_mode_diameter(pred, diameter)
diameter_mode_obs = get_mode_diameter(obs, diameter)
diameter_mode_deviation = diameter_mode_obs - diameter_mode_pred
# Compute difference in total number concentration
total_number_concentration_obs = (obs * diameter_bin_width).sum(dim=dim, skipna=False)
total_number_concentration_pred = (pred * diameter_bin_width).sum(dim=dim, skipna=False)
total_number_concentration_difference = total_number_concentration_pred - total_number_concentration_obs
# Compute Kullback-Leibler divergence
# - Compute pdf per bin
pk_pdf = obs / total_number_concentration_obs
qk_pdf = pred / total_number_concentration_pred
# - Compute probabilities per bin
pk = pk_pdf * diameter_bin_width
pk = pk / pk.sum(dim=dim, skipna=False) # this might not be necessary
qk = qk_pdf * diameter_bin_width
qk = qk / qk.sum(dim=dim, skipna=False) # this might not be necessary
# - Compute log probability ratio
epsilon = 1e-10
pk = xr.where(pk == 0, epsilon, pk)
qk = xr.where(qk == 0, epsilon, qk)
log_prob_ratio = np.log(pk / qk)
log_prob_ratio = log_prob_ratio.where(np.isfinite(log_prob_ratio))
# - Compute divergence
kl_divergence = (pk * log_prob_ratio).sum(dim=dim, skipna=False)
kl_divergence = xr.where((error == 0).all(dim=dim), 0, kl_divergence)
# Create an xarray.Dataset to hold the computed statistics
ds = xr.Dataset(
{
"R2": pearson_r**2, # Squared Pearson correlation coefficient
"MAE": mae, # Mean Absolute Error
"MaxAE": max_error, # Maximum Absolute Error
"RelMaxAE": relative_max_error, # Relative Maximum Absolute Error
"PeakDiff": mode_deviation, # Difference at distribution peak
"RelPeakDiff": mode_relative_deviation, # Relative difference at peak
"DmodeDiff": diameter_mode_deviation, # Difference in mode diameters
"NtDiff": total_number_concentration_difference,
"KLDiv": kl_divergence, # Kullback-Leibler divergence
},
)
# Round
ds = ds.round(2)
# Mask where input obs or pred is NaN
ds = ds.where(~mask_nan)
return ds
####--------------------------------------------------------------------------------------.
#### Maximum Likelihood (ML)
[docs]
def get_expected_probabilities(params, cdf_func, pdf_func, bin_edges, probability_method, normalized=False):
"""
Compute the expected probabilities for each bin given the distribution parameters.
Parameters
----------
params : array-like
Parameters for the CDF or PDF function.
cdf_func : callable
Cumulative distribution function (CDF) that takes bin edges and parameters as inputs.
pdf_func : callable
Probability density function (PDF) that takes a value and parameters as inputs.
bin_edges : array-like
Edges of the bins for which to compute the probabilities.
probability_method : {'cdf', 'pdf'}
Method to compute the probabilities. If 'cdf', use the CDF to compute probabilities.
If 'pdf', integrate the PDF over each bin range.
normalized : bool, optional
If True, normalize the probabilities to sum to 1. Default is False.
Returns
-------
expected_probabilities : numpy.ndarray
Array of expected probabilities for each bin.
Notes
-----
- If the 'cdf' method is used, the probabilities are computed as the difference in CDF values at the bin edges.
- If the 'pdf' method is used, the probabilities are computed by integrating the PDF over each bin range.
- Any zero or negative probabilities are replaced with a very small positive number (1e-10) to ensure optimization.
- If `normalized` is True, the probabilities are normalized to sum to 1.
"""
if probability_method == "cdf":
# Compute the CDF at bin edges
cdf_vals = cdf_func(bin_edges, params)
# Compute probabilities for each bin
expected_probabilities = np.diff(cdf_vals)
# Replace any zero or negative probabilities with a very small positive number
# --> Otherwise do not optimize ...
expected_probabilities = np.maximum(expected_probabilities, 1e-10)
# Or integrate PDF over the bin range
else: # probability_method == "pdf":
# For each bin, integrate the PDF over the bin range
expected_probabilities = np.array(
[quad(lambda x: pdf_func(x, params), bin_edges[i], bin_edges[i + 1])[0] for i in range(len(bin_edges) - 1)],
)
if normalized:
# Normalize probabilities to sum to 1
total_probability = np.sum(expected_probabilities)
expected_probabilities /= total_probability
return expected_probabilities
[docs]
def get_adjusted_nt(cdf, params, Nt, bin_edges):
"""Adjust Nt for the proportion of missing drops. See Johnson's et al., 2013 Eqs. 3 and 4."""
# Estimate proportion of missing drops (Johnson's 2011 Eqs. 3)
# --> Alternative: p = 1 - np.sum(pdf(diameter, params)* diameter_bin_width) # [-]
p = 1 - np.diff(cdf([bin_edges[0], bin_edges[-1]], params)).item() # [-]
# Adjusts Nt for the proportion of drops not obs
# p = np.clip(p, 0, 1 - 1e-12)
if np.isclose(p, 1, atol=1e-12):
return np.nan
return Nt / (1 - p) # [m-3]
[docs]
def compute_negative_log_likelihood(
params,
bin_edges,
counts,
cdf_func,
pdf_func,
param_constraints=None,
probability_method="cdf",
likelihood="multinomial",
truncated_likelihood=True,
):
"""
General negative log-likelihood function for fitting distributions to binned data.
Parameters
----------
params : array-like
Parameters of the distribution.
bin_edges : array-like
Edges of the bins (length N+1).
counts : array-like
obs counts in each bin (length N).
cdf_func : callable
Cumulative distribution function of the distribution.
pdf_func : callable
Probability density function of the distribution.
param_constraints : callable, optional
Function that checks if parameters are valid.
probability_method : str, optional
Method to compute expected probabilities, either 'cdf' or 'pdf'. Default is 'cdf'.
likelihood : str, optional
Type of likelihood to compute, either 'multinomial' or 'poisson'. Default is 'multinomial'.
truncated_likelihood : bool, optional
Whether to normalize the expected probabilities. Default is True.
nll : float
Negative log-likelihood value.
Returns
-------
nll: float
The negative log-likelihood value.
"""
# Check if parameters are valid
if param_constraints is not None and not param_constraints(params):
return np.inf
# Compute (unormalized) expected probabilities using CDF
expected_probabilities = get_expected_probabilities(
params=params,
cdf_func=cdf_func,
pdf_func=pdf_func,
bin_edges=bin_edges,
probability_method=probability_method,
normalized=truncated_likelihood,
)
# Ensure expected probabilities are valid
if np.any(expected_probabilities <= 0):
return np.inf
# Compute negative log-likelihood
if likelihood == "poisson":
n_total = np.sum(counts)
expected_counts = expected_probabilities * n_total
expected_counts = np.maximum(expected_counts, 1e-10) # Avoid zero expected counts
nll = -np.sum(counts * np.log(expected_counts) - expected_counts)
else: # likelihood == "multinomial":
# Compute likelihood
nll = -np.sum(counts * np.log(expected_probabilities))
return nll
[docs]
def estimate_lognormal_parameters(
counts,
mu,
sigma,
bin_edges,
probability_method="cdf",
likelihood="multinomial",
truncated_likelihood=True,
output_dictionary=True,
optimizer="Nelder-Mead",
):
"""
Estimate the parameters of a lognormal distribution given histogram data.
Parameters
----------
counts : array-like
The counts for each bin in the histogram.
mu: float
The initial guess of the mean of the log of the distribution.
A good default value is 0.
sigma: float
The initial guess of the standard deviation of the log distribution.
A good default value is 1.
bin_edges : array-like
The edges of the bins.
probability_method : str, optional
The method to compute probabilities, either ``"cdf"`` or ``"pdf"``. The default value is ``"cdf"``.
likelihood : str, optional
The likelihood function to use, either ``"multinomial"`` or ``"poisson"``.
The default value is ``"multinomial"``.
truncated_likelihood : bool, optional
Whether to use truncated likelihood. The default value is ``True``.
output_dictionary : bool, optional
Whether to return the output as a dictionary.
If False, returns a numpy array. The default value is ``True``
optimizer : str, optional
The optimization method to use. Default is ``"Nelder-Mead"``.
Returns
-------
dict or numpy.ndarray
The estimated parameters of the lognormal distribution.
If ``output_dictionary`` is ``True``, returns a dictionary with keys ``Nt``, ``mu``, and ``sigma``.
If ``output_dictionary`` is ``False``,returns a numpy array with values [Nt, mu, sigma].
Notes
-----
The lognormal distribution is defined as:
N(D) = Nt / (sqrt(2 * pi) * sigma * D) * exp(-(ln(D) - mu)**2 / (2 * sigma**2))
where Nt is the total number of counts, mu is the mean of the log of the distribution,
and sigma is the standard deviation of the log of the distribution.
References
----------
.. [1] https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.lognorm.html#scipy.stats.lognorm
"""
# Definite initial guess for the parameters
scale = np.exp(mu) # mu = np.log(scale)
initial_params = [sigma, scale]
# Initialize bad results
null_output = (
{"Nt": np.nan, "mu": np.nan, "sigma": np.nan} if output_dictionary else np.array([np.nan, np.nan, np.nan])
)
# Define the CDF and PDF functions for the lognormal distribution
def lognorm_cdf(x, params):
sigma, scale = params
return ss.lognorm.cdf(x, sigma, loc=0, scale=scale)
def lognorm_pdf(x, params):
sigma, scale = params
return ss.lognorm.pdf(x, sigma, loc=0, scale=scale)
# Define valid parameters for the lognormal distribution
def param_constraints(params):
sigma, scale = params
return sigma > 0 and scale > 0
# Define bounds for sigma and scale
bounds = [(1e-6, None), (1e-6, None)]
# Minimize the negative log-likelihood
with suppress_warnings():
result = minimize(
compute_negative_log_likelihood,
initial_params,
args=(
bin_edges,
counts,
lognorm_cdf,
lognorm_pdf,
param_constraints,
probability_method,
likelihood,
truncated_likelihood,
),
bounds=bounds,
method=optimizer,
)
# Check if the fit had success
if not result.success:
return null_output
# Define Nt
Nt = np.sum(counts).item()
# Retrieve parameters
params = result.x
if truncated_likelihood:
Nt = get_adjusted_nt(cdf=lognorm_cdf, params=params, Nt=Nt, bin_edges=bin_edges)
sigma, scale = params
mu = np.log(scale)
# Define output
output = {"Nt": Nt, "mu": mu, "sigma": sigma} if output_dictionary else np.array([Nt, mu, sigma])
return output
[docs]
def estimate_exponential_parameters(
counts,
Lambda,
bin_edges,
probability_method="cdf",
likelihood="multinomial",
truncated_likelihood=True,
output_dictionary=True,
optimizer="Nelder-Mead",
):
"""
Estimate the parameters of an exponential distribution given histogram data.
Parameters
----------
counts : array-like
The counts for each bin in the histogram.
Lambda : float
The initial guess of the scale parameter.
scale = 1 / lambda correspond to the scale parameter of the scipy.stats.expon distribution.
A good default value is 1.
bin_edges : array-like
The edges of the bins.
probability_method : str, optional
The method to compute probabilities, either ``"cdf"`` or ``"pdf"``. The default value is ``"cdf"``.
likelihood : str, optional
The likelihood function to use, either ``"multinomial"`` or ``"poisson"``.
The default value is ``"multinomial"``.
truncated_likelihood : bool, optional
Whether to use truncated likelihood. The default value is ``True``.
output_dictionary : bool, optional
Whether to return the output as a dictionary.
If False, returns a numpy array. The default value is ``True``
optimizer : str, optional
The optimization method to use. Default is ``"Nelder-Mead"``.
Returns
-------
dict or numpy.ndarray
The estimated parameters of the exponential distribution.
If ``output_dictionary`` is ``True``, returns a dictionary with keys ``N0`` and ``Lambda``.
If `output_dictionary` is ``False``, returns a numpy array with [N0, Lambda].
Notes
-----
The exponential distribution is defined as:
N(D) = N0 * exp(-Lambda * D) = Nt * Lambda * exp(-Lambda * D)
where Lambda = 1 / scale and N0 = Nt * Lambda.
References
----------
.. [1] https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.expon.html
"""
# Definite initial guess for parameters
scale = 1 / Lambda
initial_params = [scale]
# Initialize bad results
null_output = {"N0": np.nan, "Lambda": np.nan} if output_dictionary else np.array([np.nan, np.nan])
# Define the CDF and PDF functions for the exponential distribution
def exp_cdf(x, params):
scale = params[0]
return ss.expon.cdf(x, loc=0, scale=scale)
def exp_pdf(x, params):
scale = params[0]
return ss.expon.pdf(x, loc=0, scale=scale)
# Define valid parameters for the exponential distribution
def param_constraints(params):
scale = params[0]
return scale > 0
# Define bounds for scale
bounds = [(1e-6, None)]
# Minimize the negative log-likelihood
with suppress_warnings():
result = minimize(
compute_negative_log_likelihood,
initial_params,
args=(
bin_edges,
counts,
exp_cdf,
exp_pdf,
param_constraints,
probability_method,
likelihood,
truncated_likelihood,
),
bounds=bounds,
method=optimizer,
)
# Check if the fit had success
if not result.success:
return null_output
# Define Nt
Nt = np.sum(counts).item()
# Retrieve parameters
params = result.x
if truncated_likelihood:
Nt = get_adjusted_nt(cdf=exp_cdf, params=params, Nt=Nt, bin_edges=bin_edges)
scale = params[0]
Lambda = 1 / scale
N0 = Nt * Lambda
# Define output
output = {"N0": N0, "Lambda": Lambda} if output_dictionary else np.array([N0, Lambda])
return output
[docs]
def estimate_gamma_parameters(
counts,
mu,
Lambda,
bin_edges,
probability_method="cdf",
likelihood="multinomial",
truncated_likelihood=True,
output_dictionary=True,
optimizer="Nelder-Mead",
):
"""
Estimate the parameters of a gamma distribution given histogram data.
Parameters
----------
counts : array-like
The counts for each bin in the histogram.
mu: float
The initial guess of the shape parameter.
a = mu + 1 correspond to the shape parameter of the scipy.stats.gamma distribution.
A good default value is 0.
lambda: float
The initial guess of the scale parameter.
scale = 1 / lambda correspond to the scale parameter of the scipy.stats.gamma distribution.
A good default value is 1.
bin_edges : array-like
The edges of the bins.
probability_method : str, optional
The method to compute probabilities, either ``"cdf"`` or ``"pdf"``. The default value is ``"cdf"``.
likelihood : str, optional
The likelihood function to use, either ``"multinomial"`` or ``"poisson"``.
The default value is ``"multinomial"``.
truncated_likelihood : bool, optional
Whether to use truncated likelihood. The default value is ``True``.
output_dictionary : bool, optional
Whether to return the output as a dictionary.
If False, returns a numpy array. The default value is ``True``
optimizer : str, optional
The optimization method to use. Default is ``"Nelder-Mead"``.
Returns
-------
dict or numpy.ndarray
The estimated parameters of the gamma distribution.
If ``output_dictionary`` is ``True``, returns a dictionary with keys ``N0``, ``mu`` and ``Lambda``.
If `output_dictionary` is ``False``, returns a numpy array with [N0, mu, Lambda].
Notes
-----
The gamma distribution is defined as:
N(D) = N0 * D**mu * exp(-Lambda*D)
where Lambda = 1/scale, and mu = a - 1 with ``a`` being the shape parameter of the gamma distribution.
N0 is defined as N0 = Nt*Lambda**(mu+1)/gamma(mu+1).
References
----------
.. [1] https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.gamma.html
"""
# Define initial guess for parameters
a = mu + 1 # (mu = a-1, a = mu+1)
scale = 1 / Lambda
initial_params = [a, scale]
# Initialize bad results
null_output = (
{"N0": np.nan, "mu": np.nan, "lambda": np.nan} if output_dictionary else np.array([np.nan, np.nan, np.nan])
)
# Define the CDF and PDF functions for the gamma distribution
def gamma_cdf(x, params):
a, scale = params
return ss.gamma.cdf(x, a, loc=0, scale=scale)
def gamma_pdf(x, params):
a, scale = params
return ss.gamma.pdf(x, a, loc=0, scale=scale)
# Define valid parameters for the gamma distribution
# mu = -0.99 is a vertical line essentially ...
def param_constraints(params):
a, scale = params
return a > 0.1 and scale > 0 # using a > 0 cause some troubles
# Define bounds for a and scale
bounds = [(1e-6, None), (1e-6, None)]
# Minimize the negative log-likelihood
with suppress_warnings():
result = minimize(
compute_negative_log_likelihood,
initial_params,
args=(
bin_edges,
counts,
gamma_cdf,
gamma_pdf,
param_constraints,
probability_method,
likelihood,
truncated_likelihood,
),
method=optimizer,
bounds=bounds,
)
# Check if the fit had success
if not result.success:
return null_output
# Define Nt
Nt = np.sum(counts).item()
# Retrieve parameters
params = result.x
if truncated_likelihood:
Nt = get_adjusted_nt(cdf=gamma_cdf, params=params, Nt=Nt, bin_edges=bin_edges)
a, scale = params
mu = a - 1
Lambda = 1 / scale
# Compute N0
# - Use logarithmic computations to prevent overflow
# - N0 = Nt * Lambda ** (mu + 1) / gamma(mu + 1)
with suppress_warnings():
log_N0 = np.log(Nt) + (mu + 1) * np.log(Lambda) - gammaln(mu + 1)
N0 = np.exp(log_N0)
# Set parameters to np.nan if any of the parameters is not a finite number
if not np.isfinite(N0) or not np.isfinite(mu) or not np.isfinite(Lambda):
return null_output
# Define output
output = {"N0": N0, "mu": mu, "Lambda": Lambda} if output_dictionary else np.array([N0, mu, Lambda])
return output
def _get_initial_lognormal_parameters(ds, mom_method=None):
default_mu = 0 # mu = np.log(scale)
default_sigma = 1
if mom_method is None or mom_method == "None":
ds_init = xr.Dataset(
{
"mu": default_mu,
"sigma": default_sigma,
},
)
else:
ds_init = get_mom_parameters(
ds=ds,
psd_model="LognormalPSD",
mom_methods=mom_method,
)
# If initialization results in some not finite number, set default value
ds_init["mu"] = xr.where(
np.logical_and(np.isfinite(ds_init["mu"]), ds_init["mu"] > 0),
ds_init["mu"],
default_mu,
)
ds_init["sigma"] = xr.where(np.isfinite(ds_init["sigma"]), ds_init["sigma"], default_sigma)
return ds_init
def _get_initial_exponential_parameters(ds, mom_method=None):
default_lambda = 1 # lambda = 1 /scale
if mom_method is None or mom_method == "None":
ds_init = xr.Dataset(
{
"Lambda": default_lambda,
},
)
else:
ds_init = get_mom_parameters(
ds=ds,
psd_model="ExponentialPSD",
mom_methods=mom_method,
)
# If initialization results in some not finite number, set default value
ds_init["Lambda"] = xr.where(np.isfinite(ds_init["Lambda"]), ds_init["Lambda"], default_lambda)
return ds_init
def _get_initial_gamma_parameters(ds, mom_method=None):
default_mu = 0 # a = mu + 1 | mu = a - 1
default_lambda = 1 # scale = 1 / Lambda
if mom_method is None or mom_method == "None":
ds_init = xr.Dataset(
{
"mu": default_mu,
"Lambda": default_lambda,
},
)
else:
ds_init = get_mom_parameters(
ds=ds,
psd_model="GammaPSD",
mom_methods=mom_method,
)
# If initialization results in some not finite number, set default value
ds_init["mu"] = xr.where(
np.logical_and(np.isfinite(ds_init["mu"]), ds_init["mu"] > -1),
ds_init["mu"],
default_mu,
)
ds_init["Lambda"] = xr.where(np.isfinite(ds_init["Lambda"]), ds_init["Lambda"], default_lambda)
return ds_init
[docs]
def get_gamma_parameters(
ds,
init_method=None,
probability_method="cdf",
likelihood="multinomial",
truncated_likelihood=True,
optimizer="Nelder-Mead",
):
"""
Estimate gamma distribution parameters for drop size distribution (DSD) data.
Parameters
----------
ds : xarray.Dataset
Input dataset containing drop size distribution data. It must include the following variables:
- ``drop_number_concentration``: The number concentration of drops.
- ``diameter_bin_width``": The width of each diameter bin.
- ``diameter_bin_lower``: The lower bounds of the diameter bins.
- ``diameter_bin_upper``: The upper bounds of the diameter bins.
- ``diameter_bin_center``: The center values of the diameter bins.
- The moments M0...M6 variables required to compute the initial parameters
with the specified mom_method.
init_method: str or list
The method(s) of moments used to initialize the gamma parameters.
If None (or 'None'), the scale parameter is set to 1 and mu to 0 (a=1).
probability_method : str, optional
Method to compute probabilities. The default value is ``cdf``.
likelihood : str, optional
Likelihood function to use for fitting. The default value is ``multinomial``.
truncated_likelihood : bool, optional
Whether to use truncated likelihood. The default value is ``True``.
optimizer : str, optional
Optimization method to use. The default value is ``Nelder-Mead``.
Returns
-------
xarray.Dataset
Dataset containing the estimated gamma distribution parameters:
- ``N0``: Intercept parameter.
- ``mu``: Shape parameter.
- ``Lambda``: Scale parameter.
The dataset will also have an attribute ``disdrodb_psd_model`` set to ``GammaPSD``.
Notes
-----
The function uses `xr.apply_ufunc` to fit the lognormal distribution parameters
in parallel, leveraging Dask for parallel computation.
"""
# Define inputs
counts = ds["drop_number_concentration"] * ds["diameter_bin_width"]
diameter_breaks = get_diameter_bin_edges(ds)
# Define initial parameters (mu, Lambda)
ds_init = _get_initial_gamma_parameters(ds, mom_method=init_method)
# Define kwargs
kwargs = {
"output_dictionary": False,
"bin_edges": diameter_breaks,
"probability_method": probability_method,
"likelihood": likelihood,
"truncated_likelihood": truncated_likelihood,
"optimizer": optimizer,
}
# Fit distribution in parallel
da_params = xr.apply_ufunc(
estimate_gamma_parameters,
counts,
ds_init["mu"],
ds_init["Lambda"],
kwargs=kwargs,
input_core_dims=[[DIAMETER_DIMENSION], [], []],
output_core_dims=[["parameters"]],
vectorize=True,
dask="parallelized",
dask_gufunc_kwargs={"output_sizes": {"parameters": 3}}, # lengths of the new output_core_dims dimensions.
output_dtypes=["float64"],
)
# Add parameters coordinates
da_params = da_params.assign_coords({"parameters": ["N0", "mu", "Lambda"]})
# Create parameters dataset
ds_params = da_params.to_dataset(dim="parameters")
# Add DSD model name to the attribute
ds_params.attrs["disdrodb_psd_model"] = "GammaPSD"
return ds_params
[docs]
def get_lognormal_parameters(
ds,
init_method=None,
probability_method="cdf",
likelihood="multinomial",
truncated_likelihood=True,
optimizer="Nelder-Mead",
):
"""
Estimate lognormal distribution parameters for drop size distribution (DSD) data.
Parameters
----------
ds : xarray.Dataset
Input dataset containing drop size distribution data. It must include the following variables:
- ``drop_number_concentration``: The number concentration of drops.
- ``diameter_bin_width``": The width of each diameter bin.
- ``diameter_bin_lower``: The lower bounds of the diameter bins.
- ``diameter_bin_upper``: The upper bounds of the diameter bins.
- ``diameter_bin_center``: The center values of the diameter bins.
probability_method : str, optional
Method to compute probabilities. The default value is ``cdf``.
likelihood : str, optional
Likelihood function to use for fitting. The default value is ``multinomial``.
truncated_likelihood : bool, optional
Whether to use truncated likelihood. The default value is ``True``.
optimizer : str, optional
Optimization method to use. The default value is ``Nelder-Mead``.
Returns
-------
xarray.Dataset
Dataset containing the estimated lognormal distribution parameters:
- ``Nt``: Total number concentration.
- ``mu``: Mean of the lognormal distribution.
- ``sigma``: Standard deviation of the lognormal distribution.
The resulting dataset will have an attribute ``disdrodb_psd_model`` set to ``LognormalPSD``.
Notes
-----
The function uses `xr.apply_ufunc` to fit the lognormal distribution parameters
in parallel, leveraging Dask for parallel computation.
"""
# Define inputs
counts = ds["drop_number_concentration"] * ds["diameter_bin_width"]
diameter_breaks = get_diameter_bin_edges(ds)
# Define initial parameters (mu, sigma)
ds_init = _get_initial_lognormal_parameters(ds, mom_method=init_method)
# Define kwargs
kwargs = {
"output_dictionary": False,
"bin_edges": diameter_breaks,
"probability_method": probability_method,
"likelihood": likelihood,
"truncated_likelihood": truncated_likelihood,
"optimizer": optimizer,
}
# Fit distribution in parallel
da_params = xr.apply_ufunc(
estimate_lognormal_parameters,
counts,
ds_init["mu"],
ds_init["sigma"],
kwargs=kwargs,
input_core_dims=[[DIAMETER_DIMENSION], [], []],
output_core_dims=[["parameters"]],
vectorize=True,
dask="parallelized",
dask_gufunc_kwargs={"output_sizes": {"parameters": 3}}, # lengths of the new output_core_dims dimensions.
output_dtypes=["float64"],
)
# Add parameters coordinates
da_params = da_params.assign_coords({"parameters": ["Nt", "mu", "sigma"]})
# Create parameters dataset
ds_params = da_params.to_dataset(dim="parameters")
# Add DSD model name to the attribute
ds_params.attrs["disdrodb_psd_model"] = "LognormalPSD"
return ds_params
[docs]
def get_exponential_parameters(
ds,
init_method=None,
probability_method="cdf",
likelihood="multinomial",
truncated_likelihood=True,
optimizer="Nelder-Mead",
):
"""
Estimate the parameters of an exponential particle size distribution (PSD) from the given dataset.
Fitting this model is equivalent to fitting a GammaPSD model fixing ``mu`` to 0.
Parameters
----------
ds : xarray.Dataset
Input dataset containing drop number concentration data and diameter information.
It must include the following variables:
- ``drop_number_concentration``: The number concentration of drops.
- ``diameter_bin_width``": The width of each diameter bin.
- ``diameter_bin_lower``: The lower bounds of the diameter bins.
- ``diameter_bin_upper``: The upper bounds of the diameter bins.
- ``diameter_bin_center``: The center values of the diameter bins.
probability_method : str, optional
Method to compute probabilities. The default value is ``cdf``.
likelihood : str, optional
Likelihood function to use for fitting. The default value is ``multinomial``.
truncated_likelihood : bool, optional
Whether to use truncated likelihood. The default value is ``True``.
optimizer : str, optional
Optimization method to use. The default value is ``Nelder-Mead``.
Returns
-------
xarray.Dataset
Dataset containing the estimated expontial distribution parameters:
- ``N0``: Intercept parameter.
- ``Lambda``: Scale parameter.
The resulting dataset will have an attribute ``disdrodb_psd_model`` set to ``ExponentialPSD``.
Notes
-----
The function uses `xr.apply_ufunc` to fit the exponential distribution parameters
in parallel, leveraging Dask for parallel computation.
"""
# Define inputs
counts = ds["drop_number_concentration"] * ds["diameter_bin_width"]
diameter_breaks = get_diameter_bin_edges(ds)
# Define initial parameters (Lambda)
ds_init = _get_initial_exponential_parameters(ds, mom_method=init_method)
# Define kwargs
kwargs = {
"output_dictionary": False,
"bin_edges": diameter_breaks,
"probability_method": probability_method,
"likelihood": likelihood,
"truncated_likelihood": truncated_likelihood,
"optimizer": optimizer,
}
# Fit distribution in parallel
da_params = xr.apply_ufunc(
estimate_exponential_parameters,
counts,
ds_init["Lambda"],
kwargs=kwargs,
input_core_dims=[[DIAMETER_DIMENSION], []],
output_core_dims=[["parameters"]],
vectorize=True,
dask="parallelized",
dask_gufunc_kwargs={"output_sizes": {"parameters": 2}}, # lengths of the new output_core_dims dimensions.
output_dtypes=["float64"],
)
# Add parameters coordinates
da_params = da_params.assign_coords({"parameters": ["N0", "Lambda"]})
# Create parameters dataset
ds_params = da_params.to_dataset(dim="parameters")
# Add DSD model name to the attribute
ds_params.attrs["disdrodb_psd_model"] = "ExponentialPSD"
return ds_params
####-------------------------------------------------------------------------------------------------------------------.
def _estimate_gamma_parameters_johnson(
drop_number_concentration,
diameter,
diameter_breaks,
output_dictionary=True,
method="Nelder-Mead",
mu=0.5,
Lambda=3,
**kwargs,
):
"""Deprecated Maximum likelihood estimation of Gamma model.
N(D) = N_t * lambda**(mu+1) / gamma(mu+1) D**mu exp(-lambda*D)
Args:
spectra: The DSD for which to find parameters [mm-1 m-3].
widths: Class widths for each DSD bin [mm].
diams: Class-centre diameters for each DSD bin [mm].
mu: Initial value for shape parameter mu [-].
lambda_param: Initial value for slope parameter lambda [mm^-1].
kwargs: Extra arguments for the optimization process.
Returns
-------
Dictionary with estimated mu, lambda, and N0.
mu (shape) N0 (scale) lambda(slope)
Notes
-----
The last bin counts are not accounted in the fitting procedure !
References
----------
Johnson, R. W., D. V. Kliche, and P. L. Smith, 2011: Comparison of Estimators for Parameters of Gamma Distributions
with Left-Truncated Samples. J. Appl. Meteor. Climatol., 50, 296-310, https://doi.org/10.1175/2010JAMC2478.1
Johnson, R.W., Kliche, D., & Smith, P.L. (2010).
Maximum likelihood estimation of gamma parameters for coarsely binned and truncated raindrop size data.
Quarterly Journal of the Royal Meteorological Society, 140. DOI:10.1002/qj.2209
"""
# Initialize bad results
if output_dictionary:
null_output = {"mu": np.nan, "lambda": np.nan, "N0": np.nan}
else:
null_output = np.array([np.nan, np.nan, np.nan])
# Initialize parameters
# --> Ideally with method of moments estimate
# --> See equation 8 of Johnson's 2013
x0 = [mu, Lambda]
# Compute diameter_bin_width
diameter_bin_width = np.diff(diameter_breaks)
# Convert drop_number_concentration from mm-1 m-3 to m-3.
spectra = np.asarray(drop_number_concentration) * diameter_bin_width
# Define cost function
# - Parameter to be optimized on first positions
def _cost_function(parameters, spectra, diameter_breaks):
# Assume spectra to be in unit [m-3] (drop_number_concentration*diameter_bin_width) !
mu, Lambda = parameters
# Precompute gamma integrals between various diameter bins
# - gamminc(mu+1) already divides the integral by gamma(mu+1) !
pgamma_d = gammainc(mu + 1, Lambda * diameter_breaks)
# Compute probability with interval
delta_pgamma_bins = pgamma_d[1:] - pgamma_d[:-1]
# Compute normalization over interval
denominator = pgamma_d[-1] - pgamma_d[0]
# Compute cost function
# a = mu - 1, x = lambda
if mu > -1 and Lambda > 0:
cost = np.sum(-spectra * np.log(delta_pgamma_bins / denominator))
return cost
return np.inf
# Minimize the cost function
with suppress_warnings():
bounds = [(0, None), (0, None)] # Force mu and lambda to be non-negative
res = minimize(
_cost_function,
x0=x0,
args=(spectra, diameter_breaks),
method=method,
bounds=bounds,
**kwargs,
)
# Check if the fit had success
if not res.success:
return null_output
# Extract parameters
mu = res.x[0] # [-]
Lambda = res.x[1] # [mm-1]
# Estimate tilde_N_T using the total drop concentration
tilde_N_T = np.sum(drop_number_concentration * diameter_bin_width) # [m-3]
# Estimate proportion of missing drops (Johnson's 2011 Eqs. 3)
with suppress_warnings():
D = diameter
p = 1 - np.sum((Lambda ** (mu + 1)) / gamma(mu + 1) * D**mu * np.exp(-Lambda * D) * diameter_bin_width) # [-]
# Convert tilde_N_T to N_T using Johnson's 2013 Eqs. 3 and 4.
# - Adjusts for the proportion of drops not obs
N_T = tilde_N_T / (1 - p) # [m-3]
# Compute N0
N0 = N_T * (Lambda ** (mu + 1)) / gamma(mu + 1) # [m-3 * mm^(-mu-1)]
# Compute Dm
# Dm = (mu + 4)/ Lambda
# Compute Nw
# Nw = N0* D^mu / f(mu) , with f(mu of the Normalized PSD)
# Define output
output = {"mu": mu, "Lambda": Lambda, "N0": N0} if output_dictionary else np.array([mu, Lambda, N0])
return output
[docs]
def get_gamma_parameters_johnson2014(ds, method="Nelder-Mead"):
"""Deprecated model. See Gamma Model with truncated_likelihood and 'pdf'."""
drop_number_concentration = ds["drop_number_concentration"]
diameter = ds["diameter_bin_center"]
diameter_breaks = get_diameter_bin_edges(ds)
# Define kwargs
kwargs = {
"output_dictionary": False,
"diameter_breaks": diameter_breaks,
"method": method,
}
da_params = xr.apply_ufunc(
_estimate_gamma_parameters_johnson,
drop_number_concentration,
diameter,
# diameter_bin_width,
kwargs=kwargs,
input_core_dims=[[DIAMETER_DIMENSION], [DIAMETER_DIMENSION]], # [DIAMETER_DIMENSION],
output_core_dims=[["parameters"]],
vectorize=True,
)
# Add parameters coordinates
da_params = da_params.assign_coords({"parameters": ["mu", "Lambda", "N0"]})
# Convert to skill Dataset
ds_params = da_params.to_dataset(dim="parameters")
return ds_params
####-----------------------------------------------------------------------------------------.
#### Grid Search (GS)
def _compute_rain_rate(ND, D, dD, V):
axis = 1 if ND.ndim == 2 else None
rain_rate = np.pi / 6 * np.sum(ND * V * (D / 1000) ** 3 * dD, axis=axis) * 3600 * 1000
return rain_rate # mm/h
def _compute_lwc(ND, D, dD, rho_w=1000):
axis = 1 if ND.ndim == 2 else None
lwc = np.pi / 6.0 * (rho_w * 1000) * np.sum((D / 1000) ** 3 * ND * dD, axis=axis)
return lwc # g/m3
def _compute_z(ND, D, dD):
axis = 1 if ND.ndim == 2 else None
z = np.sum(((D) ** 6 * ND * dD), axis=axis) # mm⁶·m⁻³
Z = 10 * np.log10(z)
return Z
def _compute_cost_function(ND_obs, ND_preds, D, dD, V, target, transformation, error_order):
# Assume ND_obs of shape (D bins) and ND_preds of shape (# params, D bins)
if target == "ND":
if transformation == "identity":
errors = np.mean(np.abs(ND_obs[None, :] - ND_preds) ** error_order, axis=1)
if transformation == "log":
errors = np.mean(np.abs(np.log(ND_obs[None, :] + 1) - np.log(ND_preds + 1)) ** error_order, axis=1)
if transformation == "np.sqrt":
errors = np.mean(np.abs(np.sqrt(ND_obs[None, :]) - np.sqrt(ND_preds)) ** error_order, axis=1)
elif target == "Z":
errors = np.abs(_compute_z(ND_obs, D, dD) - _compute_z(ND_preds, D, dD))
elif target == "R":
errors = np.abs(_compute_rain_rate(ND_obs, D, dD, V) - _compute_rain_rate(ND_preds, D, dD, V))
elif target == "LWC":
errors = np.abs(_compute_lwc(ND_obs, D, dD) - _compute_lwc(ND_preds, D, dD))
else:
raise ValueError("Invalid target")
return errors
[docs]
def apply_exponential_gs(
Nt,
ND_obs,
V,
# Coords
D,
dD,
# Error options
target,
transformation,
error_order,
):
"""Apply Grid Search for the ExponentialPSD distribution."""
# Define set of mu values
lambda_arr = np.arange(0.01, 20, step=0.01)
# Perform grid search
with suppress_warnings():
# Compute ND
N0_arr = Nt * lambda_arr
ND_preds = ExponentialPSD.formula(D=D[None, :], N0=N0_arr[:, None], Lambda=lambda_arr[:, None])
# Compute errors
errors = _compute_cost_function(
ND_obs=ND_obs,
ND_preds=ND_preds,
D=D,
dD=dD,
V=V,
target=target,
transformation=transformation,
error_order=error_order,
)
# Identify best parameter set
best_index = np.argmin(errors)
return np.array([N0_arr[best_index].item(), lambda_arr[best_index].item()])
def _apply_gamma_gs(mu_values, lambda_values, Nt, ND_obs, D, dD, V, target, transformation, error_order):
"""Routine for GammaPSD parameters grid search."""
# Define combinations of parameters for grid search
combo = np.meshgrid(mu_values, lambda_values, indexing="xy")
mu_arr = combo[0].ravel()
lambda_arr = combo[1].ravel()
# Perform grid search
with suppress_warnings():
# Compute ND
N0 = np.exp(np.log(Nt) + (mu_arr[:, None] + 1) * np.log(lambda_arr[:, None]) - gammaln(mu_arr[:, None] + 1))
ND_preds = GammaPSD.formula(D=D[None, :], N0=N0, Lambda=lambda_arr[:, None], mu=mu_arr[:, None])
# Compute errors
errors = _compute_cost_function(
ND_obs=ND_obs,
ND_preds=ND_preds,
D=D,
dD=dD,
V=V,
target=target,
transformation=transformation,
error_order=error_order,
)
# Best parameter
best_index = np.argmin(errors)
return N0[best_index].item(), mu_arr[best_index].item(), lambda_arr[best_index].item()
[docs]
def apply_gamma_gs(
Nt,
ND_obs,
V,
# Coords
D,
dD,
# Error options
target,
transformation,
error_order,
):
"""Estimate GammaPSD model parameters using Grid Search."""
# Define initial set of parameters
mu_step = 0.5
lambda_step = 0.5
mu_values = np.arange(0.01, 20, step=mu_step)
lambda_values = np.arange(0, 60, step=lambda_step)
# First round of GS
N0, mu, Lambda = _apply_gamma_gs(
mu_values=mu_values,
lambda_values=lambda_values,
Nt=Nt,
ND_obs=ND_obs,
D=D,
dD=dD,
V=V,
target=target,
transformation=transformation,
error_order=error_order,
)
# Second round of GS
mu_values = np.arange(mu - mu_step * 2, mu + mu_step * 2, step=mu_step / 20)
lambda_values = np.arange(Lambda - lambda_step * 2, Lambda + lambda_step * 2, step=lambda_step / 20)
N0, mu, Lambda = _apply_gamma_gs(
mu_values=mu_values,
lambda_values=lambda_values,
Nt=Nt,
ND_obs=ND_obs,
D=D,
dD=dD,
V=V,
target=target,
transformation=transformation,
error_order=error_order,
)
return np.array([N0, mu, Lambda])
def _apply_lognormal_gs(mu_values, sigma_values, Nt, ND_obs, D, dD, V, target, transformation, error_order):
"""Routine for LognormalPSD parameters grid search."""
# Define combinations of parameters for grid search
combo = np.meshgrid(mu_values, sigma_values, indexing="xy")
mu_arr = combo[0].ravel()
sigma_arr = combo[1].ravel()
# Perform grid search
with suppress_warnings():
# Compute ND
ND_preds = LognormalPSD.formula(D=D[None, :], Nt=Nt, mu=mu_arr[:, None], sigma=sigma_arr[:, None])
# Compute errors
errors = _compute_cost_function(
ND_obs=ND_obs,
ND_preds=ND_preds,
D=D,
dD=dD,
V=V,
target=target,
transformation=transformation,
error_order=error_order,
)
# Best parameter
best_index = np.argmin(errors)
return Nt, mu_arr[best_index].item(), sigma_arr[best_index].item()
[docs]
def apply_lognormal_gs(
Nt,
ND_obs,
V,
# Coords
D,
dD,
# Error options
target,
transformation,
error_order,
):
"""Estimate LognormalPSD model parameters using Grid Search."""
# Define initial set of parameters
mu_step = 0.5
sigma_step = 0.5
mu_values = np.arange(0.01, 20, step=mu_step) # TODO: define realistic values
sigma_values = np.arange(0, 20, step=sigma_step) # TODO: define realistic values
# First round of GS
Nt, mu, sigma = _apply_lognormal_gs(
mu_values=mu_values,
sigma_values=sigma_values,
Nt=Nt,
ND_obs=ND_obs,
D=D,
dD=dD,
V=V,
target=target,
transformation=transformation,
error_order=error_order,
)
# Second round of GS
mu_values = np.arange(mu - mu_step * 2, mu + mu_step * 2, step=mu_step / 20)
sigma_values = np.arange(sigma - sigma_step * 2, sigma + sigma_step * 2, step=sigma_step / 20)
Nt, mu, sigma = _apply_lognormal_gs(
mu_values=mu_values,
sigma_values=sigma_values,
Nt=Nt,
ND_obs=ND_obs,
D=D,
dD=dD,
V=V,
target=target,
transformation=transformation,
error_order=error_order,
)
return np.array([Nt, mu, sigma])
[docs]
def apply_normalized_gamma_gs(
Nw,
D50,
ND_obs,
V,
# Coords
D,
dD,
# Error options
target,
transformation,
error_order,
):
"""Estimate NormalizedGammaPSD model parameters using Grid Search."""
# Define set of mu values
mu_arr = np.arange(0.01, 20, step=0.01)
# Perform grid search
with suppress_warnings():
# Compute ND
ND_preds = NormalizedGammaPSD.formula(D=D[None, :], D50=D50, Nw=Nw, mu=mu_arr[:, None])
# Compute errors
errors = _compute_cost_function(
ND_obs=ND_obs,
ND_preds=ND_preds,
D=D,
dD=dD,
V=V,
target=target,
transformation=transformation,
error_order=error_order,
)
# Identify best parameter set
mu = mu_arr[np.argmin(errors)]
return np.array([Nw, mu, D50])
[docs]
def get_exponential_parameters_gs(ds, target="ND", transformation="log", error_order=1):
"""Estimate the parameters of an Exponential distribution using Grid Search."""
# "target": ["ND", "LWC", "Z", "R"]
# "transformation": "log", "identity", "sqrt", # only for drop_number_concentration
# "error_order": 1, # MAE/MSE ... only for drop_number_concentration
# Compute required variables
ds["Nt"] = get_total_number_concentration(
drop_number_concentration=ds["drop_number_concentration"],
diameter_bin_width=ds["diameter_bin_width"],
)
# Define kwargs
kwargs = {
"D": ds["diameter_bin_center"].data,
"dD": ds["diameter_bin_width"].data,
"target": target,
"transformation": transformation,
"error_order": error_order,
}
# Fit distribution in parallel
da_params = xr.apply_ufunc(
apply_exponential_gs,
# Variables varying over time
ds["Nt"],
ds["drop_number_concentration"],
ds["fall_velocity"],
# Other options
kwargs=kwargs,
# Settings
input_core_dims=[[], [DIAMETER_DIMENSION], [DIAMETER_DIMENSION]],
output_core_dims=[["parameters"]],
vectorize=True,
dask="parallelized",
dask_gufunc_kwargs={"output_sizes": {"parameters": 2}}, # lengths of the new output_core_dims dimensions.
output_dtypes=["float64"],
)
# Add parameters coordinates
da_params = da_params.assign_coords({"parameters": ["N0", "Lambda"]})
# Create parameters dataset
ds_params = da_params.to_dataset(dim="parameters")
# Add DSD model name to the attribute
ds_params.attrs["disdrodb_psd_model"] = "ExponentialPSD"
return ds_params
[docs]
def get_gamma_parameters_gs(ds, target="ND", transformation="log", error_order=1):
"""Compute Grid Search to identify mu and Lambda Gamma distribution parameters."""
# "target": ["ND", "LWC", "Z", "R"]
# "transformation": "log", "identity", "sqrt", # only for drop_number_concentration
# "error_order": 1, # MAE/MSE ... only for drop_number_concentration
# Compute required variables
ds["Nt"] = get_total_number_concentration(
drop_number_concentration=ds["drop_number_concentration"],
diameter_bin_width=ds["diameter_bin_width"],
)
# Define kwargs
kwargs = {
"D": ds["diameter_bin_center"].data,
"dD": ds["diameter_bin_width"].data,
"target": target,
"transformation": transformation,
"error_order": error_order,
}
# Fit distribution in parallel
da_params = xr.apply_ufunc(
apply_gamma_gs,
# Variables varying over time
ds["Nt"],
ds["drop_number_concentration"],
ds["fall_velocity"],
# Other options
kwargs=kwargs,
# Settings
input_core_dims=[[], [DIAMETER_DIMENSION], [DIAMETER_DIMENSION]],
output_core_dims=[["parameters"]],
vectorize=True,
dask="parallelized",
dask_gufunc_kwargs={"output_sizes": {"parameters": 3}}, # lengths of the new output_core_dims dimensions.
output_dtypes=["float64"],
)
# Add parameters coordinates
da_params = da_params.assign_coords({"parameters": ["N0", "mu", "Lambda"]})
# Create parameters dataset
ds_params = da_params.to_dataset(dim="parameters")
# Add DSD model name to the attribute
ds_params.attrs["disdrodb_psd_model"] = "GammaPSD"
return ds_params
[docs]
def get_lognormal_parameters_gs(ds, target="ND", transformation="log", error_order=1):
"""Compute Grid Search to identify mu and sigma lognormal distribution parameters."""
# "target": ["ND", "LWC", "Z", "R"]
# "transformation": "log", "identity", "sqrt", # only for drop_number_concentration
# "error_order": 1, # MAE/MSE ... only for drop_number_concentration
# Compute required variables
ds["Nt"] = get_total_number_concentration(
drop_number_concentration=ds["drop_number_concentration"],
diameter_bin_width=ds["diameter_bin_width"],
)
# Define kwargs
kwargs = {
"D": ds["diameter_bin_center"].data,
"dD": ds["diameter_bin_width"].data,
"target": target,
"transformation": transformation,
"error_order": error_order,
}
# Fit distribution in parallel
da_params = xr.apply_ufunc(
apply_lognormal_gs,
# Variables varying over time
ds["Nt"],
ds["drop_number_concentration"],
ds["fall_velocity"],
# Other options
kwargs=kwargs,
# Settings
input_core_dims=[[], [DIAMETER_DIMENSION], [DIAMETER_DIMENSION]],
output_core_dims=[["parameters"]],
vectorize=True,
dask="parallelized",
dask_gufunc_kwargs={"output_sizes": {"parameters": 3}}, # lengths of the new output_core_dims dimensions.
output_dtypes=["float64"],
)
# Add parameters coordinates
da_params = da_params.assign_coords({"parameters": ["Nt", "mu", "sigma"]})
# Create parameters dataset
ds_params = da_params.to_dataset(dim="parameters")
# Add DSD model name to the attribute
ds_params.attrs["disdrodb_psd_model"] = "LognormalPSD"
return ds_params
[docs]
def get_normalized_gamma_parameters_gs(ds, target="ND", transformation="log", error_order=1):
r"""Estimate $\mu$ of a Normalized Gamma distribution using Grid Search.
The D50 and Nw parameters of the Normalized Gamma distribution are derived empirically from the obs DSD.
$\mu$ is derived by minimizing the errors between the obs DSD and modelled Normalized Gamma distribution.
Parameters
----------
Nd : array_like
A drop size distribution
D50: optional, float
Median drop diameter in mm. If none is given, it will be estimated.
Nw: optional, float
Normalized Intercept Parameter. If none is given, it will be estimated.
order: optional, float
Order to which square the error when computing the sum of errors.
Order = 2 is equivalent to minimize the mean squared error (MSE) (L2 norm). The default is 2.
Order = 1 is equivalent to minimize the mean absolute error (MAE) (L1 norm).
Higher orders typically stretch higher the gamma distribution.
Returns
-------
ds_params : xarray.Dataset
Dataset containing the estimated Normalized Gamma distribution parameters.
"""
# "target": ["ND", "LWC", "Z", "R"]
# "transformation": "log", "identity", "sqrt", # only for drop_number_concentration
# "error_order": 1, # MAE/MSE ... only for drop_number_concentration
# Compute required variables
drop_number_concentration = ds["drop_number_concentration"]
diameter_bin_width = ds["diameter_bin_width"]
diameter = ds["diameter_bin_center"] / 1000 # conversion from mm to m
m3 = get_moment(
drop_number_concentration=drop_number_concentration,
diameter=diameter, # m
diameter_bin_width=diameter_bin_width, # mm
moment=3,
)
m4 = get_moment(
drop_number_concentration=drop_number_concentration,
diameter=diameter, # m
diameter_bin_width=diameter_bin_width, # mm
moment=4,
)
ds["Nw"] = get_normalized_intercept_parameter_from_moments(moment_3=m3, moment_4=m4)
ds["D50"] = get_median_volume_drop_diameter(
drop_number_concentration=drop_number_concentration,
diameter=diameter, # m
diameter_bin_width=diameter_bin_width, # mm
)
# Define kwargs
kwargs = {
"D": ds["diameter_bin_center"].data,
"dD": ds["diameter_bin_width"].data,
"target": target,
"transformation": transformation,
"error_order": error_order,
}
# Fit distribution in parallel
da_params = xr.apply_ufunc(
apply_normalized_gamma_gs,
# Variables varying over time
ds["Nw"],
ds["D50"],
ds["drop_number_concentration"],
ds["fall_velocity"],
# Other options
kwargs=kwargs,
# Settings
input_core_dims=[[], [], [DIAMETER_DIMENSION], [DIAMETER_DIMENSION]],
output_core_dims=[["parameters"]],
vectorize=True,
dask="parallelized",
dask_gufunc_kwargs={"output_sizes": {"parameters": 3}}, # lengths of the new output_core_dims dimensions.
output_dtypes=["float64"],
)
# Add parameters coordinates
da_params = da_params.assign_coords({"parameters": ["Nw", "mu", "D50"]})
# Create parameters dataset
ds_params = da_params.to_dataset(dim="parameters")
# Add DSD model name to the attribute
ds_params.attrs["disdrodb_psd_model"] = "NormalizedGammaPSD"
return ds_params
####-----------------------------------------------------------------.
#### Methods of Moments (MOM)
# - M246 DEFAULT FOR GAMMA ?
# - LMOM (Johnson et al., 2014)
[docs]
def get_exponential_parameters_Zhang2008(moment_l, moment_m, l, m): # noqa: E741
"""Calculate Exponential DSD parameters using the method of moments (MOM).
The choice of moments is given in the parameters.
Parameters
----------
moment_l: float
First moment to use.
moment_l: float
Second moment to use.
l : float
Moment order.
m : float
Moment order,
References
----------
[1] Zhang, et. al., 2008, Diagnosing the Intercept Parameter for Exponential Raindrop Size
Distribution Based on Video Disdrometer Observations: Model Development. J. Appl.
Meteor. Climatol.,
https://doi.org/10.1175/2008JAMC1876.1
"""
num = moment_l * gamma(m + 1)
den = moment_m * gamma(l + 1)
Lambda = np.power(num / den, (1 / (m - l)))
N0 = moment_l * np.power(Lambda, l + 1) / gamma(l + 1)
return N0, Lambda
[docs]
def get_exponential_parameters_M34(moment_3, moment_4):
"""Compute exponential distribution parameters following Testud 2001.
References
----------
Testud, J., S. Oury, R. A. Black, P. Amayenc, and X. Dou, 2001:
The Concept of “Normalized” Distribution to Describe Raindrop Spectra:
A Tool for Cloud Physics and Cloud Remote Sensing.
J. Appl. Meteor. Climatol., 40, 1118-1140,
https://doi.org/10.1175/1520-0450(2001)040<1118:TCONDT>2.0.CO;2
"""
N0 = 256 / gamma(4) * moment_3**5 / moment_4**4
Dm = moment_4 / moment_3
Lambda = 4 / Dm
return N0, Lambda
[docs]
def get_gamma_parameters_M012(M0, M1, M2):
"""Compute gamma distribution parameters following Cao et al., 2009.
References
----------
Cao, Q., and G. Zhang, 2009:
Errors in Estimating Raindrop Size Distribution Parameters Employing Disdrometer and Simulated Raindrop Spectra.
J. Appl. Meteor. Climatol., 48, 406-425, https://doi.org/10.1175/2008JAMC2026.1.
"""
# TODO: really bad results. check formula !
G = M1**3 / M0 / M2
mu = 1 / (1 - G) - 2
Lambda = M0 / M1 * (mu + 1)
N0 = Lambda ** (mu + 1) * M0 / gamma(mu + 1)
return N0, mu, Lambda
[docs]
def get_gamma_parameters_M234(M2, M3, M4):
"""Compute gamma distribution parameters following Cao et al., 2009.
References
----------
Cao, Q., and G. Zhang, 2009:
Errors in Estimating Raindrop Size Distribution Parameters Employing Disdrometer and Simulated Raindrop Spectra.
J. Appl. Meteor. Climatol., 48, 406-425, https://doi.org/10.1175/2008JAMC2026.1.
"""
G = M3**2 / M2 / M4
mu = 1 / (1 - G) - 4
Lambda = M2 / M3 * (mu + 3)
N0 = Lambda ** (mu + 3) * M2 / gamma(mu + 3)
return N0, mu, Lambda
[docs]
def get_gamma_parameters_M246(M2, M4, M6):
"""Compute gamma distribution parameters following Ulbrich 1998.
References
----------
Ulbrich, C. W., and D. Atlas, 1998:
Rainfall Microphysics and Radar Properties: Analysis Methods for Drop Size Spectra.
J. Appl. Meteor. Climatol., 37, 912-923,
https://doi.org/10.1175/1520-0450(1998)037<0912:RMARPA>2.0.CO;2
Cao, Q., and G. Zhang, 2009:
Errors in Estimating Raindrop Size Distribution Parameters Employing Disdrometer and Simulated Raindrop Spectra.
J. Appl. Meteor. Climatol., 48, 406-425, https://doi.org/10.1175/2008JAMC2026.1.
Thurai, M., Williams, C.R., Bringi, V.N., 2014:
Examining the correlations between drop size distribution parameters using data
from two side-by-side 2D-video disdrometers.
Atmospheric Research, 144, 95-110, https://doi.org/10.1016/j.atmosres.2014.01.002.
"""
G = M4**2 / M2 / M6
# TODO: Different formulas !
# Thurai et al., 2014 (A4), Ulbrich et al., 1998 (2)
# mu = ((7.0 - 11.0 * G) -
# np.sqrt((7.0 - 11.0 * G) ** 2.0 - 4.0 * (G - 1.0) * (30.0 * G - 12.0)) / (2.0 * (G - 1.0)))
mu = (7.0 - 11.0 * G) - np.sqrt(G**2 + 89 * G + 1) / (2.0 * (G - 1.0))
# Cao et al., 2009 (B3)
# --> Wrong ???
mu = (7.0 - 11.0 * G) - np.sqrt(G**2 + 14 * G + 1) / (2.0 * (G - 1.0))
Lambda = np.sqrt((4 + mu) * (3 + mu) * M2 / M4)
# Cao et al., 2009
N0 = M2 * Lambda ** (3 + mu) / gamma(3 + mu)
# # Thurai et al., 2014
# N0 = M3 * Lambda ** (4 + mu) / gamma(4 + mu)
# # Ulbrich et al., 1998
# N0 = M6 * Lambda ** (7.0 + mu) / gamma(7 + mu)
return N0, mu, Lambda
[docs]
def get_gamma_parameters_M456(M4, M5, M6):
"""Compute gamma distribution parameters following Cao et al., 2009.
References
----------
Cao, Q., and G. Zhang, 2009:
Errors in Estimating Raindrop Size Distribution Parameters Employing Disdrometer and Simulated Raindrop Spectra.
J. Appl. Meteor. Climatol., 48, 406-425, https://doi.org/10.1175/2008JAMC2026.1.
"""
G = M5**2 / M4 / M6
mu = 1 / (1 - G) - 6
Lambda = M4 / M5 * (mu + 5)
N0 = Lambda ** (mu + 5) * M4 / gamma(mu + 5)
return N0, mu, Lambda
[docs]
def get_gamma_parameters_M346(M3, M4, M6):
"""Compute gamma distribution parameters following Kozu 1991.
References
----------
Kozu, T., and K. Nakamura, 1991:
Rainfall Parameter Estimation from Dual-Radar Measurements
Combining Reflectivity Profile and Path-integrated Attenuation.
J. Atmos. Oceanic Technol., 8, 259-270, https://doi.org/10.1175/1520-0426(1991)008<0259:RPEFDR>2.0.CO;2
Tokay, A., and D. A. Short, 1996:
Evidence from Tropical Raindrop Spectra of the Origin of Rain from
Stratiform versus Convective Clouds.
J. Appl. Meteor. Climatol., 35, 355-371,
https://doi.org/10.1175/1520-0450(1996)035<0355:EFTRSO>2.0.CO;2
Cao, Q., and G. Zhang, 2009:
Errors in Estimating Raindrop Size Distribution Parameters Employing Disdrometer and Simulated Raindrop Spectra.
J. Appl. Meteor. Climatol., 48, 406-425, https://doi.org/10.1175/2008JAMC2026.1.
"""
G = M4**3 / M3**2 / M6
# Kozu
mu = (5.5 * G - 4 + np.sqrt(G * (G * 0.25 + 2))) / (1 - G)
# Cao et al., 2009 (equivalent)
# mu = (11 * G - 8 + np.sqrt(G * (G + 8))) / (2 * (1 - G))
Lambda = (mu + 4) * M3 / M4
N0 = Lambda ** (mu + 4) * M3 / gamma(mu + 4)
return N0, mu, Lambda
[docs]
def get_lognormal_parameters_M346(M3, M4, M6):
"""Compute lognormal distribution parameters following Kozu1991.
References
----------
Kozu, T., and K. Nakamura, 1991:
Rainfall Parameter Estimation from Dual-Radar Measurements
Combining Reflectivity Profile and Path-integrated Attenuation.
J. Atmos. Oceanic Technol., 8, 259-270, https://doi.org/10.1175/1520-0426(1991)008<0259:RPEFDR>2.0.CO;2
"""
L3 = np.log(M3)
L4 = np.log(M4)
L6 = np.log(M6)
Nt = np.exp((24 * L3 - 27 * L4 - 6 * L6) / 3)
mu = (-10 * L3 + 13.5 * L4 - 3.5 * L6) / 3
sigma = (2 * L3 - 3 * L4 + L6) / 3
return Nt, mu, sigma
def _compute_moments(ds, moments):
list_moments = [
get_moment(
drop_number_concentration=ds["drop_number_concentration"],
diameter=ds["diameter_bin_center"] / 1000, # m
diameter_bin_width=ds["diameter_bin_width"], # mm
moment=int(moment.replace("M", "")),
)
for moment in moments
]
return list_moments
def _get_gamma_parameters_mom(ds: xr.Dataset, mom_method: str) -> xr.Dataset:
# Get the correct function and list of variables for the requested method
func, needed_moments = MOM_METHODS_DICT["GammaPSD"][mom_method]
# Compute required moments
arrs = _compute_moments(ds, moments=needed_moments)
# Apply the function. This will produce (mu, Lambda, N0) with the same coords/shapes as input data
N0, mu, Lambda = func(*arrs)
# Return a new Dataset containing the results
ds = xr.Dataset(
{
"N0": N0,
"mu": mu,
"Lambda": Lambda,
},
coords=ds.coords,
)
return ds
def _get_lognormal_parameters_mom(ds: xr.Dataset, mom_method: str) -> xr.Dataset:
# Get the correct function and list of variables for the requested method
func, needed_moments = MOM_METHODS_DICT["LognormalPSD"][mom_method]
# Compute required moments
arrs = _compute_moments(ds, moments=needed_moments)
# Apply the function. This will produce (mu, Lambda, N0) with the same coords/shapes as input data
Nt, mu, sigma = func(*arrs)
# Return a new Dataset containing the results
ds = xr.Dataset(
{
"Nt": Nt,
"mu": mu,
"sigma": sigma,
},
coords=ds.coords,
)
return ds
def _get_exponential_parameters_mom(ds: xr.Dataset, mom_method: str) -> xr.Dataset:
# Get the correct function and list of variables for the requested method
func, needed_moments = MOM_METHODS_DICT["ExponentialPSD"][mom_method]
# Compute required moments
arrs = _compute_moments(ds, moments=needed_moments)
# Apply the function. This will produce (mu, Lambda, N0) with the same coords/shapes as input data
N0, Lambda = func(*arrs)
# Return a new Dataset containing the results
ds = xr.Dataset(
{
"N0": N0,
"Lambda": Lambda,
},
coords=ds.coords,
)
return ds
####--------------------------------------------------------------------------------------.
#### Routines dictionary
####--------------------------------------------------------------------------------------.
ATTRS_PARAMS_DICT = {
"GammaPSD": {
"N0": {
"description": "Intercept parameter of the Gamma PSD",
"standard_name": "particle_size_distribution_intercept",
"units": "mm**(-1-mu) m-3",
"long_name": "GammaPSD intercept parameter",
},
"mu": {
"description": "Shape parameter of the Gamma PSD",
"standard_name": "particle_size_distribution_shape",
"units": "",
"long_name": "GammaPSD shape parameter",
},
"Lambda": {
"description": "Slope (rate) parameter of the Gamma PSD",
"standard_name": "particle_size_distribution_slope",
"units": "mm-1",
"long_name": "GammaPSD slope parameter",
},
},
"NormalizedGammaPSD": {
"Nw": {
"standard_name": "normalized_intercept_parameter",
"units": "mm-1 m-3",
"long_name": "NormalizedGammaPSD Normalized Intercept Parameter",
},
"mu": {
"description": "Dimensionless shape parameter controlling the curvature of the Normalized Gamma PSD",
"standard_name": "particle_size_distribution_shape",
"units": "",
"long_name": "NormalizedGammaPSD Shape Parameter ",
},
"D50": {
"standard_name": "median_volume_diameter",
"units": "mm",
"long_name": "NormalizedGammaPSD Median Volume Drop Diameter",
},
},
"LognormalPSD": {
"Nt": {
"standard_name": "number_concentration_of_rain_drops_in_air",
"units": "m-3",
"long_name": "Total Number Concentration",
},
"mu": {
"description": "Mean of the Lognormal PSD",
"units": "log(mm)",
"long_name": "Mean of the Lognormal PSD",
},
"sigma": {
"standard_name": "Standard Deviation of the Lognormal PSD",
"units": "",
"long_name": "Standard Deviation of the Lognormal PSD",
},
},
"ExponentialPSD": {
"N0": {
"description": "Intercept parameter of the Exponential PSD",
"standard_name": "particle_size_distribution_intercept",
"units": "mm-1 m-3",
"long_name": "ExponentialPSD intercept parameter",
},
"Lambda": {
"description": "Slope (rate) parameter of the Exponential PSD",
"standard_name": "particle_size_distribution_slope",
"units": "mm-1",
"long_name": "ExponentialPSD slope parameter",
},
},
}
MOM_METHODS_DICT = {
"GammaPSD": {
# "M012": (get_gamma_parameters_M012, ["M0", "M1", "M2"]),
"M234": (get_gamma_parameters_M234, ["M2", "M3", "M4"]),
"M246": (get_gamma_parameters_M246, ["M2", "M4", "M6"]),
"M456": (get_gamma_parameters_M456, ["M4", "M5", "M6"]),
"M346": (get_gamma_parameters_M346, ["M3", "M4", "M6"]),
},
"LognormalPSD": {
"M346": (get_lognormal_parameters_M346, ["M3", "M4", "M6"]),
},
"ExponentialPSD": {
"M234": (get_exponential_parameters_M34, ["M3", "M4"]),
},
}
OPTIMIZATION_ROUTINES_DICT = {
"MOM": {
"GammaPSD": _get_gamma_parameters_mom,
"LognormalPSD": _get_lognormal_parameters_mom,
"ExponentialPSD": _get_exponential_parameters_mom,
},
"GS": {
"GammaPSD": get_gamma_parameters_gs,
"NormalizedGammaPSD": get_normalized_gamma_parameters_gs,
"LognormalPSD": get_lognormal_parameters_gs,
"ExponentialPSD": get_exponential_parameters_gs,
},
"ML": {
"GammaPSD": get_gamma_parameters,
"LognormalPSD": get_lognormal_parameters,
"ExponentialPSD": get_exponential_parameters,
},
}
[docs]
def available_mom_methods(psd_model):
"""Implemented MOM methods for a given PSD model."""
if psd_model not in MOM_METHODS_DICT:
raise NotImplementedError(f"No MOM methods available for {psd_model}")
return list(MOM_METHODS_DICT[psd_model])
[docs]
def available_optimization(psd_model):
"""Implemented fitting methods for a given PSD model."""
return [opt for opt in list(OPTIMIZATION_ROUTINES_DICT) if psd_model in OPTIMIZATION_ROUTINES_DICT[opt]]
####--------------------------------------------------------------------------------------.
#### Argument checkers
[docs]
def check_psd_model(psd_model, optimization):
"""Check valid psd_model argument."""
valid_psd_models = list(OPTIMIZATION_ROUTINES_DICT[optimization])
if psd_model not in valid_psd_models:
msg = (
f"{optimization} optimization is not available for 'psd_model' {psd_model}. "
f"Accepted PSD models are {valid_psd_models}."
)
raise NotImplementedError(msg)
[docs]
def check_target(target):
"""Check valid target argument."""
valid_targets = ["ND", "R", "Z", "LWC"]
if target not in valid_targets:
raise ValueError(f"Invalid 'target' {target}. Valid targets are {valid_targets}.")
return target
[docs]
def check_likelihood(likelihood):
"""Check valid likelihood argument."""
valid_likelihood = ["multinomial", "poisson"]
if likelihood not in valid_likelihood:
raise ValueError(f"Invalid 'likelihood' {likelihood}. Valid values are {valid_likelihood}.")
return likelihood
[docs]
def check_truncated_likelihood(truncated_likelihood):
"""Check valid truncated_likelihood argument."""
if not isinstance(truncated_likelihood, bool):
raise TypeError(f"Invalid 'truncated_likelihood' argument {truncated_likelihood}. Must be True or False.")
return truncated_likelihood
[docs]
def check_probability_method(probability_method):
"""Check valid probability_method argument."""
# Check valid probability_method
valid_probability_method = ["cdf", "pdf"]
if probability_method not in valid_probability_method:
raise ValueError(
f"Invalid 'probability_method' {probability_method}. Valid values are {valid_probability_method}.",
)
return probability_method
[docs]
def check_optimizer(optimizer):
"""Check valid optimizer argument."""
# Check valid probability_method
valid_optimizer = ["Nelder-Mead", "Powell", "L-BFGS-B"]
if optimizer not in valid_optimizer:
raise ValueError(
f"Invalid 'optimizer' {optimizer}. Valid values are {valid_optimizer}.",
)
return optimizer
[docs]
def check_mom_methods(mom_methods, psd_model, allow_none=False):
"""Check valid mom_methods arguments."""
if isinstance(mom_methods, (str, type(None))):
mom_methods = [mom_methods]
mom_methods = [str(v) for v in mom_methods] # None --> 'None'
valid_mom_methods = available_mom_methods(psd_model)
if allow_none:
valid_mom_methods = [*valid_mom_methods, "None"]
invalid_mom_methods = np.array(mom_methods)[np.isin(mom_methods, valid_mom_methods, invert=True)]
if len(invalid_mom_methods) > 0:
raise ValueError(
f"Unknown mom_methods '{invalid_mom_methods}' for {psd_model}. Choose from {valid_mom_methods}.",
)
return mom_methods
[docs]
def check_optimization(optimization):
"""Check valid optimization argument."""
valid_optimization = list(OPTIMIZATION_ROUTINES_DICT)
if optimization not in valid_optimization:
raise ValueError(
f"Invalid 'optimization' {optimization}. Valid procedure are {valid_optimization}.",
)
return optimization
[docs]
def check_optimization_kwargs(optimization_kwargs, optimization, psd_model):
"""Check valid optimization_kwargs."""
dict_arguments = {
"ML": {
"init_method": None,
"probability_method": check_probability_method,
"likelihood": check_likelihood,
"truncated_likelihood": check_truncated_likelihood,
"optimizer": check_optimizer,
},
"GS": {
"target": check_target,
"transformation": check_transformation,
"error_order": None,
},
"MOM": {
"mom_methods": None,
},
}
optimization = check_optimization(optimization)
check_psd_model(psd_model=psd_model, optimization=optimization)
# Retrieve the expected arguments for the given optimization method
expected_arguments = dict_arguments.get(optimization, {})
# Check for missing arguments in optimization_kwargs
# missing_args = [arg for arg in expected_arguments if arg not in optimization_kwargs]
# if missing_args:
# raise ValueError(f"Missing required arguments for {optimization} optimization: {missing_args}")
# Validate arguments values
_ = [
check(optimization_kwargs[arg])
for arg, check in expected_arguments.items()
if callable(check) and arg in optimization_kwargs
]
# Further special checks
if optimization == "MOM" and "mom_methods" in optimization_kwargs:
_ = check_mom_methods(mom_methods=optimization_kwargs["mom_methods"], psd_model=psd_model)
if optimization == "ML" and optimization_kwargs.get("init_method", None) is not None:
_ = check_mom_methods(mom_methods=optimization_kwargs["init_method"], psd_model=psd_model, allow_none=True)
####--------------------------------------------------------------------------------------.
#### Wrappers for fitting
def _finalize_attributes(ds_params, psd_model, optimization, optimization_kwargs):
ds_params.attrs["disdrodb_psd_model"] = psd_model
ds_params.attrs["disdrodb_psd_optimization"] = optimization
ds_params.attrs["disdrodb_psd_optimization_kwargs"] = ", ".join(
[f"{k}: {v}" for k, v in optimization_kwargs.items()],
)
return ds_params
[docs]
def get_mom_parameters(ds: xr.Dataset, psd_model: str, mom_methods=None) -> xr.Dataset:
"""
Compute PSD model parameters using various method-of-moments (MOM) approaches.
The method is specified by the `mom_methods` abbreviations, e.g. 'M012', 'M234', 'M246'.
Parameters
----------
ds : xarray.Dataset
An xarray Dataset with the required moments M0...M6 as data variables.
mom_methods: str or list (optional)
See valid values with disdrodb.psd.available_mom_methods(psd_model)
If None (the default), compute model parameters with all available MOM methods.
Returns
-------
xarray.Dataset
A Dataset containing mu, Lambda, and N0 variables.
If multiple mom_methods are specified, the dataset has the dimension mom_method.
"""
# Check inputs
check_psd_model(psd_model=psd_model, optimization="MOM")
if mom_methods is None:
mom_methods = available_mom_methods(psd_model)
mom_methods = check_mom_methods(mom_methods, psd_model=psd_model)
# Retrieve function
func = OPTIMIZATION_ROUTINES_DICT["MOM"][psd_model]
# Compute parameters
if len(mom_methods) == 1:
ds_params = func(ds=ds, mom_method=mom_methods[0])
else:
list_ds = [func(ds=ds, mom_method=mom_method) for mom_method in mom_methods]
ds_params = xr.concat(list_ds, dim="mom_method")
ds_params = ds_params.assign_coords({"mom_method": mom_methods})
# Add model attributes
optimization_kwargs = {"mom_methods": mom_methods}
ds_params = _finalize_attributes(
ds_params=ds_params,
psd_model=psd_model,
optimization="MOM",
optimization_kwargs=optimization_kwargs,
)
return ds_params
[docs]
def get_ml_parameters(
ds,
psd_model,
init_method=None,
probability_method="cdf",
likelihood="multinomial",
truncated_likelihood=True,
optimizer="Nelder-Mead",
):
"""
Estimate model parameters for a given distribution using Maximum Likelihood.
Parameters
----------
ds : xarray.Dataset
Input dataset containing drop number concentration data and diameter information.
It must include the following variables:
- ``drop_number_concentration``: The number concentration of drops.
- ``diameter_bin_width``": The width of each diameter bin.
- ``diameter_bin_lower``: The lower bounds of the diameter bins.
- ``diameter_bin_upper``: The upper bounds of the diameter bins.
- ``diameter_bin_center``: The center values of the diameter bins.
psd_model : str
The PSD model to fit. See ``available_psd_models()``.
init_method: str or list
The method(s) of moments used to initialize the PSD model parameters.
Multiple methods can be specified. See ``available_mom_methods(psd_model)``.
probability_method : str, optional
Method to compute probabilities. The default value is ``cdf``.
likelihood : str, optional
Likelihood function to use for fitting. The default value is ``multinomial``.
truncated_likelihood : bool, optional
Whether to use Truncated Maximum Likelihood (TML). The default value is ``True``.
optimizer : str, optional
Optimization method to use. The default value is ``Nelder-Mead``.
Returns
-------
xarray.Dataset
The dataset containing the estimated parameters.
"""
# -----------------------------------------------------------------------------.
# Check arguments
check_psd_model(psd_model, optimization="ML")
likelihood = check_likelihood(likelihood)
probability_method = check_probability_method(probability_method)
optimizer = check_optimizer(optimizer)
# Check valid init_method
init_method = check_mom_methods(mom_methods=init_method, psd_model=psd_model, allow_none=True)
# Retrieve estimation function
func = OPTIMIZATION_ROUTINES_DICT["ML"][psd_model]
# Compute parameters
if init_method is None or len(init_method) == 1:
ds_params = func(
ds=ds,
init_method=init_method[0],
probability_method=probability_method,
likelihood=likelihood,
truncated_likelihood=truncated_likelihood,
optimizer=optimizer,
)
else:
list_ds = [
func(
ds=ds,
init_method=method,
probability_method=probability_method,
likelihood=likelihood,
truncated_likelihood=truncated_likelihood,
optimizer=optimizer,
)
for method in init_method
]
ds_params = xr.concat(list_ds, dim="init_method")
ds_params = ds_params.assign_coords({"init_method": init_method})
# Add model attributes
optimization_kwargs = {
"init_method": init_method,
"probability_method": "probability_method",
"likelihood": likelihood,
"truncated_likelihood": truncated_likelihood,
"optimizer": optimizer,
}
ds_params = _finalize_attributes(
ds_params=ds_params,
psd_model=psd_model,
optimization="ML",
optimization_kwargs=optimization_kwargs,
)
# Return dataset with parameters
return ds_params
[docs]
def get_gs_parameters(ds, psd_model, target="ND", transformation="log", error_order=1):
"""Retrieve PSD model parameters using Grid Search."""
# Check valid psd_model
check_psd_model(psd_model, optimization="GS")
# Check valid target
target = check_target(target)
# Check valid transformation
transformation = check_transformation(transformation)
# Retrieve estimation function
func = OPTIMIZATION_ROUTINES_DICT["GS"][psd_model]
# Estimate parameters
ds_params = func(ds, target=target, transformation=transformation, error_order=error_order)
# Add model attributes
optimization_kwargs = {
"target": target,
"transformation": transformation,
"error_order": error_order,
}
ds_params = _finalize_attributes(
ds_params=ds_params,
psd_model=psd_model,
optimization="GS",
optimization_kwargs=optimization_kwargs,
)
# Return dataset with parameters
return ds_params
[docs]
def estimate_model_parameters(
ds,
psd_model,
optimization,
optimization_kwargs=None,
):
"""Routine to estimate PSD model parameters."""
optimization_kwargs = {} if optimization_kwargs is None else optimization_kwargs
optimization = check_optimization(optimization)
check_optimization_kwargs(optimization_kwargs=optimization_kwargs, optimization=optimization, psd_model=psd_model)
# Define function
dict_func = {
"ML": get_ml_parameters,
"MOM": get_mom_parameters,
"GS": get_gs_parameters,
}
func = dict_func[optimization]
# Retrieve parameters
ds_params = func(ds, psd_model=psd_model, **optimization_kwargs)
# Add parameters attributes (and units)
for var, attrs in ATTRS_PARAMS_DICT[psd_model].items():
ds_params[var].attrs = attrs
return ds_params