Near-Real-Time Processing#

This tutorial demonstrates how to process individual files or small batches for near-real-time (NRT) applications. Unlike batch archive processing, this workflow provides fine-grained control over each processing step, enabling integration into operational systems, streaming pipelines, or custom workflows.

Overview#

Use Cases

  • Operational monitoring: Process incoming disdrometer data as it arrives

  • Real-time displays: Update visualizations with latest observations

  • Alert systems: Trigger warnings based on rainfall thresholds

  • Data streaming: Integrate DISDRODB into real-time data pipelines

  • Custom workflows: Build specialized processing chains with custom quality control

Workflow

This tutorial processes data through the complete DISDRODB chain manually:

Raw File(s) → L0A (DataFrame) → L0B (xarray) → L0C → L1 → L2E → L2M

Each step can be executed independently, allowing you to:

  • Process individual files or small batches

  • Skip intermediate file I/O for faster processing

  • Customize quality control at each level

  • Extract specific products without generating entire archive

Setup and Configuration#

Import Required Modules

import numpy as np
import xarray as xr
import disdrodb

Configure Station

# Define station identifiers
data_source = "EPFL"
campaign_name = "LOCARNO_2019"
station_name = "61"

# Processing options
verbose = True
parallel = False
temporal_resolution = "1MIN"

# Load products configuration directory (or specify custom path)
products_configs_dir = disdrodb.config.get("products_configs_dir")

# Read station metadata
metadata = disdrodb.read_station_metadata(
    data_source=data_source,
    campaign_name=campaign_name,
    station_name=station_name,
)

Step 1: Generate L0A DataFrame#

The L0A level converts raw disdrometer files into standardized tabular format. Here we illustrate how to manually generate the L0A DataFrame from specific raw files.

# Extract metadata
sensor_name = metadata["sensor_name"]
measurement_interval = metadata["measurement_interval"]

# Get station-specific reader
reader = disdrodb.get_station_reader(
    data_source=data_source,
    campaign_name=campaign_name,
    station_name=station_name,
)

# Define files to process (for NRT, typically the most recent file(s))
# For illustration, here we list all existing station files and select a subset to mock NRT processing
filepaths = disdrodb.find_files(
    data_source=data_source,
    campaign_name=campaign_name,
    station_name=station_name,
    product="RAW",
)
filepaths = filepaths[10:15]

# Process specific files
df = disdrodb.generate_l0a(
    filepaths=filepaths,
    reader=reader,
    sensor_name=sensor_name,
    verbose=verbose,
)

Alternatively you can also use:

df = disdrodb.open_raw_files(
    filepaths=filepaths, data_source=data_source, campaign_name=campaign_name, station_name=station_name
)

Tip

Near-Real-Time Processing

For operational NRT systems:

  • Monitor directory for new files using watchdog or similar tools

  • Implement error handling for incomplete or corrupted files

Step 2: Generate L0B Dataset#

Convert the L0A DataFrame into an xarray Dataset with proper dimensions and metadata.

# Create L0B dataset
ds_l0b = disdrodb.generate_l0b(df=df, metadata=metadata, verbose=verbose)

What L0B Processing Does:

  • Parses string-encoded arrays into numerical arrays

  • Constructs xarray Dataset with dimensions (time, diameter_bin_center, velocity_bin_center)

  • Adds bin centers and bounds

  • Attaches CF-compliant variable attributes

  • Adds station geolocation metadata

See disdrodb.generate_l0b() for implementation details.

Step 3: Generate L0C Dataset#

Ensure temporal consistency by regularizing timestamps and validating measurement intervals.

# Create L0C dataset
# - Regularizes trailing seconds in timestamps
# - Validates measurement interval consistency
# - Computes quality control flags
ds_l0c = disdrodb.generate_l0c(
    ds_l0b,
    measurement_interval=measurement_interval,
    verbose=verbose,
)

What L0C Processing Does: - Drops timesteps with invalid measurement interval (if ‘sample_interval’ variable exists) - Regularizes timestamps to ensure consistent trailing seconds (e.g., 00, 30 for 30s interval) - Remove duplicate timesteps if they exist - Adds sample_interval coordinate and updates attributes - Computes quality control flags for temporal consistency and measurement validity

Note

Simplified L0C Processing

The full L0C archive processing additionally:

  • Consolidates multiple L0B files into fixed-period outputs (daily by default)

  • Removes duplicate timesteps across file boundaries

  • Splits datasets if measurement intervals change

