Source code for disdrodb.summary.routines

# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Utilities to create summary statistics."""

import gc
import importlib
import os
import shutil
import subprocess
import tempfile
import warnings

import matplotlib.lines as mlines
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr
from matplotlib.colors import ListedColormap, LogNorm, Normalize
from matplotlib.gridspec import GridSpec
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
from scipy.optimize import curve_fit

import disdrodb
from disdrodb.api.path import define_station_dir
from disdrodb.constants import DIAMETER_DIMENSION, VELOCITY_DIMENSION
from disdrodb.l2.empirical_dsd import bringi_nw_dm_classification, get_drop_average_velocity
from disdrodb.scattering import RADAR_OPTIONS
from disdrodb.utils.dataframe import compute_2d_histogram, log_arange
from disdrodb.utils.event import group_timesteps_into_event
from disdrodb.utils.manipulations import (
    get_diameter_bin_edges,
    resample_drop_number_concentration,
    unstack_radar_variables,
)
from disdrodb.utils.time import get_sampling_information
from disdrodb.utils.warnings import suppress_warnings
from disdrodb.utils.yaml import write_yaml
from disdrodb.viz.plots import (
    compute_dense_lines,
    max_blend_images,
    plot_raw_and_filtered_spectra,
    plot_spectrum,
    to_rgba,
)

####-----------------------------------------------------------------
#### PDF Latex Utilities


