Source code for disdrodb.l1.classification

# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""DISDRODB hydrometeor classification and QC module."""

import numpy as np
import pandas as pd
import xarray as xr

from disdrodb.constants import DIAMETER_DIMENSION, VELOCITY_DIMENSION
from disdrodb.fall_velocity import get_hail_fall_velocity
from disdrodb.fall_velocity.graupel import retrieve_graupel_heymsfield2014_fall_velocity
from disdrodb.fall_velocity.rain import get_rain_fall_velocity
from disdrodb.l2.empirical_dsd import get_effective_sampling_area, get_rain_rate_from_drop_number
from disdrodb.l2.processing import define_rain_spectrum_mask
from disdrodb.utils.manipulations import filter_diameter_bins, filter_velocity_bins
from disdrodb.utils.time import ensure_sample_interval_in_seconds
from disdrodb.utils.xarray import xr_remap_numeric_array

# Define possible variable available and corresponding snow_temperature_limit
DICT_TEMPERATURES = {
    "air_temperature": 6,  # generic and PWS100
    "air_temperature_min": 6,  # PWS100
    "temperature_ambient": 6,  # LPM  (often 99999)
    "temperature_interior": 10,  # LPM (maybe could be reduced)
    "sensor_temperature": 15,  # PARSIVEL and SWS250
}
TEMPERATURE_VARIABLES = list(DICT_TEMPERATURES)


