# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Functions to process DISDRODB L0B files into DISDRODB L0C netCDF files."""
import logging
import numpy as np
import pandas as pd
import xarray as xr
from disdrodb.api.io import open_netcdf_files
from disdrodb.l0.l0b_processing import set_l0b_encodings
from disdrodb.l1.resampling import add_sample_interval
from disdrodb.utils.attrs import set_disdrodb_attrs
from disdrodb.utils.logger import log_info, log_warning
from disdrodb.utils.time import ensure_sorted_by_time
logger = logging.getLogger(__name__)
# L0C processing requires searching for data (per time blocks) into neighbouring files:
# - to account for possible trailing seconds in previous/next files
# - to get information if at the edges of the time blocks previous/next timesteps are available
# - to shift the time to ensure reported L0C time is the start of the measurement interval
TOLERANCE_SECONDS = 60 * 3
####---------------------------------------------------------------------------------
#### Measurement intervals
[docs]
def drop_timesteps_with_invalid_sample_interval(ds, measurement_intervals, verbose=True, logger=None):
"""Drop timesteps with unexpected sample intervals."""
sample_interval = ds["sample_interval"].to_numpy()
timesteps = ds["time"].to_numpy()
is_valid_sample_interval = np.isin(sample_interval.data, measurement_intervals)
indices_invalid_sample_interval = np.where(~is_valid_sample_interval)[0]
if len(indices_invalid_sample_interval) > 0:
# Log information for each invalid timestep
invalid_timesteps = pd.to_datetime(timesteps[indices_invalid_sample_interval]).strftime("%Y-%m-%d %H:%M:%S")
invalid_sample_intervals = sample_interval[indices_invalid_sample_interval]
for tt, ss in zip(invalid_timesteps, invalid_sample_intervals, strict=True):
msg = f"Unexpected sampling interval ({ss} s) at {tt}. The measurement has been dropped."
log_warning(logger=logger, msg=msg, verbose=verbose)
# Remove timesteps with invalid sample intervals
indices_valid_sample_interval = np.where(is_valid_sample_interval)[0]
ds = ds.isel(time=indices_valid_sample_interval)
return ds
[docs]
def split_dataset_by_sampling_intervals(
ds,
measurement_intervals,
min_sample_interval=10,
min_block_size=5,
time_is_end_interval=True,
):
"""
Split a dataset into subsets where each subset has a consistent sampling interval.
Parameters
----------
ds : xarray.Dataset
The input dataset with a 'time' dimension.
measurement_intervals : list or array-like
A list of possible primary sampling intervals (in seconds) that the dataset might have.
min_sample_interval : int, optional
The minimum expected sampling interval in seconds. Defaults to 10s.
This is used to deal with possible trailing seconds errors.
min_block_size : float, optional
The minimum number of timesteps with a given sampling interval to be considered.
Otherwise such portion of data is discarded !
Defaults to 5 timesteps.
time_is_end_interval: bool
Whether time refers to the end of the measurement interval.
The default is True.
Notes
-----
Does not modify timesteps (regularization is left to `regularize_timesteps`).
Assumes no duplicated timesteps in the dataset.
If only one measurement interval is specified, no timestep-diff checks are performed.
If multiple measurement intervals are specified:
- Raises an error if *none* of the expected intervals appear.
- Splits where interval changes.
Segments shorter than `min_block_size` are discarded.
Returns
-------
dict[int, xarray.Dataset]
A dictionary where keys are the identified sampling intervals (in seconds),
and values are xarray.Datasets containing only data from those sampling intervals.
"""
# Define array of possible measurement intervals
measurement_intervals = np.array(measurement_intervals)
# Check sorted by time and sort if necessary
ds = ensure_sorted_by_time(ds)
# If a single measurement interval expected, return dictionary with input dataset
if len(measurement_intervals) == 1:
dict_ds = {int(measurement_intervals[0]): ds}
return dict_ds
# If sample_interval is a dataset variable, use it to define dictionary of datasets
if "sample_interval" in ds:
dict_ds = {}
for interval in measurement_intervals:
ds_subset = ds.isel(time=ds["sample_interval"] == interval)
if ds_subset.sizes["time"] > 2:
dict_ds[int(interval)] = ds_subset
return dict_ds
# ---------------------------------------------------------------------------------------.
# Otherwise exploit difference between timesteps to identify change point
# Calculate time differences in seconds
deltadt = np.abs(np.diff(ds["time"].data)).astype("timedelta64[s]").astype(int)
# Round each delta to the nearest multiple of 5 (because the smallest possible sample interval is 10 s)
# - This account for possible trailing seconds of the logger
# Example: for sample_interval = 10, deltat values like 8, 9, 11, 12 become 10 ...
# Example: for sample_interval = 10, deltat values like 6, 7 or 13, 14 become respectively 5 and 15 ...
# Example: for sample_interval = 30, deltat values like 28,29,30,31,32 deltat become 30 ...
# Example: for sample_interval = 30, deltat values like 26, 27 or 33, 34 become respectively 25 and 35 ...
min_half_sample_interval = min_sample_interval / 2
deltadt = np.round(deltadt / min_half_sample_interval) * min_half_sample_interval
# Map each delta to one of the possible_measurement_intervals if exact match, otherwise np.nan
mapped_intervals = np.where(np.isin(deltadt, measurement_intervals), deltadt, np.nan)
if np.all(np.isnan(mapped_intervals)):
raise ValueError("Impossible to identify timesteps with expected sampling intervals.")
# Check which measurements intervals are occurring in the dataset
uniques = np.unique(mapped_intervals)
uniques_intervals = uniques[~np.isnan(uniques)]
n_different_intervals_occurring = len(uniques_intervals)
if n_different_intervals_occurring == 1:
dict_ds = {int(k): ds for k in uniques_intervals}
return dict_ds
# Fill NaNs: decide whether to attach to previous or next interval
for i in range(len(mapped_intervals)):
if np.isnan(mapped_intervals[i]):
# If next exists and is NaN → forward fill
if i + 1 < len(mapped_intervals) and np.isnan(mapped_intervals[i + 1]):
mapped_intervals[i] = mapped_intervals[i - 1] if i > 0 else mapped_intervals[i + 1]
# Otherwise → backward fill (attach to next valid)
else:
mapped_intervals[i] = (
mapped_intervals[i + 1] if i + 1 < len(mapped_intervals) else mapped_intervals[i - 1]
)
# Infill np.nan values by using neighbor intervals
# Forward fill
# for i in range(1, len(mapped_intervals)):
# if np.isnan(mapped_intervals[i]):
# mapped_intervals[i] = mapped_intervals[i - 1]
# # Backward fill (in case the first entries were np.nan)
# for i in range(len(mapped_intervals) - 2, -1, -1):
# if np.isnan(mapped_intervals[i]):
# mapped_intervals[i] = mapped_intervals[i + 1]
# Now all intervals are assigned to one of the possible measurement_intervals.
# Identify boundaries where interval changes
change_points = np.where(mapped_intervals[:-1] != mapped_intervals[1:])[0] + 1
# Split ds into segments according to change_points
offset = 1 if time_is_end_interval else 0
segments = np.split(np.arange(ds.sizes["time"]), change_points + offset)
# Remove segments with less than min_block_size elements
segments = [seg for seg in segments if len(seg) >= min_block_size]
if len(segments) == 0:
raise ValueError(
f"No blocks of {min_block_size} consecutive timesteps with constant sampling interval are available.",
)
# Define dataset indices for each sampling interva
dict_sampling_interval_indices = {}
used_indices = set()
for seg in segments:
# Define the assumed sampling interval of such segment
start_idx = seg[0]
segment_sampling_interval = int(mapped_intervals[start_idx])
# Remove any indices that have already been assigned to another interval
seg_filtered = seg[~np.isin(seg, list(used_indices))]
# Only keep segment if it still meets minimum size after filtering
if len(seg_filtered) >= min_block_size:
if segment_sampling_interval not in dict_sampling_interval_indices:
dict_sampling_interval_indices[segment_sampling_interval] = [seg_filtered]
else:
dict_sampling_interval_indices[segment_sampling_interval].append(seg_filtered)
# Mark these indices as used
used_indices.update(seg_filtered)
# Concatenate indices for each sampling interval
dict_sampling_interval_indices = {
k: np.concatenate(list_indices)
for k, list_indices in dict_sampling_interval_indices.items()
if list_indices # Only include if there are valid segments
}
# Define dictionary of datasets
dict_ds = {int(k): ds.isel(time=indices) for k, indices in dict_sampling_interval_indices.items()}
return dict_ds
####---------------------------------------------------------------------------------
#### Timesteps duplicates
[docs]
def has_same_value_over_time(da):
"""
Check if a DataArray has the same value over all timesteps, considering NaNs as equal.
Parameters
----------
da : xarray.DataArray
The DataArray to check. Must have a 'time' dimension.
Returns
-------
bool
True if the values are the same (or NaN in the same positions) across all timesteps,
False otherwise.
"""
# Select the first timestep
da_first = da.isel(time=0)
# Create a boolean array that identifies where values are equal or both NaN
equal_or_nan = (da == da_first) | (da.isnull() & da_first.isnull()) # noqa: PD003
# Check if all values match this condition across all dimensions
return bool(equal_or_nan.all().item())
[docs]
def remove_duplicated_timesteps(ds, ensure_variables_equality=True, logger=None, verbose=True):
"""Removes duplicated timesteps from a xarray dataset."""
# Check for duplicated timesteps
timesteps, counts = np.unique(ds["time"].data, return_counts=True)
duplicated_timesteps = timesteps[counts > 1]
# If no duplicated timesteps, returns dataset as is
if len(duplicated_timesteps) == 0:
return ds
# If there are duplicated timesteps
# - First check for variables equality
# - Keep first occurrence of duplicated timesteps if values are equals
# - Drop duplicated timesteps where values are different
different_duplicated_timesteps = []
equal_duplicated_timesteps = []
for t in duplicated_timesteps:
# Select dataset at given duplicated timestep
ds_duplicated = ds.sel(time=t)
n_t = len(ds_duplicated["time"])
# Check raw_drop_number equality
if not has_same_value_over_time(ds_duplicated["raw_drop_number"]):
different_duplicated_timesteps.append(t)
msg = (
f"Presence of {n_t} duplicated timesteps at {t}."
"They have different 'raw_drop_number' values. These timesteps are dropped."
)
log_warning(logger=logger, msg=msg, verbose=verbose)
# Check other variables equality
other_variables_to_check = [v for v in ds.data_vars if v != "raw_drop_number"]
variables_with_different_values = [
var for var in other_variables_to_check if not has_same_value_over_time(ds_duplicated[var])
]
if len(variables_with_different_values) > 0:
msg = (
f"Presence of {n_t} duplicated timesteps at {t}."
f"The duplicated timesteps have different values in variables {variables_with_different_values}. "
)
if ensure_variables_equality:
different_duplicated_timesteps.append(t)
msg = msg + "These timesteps are dropped."
else:
equal_duplicated_timesteps.append(t)
msg = msg + (
"These timesteps are not dropped because 'raw_drop_number' values are equals."
"'ensure_variables_equality' is False."
)
log_warning(logger=logger, msg=msg, verbose=verbose)
else:
equal_duplicated_timesteps.append(t)
# Ensure single occurrence of duplicated timesteps
equal_duplicated_timesteps = np.unique(equal_duplicated_timesteps)
different_duplicated_timesteps = np.unique(different_duplicated_timesteps)
# - Keep first occurrence of equal_duplicated_timesteps
if len(equal_duplicated_timesteps) > 0:
indices_to_drop = [np.where(ds["time"] == t)[0][1:] for t in equal_duplicated_timesteps]
indices_to_drop = np.concatenate(indices_to_drop)
# Keep only indices not in indices_to_drop
mask = ~np.isin(np.arange(ds["time"].size), indices_to_drop)
ds = ds.isel(time=np.where(mask)[0])
# - Drop different_duplicated_timesteps
if len(different_duplicated_timesteps) > 0:
mask = np.isin(ds["time"], different_duplicated_timesteps, invert=True)
ds = ds.isel(time=np.where(mask)[0])
return ds
####---------------------------------------------------------------------------------
#### Timesteps regularization
[docs]
def get_problematic_timestep_indices(timesteps, sample_interval):
"""Identify timesteps with missing previous or following timesteps."""
previous_time = timesteps - pd.Timedelta(seconds=sample_interval)
next_time = timesteps + pd.Timedelta(seconds=sample_interval)
idx_previous_missing = np.where(~np.isin(previous_time, timesteps))[0][1:]
idx_next_missing = np.where(~np.isin(next_time, timesteps))[0][:-1]
idx_isolated_missing = np.intersect1d(idx_previous_missing, idx_next_missing)
idx_previous_missing = idx_previous_missing[np.isin(idx_previous_missing, idx_isolated_missing, invert=True)]
idx_next_missing = idx_next_missing[np.isin(idx_next_missing, idx_isolated_missing, invert=True)]
return idx_previous_missing, idx_next_missing, idx_isolated_missing
[docs]
def nearest_expected_times(times, expected_times):
"""Return index of nearest expected time."""
# both must be sorted ascending
idx = np.searchsorted(expected_times, times)
idx = np.clip(idx, 1, len(expected_times) - 1)
# Compare distance to the previous vs next expected time
prev = expected_times[idx - 1]
next_ = expected_times[idx]
choose_next = (times - prev) > (next_ - times)
nearest = np.where(choose_next, next_, prev)
return nearest
[docs]
def regularize_timesteps(ds, sample_interval, robust=False, add_quality_flag=True, logger=None, verbose=True):
"""Ensure timesteps match with the sample_interval.
This function:
- drop dataset indices with duplicated timesteps,
- but does not add missing timesteps to the dataset.
"""
# Check sorted by time and sort if necessary
ds = ensure_sorted_by_time(ds)
# Convert time to pandas.DatetimeIndex for easier manipulation
times = pd.to_datetime(ds["time"].to_numpy())
# Determine the start and end times
start_time = times[0].floor(f"{sample_interval}s")
end_time = times[-1].ceil(f"{sample_interval}s")
# Create the expected time grid
expected_times = pd.date_range(start=start_time, end=end_time, freq=f"{sample_interval}s")
# Convert to numpy arrays
times = times.to_numpy(dtype="M8[s]")
expected_times = expected_times.to_numpy(dtype="M8[s]")
# Vectorized mapping of observed times → nearest expected times
adjusted_times = nearest_expected_times(times, expected_times)
# Map original times to the nearest expected times
# # Calculate the difference between original times and expected times
# time_deltas = np.abs(times - expected_times[:, None]).astype(int)
# # Find the index of the closest expected time for each original time
# nearest_indices = np.argmin(time_deltas, axis=0)
# adjusted_times = expected_times[nearest_indices]
# Check for duplicates in adjusted times
unique_times, counts = np.unique(adjusted_times, return_counts=True)
duplicates = unique_times[counts > 1]
# Initialize time quality flag
# - 0 when ok or just rounded to closest 00
# - 1 if previous timestep is missing
# - 2 if next timestep is missing
# - 3 if previous and next timestep is missing
# - 4 if solved duplicated timesteps
# - 5 if needed to drop duplicated timesteps and select the last
flag_previous_missing = 1
flag_next_missing = 2
flag_isolated_timestep = 3
flag_solved_duplicated_timestep = 4
flag_dropped_duplicated_timestep = 5
qc_flag = np.zeros(adjusted_times.shape)
# Initialize list with the duplicated timesteps index to drop
# - We drop the first occurrence because is likely the shortest interval
idx_to_drop = []
# Attempt to resolve for duplicates
if duplicates.size > 0:
# Handle duplicates
for dup_time in duplicates:
# Indices of duplicates
dup_indices = np.where(adjusted_times == dup_time)[0]
n_duplicates = len(dup_indices)
# Define previous and following timestep
prev_time = dup_time - pd.Timedelta(seconds=sample_interval)
next_time = dup_time + pd.Timedelta(seconds=sample_interval)
# Try to find missing slots before and after
# - If more than 3 duplicates, impossible to solve !
count_solved = 0
# If the previous timestep is available, set that one
if n_duplicates == 2:
if prev_time not in adjusted_times:
adjusted_times[dup_indices[0]] = prev_time
qc_flag[dup_indices[0]] = flag_solved_duplicated_timestep
count_solved += 1
elif next_time not in adjusted_times:
adjusted_times[dup_indices[-1]] = next_time
qc_flag[dup_indices[-1]] = flag_solved_duplicated_timestep
count_solved += 1
else:
pass
elif n_duplicates == 3:
if prev_time not in adjusted_times:
adjusted_times[dup_indices[0]] = prev_time
qc_flag[dup_indices[0]] = flag_solved_duplicated_timestep
count_solved += 1
if next_time not in adjusted_times:
adjusted_times[dup_indices[-1]] = next_time
qc_flag[dup_indices[-1]] = flag_solved_duplicated_timestep
count_solved += 1
if count_solved != n_duplicates - 1:
idx_to_drop = np.append(idx_to_drop, dup_indices[0:-1])
qc_flag[dup_indices[-1]] = flag_dropped_duplicated_timestep
msg = (
f"Cannot resolve {n_duplicates} duplicated timesteps "
f"(after trailing seconds correction) around {dup_time}."
)
log_warning(logger=logger, msg=msg, verbose=verbose)
if robust:
raise ValueError(msg)
# Update the time coordinate (Convert to ns for xarray compatibility)
ds = ds.assign_coords({"time": adjusted_times.astype("datetime64[ns]")})
# Update quality flag values for next and previous timestep is missing
if add_quality_flag:
idx_previous_missing, idx_next_missing, idx_isolated_missing = get_problematic_timestep_indices(
adjusted_times,
sample_interval,
)
qc_flag[idx_previous_missing] = np.maximum(qc_flag[idx_previous_missing], flag_previous_missing)
qc_flag[idx_next_missing] = np.maximum(qc_flag[idx_next_missing], flag_next_missing)
qc_flag[idx_isolated_missing] = np.maximum(qc_flag[idx_isolated_missing], flag_isolated_timestep)
# If the first timestep is at 00:00 and currently flagged as previous missing (1), reset to 0
# first_time = pd.to_datetime(adjusted_times[0]).time()
# first_expected_time = pd.Timestamp("00:00:00").time()
# if first_time == first_expected_time and qc_flag[0] == flag_previous_missing:
# qc_flag[0] = 0
# # If the last timestep is flagged and currently flagged as next missing (2), reset it to 0
# last_time = pd.to_datetime(adjusted_times[-1]).time()
# last_time_expected = (pd.Timestamp("00:00:00") - pd.Timedelta(30, unit="seconds")).time()
# # Check if adding one interval would go beyond the end_time
# if last_time == last_time_expected and qc_flag[-1] == flag_next_missing:
# qc_flag[-1] = 0
# Add time quality flag variable
ds["qc_time"] = xr.DataArray(qc_flag, dims="time")
# Add CF attributes for qc_time
ds["qc_time"].attrs = {
"long_name": "time quality flag",
"standard_name": "status_flag",
"units": "1",
"valid_range": [0, 5],
"flag_values": [0, 1, 2, 3, 4, 5],
"flag_meanings": (
"good_data "
"previous_timestep_missing "
"next_timestep_missing "
"isolated_timestep "
"solved_duplicated_timestep "
"dropped_duplicated_timestep"
),
"comment": (
"Quality flag for time coordinate. "
"Flag 0: data is good or just rounded to nearest sampling interval. "
"Flag 1: previous timestep is missing in the time series. "
"Flag 2: next timestep is missing in the time series. "
"Flag 3: both previous and next timesteps are missing (isolated timestep). "
"Flag 4: timestep was moved from duplicate to fill missing timestep. "
"Flag 5: duplicate timestep was dropped, keeping the last occurrence."
),
}
# Drop duplicated timesteps
# - Using ds = ds.drop_isel({"time": idx_to_drop.astype(int)}) raise:
# --> pandas.errors.InvalidIndexError: Reindexing only valid with uniquely valued Index objects
# --> https://github.com/pydata/xarray/issues/6605
if len(idx_to_drop) > 0:
idx_to_drop = idx_to_drop.astype(int)
idx_valid_timesteps = np.arange(0, ds["time"].size)
idx_valid_timesteps = np.delete(idx_valid_timesteps, idx_to_drop)
ds = ds.isel(time=idx_valid_timesteps)
# Return dataset
return ds
[docs]
def check_timesteps_regularity(ds, sample_interval, verbose=False, logger=None):
"""Check for the regularity of timesteps."""
# Check sorted by time and sort if necessary
ds = ensure_sorted_by_time(ds)
# Calculate number of timesteps
n = len(ds["time"].data)
# Calculate time differences in seconds
deltadt = np.diff(ds["time"].data).astype("timedelta64[s]").astype(int)
# Identify unique time intervals and their occurrences
unique_deltadt, counts = np.unique(deltadt, return_counts=True)
# Determine the most frequent time interval (mode)
most_frequent_deltadt_idx = np.argmax(counts)
most_frequent_deltadt = unique_deltadt[most_frequent_deltadt_idx]
# Count fraction occurrence of deltadt
fractions = np.round(counts / len(deltadt) * 100, 2)
# Compute stats about expected deltadt
mask = unique_deltadt == sample_interval
sample_interval_counts = counts[mask].item() if mask.any() else 0
sample_interval_fraction = fractions[mask].item() if mask.any() else 0.0
# Compute stats about most frequent deltadt
mask = unique_deltadt == most_frequent_deltadt
most_frequent_deltadt_counts = counts[mask].item() if mask.any() else 0
most_frequent_deltadt_fraction = fractions[mask].item() if mask.any() else 0.0
# Compute stats about unexpected deltadt
unexpected_intervals = unique_deltadt[unique_deltadt != sample_interval]
unexpected_intervals_counts = counts[unique_deltadt != sample_interval]
unexpected_intervals_fractions = fractions[unique_deltadt != sample_interval]
frequent_unexpected_intervals = unexpected_intervals[unexpected_intervals_fractions > 5]
# Report warning if the sampling_interval deltadt occurs less often than 60 % of times
# -> TODO: maybe only report in stations where the disdro does not log only data when rainy
if sample_interval_fraction < 60:
msg = (
f"The expected (sampling) interval between observations occurs only "
f"{sample_interval_counts}/{n} times ({sample_interval_fraction} %)."
)
log_warning(logger=logger, msg=msg, verbose=verbose)
# Report warning if a deltadt occurs more often then the sampling interval
if most_frequent_deltadt != sample_interval:
msg = (
f"The most frequent time interval between observations is {most_frequent_deltadt} s "
f"(occurs {most_frequent_deltadt_counts}/{n} times) ({most_frequent_deltadt_fraction}%) "
f"although the expected (sampling) interval is {sample_interval} s "
f"and occurs {sample_interval_counts}/{n} times ({sample_interval_fraction}%)."
)
log_warning(logger=logger, msg=msg, verbose=verbose)
# Report with a warning all unexpected deltadt with frequency larger than 5 %
if len(frequent_unexpected_intervals) > 0:
msg = "The following time intervals between observations occur frequently: "
for interval in frequent_unexpected_intervals:
c = unexpected_intervals_counts[unexpected_intervals == interval].item()
f = unexpected_intervals_fractions[unexpected_intervals == interval].item()
msg = msg + f"{interval} s ({f}%) ({c}/{n})"
log_warning(logger=logger, msg=msg, verbose=verbose)
return ds
####----------------------------------------------------------------------------------------------.
#### Wrapper
[docs]
def finalize_l0c_dataset(ds, sample_interval, sensor_name, verbose=True, logger=None):
"""Finalize a L0C dataset with unique sampling interval.
It adds the sampling_interval coordinate and it regularizes the timesteps for trailing seconds.
"""
# Add sample interval as coordinate
ds = add_sample_interval(ds, sample_interval=sample_interval)
# Regularize timesteps (for trailing seconds)
# --> This remove time encoding
ds = regularize_timesteps(
ds,
sample_interval=sample_interval,
robust=False, # if True, raise error if an error occur during regularization
add_quality_flag=True,
verbose=verbose,
logger=logger,
)
# Performs checks about timesteps regularity
# - Do not discard anything
# - Just log warnings in the log file
ds = check_timesteps_regularity(ds=ds, sample_interval=sample_interval, verbose=verbose, logger=logger)
# Shift timesteps to ensure time correspond to start of measurement interval
# TODO as function of sensor name
# Set netCDF dimension order
# --> Required for correct encoding !
ds = ds.transpose("time", "diameter_bin_center", ...)
# Set encodings
ds = set_l0b_encodings(ds=ds, sensor_name=sensor_name)
# Update global attributes
ds = set_disdrodb_attrs(ds, product="L0C")
return ds
[docs]
def create_l0c_datasets(
event_info,
measurement_intervals,
ensure_variables_equality=True,
logger=None,
verbose=True,
):
"""
Create a single dataset by merging and processing data from multiple filepaths.
Parameters
----------
event_info : dict
Dictionary with start_time, end_time and filepaths keys.
Returns
-------
dict
A dictionary with an xarray.Dataset for each measurement interval.
Raises
------
ValueError
If less than 5 timesteps are available for the specified day.
Notes
-----
- Data is loaded into memory and connections to source files are closed before returning the dataset.
- Tolerance in input files is used around expected dataset start_time and end_time to account for
imprecise logging times and ensuring correct definition of qc_time at files boundaries (e.g. 00:00).
- Duplicated timesteps with different raw drop number values are dropped
- First occurrence of duplicated timesteps with equal raw drop number values is kept.
- Regularizes timesteps to handle trailing seconds.
"""
# ---------------------------------------------------------------------------------------.
# Retrieve information
start_time = np.array(event_info["start_time"], dtype="M8[s]")
end_time = np.array(event_info["end_time"], dtype="M8[s]")
filepaths = event_info["filepaths"]
# Define expected dataset time coverage
start_time_tol = start_time - np.array(TOLERANCE_SECONDS, dtype="m8[s]")
end_time_tol = end_time + np.array(TOLERANCE_SECONDS, dtype="m8[s]")
# ---------------------------------------------------------------------------------------.
# Open files with data within the provided day and concatenate them
# - If no timesteps available (for given time block, return empty dictionary)
# - Can occur when raw files are already by block of months and e.g. here saving to daily blocks !
try:
ds = open_netcdf_files(
filepaths,
start_time=start_time_tol,
end_time=end_time_tol,
chunks={},
parallel=False,
compute=True,
)
except ValueError as e:
if str(e).startswith("No timesteps available"):
log_warning(
logger=logger,
msg=f"No data between {start_time} and {end_time}.",
)
return {}
raise
# If 1 or 2 timesteps per time block, return empty dictionary
n_timesteps = len(ds["time"])
if n_timesteps < 3:
log_info(
logger=logger,
msg=f"Only {n_timesteps} timesteps between {start_time} and {end_time}.",
verbose=verbose,
)
return {}
# Create dictionary of L0C datasets for each measurement interval
dict_ds = generate_l0c_datasets(
ds=ds,
measurement_intervals=measurement_intervals,
ensure_variables_equality=ensure_variables_equality,
logger=logger,
verbose=verbose,
)
# Subset to specified start_time and end_time
dict_ds = {
sample_interval: ds.sel({"time": slice(start_time, end_time)}) for sample_interval, ds in dict_ds.items()
}
return dict_ds
[docs]
def generate_l0c_datasets(ds, measurement_intervals, ensure_variables_equality=True, logger=None, verbose=True):
"""Generate L0C datasets from L0B data separating timesteps with different measurement intervals.
This function processes an L0B dataset by removing invalid and duplicated timesteps,
splitting timesteps by measurement intervals, and finalizing each subset into L0C format.
Parameters
----------
ds : xarray.Dataset
Input L0B dataset to process.
measurement_intervals : list or int
List of expected measurement intervals (in seconds).
ensure_variables_equality : bool, optional
If True, drops duplicated timesteps where variables other than 'raw_drop_number'
have different values. If False, keeps duplicated timesteps if 'raw_drop_number'
values are equal, even if other variables differ.
Default is True.
logger : logging.Logger, optional
Logger instance for logging warnings and information. Default is None.
verbose : bool, optional
If True, prints log messages to console in addition to logging. Default is True.
Returns
-------
dict
Dictionary with measurements intervals (int, in seconds) as keys and processed
xarray.Dataset objects as values. Each dataset contains data for a single
measurement interval with regularized timesteps and quality flags.
Notes
-----
Processing steps:
1. Drops timesteps with invalid sample intervals (if 'sample_interval' variable exists)
2. Removes duplicated timesteps based on 'raw_drop_number' equality
3. Splits dataset by sampling intervals
4. Regularizes timesteps to handle trailing seconds
5. Adds quality control flags for time coordinate
6. Adds sample_interval coordinate and updates attributes
Multiple sampling intervals may be present in a single input dataset, resulting
in multiple output datasets.
"""
# Retrieve sensor name from dataset attributes
if "sensor_name" not in ds.attrs:
raise ValueError("Missing 'sensor_name' attribute in the dataset.")
sensor_name = ds.attrs.get("sensor_name")
# ---------------------------------------------------------------------------------------.
# If sample interval is a dataset variable, drop timesteps with unexpected measurement intervals !
if "sample_interval" in ds:
ds = drop_timesteps_with_invalid_sample_interval(
ds=ds,
measurement_intervals=measurement_intervals,
verbose=verbose,
logger=logger,
)
n_timesteps = len(ds["time"])
if n_timesteps < 3:
raise ValueError(f"Only {n_timesteps} timesteps left after removing those with unexpected sample interval.")
# ---------------------------------------------------------------------------------------.
# Remove duplicated timesteps (before correcting for trailing seconds)
# - It checks that duplicated timesteps have the same raw_drop_number values
# - If duplicated timesteps have different raw_drop_number values:
# --> warning is raised
# --> timesteps are dropped
ds = remove_duplicated_timesteps(
ds,
ensure_variables_equality=ensure_variables_equality,
logger=logger,
verbose=verbose,
)
# Raise error if less than 3 timesteps left
n_timesteps = len(ds["time"])
if n_timesteps < 3:
raise ValueError(f"{n_timesteps} timesteps left after removing duplicated.")
# ---------------------------------------------------------------------------------------.
# Split dataset by sampling intervals
dict_ds = split_dataset_by_sampling_intervals(
ds=ds,
measurement_intervals=measurement_intervals,
min_sample_interval=10,
min_block_size=5,
)
# Log a warning if two sampling intervals are present within a given time block
if len(dict_ds) > 1:
occuring_sampling_intervals = list(dict_ds)
msg = f"The input files contains these sampling intervals: {occuring_sampling_intervals}."
log_warning(logger=logger, msg=msg, verbose=verbose)
# ---------------------------------------------------------------------------------------.
# Finalize L0C datasets
# - Add and ensure sample_interval coordinate has just 1 value (not varying with time)
# - Regularize timesteps for trailing seconds
dict_ds = {
sample_interval: finalize_l0c_dataset(
ds=ds,
sample_interval=sample_interval,
sensor_name=sensor_name,
verbose=verbose,
logger=logger,
)
for sample_interval, ds in dict_ds.items()
}
return dict_ds
[docs]
def generate_l0c(ds, measurement_interval, ensure_variables_equality=True, logger=None, verbose=True):
"""Generate a single L0C dataset for a specific measurement interval.
This is a convenience wrapper around `generate_l0c_datasets` that returns only
the dataset corresponding to the specified measurement interval.
Parameters
----------
ds : xarray.Dataset
Input L0B dataset to process.
measurement_interval : int
The expected measurement interval (in seconds) of the data.
ensure_variables_equality : bool, optional
If True, drops duplicated timesteps where variables other than 'raw_drop_number'
have different values. If False, keeps duplicated timesteps if 'raw_drop_number'
values are equal, even if other variables differ.
Default is True.
logger : logging.Logger, optional
Logger instance for logging warnings and information. Default is None.
verbose : bool, optional
If True, prints log messages to console in addition to logging. Default is True.
Returns
-------
xarray.Dataset
Processed L0C dataset for the specified measurement interval with regularized
timesteps and quality flags.
Notes
-----
Processing steps:
1. Drops timesteps with invalid measurement interval (if 'sample_interval' variable exists)
2. Removes duplicated timesteps based on 'raw_drop_number' equality
4. Regularizes timesteps to handle trailing seconds
6. Adds sample_interval coordinate and updates attributes
See Also
--------
generate_l0c_datasets
"""
if not isinstance(measurement_interval, int):
raise TypeError("measurement_interval must be an integer.")
dict_ds = generate_l0c_datasets(
ds,
measurement_intervals=measurement_interval,
ensure_variables_equality=ensure_variables_equality,
logger=logger,
verbose=verbose,
)
return dict_ds[measurement_interval]