[docs] def is_latex_engine_available() -> bool: """ Determine whether the Tectonic TeX/LaTeX engine is installed and accessible. Returns ------- bool True if tectonic is found, False otherwise. """ return shutil.which("tectonic") is not None
[docs] def save_table_to_pdf( df: pd.DataFrame, filepath: str, index=True, caption=None, fontsize: str = r"\tiny", orientation: str = "landscape", ) -> None: r"""Render a pandas DataFrame as a well-formatted table in PDF via LaTeX. Parameters ---------- df : pandas.DataFrame The data to render. filepath : str File path where write the final PDF (e.g. '<...>/table.pdf'). caption : str, optional LaTeX caption for the table environment. fontsize : str, optional LaTeX font-size command to wrap the table (e.g. '\\small'). The default is '\\tiny'. orientation : str Page orientation. Allowed values are 'portrait' and 'landscape'. If 'landscape', the table will be laid out horizontally. The default is 'landscape'. """ from disdrodb.utils.cli import subprocess_run # Export table to LaTeX table_tex = df.to_latex( index=index, longtable=True, caption=caption, label=None, escape=False, ) # Define LaTeX document opts = "a4paper" doc = [ f"\\documentclass[{opts}]{{article}}", "\\usepackage[margin=0.1in]{geometry}", # Reduce column separation "\\setlength{\\tabcolsep}{3pt}", "\\usepackage{booktabs}", "\\usepackage{longtable}", "\\usepackage{caption}", "\\captionsetup[longtable]{font=tiny}", "\\usepackage{pdflscape}", "\\begin{document}", # Remove page numbers "\\pagestyle{empty}", ] if orientation.lower() == "landscape": doc.append("\\begin{landscape}") doc.append(f"{{{fontsize}\n{table_tex}\n}}") if orientation.lower() == "landscape": doc.append("\\end{landscape}") doc.append("\\end{document}") document = "\n".join(doc) # Compile with pdflatex in a temp dir with tempfile.TemporaryDirectory() as td: tex_path = os.path.join(td, "table.tex") with open(tex_path, "w", encoding="utf-8") as f: f.write(document) for _ in range(2): subprocess_run( [ "tectonic", "--outdir", td, tex_path, ], cwd=td, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True, ) # Move result shutil.move(os.path.join(td, "table.pdf"), filepath)
####----------------------------------------------------------------- #### Tables summaries
[docs] def create_table_rain_summary(df, temporal_resolution): """Create rainy table summary.""" # Initialize dictionary table = {} # Retrieve accumulation interval accumulation_interval, _ = get_sampling_information(temporal_resolution) accumulation_interval_minutes = accumulation_interval / 60 # Keep rows with R > 0 df = df[df["R"] > 0] # Number of years, months, days, minutes if df.index.name == "time": df = df.reset_index() time = np.sort(np.asanyarray(df["time"])) # Define start_time and end_time start_time = pd.Timestamp(time[0]) end_time = pd.Timestamp(time[-1]) # Define years and years-month coverage start_year = start_time.year start_month = start_time.month_name() end_year = end_time.year end_month = end_time.month_name() if start_year == end_year: years_coverage = f"{start_year}" years_month_coverage = f"{start_month[0:3]}-{end_month[0:3]} {start_year}" else: years_coverage = f"{start_year} - {end_year}" years_month_coverage = f"{start_month[0:3]} {start_year} - {end_month[0:3]} {end_year}" table["years_coverage"] = years_coverage table["years_month_coverage"] = years_month_coverage # Rainy minutes statistics table["n_rainy_minutes"] = len(df["R"]) * accumulation_interval_minutes table["n_rainy_minutes_<0.1"] = ( df["R"].between(0, 0.1, inclusive="right").sum().item() * accumulation_interval_minutes ) table["n_rainy_minutes_0.1_1"] = ( df["R"].between(0.1, 1, inclusive="right").sum().item() * accumulation_interval_minutes ) table["n_rainy_minutes_1_10"] = ( df["R"].between(1, 10, inclusive="right").sum().item() * accumulation_interval_minutes ) table["n_rainy_minutes_10_25"] = ( df["R"].between(10, 25, inclusive="right").sum().item() * accumulation_interval_minutes ) table["n_rainy_minutes_25_50"] = ( df["R"].between(25, 50, inclusive="right").sum().item() * accumulation_interval_minutes ) table["n_rainy_minutes_50_100"] = ( df["R"].between(50, 100, inclusive="right").sum().item() * accumulation_interval_minutes ) table["n_rainy_minutes_100_200"] = ( df["R"].between(100, 200, inclusive="right").sum().item() * accumulation_interval_minutes ) table["n_rainy_minutes_>200"] = np.sum(df["R"] > 200).item() * accumulation_interval_minutes # Minutes with larger Dmax table["n_minutes_Dmax_>7"] = np.sum(df["Dmax"] > 7).item() * accumulation_interval_minutes table["n_minutes_Dmax_>8"] = np.sum(df["Dmax"] > 8).item() * accumulation_interval_minutes table["n_minutes_Dmax_>9"] = np.sum(df["Dmax"] > 9).item() * accumulation_interval_minutes # Convert all minutes columns to integer for key in table: if "minutes" in key: table[key] = int(table[key]) return table
[docs] def create_table_dsd_summary(df): """Create table with integral DSD parameters statistics.""" # Define additional variables df["log10(Nw)"] = np.log10(df["Nw"]) df["log10(Nt)"] = np.log10(df["Nt"]) # List of variables to summarize variables = ["LWC", "R", "Z", "D50", "Dm", "sigma_m", "Dmax", "Nw", "log10(Nw)", "Nt", "log10(Nt)"] # Define subset dataframe df_subset = df[variables] # Prepare summary DataFrame stats_cols = [ "MIN", "Q1", "Q5", "Q10", "Q50", "Q90", "Q95", "Q99", "MAX", "MEAN", "STD", "MAD", "SKEWNESS", "KURTOSIS", ] df_stats = pd.DataFrame(index=variables, columns=stats_cols) # Compute DSD integral parameters statistics df_stats["MIN"] = df_subset.min() df_stats["Q1"] = df_subset.quantile(0.01) df_stats["Q5"] = df_subset.quantile(0.05) df_stats["Q10"] = df_subset.quantile(0.10) df_stats["Q50"] = df_subset.median() df_stats["Q90"] = df_subset.quantile(0.90) df_stats["Q95"] = df_subset.quantile(0.95) df_stats["Q99"] = df_subset.quantile(0.99) df_stats["MAX"] = df_subset.max() df_stats["MEAN"] = df_subset.mean() df_stats["STD"] = df_subset.std() df_stats["MAD"] = df_subset.apply(lambda x: np.mean(np.abs(x - x.mean()))) df_stats["SKEWNESS"] = df_subset.skew() df_stats["KURTOSIS"] = df_subset.kurt() # Round float columns to nearest integer, leave ints unchanged float_cols = df_stats.select_dtypes(include=["float"]).columns df_stats[float_cols] = df_stats[float_cols].astype(float).round(decimals=2) return df_stats
[docs] def create_table_events_summary(df, temporal_resolution): """Create table with events statistics.""" # Retrieve accumulation interval accumulation_interval, _ = get_sampling_information(temporal_resolution) accumulation_interval_minutes = accumulation_interval / 60 # Define event settings # - Events are separated by 1 hour or more rain-free periods in rain rate time series. # - The events that are less than 'min_duration' minutes or the rain total is less than 0.1 mm # are not reported. if accumulation_interval_minutes >= 5 * 60: neighbor_time_interval = temporal_resolution event_min_duration = temporal_resolution neighbor_min_size = 1 else: neighbor_time_interval = "5MIN" event_min_duration = "5MIN" neighbor_min_size = 2 event_settings = { "neighbor_min_size": neighbor_min_size, "neighbor_time_interval": neighbor_time_interval, "event_max_time_gap": "1H", "event_min_duration": event_min_duration, "event_min_size": 3, } # Keep rows with R > 0 df = df[df["R"] > 0] # Number of years, months, days, minutes if df.index.name == "time": df = df.reset_index() timesteps = np.sort(np.asanyarray(df["time"])) # Define event list event_list = group_timesteps_into_event( timesteps=timesteps, neighbor_min_size=event_settings["neighbor_min_size"], neighbor_time_interval=event_settings["neighbor_time_interval"], event_max_time_gap=event_settings["event_max_time_gap"], event_min_duration=event_settings["event_min_duration"], event_min_size=event_settings["event_min_size"], ) # Create dataframe with statistics for each event events_stats = [] rain_thresholds = [0.1, 1, 5, 10, 20, 50, 100] for event in event_list: # Retrieve event start_time and end_time start, end = event["start_time"], event["end_time"] # Retrieve event dataframe df_event = df[(df["time"] >= start) & (df["time"] <= end)] # Initialize event record event_stats = { # Event time info "start_time": start, "end_time": end, "duration": int((end - start) / np.timedelta64(1, "m")) + accumulation_interval_minutes, # Rainy minutes above thresholds **{ f"rainy_minutes_>{thr}": int((df_event["R"] > thr).sum()) * accumulation_interval_minutes for thr in rain_thresholds }, # Total precipitation (mm) "P_total": df_event["P"].sum(), # R statistics "mean_R": df_event["R"].mean(), "median_R": df_event["R"].median(), "max_R": df_event["R"].max(), # DSD statistics "max_Dmax": df_event["Dmax"].max(), "mean_Dm": df_event["Dm"].mean(), "median_Dm": df_event["Dm"].median(), "max_Dm": df_event["Dm"].max(), "mean_sigma_m": df_event["sigma_m"].mean(), "median_sigma_m": df_event["sigma_m"].median(), "max_sigma_m": df_event["sigma_m"].max(), "mean_LWC": df_event["LWC"].mean(), "median_LWC": df_event["LWC"].median(), "max_LWC": df_event["LWC"].max(), "max_Z": df_event["Z"].max(), "mean_Nbins": int(df_event["Nbins"].mean()), "max_Nbins": int(df_event["Nbins"].max()), # TODO in future: # - rain_detected = True # - snow_detected = True # - hail_detected = True } events_stats.append(event_stats) df_events = pd.DataFrame.from_records(events_stats) # Round and cast columns # - Round to 3 decimals df_events["P_total"] = df_events["P_total"].round(decimals=3) lwc_columns = [c for c in df_events.columns if "LWC" in c] df_events[lwc_columns] = df_events[lwc_columns].round(decimals=3) # - Cast to integer df_events["duration"] = df_events["duration"].astype(int) minutes_columns = [c for c in df_events.columns if "minutes" in c] df_events[minutes_columns] = df_events[minutes_columns].astype(int) # - Round to 2 decimals for var in ["Dmax", "Dm", "sigma_m", "R", "Z"]: names = [c for c in df_events.columns if var in c] df_events[names] = df_events[names].round(decimals=2) return df_events
[docs] def prepare_latex_table_dsd_summary(df): """Prepare a DataFrame with DSD statistics for LaTeX table output.""" df = df.copy() # Cast numeric columns to string numeric_cols = df.select_dtypes(include=["float", "int"]).columns df[numeric_cols] = df[numeric_cols].astype(str) # Rename rename_dict = { "LWC": r"$LWC\,[\mathrm{g}\,\mathrm{m}^{-3}]$", # [g/m3] "R": r"$R\,[\mathrm{mm}\,\mathrm{h}^{-1}]$", # [mm/hr] "Z": r"$Z\,[\mathrm{dBZ}]$", # [dBZ] "D50": r"$D_{50}\,[\mathrm{mm}]$", # [mm] "Dm": r"$D_{m}\,[\mathrm{mm}]$", # [mm] "sigma_m": r"$\sigma_{m}\,[\mathrm{mm}]$", # [mm] "Dmax": r"$D_{\max}\,[\mathrm{mm}]$", # [mm] "Nw": r"$N_{w}\,[\mathrm{mm}^{-1}\,\mathrm{m}^{-3}]$", # [mm$^{-1}$ m$^{-3}$] "log10(Nw)": r"$\log_{10}(N_{w})$", # [$\log_{10}$(mm$^{-1}$ m$^{-3}$)] "Nt": r"$N_{t}\,[\mathrm{m}^{-3}]$", # [m$^{-3}$] "log10(Nt)": r"$\log_{10}(N_{t})$", # [log10(m$^{-3}$)] } df = df.rename(index=rename_dict) return df
[docs] def prepare_latex_table_events_summary(df): """Prepare a DataFrame with events statistics for LaTeX table output.""" df = df.copy() # Round datetime to minutes df["start_time"] = df["start_time"].dt.strftime("%Y-%m-%d %H:%M") df["end_time"] = df["end_time"].dt.strftime("%Y-%m-%d %H:%M") # Cast numeric columns to string numeric_cols = df.select_dtypes(include=["float", "int"]).columns df[numeric_cols] = df[numeric_cols].astype(str) # Rename rename_dict = { "start_time": r"Start", "end_time": r"End", "duration": r"Min.", "rainy_minutes_>0.1": r"R>0.1", "rainy_minutes_>1": r"R>1", "rainy_minutes_>5": r"R>5", # 'rainy_minutes_>10': r'R>10', # 'rainy_minutes_>20': r'R>20', "rainy_minutes_>50": r"R>50", # 'rainy_minutes_>100': r'R>100', "P_total": r"$P_{\mathrm{tot}} [mm]$", "mean_R": r"$R_{\mathrm{mean}}$", "median_R": r"$R_{\mathrm{median}}$", "max_R": r"$R_{\max}$", "max_Dmax": r"$D_{\max}$", "mean_Dm": r"$D_{m,\mathrm{mean}}$", "median_Dm": r"$D_{m,\mathrm{median}}$", "max_Dm": r"$D_{m,\max}$", "mean_sigma_m": r"$\sigma_{m,\mathrm{mean}}$", "median_sigma_m": r"$\sigma_{m,\mathrm{median}}$", "max_sigma_m": r"$\sigma_{m,\max}$", "mean_LWC": r"$LWC_{\mathrm{mean}}$", "median_LWC": r"$LWC_{\mathrm{median}}$", "max_LWC": r"$LWC_{\max}$", "max_Z": r"$Z_{\max}$", "mean_Nbins": r"$N_{\mathrm{bins},\mathrm{mean}}$", "max_Nbins": r"$N_{\mathrm{bins},\max}$", } df = df[list(rename_dict)] df = df.rename(columns=rename_dict) return df
[docs] def create_table_l1_event_summary( ds, variable="n_particles", threshold=4, neighbor_min_size=2, neighbor_time_interval="5MIN", event_max_time_gap="1H", event_min_duration="20MIN", event_min_size=5, sortby="duration", sortby_order="decreasing", ): """Return a table with the hydrometeor statistics for each event.""" # Load in memory needed variables ds["precipitation_type"] = ds["precipitation_type"].compute() ds["hydrometeor_type"] = ds["hydrometeor_type"].compute() ds["n_particles"] = ds["n_particles"].compute() ds["flag_hail"] = ds["flag_hail"].compute() # Compute stats event_stats = [] for ds_event in ds.disdrodb.split_into_events( variable=variable, threshold=threshold, neighbor_min_size=neighbor_min_size, neighbor_time_interval=neighbor_time_interval, event_max_time_gap=event_max_time_gap, event_min_duration=event_min_duration, event_min_size=event_min_size, sortby=sortby, sortby_order=sortby_order, ): fraction_precip = np.sum(ds_event["precipitation_type"] >= 0) / len(ds_event["time"]) n_rainy = np.sum(ds_event["precipitation_type"] == 0) n_snow = np.sum(ds_event["precipitation_type"] == 1) n_mixed = np.sum(ds_event["precipitation_type"] == 2) n_precip = np.sum(ds_event["precipitation_type"] >= 0) n_undefined = np.sum(ds_event["precipitation_type"] == -2) n_graupel = np.sum(ds_event["hydrometeor_type"] == 8) n_small_hail = np.sum(ds_event["flag_hail"] == 1) n_large_hail = np.sum(ds_event["flag_hail"] == 2) fraction_rain = n_rainy / n_precip fraction_snow = n_snow / n_precip fraction_mixed = n_mixed / n_precip fraction_undefined = n_undefined / n_precip start_time = str(ds_event.disdrodb.start_time) end_time = str(ds_event.disdrodb.end_time) total_particles = np.sum(ds_event["n_particles"]) duration_hours = ds_event["duration"].astype(int) / 60 / 60 event_stats.append( { "start_time": start_time, "end_time": end_time, "duration_hours": duration_hours, "total_particles": total_particles, "n_rainy": n_rainy.item(), "n_snow": n_snow.item(), "n_mixed": n_mixed.item(), "n_precip": n_precip.item(), "n_undefined": n_undefined.item(), "n_small_hail": n_small_hail.item(), "n_large_hail": n_large_hail.item(), "n_graupel": n_graupel.item(), "fraction_precip": fraction_precip.item(), "fraction_rain": fraction_rain.item(), "fraction_snow": fraction_snow.item(), "fraction_mixed": fraction_mixed.item(), "fraction_undefined": fraction_undefined.item(), }, ) df_events = pd.DataFrame(event_stats).reset_index(drop=True) return df_events
####------------------------------------------------------------------- #### Powerlaw routines
[docs] def apply_2d_outlier_removal(df_data, xbins, ybins, min_counts): """Remove observations that fall in 2D histogram bins having less than min_counts.""" df_data["x_bin"] = pd.cut(df_data["x"], bins=xbins, right=False) df_data["y_bin"] = pd.cut(df_data["y"], bins=ybins, right=False) # Remove values outside bins df_data = df_data[(df_data["x_bin"].cat.codes != -1) & (df_data["y_bin"].cat.codes != -1)] # Count per 2D bin counts_2d = df_data.groupby(["x_bin", "y_bin"], observed=True).size().reset_index(name="n") # Keep only dense cells dense_cells = counts_2d[counts_2d["n"] >= min_counts] # Merge back to original data df_data = df_data.merge( dense_cells[["x_bin", "y_bin"]], on=["x_bin", "y_bin"], how="inner", ) return df_data
[docs] def fit_powerlaw(x, y, xbins, ybins, quantile=0.5, min_counts=10, x_in_db=False, robust=True, quadratic=False): r"""Fit a power-law relationship to binned quantile values. This function bins ``x`` into intervals defined by ``xbins``, computes a specified quantile of ``y`` in each bin (median by default, robust to outliers), and fits a power-law model using either the RANSAC algorithm (if robust=True) or weighted nonlinear least squares. Optionally, ``x`` can be converted from decibel units to linear scale automatically before fitting. The fitted model depends on the ``quadratic`` parameter: - If ``quadratic=False`` (default), fit a classical power law: .. math:: y = a x^b - If ``quadratic=True``, fit a log-quadratic power law: .. math:: y = a x^b 10^{c (\log_{10} x)^2} Parameters ---------- x : array-like Independent variable values. Must be positive and finite after filtering. y : array-like Dependent variable values. Must be positive and finite after filtering. xbins : array-like Bin edges for grouping ``x`` values (passed to ``pandas.cut``). ybins : array-like, optional Bin edges for 2D histogram outlier removal. If provided, removes observations in 2D bins with fewer than ``min_counts`` samples before fitting. quantile : float, optional Quantile of ``y`` to compute in each bin (between 0 and 1). For example: 0.5 = median, 0.25 = lower quartile, 0.75 = upper quartile. Default is 0.5 (median). min_counts : int, optional Minimum number of observations required in each bin for it to be included in the fit. Default is 10. x_in_db : bool, optional If True, converts ``x`` values from decibels (dB) to linear scale using :func:`disdrodb.idecibel`. Default is False. robust : bool, optional Whether to fit the power law using the Random Sample Consensus (RANSAC) algorithm (robust=True) or using weighted nonlinear least squares solved with the Levenberg-Marquardt algorithm (robust=False). Default is True. To fit with RANSAC, scikit-learn must be installed. quadratic : bool, optional If False (default), fit a classical power law :math:`y = a x^b` in log-log space. If True, fit a log-quadratic power law :math:`y = a x^b 10^{c (\log_{10} x)^2}`. Default is False. Returns ------- params : tuple of float Estimated parameters ``(a, b)`` of the power-law relationship. If quadratic=True, returns ``(a, b, c)`` for the log-quadratic power law. params_std : tuple of float One standard deviation uncertainties ``(a_std, b_std)`` estimated from the covariance matrix of the fit. If quadratic=True, returns ``(a_std, b_std, c_std)``. Parameters standard deviation is currently not available when fitting with the RANSAC algorithm (returns None values). Notes ----- - This implementation uses quantile statistics within bins, which reduces the influence of outliers. - Both ``x`` and ``y`` are filtered to retain only positive, finite values before binning. - Fitting is performed on the bin centers (midpoints between bin edges). - The median absolute deviation (MAD) is used to estimate uncertainties for weighted nonlinear least squares fitting. See Also -------- predict_from_powerlaw : Predict values from the fitted power-law parameters. predict_from_logquadratic_powerlaw : Predict values from the log-quadratic power law. inverse_powerlaw_parameters : Compute parameters of the inverse power law. Examples -------- >>> import numpy as np >>> x = np.linspace(1, 50, 200) >>> y = 2 * x**1.5 + np.random.normal(0, 5, size=x.size) >>> xbins = np.arange(0, 60, 5) >>> (a, b), (a_std, b_std) = fit_powerlaw(x, y, xbins, ybins=None) """ # Set min_counts to 0 during pytest execution in order to test the summary routine if os.environ.get("PYTEST_CURRENT_TEST"): min_counts = 0 # Check if RANSAC algorithm is available sklearn_available = importlib.util.find_spec("sklearn") is not None if robust and not sklearn_available: robust = False # Ensure numpy array x = np.asanyarray(x) y = np.asanyarray(y) # Ensure values > 0 and finite valid_values = (x > 0) & (y > 0) & np.isfinite(x) & np.isfinite(y) x = x[valid_values] y = y[valid_values] # Define dataframe df_data = pd.DataFrame({"x": x, "y": y}) # Outlier removal in 2D histogram if ybins is not None: df_data = apply_2d_outlier_removal(df_data, xbins=xbins, ybins=ybins, min_counts=min_counts) # Alternative code # from disdrodb.utils.dataframe import compute_1d_histogram # df_agg = compute_1d_histogram( # df=df_data, # column="x", # variables="y", # bins=xbins, # prefix_name=True, # include_quantiles=False # ) # df_agg["count"] # instead of N # - Keep only data within bin range df_data = df_data[(df_data["x"] >= xbins[0]) & (df_data["x"] < xbins[-1])] # - Define bins df_data["x_bins"] = pd.cut(df_data["x"], bins=xbins, right=False) # - Remove data outside specified bins df_data = df_data[df_data["x_bins"].cat.codes != -1] # Derive median y values at various x points # - This typically remove outliers df_agg = df_data.groupby(by="x_bins", observed=True)["y"].agg( y=lambda s: s.quantile(quantile), n="count", mad=lambda s: (s - s.median()).abs().median(), ) df_agg["x"] = np.array([iv.left + (iv.right - iv.left) / 2 for iv in df_agg.index]) # Keep only data where y > 0 df_agg = df_agg[df_agg["y"] > 0] # If input is in decibel, convert to linear scale if x_in_db: df_agg["x"] = disdrodb.idecibel(df_agg["x"]) # Remove bins with less than n counts df_agg = df_agg[df_agg["n"] > min_counts] if len(df_agg) < 5: raise ValueError("Not enough data to fit a power law.") # Estimate standard deviation for WNLS # - Ensure is not 0 (can occur if within bins all same value) sigma = 1.4826 * df_agg["mad"] sigma_lb = np.percentile(sigma[sigma > 0], 1) # or small fraction sigma = np.maximum(sigma, sigma_lb) # Fit the data with suppress_warnings(): if not quadratic: if robust: return fit_powerlaw_with_ransac(x=df_agg["x"], y=df_agg["y"]) return fit_powerlaw_with_wnls(x=df_agg["x"], y=df_agg["y"], sigma=sigma) # quadratic=True if robust: return fit_logquadratic_powerlaw_with_ransac(df_agg["x"], y=df_agg["y"]) return fit_logquadratic_powerlaw_with_wnls( x=df_agg["x"], y=df_agg["y"], sigma=sigma, )
[docs] def fit_powerlaw_with_wnls(x, y, sigma): r"""Fit a power-law relationship using Weighted Nonlinear Least Squares (WNLS). This function fits a power-law model :math:`y = a x^b` by transforming to log10-space and performing weighted linear regression with the Levenberg-Marquardt algorithm. .. math:: y = a x^b The fitting is performed in log10-space: .. math:: \log_{10}(y) = a_0 + b \log_{10}(x) where :math:`a = 10^{a_0}`. Parameters ---------- x : array-like Independent variable values. Must be positive. y : array-like Dependent variable values. Must be positive. sigma : array-like Standard deviation of ``y`` values, used as weights in the fit. Values are transformed to log10-space for fitting. Returns ------- params : tuple of float Estimated parameters ``(a, b)`` of the power-law relationship. params_std : tuple of float One standard deviation uncertainties ``(a_std, b_std)`` estimated from the covariance matrix of the fit. Notes ----- - The function uses ``scipy.optimize.curve_fit`` with the Levenberg-Marquardt method for optimization. - Uncertainties are propagated from log10-space back to linear space. - The transformation :math:`\sigma_{\log_{10}(y)} = \sigma_y / (y \ln(10))` is used for the weights. See Also -------- fit_powerlaw : Main power-law fitting function with binning and outlier removal. fit_powerlaw_with_ransac : Fit power law using RANSAC algorithm. """ def _linear_model(lx, a0, b): return a0 + b * lx # Transform sigma_y -> sigma_log10(y) sigma_log = sigma / (y * np.log(10)) # Fit (a0, b), pcov = curve_fit( _linear_model, np.log10(x), np.log10(y), method="lm", sigma=sigma_log, absolute_sigma=True, maxfev=10_000, # max n iterations ) a0_std, b_std = np.sqrt(np.diag(pcov)) # Convert back to classical power-law form a = 10**a0 a_std = np.log(10) * a * a0_std # error propagation return (float(a), float(b)), (float(a_std), float(b_std))
[docs] def fit_powerlaw_with_ransac(x, y): r"""Fit a power-law relationship using the RANSAC algorithm. This function fits a power-law model :math:`y = a x^b` by performing linear regression in log10-space using the Random Sample Consensus (RANSAC) algorithm, which is robust to outliers. .. math:: y = a x^b The fitting is performed in log10-space: .. math:: \log_{10}(y) = a_0 + b \log_{10}(x) where :math:`a = 10^{a_0}`. Parameters ---------- x : array-like Independent variable values. Must be positive. y : array-like Dependent variable values. Must be positive. Returns ------- params : tuple of float Estimated parameters ``(a, b)`` of the power-law relationship. params_std : tuple of None Placeholder for standard deviations ``(None, None)``. Parameter uncertainties are not computed with RANSAC. Notes ----- - Requires scikit-learn to be installed. - RANSAC is robust to outliers by iteratively fitting random subsets of the data and selecting the model with the most inliers. - The residual threshold for inlier classification is set to 0.3 in log10-space. - Parameter uncertainties are not provided by RANSAC and are returned as None. See Also -------- fit_powerlaw : Main power-law fitting function with binning and outlier removal. fit_powerlaw_with_wnls : Fit power law using weighted nonlinear least squares. """ from sklearn.linear_model import LinearRegression, RANSACRegressor x = np.asanyarray(x) y = np.asanyarray(y) X = np.log10(x).reshape(-1, 1) Y = np.log10(y) ransac = RANSACRegressor( estimator=LinearRegression(), min_samples=0.5, residual_threshold=0.3, random_state=42, ) ransac.fit(X, Y) b = ransac.estimator_.coef_[0] # slope loga = ransac.estimator_.intercept_ # intercept a = 10**loga a_std = None b_std = None return (float(a), float(b)), (a_std, b_std)
[docs] def fit_logquadratic_powerlaw_with_wnls(x, y, sigma): r"""Fit a log-quadratic power-law using Weighted Nonlinear Least Squares (WNLS). This function fits a log-quadratic (log-parabolic) power-law model by performing weighted regression in log10-space with the Levenberg-Marquardt algorithm. The fitted model in original space is: .. math:: y = a x^b 10^{c (\log_{10} x)^2} where the fitting is performed in log10-space: .. math:: \log_{10}(y) = a_0 + b \log_{10}(x) + c (\log_{10} x)^2 with :math:`a = 10^{a_0}`. Parameters ---------- x : array-like Independent variable values. Must be positive. y : array-like Dependent variable values. Must be positive. sigma : array-like Standard deviation of ``y`` values, used as weights in the fit. Values are transformed to log10-space for fitting. Returns ------- params : tuple of float Estimated parameters ``(a, b, c)`` of the log-quadratic power law. params_std : tuple of float One standard deviation uncertainties ``(a_std, b_std, c_std)`` estimated from the covariance matrix of the fit. Notes ----- - The function uses ``scipy.optimize.curve_fit`` with the Levenberg-Marquardt method for optimization. - Uncertainties are propagated from log10-space back to linear space. - If :math:`c = 0`, the model reduces to a classical power law :math:`y = a x^b`. - The transformation :math:`\sigma_{\log_{10}(y)} = \sigma_y / (y \ln(10))` is used for the weights. See Also -------- fit_powerlaw : Main power-law fitting function with binning and outlier removal. fit_logquadratic_powerlaw_with_ransac : Fit log-quadratic power law using RANSAC. predict_from_logquadratic_powerlaw : Predict values from the fitted model. """ def _linear_model(lx, a0, b, c): return a0 + b * lx + c * lx**2 # Transform sigma_y -> sigma_log10(y) sigma_log = sigma / (y * np.log(10)) # Fit (a0, b, c), pcov = curve_fit( _linear_model, np.log10(x), np.log10(y), method="lm", sigma=sigma_log, absolute_sigma=True, maxfev=10_000, # max n iterations ) a0_std, b_std, c_std = np.sqrt(np.diag(pcov)) a = 10**a0 a_std = np.log(10) * a * a0_std # error propagation return (float(a), float(b), float(c)), (float(a_std), float(b_std), float(c_std))
[docs] def fit_logquadratic_powerlaw_with_ransac(x, y): r"""Fit a log-quadratic power-law model using RANSAC regression. This function fits a log-quadratic (log-parabolic) power-law model by performing linear regression in log10-space using the Random Sample Consensus (RANSAC) algorithm, which is robust to outliers. The fitted model in original space is: .. math:: y = a x^b 10^{c (\log_{10} x)^2} where the fitting is performed in log10-space: .. math:: \log_{10}(y) = a_0 + b \log_{10}(x) + c (\log_{10} x)^2 with :math:`a = 10^{a_0}`. This model (also known as a log-parabolic power law) extends the classical power law by allowing smooth curvature in log-log space. The effective scale-dependent exponent is: .. math:: \frac{d(\log_{10} y)}{d(\log_{10} x)} = b + 2c \log_{10}(x) Parameters ---------- x : array-like Independent variable values. Must be strictly positive. y : array-like Dependent variable values. Must be strictly positive. Returns ------- params : tuple of float Estimated parameters ``(a, b, c)`` of the log-quadratic power law. params_std : tuple of None Placeholder for standard deviations ``(None, None, None)``. Parameter uncertainties are not computed with RANSAC. Notes ----- - Requires scikit-learn to be installed. - The fitting is performed in log10-space using RANSAC with a linear regression estimator. - Non-positive x or y values are removed prior to fitting. - If :math:`c = 0`, the model reduces to a pure power law :math:`y = a x^b`. - The residual threshold for inlier classification is set to 0.3 in log10-space. - Parameter uncertainties are not provided by RANSAC and are returned as None. See Also -------- fit_powerlaw : Main power-law fitting function with binning and outlier removal. fit_logquadratic_powerlaw_with_wnls : Fit log-quadratic power law using WNLS. predict_from_logquadratic_powerlaw : Predict values from the fitted model. """ import numpy as np from sklearn.linear_model import LinearRegression, RANSACRegressor x = np.asarray(x) y = np.asarray(y) mask = (x > 0) & (y > 0) x = x[mask] y = y[mask] lx = np.log10(x) Y = np.log10(y) X = np.column_stack( [ lx, lx**2, ], ) ransac = RANSACRegressor( estimator=LinearRegression(), min_samples=0.5, residual_threshold=0.3, random_state=42, ) ransac.fit(X, Y) b = ransac.estimator_.coef_[0] c = ransac.estimator_.coef_[1] a0 = ransac.estimator_.intercept_ a = np.power(10, a0) a_std = None b_std = None c_std = None return (float(a), float(b), float(c)), (a_std, b_std, c_std)
[docs] def predict_from_powerlaw(x, a, b): r"""Predict values from a power-law relationship. This function computes predictions using the power-law model: .. math:: y = a x^b Parameters ---------- x : array-like Independent variable values. a : float Power-law coefficient. b : float Power-law exponent. Returns ------- y : numpy.ndarray Predicted dependent variable values. Notes ----- This function does not check for invalid (negative or zero) ``x`` values. Ensure that ``x`` is compatible with the model before calling. See Also -------- fit_powerlaw : Fit a power-law relationship to data. predict_from_logquadratic_powerlaw : Predict from log-quadratic power law. """ return a * np.power(x, b)
[docs] def predict_from_logquadratic_powerlaw(x, a, b, c): r"""Predict values from a log-quadratic power-law model. This function computes predictions using the log-quadratic power-law model: .. math:: y = a x^b 10^{c (\log_{10} x)^2} Parameters ---------- x : array-like Independent variable values. Must be positive. a : float Multiplicative coefficient in original space. b : float Linear log10 slope coefficient. c : float Quadratic log10 curvature coefficient. Returns ------- y : numpy.ndarray Predicted dependent variable values. Notes ----- If :math:`c = 0`, the model reduces to the classical power law: .. math:: y = a x^b See Also -------- fit_powerlaw : Fit a power-law relationship to data. predict_from_powerlaw : Predict from classical power law. fit_logquadratic_powerlaw_with_wnls : Fit log-quadratic power law using WNLS. fit_logquadratic_powerlaw_with_ransac : Fit log-quadratic power law using RANSAC. """ logx = np.log10(x) return a * np.power(10, b * logx + c * logx**2)
[docs] def inverse_powerlaw_parameters(a, b): r"""Compute parameters of the inverse power-law relationship. Given a power-law model: .. math:: y = a x^b this function returns parameters :math:`(A, B)` such that the inverse relation holds: .. math:: x = A y^B where: .. math:: A = a^{-1/b}, \quad B = 1/b Parameters ---------- a : float Power-law coefficient in :math:`y = a x^b`. b : float Power-law exponent in :math:`y = a x^b`. Returns ------- A : float Coefficient of the inverse power-law model. B : float Exponent of the inverse power-law model. See Also -------- fit_powerlaw : Fit a power-law relationship to data. predict_from_inverse_powerlaw : Predict using inverse power-law parameters. """ A = 1 / (a ** (1 / b)) B = 1 / b return A, B
[docs] def predict_from_inverse_powerlaw(x, a, b): r"""Predict values from the inverse power-law relationship. Given an inverse power-law model: .. math:: x = a y^b this function solves for :math:`y` given :math:`x`: .. math:: y = (x / a)^{1/b} = x^{1/b} \cdot a^{-1/b} Parameters ---------- x : array-like Values of ``x`` (independent variable in the inverse power law). a : float Power-law coefficient of the inverse power-law model. b : float Power-law exponent of the inverse power-law model. Returns ------- y : numpy.ndarray Predicted dependent variable values. See Also -------- inverse_powerlaw_parameters : Compute parameters of the inverse power law. predict_from_powerlaw : Predict from the original power law. """ return (x ** (1 / b)) / (a ** (1 / b))
def _define_coeff_string(a): # Format a coefficient as m * 10^{e} m_str, e_str = f"{a:.2e}".split("e") m, e = float(m_str), int(e_str) # Build coefficient string a_str = f"{a:.2f}" if e >= -1 else f"{m:.2f} \\times 10^{{{e}}}" return a_str
[docs] def define_powerlaw_legend_str(x_symbol, y_symbol, a, b): r"""Build two-line LaTeX legend string for y = a x^b and x = A y^B. Parameters ---------- x_symbol : str Symbol of independent variable (e.g. 'R', 'z', 'A_H', '\\mathrm{KED}') y_symbol : str Symbol of dependent variable a : float Power-law coefficient b : float Power-law exponent Returns ------- legend_str : str LaTeX formatted legend string (two lines) """ # Inverse parameters A, B = inverse_powerlaw_parameters(a, b) # Format coefficients a_str = _define_coeff_string(a) A_str = _define_coeff_string(A) legend_str = ( rf"${y_symbol} = {a_str}\, {x_symbol}^{{{b:.2f}}}$" "\n" rf"${x_symbol} = {A_str}\, {y_symbol}^{{{B:.2f}}}$" ) return legend_str
[docs] def define_logquadratic_powerlaw_legend_str( x_symbol, y_symbol, a, b, c, ): r"""Build LaTeX legend string for log-quadratic power law. .. math:: y = a x^b 10^{c (\log_{10} x)^2} Parameters ---------- x_symbol : str Symbol of independent variable (e.g. 'R') y_symbol : str Symbol of dependent variable a : float Multiplicative coefficient in original space. b : float Linear log10 slope coefficient c : float Quadratic log10 curvature coefficient. If c = 0, the model reduces to the classical power law. Returns ------- legend_str : str LaTeX formatted legend string """ # Format coefficients a_str = _define_coeff_string(a) c_str = f"{c:.3f}" b_str = f"{b:.2f}" legend_str = rf"${y_symbol} = {a_str}\, {x_symbol}^{{{b_str}}}" rf"\, 10^{{{c_str}(\log_{{10}} {x_symbol})^2}}$" return legend_str
####------------------------------------------------------------------- #### N(D) Climatological plots
[docs] def create_dsd_dataframe(ds, variables=None): """Create pandas Dataframe with N(D) data.""" # Define variables to select if isinstance(variables, str): variables = [variables] variables = [] if variables is None else variables variables = ["drop_number_concentration", "Nw", "diameter_bin_center", "Dm", "D50", "R", *variables] variables = np.unique(variables).tolist() # Retrieve stacked N(D) dataframe ds_stack = ds[variables].stack( # noqa: PD013 dim={"obs": ["time", "diameter_bin_center"]}, ) # Drop coordinates coords_to_drop = [ "velocity_method", "sample_interval", *RADAR_OPTIONS, ] df_dsd = ds_stack.to_dask_dataframe().drop(columns=coords_to_drop, errors="ignore").compute() df_dsd["D"] = df_dsd["diameter_bin_center"] df_dsd["N(D)"] = df_dsd["drop_number_concentration"] df_dsd = df_dsd[df_dsd["R"] != 0] df_dsd = df_dsd[df_dsd["N(D)"] != 0] # Compute normalized density df_dsd["D/D50"] = df_dsd["D"] / df_dsd["D50"] df_dsd["D/Dm"] = df_dsd["D"] / df_dsd["Dm"] df_dsd["N(D)/Nw"] = df_dsd["N(D)"] / df_dsd["Nw"] df_dsd["log10[N(D)/Nw]"] = np.log10(df_dsd["N(D)/Nw"]) return df_dsd
[docs] def plot_normalized_dsd_density(df_dsd, x="D/D50", figsize=(3.4, 3.0), dpi=300): """Plot normalized DSD N(D)/Nw ~ D/D50 (or D/Dm) density.""" ds_stats = compute_2d_histogram( df_dsd, x=x, y="N(D)/Nw", x_bins=np.arange(0, 4, 0.025), y_bins=log_arange(1e-5, 50, log_step=0.1, base=10), ) cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) norm = LogNorm(1, None) ds_stats = ds_stats.isel({"N(D)/Nw": ds_stats["N(D)/Nw"] > 0}) fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) p = ds_stats["count"].plot.pcolormesh( x=x, y="N(D)/Nw", ax=ax, vmin=1, cmap=cmap, norm=norm, extend="max", yscale="log", ) ax.set_ylim(1e-5, 20) ax.set_xlim(0, 4) ax.set_xlabel(f"{x} [-]") ax.set_ylabel(r"$N(D)/N_w$ [-]") ax.set_title("Normalized DSD") return p
[docs] def plot_dsd_density(df_dsd, diameter_bin_edges, figsize=(3.4, 3.0), dpi=300): """Plot N(D) ~ D density.""" ds_stats = compute_2d_histogram( df_dsd, x="D", y="N(D)", x_bins=diameter_bin_edges, y_bins=log_arange(0.1, 20_000, log_step=0.1, base=10), ) cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) norm = LogNorm(1, None) fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) p = ds_stats["count"].plot.pcolormesh(x="D", y="N(D)", ax=ax, cmap=cmap, norm=norm, extend="max", yscale="log") ax.set_xlim(0, 8) ax.set_ylim(1, 20_000) ax.set_xlabel(r"$D$ [mm]") ax.set_ylabel(r"$N(D)$ [m$^{-3}$ mm$^{-1}$]") ax.set_title("DSD") return p
[docs] def plot_dsd_with_dense_lines(drop_number_concentration, r, figsize=(3.4, 3.0), dpi=300): """Plot N(D) ~ D using dense lines.""" # Define intervals for rain rates r_bins = [0, 2, 5, 10, 50, 100, 500] # Define N(D) bins and diameter bin edeges y_bins = log_arange(1, 20_000, log_step=0.025, base=10) diameter_bin_edges = np.arange(0, 8, 0.01) # Interpolate N(D) to high resolution ! # - For better visualization of dense lines da = resample_drop_number_concentration( drop_number_concentration.compute(), diameter_bin_edges=diameter_bin_edges, ) ds_resampled = xr.Dataset( { "R": r.compute(), "drop_number_concentration": da, }, ) # Define diameter bin edges to compute dense lines x_bins = da.disdrodb.diameter_bin_edges # Define discrete colormap (one color per rain-interval): cmap_list = [ plt.get_cmap("Reds"), plt.get_cmap("Oranges"), plt.get_cmap("Purples"), plt.get_cmap("Greens"), plt.get_cmap("Blues"), plt.get_cmap("Grays"), ] cmap_list = [ListedColormap(cmap(np.arange(0, cmap.N))[-40:]) for cmap in cmap_list] # Compute dense lines dict_rgb = {} for i in range(0, len(r_bins) - 1): # Define dataset subset idx_rain_interval = np.logical_and(ds_resampled["R"] >= r_bins[i], ds_resampled["R"] < r_bins[i + 1]) da = ds_resampled.isel(time=idx_rain_interval)["drop_number_concentration"] if da.sizes["time"] == 0: continue # Retrieve dense lines da_dense_lines = compute_dense_lines( da=da, coord="diameter_bin_center", x_bins=x_bins, y_bins=y_bins, normalization="max", ) # Define cmap cmap = cmap_list[i] # Map colors and transparency # da_rgb = to_rgba(da_dense_lines, cmap=cmap, scaling="linear") # da_rgb = to_rgba(da_dense_lines, cmap=cmap, scaling="exp") # da_rgb = to_rgba(da_dense_lines, cmap=cmap, scaling="log") da_rgb = to_rgba(da_dense_lines, cmap=cmap, scaling="sqrt") # Add to dictionary dict_rgb[i] = da_rgb # Blend images with max-alpha ds_rgb = xr.concat(dict_rgb.values(), dim="r_class") da_blended = max_blend_images(ds_rgb, dim="r_class") # Prepare legend handles handles = [] labels = [] for i in range(len(r_bins) - 1): color = cmap_list[i](0.8) # pick a representative color from each cmap handle = mlines.Line2D([], [], color=color, alpha=0.6, linewidth=2) label = f"[{r_bins[i]} - {r_bins[i+1]}]" handles.append(handle) labels.append(label) # Create figure fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) p = ax.pcolormesh( da_blended["diameter_bin_center"], da_blended["drop_number_concentration"], da_blended.data, ) # Set axis scale and limits ax.set_yscale("log") ax.set_xlim(0, 8) ax.set_ylim(1, 20_000) # Add axis labels and title ax.set_xlabel(r"$D$ [mm]") ax.set_ylabel(r"$N(D)$ [m$^{-3}$ mm$^{-1}$]") ax.set_title("DSD") # Add legend with title ax.legend(handles, labels, title="Rain rate [mm/hr]", loc="upper right") # Return figure return p
####------------------------------------------------------------------- #### DSD parameters plots
[docs] def define_lognorm_max_value(value): """Round up to next nice number: 90->100, 400->500, 1200->2000.""" if value <= 0: return 1 magnitude = 10 ** np.floor(np.log10(value)) first_digit = value / magnitude nice_value = 1 if first_digit <= 1 else 2 if first_digit <= 2 else 5 if first_digit <= 5 else 10 return nice_value * magnitude
[docs] def plot_dsd_params_relationships(df, add_nt=False, log_step=0.05, dpi=300): """Create a figure illustrating the relationships between DSD parameters.""" # TODO: option to use D50 instead of Dm # Compute the required datasets for the plots # - Dm vs Nw ds_Dm_Nw_stats = compute_2d_histogram( df, x="Dm", y="Nw", variables=["R", "LWC", "Nt"], x_bins=np.arange(0, 8, 0.1), y_bins=log_arange(10, 1_000_000, log_step=log_step, base=10), ) # - Dm vs LWC (W) ds_Dm_LWC_stats = compute_2d_histogram( df, x="Dm", y="LWC", variables=["R", "Nw", "Nt"], x_bins=np.arange(0, 8, 0.1), y_bins=log_arange(0.01, 10, log_step=log_step, base=10), ) # - Dm vs R ds_Dm_R_stats = compute_2d_histogram( df, x="Dm", y="R", variables=["Nw", "LWC", "Nt"], x_bins=np.arange(0, 8, 0.1), y_bins=log_arange(0.1, 500, log_step=log_step, base=10), ) # - Dm vs Nt ds_Dm_Nt_stats = compute_2d_histogram( df, x="Dm", y="Nt", variables=["R", "LWC", "Nw"], x_bins=np.arange(0, 8, 0.1), y_bins=log_arange(1, 100_000, log_step=log_step, base=10), ) # Define different colormaps for each column cmap_counts = plt.get_cmap("viridis") cmap_lwc = plt.get_cmap("YlOrRd") cmap_r = plt.get_cmap("Blues") cmap_nt = plt.get_cmap("Greens") # Define normalizations for each variable norm_counts = LogNorm(1, None) norm_lwc = LogNorm(0.01, 10) norm_r = LogNorm(0.2, 500) # norm_nw = LogNorm(10, 100000) norm_nt = LogNorm(1, 10000) # Define axis limits dm_lim = (0.3, 6) nw_lim = (10, 1_000_000) lwc_lim = (0.01, 10) r_lim = (0.1, 500) nt_lim = (1, 100_000) # Define figure size if add_nt: figsize = (6.9, 6.9) nrows = 4 height_ratios = [0.40, 1, 1, 1, 1] else: figsize = (6.9, 5.8) nrows = 3 height_ratios = [0.40, 1, 1, 1] # Create figure with 4x4 subplots fig = plt.figure(figsize=figsize, dpi=dpi) gs = fig.add_gridspec(nrows + 1, 4, height_ratios=height_ratios, hspace=0.0, wspace=0.02) axes = np.empty((nrows + 1, 4), dtype=object) # Create colorbar axes in the bottom row of the gridspec cbar_axes = [fig.add_subplot(gs[0, j]) for j in range(4)] # Create axes for the grid for i in range(1, nrows + 1): for j in range(4): axes[i, j] = fig.add_subplot(gs[i, j]) # Create empty subplot for diagonal elements (when y-axis variable matches column variable) # axes[2, 1].set_visible(False) # axes[3, 2].set_visible(False) # axes[4, 3].set_visible(False) ####-------------------------------------------------------------------. #### Dm vs Nw #### - Counts im_counts = ds_Dm_Nw_stats["count"].plot.pcolormesh( x="Dm", y="Nw", cmap=cmap_counts, norm=norm_counts, extend="max", yscale="log", add_colorbar=False, ax=axes[1, 0], ) axes[1, 0].set_ylabel(r"$N_w$ [mm$^{-1}$ m$^{-3}$]") axes[1, 0].set_xlim(*dm_lim) axes[1, 0].set_ylim(nw_lim) #### - LWC im_lwc = ds_Dm_Nw_stats["LWC_median"].plot.pcolormesh( x="Dm", y="Nw", cmap=cmap_lwc, norm=norm_lwc, extend="both", yscale="log", add_colorbar=False, ax=axes[1, 1], ) axes[1, 1].set_xlim(*dm_lim) axes[1, 1].set_ylim(nw_lim) #### - R im_r = ds_Dm_Nw_stats["R_median"].plot.pcolormesh( x="Dm", y="Nw", cmap=cmap_r, norm=norm_r, extend="both", yscale="log", add_colorbar=False, ax=axes[1, 2], ) axes[1, 2].set_xlim(*dm_lim) axes[1, 2].set_ylim(nw_lim) axes[1, 2].set_yticklabels([]) #### - Nt im_nt = ds_Dm_Nw_stats["Nt_median"].plot.pcolormesh( x="Dm", y="Nw", cmap=cmap_nt, norm=norm_nt, extend="both", yscale="log", add_colorbar=False, ax=axes[1, 3], ) axes[1, 3].set_xlim(*dm_lim) axes[1, 3].set_ylim(nw_lim) axes[1, 3].set_yticklabels([]) ####-------------------------------------------------------------------. #### Dm vs LWC #### - Counts ds_Dm_LWC_stats["count"].plot.pcolormesh( x="Dm", y="LWC", cmap=cmap_counts, norm=norm_counts, extend="max", yscale="log", add_colorbar=False, ax=axes[2, 0], ) axes[2, 0].set_ylabel(r"LWC [g/m³]") axes[2, 0].set_xlim(*dm_lim) axes[2, 0].set_ylim(lwc_lim) #### - LWC # - Empty (diagonal where y-axis is W) - handled above in the loop ds_Dm_LWC_stats["R_median"].plot.pcolormesh( x="Dm", y="LWC", cmap=cmap_r, norm=norm_r, alpha=0, # fully transparent extend="both", yscale="log", add_colorbar=False, ax=axes[2, 1], ) axes[2, 1].set_xlim(*dm_lim) axes[2, 1].set_ylim(lwc_lim) axes[2, 1].set_yticklabels([]) #### - R ds_Dm_LWC_stats["R_median"].plot.pcolormesh( x="Dm", y="LWC", cmap=cmap_r, norm=norm_r, extend="both", yscale="log", add_colorbar=False, ax=axes[2, 2], ) axes[2, 2].set_xlim(*dm_lim) axes[2, 2].set_ylim(lwc_lim) axes[2, 2].set_yticklabels([]) #### - Nt im_nt = ds_Dm_LWC_stats["Nt_median"].plot.pcolormesh( x="Dm", y="LWC", cmap=cmap_nt, norm=norm_nt, extend="both", yscale="log", add_colorbar=False, ax=axes[2, 3], ) axes[2, 3].set_xlim(*dm_lim) axes[2, 3].set_ylim(lwc_lim) axes[2, 3].set_yticklabels([]) ####-------------------------------------------------------------------. #### Dm vs R #### - Counts ds_Dm_R_stats["count"].plot.pcolormesh( x="Dm", y="R", cmap=cmap_counts, norm=norm_counts, extend="max", yscale="log", add_colorbar=False, ax=axes[3, 0], ) axes[3, 0].set_ylabel(r"$R$ [mm h$^{-1}$]") axes[3, 0].set_xlim(*dm_lim) axes[3, 0].set_ylim(r_lim) #### - LWC im_lwc = ds_Dm_R_stats["LWC_median"].plot.pcolormesh( x="Dm", y="R", cmap=cmap_lwc, norm=norm_lwc, extend="both", yscale="log", add_colorbar=False, ax=axes[3, 1], ) axes[3, 1].set_xlim(*dm_lim) axes[3, 1].set_ylim(r_lim) axes[3, 1].set_yticklabels([]) #### - R # - Empty (diagonal where y-axis is R) - handled above in the loop #### - Nt ds_Dm_R_stats["Nt_median"].plot.pcolormesh( x="Dm", y="R", cmap=cmap_nt, norm=norm_nt, alpha=0, # fully transparent extend="both", yscale="log", add_colorbar=False, ax=axes[3, 2], ) axes[3, 2].set_xlim(*dm_lim) axes[3, 2].set_ylim(r_lim) axes[3, 2].set_yticklabels([]) #### - Nt ds_Dm_R_stats["Nt_median"].plot.pcolormesh( x="Dm", y="R", cmap=cmap_nt, norm=norm_nt, extend="both", yscale="log", add_colorbar=False, ax=axes[3, 3], ) axes[3, 3].set_xlim(*dm_lim) axes[3, 3].set_ylim(r_lim) axes[3, 3].set_yticklabels([]) ####-------------------------------------------------------------------. #### Dm vs Nt if add_nt: #### - Counts ds_Dm_Nt_stats["count"].plot.pcolormesh( x="Dm", y="Nt", cmap=cmap_counts, norm=norm_counts, extend="max", yscale="log", add_colorbar=False, ax=axes[4, 0], ) axes[4, 0].set_ylabel(r"$N_t$ [m$^{-3}$]") axes[4, 0].set_xlabel(r"$D_m$ [mm]") axes[4, 0].set_xlim(*dm_lim) axes[4, 0].set_ylim(nt_lim) #### - LWC ds_Dm_Nt_stats["LWC_median"].plot.pcolormesh( x="Dm", y="Nt", cmap=cmap_lwc, norm=norm_lwc, extend="both", yscale="log", add_colorbar=False, ax=axes[4, 1], ) axes[4, 1].set_xlabel(r"$D_m$ [mm]") axes[4, 1].set_xlim(*dm_lim) axes[4, 1].set_ylim(nt_lim) axes[4, 1].set_yticklabels([]) #### - R ds_Dm_Nt_stats["R_median"].plot.pcolormesh( x="Dm", y="Nt", cmap=cmap_r, norm=norm_r, extend="both", yscale="log", add_colorbar=False, ax=axes[4, 2], ) axes[4, 2].set_xlabel(r"$D_m$ [mm]") axes[4, 2].set_xlim(*dm_lim) axes[4, 2].set_ylim(nt_lim) axes[4, 2].set_yticklabels([]) #### - Nt # - Empty plot - handled above in the loop ds_Dm_Nt_stats["R_median"].plot.pcolormesh( x="Dm", y="Nt", cmap=cmap_r, norm=norm_r, alpha=0, # fully transparent extend="both", yscale="log", add_colorbar=False, ax=axes[4, 3], ) axes[4, 3].set_xlabel("$D_m$ [mm]") axes[4, 3].set_xlim(*dm_lim) axes[4, 3].set_ylim(nt_lim) axes[4, 3].set_yticklabels([]) ####-------------------------------------------------------------------. #### Finalize figure # Remove x ticks and labels for all but bottom row for i in range(1, nrows): for j in range(4): if axes[i, j].get_visible(): axes[i, j].set_xticklabels([]) axes[i, j].set_xticks([]) axes[i, j].set_xlabel("") # Remove y ticks and labels for all but left row for i in range(1, nrows + 1): for j in range(1, 4): if axes[i, j].get_visible(): axes[i, j].set_yticks([]) axes[i, j].set_yticklabels([]) axes[i, j].set_ylabel("") axes[i, j].tick_params(which="both", left=False) # Clear overlapping ticks between Nt and R if add_nt: ylim = axes[4, 0].get_ylim() yticks = axes[4, 0].get_yticks() yticks = yticks[(yticks <= 20_000)] axes[4, 0].set_yticks(yticks) axes[4, 0].set_ylim(*ylim) # Add Dm label in the center for j in range(4): axes[-1, j].set_xlabel("") fig.text(0.5, 0.05, r"$D_m$ [mm]", ha="center", va="center") # Add gray background to subplots for i in range(1, nrows + 1): for j in range(4): axes[i, j].set_facecolor("#e6e6e6") # strong gray # axes[i, j].set_facecolor("#f2f2f2") # light gray # -------------------------------------------------. # Add colorbars # - Counts colorbar cbar1 = plt.colorbar(im_counts, cax=cbar_axes[0], orientation="horizontal", extend="both") cbar1.set_label("Counts", fontweight="bold") cbar1.ax.xaxis.set_label_position("top") cbar1.ax.tick_params(axis="x", pad=2) cbar1.ax.set_aspect(0.2) # - LWC colorbar cbar2 = plt.colorbar(im_lwc, cax=cbar_axes[1], orientation="horizontal", extend="both") cbar2.set_label("Median LWC [g/m³]", fontweight="bold") cbar2.ax.xaxis.set_label_position("top") cbar2.ax.tick_params(axis="x", pad=2) cbar2.ax.set_aspect(0.25) # - R colorbar cbar3 = plt.colorbar(im_r, cax=cbar_axes[2], orientation="horizontal", extend="both") cbar3.set_label("Median R [mm/h]", fontweight="bold") cbar3.ax.xaxis.set_label_position("top") cbar3.ax.tick_params(axis="x", pad=2) cbar3.ax.set_aspect(0.3) # - Nt colorbar cbar4 = plt.colorbar(im_nt, cax=cbar_axes[3], orientation="horizontal", extend="both") cbar4.set_label("Median $N_t$ [m$^{-3}$]", fontweight="bold") cbar4.ax.xaxis.set_label_position("top") cbar4.ax.tick_params(axis="x", pad=2) cbar4.ax.set_aspect(0.3) # -------------------------------------------------. # Align labels fig.align_xlabels() fig.align_ylabels() # -------------------------------------------------. # Return figure return fig
[docs] def plot_dsd_params_density( df, log_dm=False, lwc=True, log_normalize=False, log_step=0.05, linear_step=0.1, figsize=(6.9, 7.2), dpi=300, ): """Generate a figure with various DSD relationships. All histograms are computed first, then normalized, and finally plotted together. Parameters ---------- df : pandas.DataFrame DataFrame containing DSD parameters (Dm, Nt, Nw, LWC/W, R, sigma_m, M2, M3, M4, M6) log_dm : bool, optional If True, use linear scale for Dm axes. If False, use log scale. Default is True. lwc : bool, optional If True, use Liquid Water Content (W). If False, use Rain Rate (R). Default is True. figsize : tuple, optional Figure size (width, height) in inches. Default is (18, 18). Returns ------- fig : matplotlib.figure.Figure The figure object containing all subplots axes : numpy.ndarray Array of all subplot axes """ # TODO: option to use D50 instead of Dm # Common parameters cmap = plt.get_cmap("Spectral_r").copy() norm = Normalize(0, 1) # Normalized data goes from 0 to 1 ylabelpad = 0 moment_labelpad = -2 # Define the water variable based on lwc flag df["LWC"] = df["LWC"] water_var = "LWC" if lwc else "R" water_label = "LWC [g/m³]" if lwc else "R [mm/h]" # Dm range and scale settings if not log_dm: dm_bins = np.arange(0, 8, linear_step) dm_scale = None dm_lim = (0, 6) dm_ticklabels = [0, 2, 4, 6] else: dm_bins = log_arange(0.1, 10, log_step=log_step, base=10) dm_scale = "log" dm_lim = (0.3, 6) dm_ticklabels = [0.5, 1, 2, 5] # Nt and Nw range nt_bins = log_arange(1, 100_000, log_step=log_step, base=10) nw_bins = log_arange(1, 1_000_000, log_step=log_step, base=10) nw_lim = (10, 1_000_000) nt_lim = (1, 100_000) # Water range if lwc: water_bins = log_arange(0.001, 10, log_step=log_step, base=10) water_lim = (0.005, 10) else: water_bins = log_arange(0.1, 500, log_step=log_step, base=10) water_lim = (0.1, 500) # Define sigma_m bins sigma_bins = np.arange(0, 4, linear_step / 2) sigma_lim = (0, 3) # Compute all histograms first # 1. Dm vs Nt ds_stats_dm_nt = compute_2d_histogram( df, x="Dm", y="Nt", x_bins=dm_bins, y_bins=nt_bins, ) # 2. Dm vs Nw ds_stats_dm_nw = compute_2d_histogram( df, x="Dm", y="Nw", x_bins=dm_bins, y_bins=nw_bins, ) # 3. Dm vs LWC/R ds_stats_dm_w = compute_2d_histogram( df, x="Dm", y=water_var, x_bins=dm_bins, y_bins=water_bins, ) # 4. LWC/R vs Nt ds_stats_w_nt = compute_2d_histogram( df, x=water_var, y="Nt", x_bins=water_bins, y_bins=nt_bins, ) # 5. LWC/R vs Nw ds_stats_w_nw = compute_2d_histogram( df, x=water_var, y="Nw", x_bins=water_bins, y_bins=nw_bins, ) # 6. LWC/R vs sigma_m ds_stats_w_sigma = compute_2d_histogram( df, x=water_var, y="sigma_m", x_bins=water_bins, y_bins=sigma_bins, ) # 7. M2 vs M4 ds_stats_m2_m4 = compute_2d_histogram( df, x="M2", y="M4", x_bins=log_arange(1, 10_000, log_step=log_step, base=10), y_bins=log_arange(1, 40_000, log_step=log_step, base=10), ) # 8. M3 vs M6 ds_stats_m3_m6 = compute_2d_histogram( df, x="M3", y="M6", x_bins=log_arange(1, 10_000, log_step=log_step, base=10), y_bins=log_arange(0.1, 1000_000, log_step=log_step, base=10), ) # 9. M2 vs M6 ds_stats_m2_m6 = compute_2d_histogram( df, x="M2", y="M6", x_bins=log_arange(1, 10_000, log_step=log_step, base=10), y_bins=log_arange(0.1, 1000_000, log_step=log_step, base=10), ) # Define normalization def max_normalize(ds): return ds["count"].where(ds["count"] > 0) / ds["count"].max().item() def log_max_normalize(ds): counts = ds["count"].where(ds["count"] > 0) log_counts = np.log10(counts) max_log = float(log_counts.max().item()) return log_counts / max_log normalizer = log_max_normalize if log_normalize else max_normalize # Normalize all histograms ds_stats_dm_nt["normalized"] = normalizer(ds_stats_dm_nt) ds_stats_dm_nw["normalized"] = normalizer(ds_stats_dm_nw) ds_stats_dm_w["normalized"] = normalizer(ds_stats_dm_w) ds_stats_w_nt["normalized"] = normalizer(ds_stats_w_nt) ds_stats_w_nw["normalized"] = normalizer(ds_stats_w_nw) ds_stats_w_sigma["normalized"] = normalizer(ds_stats_w_sigma) ds_stats_m2_m4["normalized"] = normalizer(ds_stats_m2_m4) ds_stats_m3_m6["normalized"] = normalizer(ds_stats_m3_m6) ds_stats_m2_m6["normalized"] = normalizer(ds_stats_m2_m6) # Set up figure and axes fig, axes = plt.subplots(3, 3, figsize=figsize, dpi=dpi) fig.subplots_adjust(hspace=0.05, wspace=0.35) # COLUMN 1: All plots with Dm as x-axis # 1. Dm vs Nt (0,0) _ = ds_stats_dm_nt["normalized"].plot.pcolormesh( x="Dm", y="Nt", cmap=cmap, norm=norm, extend="max", xscale=dm_scale, yscale="log", add_colorbar=False, ax=axes[0, 0], ) axes[0, 0].set_xlabel("") # Hide x labels except for bottom row axes[0, 0].set_ylabel(r"$N_t$ [m$^{-3}$]", labelpad=ylabelpad) axes[0, 0].set_xlim(*dm_lim) axes[0, 0].set_ylim(*nt_lim) axes[0, 0].set_title(r"$D_m$ vs $N_t$") # 2. Dm vs Nw (1,0) _ = ds_stats_dm_nw["normalized"].plot.pcolormesh( x="Dm", y="Nw", cmap=cmap, norm=norm, extend="max", xscale=dm_scale, yscale="log", add_colorbar=False, ax=axes[1, 0], ) axes[1, 0].set_xlabel("") # Hide x labels except for bottom row axes[1, 0].set_ylabel(r"$N_w$ [mm$^{-1}$ m$^{-3}$]", labelpad=ylabelpad) axes[1, 0].set_xlim(*dm_lim) axes[1, 0].set_ylim(*nw_lim) axes[1, 0].set_title(r"$D_m$ vs $N_w$") # 3. Dm vs LWC/R (2,0) _ = ds_stats_dm_w["normalized"].plot.pcolormesh( x="Dm", y=water_var, cmap=cmap, norm=norm, extend="max", xscale=dm_scale, yscale="log", add_colorbar=False, ax=axes[2, 0], ) axes[2, 0].set_xlabel(r"$D_m$ [mm]") axes[2, 0].set_ylabel(water_label, labelpad=ylabelpad) axes[2, 0].set_xlim(*dm_lim) if lwc: axes[2, 0].set_ylim(*water_lim) axes[2, 0].set_yticks([0.01, 0.1, 0.5, 1, 5]) axes[2, 0].set_yticklabels(["0.01", "0.1", "0.5", "1", "5"]) else: axes[2, 0].set_ylim(*water_lim) axes[2, 0].set_title(f"$D_m$ vs {water_var}") axes[2, 0].set_xticks(dm_ticklabels) axes[2, 0].set_xticklabels([str(v) for v in dm_ticklabels]) # COLUMN 2: All plots with LWC/R as x-axis # 4. LWC/R vs Nt (0,1) mappable = ds_stats_w_nt["normalized"].plot.pcolormesh( x=water_var, y="Nt", cmap=cmap, norm=norm, extend="max", xscale="log", yscale="log", add_colorbar=False, ax=axes[0, 1], ) axes[0, 1].set_xlabel("") # Hide x labels except for bottom row axes[0, 1].set_ylabel(r"$N_t$ [m$^{-3}$]", labelpad=ylabelpad) if lwc: axes[0, 1].set_xlim(*water_lim) axes[0, 1].set_xticks([0.01, 0.1, 1, 10]) axes[0, 1].set_xticklabels(["0.01", "0.1", "1", "10"]) else: axes[0, 1].set_xlim(*water_lim) axes[0, 1].set_ylim(*nt_lim) axes[0, 1].set_title(f"{water_var} vs $N_t$") # 5. LWC/R vs Nw (1,1) _ = ds_stats_w_nw["normalized"].plot.pcolormesh( x=water_var, y="Nw", cmap=cmap, norm=norm, extend="max", xscale="log", yscale="log", add_colorbar=False, ax=axes[1, 1], ) axes[1, 1].set_xlabel("") # Hide x labels except for bottom row axes[1, 1].set_ylabel(r"$N_w$ [mm$^{-1}$ m$^{-3}$]", labelpad=ylabelpad) if lwc: axes[1, 1].set_xlim(*water_lim) axes[1, 1].set_xticks([0.01, 0.1, 1, 10]) axes[1, 1].set_xticklabels(["0.01", "0.1", "1", "10"]) else: axes[1, 1].set_xlim(*water_lim) axes[1, 1].set_ylim(*nw_lim) axes[1, 1].set_title(f"{water_var} vs $N_w$") # 6. LWC/R vs sigma_m (2,1) _ = ds_stats_w_sigma["normalized"].plot.pcolormesh( x=water_var, y="sigma_m", cmap=cmap, norm=norm, extend="max", xscale="log", add_colorbar=False, ax=axes[2, 1], ) axes[2, 1].set_xlabel(water_label) axes[2, 1].set_ylabel(r"$\sigma_m$ [mm]", labelpad=ylabelpad) if lwc: axes[2, 1].set_xlim(*water_lim) axes[2, 1].set_xticks([0.01, 0.1, 1, 10]) axes[2, 1].set_xticklabels(["0.01", "0.1", "1", "10"]) else: axes[2, 1].set_xlim(*water_lim) axes[2, 1].set_ylim(*sigma_lim) axes[2, 1].set_title(rf"{water_var} vs $\sigma_m$") # COLUMN 3: Moment relationships # 7. M2 vs M4 (0,2) _ = ds_stats_m2_m4["normalized"].plot.pcolormesh( x="M2", y="M4", cmap=cmap, norm=norm, extend="max", xscale="log", yscale="log", add_colorbar=False, ax=axes[0, 2], ) axes[0, 2].set_xlabel("") # Hide x labels except for bottom row axes[0, 2].set_ylabel(r"M4 [m$^{-3}$ mm$^{4}$]", labelpad=moment_labelpad) axes[0, 2].set_xlim(1, 10_000) axes[0, 2].set_ylim(1, 40_000) axes[0, 2].set_title(r"M2 vs M4") # 8. M3 vs M6 (1,2) _ = ds_stats_m3_m6["normalized"].plot.pcolormesh( x="M3", y="M6", cmap=cmap, norm=norm, extend="max", xscale="log", yscale="log", add_colorbar=False, ax=axes[1, 2], ) axes[1, 2].set_xlabel("") # Hide x labels except for bottom row axes[1, 2].set_ylabel(r"M6 [m$^{-3}$ mm$^{6}$]", labelpad=moment_labelpad) axes[1, 2].set_xlim(1, 10_000) axes[1, 2].set_ylim(0.1, 1000_000) axes[1, 2].set_title(r"M3 vs M6") # 9. M2 vs M6 (2,2) _ = ds_stats_m2_m6["normalized"].plot.pcolormesh( x="M2", y="M6", cmap=cmap, norm=norm, extend="max", xscale="log", yscale="log", add_colorbar=False, ax=axes[2, 2], ) axes[2, 2].set_xlabel(r"M* [m$^{-3}$ mm$^{*}$]") axes[2, 2].set_ylabel(r"M6 [m$^{-3}$ mm$^{6}$]", labelpad=moment_labelpad) axes[2, 2].set_xlim(1, 10_000) axes[2, 2].set_ylim(0.1, 1000_000) axes[2, 2].set_title(r"M2 vs M6") # Remove x-axis ticks and ticklabels for all but bottom row for i in range(2): for j in range(3): axes[i, j].set_xticklabels([]) axes[i, j].tick_params(axis="x", which="both", bottom=False) # Add subplot titles as text in top left corner of each plot title_bbox_dict = { "facecolor": "white", "alpha": 0.7, "edgecolor": "none", "pad": 1, } for ax in axes.flatten(): # Add text in top left corner with some padding ax.text( 0.03, 0.95, ax.get_title(), transform=ax.transAxes, fontsize=9, # fontweight='bold', ha="left", va="top", bbox=title_bbox_dict, ) ax.set_title("") # -------------------------------------- # Add colorbar ax_cbar_parent = axes[0, 1] # Create inset axis in axis-fraction coordinates x_pos_cbar = 0.40 y_pos_cbar = 0.09 cbar_width = 0.55 cbar_height = 0.055 cbar_fontsize = 6 cax = ax_cbar_parent.inset_axes( [ x_pos_cbar, # e.g. 0.45 y_pos_cbar, # e.g. 0.08 cbar_width, # e.g. 0.5 cbar_height, # e.g. 0.06 ], ) cb = fig.colorbar( mappable, cax=cax, orientation="horizontal", ) cb.ax.tick_params(labelsize=5) cb.ax.tick_params(pad=-1) # Label on top cb.ax.xaxis.set_label_position("top") if log_normalize: cbar_label = r"$\frac{\log_{10}(\mathrm{count})}" r"{\max\left(\log_{10}(\mathrm{count})\right)}$" else: cbar_label = r"$\frac{\mathrm{count}}" r"{\max\left(\mathrm{count}\right)}$" cb.set_label( cbar_label, fontsize=cbar_fontsize, labelpad=6, ) # -------------------------------------------------. # Align labels fig.align_xlabels() fig.align_ylabels() return fig
[docs] def plot_dmax_relationships(df, diameter_bin_edges, dmax="Dmax", diameter_max=10, norm_vmax=None, dpi=300): """ Plot 2x2 subplots showing relationships between Dmax and precipitation parameters. Parameters ---------- df : pandas.DataFrame Input dataframe containing the precipitation data dmax : str, optional Column name for maximum diameter. Default is 'Dmax'. vmax : float, optional Maximum value for Dmax axis limits. Default is 10 mm. dpi : int, optional Resolution for the figure. The default is 300. """ # Compute 2D histograms # - Dmax-R ds_stats_dmax_r = compute_2d_histogram( df, x=dmax, y="R", x_bins=diameter_bin_edges, y_bins=log_arange(0.1, 500, log_step=0.05, base=10), ) # - Dmax-Nw ds_stats_dmax_nw = compute_2d_histogram( df, x=dmax, y="Nw", x_bins=diameter_bin_edges, y_bins=log_arange(10, 1_000_000, log_step=0.05, base=10), ) # - Dmax-Nt ds_stats_dmax_nt = compute_2d_histogram( df, x=dmax, y="Nt", x_bins=diameter_bin_edges, y_bins=log_arange(1, 100_000, log_step=0.05, base=10), ) # - Dmax-Dm ds_stats_dmax_dm = compute_2d_histogram( df, x=dmax, y="Dm", variables=["R", "Nw", "sigma_m"], x_bins=diameter_bin_edges, y_bins=np.arange(0, 8, 0.05), ) # Define vmax for counts if norm_vmax: norm_vmax = define_lognorm_max_value(ds_stats_dmax_r["count"].max().item()) # Define plotting parameters cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) norm = LogNorm(1, norm_vmax) # Create figure with 2x2 subplots figsize = (6.9, 5.2) # fig = plt.figure(figsize=figsize, dpi=dpi) fig, axes = plt.subplots( 2, 2, figsize=figsize, dpi=dpi, gridspec_kw={"hspace": 0.05, "wspace": 0.045}, ) axes = np.asarray(axes) # - Dmax vs R (top-left) ax1 = axes[0, 0] p1 = ds_stats_dmax_r["count"].plot.pcolormesh( x=dmax, y="R", cmap=cmap, norm=norm, extend="max", yscale="log", add_colorbar=False, ax=ax1, ) ax1.set_xlabel(r"$D_{max}$ [mm]") ax1.set_ylabel(r"$R$ [mm h$^{-1}$]") ax1.set_xlim(0.2, diameter_max) ax1.set_ylim(0.1, 500) # - Dmax vs Nw (top-right) ax2 = axes[0, 1] _ = ds_stats_dmax_nw["count"].plot.pcolormesh( x=dmax, y="Nw", cmap=cmap, norm=norm, extend="max", yscale="log", add_colorbar=False, ax=ax2, ) ax2.set_xlabel(r"$D_{max}$ [mm]") ax2.set_ylabel(r"$N_w$ [mm$^{-1}$ m$^{-3}$]") ax2.set_xlim(0.2, diameter_max) ax2.set_ylim(10, 1_000_000) # - Dmax vs Nt (bottom-left) ax3 = axes[1, 0] _ = ds_stats_dmax_nt["count"].plot.pcolormesh( x=dmax, y="Nt", cmap=cmap, norm=norm, extend="max", yscale="log", add_colorbar=False, ax=ax3, ) ax3.set_xlabel(r"$D_{max}$ [mm]") ax3.set_ylabel(r"$N_t$ [m$^{-3}$]") ax3.set_xlim(0.2, diameter_max) ax3.set_ylim(1, 100_000) # - Dmax vs Dm (bottom-right) ax4 = axes[1, 1] _ = ds_stats_dmax_dm["count"].plot.pcolormesh( x=dmax, y="Dm", cmap=cmap, norm=norm, extend="max", add_colorbar=False, ax=ax4, ) ax4.set_xlabel(r"$D_{max}$ [mm]") ax4.set_ylabel(r"$D_m$ [mm]") ax4.set_xlim(0.2, diameter_max) ax4.set_ylim(0, 6) # Remove xaxis labels and ticklables labels on first row for ax in axes[0, :]: # First row (both columns) ax.set_xlabel("") ax.set_xticks([]) ax.set_xticklabels([]) ax.tick_params(axis="x", which="both", bottom=True, top=False, labelbottom=False) # Move y-axis of second column to the right for ax in axes[:, 1]: # Second column (both rows) ax.yaxis.tick_right() ax.yaxis.set_label_position("right") # Add titles as legends in upper corners title_bbox_dict = { "facecolor": "white", "alpha": 0.7, "edgecolor": "none", "pad": 1, } axes[0, 0].text( 0.05, 0.95, r"$D_{max}$ vs $R$", transform=axes[0, 0].transAxes, fontsize=12, verticalalignment="top", bbox=title_bbox_dict, ) axes[0, 1].text( 0.05, 0.95, r"$D_{max}$ vs $N_w$", transform=axes[0, 1].transAxes, fontsize=12, verticalalignment="top", bbox=title_bbox_dict, ) axes[1, 0].text( 0.05, 0.95, r"$D_{max}$ vs $N_t$", transform=axes[1, 0].transAxes, fontsize=12, verticalalignment="top", bbox=title_bbox_dict, ) axes[1, 1].text( 0.05, 0.95, r"$D_{max}$ vs $D_m$", transform=axes[1, 1].transAxes, fontsize=12, verticalalignment="top", bbox=title_bbox_dict, ) # Add colorbar as legend cax = ax1.inset_axes([0.52, 0.15, 0.43, 0.06]) # [x0, y0, width, height] cbar = fig.colorbar( p1, cax=cax, orientation="horizontal", extend="max", ) cbar.set_label("Counts", fontsize=9, labelpad=2) cbar.ax.tick_params(labelsize=8, pad=1) cbar.ax.xaxis.set_label_position("top") # Align labels fig.align_xlabels() fig.align_ylabels() return fig
####------------------------------------------------------------------- #### Radar plots
[docs] def add_legend_powerlaw(ax, legend_str, legend_fontsize): """Add legend fitted powerlaw relationship to plot.""" legend_bbox_dict = {"facecolor": "white", "edgecolor": "black", "alpha": 0.7} ax.text( 0.05, 0.955, legend_str, transform=ax.transAxes, ha="left", va="top", fontsize=legend_fontsize, bbox=legend_bbox_dict, )
[docs] def get_symbol_str(symbol, pol=""): """Generate symbol string with optional polarization subscript. Parameters ---------- symbol : str The base symbol (e.g., 'A', 'Z', 'z') pol : str, optional Polarization identifier (e.g., 'H', 'V') Returns ------- str LaTeX formatted symbol string """ if pol: return rf"{symbol}_{{\mathrm{{{pol}}}}}" return symbol
[docs] def plot_A_R( df, a, r, cmap=None, norm=None, add_colorbar=True, add_fit=True, pol="", title=None, ax=None, figsize=(3.4, 3.0), dpi=300, fit_linewidth=1, fit_linestyle="dashed", legend_fontsize=None, xlabel_pad=None, ylabel_pad=None, ): """Create a 2D histogram of A vs R.""" # Define a_min and a_max a_min = 0.001 a_max = 10 a_bins = log_arange(a_min, a_max, log_step=0.025, base=10) rlims = (0.1, 500) r_bins = log_arange(*rlims, log_step=0.025, base=10) # Compute 2D histogram ds_stats = compute_2d_histogram( df, x=r, y=a, x_bins=r_bins, y_bins=a_bins, ) # Define colormap and norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) norm = LogNorm(1, None) if norm is None else norm # Define ticks and ticklabels r_ticks = [0.1, 1, 10, 50, 100, 500] a_ticks = [0.001, 0.01, 0.1, 0.5, 1, 5] # Adapt on a_max # Define A symbol a_symbol = get_symbol_str("A", pol) # Set default title if not provided if title is None: title = rf"${a_symbol}$ vs $R$" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=r, y=a, ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", xscale="log", yscale="log", cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(r"$R$ [mm h$^{-1}$]", labelpad=xlabel_pad) ax.set_ylabel(rf"${a_symbol}$ [dB km$^{{-1}}$]", labelpad=ylabel_pad) ax.set_ylim(a_min, a_max) ax.set_xlim(*rlims) ax.set_xticks(r_ticks) ax.set_xticklabels([str(v) for v in r_ticks]) ax.set_yticks(a_ticks) ax.set_yticklabels([str(v) for v in a_ticks]) ax.set_title(title) if add_fit: try: # Fit powerlaw k = a * R ** b (a_c, b), _ = fit_powerlaw(x=df[r], y=df[a], xbins=r_bins, ybins=None, x_in_db=False) # Define legend string legend_str = define_powerlaw_legend_str("R", a_symbol, a_c, b) # Get power law predictions x_pred = np.arange(*rlims) r_pred = predict_from_powerlaw(x_pred, a=a_c, b=b) # Add fitted power law ax.plot(x_pred, r_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black") # Add legend add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize) except Exception as e: warnings.warn(f"Could not fit power law in plot_A_R: {e!s}", UserWarning, stacklevel=2) return p
[docs] def plot_A_Z( df, a, z, cmap=None, norm=None, add_colorbar=True, add_fit=True, pol="", title=None, ax=None, a_lim=(0.0001, 10), z_lim=(0, 70), figsize=(3.4, 3.0), dpi=300, fit_linewidth=1, fit_linestyle="dashed", legend_fontsize=None, xlabel_pad=None, ylabel_pad=None, ): """Create a 2D histogram of A vs Z.""" # Define bins a_bins = log_arange(*a_lim, log_step=0.025, base=10) z_bins = np.arange(*z_lim, 0.5) # Compute 2d histogram ds_stats = compute_2d_histogram( df, x=z, y=a, x_bins=z_bins, y_bins=a_bins, ) # Define colormap and norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) if norm is None: norm = LogNorm(1, None) # Ticks a_ticks = [0.001, 0.01, 0.1, 0.5, 1, 5] # Define symbols a_symbol = get_symbol_str("A", pol) z_symbol = get_symbol_str("Z", pol) z_lower_symbol = get_symbol_str("z", pol) # Set default title if not provided if title is None: title = rf"${a_symbol}$ vs ${z_symbol}$" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=z, y=a, ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", xscale=None, yscale="log", cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(rf"${z_symbol}$ [dBZ]", labelpad=xlabel_pad) ax.set_ylabel(rf"${a_symbol}$ [dB km$^{{-1}}$]", labelpad=ylabel_pad) ax.set_xlim(*z_lim) ax.set_ylim(*a_lim) ax.set_yticks(a_ticks) ax.set_yticklabels([str(v) for v in a_ticks]) ax.set_title(title) # Fit and plot the power law if add_fit: try: # Fit powerlaw k = a * Z ** b (Z in dBZ -> x_in_db=True) (a_c, b), _ = fit_powerlaw( x=df[z], y=df[a], xbins=z_bins, ybins=a_bins, x_in_db=True, ) # Define legend string legend_str = define_powerlaw_legend_str(z_lower_symbol, a_symbol, a_c, b) # Predictions x_pred = np.arange(*z_lim) x_pred_linear = disdrodb.idecibel(x_pred) # convert to linear for prediction y_pred = predict_from_powerlaw(x_pred_linear, a=a_c, b=b) ax.plot(x_pred, y_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black") # Add legend add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize) except Exception as e: warnings.warn(f"Could not fit power law in plot_A_Z: {e!s}", UserWarning, stacklevel=2) return p
[docs] def plot_A_KDP( df, a, kdp, log_a=True, log_kdp=False, a_lim=(0.001, 10), kdp_lim=None, pol="", ax=None, cmap=None, norm=None, add_colorbar=True, add_fit=True, title=None, figsize=(3.4, 3.0), dpi=300, legend_fontsize=None, xlabel_pad=None, ylabel_pad=None, fit_linewidth=1, fit_linestyle="dashed", ): """Create a 2D histogram of k(H/V) vs KDP.""" # Bins & limits for a if log_a: a_bins = log_arange(*a_lim, log_step=0.025, base=10) yscale = "log" a_ticks = [0.001, 0.01, 0.1, 0.5, 1, 5] else: a_bins = np.arange(a_lim[0], a_lim[1], 0.01) yscale = None a_ticks = None # Bins & limits for KDP if log_kdp: kdp_lim = (0.05, 10) if kdp_lim is None else kdp_lim kdp_bins = log_arange(*kdp_lim, log_step=0.05, base=10) xscale = "log" kdp_ticks = [0.05, 0.1, 0.5, 1, 5, 10] else: kdp_lim = (0, 8) if kdp_lim is None else kdp_lim kdp_bins = np.arange(kdp_lim[0], kdp_lim[1], 0.1) xscale = None kdp_ticks = None # Compute 2D histogram ds_stats = compute_2d_histogram( df, x=kdp, y=a, x_bins=kdp_bins, y_bins=a_bins, ) # Colormap & norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) if norm is None: norm = LogNorm(1, None) # Define symbols a_symbol = get_symbol_str("A", pol) # Set default title if not provided if title is None: title = rf"${a_symbol}$ vs $K_{{\mathrm{{DP}}}}$" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=kdp, y=a, ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", xscale=xscale, yscale=yscale, cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(r"$K_{\mathrm{DP}}$ [deg km$^{-1}$]", labelpad=xlabel_pad) ax.set_ylabel(rf"${a_symbol}$ [dB km$^{{-1}}$]", labelpad=ylabel_pad) ax.set_xlim(*kdp_lim) ax.set_ylim(*a_lim) if kdp_ticks is not None: ax.set_xticks(kdp_ticks) ax.set_xticklabels([str(v) for v in kdp_ticks]) if a_ticks is not None: ax.set_yticks(a_ticks) ax.set_yticklabels([str(v) for v in a_ticks]) ax.set_title(title) # Fit and overlay power law: k = a * KDP^b if add_fit: try: (a_c, b), _ = fit_powerlaw( x=df[kdp], y=df[a], xbins=kdp_bins, ybins=a_bins, x_in_db=False, ) # Define legend string legend_str = define_powerlaw_legend_str("K_{\\mathrm{DP}}", a_symbol, a_c, b) # Predictions along KDP axis if log_kdp: x_pred = np.logspace(np.log10(kdp_lim[0]), np.log10(kdp_lim[1]), 400) else: x_pred = np.arange(kdp_lim[0], kdp_lim[1], 0.05) y_pred = predict_from_powerlaw(x_pred, a=a_c, b=b) ax.plot(x_pred, y_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black") # Add legend add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize) except Exception as e: warnings.warn(f"Could not fit power law in plot_A_KDP: {e!s}", UserWarning, stacklevel=2) return p
[docs] def plot_R_Z( df, z, r, cmap=None, norm=None, add_colorbar=True, add_fit=True, pol="", title=None, ax=None, figsize=(3.4, 3.0), dpi=300, fit_linewidth=1, fit_linestyle="dashed", legend_fontsize=None, xlabel_pad=None, ylabel_pad=None, ): """Create a 2D histogram of Z vs R.""" # Define axis limits z_lims = (0, 70) r_lims = (0.1, 500) z_bins = np.arange(0.01, 500, 0.5) r_bins = log_arange(0.01, 500, log_step=0.05, base=10) z_bins_fit = np.arange(0, 50, 1) r_bins_fit = log_arange(0.01, 500, log_step=0.05, base=10) # Compute 2d histogram ds_stats = compute_2d_histogram( df, x=z, y=r, x_bins=z_bins, y_bins=r_bins, ) # Define colormap and norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) norm = LogNorm(1, None) if norm is None else norm # Define rain ticks and ticklabels r_ticks = [0.1, 1, 10, 50, 100, 500] # Define symbols z_symbol = get_symbol_str("Z", pol) z_lower_symbol = get_symbol_str("z", pol) # Set default title if not provided if title is None: title = rf"${z_symbol}$ vs $R$" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=z, y=r, ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", yscale="log", cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_ylabel(r"$R$ [mm h$^{-1}$]", labelpad=ylabel_pad) ax.set_xlabel(rf"${z_symbol}$ [dBZ]", labelpad=xlabel_pad) ax.set_xlim(*z_lims) ax.set_ylim(*r_lims) ax.set_yticks(r_ticks) ax.set_yticklabels([str(v) for v in r_ticks]) ax.set_title(title) # Fit and plot the powerlaw if add_fit: try: # Fit powerlaw R = a * z ** b (a, b), _ = fit_powerlaw(x=df[z], y=df[r], xbins=z_bins_fit, ybins=r_bins_fit, x_in_db=True) # Define legend string legend_str = define_powerlaw_legend_str(z_lower_symbol, "R", a, b) # Get power law predictions x_pred = np.arange(*z_lims) x_pred_linear = disdrodb.idecibel(x_pred) r_pred = predict_from_powerlaw(x_pred_linear, a=a, b=b) # Add fitted powerlaw ax.plot(x_pred, r_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black") # Add legend add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize) except Exception as e: warnings.warn(f"Could not fit power law in plot_R_Z: {e!s}", UserWarning, stacklevel=2) return p
[docs] def plot_R_KDP( df, kdp, r, kdp_lim=None, r_lim=None, cmap=None, norm=None, add_colorbar=True, log_scale=False, add_fit=True, title=None, ax=None, figsize=(3.4, 3.0), dpi=300, legend_fontsize=None, xlabel_pad=None, ylabel_pad=None, fit_linewidth=1, fit_linestyle="dashed", ): """Create a 2D histogram of KDP vs R.""" # Define bins if not log_scale: kdp_lim = (0, 8) if kdp_lim is None else kdp_lim r_lim = (0, 200) if r_lim is None else r_lim xbins = np.arange(*kdp_lim, 0.1) ybins = np.arange(*r_lim, 1) xscale = None yscale = None else: kdp_lim = (0.01, 10) if kdp_lim is None else kdp_lim r_lim = (0.1, 500) if r_lim is None else r_lim xbins = log_arange(*kdp_lim, log_step=0.05, base=10) ybins = log_arange(*r_lim, log_step=0.05, base=10) xscale = "log" yscale = "log" # Compute 2d histogram ds_stats = compute_2d_histogram( df, x=kdp, y=r, x_bins=xbins, y_bins=ybins, # y_bins=log_arange(0.1, 500, log_step=0.05, base=10), ) # Define colormap and norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) norm = LogNorm(1, None) if norm is None else norm # Set default title if not provided if title is None: title = r"$K_{\mathrm{DP}}$ vs $R$" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=kdp, y=r, ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, xscale=xscale, yscale=yscale, extend="max", cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_ylabel(r"$R$ [mm h$^{-1}$]", labelpad=ylabel_pad) ax.set_xlabel(r"$K_{\mathrm{DP}}$ [deg km$^{-1}$]", labelpad=xlabel_pad) ax.set_xlim(*kdp_lim) ax.set_ylim(*r_lim) ax.set_title(title) # Fit and plot the power law if add_fit: try: # Fit powerlaw R = a * KDP ** b (a, b), _ = fit_powerlaw(x=df[kdp], y=df[r], xbins=xbins, ybins=ybins, x_in_db=False) # Define legend string legend_str = define_powerlaw_legend_str("K_{\\mathrm{DP}}", "R", a, b) # Get power law predictions x_pred = np.arange(*kdp_lim) r_pred = predict_from_powerlaw(x_pred, a=a, b=b) # Add fitted line ax.plot(x_pred, r_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black") # Add legend add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize) except Exception as e: warnings.warn(f"Could not fit power law in plot_R_KDP: {e!s}", UserWarning, stacklevel=2) return p
[docs] def plot_ZDR_Z( df, z, zdr, zdr_lim=(0, 2.5), z_lim=(0, 70), cmap=None, norm=None, add_colorbar=True, add_fit=False, pol="", title=None, ax=None, figsize=(3.4, 3.0), dpi=300, legend_fontsize=None, xlabel_pad=None, ylabel_pad=None, fit_linewidth=1, fit_linestyle="dashed", ): """Create a 2D histogram of Zdr vs Z.""" z_bins = np.arange(*z_lim, 0.5) zdr_bins = np.arange(*zdr_lim, 0.025) # Compute 2d histogram ds_stats = compute_2d_histogram( df, x=z, y=zdr, x_bins=z_bins, y_bins=zdr_bins, ) # Define colormap and norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) norm = LogNorm(1, None) if norm is None else norm # Define symbols z_symbol = get_symbol_str("Z", pol) z_lower_symbol = get_symbol_str("z", pol) # Set default title if not provided if title is None: title = rf"$Z_{{\mathrm{{DR}}}}$ vs ${z_symbol}$" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=z, y=zdr, ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(rf"${z_symbol}$ [dBZ]", labelpad=xlabel_pad) ax.set_ylabel(r"$Z_{DR}$ [dB]", labelpad=ylabel_pad) ax.set_xlim(*z_lim) ax.set_ylim(*zdr_lim) ax.set_title(title) # Fit and plot the power law if add_fit: try: # Fit powerlaw ZDR = a * Z ** b (a, b), _ = fit_powerlaw( x=df[z], y=df[zdr], xbins=z_bins, ybins=zdr_bins, x_in_db=True, ) # Define legend string legend_str = define_powerlaw_legend_str(z_lower_symbol, "Z_{\\mathrm{DR}}", a, b) # Get power law predictions x_pred = np.arange(0, 70) x_pred_linear = disdrodb.idecibel(x_pred) r_pred = predict_from_powerlaw(x_pred_linear, a=a, b=b) # Add fitted line ax.plot(x_pred, r_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black") # Add legend add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize) except Exception as e: warnings.warn(f"Could not fit power law in plot_ZDR_Z: {e!s}", UserWarning, stacklevel=2) return p
[docs] def plot_KDP_Z( df, kdp, z, z_lim=(0, 70), log_kdp=False, kdp_lim=None, cmap=None, norm=None, add_colorbar=True, add_fit=True, pol="", title=None, ax=None, figsize=(3.4, 3.0), dpi=300, legend_fontsize=None, xlabel_pad=None, ylabel_pad=None, fit_linewidth=1, fit_linestyle="dashed", ): """Create a 2D histogram of KDP vs Z.""" # Bins & limits z_bins = np.arange(*z_lim, 0.5) if log_kdp: kdp_lim = (0.01, 10) if kdp_lim is None else kdp_lim kdp_bins = log_arange(*kdp_lim, log_step=0.05, base=10) yscale = "log" kdp_ticks = [0.01, 0.1, 0.5, 1, 5, 10] else: kdp_lim = (0, 10) if kdp_lim is None else kdp_lim kdp_bins = np.arange(*kdp_lim, 0.1) yscale = None kdp_ticks = None # Compute 2D histogram ds_stats = compute_2d_histogram( df, x=z, y=kdp, x_bins=z_bins, y_bins=kdp_bins, ) # Colormap & norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) if norm is None: norm = LogNorm(1, None) # Define symbols z_symbol = get_symbol_str("Z", pol) z_lower_symbol = get_symbol_str("z", pol) # Set default title if not provided if title is None: title = rf"$K_{{\mathrm{{DP}}}}$ vs ${z_symbol}$" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=z, y=kdp, ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", yscale=yscale, cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(rf"${z_symbol}$ [dBZ]", labelpad=xlabel_pad) ax.set_ylabel(r"$K_{\mathrm{DP}}$ [deg km$^{-1}$]", labelpad=ylabel_pad) ax.set_xlim(*z_lim) ax.set_ylim(*kdp_lim) if kdp_ticks is not None: ax.set_yticks(kdp_ticks) ax.set_yticklabels([str(v) for v in kdp_ticks]) ax.set_title(title) # Fit and overlay power law if add_fit: try: # Fit: KDP = a * Z^b (Z in dBZ → x_in_db=True) (a, b), _ = fit_powerlaw( x=df[z], y=df[kdp], xbins=z_bins, ybins=kdp_bins, x_in_db=True, ) # Define legend string legend_str = define_powerlaw_legend_str(z_lower_symbol, "K_{\\mathrm{DP}}", a, b) # Get power law predictions x_pred = np.arange(*z_lim) x_pred_linear = disdrodb.idecibel(x_pred) y_pred = predict_from_powerlaw(x_pred_linear, a=a, b=b) # Add fitted power law ax.plot(x_pred, y_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black") # Add legend add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize) except Exception as e: warnings.warn(f"Could not fit power law in plot_KDP_Z: {e!s}", UserWarning, stacklevel=2) return p
[docs] def plot_ADP_KDP_ZDR( df, adp, kdp, zdr, y_lim=(0, 0.015), zdr_lim=(0, 6), cmap=None, norm=None, add_colorbar=True, title=None, ax=None, figsize=(3.4, 3.0), dpi=300, xlabel_pad=None, ylabel_pad=None, ): """Create a 2D histogram of ADP/KDP vs ZDR. References ---------- Ryzhkov, A., P. Zhang, and J. Hu, 2025. Suggested Modifications for the S-Band Polarimetric Radar Rainfall Estimation Algorithm. J. Hydrometeor., 26, 1053-1062. https://doi.org/10.1175/JHM-D-25-0014.1. """ # Compute ADP/KDP df["ADP/KDP"] = df[adp] / df[kdp] # Bins & limits y_bins = np.arange(y_lim[0], y_lim[1], (y_lim[1] - y_lim[0]) / 200) zdr_bins = np.arange(zdr_lim[0], zdr_lim[1] + 0.025, 0.025) # Compute 2D histogram ds_stats = compute_2d_histogram( df, x=zdr, y="ADP/KDP", x_bins=zdr_bins, y_bins=y_bins, ) # Colormap & norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) if norm is None: norm = LogNorm(1, None) # Set default title if not provided if title is None: title = r"$A_{\mathrm{DP}} / K_{\mathrm{DP}}$ vs $Z_{\mathrm{DR}}$" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=zdr, y="ADP/KDP", ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(r"$Z_{\mathrm{DR}}$ [dB]", labelpad=xlabel_pad) ax.set_ylabel(r"$A_{\mathrm{DP}} / K_{\mathrm{DP}}$ [dB deg$^{{-1}}$]", labelpad=ylabel_pad) ax.set_xlim(*zdr_lim) ax.set_ylim(*y_lim) ax.set_title(title) return p
[docs] def plot_A_KDP_ZDR( df, a, kdp, zdr, y_lim=(0, 0.05), zdr_lim=(0, 3), cmap=None, norm=None, add_colorbar=True, pol="", title=None, ax=None, figsize=(3.4, 3.0), dpi=300, xlabel_pad=None, ylabel_pad=None, ): """Create a 2D histogram of k/KDP vs ZDR. References ---------- Ryzhkov, A., P. Zhang, and J. Hu, 2025. Suggested Modifications for the S-Band Polarimetric Radar Rainfall Estimation Algorithm. J. Hydrometeor., 26, 1053-1062. https://doi.org/10.1175/JHM-D-25-0014.1. """ # Compute A/KDP df["A/KDP"] = df[a] / df[kdp] # Bins & limits y_bins = np.arange(y_lim[0], y_lim[1], (y_lim[1] - y_lim[0]) / 200) x_bins = np.arange(zdr_lim[0], zdr_lim[1] + 0.025, 0.025) # Compute 2D histogram ds_stats = compute_2d_histogram( df, x=zdr, y="A/KDP", x_bins=x_bins, y_bins=y_bins, ) # Colormap & norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) if norm is None: norm = LogNorm(1, None) # Define symbols a_symbol = get_symbol_str("A", pol) # Set default title if not provided if title is None: title = rf"${a_symbol} / K_{{\mathrm{{DP}}}}$ vs $Z_{{\mathrm{{DR}}}}$" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=zdr, y="A/KDP", ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(r"$Z_{\mathrm{DR}}$ [dB]", labelpad=xlabel_pad) ax.set_ylabel(rf"${a_symbol} / K_{{\mathrm{{DP}}}}$ [dB/deg]", labelpad=ylabel_pad) ax.set_xlim(*zdr_lim) ax.set_ylim(*y_lim) ax.set_title(title) return p
[docs] def plot_KDP_Z_ZDR( df, kdp, z, zdr, y_lim=None, zdr_lim=(0, 5), z_linear=True, cmap=None, norm=None, add_colorbar=True, title=None, ax=None, figsize=(3.4, 3.0), dpi=300, xlabel_pad=None, ylabel_pad=None, ): """Create a 2D histogram of (KDP/Z) vs ZDR with log-scale y-axis (no fit).""" # Define y limits and KDP/Z if z_linear: df["KDP/Z"] = df[kdp] / disdrodb.idecibel(df[z]) y_lim = (1e-6, 1e-3) if y_lim is None else y_lim y_label = r"$K_{\mathrm{DP}} / Z$ [deg km$^{-1}$ / mm$^6$ m$^{-3}$]" else: df["KDP/Z"] = df[kdp] / df[z] y_lim = (1e-5, 1e-1) if y_lim is None else y_lim y_label = r"$K_{\mathrm{DP}} / Z$ [deg km$^{-1}$ / dBZ]" # Define bins y_bins = log_arange(y_lim[0], y_lim[1], log_step=0.025, base=10) x_bins = np.arange(zdr_lim[0], zdr_lim[1] + 0.025, 0.025) # Compute 2D histogram ds_stats = compute_2d_histogram( df, x=zdr, y="KDP/Z", x_bins=x_bins, y_bins=y_bins, ) # Colormap & norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) if norm is None: norm = LogNorm(1, None) # Set default title if not provided if title is None: title = r"$K_{\mathrm{DP}}/Z$ vs $Z_{\mathrm{DR}}$" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=zdr, y="KDP/Z", ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", yscale="log", cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(r"$Z_{\mathrm{DR}}$ [dB]", labelpad=xlabel_pad) ax.set_ylabel(y_label, labelpad=ylabel_pad) ax.set_xlim(*zdr_lim) ax.set_ylim(*y_lim) ax.set_title(title) return p
[docs] def plot_KED_R( df, log_r=True, log_ked=False, add_fit=True, cmap=None, norm=None, add_colorbar=True, title=None, ax=None, legend_fontsize=None, figsize=(3.4, 3.0), dpi=300, xlabel_pad=None, ylabel_pad=None, fit_linewidth=1, fit_linestyle="dashed", ): """Create a 2D histogram of KED vs R.""" # R settings if log_r: r_bins = log_arange(0.1, 500, log_step=0.05, base=10) r_lims = (0.1, 500) r_ticks = [0.1, 1, 10, 50, 100, 500] xscale = "log" else: r_bins = np.arange(0, 500, step=2) r_lims = (0, 500) r_ticks = None xscale = "linear" # KED settings if log_ked: ked_bins = log_arange(1, 100, log_step=0.025, base=10) if log_r: ked_lims = (2, 100) ked_ticks = [5, 10, 50, 100] else: ked_lims = (1, 50) ked_ticks = [1, 10, 50] yscale = "log" else: ked_bins = np.arange(0, 50, step=1) ked_lims = (0, 50) ked_ticks = None yscale = "linear" # Compute 2d histogram ds_stats = compute_2d_histogram( df, x="R", y="KED", x_bins=r_bins, y_bins=ked_bins, ) # Define colormap and norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) norm = LogNorm(1, None) if norm is None else norm # Set default title if not provided if title is None: title = "KED vs R" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x="R", y="KED", ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", xscale=xscale, yscale=yscale, cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(r"$R$ [mm h$^{-1}$]", labelpad=xlabel_pad) ax.set_ylabel(r"KED [J m$^{-2}$ mm$^{-1}$]", labelpad=ylabel_pad) ax.set_xlim(*r_lims) ax.set_ylim(*ked_lims) if r_ticks is not None: ax.set_xticks(r_ticks) ax.set_xticklabels([str(v) for v in r_ticks]) if ked_ticks is not None: ax.set_yticks(ked_ticks) ax.set_yticklabels([str(v) for v in ked_ticks]) ax.tick_params(axis="y", which="both", left=True) ax.set_title(title) # Fit and plot a powerlaw if add_fit: try: # Fit a power law KED = a * R**b (a, b), _ = fit_powerlaw( x=df["R"], y=df["KED"], xbins=r_bins, ybins=ked_bins, x_in_db=False, ) # Define legend string legend_str = define_powerlaw_legend_str("R", "\\mathrm{KED}", a, b) # Get power law predictions x_pred = np.arange(r_lims[0], r_lims[1]) y_pred = predict_from_powerlaw(x_pred, a=a, b=b) # Add fitted powerlaw ax.plot(x_pred, y_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black") # Add legend add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize) except Exception as e: warnings.warn(f"Could not fit power law in plot_KED_R: {e!s}", UserWarning, stacklevel=2) return p
[docs] def plot_KEF_R( df, log_r=True, log_kef=True, add_fit=True, cmap=None, norm=None, add_colorbar=True, title=None, ax=None, legend_fontsize=None, figsize=(3.4, 3.0), dpi=300, xlabel_pad=None, ylabel_pad=None, fit_linewidth=1, fit_linestyle="dashed", ): """Create a 2D histogram of KEF vs R.""" if log_r: r_bins = log_arange(0.1, 500, log_step=0.05, base=10) r_lims = (0.1, 500) r_ticks = [0.1, 1, 10, 50, 100, 500] xscale = "log" else: r_bins = np.arange(0, 500, step=2) r_lims = (0, 500) r_ticks = None xscale = "linear" if log_kef: kef_bins = log_arange(0.1, 10_000, log_step=0.05, base=10) kef_lims = (0.1, 10_000) kef_ticks = [0.1, 1, 10, 100, 1000, 10000] yscale = "log" else: kef_bins = np.arange(0, 5000, step=50) kef_lims = (0, 5000) kef_ticks = None yscale = "linear" # Compute 2d histogram ds_stats = compute_2d_histogram( df, x="R", y="KEF", x_bins=r_bins, y_bins=kef_bins, ) # Define colormap and norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) norm = LogNorm(1, None) if norm is None else norm # Set default title if not provided if title is None: title = "KEF vs R" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x="R", y="KEF", ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", xscale=xscale, yscale=yscale, cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(r"$R$ [mm h$^{-1}$]", labelpad=xlabel_pad) ax.set_ylabel(r"KEF [J m$^{-2}$ h$^{-1}$]", labelpad=ylabel_pad) ax.set_xlim(*r_lims) ax.set_ylim(*kef_lims) if r_ticks is not None: ax.set_xticks(r_ticks) ax.set_xticklabels([str(v) for v in r_ticks]) if kef_ticks is not None: ax.set_yticks(kef_ticks) ax.set_yticklabels([str(v) for v in kef_ticks]) ax.set_title(title) # Fit and plot the power law # - Alternative fit model: a + I *(1 - b*exp(c*I)) (a is upper limit) if add_fit: try: # Fit power law KEF = a * R ** b (a, b), _ = fit_powerlaw( x=df["R"], y=df["KEF"], xbins=r_bins, ybins=kef_bins, x_in_db=False, ) # Define legend string legend_str = define_powerlaw_legend_str("R", "\\mathrm{KEF}", a, b) # Get power law predictions x_pred = np.arange(*r_lims) kef_pred = predict_from_powerlaw(x_pred, a=a, b=b) # Add fitted powerlaw ax.plot(x_pred, kef_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black") # Add legend add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize) except Exception as e: warnings.warn(f"Could not fit power law in plot_KEF_R: {e!s}", UserWarning, stacklevel=2) return p
[docs] def plot_KEF_Z( df, z="Z", log_kef=True, add_fit=True, pol="", cmap=None, norm=None, add_colorbar=True, title=None, ax=None, legend_fontsize=None, figsize=(3.4, 3.0), dpi=300, xlabel_pad=None, ylabel_pad=None, fit_linewidth=1, fit_linestyle="dashed", ): """Create a 2D histogram of KEF vs Z.""" # Define limits and bins z_lims = (0, 70) z_bins = np.arange(*z_lims, step=1) if log_kef: kef_lims = (0.1, 10_000) kef_bins = log_arange(*kef_lims, log_step=0.05, base=10) kef_ticks = [0.1, 1, 10, 100, 1000, 10000] yscale = "log" else: kef_lims = (0, 5000) kef_bins = np.arange(*kef_lims, step=50) kef_ticks = None yscale = "linear" # Compute 2d histogram ds_stats = compute_2d_histogram( df, x=z, y="KEF", x_bins=z_bins, y_bins=kef_bins, ) # Define colormap and norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) norm = LogNorm(1, None) if norm is None else norm # Define symbols z_symbol = get_symbol_str("Z", pol) z_lower_symbol = get_symbol_str("z", pol) # Set default title if not provided if title is None: title = rf"KEF vs ${z_symbol}$" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=z, y="KEF", ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", yscale=yscale, cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(rf"${z_symbol}$ [dB]", labelpad=xlabel_pad) ax.set_ylabel(r"KEF [$J$ m$^{-2}$ h$^{-1}$]", labelpad=ylabel_pad) ax.set_xlim(*z_lims) ax.set_ylim(*kef_lims) if kef_ticks is not None: ax.set_yticks(kef_ticks) ax.set_yticklabels([str(v) for v in kef_ticks]) ax.set_title(title) # Fit and plot the powerlaw if add_fit: try: # Fit power law KEF = a * Z ** b (a, b), _ = fit_powerlaw( x=df[z], y=df["KEF"], xbins=z_bins, ybins=kef_bins, x_in_db=True, ) # Define legend string legend_str = define_powerlaw_legend_str(z_lower_symbol, "\\mathrm{KEF}", a, b) # Get power law predictions x_pred = np.arange(*z_lims) x_pred_linear = disdrodb.idecibel(x_pred) kef_pred = predict_from_powerlaw(x_pred_linear, a=a, b=b) # Add fitted powerlaw ax.plot(x_pred, kef_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black") # Add legend add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize) except Exception as e: warnings.warn(f"Could not fit power law in plot_KEF_Z: {e!s}", UserWarning, stacklevel=2) return p
[docs] def plot_TKE_Z( df, z="Z", log_tke=True, add_fit=True, cmap=None, norm=None, add_colorbar=True, title=None, ax=None, legend_fontsize=None, figsize=(3.4, 3.0), dpi=300, xlabel_pad=None, ylabel_pad=None, fit_linewidth=1, fit_linestyle="dashed", ): """Create a 2D histogram of TKE vs Z.""" z_bins = np.arange(0, 70, step=1) z_lims = (0, 70) if log_tke: tke_bins = log_arange(0.01, 500, log_step=0.05, base=10) tke_lims = (0.01, 200) tke_ticks = [0.01, 0.1, 1, 10, 100, 200] yscale = "log" else: tke_bins = np.arange(0, 200, step=1) tke_lims = (0, 200) tke_ticks = None yscale = "linear" # Compute 2d histogram ds_stats = compute_2d_histogram( df, x=z, y="TKE", x_bins=z_bins, y_bins=tke_bins, ) # Define colormap and norm if cmap is None: cmap = plt.get_cmap("Spectral_r").copy() cmap.set_under(alpha=0) norm = LogNorm(1, None) if norm is None else norm # Set default title if not provided if title is None: title = "TKE vs Z" # Create figure if ax is None if ax is None: fig, ax = plt.subplots(1, 1, figsize=figsize, dpi=dpi) # Plot 2D histogram p = ds_stats["count"].plot.pcolormesh( x=z, y="TKE", ax=ax, cmap=cmap, norm=norm, add_colorbar=add_colorbar, extend="max", yscale=yscale, cbar_kwargs={"label": "Counts"} if add_colorbar else {}, ) ax.set_xlabel(r"$Z$ [dB]", labelpad=xlabel_pad) ax.set_ylabel(r"TKE [$J$ m$^{-2}$]", labelpad=ylabel_pad) ax.set_xlim(*z_lims) ax.set_ylim(*tke_lims) if tke_ticks is not None: ax.set_yticks(tke_ticks) ax.set_yticklabels([str(v) for v in tke_ticks]) ax.set_title(title) # Fit and plot the powerlaw if add_fit: try: # Fit power law TKE = a * Z ** b (a, b), _ = fit_powerlaw( x=df[z], y=df["TKE"], xbins=z_bins, ybins=tke_bins, x_in_db=True, ) # Define legend string legend_str = define_powerlaw_legend_str("z", "\\mathrm{TKE}", a, b) # Get power law predictions x_pred = np.arange(*z_lims) x_pred_linear = disdrodb.idecibel(x_pred) y_pred = predict_from_powerlaw(x_pred_linear, a=a, b=b) # Add fitted powerlaw ax.plot(x_pred, y_pred, linewidth=fit_linewidth, linestyle=fit_linestyle, color="black") # Add legend add_legend_powerlaw(ax=ax, legend_str=legend_str, legend_fontsize=legend_fontsize) except Exception as e: warnings.warn(f"Could not fit power law in plot_TKE_Z: {e!s}", UserWarning, stacklevel=2) return p
####-----------------------------------------------------------------------. #### Radar and Kinetic Energy Summary figures
[docs] def plot_radar_relationships(df, band): """Create 3x3 radar relationships figure formatted for AMT journal.""" if band not in {"X", "C", "S"}: raise ValueError("Plotting function developed only for bands: 'X', 'C', 'S'.") # -------------------------------------------------- # Styling parameters # -------------------------------------------------- plt.rcParams.update( { "font.size": 8, "axes.labelsize": 8, "axes.titlesize": 8, "xtick.labelsize": 7, "ytick.labelsize": 7, }, ) # Appearance norm = LogNorm(1, None) add_colorbar = False legend_fontsize = 8 xlabel_pad = -0.5 ylabel_pad = -1 # -------------------------------------------------- # Define columns # -------------------------------------------------- z = f"DBZH_{band}" zdr = f"ZDR_{band}" kdp = f"KDP_{band}" a = f"AH_{band}" adp = f"ADP_{band}" adp_kdp_ylim_dict = { "X": (0.0, 0.05), "C": (0.0, 0.05), "S": (0.0, 0.015), } a_ylim_dict = { "S": (0.00001, 1), "C": (0.0001, 10), "X": (0.0001, 10), } adp_kdp_ylim = adp_kdp_ylim_dict[band] a_ylim = a_ylim_dict[band] # -------------------------------------------------- # Figure size # -------------------------------------------------- fig = plt.figure(figsize=(6.9, 9), dpi=300) gs = GridSpec( 4, 3, figure=fig, height_ratios=[1, 1, 1, 0.05], hspace=0.38, wspace=0.32, left=0.085, right=0.985, top=0.975, bottom=0.03, ) axes = np.array([[fig.add_subplot(gs[i, j]) for j in range(3)] for i in range(3)]) # ================================================== # Row 1 # ================================================== p = plot_R_Z( df, z=z, r="R", pol="H", norm=norm, add_colorbar=add_colorbar, legend_fontsize=legend_fontsize, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, ax=axes[0, 0], ) norm = p.norm plot_R_KDP( df, kdp=kdp, r="R", log_scale=True, norm=norm, add_colorbar=add_colorbar, legend_fontsize=legend_fontsize, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, ax=axes[0, 1], ) plot_KDP_Z( df, kdp=kdp, z=z, pol="H", log_kdp=True, norm=norm, add_colorbar=add_colorbar, legend_fontsize=legend_fontsize, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, ax=axes[0, 2], ) # ================================================== # Row 2 # ================================================== plot_A_Z( df, a=a, z=z, pol="H", norm=norm, add_colorbar=add_colorbar, a_lim=a_ylim, legend_fontsize=legend_fontsize, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, ax=axes[1, 0], ) plot_A_KDP( df, a=a, kdp=kdp, pol="H", norm=norm, add_colorbar=add_colorbar, legend_fontsize=legend_fontsize, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, ax=axes[1, 1], ) plot_A_R( df, a=a, r="R", pol="H", norm=norm, add_colorbar=add_colorbar, legend_fontsize=legend_fontsize, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, ax=axes[1, 2], ) # ================================================== # Row 3 # ================================================== plot_ZDR_Z( df, z=z, zdr=zdr, pol="H", norm=norm, add_fit=False, add_colorbar=add_colorbar, legend_fontsize=legend_fontsize, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, ax=axes[2, 0], ) plot_ADP_KDP_ZDR( df, adp=adp, kdp=kdp, zdr=zdr, norm=norm, add_colorbar=add_colorbar, y_lim=adp_kdp_ylim, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, ax=axes[2, 1], ) p = plot_KDP_Z_ZDR( df, kdp=kdp, z=z, zdr=zdr, norm=norm, add_colorbar=add_colorbar, z_linear=False, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, ax=axes[2, 2], ) # ================================================== # Bottom colorbar (shortened) # ================================================== cax_full = fig.add_subplot(gs[3, :]) cax_full.axis("off") cax = inset_axes( cax_full, width="40%", height="100%", loc="center", ) cbar = fig.colorbar( p, cax=cax, orientation="horizontal", extend="max", extendfrac=0.02, ) cbar.set_label("Counts", labelpad=6) cbar.ax.tick_params(labelsize=7) cbar.ax.xaxis.set_label_position("top") fig.align_xlabels() fig.align_ylabels() return fig
[docs] def plot_kinetic_energy_relationships(df): """Create a 2x2 multipanel figure showing kinetic energy relationships (AMT format).""" # -------------------------------------------------- # Plot styling # -------------------------------------------------- plt.rcParams.update( { "font.size": 8, "axes.labelsize": 8, "axes.titlesize": 8, "xtick.labelsize": 7, "ytick.labelsize": 7, "legend.fontsize": 8, }, ) # Appearance add_colorbar = False norm = LogNorm(1, None) legend_fontsize = 8 xlabel_pad = None ylabel_pad = 2 # -------------------------------------------------- # Figure (A4 width - some margin) # -------------------------------------------------- fig = plt.figure(figsize=(6.9, 5.8), dpi=300) gs = GridSpec( 3, 2, figure=fig, height_ratios=[1, 1, 0.1], hspace=0.0, wspace=0.24, left=0.10, right=0.98, top=0.97, bottom=0.05, ) # -------------------------------------------------- # Create axes # -------------------------------------------------- ax11 = fig.add_subplot(gs[0, 0]) ax21 = fig.add_subplot(gs[1, 0]) ax12 = fig.add_subplot(gs[0, 1]) ax22 = fig.add_subplot(gs[1, 1]) # ================================================== # ax11 — R vs KED # ================================================== p = plot_KED_R( df, log_ked=True, log_r=True, norm=norm, legend_fontsize=legend_fontsize, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, add_colorbar=add_colorbar, title="", ax=ax11, ) norm = p.norm # ================================================== # ax12 — Z vs TKE # ================================================== plot_TKE_Z( df, z="Z", norm=norm, legend_fontsize=legend_fontsize, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, add_colorbar=add_colorbar, title="", ax=ax12, ) # ================================================== # ax21 — R vs KEF # ================================================== plot_KEF_R( df, log_kef=True, log_r=True, norm=norm, legend_fontsize=legend_fontsize, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, add_colorbar=add_colorbar, title="", ax=ax21, ) # ================================================== # ax22 — Z vs KEF # ================================================== p_last = plot_KEF_Z( df, z="Z", norm=norm, legend_fontsize=legend_fontsize, xlabel_pad=xlabel_pad, ylabel_pad=ylabel_pad, add_colorbar=add_colorbar, title="", ax=ax22, ) # ================================================== # Clean axis # ================================================== # Remove x-tick labels and xticks from first row for ax in [ax11, ax12]: ax.set_xticklabels([]) ax.set_xlabel("") ax.tick_params(axis="x", which="both", left=False) # Clear overlapping ticks between ax11 and ax21 ylim = ax21.get_ylim() yticks = ax21.get_yticks() yticks = yticks[(yticks <= 5000)] ax21.set_yticks(yticks) ax21.set_ylim(*ylim) # Clear overlapping ticks between ax11 and ax21 ylim = ax22.get_ylim() yticks = ax22.get_yticks() yticks = yticks[(yticks <= 5000)] ax22.set_yticks(yticks) ax22.set_ylim(*ylim) # Remove redundant labels ax11.set_xlabel("") ax12.set_xlabel("") # ================================================== # Colorbar inside ax21 (bottom-left) # ================================================== from mpl_toolkits.axes_grid1.inset_locator import inset_axes cax = inset_axes( ax21, width="65%", # length of colorbar height="6%", # thickness loc="lower right", borderpad=2.4, ) cbar = fig.colorbar( p_last, cax=cax, orientation="horizontal", extend="max", extendfrac=0.05, ) # Label above colorbar cbar.ax.xaxis.set_label_position("top") cbar.set_label("Counts", labelpad=4) cbar.ax.tick_params(labelsize=7) # ------------------------------------- # - Align labels fig.align_xlabels() fig.align_ylabels() return fig
####-----------------------------------------------------------------------. #### Summary routine
[docs] def define_filename(prefix, extension, data_source, campaign_name, station_name, temporal_resolution): """Define filename for summary files.""" if extension in ["png", "jpeg"]: filename = f"Figure.{prefix}.{data_source}.{campaign_name}.{station_name}.{temporal_resolution}.{extension}" elif extension in ["csv", "pdf", "yaml", "yml"]: filename = f"Table.{prefix}.{data_source}.{campaign_name}.{station_name}.{temporal_resolution}.{extension}" elif extension in ["nc"]: filename = f"Dataset.{prefix}.{data_source}.{campaign_name}.{station_name}.{temporal_resolution}.{extension}" elif extension in ["parquet"]: filename = f"Dataframe.{prefix}.{data_source}.{campaign_name}.{station_name}.{temporal_resolution}.{extension}" else: raise NotImplementedError(f"Standardized filename not implemented for extension {extension}.") return filename
[docs] def create_l2_dataframe(ds): """Create pandas Dataframe for L2 analysis.""" dims_to_drop = set(ds.dims).intersection({DIAMETER_DIMENSION, VELOCITY_DIMENSION, "crs"}) # - Drop array variables and convert to pandas df = ds.drop_dims(dims_to_drop).to_pandas() # - Drop coordinates coords_to_drop = ["velocity_method", "sample_interval", *RADAR_OPTIONS] df = df.drop(columns=coords_to_drop, errors="ignore") # - Drop rows with missing rain df = df[df["R"] > 0] return df
[docs] def prepare_summary_dataset(ds, velocity_method="theoretical_velocity", source="drop_number", minimum_rain_rate=0): """Prepare the L2E or L2M dataset to be converted to a dataframe.""" # Select fall velocity method if "velocity_method" in ds.dims: ds = ds.sel(velocity_method=velocity_method) # Select first occurrence of radars options (except frequency) for dim in RADAR_OPTIONS: if dim in ds.dims and dim != "frequency": ds = ds.isel({dim: 0}) # Select only one elevation angle if "elevation_angle" in ds.dims: ds = ds.isel(elevation_angle=0) # Unstack frequency dimension if "frequency" in ds.dims: ds = unstack_radar_variables(ds) # For kinetic energy variables, select source="drop_number" if "source" in ds.dims: ds = ds.sel(source=source) # Select only timesteps with R > minimum_rain_rate if "Rm" in ds: # in L2E rainy_timesteps = np.logical_and(ds["Rm"].compute() > minimum_rain_rate, ds["R"].compute() > minimum_rain_rate) else: # L2M without Rm rainy_timesteps = ds["R"].compute() > minimum_rain_rate ds = ds.isel(time=rainy_timesteps) return ds
[docs] def generate_station_summary(ds, summary_dir_path, data_source, campaign_name, station_name, temporal_resolution): """Generate station summary using L2E dataset.""" # Create summary directory if does not exist os.makedirs(summary_dir_path, exist_ok=True) ####---------------------------------------------------------------------. #### Prepare dataset ds = prepare_summary_dataset(ds) # Select only rainy timesteps # Ensure all data are in memory ds = ds.compute() # Filter dataset to remove noisy timesteps # - Keep only timesteps with at least 3 Nbins to remove noise # - Keep only timesteps with sigma_m >= 0.2 mask = ( (ds["Nbins"] >= 3) & (ds["sigma_m"] >= 0.2) & (ds["Nbins_missing_fraction"] <= 0.3) & (ds["Dmin"] <= 1) & (ds["R"] > 0.01) ) valid_idx = np.where(mask)[0] ds_filtered = ds.isel(time=valid_idx) ####---------------------------------------------------------------------. #### Create drop spectrum figures and statistics if VELOCITY_DIMENSION in ds_filtered.dims: # Compute sum of raw and filtered spectrum over time raw_drop_number = ds_filtered["raw_drop_number"].sum(dim="time") drop_number = ds_filtered["drop_number"].sum(dim="time") # Define theoretical and measured average velocity if "time" in ds_filtered["fall_velocity"].dims: theoretical_average_velocity = ds_filtered["fall_velocity"].mean(dim="time") else: theoretical_average_velocity = ds_filtered["fall_velocity"] measured_average_velocity = get_drop_average_velocity(drop_number) # Save raw and filtered spectrum over time & theoretical and measured average fall velocity ds_stats = xr.Dataset() ds_stats["raw_drop_number"] = raw_drop_number ds_stats["drop_number"] = raw_drop_number ds_stats["theoretical_average_velocity"] = theoretical_average_velocity if measured_average_velocity is not None: ds_stats["measured_average_velocity"] = measured_average_velocity filename = define_filename( prefix="SpectrumStats", extension="nc", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) ds_stats.to_netcdf(os.path.join(summary_dir_path, filename)) # Create figures with raw and filtered spectrum # - Raw filename = define_filename( prefix="SpectrumRaw", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig, ax = plt.subplots(1, 1, figsize=(3.4, 2.8), dpi=300) p = plot_spectrum(raw_drop_number, ax=ax, title="Raw Drop Spectrum") p.figure.tight_layout() p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() # - Filtered filename = define_filename( prefix="SpectrumFiltered", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig, ax = plt.subplots(1, 1, figsize=(3.4, 2.8), dpi=300) p = plot_spectrum(drop_number, ax=ax, title="Filtered Drop Spectrum") p.figure.tight_layout() p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() # Create figure comparing raw and filtered spectrum filename = define_filename( prefix="SpectrumSummary", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_raw_and_filtered_spectra(ds_filtered) fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() ####---------------------------------------------------------------------. #### Create L2E dataframe df = create_l2_dataframe(ds_filtered) # Define diameter bin edges diameter_bin_edges = get_diameter_bin_edges(ds_filtered) # ---------------------------------------------------------------------. #### Save L2E Parquet l2e_parquet_filename = define_filename( prefix="L2E", extension="parquet", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) l2e_parquet_filepath = os.path.join(summary_dir_path, l2e_parquet_filename) df.to_parquet(l2e_parquet_filepath, engine="pyarrow", compression="snappy") #### ---------------------------------------------------------------------. #### Create table with rain summary if not temporal_resolution.startswith("ROLL"): table_rain_summary = create_table_rain_summary(df, temporal_resolution=temporal_resolution) table_rain_summary_filename = define_filename( prefix="Station_Summary", extension="yaml", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) table_rain_summary_filepath = os.path.join(summary_dir_path, table_rain_summary_filename) write_yaml(table_rain_summary, filepath=table_rain_summary_filepath) # ---------------------------------------------------------------------. #### Create table with events summary table_events_summary = None if not temporal_resolution.startswith("ROLL"): table_events_summary = create_table_events_summary(df, temporal_resolution=temporal_resolution) if len(table_events_summary) > 0: # - Save table as csv table_events_summary_csv_filename = define_filename( prefix="Events_Summary", extension="csv", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) table_events_summary_csv_filepath = os.path.join(summary_dir_path, table_events_summary_csv_filename) table_events_summary.to_csv(table_events_summary_csv_filepath) # - Save table as pdf if is_latex_engine_available(): # Sorted by time table_events_summary_pdf_filename = table_events_summary_csv_filename.replace(".csv", ".pdf") table_events_summary_pdf_filepath = os.path.join(summary_dir_path, table_events_summary_pdf_filename) save_table_to_pdf( df=prepare_latex_table_events_summary(table_events_summary), filepath=table_events_summary_pdf_filepath, index=True, caption="Events Summary", orientation="landscape", ) # Sorted by maximum rain intensity table_events_summary = table_events_summary.sort_values(by="max_R", ascending=False) table_events_summary = table_events_summary.reset_index(drop=True) table_events_summary_pdf_filename = define_filename( prefix="Events_Summary_Sorted_By_Rmax", extension="pdf", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) table_events_summary_pdf_filepath = os.path.join(summary_dir_path, table_events_summary_pdf_filename) save_table_to_pdf( df=prepare_latex_table_events_summary(table_events_summary), filepath=table_events_summary_pdf_filepath, index=True, caption="Events Summary", orientation="landscape", ) # Sorted by Ptotal table_events_summary = table_events_summary.sort_values(by="P_total", ascending=False) table_events_summary = table_events_summary.reset_index(drop=True) table_events_summary_pdf_filename = define_filename( prefix="Events_Summary_Sorted_By_Ptot", extension="pdf", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) table_events_summary_pdf_filepath = os.path.join(summary_dir_path, table_events_summary_pdf_filename) save_table_to_pdf( df=prepare_latex_table_events_summary(table_events_summary), filepath=table_events_summary_pdf_filepath, index=True, caption="Events Summary", orientation="landscape", ) # ---------------------------------------------------------------------. #### Create table with integral DSD parameters statistics table_dsd_summary = create_table_dsd_summary(df) # - Save table as csv table_dsd_summary_csv_filename = define_filename( prefix="DSD_Summary", extension="csv", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) table_dsd_summary_csv_filepath = os.path.join(summary_dir_path, table_dsd_summary_csv_filename) table_dsd_summary.to_csv(table_dsd_summary_csv_filepath) # - Save table as pdf if is_latex_engine_available(): table_dsd_summary_pdf_filename = table_dsd_summary_csv_filename.replace(".csv", ".pdf") table_dsd_summary_pdf_filepath = os.path.join(summary_dir_path, table_dsd_summary_pdf_filename) save_table_to_pdf( df=prepare_latex_table_dsd_summary(table_dsd_summary), index=True, filepath=table_dsd_summary_pdf_filepath, caption="DSD Summary", orientation="portrait", # "landscape", ) ####---------------------------------------------------------------------. #### Create events quicklooks (using unfiltered dataset !) if table_events_summary is not None: # Define number of quicklooks n_quicklooks = 10 # Create quicklooks for Rmax table_events_summary = table_events_summary.sort_values(by="max_R", ascending=False) for i in range(min(n_quicklooks, len(table_events_summary))): event_stats = table_events_summary.iloc[i] if event_stats["duration"] >= 10: # minimum 10 minutes ds_event = ds.sel(time=slice(event_stats["start_time"], event_stats["end_time"])) ds_event["rain_type"] = bringi_nw_dm_classification(ds_event["Dm"], ds_event["Nw"]) # Plot quicklook fig = ds_event.disdrodb.plot_dsd_quicklook( # Plot layout hours_per_slice=3, max_rows=6, aligned=False, verbose=False, cbar_as_legend=True, precipitation_type="rain_type", d_lim=(0.3, 7), # Figure options dpi=300, ) # Create quicklooks directory quicklook_dir = os.path.join(summary_dir_path, "Quicklooks_Rmax") os.makedirs(quicklook_dir, exist_ok=True) # Define quicklooks filename rmax = event_stats["max_R"] start_time_str = event_stats["start_time"].strftime("%Y%m%dT%H%M%S") end_time_str = event_stats["end_time"].strftime("%Y%m%dT%H%M%S") quicklook_filename = define_filename( prefix=f"Quicklook_Rmax_{rmax:.0f}_{start_time_str}_{end_time_str}", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) quicklook_filepath = os.path.join(quicklook_dir, quicklook_filename) fig.savefig(quicklook_filepath, bbox_inches="tight") plt.close() # Create quicklooks for P_total table_events_summary = table_events_summary.sort_values(by="P_total", ascending=False) for i in range(min(n_quicklooks, len(table_events_summary))): event_stats = table_events_summary.iloc[i] if event_stats["duration"] >= 10: # minimum 10 minutes ds_event = ds.sel(time=slice(event_stats["start_time"], event_stats["end_time"])) ds_event["rain_type"] = bringi_nw_dm_classification(ds_event["Dm"], ds_event["Nw"]) # Plot quicklook fig = ds_event.disdrodb.plot_dsd_quicklook( # Plot layout hours_per_slice=3, max_rows=6, aligned=False, verbose=False, cbar_as_legend=True, precipitation_type="rain_type", d_lim=(0.3, 7), # Figure options dpi=300, ) # Create quicklooks directory quicklook_dir = os.path.join(summary_dir_path, "Quicklooks_Ptot") os.makedirs(quicklook_dir, exist_ok=True) # Define quicklooks filename p_tot = event_stats["P_total"] start_time_str = event_stats["start_time"].strftime("%Y%m%dT%H%M%S") end_time_str = event_stats["end_time"].strftime("%Y%m%dT%H%M%S") quicklook_filename = define_filename( prefix=f"Quicklook_P_tot_{p_tot:.0f}_{start_time_str}_{end_time_str}", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) quicklook_filepath = os.path.join(quicklook_dir, quicklook_filename) fig.savefig(quicklook_filepath, bbox_inches="tight") plt.close() # Remove unfiltered dataset (as not used anymore) del ds #### ---------------------------------------------------------------------. #### Create L2E RADAR Summary Plots if len(df) > 1000 or os.environ.get("PYTEST_CURRENT_TEST"): # Summary plots at X, C, S bands if "DBZH_X" in df: filename = define_filename( prefix="Radar_Band_X", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_radar_relationships(df, band="X") fig.savefig(os.path.join(summary_dir_path, filename)) if "DBZH_C" in df: filename = define_filename( prefix="Radar_Band_C", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_radar_relationships(df, band="C") fig.savefig(os.path.join(summary_dir_path, filename)) if "DBZH_S" in df: filename = define_filename( prefix="Radar_Band_S", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_radar_relationships(df, band="S") fig.savefig(os.path.join(summary_dir_path, filename)) # ---------------------------------------------------------------------. #### Create L2E Z-R figure filename = define_filename( prefix="Z-R", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) p = plot_R_Z(df, z="Z", r="R", pol="Rayleigh", title=r"$Z_{Rayleigh}$ vs $R$") p.figure.tight_layout() p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() #### ---------------------------------------------------------------------. #### Create L2E Kinetic Energy Summary Plots filename = define_filename( prefix="KineticEnergy", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_kinetic_energy_relationships(df) fig.savefig(os.path.join(summary_dir_path, filename)) #### ---------------------------------------------------------------------. #### Create L2E DSD Parameters summary plots #### - Create DSD parameters density figures with LWC filename = define_filename( prefix="DSD_Params_Density_with_LWC_LinearDm_MaxNormalized", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_dsd_params_density(df, log_dm=False, lwc=True, log_normalize=False) fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() filename = define_filename( prefix="DSD_Params_Density_with_LWC_LogDm_MaxNormalized", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_dsd_params_density(df, log_dm=True, lwc=True, log_normalize=False) fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() filename = define_filename( prefix="DSD_Params_Density_with_LWC_LinearDm_LogNormalized", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_dsd_params_density(df, log_dm=False, lwc=True, log_normalize=True) fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() filename = define_filename( prefix="DSD_Params_Density_with_LWC_LogDm_LogNormalized", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_dsd_params_density(df, log_dm=True, lwc=True, log_normalize=True) fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() ###------------------------------------------------------------------------. #### - Create DSD parameters density figures with R filename = define_filename( prefix="DSD_Params_Density_with_R_LinearDm_MaxNormalized", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_dsd_params_density(df, log_dm=False, lwc=False, log_normalize=False) fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() filename = define_filename( prefix="DSD_Params_Density_with_R_LogDm_MaxNormalized", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_dsd_params_density(df, log_dm=True, lwc=False, log_normalize=False) fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() filename = define_filename( prefix="DSD_Params_Density_with_R_LinearDm_LogNormalized", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_dsd_params_density(df, log_dm=False, lwc=False, log_normalize=True) fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() filename = define_filename( prefix="DSD_Params_Density_with_R_LogDm_LogNormalized", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_dsd_params_density(df, log_dm=True, lwc=False, log_normalize=True) fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() ###------------------------------------------------------------------------. #### - Create DSD parameters relationship figures filename = define_filename( prefix="DSD_Params_Relations", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_dsd_params_relationships(df, add_nt=True) fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() ###------------------------------------------------------------------------. #### - Create Dmax relationship figures filename = define_filename( prefix="DSD_Dmax_Relations", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) fig = plot_dmax_relationships(df, diameter_bin_edges=diameter_bin_edges, dmax="Dmax", diameter_max=10) fig.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() #### ---------------------------------------------------------------------. #### Create L2E QC summary plots # TODO: ####------------------------------------------------------------------------. #### Free space - Remove df from memory del df gc.collect() ####------------------------------------------------------------------------. #### Create N(D) densities df_dsd = create_dsd_dataframe(ds_filtered) #### - Plot N(D) vs D filename = define_filename( prefix="N(D)", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) p = plot_dsd_density(df_dsd, diameter_bin_edges=diameter_bin_edges) p.figure.tight_layout() p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() #### - Plot N(D)/Nw vs D/Dm filename = define_filename( prefix="N(D)_Normalized", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) p = plot_normalized_dsd_density(df_dsd) p.figure.tight_layout() p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close() #### Free space - Remove df_dsd from memory del df_dsd gc.collect() #### - Plot N(D) vs D with DenseLines # Extract required variables and free memory drop_number_concentration = ds_filtered["drop_number_concentration"].compute().copy() r = ds_filtered["R"].compute().copy() del ds_filtered gc.collect() # Create figure filename = define_filename( prefix="N(D)_DenseLines", extension="png", data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) p = plot_dsd_with_dense_lines(drop_number_concentration=drop_number_concentration, r=r) p.figure.tight_layout() p.figure.savefig(os.path.join(summary_dir_path, filename), bbox_inches="tight") plt.close()
####------------------------------------------------------------------------. #### Wrappers
[docs] def create_station_summary( data_source, campaign_name, station_name, parallel=False, data_archive_dir=None, temporal_resolution="1MIN", ): """Create summary figures and tables for a DISDRODB station.""" # Print processing info print(f"Creation of station summary for {data_source} {campaign_name} {station_name} has started.") # Define station summary directory summary_dir_path = define_station_dir( product="SUMMARY", data_source=data_source, campaign_name=campaign_name, station_name=station_name, data_archive_dir=data_archive_dir, check_exists=False, ) os.makedirs(summary_dir_path, exist_ok=True) # Load L2E dataset try: ds = disdrodb.open_dataset( data_archive_dir=data_archive_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name, product="L2E", temporal_resolution=temporal_resolution, parallel=parallel, chunks=-1, compute=True, ) except Exception as e: print("Impossible to create the station summary." + str(e)) return # Generate station summary figures and table generate_station_summary( ds=ds, summary_dir_path=summary_dir_path, data_source=data_source, campaign_name=campaign_name, station_name=station_name, temporal_resolution=temporal_resolution, ) print(f"Creation of station summary for {data_source} {campaign_name} {station_name} has terminated.")
# -------------------------------------------------------------------------------------------------.