# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""This module contains utilities related to the processing of temporal dataset."""
import logging
import numbers
import re
import numpy as np
import pandas as pd
import xarray as xr
from disdrodb.utils.logger import log_info, log_warning
from disdrodb.utils.xarray import define_fill_value_dictionary
logger = logging.getLogger(__name__)
####------------------------------------------------------------------------------------.
#### Sampling Interval Acronyms
[docs]
def seconds_to_temporal_resolution(seconds):
"""
Convert a duration in seconds to a readable string format (e.g., "1H30", "1D2H").
Parameters
----------
- seconds (int): The time duration in seconds.
Returns
-------
- str: The duration as a string in a format like "30S", "1MIN30S", "1H30MIN", or "1D2H".
"""
timedelta = pd.Timedelta(seconds=seconds)
components = timedelta.components
parts = []
if components.days > 0:
parts.append(f"{components.days}D")
if components.hours > 0:
parts.append(f"{components.hours}H")
if components.minutes > 0:
parts.append(f"{components.minutes}MIN")
if components.seconds > 0:
parts.append(f"{components.seconds}S")
temporal_resolution = "".join(parts)
return temporal_resolution
[docs]
def temporal_resolution_to_seconds(temporal_resolution):
"""
Extract the measurement interval in seconds from the temporal resolution string.
Parameters
----------
temporal_resolution: str
A string representing the product measurement interval: e.g., "1H30MIN", "ROLL1H30MIN".
Returns
-------
seconds
Duration in seconds.
"""
seconds, _ = get_sampling_information(temporal_resolution)
return seconds
####----------------------------------------------------------------------------.
#### File start and end time utilities
[docs]
def get_dataframe_start_end_time(df: pd.DataFrame, time_column="time"):
"""Retrieves dataframe starting and ending time.
Parameters
----------
df : pandas.DataFrame
Input dataframe
time_column: str
Name of the time column.
The default is "time".
The column must be of type datetime.
Returns
-------
(start_time, end_time): tuple
File start and end time of type pandas.Timestamp.
"""
starting_time = pd.to_datetime(df[time_column].iloc[0])
ending_time = pd.to_datetime(df[time_column].iloc[-1])
return (starting_time, ending_time)
[docs]
def get_dataset_start_end_time(ds: xr.Dataset, time_dim="time"):
"""Retrieves dataset starting and ending time.
Parameters
----------
ds : xarray.Dataset
Input dataset
time_dim: str
Name of the time dimension.
The default is "time".
Returns
-------
(start_time, end_time): tuple
File start and end time of type pandas.Timestamp.
"""
starting_time = pd.to_datetime(ds[time_dim].to_numpy()[0])
ending_time = pd.to_datetime(ds[time_dim].to_numpy()[-1])
return (starting_time, ending_time)
[docs]
def get_file_start_end_time(obj, time="time"):
"""Retrieves object starting and ending time.
Parameters
----------
obj : xarray.Dataset or pandas.DataFrame
Input object with time dimension or column respectively.
time: str
Name of the time dimension or column.
The default is "time".
Returns
-------
(start_time, end_time): tuple
File start and end time of type pandas.Timestamp.
"""
if isinstance(obj, xr.Dataset):
return get_dataset_start_end_time(obj, time_dim=time)
if isinstance(obj, pd.DataFrame):
return get_dataframe_start_end_time(obj, time_column=time)
raise TypeError("Expecting a xarray Dataset or a pandas Dataframe object.")
####------------------------------------------------------------------------------------.
#### Xarray utilities
[docs]
def ensure_sorted_by_time(obj, time="time"):
"""Ensure a xarray object or pandas Dataframe is sorted by time."""
# Check sorted by time and sort if necessary
is_sorted = np.all(np.diff(obj[time].to_numpy().astype(int)) > 0)
if not is_sorted:
if isinstance(obj, pd.DataFrame):
return obj.sort_values(by="time")
# Else xarray DataArray or Dataset
obj = obj.sortby("time")
return obj
def _check_time_sorted(ds, time_dim):
"""Ensure the xarray.Dataset is sorted."""
time_diff = np.diff(ds[time_dim].to_numpy().astype(int))
if np.any(time_diff == 0):
raise ValueError(f"In the {time_dim} dimension there are duplicated timesteps !")
if not np.all(time_diff > 0):
print(f"The {time_dim} dimension was not sorted. Sorting it now !")
ds = ds.sortby(time_dim)
return ds
[docs]
def regularize_dataset(
xr_obj,
freq: str,
time_dim: str = "time",
method: str | None = None,
fill_value=None,
start_time=None,
end_time=None,
):
"""Regularize a xarray object across time dimension with uniform resolution.
Parameters
----------
xr_obj : xarray.Dataset or xr.DataArray
xarray object with time dimension.
time_dim : str, optional
The time dimension in the xarray object. The default value is ``"time"``.
freq : str
The ``freq`` string to pass to `pd.date_range()` to define the new time coordinates.
Examples: ``freq="2min"``.
method : str, optional
Method to use for filling missing timesteps.
If ``None``, fill with ``fill_value``. The default value is ``None``.
For other possible methods, see xarray.Dataset.reindex()`.
fill_value : (float, dict), optional
Fill value to fill missing timesteps.
If not specified, for float variables it uses ``dtypes.NA`` while for
for integers variables it uses the maximum allowed integer value or,
in case of undecoded variables, the ``_FillValue`` DataArray attribute..
Returns
-------
ds_reindexed : xarray.Dataset
Regularized dataset.
"""
attrs = xr_obj.attrs.copy()
xr_obj = _check_time_sorted(xr_obj, time_dim=time_dim)
# Define start time and end_time
start, end = get_dataset_start_end_time(xr_obj, time_dim=time_dim)
if start_time is None:
start_time = start
if end_time is None:
end_time = end
xr_obj = xr_obj.sel({time_dim: slice(start_time, end_time)})
# Define new time index
new_time_index = pd.date_range(
start=start_time,
end=end_time,
freq=freq,
)
# Check all existing timesteps are within the new time index
# - Otherwise raise error because it means that the desired frequency is not compatible
idx_missing = np.where(~np.isin(xr_obj[time_dim].data, new_time_index))[0]
if len(idx_missing) > 0:
not_included_timesteps = xr_obj[time_dim].data[idx_missing].astype("M8[s]")
raise ValueError(f"With freq='{freq}', the following timesteps would be dropped: {not_included_timesteps}")
# Define fill_value dictionary
if fill_value is None:
fill_value = define_fill_value_dictionary(xr_obj)
# Regularize dataset and fill with NA values
xr_obj = xr_obj.reindex(
{time_dim: new_time_index},
method=method, # do not fill gaps
# tolerance=tolerance, # mismatch in seconds
fill_value=fill_value,
)
# Ensure attributes are preserved
xr_obj.attrs = attrs
return xr_obj
####------------------------------------------
#### Interval utilities
[docs]
def ensure_sample_interval_in_seconds(sample_interval): # noqa: PLR0911
"""
Ensure the sample interval is in seconds.
Parameters
----------
sample_interval : int, numpy.ndarray, xarray.DataArray, or numpy.timedelta64
The sample interval to be converted to seconds.
It can be:
- An integer representing the interval in seconds.
- A numpy array or xarray DataArray of integers representing intervals in seconds.
- A numpy.timedelta64 object representing the interval.
- A numpy array or xarray DataArray of numpy.timedelta64 objects representing intervals.
Returns
-------
int, numpy.ndarray, or xarray.DataArray
The sample interval converted to seconds. The return type matches the input type:
- If the input is an integer, the output is an integer.
- If the input is a numpy array, the output is a numpy array of integers (unless NaN is present)
- If the input is an xarray DataArray, the output is an xarray DataArray of integers (unless NaN is present).
"""
# Deal with timedelta objects
if isinstance(sample_interval, np.timedelta64):
return (sample_interval.astype("m8[s]") / np.timedelta64(1, "s")).astype(int)
# return sample_interval.astype("m8[s]").astype(int)
# Deal with scalar pure integer types (Python int or numpy int32/int64/etc.)
# --> ATTENTION: this also include np.timedelta64 objects !
if isinstance(sample_interval, numbers.Integral):
return sample_interval
# Deal with numpy or xarray arrays of integer types
if isinstance(sample_interval, (np.ndarray, xr.DataArray)) and np.issubdtype(sample_interval.dtype, int):
return sample_interval
# Deal with scalar floats that are actually integers (e.g. 1.0, np.float64(3.0))
if isinstance(sample_interval, numbers.Real):
if float(sample_interval).is_integer():
# Cast back to int seconds
return int(sample_interval)
raise TypeError(f"sample_interval floats must be whole numbers of seconds, got {sample_interval}")
# Deal with timedelta64 numpy arrays
if isinstance(sample_interval, np.ndarray) and np.issubdtype(sample_interval.dtype, np.timedelta64):
is_nat = np.isnat(sample_interval)
if np.any(is_nat):
sample_interval = sample_interval.astype("timedelta64[s]").astype(float)
sample_interval[is_nat] = np.nan
return sample_interval
return sample_interval.astype("timedelta64[s]").astype(int)
# Deal with timedelta64 xarray arrays
if isinstance(sample_interval, xr.DataArray) and np.issubdtype(sample_interval.dtype, np.timedelta64):
sample_interval = sample_interval.copy()
is_nat = np.isnat(sample_interval)
if np.any(is_nat):
sample_interval_array = sample_interval.data.astype("timedelta64[s]").astype(float)
sample_interval_array[is_nat] = np.nan
sample_interval.data = sample_interval_array
return sample_interval
sample_interval_array = sample_interval.data.astype("timedelta64[s]").astype(int)
sample_interval.data = sample_interval_array
return sample_interval
# Deal with numpy array of floats that are all integer-valued (with optionally some NaN)
if isinstance(sample_interval, np.ndarray) and np.issubdtype(sample_interval.dtype, np.floating):
mask_nan = np.isnan(sample_interval)
if mask_nan.any():
# Check non-NaN entries are whole numbers
nonnan = sample_interval[~mask_nan]
if not np.allclose(nonnan, np.rint(nonnan)):
raise TypeError("Float array sample_interval must contain only whole numbers or NaN.")
# Leave as float array so NaNs are preserved
return sample_interval
# No NaNs: can safely cast to integer dtype
if not np.allclose(sample_interval, np.rint(sample_interval)):
raise TypeError("Float array sample_interval must contain only whole numbers.")
return sample_interval.astype(int)
# Deal with xarray.DataArray of floats that are all integer-valued (with optionally some NaN)
if isinstance(sample_interval, xr.DataArray) and np.issubdtype(sample_interval.dtype, np.floating):
arr = sample_interval.copy()
data = arr.data
mask_nan = np.isnan(data)
if mask_nan.any():
nonnan = data[~mask_nan]
if not np.allclose(nonnan, np.rint(nonnan)):
raise TypeError("Float DataArray sample_interval must contain only whole numbers or NaN.")
# return as float DataArray so NaNs stay
return arr
if not np.allclose(data, np.rint(data)):
raise TypeError("Float DataArray sample_interval must contain only whole numbers.")
arr.data = data.astype(int)
return arr
raise TypeError(
"sample_interval must be an integer value or array, or numpy.ndarray / xarray.DataArray with type timedelta64.",
)
[docs]
def ensure_timedelta_seconds(interval):
"""Return an a scalar value/array in seconds or timedelta object as numpy.timedelta64 in seconds."""
if isinstance(interval, (xr.DataArray, np.ndarray)):
return ensure_sample_interval_in_seconds(interval).astype("m8[s]")
return np.array(ensure_sample_interval_in_seconds(interval), dtype="m8[s]")
####------------------------------------------
#### Sample Interval Utilities
[docs]
def infer_sample_interval(ds, robust=False, verbose=False, logger=None):
"""Infer the sample interval of a dataset.
Duplicated timesteps are removed before inferring the sample interval.
NOTE: This function is used only for the reader preparation.
"""
# Check sorted by time and sort if necessary
ds = ensure_sorted_by_time(ds)
# Retrieve timesteps
# - Remove duplicate timesteps
timesteps = np.unique(ds["time"].data)
# Calculate number of timesteps
n_timesteps = len(timesteps)
# Calculate time differences in seconds
deltadt = np.diff(timesteps).astype("timedelta64[s]").astype(int)
# Round each delta to the nearest multiple of 5 (because the smallest possible sample interval is 10 s)
# Example: for sample_interval = 10, deltat values like 8, 9, 11, 12 become 10 ...
# Example: for sample_interval = 10, deltat values like 6, 7 or 13, 14 become respectively 5 and 15 ...
# Example: for sample_interval = 30, deltat values like 28,29,30,31,32 deltat become 30 ...
# Example: for sample_interval = 30, deltat values like 26, 27 or 33, 34 become respectively 25 and 35 ...
# --> Need other rounding after having identified the most frequent sample interval to coerce such values to 30
min_sample_interval = 10
min_half_sample_interval = min_sample_interval / 2
deltadt = np.round(deltadt / min_half_sample_interval) * min_half_sample_interval
# Identify unique time intervals and their occurrences
unique_deltas, counts = np.unique(deltadt, return_counts=True)
# Determine the most frequent time interval (mode)
most_frequent_delta_idx = np.argmax(counts)
sample_interval = unique_deltas[most_frequent_delta_idx]
# Reround deltadt once knowing the sample interval
# - If sample interval is 10: all values between 6 and 14 are rounded to 10, below 6 to 0, above 14 to 20
# - If sample interval is 30: all values between 16 and 44 are rounded to 30, below 16 to 0, above 44 to 20
deltadt = np.round(deltadt / min_sample_interval) * min_sample_interval
# Identify unique time intervals and their occurrences
unique_deltas, counts = np.unique(deltadt, return_counts=True)
fractions = np.round(counts / len(deltadt) * 100, 2)
# Determine the most frequent time interval (mode)
most_frequent_delta_idx = np.argmax(counts)
sample_interval = unique_deltas[most_frequent_delta_idx]
sample_interval_fraction = fractions[most_frequent_delta_idx]
# Inform about irregular sampling
unexpected_intervals = unique_deltas[unique_deltas != sample_interval]
unexpected_intervals_counts = counts[unique_deltas != sample_interval]
unexpected_intervals_fractions = fractions[unique_deltas != sample_interval]
if verbose and len(unexpected_intervals) > 0:
msg = "Non-unique interval detected."
log_info(logger=logger, msg=msg, verbose=verbose)
for interval, count, fraction in zip(
unexpected_intervals,
unexpected_intervals_counts,
unexpected_intervals_fractions,
strict=True,
):
msg = f"--> Interval: {interval} seconds, Occurrence: {count}, Frequency: {fraction} %"
log_info(logger=logger, msg=msg, verbose=verbose)
# Perform checks
# - Raise error if negative or zero time intervals are presents
# - If robust = False, still return the estimated sample_interval
if robust and np.any(deltadt == 0):
raise ValueError("Likely presence of duplicated timesteps.")
if robust and len(unexpected_intervals) > 0:
raise ValueError("Not unique sampling interval.")
###-------------------------------------------------------------------------.
### Display informative messages
# - Log a warning if estimated sample interval has frequency less than 60 %
sample_interval_fraction_threshold = 60
msg = (
f"The most frequent sampling interval ({sample_interval} s) "
+ f"has a frequency lower than {sample_interval_fraction_threshold}%: {sample_interval_fraction} %. "
+ f"(Total number of timesteps: {n_timesteps})"
)
if sample_interval_fraction < sample_interval_fraction_threshold:
log_warning(logger=logger, msg=msg, verbose=verbose)
# - Log a warning if an unexpected interval has frequency larger than 20 percent
frequent_unexpected_intervals = unexpected_intervals[unexpected_intervals_fractions > 20]
if len(frequent_unexpected_intervals) != 0:
frequent_unexpected_intervals_str = ", ".join(
f"{interval} seconds" for interval in frequent_unexpected_intervals
)
msg = (
"The following unexpected intervals have a frequency "
+ f"greater than 20%: {frequent_unexpected_intervals_str}. "
+ f"(Total number of timesteps: {n_timesteps})"
)
log_warning(logger=logger, msg=msg, verbose=verbose)
return int(sample_interval)