For NRT processing of individual files, generate_l0c() provides the essential time consistency checks without file consolidation.

See disdrodb.l0.l0c_processing.generate_l0c() for implementation details.

Subset Data for Testing

For faster iteration during development:

# Select subset and load into memory
ds_l0c = ds_l0c.isel(time=slice(0, 1000))
ds_l0c = ds_l0c.compute()

Step 4: Generate L1 Dataset#

Resample data to desired temporal resolution and perform hydrometeor classification.

# Resample to target temporal resolution
ds_l0c_resampled = ds_l0c.disdrodb.resample(
    temporal_resolution=temporal_resolution,
)

# Generate L1 product with hydrometeor classification
ds_l1 = disdrodb.generate_l1(ds_l0c_resampled)

Temporal Resolution Options:

  • Fixed intervals: "30S", "1MIN", "5MIN", "10MIN"

  • Rolling windows: "ROLL5MIN", "ROLL10MIN"

See disdrodb.generate_l1() for implementation details.

Tip

Multiple Temporal Resolutions

For operational systems requiring multiple resolutions, loop over desired values:

for temporal_resolution in ["1MIN", "5MIN", "10MIN"]:
    ds_resampled = ds_l0c.disdrodb.resample(temporal_resolution=temporal_resolution)
    ds_l1 = disdrodb.generate_l1(ds_resampled)
    # Process L1 → L2E → L2M for this resolution
    ...

Step 5: Generate L2E Dataset#

Compute empirical rainfall parameters and radar observables from classified L1 data.

Extract L2E Configuration

# Load L2E processing options
l2e_options = disdrodb.get_product_options(
    product="L2E",
    temporal_resolution=temporal_resolution,
    products_configs_dir=products_configs_dir,
)

For configuration options, see L2E Product Configuration.

Generate L2E Product

# Extract product options
l2e_product_options = l2e_options["product_options"]

# Adjust thresholds for NRT (optional)
# Default configuration processes only rainy timesteps
# For NRT monitoring, you may want all timesteps:
l2e_product_options["minimum_rain_rate"] = 0
l2e_product_options["minimum_ndrops"] = 0
l2e_product_options["minimum_nbins"] = 0

# Generate L2E dataset
ds_l2e = disdrodb.generate_l2e(ds_l1, **l2e_product_options)

See disdrodb.l2.processing.generate_l2e() for implementation details.

Add Radar Simulations (Optional)

If pytmatrix is installed, compute radar observables:

# Extract radar simulation options
l2e_radar_options = l2e_options["radar_options"]

# Generate radar variables
ds_l2e_radar = disdrodb.generate_l2_radar(ds_l2e, **l2e_radar_options)

# Merge radar variables into L2E dataset
ds_l2e.update(ds_l2e_radar)
ds_l2e.attrs = ds_l2e_radar.attrs.copy()

See Radar Variable Simulations for details on radar options.

Step 6: Generate L2M Dataset#

Fit parametric DSD models to derive modeled rainfall parameters.

Extract L2M Configuration

# Load L2M processing options
l2m_options = disdrodb.get_product_options(
    product="L2M",
    temporal_resolution=temporal_resolution,
    products_configs_dir=products_configs_dir,
)

# Extract product options
l2m_product_options = l2m_options["product_options"]

# Adjust thresholds for NRT (optional)
l2m_product_options["minimum_rain_rate"] = 0
l2m_product_options["minimum_ndrops"] = 0
l2m_product_options["minimum_nbins"] = 0

For configuration options, see L2M Product Configuration.

Select DSD Model

Load configuration for specific parametric model:

# Load model configuration
model_options = disdrodb.get_model_options(
    model_name="GAMMA_ML",  # GAMMA_ML, NGAMMA_GS_ND_SSE, etc.
    products_configs_dir=products_configs_dir,
)

Available models are defined in the L2M/MODELS/ directory. See L2M Model Configuration for available models and options.

Generate L2M Product

# Generate L2M dataset with fitted model parameters
ds_l2m = disdrodb.generate_l2m(
    ds_l2e,
    **model_options,
    **l2m_product_options,
)

# Extract fitted PSD parameters
ds_l2m_parameters = ds_l2m.disdrodb.psd_parameters

See disdrodb.generate_l2m() for implementation details.

Further Reading#