# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Utilities to create summary statistics."""
import gc
import importlib
import os
import shutil
import subprocess
import tempfile
import warnings
import matplotlib.lines as mlines
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr
from matplotlib.colors import ListedColormap, LogNorm, Normalize
from matplotlib.gridspec import GridSpec
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
from scipy.optimize import curve_fit
import disdrodb
from disdrodb.api.path import define_station_dir
from disdrodb.constants import DIAMETER_DIMENSION, VELOCITY_DIMENSION
from disdrodb.l2.empirical_dsd import bringi_nw_dm_classification, get_drop_average_velocity
from disdrodb.scattering import RADAR_OPTIONS
from disdrodb.utils.dataframe import compute_2d_histogram, log_arange
from disdrodb.utils.event import group_timesteps_into_event
from disdrodb.utils.manipulations import (
get_diameter_bin_edges,
resample_drop_number_concentration,
unstack_radar_variables,
)
from disdrodb.utils.time import get_sampling_information
from disdrodb.utils.warnings import suppress_warnings
from disdrodb.utils.yaml import write_yaml
from disdrodb.viz.plots import (
compute_dense_lines,
max_blend_images,
plot_raw_and_filtered_spectra,
plot_spectrum,
to_rgba,
)
####-----------------------------------------------------------------
#### PDF Latex Utilities
[docs]
def is_latex_engine_available() -> bool:
"""
Determine whether the Tectonic TeX/LaTeX engine is installed and accessible.
Returns
-------
bool
True if tectonic is found, False otherwise.
"""
return shutil.which("tectonic") is not None
[docs]
def save_table_to_pdf(
df: pd.DataFrame,
filepath: str,
index=True,
caption=None,
fontsize: str = r"\tiny",
orientation: str = "landscape",
) -> None:
r"""Render a pandas DataFrame as a well-formatted table in PDF via LaTeX.
Parameters
----------
df : pandas.DataFrame
The data to render.
filepath : str
File path where write the final PDF (e.g. '<...>/table.pdf').
caption : str, optional
LaTeX caption for the table environment.
fontsize : str, optional
LaTeX font-size command to wrap the table (e.g. '\\small').
The default is '\\tiny'.
orientation : str
Page orientation. Allowed values are 'portrait' and 'landscape'.
If 'landscape', the table will be laid out horizontally.
The default is 'landscape'.
"""
from disdrodb.utils.cli import subprocess_run
# Export table to LaTeX
table_tex = df.to_latex(
index=index,
longtable=True,
caption=caption,
label=None,
escape=False,
)
# Define LaTeX document
opts = "a4paper"
doc = [
f"\\documentclass[{opts}]{{article}}",
"\\usepackage[margin=0.1in]{geometry}",
# Reduce column separation
"\\setlength{\\tabcolsep}{3pt}",
"\\usepackage{booktabs}",
"\\usepackage{longtable}",
"\\usepackage{caption}",
"\\captionsetup[longtable]{font=tiny}",
"\\usepackage{pdflscape}",
"\\begin{document}",
# Remove page numbers
"\\pagestyle{empty}",
]
if orientation.lower() == "landscape":
doc.append("\\begin{landscape}")
doc.append(f"{{{fontsize}\n{table_tex}\n}}")
if orientation.lower() == "landscape":
doc.append("\\end{landscape}")
doc.append("\\end{document}")
document = "\n".join(doc)
# Compile with pdflatex in a temp dir
with tempfile.TemporaryDirectory() as td:
tex_path = os.path.join(td, "table.tex")
with open(tex_path, "w", encoding="utf-8") as f:
f.write(document)
for _ in range(2):
subprocess_run(
[
"tectonic",
"--outdir",
td,
tex_path,
],
cwd=td,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True,
)
# Move result
shutil.move(os.path.join(td, "table.pdf"), filepath)
####-----------------------------------------------------------------
#### Tables summaries
[docs]
def create_table_rain_summary(df, temporal_resolution):
"""Create rainy table summary."""
# Initialize dictionary
table = {}
# Retrieve accumulation interval
accumulation_interval, _ = get_sampling_information(temporal_resolution)
accumulation_interval_minutes = accumulation_interval / 60
# Keep rows with R > 0
df = df[df["R"] > 0]
# Number of years, months, days, minutes
if df.index.name == "time":
df = df.reset_index()
time = np.sort(np.asanyarray(df["time"]))
# Define start_time and end_time
start_time = pd.Timestamp(time[0])
end_time = pd.Timestamp(time[-1])
# Define years and years-month coverage
start_year = start_time.year
start_month = start_time.month_name()
end_year = end_time.year
end_month = end_time.month_name()
if start_year == end_year:
years_coverage = f"{start_year}"
years_month_coverage = f"{start_month[0:3]}-{end_month[0:3]} {start_year}"
else:
years_coverage = f"{start_year} - {end_year}"
years_month_coverage = f"{start_month[0:3]} {start_year} - {end_month[0:3]} {end_year}"
table["years_coverage"] = years_coverage
table["years_month_coverage"] = years_month_coverage
# Rainy minutes statistics
table["n_rainy_minutes"] = len(df["R"]) * accumulation_interval_minutes
table["n_rainy_minutes_<0.1"] = (
df["R"].between(0, 0.1, inclusive="right").sum().item() * accumulation_interval_minutes
)
table["n_rainy_minutes_0.1_1"] = (
df["R"].between(0.1, 1, inclusive="right").sum().item() * accumulation_interval_minutes
)
table["n_rainy_minutes_1_10"] = (
df["R"].between(1, 10, inclusive="right").sum().item() * accumulation_interval_minutes
)
table["n_rainy_minutes_10_25"] = (
df["R"].between(10, 25, inclusive="right").sum().item() * accumulation_interval_minutes
)
table["n_rainy_minutes_25_50"] = (
df["R"].between(25, 50, inclusive="right").sum().item() * accumulation_interval_minutes
)
table["n_rainy_minutes_50_100"] = (
df["R"].between(50, 100, inclusive="right").sum().item() * accumulation_interval_minutes
)
table["n_rainy_minutes_100_200"] = (
df["R"].between(100, 200, inclusive="right").sum().item() * accumulation_interval_minutes
)
table["n_rainy_minutes_>200"] = np.sum(df["R"] > 200).item() * accumulation_interval_minutes
# Minutes with larger Dmax
table["n_minutes_Dmax_>7"] = np.sum(df["Dmax"] > 7).item() * accumulation_interval_minutes
table["n_minutes_Dmax_>8"] = np.sum(df["Dmax"] > 8).item() * accumulation_interval_minutes
table["n_minutes_Dmax_>9"] = np.sum(df["Dmax"] > 9).item() * accumulation_interval_minutes
# Convert all minutes columns to integer
for key in table:
if "minutes" in key:
table[key] = int(table[key])
return table
[docs]
def create_table_dsd_summary(df):
"""Create table with integral DSD parameters statistics."""
# Define additional variables
df["log10(Nw)"] = np.log10(df["Nw"])
df["log10(Nt)"] = np.log10(df["Nt"])
# List of variables to summarize
variables = ["LWC", "R", "Z", "D50", "Dm", "sigma_m", "Dmax", "Nw", "log10(Nw)", "Nt", "log10(Nt)"]
# Define subset dataframe
df_subset = df[variables]
# Prepare summary DataFrame
stats_cols = [
"MIN",
"Q1",
"Q5",
"Q10",
"Q50",
"Q90",
"Q95",
"Q99",
"MAX",
"MEAN",
"STD",
"MAD",
"SKEWNESS",
"KURTOSIS",
]
df_stats = pd.DataFrame(index=variables, columns=stats_cols)
# Compute DSD integral parameters statistics
df_stats["MIN"] = df_subset.min()
df_stats["Q1"] = df_subset.quantile(0.01)
df_stats["Q5"] = df_subset.quantile(0.05)
df_stats["Q10"] = df_subset.quantile(0.10)
df_stats["Q50"] = df_subset.median()
df_stats["Q90"] = df_subset.quantile(0.90)
df_stats["Q95"] = df_subset.quantile(0.95)
df_stats["Q99"] = df_subset.quantile(0.99)
df_stats["MAX"] = df_subset.max()
df_stats["MEAN"] = df_subset.mean()
df_stats["STD"] = df_subset.std()
df_stats["MAD"] = df_subset.apply(lambda x: np.mean(np.abs(x - x.mean())))
df_stats["SKEWNESS"] = df_subset.skew()
df_stats["KURTOSIS"] = df_subset.kurt()
# Round float columns to nearest integer, leave ints unchanged
float_cols = df_stats.select_dtypes(include=["float"]).columns
df_stats[float_cols] = df_stats[float_cols].astype(float).round(decimals=2)
return df_stats
[docs]
def create_table_events_summary(df, temporal_resolution):
"""Create table with events statistics."""
# Retrieve accumulation interval
accumulation_interval, _ = get_sampling_information(temporal_resolution)
accumulation_interval_minutes = accumulation_interval / 60
# Define event settings
# - Events are separated by 1 hour or more rain-free periods in rain rate time series.
# - The events that are less than 'min_duration' minutes or the rain total is less than 0.1 mm
# are not reported.
if accumulation_interval_minutes >= 5 * 60:
neighbor_time_interval = temporal_resolution
event_min_duration = temporal_resolution
neighbor_min_size = 1
else:
neighbor_time_interval = "5MIN"
event_min_duration = "5MIN"
neighbor_min_size = 2
event_settings = {
"neighbor_min_size": neighbor_min_size,
"neighbor_time_interval": neighbor_time_interval,
"event_max_time_gap": "1H",
"event_min_duration": event_min_duration,
"event_min_size": 3,
}
# Keep rows with R > 0
df = df[df["R"] > 0]
# Number of years, months, days, minutes
if df.index.name == "time":
df = df.reset_index()
timesteps = np.sort(np.asanyarray(df["time"]))
# Define event list
event_list = group_timesteps_into_event(
timesteps=timesteps,
neighbor_min_size=event_settings["neighbor_min_size"],
neighbor_time_interval=event_settings["neighbor_time_interval"],
event_max_time_gap=event_settings["event_max_time_gap"],
event_min_duration=event_settings["event_min_duration"],
event_min_size=event_settings["event_min_size"],
)
# Create dataframe with statistics for each event
events_stats = []
rain_thresholds = [0.1, 1, 5, 10, 20, 50, 100]
for event in event_list:
# Retrieve event start_time and end_time
start, end = event["start_time"], event["end_time"]
# Retrieve event dataframe
df_event = df[(df["time"] >= start) & (df["time"] <= end)]
# Initialize event record
event_stats = {
# Event time info
"start_time": start,
"end_time": end,
"duration": int((end - start) / np.timedelta64(1, "m")) + accumulation_interval_minutes,
# Rainy minutes above thresholds
**{
f"rainy_minutes_>{thr}": int((df_event["R"] > thr).sum()) * accumulation_interval_minutes
for thr in rain_thresholds
},
# Total precipitation (mm)
"P_total": df_event["P"].sum(),
# R statistics
"mean_R": df_event["R"].mean(),
"median_R": df_event["R"].median(),
"max_R": df_event["R"].max(),
# DSD statistics
"max_Dmax": df_event["Dmax"].max(),
"mean_Dm": df_event["Dm"].mean(),
"median_Dm": df_event["Dm"].median(),
"max_Dm": df_event["Dm"].max(),
"mean_sigma_m": df_event["sigma_m"].mean(),
"median_sigma_m": df_event["sigma_m"].median(),
"max_sigma_m": df_event["sigma_m"].max(),
"mean_LWC": df_event["LWC"].mean(),
"median_LWC": df_event["LWC"].median(),
"max_LWC": df_event["LWC"].max(),
"max_Z": df_event["Z"].max(),
"mean_Nbins": int(df_event["Nbins"].mean()),
"max_Nbins": int(df_event["Nbins"].max()),
# TODO in future:
# - rain_detected = True
# - snow_detected = True
# - hail_detected = True
}
events_stats.append(event_stats)
df_events = pd.DataFrame.from_records(events_stats)
# Round and cast columns
# - Round to 3 decimals
df_events["P_total"] = df_events["P_total"].round(decimals=3)
lwc_columns = [c for c in df_events.columns if "LWC" in c]
df_events[lwc_columns] = df_events[lwc_columns].round(decimals=3)
# - Cast to integer
df_events["duration"] = df_events["duration"].astype(int)
minutes_columns = [c for c in df_events.columns if "minutes" in c]
df_events[minutes_columns] = df_events[minutes_columns].astype(int)
# - Round to 2 decimals
for var in ["Dmax", "Dm", "sigma_m", "R", "Z"]:
names = [c for c in df_events.columns if var in c]
df_events[names] = df_events[names].round(decimals=2)
return df_events
[docs]
def prepare_latex_table_dsd_summary(df):
"""Prepare a DataFrame with DSD statistics for LaTeX table output."""
df = df.copy()
# Cast numeric columns to string
numeric_cols = df.select_dtypes(include=["float", "int"]).columns
df[numeric_cols] = df[numeric_cols].astype(str)
# Rename
rename_dict = {
"LWC": r"$LWC\,[\mathrm{g}\,\mathrm{m}^{-3}]$", # [g/m3]
"R": r"$R\,[\mathrm{mm}\,\mathrm{h}^{-1}]$", # [mm/hr]
"Z": r"$Z\,[\mathrm{dBZ}]$", # [dBZ]
"D50": r"$D_{50}\,[\mathrm{mm}]$", # [mm]
"Dm": r"$D_{m}\,[\mathrm{mm}]$", # [mm]
"sigma_m": r"$\sigma_{m}\,[\mathrm{mm}]$", # [mm]
"Dmax": r"$D_{\max}\,[\mathrm{mm}]$", # [mm]
"Nw": r"$N_{w}\,[\mathrm{mm}^{-1}\,\mathrm{m}^{-3}]$", # [mm$^{-1}$ m$^{-3}$]
"log10(Nw)": r"$\log_{10}(N_{w})$", # [$\log_{10}$(mm$^{-1}$ m$^{-3}$)]
"Nt": r"$N_{t}\,[\mathrm{m}^{-3}]$", # [m$^{-3}$]
"log10(Nt)": r"$\log_{10}(N_{t})$", # [log10(m$^{-3}$)]
}
df = df.rename(index=rename_dict)
return df
[docs]
def prepare_latex_table_events_summary(df):
"""Prepare a DataFrame with events statistics for LaTeX table output."""
df = df.copy()
# Round datetime to minutes
df["start_time"] = df["start_time"].dt.strftime("%Y-%m-%d %H:%M")
df["end_time"] = df["end_time"].dt.strftime("%Y-%m-%d %H:%M")
# Cast numeric columns to string
numeric_cols = df.select_dtypes(include=["float", "int"]).columns
df[numeric_cols] = df[numeric_cols].astype(str)
# Rename
rename_dict = {
"start_time": r"Start",
"end_time": r"End",
"duration": r"Min.",
"rainy_minutes_>0.1": r"R>0.1",
"rainy_minutes_>1": r"R>1",
"rainy_minutes_>5": r"R>5",
# 'rainy_minutes_>10': r'R>10',
# 'rainy_minutes_>20': r'R>20',
"rainy_minutes_>50": r"R>50",
# 'rainy_minutes_>100': r'R>100',
"P_total": r"$P_{\mathrm{tot}} [mm]$",
"mean_R": r"$R_{\mathrm{mean}}$",
"median_R": r"$R_{\mathrm{median}}$",
"max_R": r"$R_{\max}$",
"max_Dmax": r"$D_{\max}$",
"mean_Dm": r"$D_{m,\mathrm{mean}}$",
"median_Dm": r"$D_{m,\mathrm{median}}$",
"max_Dm": r"$D_{m,\max}$",
"mean_sigma_m": r"$\sigma_{m,\mathrm{mean}}$",
"median_sigma_m": r"$\sigma_{m,\mathrm{median}}$",
"max_sigma_m": r"$\sigma_{m,\max}$",
"mean_LWC": r"$LWC_{\mathrm{mean}}$",
"median_LWC": r"$LWC_{\mathrm{median}}$",
"max_LWC": r"$LWC_{\max}$",
"max_Z": r"$Z_{\max}$",
"mean_Nbins": r"$N_{\mathrm{bins},\mathrm{mean}}$",
"max_Nbins": r"$N_{\mathrm{bins},\max}$",
}
df = df[list(rename_dict)]
df = df.rename(columns=rename_dict)
return df
[docs]
def create_table_l1_event_summary(
ds,
variable="n_particles",
threshold=4,
neighbor_min_size=2,
neighbor_time_interval="5MIN",
event_max_time_gap="1H",
event_min_duration="20MIN",
event_min_size=5,
sortby="duration",
sortby_order="decreasing",
):
"""Return a table with the hydrometeor statistics for each event."""
# Load in memory needed variables
ds["precipitation_type"] = ds["precipitation_type"].compute()
ds["hydrometeor_type"] = ds["hydrometeor_type"].compute()
ds["n_particles"] = ds["n_particles"].compute()
ds["flag_hail"] = ds["flag_hail"].compute()
# Compute stats
event_stats = []
for ds_event in ds.disdrodb.split_into_events(
variable=variable,
threshold=threshold,
neighbor_min_size=neighbor_min_size,
neighbor_time_interval=neighbor_time_interval,
event_max_time_gap=event_max_time_gap,
event_min_duration=event_min_duration,
event_min_size=event_min_size,
sortby=sortby,
sortby_order=sortby_order,
):
fraction_precip = np.sum(ds_event["precipitation_type"] >= 0) / len(ds_event["time"])
n_rainy = np.sum(ds_event["precipitation_type"] == 0)
n_snow = np.sum(ds_event["precipitation_type"] == 1)
n_mixed = np.sum(ds_event["precipitation_type"] == 2)
n_precip = np.sum(ds_event["precipitation_type"] >= 0)
n_undefined = np.sum(ds_event["precipitation_type"] == -2)
n_graupel = np.sum(ds_event["hydrometeor_type"] == 8)
n_small_hail = np.sum(ds_event["flag_hail"] == 1)
n_large_hail = np.sum(ds_event["flag_hail"] == 2)
fraction_rain = n_rainy / n_precip
fraction_snow = n_snow / n_precip
fraction_mixed = n_mixed / n_precip
fraction_undefined = n_undefined / n_precip
start_time = str(ds_event.disdrodb.start_time)
end_time = str(ds_event.disdrodb.end_time)
total_particles = np.sum(ds_event["n_particles"])
duration_hours = ds_event["duration"].astype(int) / 60 / 60
event_stats.append(
{
"start_time": start_time,
"end_time": end_time,
"duration_hours": duration_hours,
"total_particles": total_particles,
"n_rainy": n_rainy.item(),
"n_snow": n_snow.item(),
"n_mixed": n_mixed.item(),
"n_precip": n_precip.item(),
"n_undefined": n_undefined.item(),
"n_small_hail": n_small_hail.item(),
"n_large_hail": n_large_hail.item(),
"n_graupel": n_graupel.item(),
"fraction_precip": fraction_precip.item(),
"fraction_rain": fraction_rain.item(),
"fraction_snow": fraction_snow.item(),
"fraction_mixed": fraction_mixed.item(),
"fraction_undefined": fraction_undefined.item(),
},
)
df_events = pd.DataFrame(event_stats).reset_index(drop=True)
return df_events
####-------------------------------------------------------------------
#### Powerlaw routines
[docs]
def apply_2d_outlier_removal(df_data, xbins, ybins, min_counts):
"""Remove observations that fall in 2D histogram bins having less than min_counts."""
df_data["x_bin"] = pd.cut(df_data["x"], bins=xbins, right=False)
df_data["y_bin"] = pd.cut(df_data["y"], bins=ybins, right=False)
# Remove values outside bins
df_data = df_data[(df_data["x_bin"].cat.codes != -1) & (df_data["y_bin"].cat.codes != -1)]
# Count per 2D bin
counts_2d = df_data.groupby(["x_bin", "y_bin"], observed=True).size().reset_index(name="n")
# Keep only dense cells
dense_cells = counts_2d[counts_2d["n"] >= min_counts]
# Merge back to original data
df_data = df_data.merge(
dense_cells[["x_bin", "y_bin"]],
on=["x_bin", "y_bin"],
how="inner",
)
return df_data
[docs]
def fit_powerlaw(x, y, xbins, ybins, quantile=0.5, min_counts=10, x_in_db=False, robust=True, quadratic=False):
r"""Fit a power-law relationship to binned quantile values.
This function bins ``x`` into intervals defined by ``xbins``, computes a
specified quantile of ``y`` in each bin (median by default, robust to outliers),
and fits a power-law model using either the RANSAC algorithm (if robust=True)
or weighted nonlinear least squares. Optionally, ``x`` can be converted from
decibel units to linear scale automatically before fitting.
The fitted model depends on the ``quadratic`` parameter:
- If ``quadratic=False`` (default), fit a classical power law:
.. math::
y = a x^b
- If ``quadratic=True``, fit a log-quadratic power law:
.. math::
y = a x^b 10^{c (\log_{10} x)^2}
Parameters
----------
x : array-like
Independent variable values. Must be positive and finite after filtering.
y : array-like
Dependent variable values. Must be positive and finite after filtering.
xbins : array-like
Bin edges for grouping ``x`` values (passed to ``pandas.cut``).
ybins : array-like, optional
Bin edges for 2D histogram outlier removal. If provided, removes observations
in 2D bins with fewer than ``min_counts`` samples before fitting.
quantile : float, optional
Quantile of ``y`` to compute in each bin (between 0 and 1).
For example: 0.5 = median, 0.25 = lower quartile, 0.75 = upper quartile.
Default is 0.5 (median).
min_counts : int, optional
Minimum number of observations required in each bin for it to be included
in the fit. Default is 10.
x_in_db : bool, optional
If True, converts ``x`` values from decibels (dB) to linear scale using
:func:`disdrodb.idecibel`. Default is False.
robust : bool, optional
Whether to fit the power law using the Random Sample Consensus (RANSAC)
algorithm (robust=True) or using weighted nonlinear least squares solved with
the Levenberg-Marquardt algorithm (robust=False).
Default is True.
To fit with RANSAC, scikit-learn must be installed.
quadratic : bool, optional
If False (default), fit a classical power law :math:`y = a x^b` in log-log space.
If True, fit a log-quadratic power law :math:`y = a x^b 10^{c (\log_{10} x)^2}`.
Default is False.
Returns
-------
params : tuple of float
Estimated parameters ``(a, b)`` of the power-law relationship.
If quadratic=True, returns ``(a, b, c)`` for the log-quadratic power law.
params_std : tuple of float
One standard deviation uncertainties ``(a_std, b_std)`` estimated from
the covariance matrix of the fit.
If quadratic=True, returns ``(a_std, b_std, c_std)``.
Parameters standard deviation is currently not available when fitting
with the RANSAC algorithm (returns None values).
Notes
-----
- This implementation uses quantile statistics within bins, which reduces
the influence of outliers.
- Both ``x`` and ``y`` are filtered to retain only positive, finite values
before binning.
- Fitting is performed on the bin centers (midpoints between bin edges).
- The median absolute deviation (MAD) is used to estimate uncertainties
for weighted nonlinear least squares fitting.
See Also
--------
predict_from_powerlaw : Predict values from the fitted power-law parameters.
predict_from_logquadratic_powerlaw : Predict values from the log-quadratic power law.
inverse_powerlaw_parameters : Compute parameters of the inverse power law.
Examples
--------
>>> import numpy as np
>>> x = np.linspace(1, 50, 200)
>>> y = 2 * x**1.5 + np.random.normal(0, 5, size=x.size)
>>> xbins = np.arange(0, 60, 5)
>>> (a, b), (a_std, b_std) = fit_powerlaw(x, y, xbins, ybins=None)
"""
# Set min_counts to 0 during pytest execution in order to test the summary routine
if os.environ.get("PYTEST_CURRENT_TEST"):
min_counts = 0
# Check if RANSAC algorithm is available
sklearn_available = importlib.util.find_spec("sklearn") is not None
if robust and not sklearn_available:
robust = False
# Ensure numpy array
x = np.asanyarray(x)
y = np.asanyarray(y)
# Ensure values > 0 and finite
valid_values = (x > 0) & (y > 0) & np.isfinite(x) & np.isfinite(y)
x = x[valid_values]
y = y[valid_values]
# Define dataframe
df_data = pd.DataFrame({"x": x, "y": y})
# Outlier removal in 2D histogram
if ybins is not None:
df_data = apply_2d_outlier_removal(df_data, xbins=xbins, ybins=ybins, min_counts=min_counts)
# Alternative code
# from disdrodb.utils.dataframe import compute_1d_histogram
# df_agg = compute_1d_histogram(
# df=df_data,
# column="x",
# variables="y",
# bins=xbins,
# prefix_name=True,
# include_quantiles=False
# )
# df_agg["count"] # instead of N
# - Keep only data within bin range
df_data = df_data[(df_data["x"] >= xbins[0]) & (df_data["x"] < xbins[-1])]
# - Define bins
df_data["x_bins"] = pd.cut(df_data["x"], bins=xbins, right=False)
# - Remove data outside specified bins
df_data = df_data[df_data["x_bins"].cat.codes != -1]
# Derive median y values at various x points
# - This typically remove outliers
df_agg = df_data.groupby(by="x_bins", observed=True)["y"].agg(
y=lambda s: s.quantile(quantile),
n="count",
mad=lambda s: (s - s.median()).abs().median(),
)
df_agg["x"] = np.array([iv.left + (iv.right - iv.left) / 2 for iv in df_agg.index])
# Keep only data where y > 0
df_agg = df_agg[df_agg["y"] > 0]
# If input is in decibel, convert to linear scale
if x_in_db:
df_agg["x"] = disdrodb.idecibel(df_agg["x"])
# Remove bins with less than n counts
df_agg = df_agg[df_agg["n"] > min_counts]
if len(df_agg) < 5:
raise ValueError("Not enough data to fit a power law.")
# Estimate standard deviation for WNLS
# - Ensure is not 0 (can occur if within bins all same value)
sigma = 1.4826 * df_agg["mad"]
sigma_lb = np.percentile(sigma[sigma > 0], 1) # or small fraction
sigma = np.maximum(sigma, sigma_lb)
# Fit the data
with suppress_warnings():
if not quadratic:
if robust:
return fit_powerlaw_with_ransac(x=df_agg["x"], y=df_agg["y"])
return fit_powerlaw_with_wnls(x=df_agg["x"], y=df_agg["y"], sigma=sigma)
# quadratic=True
if robust:
return fit_logquadratic_powerlaw_with_ransac(df_agg["x"], y=df_agg["y"])
return fit_logquadratic_powerlaw_with_wnls(
x=df_agg["x"],
y=df_agg["y"],
sigma=sigma,
)
[docs]
def fit_powerlaw_with_wnls(x, y, sigma):
r"""Fit a power-law relationship using Weighted Nonlinear Least Squares (WNLS).
This function fits a power-law model :math:`y = a x^b` by transforming
to log10-space and performing weighted linear regression with the
Levenberg-Marquardt algorithm.
.. math::
y = a x^b
The fitting is performed in log10-space:
.. math::
\log_{10}(y) = a_0 + b \log_{10}(x)
where :math:`a = 10^{a_0}`.
Parameters
----------
x : array-like
Independent variable values. Must be positive.
y : array-like
Dependent variable values. Must be positive.
sigma : array-like
Standard deviation of ``y`` values, used as weights in the fit.
Values are transformed to log10-space for fitting.
Returns
-------
params : tuple of float
Estimated parameters ``(a, b)`` of the power-law relationship.
params_std : tuple of float
One standard deviation uncertainties ``(a_std, b_std)`` estimated from
the covariance matrix of the fit.
Notes
-----
- The function uses ``scipy.optimize.curve_fit`` with the Levenberg-Marquardt
method for optimization.
- Uncertainties are propagated from log10-space back to linear space.
- The transformation :math:`\sigma_{\log_{10}(y)} = \sigma_y / (y \ln(10))`
is used for the weights.
See Also
--------
fit_powerlaw : Main power-law fitting function with binning and outlier removal.
fit_powerlaw_with_ransac : Fit power law using RANSAC algorithm.
"""
def _linear_model(lx, a0, b):
return a0 + b * lx
# Transform sigma_y -> sigma_log10(y)
sigma_log = sigma / (y * np.log(10))
# Fit
(a0, b), pcov = curve_fit(
_linear_model,
np.log10(x),
np.log10(y),
method="lm",
sigma=sigma_log,
absolute_sigma=True,
maxfev=10_000, # max n iterations
)
a0_std, b_std = np.sqrt(np.diag(pcov))
# Convert back to classical power-law form
a = 10**a0
a_std = np.log(10) * a * a0_std # error propagation
return (float(a), float(b)), (float(a_std), float(b_std))
[docs]
def fit_powerlaw_with_ransac(x, y):
r"""Fit a power-law relationship using the RANSAC algorithm.
This function fits a power-law model :math:`y = a x^b` by performing
linear regression in log10-space using the Random Sample Consensus (RANSAC)
algorithm, which is robust to outliers.
.. math::
y = a x^b
The fitting is performed in log10-space:
.. math::
\log_{10}(y) = a_0 + b \log_{10}(x)
where :math:`a = 10^{a_0}`.
Parameters
----------
x : array-like
Independent variable values. Must be positive.
y : array-like
Dependent variable values. Must be positive.
Returns
-------
params : tuple of float
Estimated parameters ``(a, b)`` of the power-law relationship.
params_std : tuple of None
Placeholder for standard deviations ``(None, None)``.
Parameter uncertainties are not computed with RANSAC.
Notes
-----
- Requires scikit-learn to be installed.
- RANSAC is robust to outliers by iteratively fitting random subsets
of the data and selecting the model with the most inliers.
- The residual threshold for inlier classification is set to 0.3 in log10-space.
- Parameter uncertainties are not provided by RANSAC and are returned as None.
See Also
--------
fit_powerlaw : Main power-law fitting function with binning and outlier removal.
fit_powerlaw_with_wnls : Fit power law using weighted nonlinear least squares.
"""
from sklearn.linear_model import LinearRegression, RANSACRegressor
x = np.asanyarray(x)
y = np.asanyarray(y)
X = np.log10(x).reshape(-1, 1)
Y = np.log10(y)
ransac = RANSACRegressor(
estimator=LinearRegression(),
min_samples=0.5,
residual_threshold=0.3,
random_state=42,
)
ransac.fit(X, Y)
b = ransac.estimator_.coef_[0] # slope
loga = ransac.estimator_.intercept_ # intercept
a = 10**loga
a_std = None
b_std = None
return (float(a), float(b)), (a_std, b_std)
[docs]
def fit_logquadratic_powerlaw_with_wnls(x, y, sigma):
r"""Fit a log-quadratic power-law using Weighted Nonlinear Least Squares (WNLS).
This function fits a log-quadratic (log-parabolic) power-law model
by performing weighted regression in log10-space with the Levenberg-Marquardt
algorithm.
The fitted model in original space is:
.. math::
y = a x^b 10^{c (\log_{10} x)^2}
where the fitting is performed in log10-space:
.. math::
\log_{10}(y) = a_0 + b \log_{10}(x) + c (\log_{10} x)^2
with :math:`a = 10^{a_0}`.
Parameters
----------
x : array-like
Independent variable values. Must be positive.
y : array-like
Dependent variable values. Must be positive.
sigma : array-like
Standard deviation of ``y`` values, used as weights in the fit.
Values are transformed to log10-space for fitting.
Returns
-------
params : tuple of float
Estimated parameters ``(a, b, c)`` of the log-quadratic power law.
params_std : tuple of float
One standard deviation uncertainties ``(a_std, b_std, c_std)`` estimated
from the covariance matrix of the fit.
Notes
-----
- The function uses ``scipy.optimize.curve_fit`` with the Levenberg-Marquardt
method for optimization.
- Uncertainties are propagated from log10-space back to linear space.
- If :math:`c = 0`, the model reduces to a classical power law :math:`y = a x^b`.
- The transformation :math:`\sigma_{\log_{10}(y)} = \sigma_y / (y \ln(10))`
is used for the weights.
See Also
--------
fit_powerlaw : Main power-law fitting function with binning and outlier removal.
fit_logquadratic_powerlaw_with_ransac : Fit log-quadratic power law using RANSAC.
predict_from_logquadratic_powerlaw : Predict values from the fitted model.
"""
def _linear_model(lx, a0, b, c):
return a0 + b * lx + c * lx**2
# Transform sigma_y -> sigma_log10(y)
sigma_log = sigma / (y * np.log(10))
# Fit
(a0, b, c), pcov = curve_fit(
_linear_model,
np.log10(x),
np.log10(y),
method="lm",
sigma=sigma_log,
absolute_sigma=True,
maxfev=10_000, # max n iterations
)
a0_std, b_std, c_std = np.sqrt(np.diag(pcov))
a = 10**a0
a_std = np.log(10) * a * a0_std # error propagation
return (float(a), float(b), float(c)), (float(a_std), float(b_std), float(c_std))
[docs]
def fit_logquadratic_powerlaw_with_ransac(x, y):
r"""Fit a log-quadratic power-law model using RANSAC regression.
This function fits a log-quadratic (log-parabolic) power-law model
by performing linear regression in log10-space using the Random Sample
Consensus (RANSAC) algorithm, which is robust to outliers.
The fitted model in original space is:
.. math::
y = a x^b 10^{c (\log_{10} x)^2}
where the fitting is performed in log10-space:
.. math::
\log_{10}(y) = a_0 + b \log_{10}(x) + c (\log_{10} x)^2
with :math:`a = 10^{a_0}`.
This model (also known as a log-parabolic power law) extends the
classical power law by allowing smooth curvature in log-log space.
The effective scale-dependent exponent is:
.. math::
\frac{d(\log_{10} y)}{d(\log_{10} x)} = b + 2c \log_{10}(x)
Parameters
----------
x : array-like
Independent variable values. Must be strictly positive.
y : array-like
Dependent variable values. Must be strictly positive.
Returns
-------
params : tuple of float
Estimated parameters ``(a, b, c)`` of the log-quadratic power law.
params_std : tuple of None
Placeholder for standard deviations ``(None, None, None)``.
Parameter uncertainties are not computed with RANSAC.
Notes
-----
- Requires scikit-learn to be installed.
- The fitting is performed in log10-space using RANSAC with
a linear regression estimator.
- Non-positive x or y values are removed prior to fitting.
- If :math:`c = 0`, the model reduces to a pure power law :math:`y = a x^b`.
- The residual threshold for inlier classification is set to 0.3 in log10-space.
- Parameter uncertainties are not provided by RANSAC and are returned as None.
See Also
--------
fit_powerlaw : Main power-law fitting function with binning and outlier removal.
fit_logquadratic_powerlaw_with_wnls : Fit log-quadratic power law using WNLS.
predict_from_logquadratic_powerlaw : Predict values from the fitted model.
"""
import numpy as np
from sklearn.linear_model import LinearRegression, RANSACRegressor
x = np.asarray(x)
y = np.asarray(y)
mask = (x > 0) & (y > 0)
x = x[mask]
y = y[mask]
lx = np.log10(x)
Y = np.log10(y)
X = np.column_stack(
[
lx,
lx**2,
],
)
ransac = RANSACRegressor(
estimator=LinearRegression(),
min_samples=0.5,
residual_threshold=0.3,
random_state=42,
)
ransac.fit(X, Y)
b = ransac.estimator_.coef_[0]
c = ransac.estimator_.coef_[1]
a0 = ransac.estimator_.intercept_
a = np.power(10, a0)
a_std = None
b_std = None
c_std = None
return (float(a), float(b), float(c)), (a_std, b_std, c_std)
[docs]
def predict_from_powerlaw(x, a, b):
r"""Predict values from a power-law relationship.
This function computes predictions using the power-law model:
.. math::
y = a x^b
Parameters
----------
x : array-like
Independent variable values.
a : float
Power-law coefficient.
b : float
Power-law exponent.
Returns
-------
y : numpy.ndarray
Predicted dependent variable values.
Notes
-----
This function does not check for invalid (negative or zero) ``x`` values.
Ensure that ``x`` is compatible with the model before calling.
See Also
--------
fit_powerlaw : Fit a power-law relationship to data.
predict_from_logquadratic_powerlaw : Predict from log-quadratic power law.
"""
return a * np.power(x, b)
[docs]
def predict_from_logquadratic_powerlaw(x, a, b, c):
r"""Predict values from a log-quadratic power-law model.
This function computes predictions using the log-quadratic power-law model:
.. math::
y = a x^b 10^{c (\log_{10} x)^2}
Parameters
----------
x : array-like
Independent variable values. Must be positive.
a : float
Multiplicative coefficient in original space.
b : float
Linear log10 slope coefficient.
c : float
Quadratic log10 curvature coefficient.
Returns
-------
y : numpy.ndarray
Predicted dependent variable values.
Notes
-----
If :math:`c = 0`, the model reduces to the classical power law:
.. math::
y = a x^b
See Also
--------
fit_powerlaw : Fit a power-law relationship to data.
predict_from_powerlaw : Predict from classical power law.
fit_logquadratic_powerlaw_with_wnls : Fit log-quadratic power law using WNLS.
fit_logquadratic_powerlaw_with_ransac : Fit log-quadratic power law using RANSAC.
"""
logx = np.log10(x)
return a * np.power(10, b * logx + c * logx**2)
[docs]
def inverse_powerlaw_parameters(a, b):
r"""Compute parameters of the inverse power-law relationship.
Given a power-law model:
.. math::
y = a x^b
this function returns parameters :math:`(A, B)` such that the inverse
relation holds:
.. math::
x = A y^B
where:
.. math::
A = a^{-1/b}, \quad B = 1/b
Parameters
----------
a : float
Power-law coefficient in :math:`y = a x^b`.
b : float
Power-law exponent in :math:`y = a x^b`.
Returns
-------
A : float
Coefficient of the inverse power-law model.
B : float
Exponent of the inverse power-law model.
See Also
--------
fit_powerlaw : Fit a power-law relationship to data.
predict_from_inverse_powerlaw : Predict using inverse power-law parameters.
"""
A = 1 / (a ** (1 / b))
B = 1 / b
return A, B
[docs]
def predict_from_inverse_powerlaw(x, a, b):
r"""Predict values from the inverse power-law relationship.
Given an inverse power-law model:
.. math::
x = a y^b
this function solves for :math:`y` given :math:`x`:
.. math::
y = (x / a)^{1/b} = x^{1/b} \cdot a^{-1/b}
Parameters
----------
x : array-like
Values of ``x`` (independent variable in the inverse power law).
a : float
Power-law coefficient of the inverse power-law model.
b : float
Power-law exponent of the inverse power-law model.
Returns
-------
y : numpy.ndarray
Predicted dependent variable values.
See Also
--------
inverse_powerlaw_parameters : Compute parameters of the inverse power law.
predict_from_powerlaw : Predict from the original power law.
"""
return (x ** (1 / b)) / (a ** (1 / b))
def _define_coeff_string(a):
# Format a coefficient as m * 10^{e}
m_str, e_str = f"{a:.2e}".split("e")
m, e = float(m_str), int(e_str)
# Build coefficient string
a_str = f"{a:.2f}" if e >= -1 else f"{m:.2f} \\times 10^{{{e}}}"
return a_str
[docs]
def define_powerlaw_legend_str(x_symbol, y_symbol, a, b):
r"""Build two-line LaTeX legend string for y = a x^b and x = A y^B.
Parameters
----------
x_symbol : str
Symbol of independent variable (e.g. 'R', 'z', 'A_H', '\\mathrm{KED}')
y_symbol : str
Symbol of dependent variable
a : float
Power-law coefficient
b : float
Power-law exponent
Returns
-------
legend_str : str
LaTeX formatted legend string (two lines)
"""
# Inverse parameters
A, B = inverse_powerlaw_parameters(a, b)
# Format coefficients
a_str = _define_coeff_string(a)
A_str = _define_coeff_string(A)
legend_str = (
rf"${y_symbol} = {a_str}\, {x_symbol}^{{{b:.2f}}}$" "\n" rf"${x_symbol} = {A_str}\, {y_symbol}^{{{B:.2f}}}$"
)
return legend_str
[docs]
def define_logquadratic_powerlaw_legend_str(
x_symbol,
y_symbol,
a,
b,
c,
):
r"""Build LaTeX legend string for log-quadratic power law.
.. math::
y = a x^b 10^{c (\log_{10} x)^2}
Parameters
----------
x_symbol : str
Symbol of independent variable (e.g. 'R')
y_symbol : str
Symbol of dependent variable
a : float
Multiplicative coefficient in original space.
b : float
Linear log10 slope coefficient
c : float
Quadratic log10 curvature coefficient.
If c = 0, the model reduces to the classical power law.
Returns
-------
legend_str : str
LaTeX formatted legend string
"""
# Format coefficients
a_str = _define_coeff_string(a)
c_str = f"{c:.3f}"
b_str = f"{b:.2f}"
legend_str = rf"${y_symbol} = {a_str}\, {x_symbol}^{{{b_str}}}" rf"\, 10^{{{c_str}(\log_{{10}} {x_symbol})^2}}$"
return legend_str
####-------------------------------------------------------------------
#### N(D) Climatological plots
[docs]
def create_dsd_dataframe(ds, variables=None):
"""Create pandas Dataframe with N(D) data."""
# Define variables to select
if isinstance(variables, str):
variables = [variables]
variables = [] if variables is None else variables
variables = ["drop_number_concentration", "Nw", "diameter_bin_center", "Dm", "D50", "R", *variables]
variables = np.unique(variables).tolist()
# Retrieve stacked N(D) dataframe
ds_stack = ds[variables].stack( # noqa: PD013
dim={"obs": ["time", "diameter_bin_center"]},
)
# Drop coordinates
coords_to_drop = [
"velocity_method",
"sample_interval",
*RADAR_OPTIONS,
]
df_dsd = ds_stack.to_dask_dataframe().drop(columns=coords_to_drop, errors="ignore").compute()
df_dsd["D"] = df_dsd["diameter_bin_center"]
df_dsd["N(D)"] = df_dsd["drop_number_concentration"]
df_dsd = df_dsd[df_dsd["R"] != 0]
df_dsd = df_dsd[df_dsd["N(D)"] != 0]
# Compute normalized density
df_dsd["D/D50"] = df_dsd["D"] / df_dsd["D50"]
df_dsd["D/Dm"] = df_dsd["D"] / df_dsd["Dm"]
df_dsd["N(D)/Nw"] = df_dsd["N(D)"] / df_dsd["Nw"]
df_dsd["log10[N(D)/Nw]"] = np.log10(df_dsd["N(D)/Nw"])
return df_dsd
[docs]
def plot_normalized_dsd_density(df_dsd, x="D/D50", figsize=(3.4, 3.0), dpi=300):
"""Plot normalized DSD N(D)/Nw ~ D/D50 (or D/Dm) density."""
ds_stats = compute_2d_histogram(
df_dsd,
x=x,
y="N(D)/Nw",
x_bins=np.arange(0, 4, 0.025),
y_bins=log_arange(1e-5, 50, log_step=0.1, base=10),
)
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
norm = LogNorm(1, None)
ds_stats = ds_stats.isel({"N(D)/Nw": ds_stats["N(D)/Nw"] > 0})
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
p = ds_stats["count"].plot.pcolormesh(
x=x,
y="N(D)/Nw",
ax=ax,
vmin=1,
cmap=cmap,
norm=norm,
extend="max",
yscale="log",
)
ax.set_ylim(1e-5, 20)
ax.set_xlim(0, 4)
ax.set_xlabel(f"{x} [-]")
ax.set_ylabel(r"$N(D)/N_w$ [-]")
ax.set_title("Normalized DSD")
return p
[docs]
def plot_dsd_density(df_dsd, diameter_bin_edges, figsize=(3.4, 3.0), dpi=300):
"""Plot N(D) ~ D density."""
ds_stats = compute_2d_histogram(
df_dsd,
x="D",
y="N(D)",
x_bins=diameter_bin_edges,
y_bins=log_arange(0.1, 20_000, log_step=0.1, base=10),
)
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
norm = LogNorm(1, None)
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
p = ds_stats["count"].plot.pcolormesh(x="D", y="N(D)", ax=ax, cmap=cmap, norm=norm, extend="max", yscale="log")
ax.set_xlim(0, 8)
ax.set_ylim(1, 20_000)
ax.set_xlabel(r"$D$ [mm]")
ax.set_ylabel(r"$N(D)$ [m$^{-3}$ mm$^{-1}$]")
ax.set_title("DSD")
return p
[docs]
def plot_dsd_with_dense_lines(drop_number_concentration, r, figsize=(3.4, 3.0), dpi=300):
"""Plot N(D) ~ D using dense lines."""
# Define intervals for rain rates
r_bins = [0, 2, 5, 10, 50, 100, 500]
# Define N(D) bins and diameter bin edeges
y_bins = log_arange(1, 20_000, log_step=0.025, base=10)
diameter_bin_edges = np.arange(0, 8, 0.01)
# Interpolate N(D) to high resolution !
# - For better visualization of dense lines
da = resample_drop_number_concentration(
drop_number_concentration.compute(),
diameter_bin_edges=diameter_bin_edges,
)
ds_resampled = xr.Dataset(
{
"R": r.compute(),
"drop_number_concentration": da,
},
)
# Define diameter bin edges to compute dense lines
x_bins = da.disdrodb.diameter_bin_edges
# Define discrete colormap (one color per rain-interval):
cmap_list = [
plt.get_cmap("Reds"),
plt.get_cmap("Oranges"),
plt.get_cmap("Purples"),
plt.get_cmap("Greens"),
plt.get_cmap("Blues"),
plt.get_cmap("Grays"),
]
cmap_list = [ListedColormap(cmap(np.arange(0, cmap.N))[-40:]) for cmap in cmap_list]
# Compute dense lines
dict_rgb = {}
for i in range(0, len(r_bins) - 1):
# Define dataset subset
idx_rain_interval = np.logical_and(ds_resampled["R"] >= r_bins[i], ds_resampled["R"] < r_bins[i + 1])
da = ds_resampled.isel(time=idx_rain_interval)["drop_number_concentration"]
if da.sizes["time"] == 0:
continue
# Retrieve dense lines
da_dense_lines = compute_dense_lines(
da=da,
coord="diameter_bin_center",
x_bins=x_bins,
y_bins=y_bins,
normalization="max",
)
# Define cmap
cmap = cmap_list[i]
# Map colors and transparency
# da_rgb = to_rgba(da_dense_lines, cmap=cmap, scaling="linear")
# da_rgb = to_rgba(da_dense_lines, cmap=cmap, scaling="exp")
# da_rgb = to_rgba(da_dense_lines, cmap=cmap, scaling="log")
da_rgb = to_rgba(da_dense_lines, cmap=cmap, scaling="sqrt")
# Add to dictionary
dict_rgb[i] = da_rgb
# Blend images with max-alpha
ds_rgb = xr.concat(dict_rgb.values(), dim="r_class")
da_blended = max_blend_images(ds_rgb, dim="r_class")
# Prepare legend handles
handles = []
labels = []
for i in range(len(r_bins) - 1):
color = cmap_list[i](0.8) # pick a representative color from each cmap
handle = mlines.Line2D([], [], color=color, alpha=0.6, linewidth=2)
label = f"[{r_bins[i]} - {r_bins[i+1]}]"
handles.append(handle)
labels.append(label)
# Create figure
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
p = ax.pcolormesh(
da_blended["diameter_bin_center"],
da_blended["drop_number_concentration"],
da_blended.data,
)
# Set axis scale and limits
ax.set_yscale("log")
ax.set_xlim(0, 8)
ax.set_ylim(1, 20_000)
# Add axis labels and title
ax.set_xlabel(r"$D$ [mm]")
ax.set_ylabel(r"$N(D)$ [m$^{-3}$ mm$^{-1}$]")
ax.set_title("DSD")
# Add legend with title
ax.legend(handles, labels, title="Rain rate [mm/hr]", loc="upper right")
# Return figure
return p
####-------------------------------------------------------------------
#### DSD parameters plots
[docs]
def define_lognorm_max_value(value):
"""Round up to next nice number: 90->100, 400->500, 1200->2000."""
if value <= 0:
return 1
magnitude = 10 ** np.floor(np.log10(value))
first_digit = value / magnitude
nice_value = 1 if first_digit <= 1 else 2 if first_digit <= 2 else 5 if first_digit <= 5 else 10
return nice_value * magnitude
[docs]
def plot_dsd_params_relationships(df, add_nt=False, log_step=0.05, dpi=300):
"""Create a figure illustrating the relationships between DSD parameters."""
# TODO: option to use D50 instead of Dm
# Compute the required datasets for the plots
# - Dm vs Nw
ds_Dm_Nw_stats = compute_2d_histogram(
df,
x="Dm",
y="Nw",
variables=["R", "LWC", "Nt"],
x_bins=np.arange(0, 8, 0.1),
y_bins=log_arange(10, 1_000_000, log_step=log_step, base=10),
)
# - Dm vs LWC (W)
ds_Dm_LWC_stats = compute_2d_histogram(
df,
x="Dm",
y="LWC",
variables=["R", "Nw", "Nt"],
x_bins=np.arange(0, 8, 0.1),
y_bins=log_arange(0.01, 10, log_step=log_step, base=10),
)
# - Dm vs R
ds_Dm_R_stats = compute_2d_histogram(
df,
x="Dm",
y="R",
variables=["Nw", "LWC", "Nt"],
x_bins=np.arange(0, 8, 0.1),
y_bins=log_arange(0.1, 500, log_step=log_step, base=10),
)
# - Dm vs Nt
ds_Dm_Nt_stats = compute_2d_histogram(
df,
x="Dm",
y="Nt",
variables=["R", "LWC", "Nw"],
x_bins=np.arange(0, 8, 0.1),
y_bins=log_arange(1, 100_000, log_step=log_step, base=10),
)
# Define different colormaps for each column
cmap_counts = plt.get_cmap("viridis")
cmap_lwc = plt.get_cmap("YlOrRd")
cmap_r = plt.get_cmap("Blues")
cmap_nt = plt.get_cmap("Greens")
# Define normalizations for each variable
norm_counts = LogNorm(1, None)
norm_lwc = LogNorm(0.01, 10)
norm_r = LogNorm(0.2, 500)
# norm_nw = LogNorm(10, 100000)
norm_nt = LogNorm(1, 10000)
# Define axis limits
dm_lim = (0.3, 6)
nw_lim = (10, 1_000_000)
lwc_lim = (0.01, 10)
r_lim = (0.1, 500)
nt_lim = (1, 100_000)
# Define figure size
if add_nt:
figsize = (6.9, 6.9)
nrows = 4
height_ratios = [0.40, 1, 1, 1, 1]
else:
figsize = (6.9, 5.8)
nrows = 3
height_ratios = [0.40, 1, 1, 1]
# Create figure with 4x4 subplots
fig = plt.figure(figsize=figsize, dpi=dpi)
gs = fig.add_gridspec(nrows + 1, 4, height_ratios=height_ratios, hspace=0.0, wspace=0.02)
axes = np.empty((nrows + 1, 4), dtype=object)
# Create colorbar axes in the bottom row of the gridspec
cbar_axes = [fig.add_subplot(gs[0, j]) for j in range(4)]
# Create axes for the grid
for i in range(1, nrows + 1):
for j in range(4):
axes[i, j] = fig.add_subplot(gs[i, j])
# Create empty subplot for diagonal elements (when y-axis variable matches column variable)
# axes[2, 1].set_visible(False)
# axes[3, 2].set_visible(False)
# axes[4, 3].set_visible(False)
####-------------------------------------------------------------------.
#### Dm vs Nw
#### - Counts
im_counts = ds_Dm_Nw_stats["count"].plot.pcolormesh(
x="Dm",
y="Nw",
cmap=cmap_counts,
norm=norm_counts,
extend="max",
yscale="log",
add_colorbar=False,
ax=axes[1, 0],
)
axes[1, 0].set_ylabel(r"$N_w$ [mm$^{-1}$ m$^{-3}$]")
axes[1, 0].set_xlim(*dm_lim)
axes[1, 0].set_ylim(nw_lim)
#### - LWC
im_lwc = ds_Dm_Nw_stats["LWC_median"].plot.pcolormesh(
x="Dm",
y="Nw",
cmap=cmap_lwc,
norm=norm_lwc,
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[1, 1],
)
axes[1, 1].set_xlim(*dm_lim)
axes[1, 1].set_ylim(nw_lim)
#### - R
im_r = ds_Dm_Nw_stats["R_median"].plot.pcolormesh(
x="Dm",
y="Nw",
cmap=cmap_r,
norm=norm_r,
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[1, 2],
)
axes[1, 2].set_xlim(*dm_lim)
axes[1, 2].set_ylim(nw_lim)
axes[1, 2].set_yticklabels([])
#### - Nt
im_nt = ds_Dm_Nw_stats["Nt_median"].plot.pcolormesh(
x="Dm",
y="Nw",
cmap=cmap_nt,
norm=norm_nt,
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[1, 3],
)
axes[1, 3].set_xlim(*dm_lim)
axes[1, 3].set_ylim(nw_lim)
axes[1, 3].set_yticklabels([])
####-------------------------------------------------------------------.
#### Dm vs LWC
#### - Counts
ds_Dm_LWC_stats["count"].plot.pcolormesh(
x="Dm",
y="LWC",
cmap=cmap_counts,
norm=norm_counts,
extend="max",
yscale="log",
add_colorbar=False,
ax=axes[2, 0],
)
axes[2, 0].set_ylabel(r"LWC [g/m³]")
axes[2, 0].set_xlim(*dm_lim)
axes[2, 0].set_ylim(lwc_lim)
#### - LWC
# - Empty (diagonal where y-axis is W) - handled above in the loop
ds_Dm_LWC_stats["R_median"].plot.pcolormesh(
x="Dm",
y="LWC",
cmap=cmap_r,
norm=norm_r,
alpha=0, # fully transparent
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[2, 1],
)
axes[2, 1].set_xlim(*dm_lim)
axes[2, 1].set_ylim(lwc_lim)
axes[2, 1].set_yticklabels([])
#### - R
ds_Dm_LWC_stats["R_median"].plot.pcolormesh(
x="Dm",
y="LWC",
cmap=cmap_r,
norm=norm_r,
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[2, 2],
)
axes[2, 2].set_xlim(*dm_lim)
axes[2, 2].set_ylim(lwc_lim)
axes[2, 2].set_yticklabels([])
#### - Nt
im_nt = ds_Dm_LWC_stats["Nt_median"].plot.pcolormesh(
x="Dm",
y="LWC",
cmap=cmap_nt,
norm=norm_nt,
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[2, 3],
)
axes[2, 3].set_xlim(*dm_lim)
axes[2, 3].set_ylim(lwc_lim)
axes[2, 3].set_yticklabels([])
####-------------------------------------------------------------------.
#### Dm vs R
#### - Counts
ds_Dm_R_stats["count"].plot.pcolormesh(
x="Dm",
y="R",
cmap=cmap_counts,
norm=norm_counts,
extend="max",
yscale="log",
add_colorbar=False,
ax=axes[3, 0],
)
axes[3, 0].set_ylabel(r"$R$ [mm h$^{-1}$]")
axes[3, 0].set_xlim(*dm_lim)
axes[3, 0].set_ylim(r_lim)
#### - LWC
im_lwc = ds_Dm_R_stats["LWC_median"].plot.pcolormesh(
x="Dm",
y="R",
cmap=cmap_lwc,
norm=norm_lwc,
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[3, 1],
)
axes[3, 1].set_xlim(*dm_lim)
axes[3, 1].set_ylim(r_lim)
axes[3, 1].set_yticklabels([])
#### - R
# - Empty (diagonal where y-axis is R) - handled above in the loop
#### - Nt
ds_Dm_R_stats["Nt_median"].plot.pcolormesh(
x="Dm",
y="R",
cmap=cmap_nt,
norm=norm_nt,
alpha=0, # fully transparent
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[3, 2],
)
axes[3, 2].set_xlim(*dm_lim)
axes[3, 2].set_ylim(r_lim)
axes[3, 2].set_yticklabels([])
#### - Nt
ds_Dm_R_stats["Nt_median"].plot.pcolormesh(
x="Dm",
y="R",
cmap=cmap_nt,
norm=norm_nt,
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[3, 3],
)
axes[3, 3].set_xlim(*dm_lim)
axes[3, 3].set_ylim(r_lim)
axes[3, 3].set_yticklabels([])
####-------------------------------------------------------------------.
#### Dm vs Nt
if add_nt:
#### - Counts
ds_Dm_Nt_stats["count"].plot.pcolormesh(
x="Dm",
y="Nt",
cmap=cmap_counts,
norm=norm_counts,
extend="max",
yscale="log",
add_colorbar=False,
ax=axes[4, 0],
)
axes[4, 0].set_ylabel(r"$N_t$ [m$^{-3}$]")
axes[4, 0].set_xlabel(r"$D_m$ [mm]")
axes[4, 0].set_xlim(*dm_lim)
axes[4, 0].set_ylim(nt_lim)
#### - LWC
ds_Dm_Nt_stats["LWC_median"].plot.pcolormesh(
x="Dm",
y="Nt",
cmap=cmap_lwc,
norm=norm_lwc,
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[4, 1],
)
axes[4, 1].set_xlabel(r"$D_m$ [mm]")
axes[4, 1].set_xlim(*dm_lim)
axes[4, 1].set_ylim(nt_lim)
axes[4, 1].set_yticklabels([])
#### - R
ds_Dm_Nt_stats["R_median"].plot.pcolormesh(
x="Dm",
y="Nt",
cmap=cmap_r,
norm=norm_r,
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[4, 2],
)
axes[4, 2].set_xlabel(r"$D_m$ [mm]")
axes[4, 2].set_xlim(*dm_lim)
axes[4, 2].set_ylim(nt_lim)
axes[4, 2].set_yticklabels([])
#### - Nt
# - Empty plot - handled above in the loop
ds_Dm_Nt_stats["R_median"].plot.pcolormesh(
x="Dm",
y="Nt",
cmap=cmap_r,
norm=norm_r,
alpha=0, # fully transparent
extend="both",
yscale="log",
add_colorbar=False,
ax=axes[4, 3],
)
axes[4, 3].set_xlabel("$D_m$ [mm]")
axes[4, 3].set_xlim(*dm_lim)
axes[4, 3].set_ylim(nt_lim)
axes[4, 3].set_yticklabels([])
####-------------------------------------------------------------------.
#### Finalize figure
# Remove x ticks and labels for all but bottom row
for i in range(1, nrows):
for j in range(4):
if axes[i, j].get_visible():
axes[i, j].set_xticklabels([])
axes[i, j].set_xticks([])
axes[i, j].set_xlabel("")
# Remove y ticks and labels for all but left row
for i in range(1, nrows + 1):
for j in range(1, 4):
if axes[i, j].get_visible():
axes[i, j].set_yticks([])
axes[i, j].set_yticklabels([])
axes[i, j].set_ylabel("")
axes[i, j].tick_params(which="both", left=False)
# Clear overlapping ticks between Nt and R
if add_nt:
ylim = axes[4, 0].get_ylim()
yticks = axes[4, 0].get_yticks()
yticks = yticks[(yticks <= 20_000)]
axes[4, 0].set_yticks(yticks)
axes[4, 0].set_ylim(*ylim)
# Add Dm label in the center
for j in range(4):
axes[-1, j].set_xlabel("")
fig.text(0.5, 0.05, r"$D_m$ [mm]", ha="center", va="center")
# Add gray background to subplots
for i in range(1, nrows + 1):
for j in range(4):
axes[i, j].set_facecolor("#e6e6e6") # strong gray
# axes[i, j].set_facecolor("#f2f2f2") # light gray
# -------------------------------------------------.
# Add colorbars
# - Counts colorbar
cbar1 = plt.colorbar(im_counts, cax=cbar_axes[0], orientation="horizontal", extend="both")
cbar1.set_label("Counts", fontweight="bold")
cbar1.ax.xaxis.set_label_position("top")
cbar1.ax.tick_params(axis="x", pad=2)
cbar1.ax.set_aspect(0.2)
# - LWC colorbar
cbar2 = plt.colorbar(im_lwc, cax=cbar_axes[1], orientation="horizontal", extend="both")
cbar2.set_label("Median LWC [g/m³]", fontweight="bold")
cbar2.ax.xaxis.set_label_position("top")
cbar2.ax.tick_params(axis="x", pad=2)
cbar2.ax.set_aspect(0.25)
# - R colorbar
cbar3 = plt.colorbar(im_r, cax=cbar_axes[2], orientation="horizontal", extend="both")
cbar3.set_label("Median R [mm/h]", fontweight="bold")
cbar3.ax.xaxis.set_label_position("top")
cbar3.ax.tick_params(axis="x", pad=2)
cbar3.ax.set_aspect(0.3)
# - Nt colorbar
cbar4 = plt.colorbar(im_nt, cax=cbar_axes[3], orientation="horizontal", extend="both")
cbar4.set_label("Median $N_t$ [m$^{-3}$]", fontweight="bold")
cbar4.ax.xaxis.set_label_position("top")
cbar4.ax.tick_params(axis="x", pad=2)
cbar4.ax.set_aspect(0.3)
# -------------------------------------------------.
# Align labels
fig.align_xlabels()
fig.align_ylabels()
# -------------------------------------------------.
# Return figure
return fig
[docs]
def plot_dsd_params_density(
df,
log_dm=False,
lwc=True,
log_normalize=False,
log_step=0.05,
linear_step=0.1,
figsize=(6.9, 7.2),
dpi=300,
):
"""Generate a figure with various DSD relationships.
All histograms are computed first, then normalized, and finally plotted together.
Parameters
----------
df : pandas.DataFrame
DataFrame containing DSD parameters (Dm, Nt, Nw, LWC/W, R, sigma_m, M2, M3, M4, M6)
log_dm : bool, optional
If True, use linear scale for Dm axes. If False, use log scale. Default is True.
lwc : bool, optional
If True, use Liquid Water Content (W). If False, use Rain Rate (R). Default is True.
figsize : tuple, optional
Figure size (width, height) in inches. Default is (18, 18).
Returns
-------
fig : matplotlib.figure.Figure
The figure object containing all subplots
axes : numpy.ndarray
Array of all subplot axes
"""
# TODO: option to use D50 instead of Dm
# Common parameters
cmap = plt.get_cmap("Spectral_r").copy()
norm = Normalize(0, 1) # Normalized data goes from 0 to 1
ylabelpad = 0
moment_labelpad = -2
# Define the water variable based on lwc flag
df["LWC"] = df["LWC"]
water_var = "LWC" if lwc else "R"
water_label = "LWC [g/m³]" if lwc else "R [mm/h]"
# Dm range and scale settings
if not log_dm:
dm_bins = np.arange(0, 8, linear_step)
dm_scale = None
dm_lim = (0, 6)
dm_ticklabels = [0, 2, 4, 6]
else:
dm_bins = log_arange(0.1, 10, log_step=log_step, base=10)
dm_scale = "log"
dm_lim = (0.3, 6)
dm_ticklabels = [0.5, 1, 2, 5]
# Nt and Nw range
nt_bins = log_arange(1, 100_000, log_step=log_step, base=10)
nw_bins = log_arange(1, 1_000_000, log_step=log_step, base=10)
nw_lim = (10, 1_000_000)
nt_lim = (1, 100_000)
# Water range
if lwc:
water_bins = log_arange(0.001, 10, log_step=log_step, base=10)
water_lim = (0.005, 10)
else:
water_bins = log_arange(0.1, 500, log_step=log_step, base=10)
water_lim = (0.1, 500)
# Define sigma_m bins
sigma_bins = np.arange(0, 4, linear_step / 2)
sigma_lim = (0, 3)
# Compute all histograms first
# 1. Dm vs Nt
ds_stats_dm_nt = compute_2d_histogram(
df,
x="Dm",
y="Nt",
x_bins=dm_bins,
y_bins=nt_bins,
)
# 2. Dm vs Nw
ds_stats_dm_nw = compute_2d_histogram(
df,
x="Dm",
y="Nw",
x_bins=dm_bins,
y_bins=nw_bins,
)
# 3. Dm vs LWC/R
ds_stats_dm_w = compute_2d_histogram(
df,
x="Dm",
y=water_var,
x_bins=dm_bins,
y_bins=water_bins,
)
# 4. LWC/R vs Nt
ds_stats_w_nt = compute_2d_histogram(
df,
x=water_var,
y="Nt",
x_bins=water_bins,
y_bins=nt_bins,
)
# 5. LWC/R vs Nw
ds_stats_w_nw = compute_2d_histogram(
df,
x=water_var,
y="Nw",
x_bins=water_bins,
y_bins=nw_bins,
)
# 6. LWC/R vs sigma_m
ds_stats_w_sigma = compute_2d_histogram(
df,
x=water_var,
y="sigma_m",
x_bins=water_bins,
y_bins=sigma_bins,
)
# 7. M2 vs M4
ds_stats_m2_m4 = compute_2d_histogram(
df,
x="M2",
y="M4",
x_bins=log_arange(1, 10_000, log_step=log_step, base=10),
y_bins=log_arange(1, 40_000, log_step=log_step, base=10),
)
# 8. M3 vs M6
ds_stats_m3_m6 = compute_2d_histogram(
df,
x="M3",
y="M6",
x_bins=log_arange(1, 10_000, log_step=log_step, base=10),
y_bins=log_arange(0.1, 1000_000, log_step=log_step, base=10),
)
# 9. M2 vs M6
ds_stats_m2_m6 = compute_2d_histogram(
df,
x="M2",
y="M6",
x_bins=log_arange(1, 10_000, log_step=log_step, base=10),
y_bins=log_arange(0.1, 1000_000, log_step=log_step, base=10),
)
# Define normalization
def max_normalize(ds):
return ds["count"].where(ds["count"] > 0) / ds["count"].max().item()
def log_max_normalize(ds):
counts = ds["count"].where(ds["count"] > 0)
log_counts = np.log10(counts)
max_log = float(log_counts.max().item())
return log_counts / max_log
normalizer = log_max_normalize if log_normalize else max_normalize
# Normalize all histograms
ds_stats_dm_nt["normalized"] = normalizer(ds_stats_dm_nt)
ds_stats_dm_nw["normalized"] = normalizer(ds_stats_dm_nw)
ds_stats_dm_w["normalized"] = normalizer(ds_stats_dm_w)
ds_stats_w_nt["normalized"] = normalizer(ds_stats_w_nt)
ds_stats_w_nw["normalized"] = normalizer(ds_stats_w_nw)
ds_stats_w_sigma["normalized"] = normalizer(ds_stats_w_sigma)
ds_stats_m2_m4["normalized"] = normalizer(ds_stats_m2_m4)
ds_stats_m3_m6["normalized"] = normalizer(ds_stats_m3_m6)
ds_stats_m2_m6["normalized"] = normalizer(ds_stats_m2_m6)
# Set up figure and axes
fig, axes = plt.subplots(3, 3, figsize=figsize, dpi=dpi)
fig.subplots_adjust(hspace=0.05, wspace=0.35)
# COLUMN 1: All plots with Dm as x-axis
# 1. Dm vs Nt (0,0)
_ = ds_stats_dm_nt["normalized"].plot.pcolormesh(
x="Dm",
y="Nt",
cmap=cmap,
norm=norm,
extend="max",
xscale=dm_scale,
yscale="log",
add_colorbar=False,
ax=axes[0, 0],
)
axes[0, 0].set_xlabel("") # Hide x labels except for bottom row
axes[0, 0].set_ylabel(r"$N_t$ [m$^{-3}$]", labelpad=ylabelpad)
axes[0, 0].set_xlim(*dm_lim)
axes[0, 0].set_ylim(*nt_lim)
axes[0, 0].set_title(r"$D_m$ vs $N_t$")
# 2. Dm vs Nw (1,0)
_ = ds_stats_dm_nw["normalized"].plot.pcolormesh(
x="Dm",
y="Nw",
cmap=cmap,
norm=norm,
extend="max",
xscale=dm_scale,
yscale="log",
add_colorbar=False,
ax=axes[1, 0],
)
axes[1, 0].set_xlabel("") # Hide x labels except for bottom row
axes[1, 0].set_ylabel(r"$N_w$ [mm$^{-1}$ m$^{-3}$]", labelpad=ylabelpad)
axes[1, 0].set_xlim(*dm_lim)
axes[1, 0].set_ylim(*nw_lim)
axes[1, 0].set_title(r"$D_m$ vs $N_w$")
# 3. Dm vs LWC/R (2,0)
_ = ds_stats_dm_w["normalized"].plot.pcolormesh(
x="Dm",
y=water_var,
cmap=cmap,
norm=norm,
extend="max",
xscale=dm_scale,
yscale="log",
add_colorbar=False,
ax=axes[2, 0],
)
axes[2, 0].set_xlabel(r"$D_m$ [mm]")
axes[2, 0].set_ylabel(water_label, labelpad=ylabelpad)
axes[2, 0].set_xlim(*dm_lim)
if lwc:
axes[2, 0].set_ylim(*water_lim)
axes[2, 0].set_yticks([0.01, 0.1, 0.5, 1, 5])
axes[2, 0].set_yticklabels(["0.01", "0.1", "0.5", "1", "5"])
else:
axes[2, 0].set_ylim(*water_lim)
axes[2, 0].set_title(f"$D_m$ vs {water_var}")
axes[2, 0].set_xticks(dm_ticklabels)
axes[2, 0].set_xticklabels([str(v) for v in dm_ticklabels])
# COLUMN 2: All plots with LWC/R as x-axis
# 4. LWC/R vs Nt (0,1)
mappable = ds_stats_w_nt["normalized"].plot.pcolormesh(
x=water_var,
y="Nt",
cmap=cmap,
norm=norm,
extend="max",
xscale="log",
yscale="log",
add_colorbar=False,
ax=axes[0, 1],
)
axes[0, 1].set_xlabel("") # Hide x labels except for bottom row
axes[0, 1].set_ylabel(r"$N_t$ [m$^{-3}$]", labelpad=ylabelpad)
if lwc:
axes[0, 1].set_xlim(*water_lim)
axes[0, 1].set_xticks([0.01, 0.1, 1, 10])
axes[0, 1].set_xticklabels(["0.01", "0.1", "1", "10"])
else:
axes[0, 1].set_xlim(*water_lim)
axes[0, 1].set_ylim(*nt_lim)
axes[0, 1].set_title(f"{water_var} vs $N_t$")
# 5. LWC/R vs Nw (1,1)
_ = ds_stats_w_nw["normalized"].plot.pcolormesh(
x=water_var,
y="Nw",
cmap=cmap,
norm=norm,
extend="max",
xscale="log",
yscale="log",
add_colorbar=False,
ax=axes[1, 1],
)
axes[1, 1].set_xlabel("") # Hide x labels except for bottom row
axes[1, 1].set_ylabel(r"$N_w$ [mm$^{-1}$ m$^{-3}$]", labelpad=ylabelpad)
if lwc:
axes[1, 1].set_xlim(*water_lim)
axes[1, 1].set_xticks([0.01, 0.1, 1, 10])
axes[1, 1].set_xticklabels(["0.01", "0.1", "1", "10"])
else:
axes[1, 1].set_xlim(*water_lim)
axes[1, 1].set_ylim(*nw_lim)
axes[1, 1].set_title(f"{water_var} vs $N_w$")
# 6. LWC/R vs sigma_m (2,1)
_ = ds_stats_w_sigma["normalized"].plot.pcolormesh(
x=water_var,
y="sigma_m",
cmap=cmap,
norm=norm,
extend="max",
xscale="log",
add_colorbar=False,
ax=axes[2, 1],
)
axes[2, 1].set_xlabel(water_label)
axes[2, 1].set_ylabel(r"$\sigma_m$ [mm]", labelpad=ylabelpad)
if lwc:
axes[2, 1].set_xlim(*water_lim)
axes[2, 1].set_xticks([0.01, 0.1, 1, 10])
axes[2, 1].set_xticklabels(["0.01", "0.1", "1", "10"])
else:
axes[2, 1].set_xlim(*water_lim)
axes[2, 1].set_ylim(*sigma_lim)
axes[2, 1].set_title(rf"{water_var} vs $\sigma_m$")
# COLUMN 3: Moment relationships
# 7. M2 vs M4 (0,2)
_ = ds_stats_m2_m4["normalized"].plot.pcolormesh(
x="M2",
y="M4",
cmap=cmap,
norm=norm,
extend="max",
xscale="log",
yscale="log",
add_colorbar=False,
ax=axes[0, 2],
)
axes[0, 2].set_xlabel("") # Hide x labels except for bottom row
axes[0, 2].set_ylabel(r"M4 [m$^{-3}$ mm$^{4}$]", labelpad=moment_labelpad)
axes[0, 2].set_xlim(1, 10_000)
axes[0, 2].set_ylim(1, 40_000)
axes[0, 2].set_title(r"M2 vs M4")
# 8. M3 vs M6 (1,2)
_ = ds_stats_m3_m6["normalized"].plot.pcolormesh(
x="M3",
y="M6",
cmap=cmap,
norm=norm,
extend="max",
xscale="log",
yscale="log",
add_colorbar=False,
ax=axes[1, 2],
)
axes[1, 2].set_xlabel("") # Hide x labels except for bottom row
axes[1, 2].set_ylabel(r"M6 [m$^{-3}$ mm$^{6}$]", labelpad=moment_labelpad)
axes[1, 2].set_xlim(1, 10_000)
axes[1, 2].set_ylim(0.1, 1000_000)
axes[1, 2].set_title(r"M3 vs M6")
# 9. M2 vs M6 (2,2)
_ = ds_stats_m2_m6["normalized"].plot.pcolormesh(
x="M2",
y="M6",
cmap=cmap,
norm=norm,
extend="max",
xscale="log",
yscale="log",
add_colorbar=False,
ax=axes[2, 2],
)
axes[2, 2].set_xlabel(r"M* [m$^{-3}$ mm$^{*}$]")
axes[2, 2].set_ylabel(r"M6 [m$^{-3}$ mm$^{6}$]", labelpad=moment_labelpad)
axes[2, 2].set_xlim(1, 10_000)
axes[2, 2].set_ylim(0.1, 1000_000)
axes[2, 2].set_title(r"M2 vs M6")
# Remove x-axis ticks and ticklabels for all but bottom row
for i in range(2):
for j in range(3):
axes[i, j].set_xticklabels([])
axes[i, j].tick_params(axis="x", which="both", bottom=False)
# Add subplot titles as text in top left corner of each plot
title_bbox_dict = {
"facecolor": "white",
"alpha": 0.7,
"edgecolor": "none",
"pad": 1,
}
for ax in axes.flatten():
# Add text in top left corner with some padding
ax.text(
0.03,
0.95,
ax.get_title(),
transform=ax.transAxes,
fontsize=9,
# fontweight='bold',
ha="left",
va="top",
bbox=title_bbox_dict,
)
ax.set_title("")
# --------------------------------------
# Add colorbar
ax_cbar_parent = axes[0, 1]
# Create inset axis in axis-fraction coordinates
x_pos_cbar = 0.40
y_pos_cbar = 0.09
cbar_width = 0.55
cbar_height = 0.055
cbar_fontsize = 6
cax = ax_cbar_parent.inset_axes(
[
x_pos_cbar, # e.g. 0.45
y_pos_cbar, # e.g. 0.08
cbar_width, # e.g. 0.5
cbar_height, # e.g. 0.06
],
)
cb = fig.colorbar(
mappable,
cax=cax,
orientation="horizontal",
)
cb.ax.tick_params(labelsize=5)
cb.ax.tick_params(pad=-1)
# Label on top
cb.ax.xaxis.set_label_position("top")
if log_normalize:
cbar_label = r"$\frac{\log_{10}(\mathrm{count})}" r"{\max\left(\log_{10}(\mathrm{count})\right)}$"
else:
cbar_label = r"$\frac{\mathrm{count}}" r"{\max\left(\mathrm{count}\right)}$"
cb.set_label(
cbar_label,
fontsize=cbar_fontsize,
labelpad=6,
)
# -------------------------------------------------.
# Align labels
fig.align_xlabels()
fig.align_ylabels()
return fig
[docs]
def plot_dmax_relationships(df, diameter_bin_edges, dmax="Dmax", diameter_max=10, norm_vmax=None, dpi=300):
"""
Plot 2x2 subplots showing relationships between Dmax and precipitation parameters.
Parameters
----------
df : pandas.DataFrame
Input dataframe containing the precipitation data
dmax : str, optional
Column name for maximum diameter. Default is 'Dmax'.
vmax : float, optional
Maximum value for Dmax axis limits. Default is 10 mm.
dpi : int, optional
Resolution for the figure. The default is 300.
"""
# Compute 2D histograms
# - Dmax-R
ds_stats_dmax_r = compute_2d_histogram(
df,
x=dmax,
y="R",
x_bins=diameter_bin_edges,
y_bins=log_arange(0.1, 500, log_step=0.05, base=10),
)
# - Dmax-Nw
ds_stats_dmax_nw = compute_2d_histogram(
df,
x=dmax,
y="Nw",
x_bins=diameter_bin_edges,
y_bins=log_arange(10, 1_000_000, log_step=0.05, base=10),
)
# - Dmax-Nt
ds_stats_dmax_nt = compute_2d_histogram(
df,
x=dmax,
y="Nt",
x_bins=diameter_bin_edges,
y_bins=log_arange(1, 100_000, log_step=0.05, base=10),
)
# - Dmax-Dm
ds_stats_dmax_dm = compute_2d_histogram(
df,
x=dmax,
y="Dm",
variables=["R", "Nw", "sigma_m"],
x_bins=diameter_bin_edges,
y_bins=np.arange(0, 8, 0.05),
)
# Define vmax for counts
if norm_vmax:
norm_vmax = define_lognorm_max_value(ds_stats_dmax_r["count"].max().item())
# Define plotting parameters
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
norm = LogNorm(1, norm_vmax)
# Create figure with 2x2 subplots
figsize = (6.9, 5.2)
# fig = plt.figure(figsize=figsize, dpi=dpi)
fig, axes = plt.subplots(
2,
2,
figsize=figsize,
dpi=dpi,
gridspec_kw={"hspace": 0.05, "wspace": 0.045},
)
axes = np.asarray(axes)
# - Dmax vs R (top-left)
ax1 = axes[0, 0]
p1 = ds_stats_dmax_r["count"].plot.pcolormesh(
x=dmax,
y="R",
cmap=cmap,
norm=norm,
extend="max",
yscale="log",
add_colorbar=False,
ax=ax1,
)
ax1.set_xlabel(r"$D_{max}$ [mm]")
ax1.set_ylabel(r"$R$ [mm h$^{-1}$]")
ax1.set_xlim(0.2, diameter_max)
ax1.set_ylim(0.1, 500)
# - Dmax vs Nw (top-right)
ax2 = axes[0, 1]
_ = ds_stats_dmax_nw["count"].plot.pcolormesh(
x=dmax,
y="Nw",
cmap=cmap,
norm=norm,
extend="max",
yscale="log",
add_colorbar=False,
ax=ax2,
)
ax2.set_xlabel(r"$D_{max}$ [mm]")
ax2.set_ylabel(r"$N_w$ [mm$^{-1}$ m$^{-3}$]")
ax2.set_xlim(0.2, diameter_max)
ax2.set_ylim(10, 1_000_000)
# - Dmax vs Nt (bottom-left)
ax3 = axes[1, 0]
_ = ds_stats_dmax_nt["count"].plot.pcolormesh(
x=dmax,
y="Nt",
cmap=cmap,
norm=norm,
extend="max",
yscale="log",
add_colorbar=False,
ax=ax3,
)
ax3.set_xlabel(r"$D_{max}$ [mm]")
ax3.set_ylabel(r"$N_t$ [m$^{-3}$]")
ax3.set_xlim(0.2, diameter_max)
ax3.set_ylim(1, 100_000)
# - Dmax vs Dm (bottom-right)
ax4 = axes[1, 1]
_ = ds_stats_dmax_dm["count"].plot.pcolormesh(
x=dmax,
y="Dm",
cmap=cmap,
norm=norm,
extend="max",
add_colorbar=False,
ax=ax4,
)
ax4.set_xlabel(r"$D_{max}$ [mm]")
ax4.set_ylabel(r"$D_m$ [mm]")
ax4.set_xlim(0.2, diameter_max)
ax4.set_ylim(0, 6)
# Remove xaxis labels and ticklables labels on first row
for ax in axes[0, :]: # First row (both columns)
ax.set_xlabel("")
ax.set_xticks([])
ax.set_xticklabels([])
ax.tick_params(axis="x", which="both", bottom=True, top=False, labelbottom=False)
# Move y-axis of second column to the right
for ax in axes[:, 1]: # Second column (both rows)
ax.yaxis.tick_right()
ax.yaxis.set_label_position("right")
# Add titles as legends in upper corners
title_bbox_dict = {
"facecolor": "white",
"alpha": 0.7,
"edgecolor": "none",
"pad": 1,
}
axes[0, 0].text(
0.05,
0.95,
r"$D_{max}$ vs $R$",
transform=axes[0, 0].transAxes,
fontsize=12,
verticalalignment="top",
bbox=title_bbox_dict,
)
axes[0, 1].text(
0.05,
0.95,
r"$D_{max}$ vs $N_w$",
transform=axes[0, 1].transAxes,
fontsize=12,
verticalalignment="top",
bbox=title_bbox_dict,
)
axes[1, 0].text(
0.05,
0.95,
r"$D_{max}$ vs $N_t$",
transform=axes[1, 0].transAxes,
fontsize=12,
verticalalignment="top",
bbox=title_bbox_dict,
)
axes[1, 1].text(
0.05,
0.95,
r"$D_{max}$ vs $D_m$",
transform=axes[1, 1].transAxes,
fontsize=12,
verticalalignment="top",
bbox=title_bbox_dict,
)
# Add colorbar as legend
cax = ax1.inset_axes([0.52, 0.15, 0.43, 0.06]) # [x0, y0, width, height]
cbar = fig.colorbar(
p1,
cax=cax,
orientation="horizontal",
extend="max",
)
cbar.set_label("Counts", fontsize=9, labelpad=2)
cbar.ax.tick_params(labelsize=8, pad=1)
cbar.ax.xaxis.set_label_position("top")
# Align labels
fig.align_xlabels()
fig.align_ylabels()
return fig
####-------------------------------------------------------------------
#### Radar plots
[docs]
def add_legend_powerlaw(ax, legend_str, legend_fontsize):
"""Add legend fitted powerlaw relationship to plot."""
legend_bbox_dict = {"facecolor": "white", "edgecolor": "black", "alpha": 0.7}
ax.text(
0.05,
0.955,
legend_str,
transform=ax.transAxes,
ha="left",
va="top",
fontsize=legend_fontsize,
bbox=legend_bbox_dict,
)
[docs]
def get_symbol_str(symbol, pol=""):
"""Generate symbol string with optional polarization subscript.
Parameters
----------
symbol : str
The base symbol (e.g., 'A', 'Z', 'z')
pol : str, optional
Polarization identifier (e.g., 'H', 'V')
Returns
-------
str
LaTeX formatted symbol string
"""
if pol:
return rf"{symbol}_{{\mathrm{{{pol}}}}}"
return symbol
[docs]
def plot_A_R(
df,
a,
r,
cmap=None,
norm=None,
add_colorbar=True,
add_fit=True,
pol="",
title=None,
ax=None,
figsize=(3.4, 3.0),
dpi=300,
fit_linewidth=1,
fit_linestyle="dashed",
legend_fontsize=None,
xlabel_pad=None,
ylabel_pad=None,
):
"""Create a 2D histogram of A vs R."""
# Define a_min and a_max
a_min = 0.001
a_max = 10
a_bins = log_arange(a_min, a_max, log_step=0.025, base=10)
rlims = (0.1, 500)
r_bins = log_arange(*rlims, log_step=0.025, base=10)
# Compute 2D histogram
ds_stats = compute_2d_histogram(
df,
x=r,
y=a,
x_bins=r_bins,
y_bins=a_bins,
)
# Define colormap and norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
norm = LogNorm(1, None) if norm is None else norm
# Define ticks and ticklabels
r_ticks = [0.1, 1, 10, 50, 100, 500]
a_ticks = [0.001, 0.01, 0.1, 0.5, 1, 5] # Adapt on a_max
# Define A symbol
a_symbol = get_symbol_str("A", pol)
# Set default title if not provided
if title is None:
title = rf"${a_symbol}$ vs $R$"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=r,
y=a,
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
xscale="log",
yscale="log",
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(r"$R$ [mm h$^{-1}$]", labelpad=xlabel_pad)
ax.set_ylabel(rf"${a_symbol}$ [dB km$^{{-1}}$]", labelpad=ylabel_pad)
ax.set_ylim(a_min, a_max)
ax.set_xlim(*rlims)
ax.set_xticks(r_ticks)
ax.set_xticklabels([str(v) for v in r_ticks])
ax.set_yticks(a_ticks)
ax.set_yticklabels([str(v) for v in a_ticks])
ax.set_title(title)
if add_fit:
try:
# Fit powerlaw k = a * R ** b
(a_c, b), _ = fit_powerlaw(x=df[r], y=df[a], xbins=r_bins, ybins=None, x_in_db=False)
# Define legend string
legend_str = define_powerlaw_legend_str("R", a_symbol, a_c, b)
# Get power law predictions
x_pred = np.arange(*rlims)
r_pred = predict_from_powerlaw(x_pred, a=a_c, b=b)
# Add fitted power law
ax.plot(x_pred, r_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black")
# Add legend
add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize)
except Exception as e:
warnings.warn(f"Could not fit power law in plot_A_R: {e!s}", UserWarning, stacklevel=2)
return p
[docs]
def plot_A_Z(
df,
a,
z,
cmap=None,
norm=None,
add_colorbar=True,
add_fit=True,
pol="",
title=None,
ax=None,
a_lim=(0.0001, 10),
z_lim=(0, 70),
figsize=(3.4, 3.0),
dpi=300,
fit_linewidth=1,
fit_linestyle="dashed",
legend_fontsize=None,
xlabel_pad=None,
ylabel_pad=None,
):
"""Create a 2D histogram of A vs Z."""
# Define bins
a_bins = log_arange(*a_lim, log_step=0.025, base=10)
z_bins = np.arange(*z_lim, 0.5)
# Compute 2d histogram
ds_stats = compute_2d_histogram(
df,
x=z,
y=a,
x_bins=z_bins,
y_bins=a_bins,
)
# Define colormap and norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
if norm is None:
norm = LogNorm(1, None)
# Ticks
a_ticks = [0.001, 0.01, 0.1, 0.5, 1, 5]
# Define symbols
a_symbol = get_symbol_str("A", pol)
z_symbol = get_symbol_str("Z", pol)
z_lower_symbol = get_symbol_str("z", pol)
# Set default title if not provided
if title is None:
title = rf"${a_symbol}$ vs ${z_symbol}$"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=z,
y=a,
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
xscale=None,
yscale="log",
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(rf"${z_symbol}$ [dBZ]", labelpad=xlabel_pad)
ax.set_ylabel(rf"${a_symbol}$ [dB km$^{{-1}}$]", labelpad=ylabel_pad)
ax.set_xlim(*z_lim)
ax.set_ylim(*a_lim)
ax.set_yticks(a_ticks)
ax.set_yticklabels([str(v) for v in a_ticks])
ax.set_title(title)
# Fit and plot the power law
if add_fit:
try:
# Fit powerlaw k = a * Z ** b (Z in dBZ -> x_in_db=True)
(a_c, b), _ = fit_powerlaw(
x=df[z],
y=df[a],
xbins=z_bins,
ybins=a_bins,
x_in_db=True,
)
# Define legend string
legend_str = define_powerlaw_legend_str(z_lower_symbol, a_symbol, a_c, b)
# Predictions
x_pred = np.arange(*z_lim)
x_pred_linear = disdrodb.idecibel(x_pred) # convert to linear for prediction
y_pred = predict_from_powerlaw(x_pred_linear, a=a_c, b=b)
ax.plot(x_pred, y_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black")
# Add legend
add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize)
except Exception as e:
warnings.warn(f"Could not fit power law in plot_A_Z: {e!s}", UserWarning, stacklevel=2)
return p
[docs]
def plot_A_KDP(
df,
a,
kdp,
log_a=True,
log_kdp=False,
a_lim=(0.001, 10),
kdp_lim=None,
pol="",
ax=None,
cmap=None,
norm=None,
add_colorbar=True,
add_fit=True,
title=None,
figsize=(3.4, 3.0),
dpi=300,
legend_fontsize=None,
xlabel_pad=None,
ylabel_pad=None,
fit_linewidth=1,
fit_linestyle="dashed",
):
"""Create a 2D histogram of k(H/V) vs KDP."""
# Bins & limits for a
if log_a:
a_bins = log_arange(*a_lim, log_step=0.025, base=10)
yscale = "log"
a_ticks = [0.001, 0.01, 0.1, 0.5, 1, 5]
else:
a_bins = np.arange(a_lim[0], a_lim[1], 0.01)
yscale = None
a_ticks = None
# Bins & limits for KDP
if log_kdp:
kdp_lim = (0.05, 10) if kdp_lim is None else kdp_lim
kdp_bins = log_arange(*kdp_lim, log_step=0.05, base=10)
xscale = "log"
kdp_ticks = [0.05, 0.1, 0.5, 1, 5, 10]
else:
kdp_lim = (0, 8) if kdp_lim is None else kdp_lim
kdp_bins = np.arange(kdp_lim[0], kdp_lim[1], 0.1)
xscale = None
kdp_ticks = None
# Compute 2D histogram
ds_stats = compute_2d_histogram(
df,
x=kdp,
y=a,
x_bins=kdp_bins,
y_bins=a_bins,
)
# Colormap & norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
if norm is None:
norm = LogNorm(1, None)
# Define symbols
a_symbol = get_symbol_str("A", pol)
# Set default title if not provided
if title is None:
title = rf"${a_symbol}$ vs $K_{{\mathrm{{DP}}}}$"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=kdp,
y=a,
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
xscale=xscale,
yscale=yscale,
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(r"$K_{\mathrm{DP}}$ [deg km$^{-1}$]", labelpad=xlabel_pad)
ax.set_ylabel(rf"${a_symbol}$ [dB km$^{{-1}}$]", labelpad=ylabel_pad)
ax.set_xlim(*kdp_lim)
ax.set_ylim(*a_lim)
if kdp_ticks is not None:
ax.set_xticks(kdp_ticks)
ax.set_xticklabels([str(v) for v in kdp_ticks])
if a_ticks is not None:
ax.set_yticks(a_ticks)
ax.set_yticklabels([str(v) for v in a_ticks])
ax.set_title(title)
# Fit and overlay power law: k = a * KDP^b
if add_fit:
try:
(a_c, b), _ = fit_powerlaw(
x=df[kdp],
y=df[a],
xbins=kdp_bins,
ybins=a_bins,
x_in_db=False,
)
# Define legend string
legend_str = define_powerlaw_legend_str("K_{\\mathrm{DP}}", a_symbol, a_c, b)
# Predictions along KDP axis
if log_kdp:
x_pred = np.logspace(np.log10(kdp_lim[0]), np.log10(kdp_lim[1]), 400)
else:
x_pred = np.arange(kdp_lim[0], kdp_lim[1], 0.05)
y_pred = predict_from_powerlaw(x_pred, a=a_c, b=b)
ax.plot(x_pred, y_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black")
# Add legend
add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize)
except Exception as e:
warnings.warn(f"Could not fit power law in plot_A_KDP: {e!s}", UserWarning, stacklevel=2)
return p
[docs]
def plot_R_Z(
df,
z,
r,
cmap=None,
norm=None,
add_colorbar=True,
add_fit=True,
pol="",
title=None,
ax=None,
figsize=(3.4, 3.0),
dpi=300,
fit_linewidth=1,
fit_linestyle="dashed",
legend_fontsize=None,
xlabel_pad=None,
ylabel_pad=None,
):
"""Create a 2D histogram of Z vs R."""
# Define axis limits
z_lims = (0, 70)
r_lims = (0.1, 500)
z_bins = np.arange(0.01, 500, 0.5)
r_bins = log_arange(0.01, 500, log_step=0.05, base=10)
z_bins_fit = np.arange(0, 50, 1)
r_bins_fit = log_arange(0.01, 500, log_step=0.05, base=10)
# Compute 2d histogram
ds_stats = compute_2d_histogram(
df,
x=z,
y=r,
x_bins=z_bins,
y_bins=r_bins,
)
# Define colormap and norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
norm = LogNorm(1, None) if norm is None else norm
# Define rain ticks and ticklabels
r_ticks = [0.1, 1, 10, 50, 100, 500]
# Define symbols
z_symbol = get_symbol_str("Z", pol)
z_lower_symbol = get_symbol_str("z", pol)
# Set default title if not provided
if title is None:
title = rf"${z_symbol}$ vs $R$"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=z,
y=r,
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
yscale="log",
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_ylabel(r"$R$ [mm h$^{-1}$]", labelpad=ylabel_pad)
ax.set_xlabel(rf"${z_symbol}$ [dBZ]", labelpad=xlabel_pad)
ax.set_xlim(*z_lims)
ax.set_ylim(*r_lims)
ax.set_yticks(r_ticks)
ax.set_yticklabels([str(v) for v in r_ticks])
ax.set_title(title)
# Fit and plot the powerlaw
if add_fit:
try:
# Fit powerlaw R = a * z ** b
(a, b), _ = fit_powerlaw(x=df[z], y=df[r], xbins=z_bins_fit, ybins=r_bins_fit, x_in_db=True)
# Define legend string
legend_str = define_powerlaw_legend_str(z_lower_symbol, "R", a, b)
# Get power law predictions
x_pred = np.arange(*z_lims)
x_pred_linear = disdrodb.idecibel(x_pred)
r_pred = predict_from_powerlaw(x_pred_linear, a=a, b=b)
# Add fitted powerlaw
ax.plot(x_pred, r_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black")
# Add legend
add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize)
except Exception as e:
warnings.warn(f"Could not fit power law in plot_R_Z: {e!s}", UserWarning, stacklevel=2)
return p
[docs]
def plot_R_KDP(
df,
kdp,
r,
kdp_lim=None,
r_lim=None,
cmap=None,
norm=None,
add_colorbar=True,
log_scale=False,
add_fit=True,
title=None,
ax=None,
figsize=(3.4, 3.0),
dpi=300,
legend_fontsize=None,
xlabel_pad=None,
ylabel_pad=None,
fit_linewidth=1,
fit_linestyle="dashed",
):
"""Create a 2D histogram of KDP vs R."""
# Define bins
if not log_scale:
kdp_lim = (0, 8) if kdp_lim is None else kdp_lim
r_lim = (0, 200) if r_lim is None else r_lim
xbins = np.arange(*kdp_lim, 0.1)
ybins = np.arange(*r_lim, 1)
xscale = None
yscale = None
else:
kdp_lim = (0.01, 10) if kdp_lim is None else kdp_lim
r_lim = (0.1, 500) if r_lim is None else r_lim
xbins = log_arange(*kdp_lim, log_step=0.05, base=10)
ybins = log_arange(*r_lim, log_step=0.05, base=10)
xscale = "log"
yscale = "log"
# Compute 2d histogram
ds_stats = compute_2d_histogram(
df,
x=kdp,
y=r,
x_bins=xbins,
y_bins=ybins,
# y_bins=log_arange(0.1, 500, log_step=0.05, base=10),
)
# Define colormap and norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
norm = LogNorm(1, None) if norm is None else norm
# Set default title if not provided
if title is None:
title = r"$K_{\mathrm{DP}}$ vs $R$"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=kdp,
y=r,
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
xscale=xscale,
yscale=yscale,
extend="max",
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_ylabel(r"$R$ [mm h$^{-1}$]", labelpad=ylabel_pad)
ax.set_xlabel(r"$K_{\mathrm{DP}}$ [deg km$^{-1}$]", labelpad=xlabel_pad)
ax.set_xlim(*kdp_lim)
ax.set_ylim(*r_lim)
ax.set_title(title)
# Fit and plot the power law
if add_fit:
try:
# Fit powerlaw R = a * KDP ** b
(a, b), _ = fit_powerlaw(x=df[kdp], y=df[r], xbins=xbins, ybins=ybins, x_in_db=False)
# Define legend string
legend_str = define_powerlaw_legend_str("K_{\\mathrm{DP}}", "R", a, b)
# Get power law predictions
x_pred = np.arange(*kdp_lim)
r_pred = predict_from_powerlaw(x_pred, a=a, b=b)
# Add fitted line
ax.plot(x_pred, r_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black")
# Add legend
add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize)
except Exception as e:
warnings.warn(f"Could not fit power law in plot_R_KDP: {e!s}", UserWarning, stacklevel=2)
return p
[docs]
def plot_ZDR_Z(
df,
z,
zdr,
zdr_lim=(0, 2.5),
z_lim=(0, 70),
cmap=None,
norm=None,
add_colorbar=True,
add_fit=False,
pol="",
title=None,
ax=None,
figsize=(3.4, 3.0),
dpi=300,
legend_fontsize=None,
xlabel_pad=None,
ylabel_pad=None,
fit_linewidth=1,
fit_linestyle="dashed",
):
"""Create a 2D histogram of Zdr vs Z."""
z_bins = np.arange(*z_lim, 0.5)
zdr_bins = np.arange(*zdr_lim, 0.025)
# Compute 2d histogram
ds_stats = compute_2d_histogram(
df,
x=z,
y=zdr,
x_bins=z_bins,
y_bins=zdr_bins,
)
# Define colormap and norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
norm = LogNorm(1, None) if norm is None else norm
# Define symbols
z_symbol = get_symbol_str("Z", pol)
z_lower_symbol = get_symbol_str("z", pol)
# Set default title if not provided
if title is None:
title = rf"$Z_{{\mathrm{{DR}}}}$ vs ${z_symbol}$"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=z,
y=zdr,
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(rf"${z_symbol}$ [dBZ]", labelpad=xlabel_pad)
ax.set_ylabel(r"$Z_{DR}$ [dB]", labelpad=ylabel_pad)
ax.set_xlim(*z_lim)
ax.set_ylim(*zdr_lim)
ax.set_title(title)
# Fit and plot the power law
if add_fit:
try:
# Fit powerlaw ZDR = a * Z ** b
(a, b), _ = fit_powerlaw(
x=df[z],
y=df[zdr],
xbins=z_bins,
ybins=zdr_bins,
x_in_db=True,
)
# Define legend string
legend_str = define_powerlaw_legend_str(z_lower_symbol, "Z_{\\mathrm{DR}}", a, b)
# Get power law predictions
x_pred = np.arange(0, 70)
x_pred_linear = disdrodb.idecibel(x_pred)
r_pred = predict_from_powerlaw(x_pred_linear, a=a, b=b)
# Add fitted line
ax.plot(x_pred, r_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black")
# Add legend
add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize)
except Exception as e:
warnings.warn(f"Could not fit power law in plot_ZDR_Z: {e!s}", UserWarning, stacklevel=2)
return p
[docs]
def plot_KDP_Z(
df,
kdp,
z,
z_lim=(0, 70),
log_kdp=False,
kdp_lim=None,
cmap=None,
norm=None,
add_colorbar=True,
add_fit=True,
pol="",
title=None,
ax=None,
figsize=(3.4, 3.0),
dpi=300,
legend_fontsize=None,
xlabel_pad=None,
ylabel_pad=None,
fit_linewidth=1,
fit_linestyle="dashed",
):
"""Create a 2D histogram of KDP vs Z."""
# Bins & limits
z_bins = np.arange(*z_lim, 0.5)
if log_kdp:
kdp_lim = (0.01, 10) if kdp_lim is None else kdp_lim
kdp_bins = log_arange(*kdp_lim, log_step=0.05, base=10)
yscale = "log"
kdp_ticks = [0.01, 0.1, 0.5, 1, 5, 10]
else:
kdp_lim = (0, 10) if kdp_lim is None else kdp_lim
kdp_bins = np.arange(*kdp_lim, 0.1)
yscale = None
kdp_ticks = None
# Compute 2D histogram
ds_stats = compute_2d_histogram(
df,
x=z,
y=kdp,
x_bins=z_bins,
y_bins=kdp_bins,
)
# Colormap & norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
if norm is None:
norm = LogNorm(1, None)
# Define symbols
z_symbol = get_symbol_str("Z", pol)
z_lower_symbol = get_symbol_str("z", pol)
# Set default title if not provided
if title is None:
title = rf"$K_{{\mathrm{{DP}}}}$ vs ${z_symbol}$"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=z,
y=kdp,
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
yscale=yscale,
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(rf"${z_symbol}$ [dBZ]", labelpad=xlabel_pad)
ax.set_ylabel(r"$K_{\mathrm{DP}}$ [deg km$^{-1}$]", labelpad=ylabel_pad)
ax.set_xlim(*z_lim)
ax.set_ylim(*kdp_lim)
if kdp_ticks is not None:
ax.set_yticks(kdp_ticks)
ax.set_yticklabels([str(v) for v in kdp_ticks])
ax.set_title(title)
# Fit and overlay power law
if add_fit:
try:
# Fit: KDP = a * Z^b (Z in dBZ → x_in_db=True)
(a, b), _ = fit_powerlaw(
x=df[z],
y=df[kdp],
xbins=z_bins,
ybins=kdp_bins,
x_in_db=True,
)
# Define legend string
legend_str = define_powerlaw_legend_str(z_lower_symbol, "K_{\\mathrm{DP}}", a, b)
# Get power law predictions
x_pred = np.arange(*z_lim)
x_pred_linear = disdrodb.idecibel(x_pred)
y_pred = predict_from_powerlaw(x_pred_linear, a=a, b=b)
# Add fitted power law
ax.plot(x_pred, y_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black")
# Add legend
add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize)
except Exception as e:
warnings.warn(f"Could not fit power law in plot_KDP_Z: {e!s}", UserWarning, stacklevel=2)
return p
[docs]
def plot_ADP_KDP_ZDR(
df,
adp,
kdp,
zdr,
y_lim=(0, 0.015),
zdr_lim=(0, 6),
cmap=None,
norm=None,
add_colorbar=True,
title=None,
ax=None,
figsize=(3.4, 3.0),
dpi=300,
xlabel_pad=None,
ylabel_pad=None,
):
"""Create a 2D histogram of ADP/KDP vs ZDR.
References
----------
Ryzhkov, A., P. Zhang, and J. Hu, 2025.
Suggested Modifications for the S-Band Polarimetric Radar Rainfall Estimation Algorithm.
J. Hydrometeor., 26, 1053-1062. https://doi.org/10.1175/JHM-D-25-0014.1.
"""
# Compute ADP/KDP
df["ADP/KDP"] = df[adp] / df[kdp]
# Bins & limits
y_bins = np.arange(y_lim[0], y_lim[1], (y_lim[1] - y_lim[0]) / 200)
zdr_bins = np.arange(zdr_lim[0], zdr_lim[1] + 0.025, 0.025)
# Compute 2D histogram
ds_stats = compute_2d_histogram(
df,
x=zdr,
y="ADP/KDP",
x_bins=zdr_bins,
y_bins=y_bins,
)
# Colormap & norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
if norm is None:
norm = LogNorm(1, None)
# Set default title if not provided
if title is None:
title = r"$A_{\mathrm{DP}} / K_{\mathrm{DP}}$ vs $Z_{\mathrm{DR}}$"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=zdr,
y="ADP/KDP",
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(r"$Z_{\mathrm{DR}}$ [dB]", labelpad=xlabel_pad)
ax.set_ylabel(r"$A_{\mathrm{DP}} / K_{\mathrm{DP}}$ [dB deg$^{{-1}}$]", labelpad=ylabel_pad)
ax.set_xlim(*zdr_lim)
ax.set_ylim(*y_lim)
ax.set_title(title)
return p
[docs]
def plot_A_KDP_ZDR(
df,
a,
kdp,
zdr,
y_lim=(0, 0.05),
zdr_lim=(0, 3),
cmap=None,
norm=None,
add_colorbar=True,
pol="",
title=None,
ax=None,
figsize=(3.4, 3.0),
dpi=300,
xlabel_pad=None,
ylabel_pad=None,
):
"""Create a 2D histogram of k/KDP vs ZDR.
References
----------
Ryzhkov, A., P. Zhang, and J. Hu, 2025.
Suggested Modifications for the S-Band Polarimetric Radar Rainfall Estimation Algorithm.
J. Hydrometeor., 26, 1053-1062. https://doi.org/10.1175/JHM-D-25-0014.1.
"""
# Compute A/KDP
df["A/KDP"] = df[a] / df[kdp]
# Bins & limits
y_bins = np.arange(y_lim[0], y_lim[1], (y_lim[1] - y_lim[0]) / 200)
x_bins = np.arange(zdr_lim[0], zdr_lim[1] + 0.025, 0.025)
# Compute 2D histogram
ds_stats = compute_2d_histogram(
df,
x=zdr,
y="A/KDP",
x_bins=x_bins,
y_bins=y_bins,
)
# Colormap & norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
if norm is None:
norm = LogNorm(1, None)
# Define symbols
a_symbol = get_symbol_str("A", pol)
# Set default title if not provided
if title is None:
title = rf"${a_symbol} / K_{{\mathrm{{DP}}}}$ vs $Z_{{\mathrm{{DR}}}}$"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=zdr,
y="A/KDP",
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(r"$Z_{\mathrm{DR}}$ [dB]", labelpad=xlabel_pad)
ax.set_ylabel(rf"${a_symbol} / K_{{\mathrm{{DP}}}}$ [dB/deg]", labelpad=ylabel_pad)
ax.set_xlim(*zdr_lim)
ax.set_ylim(*y_lim)
ax.set_title(title)
return p
[docs]
def plot_KDP_Z_ZDR(
df,
kdp,
z,
zdr,
y_lim=None,
zdr_lim=(0, 5),
z_linear=True,
cmap=None,
norm=None,
add_colorbar=True,
title=None,
ax=None,
figsize=(3.4, 3.0),
dpi=300,
xlabel_pad=None,
ylabel_pad=None,
):
"""Create a 2D histogram of (KDP/Z) vs ZDR with log-scale y-axis (no fit)."""
# Define y limits and KDP/Z
if z_linear:
df["KDP/Z"] = df[kdp] / disdrodb.idecibel(df[z])
y_lim = (1e-6, 1e-3) if y_lim is None else y_lim
y_label = r"$K_{\mathrm{DP}} / Z$ [deg km$^{-1}$ / mm$^6$ m$^{-3}$]"
else:
df["KDP/Z"] = df[kdp] / df[z]
y_lim = (1e-5, 1e-1) if y_lim is None else y_lim
y_label = r"$K_{\mathrm{DP}} / Z$ [deg km$^{-1}$ / dBZ]"
# Define bins
y_bins = log_arange(y_lim[0], y_lim[1], log_step=0.025, base=10)
x_bins = np.arange(zdr_lim[0], zdr_lim[1] + 0.025, 0.025)
# Compute 2D histogram
ds_stats = compute_2d_histogram(
df,
x=zdr,
y="KDP/Z",
x_bins=x_bins,
y_bins=y_bins,
)
# Colormap & norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
if norm is None:
norm = LogNorm(1, None)
# Set default title if not provided
if title is None:
title = r"$K_{\mathrm{DP}}/Z$ vs $Z_{\mathrm{DR}}$"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=zdr,
y="KDP/Z",
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
yscale="log",
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(r"$Z_{\mathrm{DR}}$ [dB]", labelpad=xlabel_pad)
ax.set_ylabel(y_label, labelpad=ylabel_pad)
ax.set_xlim(*zdr_lim)
ax.set_ylim(*y_lim)
ax.set_title(title)
return p
[docs]
def plot_KED_R(
df,
log_r=True,
log_ked=False,
add_fit=True,
cmap=None,
norm=None,
add_colorbar=True,
title=None,
ax=None,
legend_fontsize=None,
figsize=(3.4, 3.0),
dpi=300,
xlabel_pad=None,
ylabel_pad=None,
fit_linewidth=1,
fit_linestyle="dashed",
):
"""Create a 2D histogram of KED vs R."""
# R settings
if log_r:
r_bins = log_arange(0.1, 500, log_step=0.05, base=10)
r_lims = (0.1, 500)
r_ticks = [0.1, 1, 10, 50, 100, 500]
xscale = "log"
else:
r_bins = np.arange(0, 500, step=2)
r_lims = (0, 500)
r_ticks = None
xscale = "linear"
# KED settings
if log_ked:
ked_bins = log_arange(1, 100, log_step=0.025, base=10)
if log_r:
ked_lims = (2, 100)
ked_ticks = [5, 10, 50, 100]
else:
ked_lims = (1, 50)
ked_ticks = [1, 10, 50]
yscale = "log"
else:
ked_bins = np.arange(0, 50, step=1)
ked_lims = (0, 50)
ked_ticks = None
yscale = "linear"
# Compute 2d histogram
ds_stats = compute_2d_histogram(
df,
x="R",
y="KED",
x_bins=r_bins,
y_bins=ked_bins,
)
# Define colormap and norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
norm = LogNorm(1, None) if norm is None else norm
# Set default title if not provided
if title is None:
title = "KED vs R"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x="R",
y="KED",
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
xscale=xscale,
yscale=yscale,
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(r"$R$ [mm h$^{-1}$]", labelpad=xlabel_pad)
ax.set_ylabel(r"KED [J m$^{-2}$ mm$^{-1}$]", labelpad=ylabel_pad)
ax.set_xlim(*r_lims)
ax.set_ylim(*ked_lims)
if r_ticks is not None:
ax.set_xticks(r_ticks)
ax.set_xticklabels([str(v) for v in r_ticks])
if ked_ticks is not None:
ax.set_yticks(ked_ticks)
ax.set_yticklabels([str(v) for v in ked_ticks])
ax.tick_params(axis="y", which="both", left=True)
ax.set_title(title)
# Fit and plot a powerlaw
if add_fit:
try:
# Fit a power law KED = a * R**b
(a, b), _ = fit_powerlaw(
x=df["R"],
y=df["KED"],
xbins=r_bins,
ybins=ked_bins,
x_in_db=False,
)
# Define legend string
legend_str = define_powerlaw_legend_str("R", "\\mathrm{KED}", a, b)
# Get power law predictions
x_pred = np.arange(r_lims[0], r_lims[1])
y_pred = predict_from_powerlaw(x_pred, a=a, b=b)
# Add fitted powerlaw
ax.plot(x_pred, y_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black")
# Add legend
add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize)
except Exception as e:
warnings.warn(f"Could not fit power law in plot_KED_R: {e!s}", UserWarning, stacklevel=2)
return p
[docs]
def plot_KEF_R(
df,
log_r=True,
log_kef=True,
add_fit=True,
cmap=None,
norm=None,
add_colorbar=True,
title=None,
ax=None,
legend_fontsize=None,
figsize=(3.4, 3.0),
dpi=300,
xlabel_pad=None,
ylabel_pad=None,
fit_linewidth=1,
fit_linestyle="dashed",
):
"""Create a 2D histogram of KEF vs R."""
if log_r:
r_bins = log_arange(0.1, 500, log_step=0.05, base=10)
r_lims = (0.1, 500)
r_ticks = [0.1, 1, 10, 50, 100, 500]
xscale = "log"
else:
r_bins = np.arange(0, 500, step=2)
r_lims = (0, 500)
r_ticks = None
xscale = "linear"
if log_kef:
kef_bins = log_arange(0.1, 10_000, log_step=0.05, base=10)
kef_lims = (0.1, 10_000)
kef_ticks = [0.1, 1, 10, 100, 1000, 10000]
yscale = "log"
else:
kef_bins = np.arange(0, 5000, step=50)
kef_lims = (0, 5000)
kef_ticks = None
yscale = "linear"
# Compute 2d histogram
ds_stats = compute_2d_histogram(
df,
x="R",
y="KEF",
x_bins=r_bins,
y_bins=kef_bins,
)
# Define colormap and norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
norm = LogNorm(1, None) if norm is None else norm
# Set default title if not provided
if title is None:
title = "KEF vs R"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x="R",
y="KEF",
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
xscale=xscale,
yscale=yscale,
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(r"$R$ [mm h$^{-1}$]", labelpad=xlabel_pad)
ax.set_ylabel(r"KEF [J m$^{-2}$ h$^{-1}$]", labelpad=ylabel_pad)
ax.set_xlim(*r_lims)
ax.set_ylim(*kef_lims)
if r_ticks is not None:
ax.set_xticks(r_ticks)
ax.set_xticklabels([str(v) for v in r_ticks])
if kef_ticks is not None:
ax.set_yticks(kef_ticks)
ax.set_yticklabels([str(v) for v in kef_ticks])
ax.set_title(title)
# Fit and plot the power law
# - Alternative fit model: a + I *(1 - b*exp(c*I)) (a is upper limit)
if add_fit:
try:
# Fit power law KEF = a * R ** b
(a, b), _ = fit_powerlaw(
x=df["R"],
y=df["KEF"],
xbins=r_bins,
ybins=kef_bins,
x_in_db=False,
)
# Define legend string
legend_str = define_powerlaw_legend_str("R", "\\mathrm{KEF}", a, b)
# Get power law predictions
x_pred = np.arange(*r_lims)
kef_pred = predict_from_powerlaw(x_pred, a=a, b=b)
# Add fitted powerlaw
ax.plot(x_pred, kef_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black")
# Add legend
add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize)
except Exception as e:
warnings.warn(f"Could not fit power law in plot_KEF_R: {e!s}", UserWarning, stacklevel=2)
return p
[docs]
def plot_KEF_Z(
df,
z="Z",
log_kef=True,
add_fit=True,
pol="",
cmap=None,
norm=None,
add_colorbar=True,
title=None,
ax=None,
legend_fontsize=None,
figsize=(3.4, 3.0),
dpi=300,
xlabel_pad=None,
ylabel_pad=None,
fit_linewidth=1,
fit_linestyle="dashed",
):
"""Create a 2D histogram of KEF vs Z."""
# Define limits and bins
z_lims = (0, 70)
z_bins = np.arange(*z_lims, step=1)
if log_kef:
kef_lims = (0.1, 10_000)
kef_bins = log_arange(*kef_lims, log_step=0.05, base=10)
kef_ticks = [0.1, 1, 10, 100, 1000, 10000]
yscale = "log"
else:
kef_lims = (0, 5000)
kef_bins = np.arange(*kef_lims, step=50)
kef_ticks = None
yscale = "linear"
# Compute 2d histogram
ds_stats = compute_2d_histogram(
df,
x=z,
y="KEF",
x_bins=z_bins,
y_bins=kef_bins,
)
# Define colormap and norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
norm = LogNorm(1, None) if norm is None else norm
# Define symbols
z_symbol = get_symbol_str("Z", pol)
z_lower_symbol = get_symbol_str("z", pol)
# Set default title if not provided
if title is None:
title = rf"KEF vs ${z_symbol}$"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=z,
y="KEF",
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
yscale=yscale,
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(rf"${z_symbol}$ [dB]", labelpad=xlabel_pad)
ax.set_ylabel(r"KEF [$J$ m$^{-2}$ h$^{-1}$]", labelpad=ylabel_pad)
ax.set_xlim(*z_lims)
ax.set_ylim(*kef_lims)
if kef_ticks is not None:
ax.set_yticks(kef_ticks)
ax.set_yticklabels([str(v) for v in kef_ticks])
ax.set_title(title)
# Fit and plot the powerlaw
if add_fit:
try:
# Fit power law KEF = a * Z ** b
(a, b), _ = fit_powerlaw(
x=df[z],
y=df["KEF"],
xbins=z_bins,
ybins=kef_bins,
x_in_db=True,
)
# Define legend string
legend_str = define_powerlaw_legend_str(z_lower_symbol, "\\mathrm{KEF}", a, b)
# Get power law predictions
x_pred = np.arange(*z_lims)
x_pred_linear = disdrodb.idecibel(x_pred)
kef_pred = predict_from_powerlaw(x_pred_linear, a=a, b=b)
# Add fitted powerlaw
ax.plot(x_pred, kef_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black")
# Add legend
add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize)
except Exception as e:
warnings.warn(f"Could not fit power law in plot_KEF_Z: {e!s}", UserWarning, stacklevel=2)
return p
[docs]
def plot_TKE_Z(
df,
z="Z",
log_tke=True,
add_fit=True,
cmap=None,
norm=None,
add_colorbar=True,
title=None,
ax=None,
legend_fontsize=None,
figsize=(3.4, 3.0),
dpi=300,
xlabel_pad=None,
ylabel_pad=None,
fit_linewidth=1,
fit_linestyle="dashed",
):
"""Create a 2D histogram of TKE vs Z."""
z_bins = np.arange(0, 70, step=1)
z_lims = (0, 70)
if log_tke:
tke_bins = log_arange(0.01, 500, log_step=0.05, base=10)
tke_lims = (0.01, 200)
tke_ticks = [0.01, 0.1, 1, 10, 100, 200]
yscale = "log"
else:
tke_bins = np.arange(0, 200, step=1)
tke_lims = (0, 200)
tke_ticks = None
yscale = "linear"
# Compute 2d histogram
ds_stats = compute_2d_histogram(
df,
x=z,
y="TKE",
x_bins=z_bins,
y_bins=tke_bins,
)
# Define colormap and norm
if cmap is None:
cmap = plt.get_cmap("Spectral_r").copy()
cmap.set_under(alpha=0)
norm = LogNorm(1, None) if norm is None else norm
# Set default title if not provided
if title is None:
title = "TKE vs Z"
# Create figure if ax is None
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi)
# Plot 2D histogram
p = ds_stats["count"].plot.pcolormesh(
x=z,
y="TKE",
ax=ax,
cmap=cmap,
norm=norm,
add_colorbar=add_colorbar,
extend="max",
yscale=yscale,
cbar_kwargs={"label": "Counts"} if add_colorbar else {},
)
ax.set_xlabel(r"$Z$ [dB]", labelpad=xlabel_pad)
ax.set_ylabel(r"TKE [$J$ m$^{-2}$]", labelpad=ylabel_pad)
ax.set_xlim(*z_lims)
ax.set_ylim(*tke_lims)
if tke_ticks is not None:
ax.set_yticks(tke_ticks)
ax.set_yticklabels([str(v) for v in tke_ticks])
ax.set_title(title)
# Fit and plot the powerlaw
if add_fit:
try:
# Fit power law TKE = a * Z ** b
(a, b), _ = fit_powerlaw(
x=df[z],
y=df["TKE"],
xbins=z_bins,
ybins=tke_bins,
x_in_db=True,
)
# Define legend string
legend_str = define_powerlaw_legend_str("z", "\\mathrm{TKE}", a, b)
# Get power law predictions
x_pred = np.arange(*z_lims)
x_pred_linear = disdrodb.idecibel(x_pred)
y_pred = predict_from_powerlaw(x_pred_linear, a=a, b=b)
# Add fitted powerlaw
ax.plot(x_pred, y_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black")
# Add legend
add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize)
except Exception as e:
warnings.warn(f"Could not fit power law in plot_TKE_Z: {e!s}", UserWarning, stacklevel=2)
return p
####-----------------------------------------------------------------------.
#### Radar and Kinetic Energy Summary figures
[docs]
def plot_radar_relationships(df, band):
"""Create 3x3 radar relationships figure formatted for AMT journal."""
if band not in {"X", "C", "S"}:
raise ValueError("Plotting function developed only for bands: 'X', 'C', 'S'.")
# --------------------------------------------------
# Styling parameters
# --------------------------------------------------
plt.rcParams.update(
{
"font.size": 8,
"axes.labelsize": 8,
"axes.titlesize": 8,
"xtick.labelsize": 7,
"ytick.labelsize": 7,
},
)
# Appearance
norm = LogNorm(1, None)
add_colorbar = False
legend_fontsize = 8
xlabel_pad = -0.5
ylabel_pad = -1
# --------------------------------------------------
# Define columns
# --------------------------------------------------
z = f"DBZH_{band}"
zdr = f"ZDR_{band}"
kdp = f"KDP_{band}"
a = f"AH_{band}"
adp = f"ADP_{band}"
adp_kdp_ylim_dict = {
"X": (0.0, 0.05),
"C": (0.0, 0.05),
"S": (0.0, 0.015),
}
a_ylim_dict = {
"S": (0.00001, 1),
"C": (0.0001, 10),
"X": (0.0001, 10),
}
adp_kdp_ylim = adp_kdp_ylim_dict[band]
a_ylim = a_ylim_dict[band]
# --------------------------------------------------
# Figure size
# --------------------------------------------------
fig = plt.figure(figsize=(6.9, 9), dpi=300)
gs = GridSpec(
4,
3,
figure=fig,
height_ratios=[1, 1, 1, 0.05],
hspace=0.38,
wspace=0.32,
left=0.085,
right=0.985,
top=0.975,
bottom=0.03,
)
axes = np.array([[fig.add_subplot(gs[i, j]) for j in range(3)] for i in range(3)])
# ==================================================
# Row 1
# ==================================================
p = plot_R_Z(
df,
z=z,
r="R",
pol="H",
norm=norm,
add_colorbar=add_colorbar,
legend_fontsize=legend_fontsize,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
ax=axes[0, 0],
)
norm = p.norm
plot_R_KDP(
df,
kdp=kdp,
r="R",
log_scale=True,
norm=norm,
add_colorbar=add_colorbar,
legend_fontsize=legend_fontsize,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
ax=axes[0, 1],
)
plot_KDP_Z(
df,
kdp=kdp,
z=z,
pol="H",
log_kdp=True,
norm=norm,
add_colorbar=add_colorbar,
legend_fontsize=legend_fontsize,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
ax=axes[0, 2],
)
# ==================================================
# Row 2
# ==================================================
plot_A_Z(
df,
a=a,
z=z,
pol="H",
norm=norm,
add_colorbar=add_colorbar,
a_lim=a_ylim,
legend_fontsize=legend_fontsize,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
ax=axes[1, 0],
)
plot_A_KDP(
df,
a=a,
kdp=kdp,
pol="H",
norm=norm,
add_colorbar=add_colorbar,
legend_fontsize=legend_fontsize,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
ax=axes[1, 1],
)
plot_A_R(
df,
a=a,
r="R",
pol="H",
norm=norm,
add_colorbar=add_colorbar,
legend_fontsize=legend_fontsize,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
ax=axes[1, 2],
)
# ==================================================
# Row 3
# ==================================================
plot_ZDR_Z(
df,
z=z,
zdr=zdr,
pol="H",
norm=norm,
add_fit=False,
add_colorbar=add_colorbar,
legend_fontsize=legend_fontsize,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
ax=axes[2, 0],
)
plot_ADP_KDP_ZDR(
df,
adp=adp,
kdp=kdp,
zdr=zdr,
norm=norm,
add_colorbar=add_colorbar,
y_lim=adp_kdp_ylim,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
ax=axes[2, 1],
)
p = plot_KDP_Z_ZDR(
df,
kdp=kdp,
z=z,
zdr=zdr,
norm=norm,
add_colorbar=add_colorbar,
z_linear=False,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
ax=axes[2, 2],
)
# ==================================================
# Bottom colorbar (shortened)
# ==================================================
cax_full = fig.add_subplot(gs[3, :])
cax_full.axis("off")
cax = inset_axes(
cax_full,
width="40%",
height="100%",
loc="center",
)
cbar = fig.colorbar(
p,
cax=cax,
orientation="horizontal",
extend="max",
extendfrac=0.02,
)
cbar.set_label("Counts", labelpad=6)
cbar.ax.tick_params(labelsize=7)
cbar.ax.xaxis.set_label_position("top")
fig.align_xlabels()
fig.align_ylabels()
return fig
[docs]
def plot_kinetic_energy_relationships(df):
"""Create a 2x2 multipanel figure showing kinetic energy relationships (AMT format)."""
# --------------------------------------------------
# Plot styling
# --------------------------------------------------
plt.rcParams.update(
{
"font.size": 8,
"axes.labelsize": 8,
"axes.titlesize": 8,
"xtick.labelsize": 7,
"ytick.labelsize": 7,
"legend.fontsize": 8,
},
)
# Appearance
add_colorbar = False
norm = LogNorm(1, None)
legend_fontsize = 8
xlabel_pad = None
ylabel_pad = 2
# --------------------------------------------------
# Figure (A4 width - some margin)
# --------------------------------------------------
fig = plt.figure(figsize=(6.9, 5.8), dpi=300)
gs = GridSpec(
3,
2,
figure=fig,
height_ratios=[1, 1, 0.1],
hspace=0.0,
wspace=0.24,
left=0.10,
right=0.98,
top=0.97,
bottom=0.05,
)
# --------------------------------------------------
# Create axes
# --------------------------------------------------
ax11 = fig.add_subplot(gs[0, 0])
ax21 = fig.add_subplot(gs[1, 0])
ax12 = fig.add_subplot(gs[0, 1])
ax22 = fig.add_subplot(gs[1, 1])
# ==================================================
# ax11 — R vs KED
# ==================================================
p = plot_KED_R(
df,
log_ked=True,
log_r=True,
norm=norm,
legend_fontsize=legend_fontsize,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
add_colorbar=add_colorbar,
title="",
ax=ax11,
)
norm = p.norm
# ==================================================
# ax12 — Z vs TKE
# ==================================================
plot_TKE_Z(
df,
z="Z",
norm=norm,
legend_fontsize=legend_fontsize,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
add_colorbar=add_colorbar,
title="",
ax=ax12,
)
# ==================================================
# ax21 — R vs KEF
# ==================================================
plot_KEF_R(
df,
log_kef=True,
log_r=True,
norm=norm,
legend_fontsize=legend_fontsize,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
add_colorbar=add_colorbar,
title="",
ax=ax21,
)
# ==================================================
# ax22 — Z vs KEF
# ==================================================
p_last = plot_KEF_Z(
df,
z="Z",
norm=norm,
legend_fontsize=legend_fontsize,
xlabel_pad=xlabel_pad,
ylabel_pad=ylabel_pad,
add_colorbar=add_colorbar,
title="",
ax=ax22,
)
# ==================================================
# Clean axis
# ==================================================
# Remove x-tick labels and xticks from first row
for ax in [ax11, ax12]:
ax.set_xticklabels([])
ax.set_xlabel("")
ax.tick_params(axis="x", which="both", left=False)
# Clear overlapping ticks between ax11 and ax21
ylim = ax21.get_ylim()
yticks = ax21.get_yticks()
yticks = yticks[(yticks <= 5000)]
ax21.set_yticks(yticks)
ax21.set_ylim(*ylim)
# Clear overlapping ticks between ax11 and ax21
ylim = ax22.get_ylim()
yticks = ax22.get_yticks()
yticks = yticks[(yticks <= 5000)]
ax22.set_yticks(yticks)
ax22.set_ylim(*ylim)
# Remove redundant labels
ax11.set_xlabel("")
ax12.set_xlabel("")
# ==================================================
# Colorbar inside ax21 (bottom-left)
# ==================================================
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
cax = inset_axes(
ax21,
width="65%", # length of colorbar
height="6%", # thickness
loc="lower right",
borderpad=2.4,
)
cbar = fig.colorbar(
p_last,
cax=cax,
orientation="horizontal",
extend="max",
extendfrac=0.05,
)
# Label above colorbar
cbar.ax.xaxis.set_label_position("top")
cbar.set_label("Counts", labelpad=4)
cbar.ax.tick_params(labelsize=7)
# -------------------------------------
# - Align labels
fig.align_xlabels()
fig.align_ylabels()
return fig
####-----------------------------------------------------------------------.
#### Summary routine
[docs]
def define_filename(prefix, extension, data_source, campaign_name, station_name, temporal_resolution):
"""Define filename for summary files."""
if extension in ["png", "jpeg"]:
filename = f"Figure.{prefix}.{data_source}.{campaign_name}.{station_name}.{temporal_resolution}.{extension}"
elif extension in ["csv", "pdf", "yaml", "yml"]:
filename = f"Table.{prefix}.{data_source}.{campaign_name}.{station_name}.{temporal_resolution}.{extension}"
elif extension in ["nc"]:
filename = f"Dataset.{prefix}.{data_source}.{campaign_name}.{station_name}.{temporal_resolution}.{extension}"
elif extension in ["parquet"]:
filename = f"Dataframe.{prefix}.{data_source}.{campaign_name}.{station_name}.{temporal_resolution}.{extension}"
else:
raise NotImplementedError(f"Standardized filename not implemented for extension {extension}.")
return filename
[docs]
def create_l2_dataframe(ds):
"""Create pandas Dataframe for L2 analysis."""
dims_to_drop = set(ds.dims).intersection({DIAMETER_DIMENSION, VELOCITY_DIMENSION, "crs"})
# - Drop array variables and convert to pandas
df = ds.drop_dims(dims_to_drop).to_pandas()
# - Drop coordinates
coords_to_drop = ["velocity_method", "sample_interval", *RADAR_OPTIONS]
df = df.drop(columns=coords_to_drop, errors="ignore")
# - Drop rows with missing rain
df = df[df["R"] > 0]
return df
[docs]
def prepare_summary_dataset(ds, velocity_method="theoretical_velocity", source="drop_number", minimum_rain_rate=0):
"""Prepare the L2E or L2M dataset to be converted to a dataframe."""
# Select fall velocity method
if "velocity_method" in ds.dims:
ds = ds.sel(velocity_method=velocity_method)
# Select first occurrence of radars options (except frequency)
for dim in RADAR_OPTIONS:
if dim in ds.dims and dim != "frequency":
ds = ds.isel({dim: 0})
# Select only one elevation angle
if "elevation_angle" in ds.dims:
ds = ds.isel(elevation_angle=0)
# Unstack frequency dimension
if "frequency" in ds.dims:
ds = unstack_radar_variables(ds)
# For kinetic energy variables, select source="drop_number"
if "source" in ds.dims:
ds = ds.sel(source=source)
# Select only timesteps with R > minimum_rain_rate
if "Rm" in ds: # in L2E
rainy_timesteps = np.logical_and(ds["Rm"].compute() > minimum_rain_rate, ds["R"].compute() > minimum_rain_rate)
else: # L2M without Rm
rainy_timesteps = ds["R"].compute() > minimum_rain_rate
ds = ds.isel(time=rainy_timesteps)
return ds
[docs]
def generate_station_summary(ds, summary_dir_path, data_source, campaign_name, station_name, temporal_resolution):
"""Generate station summary using L2E dataset."""
# Create summary directory if does not exist
os.makedirs(summary_dir_path, exist_ok=True)
####---------------------------------------------------------------------.
#### Prepare dataset
ds = prepare_summary_dataset(ds) # Select only rainy timesteps
# Ensure all data are in memory
ds = ds.compute()
# Filter dataset to remove noisy timesteps
# - Keep only timesteps with at least 3 Nbins to remove noise
# - Keep only timesteps with sigma_m >= 0.2
mask = (
(ds["Nbins"] >= 3)
& (ds["sigma_m"] >= 0.2)
& (ds["Nbins_missing_fraction"] <= 0.3)
& (ds["Dmin"] <= 1)
& (ds["R"] > 0.01)
)
valid_idx = np.where(mask)[0]
ds_filtered = ds.isel(time=valid_idx)
####---------------------------------------------------------------------.
#### Create drop spectrum figures and statistics
if VELOCITY_DIMENSION in ds_filtered.dims:
# Compute sum of raw and filtered spectrum over time
raw_drop_number = ds_filtered["raw_drop_number"].sum(dim="time")
drop_number = ds_filtered["drop_number"].sum(dim="time")
# Define theoretical and measured average velocity
if "time" in ds_filtered["fall_velocity"].dims:
theoretical_average_velocity = ds_filtered["fall_velocity"].mean(dim="time")
else:
theoretical_average_velocity = ds_filtered["fall_velocity"]
measured_average_velocity = get_drop_average_velocity(drop_number)
# Save raw and filtered spectrum over time & theoretical and measured average fall velocity
ds_stats = xr.Dataset()
ds_stats["raw_drop_number"] = raw_drop_number
ds_stats["drop_number"] = raw_drop_number
ds_stats["theoretical_average_velocity"] = theoretical_average_velocity
if measured_average_velocity is not None:
ds_stats["measured_average_velocity"] = measured_average_velocity
filename = define_filename(
prefix="SpectrumStats",
extension="nc",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
ds_stats.to_netcdf(os.path.join(summary_dir_path, filename))
# Create figures with raw and filtered spectrum
# - Raw
filename = define_filename(
prefix="SpectrumRaw",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig, ax = plt.subplots(1, 1, figsize=(3.4, 2.8), dpi=300)
p = plot_spectrum(raw_drop_number, ax=ax, title="Raw Drop Spectrum")
p.figure.tight_layout()
p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
# - Filtered
filename = define_filename(
prefix="SpectrumFiltered",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig, ax = plt.subplots(1, 1, figsize=(3.4, 2.8), dpi=300)
p = plot_spectrum(drop_number, ax=ax, title="Filtered Drop Spectrum")
p.figure.tight_layout()
p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
# Create figure comparing raw and filtered spectrum
filename = define_filename(
prefix="SpectrumSummary",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_raw_and_filtered_spectra(ds_filtered)
fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
####---------------------------------------------------------------------.
#### Create L2E dataframe
df = create_l2_dataframe(ds_filtered)
# Define diameter bin edges
diameter_bin_edges = get_diameter_bin_edges(ds_filtered)
# ---------------------------------------------------------------------.
#### Save L2E Parquet
l2e_parquet_filename = define_filename(
prefix="L2E",
extension="parquet",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
l2e_parquet_filepath = os.path.join(summary_dir_path, l2e_parquet_filename)
df.to_parquet(l2e_parquet_filepath, engine="pyarrow", compression="snappy")
#### ---------------------------------------------------------------------.
#### Create table with rain summary
if not temporal_resolution.startswith("ROLL"):
table_rain_summary = create_table_rain_summary(df, temporal_resolution=temporal_resolution)
table_rain_summary_filename = define_filename(
prefix="Station_Summary",
extension="yaml",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
table_rain_summary_filepath = os.path.join(summary_dir_path, table_rain_summary_filename)
write_yaml(table_rain_summary, filepath=table_rain_summary_filepath)
# ---------------------------------------------------------------------.
#### Create table with events summary
table_events_summary = None
if not temporal_resolution.startswith("ROLL"):
table_events_summary = create_table_events_summary(df, temporal_resolution=temporal_resolution)
if len(table_events_summary) > 0:
# - Save table as csv
table_events_summary_csv_filename = define_filename(
prefix="Events_Summary",
extension="csv",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
table_events_summary_csv_filepath = os.path.join(summary_dir_path, table_events_summary_csv_filename)
table_events_summary.to_csv(table_events_summary_csv_filepath)
# - Save table as pdf
if is_latex_engine_available():
# Sorted by time
table_events_summary_pdf_filename = table_events_summary_csv_filename.replace(".csv", ".pdf")
table_events_summary_pdf_filepath = os.path.join(summary_dir_path, table_events_summary_pdf_filename)
save_table_to_pdf(
df=prepare_latex_table_events_summary(table_events_summary),
filepath=table_events_summary_pdf_filepath,
index=True,
caption="Events Summary",
orientation="landscape",
)
# Sorted by maximum rain intensity
table_events_summary = table_events_summary.sort_values(by="max_R", ascending=False)
table_events_summary = table_events_summary.reset_index(drop=True)
table_events_summary_pdf_filename = define_filename(
prefix="Events_Summary_Sorted_By_Rmax",
extension="pdf",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
table_events_summary_pdf_filepath = os.path.join(summary_dir_path, table_events_summary_pdf_filename)
save_table_to_pdf(
df=prepare_latex_table_events_summary(table_events_summary),
filepath=table_events_summary_pdf_filepath,
index=True,
caption="Events Summary",
orientation="landscape",
)
# Sorted by Ptotal
table_events_summary = table_events_summary.sort_values(by="P_total", ascending=False)
table_events_summary = table_events_summary.reset_index(drop=True)
table_events_summary_pdf_filename = define_filename(
prefix="Events_Summary_Sorted_By_Ptot",
extension="pdf",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
table_events_summary_pdf_filepath = os.path.join(summary_dir_path, table_events_summary_pdf_filename)
save_table_to_pdf(
df=prepare_latex_table_events_summary(table_events_summary),
filepath=table_events_summary_pdf_filepath,
index=True,
caption="Events Summary",
orientation="landscape",
)
# ---------------------------------------------------------------------.
#### Create table with integral DSD parameters statistics
table_dsd_summary = create_table_dsd_summary(df)
# - Save table as csv
table_dsd_summary_csv_filename = define_filename(
prefix="DSD_Summary",
extension="csv",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
table_dsd_summary_csv_filepath = os.path.join(summary_dir_path, table_dsd_summary_csv_filename)
table_dsd_summary.to_csv(table_dsd_summary_csv_filepath)
# - Save table as pdf
if is_latex_engine_available():
table_dsd_summary_pdf_filename = table_dsd_summary_csv_filename.replace(".csv", ".pdf")
table_dsd_summary_pdf_filepath = os.path.join(summary_dir_path, table_dsd_summary_pdf_filename)
save_table_to_pdf(
df=prepare_latex_table_dsd_summary(table_dsd_summary),
index=True,
filepath=table_dsd_summary_pdf_filepath,
caption="DSD Summary",
orientation="portrait", # "landscape",
)
####---------------------------------------------------------------------.
#### Create events quicklooks (using unfiltered dataset !)
if table_events_summary is not None:
# Define number of quicklooks
n_quicklooks = 10
# Create quicklooks for Rmax
table_events_summary = table_events_summary.sort_values(by="max_R", ascending=False)
for i in range(min(n_quicklooks, len(table_events_summary))):
event_stats = table_events_summary.iloc[i]
if event_stats["duration"] >= 10: # minimum 10 minutes
ds_event = ds.sel(time=slice(event_stats["start_time"], event_stats["end_time"]))
ds_event["rain_type"] = bringi_nw_dm_classification(ds_event["Dm"], ds_event["Nw"])
# Plot quicklook
fig = ds_event.disdrodb.plot_dsd_quicklook(
# Plot layout
hours_per_slice=3,
max_rows=6,
aligned=False,
verbose=False,
cbar_as_legend=True,
precipitation_type="rain_type",
d_lim=(0.3, 7),
# Figure options
dpi=300,
)
# Create quicklooks directory
quicklook_dir = os.path.join(summary_dir_path, "Quicklooks_Rmax")
os.makedirs(quicklook_dir, exist_ok=True)
# Define quicklooks filename
rmax = event_stats["max_R"]
start_time_str = event_stats["start_time"].strftime("%Y%m%dT%H%M%S")
end_time_str = event_stats["end_time"].strftime("%Y%m%dT%H%M%S")
quicklook_filename = define_filename(
prefix=f"Quicklook_Rmax_{rmax:.0f}_{start_time_str}_{end_time_str}",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
quicklook_filepath = os.path.join(quicklook_dir, quicklook_filename)
fig.savefig(quicklook_filepath, bbox_inches="tight")
plt.close()
# Create quicklooks for P_total
table_events_summary = table_events_summary.sort_values(by="P_total", ascending=False)
for i in range(min(n_quicklooks, len(table_events_summary))):
event_stats = table_events_summary.iloc[i]
if event_stats["duration"] >= 10: # minimum 10 minutes
ds_event = ds.sel(time=slice(event_stats["start_time"], event_stats["end_time"]))
ds_event["rain_type"] = bringi_nw_dm_classification(ds_event["Dm"], ds_event["Nw"])
# Plot quicklook
fig = ds_event.disdrodb.plot_dsd_quicklook(
# Plot layout
hours_per_slice=3,
max_rows=6,
aligned=False,
verbose=False,
cbar_as_legend=True,
precipitation_type="rain_type",
d_lim=(0.3, 7),
# Figure options
dpi=300,
)
# Create quicklooks directory
quicklook_dir = os.path.join(summary_dir_path, "Quicklooks_Ptot")
os.makedirs(quicklook_dir, exist_ok=True)
# Define quicklooks filename
p_tot = event_stats["P_total"]
start_time_str = event_stats["start_time"].strftime("%Y%m%dT%H%M%S")
end_time_str = event_stats["end_time"].strftime("%Y%m%dT%H%M%S")
quicklook_filename = define_filename(
prefix=f"Quicklook_P_tot_{p_tot:.0f}_{start_time_str}_{end_time_str}",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
quicklook_filepath = os.path.join(quicklook_dir, quicklook_filename)
fig.savefig(quicklook_filepath, bbox_inches="tight")
plt.close()
# Remove unfiltered dataset (as not used anymore)
del ds
#### ---------------------------------------------------------------------.
#### Create L2E RADAR Summary Plots
if len(df) > 1000 or os.environ.get("PYTEST_CURRENT_TEST"):
# Summary plots at X, C, S bands
if "DBZH_X" in df:
filename = define_filename(
prefix="Radar_Band_X",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_radar_relationships(df, band="X")
fig.savefig(os.path.join(summary_dir_path, filename))
if "DBZH_C" in df:
filename = define_filename(
prefix="Radar_Band_C",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_radar_relationships(df, band="C")
fig.savefig(os.path.join(summary_dir_path, filename))
if "DBZH_S" in df:
filename = define_filename(
prefix="Radar_Band_S",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_radar_relationships(df, band="S")
fig.savefig(os.path.join(summary_dir_path, filename))
# ---------------------------------------------------------------------.
#### Create L2E Z-R figure
filename = define_filename(
prefix="Z-R",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
p = plot_R_Z(df, z="Z", r="R", pol="Rayleigh", title=r"$Z_{Rayleigh}$ vs $R$")
p.figure.tight_layout()
p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
#### ---------------------------------------------------------------------.
#### Create L2E Kinetic Energy Summary Plots
filename = define_filename(
prefix="KineticEnergy",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_kinetic_energy_relationships(df)
fig.savefig(os.path.join(summary_dir_path, filename))
#### ---------------------------------------------------------------------.
#### Create L2E DSD Parameters summary plots
#### - Create DSD parameters density figures with LWC
filename = define_filename(
prefix="DSD_Params_Density_with_LWC_LinearDm_MaxNormalized",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_dsd_params_density(df, log_dm=False, lwc=True, log_normalize=False)
fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
filename = define_filename(
prefix="DSD_Params_Density_with_LWC_LogDm_MaxNormalized",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_dsd_params_density(df, log_dm=True, lwc=True, log_normalize=False)
fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
filename = define_filename(
prefix="DSD_Params_Density_with_LWC_LinearDm_LogNormalized",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_dsd_params_density(df, log_dm=False, lwc=True, log_normalize=True)
fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
filename = define_filename(
prefix="DSD_Params_Density_with_LWC_LogDm_LogNormalized",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_dsd_params_density(df, log_dm=True, lwc=True, log_normalize=True)
fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
###------------------------------------------------------------------------.
#### - Create DSD parameters density figures with R
filename = define_filename(
prefix="DSD_Params_Density_with_R_LinearDm_MaxNormalized",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_dsd_params_density(df, log_dm=False, lwc=False, log_normalize=False)
fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
filename = define_filename(
prefix="DSD_Params_Density_with_R_LogDm_MaxNormalized",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_dsd_params_density(df, log_dm=True, lwc=False, log_normalize=False)
fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
filename = define_filename(
prefix="DSD_Params_Density_with_R_LinearDm_LogNormalized",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_dsd_params_density(df, log_dm=False, lwc=False, log_normalize=True)
fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
filename = define_filename(
prefix="DSD_Params_Density_with_R_LogDm_LogNormalized",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_dsd_params_density(df, log_dm=True, lwc=False, log_normalize=True)
fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
###------------------------------------------------------------------------.
#### - Create DSD parameters relationship figures
filename = define_filename(
prefix="DSD_Params_Relations",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_dsd_params_relationships(df, add_nt=True)
fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
###------------------------------------------------------------------------.
#### - Create Dmax relationship figures
filename = define_filename(
prefix="DSD_Dmax_Relations",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
fig = plot_dmax_relationships(df, diameter_bin_edges=diameter_bin_edges, dmax="Dmax", diameter_max=10)
fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
#### ---------------------------------------------------------------------.
#### Create L2E QC summary plots
# TODO:
####------------------------------------------------------------------------.
#### Free space - Remove df from memory
del df
gc.collect()
####------------------------------------------------------------------------.
#### Create N(D) densities
df_dsd = create_dsd_dataframe(ds_filtered)
#### - Plot N(D) vs D
filename = define_filename(
prefix="N(D)",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
p = plot_dsd_density(df_dsd, diameter_bin_edges=diameter_bin_edges)
p.figure.tight_layout()
p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
#### - Plot N(D)/Nw vs D/Dm
filename = define_filename(
prefix="N(D)_Normalized",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
p = plot_normalized_dsd_density(df_dsd)
p.figure.tight_layout()
p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
#### Free space - Remove df_dsd from memory
del df_dsd
gc.collect()
#### - Plot N(D) vs D with DenseLines
# Extract required variables and free memory
drop_number_concentration = ds_filtered["drop_number_concentration"].compute().copy()
r = ds_filtered["R"].compute().copy()
del ds_filtered
gc.collect()
# Create figure
filename = define_filename(
prefix="N(D)_DenseLines",
extension="png",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
p = plot_dsd_with_dense_lines(drop_number_concentration=drop_number_concentration, r=r)
p.figure.tight_layout()
p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight")
plt.close()
####------------------------------------------------------------------------.
#### Wrappers
[docs]
def create_station_summary(
data_source,
campaign_name,
station_name,
parallel=False,
data_archive_dir=None,
temporal_resolution="1MIN",
):
"""Create summary figures and tables for a DISDRODB station."""
# Print processing info
print(f"Creation of station summary for {data_source} {campaign_name} {station_name} has started.")
# Define station summary directory
summary_dir_path = define_station_dir(
product="SUMMARY",
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
data_archive_dir=data_archive_dir,
check_exists=False,
)
os.makedirs(summary_dir_path, exist_ok=True)
# Load L2E dataset
try:
ds = disdrodb.open_dataset(
data_archive_dir=data_archive_dir,
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
product="L2E",
temporal_resolution=temporal_resolution,
parallel=parallel,
chunks=-1,
compute=True,
)
except Exception as e:
print("Impossible to create the station summary." + str(e))
return
# Generate station summary figures and table
generate_station_summary(
ds=ds,
summary_dir_path=summary_dir_path,
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
temporal_resolution=temporal_resolution,
)
print(f"Creation of station summary for {data_source} {campaign_name} {station_name} has terminated.")
# -------------------------------------------------------------------------------------------------.