[docs] def get_temperature(ds): """Retrieve temperature variable from L0C product. This function extracts temperature data from DISDRODB L0C product. The temperature variable chosen varies with sensor types. It prioritize air_temperature when available, but can fallback to internal sensor temperature if more reliable variables are not present. Parameters ---------- ds : xarray.Dataset DISDRODB L0C dataset containing one or more temperature variables defined in ``DICT_TEMPERATURES``. Expected temperature variables include: 'air_temperature', 'air_temperature_min', 'temperature_ambient', 'temperature_interior', or 'sensor_temperature'. Returns ------- temperature : xarray.DataArray or None Temperature time series [°C] with invalid values removed. Returns ``None`` if no temperature variable is available in the dataset. Values are set to NaN for timesteps with unrealistic values or large sensor disagreement. snow_temperature_limit : xarray.DataArray or None Snow/rain temperature threshold [°C] corresponding to the selected temperature variable for each timestep. Returns ``None`` if no temperature variable is available. Notes ----- The function applies the following processing steps: 1. **Variable Selection Priority**: Searches for temperature variables in the order defined by ``DICT_TEMPERATURES``. Each variable has an associated snow temperature limit (e.g., air_temperature: 6°C, sensor_temperature: 10°C). 2. **Quality Filtering**: Filters out physically unrealistic temperature values outside the range [-40°C, 35°C]. 3. **Preference Handling**: For each timestep, selects the first available valid temperature following the priority order in ``DICT_TEMPERATURES``. This ensures more reliable variables (e.g., air_temperature) are preferred over less reliable ones (e.g., sensor_temperature). 4. **Sensor Consistency Check**: Computes the temperature spread across all available sensors at each timestep. If the spread exceeds 10°C, that timestep is flagged as inconsistent and set to NaN, indicating unreliable measurements. 5. **Snow Limit Assignment**: Returns the temperature threshold for snow/rain discrimination corresponding to the selected temperature variable for each timestep. """ # Check temperature variable is available, otherwise return None if not any(var in ds.data_vars for var in DICT_TEMPERATURES): return None, None # Build a (temperature_variable, time) array preserving DICT_TEMPERATURES order. list_da_temperature = [] for var, snow_limit in DICT_TEMPERATURES.items(): if var in ds.data_vars: da_temperature = ds[var].copy() da_temperature = da_temperature.where((da_temperature >= -40) & (da_temperature <= 35)) da_temperature = da_temperature.expand_dims(dim={"temperature_variable": [var]}) da_snow_limit = (xr.ones_like(da_temperature) * snow_limit).where(~np.isnan(da_temperature)) da_temperature = da_temperature.assign_coords({"snow_limit": da_snow_limit}) list_da_temperature.append(da_temperature) # Concatenate variables, then select first valid value per timestep. da_temperature_stack = xr.concat(list_da_temperature, dim="temperature_variable") temperature = da_temperature_stack.bfill("temperature_variable").isel(temperature_variable=0, drop=True) snow_temperature_limit = ( da_temperature_stack["snow_limit"].bfill("temperature_variable").isel(temperature_variable=0, drop=True) ) # Reject timesteps with large disagreement across available temperature sensors. temperature_spread = da_temperature_stack.max(dim="temperature_variable") - da_temperature_stack.min( dim="temperature_variable", ) is_inconsistent = temperature_spread > 10 temperature = temperature.where(~is_inconsistent) snow_temperature_limit = snow_temperature_limit.where(~is_inconsistent) return temperature, snow_temperature_limit
[docs] def define_qc_temperature(temperature, sample_interval, threshold_minutes=360): """Define segment-based QC rule for temperature. Return a qc array with 1 when temperature is constant for over threshold_minutes. Return a qc of 2 if temperature is not available. """ # If all NaN, return flag equal to 2 if np.all(np.isnan(temperature)): return xr.full_like(temperature, 2) # Fill NaNs temperature = temperature.ffill("time").bfill("time") # Round temperature temperature = temperature.round(0) # Initialize flag qc_flag = xr.zeros_like(temperature, dtype=int) # Assign 1 when temperature changes, 0 otherwise change_points = np.concatenate(([True], np.diff(temperature.values) != 0)) # Assign segment IDs segment_id = np.cumsum(change_points) # Compute duration per segment df = pd.DataFrame( { "segment": segment_id, "time": temperature["time"].to_numpy(), }, ) # Count samples per segment segment_length = df.groupby("segment").size().rename("count").to_frame() # Compute duration based on sample_interval segment_length["duration"] = segment_length["count"] * int(sample_interval) # Flag segments that are constant for over threshold_minutes threshold_seconds = threshold_minutes * 60 long_segments = segment_length[segment_length["duration"] >= threshold_seconds].index # Define QC flag: 1 = no variation, 0 = normal mask = np.isin(segment_id, long_segments) qc_flag.data = xr.where(mask, 1, 0) return qc_flag
[docs] def define_qc_margin_fallers(spectrum, fall_velocity_upper, above_velocity_fraction=None, above_velocity_tolerance=2): """Define QC mask for margin fallers and splashing.""" if above_velocity_fraction is not None: above_fall_velocity = fall_velocity_upper * (1 + above_velocity_fraction) elif above_velocity_tolerance is not None: above_fall_velocity = fall_velocity_upper + above_velocity_tolerance else: above_fall_velocity = np.inf # Define mask velocity_lower = xr.ones_like(spectrum.isel(time=0, missing_dims="ignore")) * spectrum["velocity_bin_lower"] diameter_upper = xr.ones_like(spectrum.isel(time=0, missing_dims="ignore")) * spectrum["diameter_bin_upper"] mask = np.logical_and(diameter_upper <= 5, velocity_lower >= above_fall_velocity) return mask
[docs] def define_qc_rain_strong_wind_mask(spectrum): """Define QC mask for strong wind artefacts in heavy rainfall. Based on Katia Friedrich et al. 2013. """ diameter_lower = xr.ones_like(spectrum.isel(time=0, missing_dims="ignore")) * spectrum["diameter_bin_lower"] velocity_upper = xr.ones_like(spectrum.isel(time=0, missing_dims="ignore")) * spectrum["velocity_bin_upper"] # Define mask mask = np.logical_and( diameter_lower >= 5, velocity_upper < 1, ) return mask
[docs] def qc_spikes_isolated_precip(hydrometeor_type, sample_interval): """ Identify isolated precipitation spikes based on hydrometeor classification. This quality control (QC) routine flags short, isolated precipitation detections (spikes) that are not supported by neighboring precipitating timesteps within a defined time window. The test helps remove spurious single-sample precipitation detections caused by instrument noise or transient misclassification. The algorithm: 1. Identifies potential precipitation timesteps where `hydrometeor_type>= 1`. 2. Computes time differences between consecutive potential precipitation samples. 3. Flags a timestep as a spike when both the previous and next precipitation detections are separated by more than the configured time window. 4. Skips the QC test entirely if the temporal resolution exceeds 2 minutes. Parameters ---------- hydrometeor_type: xarray.DataArray Hydrometeor type classification array with a ``time`` coordinate. Precipitation presence is defined where ``hydrometeor_type>= 1``. sample_interval : float or int Nominal sampling interval of the dataset in **seconds**. If ``sample_interval >= 120`` (2 minutes), the QC test is skipped. Returns ------- flag_spikes : xarray.DataArray of int Binary QC flag array (same dimensions as input) with: * 0 : no spike detected * 1 : isolated precipitation spike Notes ----- - The time window is currently fixed to ±60 seconds for typical 1-minute sampling intervals but can be adapted to scale with `sample_interval`. - Designed to work with irregular time coordinates; relies on actual timestamp differences instead of fixed rolling windows. - For datasets with coarse sampling (> 2 minutes), the function returns a zero-valued flag (QC not applied). """ # Define potential precipitating timesteps is_potential_precip = xr.where((hydrometeor_type >= 1), 1, 0) # Initialize QC flag flag_spikes = xr.zeros_like(is_potential_precip, dtype=int) flag_spikes.attrs.update( { "long_name": "Isolated precipitation spike flag", "standard_name": "flag_spikes", "units": "1", "flag_values": [0, 1], "flag_meanings": "no_spike isolated_precipitation_spike", "description": ( "Quality control flag indicating isolated precipitation detections, " "without neighboring precipitating timesteps. " "If the sampling interval is 2 minutes or coarser, the QC test is skipped." ), }, ) # Skip QC for coarse temporal data (> 2 min) if sample_interval >= 120: return flag_spikes # Define time window based on sample interval time_window = 60 # Extract arrays times = pd.to_datetime(is_potential_precip["time"].to_numpy()) mask_potential_precip = is_potential_precip.to_numpy() == 1 # If no precipitation, skip and return 0 array if not np.any(mask_potential_precip): return flag_spikes # Get potential precipition indices and timestamps precip_idx = np.where(mask_potential_precip)[0] precip_times = times[precip_idx].astype("datetime64[s]").astype("int64").astype("float64") # Compute Δt to previous and next precip (vectorized) dt_prev = np.empty_like(precip_times) dt_next = np.empty_like(precip_times) dt_prev[0] = np.inf dt_prev[1:] = precip_times[1:] - precip_times[:-1] dt_next[-1] = np.inf dt_next[:-1] = precip_times[1:] - precip_times[:-1] # Create same-size arrays aligned with original time dimension # - Fill NaN for non-precip indices delta_prev = np.full_like(mask_potential_precip, np.inf, dtype=float) delta_next = np.full_like(mask_potential_precip, np.inf, dtype=float) delta_prev[precip_idx] = dt_prev delta_next[precip_idx] = dt_next # Identify isolated spikes isolated = (mask_potential_precip) & (delta_prev > time_window) & (delta_next > time_window) flag_spikes.data[isolated] = 1 return flag_spikes
[docs] def define_hail_mask(spectrum, ds_env, minimum_diameter=5): """Define hail mask.""" # Define velocity limits fall_velocity_lower = get_hail_fall_velocity( spectrum["diameter_bin_lower"], model="Heymsfield2018", ds_env=ds_env, minimum_diameter=minimum_diameter - 1, ) fall_velocity_upper = get_hail_fall_velocity( spectrum["diameter_bin_upper"], model="Laurie1960", ds_env=ds_env, minimum_diameter=minimum_diameter - 1, ) # Define spectrum mask diameter_lower = xr.ones_like(spectrum.isel(time=0, missing_dims="ignore")) * spectrum["diameter_bin_lower"] velocity_lower = xr.ones_like(spectrum.isel(time=0, missing_dims="ignore")) * spectrum["velocity_bin_lower"] velocity_upper = xr.ones_like(spectrum.isel(time=0, missing_dims="ignore")) * spectrum["velocity_bin_upper"] mask_velocity = np.logical_and( velocity_lower >= fall_velocity_lower, velocity_upper <= fall_velocity_upper, ) mask_diameter = diameter_lower >= minimum_diameter mask = np.logical_and(mask_diameter, mask_velocity) return mask
[docs] def define_graupel_mask( spectrum, ds_env, minimum_diameter=0.5, maximum_diameter=5, minimum_density=50, maximum_density=600, ): """Define graupel mask.""" # Define velocity limits fall_velocity_lower = retrieve_graupel_heymsfield2014_fall_velocity( diameter=spectrum["diameter_bin_lower"], graupel_density=minimum_density, ds_env=ds_env, ) fall_velocity_upper = retrieve_graupel_heymsfield2014_fall_velocity( diameter=spectrum["diameter_bin_upper"], graupel_density=maximum_density, ds_env=ds_env, ) # fall_velocity_upper = get_graupel_fall_velocity( # diameter=spectrum["diameter_bin_upper"], # model="Locatelli1974Lump", # ds_env=ds_env, # minimum_diameter=minimum_diameter-1, # maximum_diameter=maximum_diameter+1, # ) # Define spectrum mask diameter_lower = xr.ones_like(spectrum.isel(time=0, missing_dims="ignore")) * spectrum["diameter_bin_lower"] diameter_upper = xr.ones_like(spectrum.isel(time=0, missing_dims="ignore")) * spectrum["diameter_bin_upper"] velocity_lower = xr.ones_like(spectrum.isel(time=0, missing_dims="ignore")) * spectrum["velocity_bin_lower"] velocity_upper = xr.ones_like(spectrum.isel(time=0, missing_dims="ignore")) * spectrum["velocity_bin_upper"] mask_velocity = np.logical_and( velocity_lower >= fall_velocity_lower, velocity_upper <= fall_velocity_upper, ) mask_diameter = np.logical_and( diameter_lower >= minimum_diameter, diameter_upper <= maximum_diameter, ) mask = np.logical_and(mask_diameter, mask_velocity) return mask
[docs] def define_precipitation_type_from_hydrometeor_type(hydrometeor_type): """Define precipitation_type from hydrometeor_type.""" precipitation_type = xr.ones_like(hydrometeor_type["time"], dtype=float) * -2 precipitation_type = xr.where(hydrometeor_type.isin([0]), -1, precipitation_type) precipitation_type = xr.where( hydrometeor_type.isin([1, 2, 3, 9]), 0, precipitation_type, ) # 9 hail in rainfall class currently precipitation_type = xr.where(hydrometeor_type.isin([5, 6, 7, 8]), 1, precipitation_type) precipitation_type = xr.where(hydrometeor_type.isin([4]), 2, precipitation_type) precipitation_type.attrs.update( { "long_name": "precipitation phase classification", "standard_name": "precipitation_phase", "units": "1", "flag_values": [-2, -1, 0, 1, 2], "flag_meanings": "undefined no_precipitation rainfall snowfall mixed_phase", }, ) return precipitation_type
[docs] def add_hydrometeor_type_attrs(hydrometeor_type): """Add CF-attributes to hydrometeor_type DataArray.""" hydrometeor_type.attrs = hydrometeor_type.attrs.copy() hydrometeor_type.attrs.update( { "description": "DISDRODB hydrometeor class", "long_name": "hydrometeor type classification", "standard_name": "hydrometeor_classification", "units": "1", "flag_values": [-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9], "flag_meanings": ( "no_hydrometeor undefined no_precipitation " "drizzle drizzle_and_rain rain mixed " "snow snow_grains ice_pellets graupel hail" ), "comment": "DISDRODB hydrometeor class", }, ) return hydrometeor_type
[docs] def classify_raw_spectrum( ds, ds_env, sample_interval, sensor_name, temperature=None, rain_temperature_lower_limit=-5, snow_temperature_upper_limit=5, ): """Run precipitation and hydrometeor type classification.""" # ------------------------------------------------------------------ # Filter LPM to avoid being impacted by noise in first bins if sensor_name == "LPM": # Remove first two diameter bins (very noisy) ds = filter_diameter_bins(ds=ds, minimum_diameter=0.375) # Remove first velocity bin ds = filter_velocity_bins(ds=ds, minimum_velocity=0.2) if sensor_name == "PWS100": # Remove first two bin ds = filter_diameter_bins(ds=ds, minimum_diameter=0.2) # Remove first two bin ds = filter_velocity_bins(ds=ds, minimum_velocity=0.2) # ------------------------------------------------------------------ # Retrieve raw spectrum raw_spectrum = ds["raw_drop_number"] #### Define masks spectrum_template = raw_spectrum.isel(time=0, missing_dims="ignore") diameter_lower = raw_spectrum["diameter_bin_lower"].broadcast_like(spectrum_template) # [mm] diameter_upper = raw_spectrum["diameter_bin_upper"].broadcast_like(spectrum_template) # Define spectrum areas B1 = (diameter_lower >= 0.0) & (diameter_upper <= 0.5) B2 = (diameter_upper > 0.5) & (diameter_upper <= 5.0) B3 = (diameter_upper > 5.0) & (diameter_upper <= 8.0) B4 = diameter_upper > 8.0 # Define liquid masks # - Compute raindrop fall velocity for lower and upper diameter limits raindrop_fall_velocity_upper = get_rain_fall_velocity( diameter=ds["diameter_bin_upper"], model="Beard1976", ds_env=ds_env, ) raindrop_fall_velocity_lower = get_rain_fall_velocity( diameter=ds["diameter_bin_lower"], model="Beard1976", ds_env=ds_env, ) liquid_mask = define_rain_spectrum_mask( drop_number=raw_spectrum, fall_velocity_lower=raindrop_fall_velocity_lower, fall_velocity_upper=raindrop_fall_velocity_upper, above_velocity_fraction=None, above_velocity_tolerance=2, below_velocity_fraction=None, below_velocity_tolerance=3, maintain_drops_smaller_than=1, # 1, # 2 maintain_drops_slower_than=2.5, # 2.5, # 3 maintain_smallest_drops=False, ) drizzle_mask = liquid_mask & B1 drizzle_rain_mask = liquid_mask & B2 rain_mask = liquid_mask & B3 # potentially mixed with small hail # Define graupel masks graupel_mask = define_graupel_mask( raw_spectrum, ds_env=ds_env, minimum_diameter=0.9, maximum_diameter=5.5, minimum_density=50, maximum_density=900, ) graupel_mask = np.logical_and(graupel_mask, ~liquid_mask) graupel_hd_mask = define_graupel_mask( raw_spectrum, ds_env=ds_env, minimum_diameter=0.9, maximum_diameter=5.5, minimum_density=400, maximum_density=900, ) graupel_hd_mask = np.logical_and(graupel_hd_mask, graupel_mask) graupel_ld_mask = np.logical_and(graupel_mask, ~graupel_hd_mask) # graupel_mask.plot.pcolormesh(x="diameter_bin_center") # liquid_mask.plot.pcolormesh(x="diameter_bin_center") # graupel_hd_mask.plot.pcolormesh(x="diameter_bin_center") # graupel_ld_mask.plot.pcolormesh(x="diameter_bin_center") # Define hail mask hail_mask = define_hail_mask(raw_spectrum, ds_env=ds_env, minimum_diameter=5) hail_mask = np.logical_and(hail_mask, ~graupel_mask) small_hail_mask = hail_mask & B3 # [5,8] large_hail_mask = hail_mask & B4 # > 8 # Define snow masks velocity_upper = xr.ones_like(raw_spectrum.isel(time=0, missing_dims="ignore")) * raw_spectrum["velocity_bin_upper"] snow_mask_full = velocity_upper <= 6.5 # - Without rain and hail snow_mask = np.logical_and(snow_mask_full, ~liquid_mask) snow_mask = np.logical_and(snow_mask, ~hail_mask) snow_mask = np.logical_and(snow_mask, diameter_lower >= 1) # - Without rain, hail and graupel # snow_small_mask = snow_mask & (diameter_upper <= 5.0) snow_large_mask = snow_mask & (diameter_upper > 5.0) # Define snow grain mask snow_grains_mask = (velocity_upper <= 2.5) & (diameter_upper <= 2.2) # ice crystals & prisms (0.1 < D < 1 or 2 mm) # --------------------------------------------------------------------- # Check mask cover all space without leaving empty bins # FUTURE: CHECK IF THERE ARE CASES WHERE EMPTY BINS STILL OCCURS # from functools import reduce # sum_mask = reduce(np.logical_or, [hail_mask, liquid_mask, graupel_mask]) # sum_mask.plot.pcolormesh(x="diameter_bin_center") # --------------------------------------------------------------------- # Estimate rain rate using particles with D <=5 (D > 5 can be contaminated by noise or hail) # - Extract sample interval sample_interval = ensure_sample_interval_in_seconds(ds["sample_interval"]) # s # - Extract diameter in m diameter = ds["diameter_bin_center"] / 1000 # m # - Compute sampling area [m2] sampling_area = get_effective_sampling_area(sensor_name=sensor_name, diameter=diameter) # m2 # - Compute dummy rainfall rate (on D from 0 to 5) to avoid 'hail' contamination rainfall_rate_mask = drizzle_mask + drizzle_rain_mask rainfall_rate = get_rain_rate_from_drop_number( drop_number=raw_spectrum.where(rainfall_rate_mask, 0), # if any NaN --> return NaN sampling_area=sampling_area, diameter=diameter, sample_interval=sample_interval, ) # --------------------------------------------------------------------- # Estimate gross snowfall rate # FUTURE: # - Compute over snow mask area with and without rainy area # - Use Lempio lump parametrization # - Required to define weather codes # --------------------------------------------------------------------- # Define wind artefacts mask (Friedrich et al., 2013) strong_wind_mask = define_qc_rain_strong_wind_mask(spectrum_template) # Define margin fallers mask margin_fallers_mask = define_qc_margin_fallers( spectrum_template, fall_velocity_upper=raindrop_fall_velocity_upper, # above_velocity_fraction=0.6, above_velocity_tolerance=2, ) # Define splash mask splash_mask = (diameter_lower >= 0.0) & (diameter_upper <= 6) & (velocity_upper <= 0.6) # --------------------------------------------------------------------- # Define liquid, snow, and graupel mask (robust to splash) liquid_mask_without_splash = liquid_mask & ~splash_mask snow_mask_without_splash = snow_mask & ~splash_mask # graupel_mask_without_splash = graupel_mask & ~splash_mask graupel_ld_mask_without_splash = graupel_ld_mask & ~splash_mask # --------------------------------------------------------------------- #### Compute statistics dims = [DIAMETER_DIMENSION, VELOCITY_DIMENSION] n_particles = raw_spectrum.sum(dim=dims) # n_particles_1 = raw_spectrum.where(B1).sum(dim=dims) # n_particles_2 = raw_spectrum.where(B2).sum(dim=dims) # n_particles_3 = raw_spectrum.where(B3).sum(dim=dims) # n_particles_4 = raw_spectrum.where(B4).sum(dim=dims) ## ---- # Rain n_drizzle = raw_spectrum.where(drizzle_mask).sum(dim=dims) n_drizzle_rain = raw_spectrum.where(drizzle_rain_mask).sum(dim=dims) n_rain = raw_spectrum.where(rain_mask).sum(dim=dims) n_liquid = n_drizzle + n_drizzle_rain + n_rain n_liquid_robust = raw_spectrum.where(liquid_mask_without_splash).sum(dim=dims) ## ---- # Hail # n_hail = raw_spectrum.where(hail_mask).sum(dim=dims) n_small_hail = raw_spectrum.where(small_hail_mask).sum(dim=dims) n_large_hail = raw_spectrum.where(large_hail_mask).sum(dim=dims) ## ---- # Graupel n_graupel = raw_spectrum.where(graupel_mask).sum(dim=dims) # n_graupel_robust = raw_spectrum.where(graupel_mask_without_splash).sum(dim=dims) n_graupel_ld = raw_spectrum.where(graupel_ld_mask_without_splash).sum(dim=dims) n_graupel_hd = raw_spectrum.where(graupel_hd_mask).sum(dim=dims) ## ---- # Snow n_snow = raw_spectrum.where(snow_mask).sum(dim=dims) n_snow_robust = raw_spectrum.where(snow_mask_without_splash).sum(dim=dims) # n_snow_small = raw_spectrum.where(snow_small_mask).sum(dim=dims) n_snow_large = raw_spectrum.where(snow_large_mask).sum(dim=dims) n_snow_grains = raw_spectrum.where(snow_grains_mask).sum(dim=dims) ## ---- # Auxiliary n_wind_artefacts = raw_spectrum.where(strong_wind_mask).sum(dim=dims) n_margin_fallers = raw_spectrum.where(margin_fallers_mask).sum(dim=dims) n_splashing = raw_spectrum.where(splash_mask).sum(dim=dims) ## ---- # Bins statistics # n_bins = (raw_spectrum.where((~splash_mask) > 0)).sum(dim=dims) n_liquid_bins = (raw_spectrum.where(liquid_mask_without_splash) > 0).sum(dim=dims) n_snow_bins = (raw_spectrum.where(snow_mask_without_splash) > 0).sum(dim=dims) # without rainy area # n_snow_large_bins = (raw_spectrum.where(snow_large_mask) > 0).sum(dim=dims) # only > 5 mm # n_graupel_bins = (raw_spectrum.where(graupel_mask_without_splash) > 0).sum(dim=dims) # Bins fractions fraction_rain_bins = xr.where(n_particles == 0, 0, n_liquid_bins / (n_liquid_bins + n_snow_bins)) # fraction_snow_bins = xr.where(n_particles == 0, 0, n_snow_bins / (n_liquid_bins + n_snow_bins)) ## ---- # Particles fractions # fraction_drizzle_rel = xr.where(n_particles_1 == 0, 0, n_drizzle / n_particles_1) fraction_drizzle_tot = xr.where(n_particles == 0, 0, n_drizzle / n_particles) # fraction_drizzle_rain_rel = xr.where(n_particles_2 == 0, 0, n_drizzle_rain / n_particles_2) fraction_drizzle_rain_tot = xr.where(n_particles == 0, 0, (n_drizzle + n_drizzle_rain) / n_particles) # fraction_rain_rel = xr.where(n_particles_3 == 0, 0, n_rain / n_particles_3) fraction_rain_tot = xr.where(n_particles == 0, 0, n_liquid / n_particles) # fraction_liquid = fraction_rain_tot # fraction_graupel_only_rel = xr.where(n_particles_2 == 0, 0, n_graupel / n_particles_2) fraction_graupel_only_tot = xr.where(n_particles == 0, 0, n_graupel / n_particles) # fraction_hail = xr.where(n_particles_4 == 0, 0, n_hail / n_particles_4) # fraction_snow_large_rel = xr.where((n_particles_3 + n_particles_4) == 0, 0, # n_snow_large / (n_particles_3+n_particles_4)) # fraction_snow_large_tot = xr.where(n_particles == 0, 0, n_snow_large / n_particles) fraction_snow_tot = xr.where(n_particles == 0, 0, n_snow / n_particles) fraction_snow_grains_tot = xr.where(n_particles == 0, 0, n_snow_grains / n_particles) fraction_splash = xr.where(n_particles == 0, 0, n_splashing / n_particles) # fraction_rain_graupel_tot = xr.where(n_particles == 0, 0, (n_liquid + n_graupel) / n_particles) # graupel_liquid_ratio = xr.where(n_particles == 0, 0, n_graupel_robust/n_liquid_robust) solid_liquid_ratio = xr.where(n_particles == 0, 0, n_snow_robust / n_liquid_robust) # -----------------------------------------------------------------------------------------------. #### Classification logic # Class |Conditions | WMO 4680 # ------------------------------------- # Drizzle |D < 0.5 mm | # Rain |D > 0.5, D < 10 | # Snow |D > 1, V < 6 | 71-73 # Snow grains |D < 1 | 77 # Ice Crystals |0 < D < 2 | # Graupel |D >1 , D < 5 | 74-76 # Snow grains are within the drizzle class (<0.5 mm)! # --> Temperature required to classify them # Graupel class # - Ice pellets / Sleets (frozen raindrops, T<0) (1 <= D <= 5 mm) # - Graupel (GS) (Snow pellet coated with ice, T > 0) (2 <= D <= 5 mm) # # WMO 4680 # -------------------------------------------------------------- # Initialize label label = xr.ones_like(ds["time"], dtype=float) * -1 # [mm] # No precipitation label = xr.where(n_particles == 0, 0, label) # Graupel only cond = (fraction_graupel_only_tot > 0.7) & (n_snow_large < 1) label = xr.where(cond & (label == -1), 8, label) # Liquid only # - Drizzle (D < 0.5 mm) cond = (fraction_drizzle_tot > 0.1) & (n_graupel < 1) & (n_snow_large < 1) & (n_drizzle_rain < 1) label = xr.where(cond & (label == -1), 1, label) # - Drizzle + Rain (0.5-5 mm) cond = (fraction_drizzle_rain_tot > 0.1) & (n_graupel < 1) & (n_snow_large < 1) & (n_rain < 1) label = xr.where(cond & (label == -1), 2, label) # - Rain (D > 5 mm) cond = (fraction_rain_tot > 0.1) & (n_graupel < 1) & (n_snow_large < 1) label = xr.where(cond & (label == -1), 3, label) # Snow only cond = fraction_snow_tot > 0.6 # TODO: extend to use snow_mask_full label = xr.where(cond & (label == -1), 5, label) # --------------------------------- # Rain (R > 3 mm/hr) with some graupel cond = (fraction_rain_bins >= 0.75) & (rainfall_rate > 3) label = xr.where(cond & (label == -1), 31, label) # mixed # --------------------------------- # (label == -1).sum() # (cond & (label == -1)).sum() # --------------------------------- # Mixed (when R > 1 mm/hr) # --> FUTURE: Better clarified the meaning # --> FUTURE: R computed with particles only above 3 m/s would help disentagle snow from mixed ! # --> When R > 1 mm/hr and no splash - solid_liquid_ratio > XXX n_snow_bins_thr = 6 fraction_splash_thr = 0.1 solid_liquid_ratio_thr = 0.05 cond = ( (solid_liquid_ratio >= solid_liquid_ratio_thr) & (rainfall_rate > 1) & (n_snow_bins > n_snow_bins_thr) & (fraction_splash < fraction_splash_thr) ) label = xr.where(cond & (label == -1), 4, label) # mixed cond = ( (solid_liquid_ratio >= solid_liquid_ratio_thr) & (rainfall_rate > 1) & (n_snow_bins <= n_snow_bins_thr) & (fraction_splash < fraction_splash_thr) ) label = xr.where(cond & (label == -1), 21, label) # Set as rain ! # - When R > 1 mm/hr and no splash - solid_liquid_ratio < XXX cond = (solid_liquid_ratio < solid_liquid_ratio_thr) & (rainfall_rate > 1) & (fraction_splash < fraction_splash_thr) label = xr.where(cond & (label == -1), 21, label) # Set as rain ! # --------------------------------- # Non-hydrometeors class cond = (fraction_splash >= 0.5) & (rainfall_rate < 1.5) label = xr.where(cond & (label == -1), -2, label) cond = (fraction_splash >= 0.4) & (fraction_splash <= 0.5) & (rainfall_rate <= 0.2) label = xr.where(cond & (label == -1), -2, label) # --------------------------------- # - When R > 1mm/hr, with splash cond = (rainfall_rate > 1) & (fraction_splash >= 0.1) & (solid_liquid_ratio >= 0.2) label = xr.where(cond & (label == -1), 41, label) # mixed cond = (rainfall_rate > 1) & (fraction_splash >= 0.1) & (solid_liquid_ratio < 0.2) label = xr.where(cond & (label == -1), 23, label) # rainfall # --------------------------------- # - Noisy Rain (solid_liquid_ratio < 0.03) cond = (solid_liquid_ratio <= 0.05) & (n_snow_robust <= 2) label = xr.where(cond & (label == -1), 22, label) # Set noisy rain # --------------------------------- # Ice Crystals cond = fraction_snow_grains_tot >= 0.95 label = xr.where(cond & (label == -1), 6, label) # Remaining snow label = xr.where(label == -1, 51, label) # ------------------------------------------------------------------------. # Improve classification using temperature information if available # - Currently problematic for PARSIVEL when using sensor_temperature when actually snowing if temperature is not None: temperature = temperature.compute() qc_temperature = define_qc_temperature(temperature, sample_interval=sample_interval, threshold_minutes=1440) is_surely_rain_given_temperature = (temperature >= snow_temperature_upper_limit) & (qc_temperature == 0) is_surely_snow_given_temperature = (temperature <= rain_temperature_lower_limit) & (qc_temperature == 0) is_mixed = label.isin([4, 41]) is_snow = label.isin([5, 51]) is_drizzle = label.isin([1]) is_snow_grain = label.isin([6]) is_rain = label.isin([2, 21, 22, 23, 3]) is_graupel = label == 8 # Improve mixed classification (4, 41) # - If T > snow_temperature_upper_limit --> rain # - If T < -5 rain_temperature_lower_limit --> snow label = xr.where(is_surely_rain_given_temperature & is_mixed, 24, label) label = xr.where(is_surely_snow_given_temperature & is_mixed, 52, label) # Improve drizzle classification label = xr.where(is_surely_snow_given_temperature & is_drizzle, 61, label) # Improve snow classification # - If T > snow_temperature_upper_limit --> No hydrometeors label = xr.where(is_surely_rain_given_temperature & is_snow, -21, label) # Improve snow grains classification # --> If T > snow_temperature_upper_limit --> No hydrometeors label = xr.where(is_surely_rain_given_temperature & is_snow_grain, -21, label) # Improve rain classification # If T < rain_temperature_lower_limit --> No hydrometeors label = xr.where(is_surely_snow_given_temperature & is_rain, -21, label) # Improve graupel classification # If T < rain_temperature_lower_limit --> Ice pellets / Sleets label = xr.where(is_surely_snow_given_temperature & is_graupel, 7, label) # ------------------------------------------------------------------------. # Define hydrometeor_typevariable # -2 No hydrometeor # -1 Undefined # 0 No precipitation # 1 Drizzle # 2 Drizzle+Rain # 3 Rain # 4 Mixed (when no only graupel, and rain) # 5 Snow (when not only graupel, and no rain) # 6 Snow grains / ice crystals / needles (only if temperature is available <-- drizzle) # 7 Ice pellets / Sleets (only if temperature is available) # 8 Graupel --> flag_graupel # 9 Hail --> flag_hail hydrometeor_type = label.copy() # No hydrometeor hydrometeor_type = xr.where(label.isin([-2, -21]), -2, hydrometeor_type) # Drizzle hydrometeor_type = xr.where(hydrometeor_type.isin([1]), 1, hydrometeor_type) # Drizzle+Rain hydrometeor_type = xr.where(hydrometeor_type.isin([2, 21, 22, 23, 24]), 2, hydrometeor_type) # Rain hydrometeor_type = xr.where(hydrometeor_type.isin([3, 31]), 3, hydrometeor_type) # Mixed hydrometeor_type = xr.where(hydrometeor_type.isin([4]), 4, hydrometeor_type) # Snow hydrometeor_type = xr.where(hydrometeor_type.isin([5, 51, 52]), 5, hydrometeor_type) # Snow grains hydrometeor_type = xr.where(hydrometeor_type.isin([6]), 6, hydrometeor_type) # Ice Pellets hydrometeor_type = xr.where(hydrometeor_type.isin([7]), 7, hydrometeor_type) # Graupel hydrometeor_type = xr.where(hydrometeor_type.isin([8]), 8, hydrometeor_type) # Add CF-attributes hydrometeor_type = add_hydrometeor_type_attrs(hydrometeor_type) # ------------------------------------------------------------------------. #### Define precipitation type variable precipitation_type = define_precipitation_type_from_hydrometeor_type(hydrometeor_type) # ------------------------------------------------------------------------. #### Define flag graupel flag_graupel = xr.ones_like(ds["time"], dtype=float) * 0 flag_graupel = xr.where( (((precipitation_type == 0) & (n_graupel_ld > 2)) | ((hydrometeor_type == 8) & (n_graupel_ld > 0))), 1, flag_graupel, ) flag_graupel = xr.where( (((precipitation_type == 0) & (n_graupel_hd > 2)) | ((hydrometeor_type == 8) & (n_graupel_hd > 0))), 2, flag_graupel, ) flag_graupel.attrs.update( { "long_name": "graupel occurrence flag", "standard_name": "graupel_flag", "units": "1", "flag_values": [0, 1, 2], "flag_meanings": "no_graupel low_density_graupel high_density_graupel", "description": ( "Flag indicating the presence of graupel. " "The flag is set when hydrometeor classification identifies graupel (class=8) or " "rainfall with graupel particles. " "Low-density graupel (value = 1) corresponds to density < 400 kg/m3 " "while high-density graupel corresponds to density > 400 kg/m3." ), }, ) # ------------------------------------------------------------------------. #### Define flag hail # FUTURE: # - Small hail: check if attached to rain body or not # - Check how much is detached flag_hail = xr.ones_like(ds["time"], dtype=float) * 0 flag_hail = xr.where(((precipitation_type == 0) & (n_small_hail >= 1) & (rainfall_rate > 1)), 1, flag_hail) flag_hail = xr.where(((precipitation_type == 0) & (n_large_hail >= 1) & (rainfall_rate > 1)), 2, flag_hail) flag_hail.attrs.update( { "long_name": "hail occurrence and size flag", "standard_name": "hail_flag", "units": "1", "flag_values": [0, 1, 2], "flag_meanings": "no_hail small_hail large_hail", "description": ( "Flag indicating the presence and estimated size of hail. " "Set to 1 for small hail when precipitation type indicates rain. " "Set to 2 for large hail (>8 mm) under similar conditions." ), }, ) # ------------------------------------------------------------------------. #### Define WMO codes # FUTURE: Use hydrometeor_typeand flag_hail values [1,2] # Require snowfall rate estimate # ------------------------------------------------------------------------ #### Define QC splashing, strong_wind, margin_fallers, spikes # FUTURE: flag_spikes can be used for non hydrometeor classification, # --> But caution because observing the below code show some true rainfall signature # --> raw_spectrum.isel(time=(flag_spikes == 0) & (precipitation_type == 0)).disdrodb.plot_spectrum() flag_splashing = xr.where((precipitation_type == 0) & (fraction_splash >= 0.1), 1, 0) flag_wind_artefacts = xr.where((precipitation_type == 0) & (n_wind_artefacts >= 1), 1, 0) flag_noise = xr.where((hydrometeor_type == -2), 1, 0) flag_spikes = qc_spikes_isolated_precip(hydrometeor_type, sample_interval=sample_interval) # ------------------------------------------------------------------------. #### Define n_particles_<hydro_class> n_graupel_ld_final = xr.where(flag_graupel > 0, n_graupel_ld, 0) n_graupel_hd_final = xr.where(flag_graupel > 0, n_graupel_hd, 0) n_small_hail_final = xr.where(flag_hail > 0, n_small_hail, 0) n_large_hail_final = xr.where(flag_hail > 0, n_large_hail, 0) n_margin_fallers_final = xr.where(precipitation_type == 0, n_margin_fallers, 0) n_splashing_final = xr.where(flag_splashing == 1, n_splashing, 0) n_wind_artefacts_final = xr.where( (flag_wind_artefacts == 1) & (precipitation_type == 0), n_wind_artefacts, 0, ) # maybe when R > XXX ? # ------------------------------------------------------------------------. # Create HC and QC dataset ds_class = ds[["time"]] # ds_class["label"] = label ds_class["precipitation_type"] = precipitation_type ds_class["hydrometeor_type"] = hydrometeor_type ds_class["n_particles"] = n_particles ds_class["n_low_density_graupel"] = n_graupel_ld_final ds_class["n_high_density_graupel"] = n_graupel_hd_final ds_class["n_small_hail"] = n_small_hail_final ds_class["n_large_hail"] = n_large_hail_final ds_class["n_margin_fallers"] = n_margin_fallers_final ds_class["n_splashing"] = n_splashing_final ds_class["n_wind_artefacts"] = n_wind_artefacts_final # fraction_splash # fraction_margin_fallers # ds_class["mask_graupel"] = graupel_mask_without_splash # ds_class["mask_splashing"] = mask_splashing ds_class["flag_hail"] = flag_hail ds_class["flag_graupel"] = flag_graupel ds_class["flag_noise"] = flag_noise ds_class["flag_spikes"] = flag_spikes ds_class["flag_splashing"] = flag_splashing ds_class["flag_wind_artefacts"] = flag_wind_artefacts return ds_class
####-------------------------------------------------------------- #### Other utilities
[docs] def map_precip_flag_to_precipitation_type(precip_flag): """Map OCEANRAIN precip_flag to DISDRODB precipitation_type.""" mapping_dict = { 0: 0, # rain → rainfall 1: 1, # snow → snowfall 2: 2, # mixed_phase → mixed -1: -1, # true_zero_value → no_precipitation 4: -2, # inoperative → undefined 5: -2, # harbor_time_no_data → undefined } precipitation_type = xr_remap_numeric_array(precip_flag, mapping_dict) precipitation_type.attrs.update( { "long_name": "precipitation phase classification", "standard_name": "precipitation_phase", "units": "1", "flag_values": [-2, -1, 0, 1, 2], "flag_meanings": "undefined no_precipitation rainfall snowfall mixed_phase", }, ) return precipitation_type