Source code for disdrodb.l1.resampling
# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2023 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Utilities for temporal resampling."""
import pandas as pd
import xarray as xr
from disdrodb.utils.time import regularize_dataset
DEFAULT_ACCUMULATIONS = ["10s", "30s", "1min", "2min", "5min", "10min", "30min", "1hour"]
[docs]
def add_sample_interval(ds, sample_interval):
"""Add a sample_interval coordinate to the dataset.
Parameters
----------
ds : xarray.Dataset
The input dataset to which the sample_interval coordinate will be added.
sample_interval : int or float
The dataset sample interval in seconds.
Returns
-------
xarray.Dataset
The dataset with the added sample interval coordinate.
Notes
-----
The function adds a new coordinate named 'sample_interval' to the dataset and
updates the 'measurement_interval' attribute.
"""
# Add sample_interval coordinate
ds["sample_interval"] = sample_interval
ds["sample_interval"].attrs["description"] = "Sample interval"
ds["sample_interval"].attrs["long_name"] = "Sample interval"
ds["sample_interval"].attrs["units"] = "seconds"
ds = ds.set_coords("sample_interval")
# Update measurement_interval attribute
ds.attrs = ds.attrs.copy()
ds.attrs["measurement_interval"] = int(sample_interval)
return ds
[docs]
def define_window_size(sample_interval, accumulation_interval):
"""
Calculate the rolling window size based on sampling and accumulation intervals.
Parameters
----------
sampling_interval : int
The sampling interval in seconds.
accumulation_interval : int
The desired accumulation interval in seconds.
Returns
-------
int
The calculated window size as the number of sampling intervals required to cover the accumulation interval.
Raises
------
ValueError
If the accumulation interval is not a multiple of the sampling interval.
Examples
--------
>>> define_window_size(60, 300)
5
>>> define_window_size(120, 600)
5
"""
# Check compatitiblity
if accumulation_interval % sample_interval != 0:
raise ValueError("The accumulation interval must be a multiple of the sample interval.")
# Calculate the window size
window_size = accumulation_interval // sample_interval
return window_size
[docs]
def resample_dataset(ds, sample_interval, accumulation_interval, rolling=True):
"""
Resample the dataset to a specified accumulation interval.
Parameters
----------
ds : xarray.Dataset
The input dataset to be resampled.
sample_interval : int
The sample interval of the input dataset.
accumulation_interval : int
The interval in seconds over which to accumulate the data.
rolling : bool, optional
If True, apply a rolling window before resampling. Default is True.
If True, forward rolling is performed.
The output timesteps correspond to the starts of the periods over which
the resampling operation has been performed !
Returns
-------
xarray.Dataset
The resampled dataset with updated attributes.
Notes
-----
- The function regularizes the dataset (infill possible missing timesteps)
before performing the resampling operation.
- Variables are categorized into those to be averaged, accumulated, minimized, and maximized.
- Custom processing for quality flags and handling of NaNs is defined.
- The function updates the dataset attributes and the sample_interval coordinate.
"""
# Retrieve attributes
attrs = ds.attrs.copy()
# TODO: here infill NaN with zero if necessary before regularizing !
# Ensure regular dataset without missing timesteps
ds = regularize_dataset(ds, freq=f"{sample_interval}s")
# Initialize resample dataset
ds_resampled = xr.Dataset()
# Retrieve variables to average/sum
var_to_average = ["fall_velocity"]
var_to_cumulate = ["raw_drop_number", "drop_number", "drop_counts", "n_drops_selected", "n_drops_discarded"]
var_to_min = ["Dmin"]
var_to_max = ["Dmax"]
# Retrieve available variables
var_to_average = [var for var in var_to_average if var in ds]
var_to_cumulate = [var for var in var_to_cumulate if var in ds]
var_to_min = [var for var in var_to_min if var in ds]
var_to_max = [var for var in var_to_max if var in ds]
# TODO Define custom processing
# - quality_flag --> take worst
# - skipna if less than fraction (to not waste lot of data when aggregating over i.e. hours)
# Resample the dataset
# - Rolling currently does not allow direct rolling forward.
# - We currently use center=False which means search for data backward (right-aligned) !
# - We then drop the first 'window_size' NaN timesteps and we shift backward the timesteps.
# - https://github.com/pydata/xarray/issues/9773
# - https://github.com/pydata/xarray/issues/8958
if not rolling:
# Resample
if len(var_to_average) > 0:
ds_resampled.update(
ds[var_to_average].resample({"time": pd.Timedelta(seconds=accumulation_interval)}).mean(skipna=False),
)
if len(var_to_cumulate) > 0:
ds_resampled.update(
ds[var_to_cumulate].resample({"time": pd.Timedelta(seconds=accumulation_interval)}).sum(skipna=False),
)
if len(var_to_min) > 0:
ds_resampled.update(
ds[var_to_min].resample({"time": pd.Timedelta(seconds=accumulation_interval)}).min(skipna=False),
)
if len(var_to_max) > 0:
ds_resampled.update(
ds[var_to_max].resample({"time": pd.Timedelta(seconds=accumulation_interval)}).max(skipna=False),
)
else:
# Roll and Resample
window_size = define_window_size(sample_interval=sample_interval, accumulation_interval=accumulation_interval)
if len(var_to_average) > 0:
ds_resampled.update(ds[var_to_average].rolling({"time": window_size}, center=False).mean(skipna=False))
if len(var_to_cumulate) > 0:
ds_resampled.update(ds[var_to_cumulate].rolling({"time": window_size}, center=False).sum(skipna=False))
if len(var_to_min) > 0:
ds_resampled.update(ds[var_to_min].rolling({"time": window_size}, center=False).min(skipna=False))
if len(var_to_max) > 0:
ds_resampled.update(ds[var_to_max].rolling({"time": window_size}, center=False).max(skipna=False))
# Ensure time to correspond to the start time of the integration
ds_resampled = ds_resampled.isel(time=slice(window_size - 1, None)).assign_coords(
{"time": ds_resampled["time"].data[: -window_size + 1]},
)
# Add attributes
ds_resampled.attrs = attrs
if rolling:
ds_resampled.attrs["rolled"] = "True"
else:
ds_resampled.attrs["rolled"] = "False"
# Add accumulation_interval as new sample_interval coordinate
ds_resampled = add_sample_interval(ds_resampled, sample_interval=accumulation_interval)
return ds_resampled
[docs]
def get_possible_accumulations(sample_interval, accumulations=None):
"""
Get a list of valid accumulation intervals based on the sampling time.
Parameters
----------
- sample_interval (int): The inferred sampling time in seconds.
- accumulations (list of int or string): List of desired accumulation intervals.
If provide integers, specify accumulation in seconds.
Returns
-------
- list of int: Valid accumulation intervals in seconds.
"""
# Select default accumulations
if accumulations is None:
accumulations = DEFAULT_ACCUMULATIONS
# Get accumulations in seconds
accumulations = [int(pd.Timedelta(acc).total_seconds()) if isinstance(acc, str) else acc for acc in accumulations]
# Filter candidate accumulations to include only those that are multiples of the sampling time
possible_accumulations = [acc for acc in accumulations if acc % sample_interval == 0]
return possible_accumulations