Source code for disdrodb.l0.l0c_processing

# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Functions to process DISDRODB L0B files into DISDRODB L0C netCDF files."""

import logging

import numpy as np
import pandas as pd
import xarray as xr

from disdrodb.api.io import open_netcdf_files
from disdrodb.l0.l0b_processing import set_l0b_encodings
from disdrodb.l1.resampling import add_sample_interval
from disdrodb.utils.attrs import set_disdrodb_attrs
from disdrodb.utils.logger import log_info, log_warning
from disdrodb.utils.time import ensure_sorted_by_time

logger = logging.getLogger(__name__)

# L0C processing requires searching for data (per time blocks) into neighbouring files:
# - to account for possible trailing seconds in previous/next files
# - to get information if at the edges of the time blocks previous/next timesteps are available
# - to shift the time to ensure reported L0C time is the start of the measurement interval
TOLERANCE_SECONDS = 60 * 3

####---------------------------------------------------------------------------------
#### Measurement intervals


[docs] def drop_timesteps_with_invalid_sample_interval(ds, measurement_intervals, verbose=True, logger=None): """Drop timesteps with unexpected sample intervals.""" sample_interval = ds["sample_interval"].to_numpy() timesteps = ds["time"].to_numpy() is_valid_sample_interval = np.isin(sample_interval.data, measurement_intervals) indices_invalid_sample_interval = np.where(~is_valid_sample_interval)[0] if len(indices_invalid_sample_interval) > 0: # Log information for each invalid timestep invalid_timesteps = pd.to_datetime(timesteps[indices_invalid_sample_interval]).strftime("%Y-%m-%d %H:%M:%S") invalid_sample_intervals = sample_interval[indices_invalid_sample_interval] for tt, ss in zip(invalid_timesteps, invalid_sample_intervals, strict=True): msg = f"Unexpected sampling interval ({ss} s) at {tt}. The measurement has been dropped." log_warning(logger=logger, msg=msg, verbose=verbose) # Remove timesteps with invalid sample intervals indices_valid_sample_interval = np.where(is_valid_sample_interval)[0] ds = ds.isel(time=indices_valid_sample_interval) return ds
[docs] def split_dataset_by_sampling_intervals( ds, measurement_intervals, min_sample_interval=10, min_block_size=5, time_is_end_interval=True, ): """ Split a dataset into subsets where each subset has a consistent sampling interval. Parameters ---------- ds : xarray.Dataset The input dataset with a 'time' dimension. measurement_intervals : list or array-like A list of possible primary sampling intervals (in seconds) that the dataset might have. min_sample_interval : int, optional The minimum expected sampling interval in seconds. Defaults to 10s. This is used to deal with possible trailing seconds errors. min_block_size : float, optional The minimum number of timesteps with a given sampling interval to be considered. Otherwise such portion of data is discarded ! Defaults to 5 timesteps. time_is_end_interval: bool Whether time refers to the end of the measurement interval. The default is True. Notes ----- Does not modify timesteps (regularization is left to `regularize_timesteps`). Assumes no duplicated timesteps in the dataset. If only one measurement interval is specified, no timestep-diff checks are performed. If multiple measurement intervals are specified: - Raises an error if *none* of the expected intervals appear. - Splits where interval changes. Segments shorter than `min_block_size` are discarded. Returns ------- dict[int, xarray.Dataset] A dictionary where keys are the identified sampling intervals (in seconds), and values are xarray.Datasets containing only data from those sampling intervals. """ # Define array of possible measurement intervals measurement_intervals = np.array(measurement_intervals) # Check sorted by time and sort if necessary ds = ensure_sorted_by_time(ds) # If a single measurement interval expected, return dictionary with input dataset if len(measurement_intervals) == 1: dict_ds = {int(measurement_intervals[0]): ds} return dict_ds # If sample_interval is a dataset variable, use it to define dictionary of datasets if "sample_interval" in ds: dict_ds = {} for interval in measurement_intervals: ds_subset = ds.isel(time=ds["sample_interval"] == interval) if ds_subset.sizes["time"] > 2: dict_ds[int(interval)] = ds_subset return dict_ds # ---------------------------------------------------------------------------------------. # Otherwise exploit difference between timesteps to identify change point # Calculate time differences in seconds deltadt = np.abs(np.diff(ds["time"].data)).astype("timedelta64[s]").astype(int) # Round each delta to the nearest multiple of 5 (because the smallest possible sample interval is 10 s) # - This account for possible trailing seconds of the logger # Example: for sample_interval = 10, deltat values like 8, 9, 11, 12 become 10 ... # Example: for sample_interval = 10, deltat values like 6, 7 or 13, 14 become respectively 5 and 15 ... # Example: for sample_interval = 30, deltat values like 28,29,30,31,32 deltat become 30 ... # Example: for sample_interval = 30, deltat values like 26, 27 or 33, 34 become respectively 25 and 35 ... min_half_sample_interval = min_sample_interval / 2 deltadt = np.round(deltadt / min_half_sample_interval) * min_half_sample_interval # Map each delta to one of the possible_measurement_intervals if exact match, otherwise np.nan mapped_intervals = np.where(np.isin(deltadt, measurement_intervals), deltadt, np.nan) if np.all(np.isnan(mapped_intervals)): raise ValueError("Impossible to identify timesteps with expected sampling intervals.") # Check which measurements intervals are occurring in the dataset uniques = np.unique(mapped_intervals) uniques_intervals = uniques[~np.isnan(uniques)] n_different_intervals_occurring = len(uniques_intervals) if n_different_intervals_occurring == 1: dict_ds = {int(k): ds for k in uniques_intervals} return dict_ds # Fill NaNs: decide whether to attach to previous or next interval for i in range(len(mapped_intervals)): if np.isnan(mapped_intervals[i]): # If next exists and is NaN → forward fill if i + 1 < len(mapped_intervals) and np.isnan(mapped_intervals[i + 1]): mapped_intervals[i] = mapped_intervals[i - 1] if i > 0 else mapped_intervals[i + 1] # Otherwise → backward fill (attach to next valid) else: mapped_intervals[i] = ( mapped_intervals[i + 1] if i + 1 < len(mapped_intervals) else mapped_intervals[i - 1] ) # Infill np.nan values by using neighbor intervals # Forward fill # for i in range(1, len(mapped_intervals)): # if np.isnan(mapped_intervals[i]): # mapped_intervals[i] = mapped_intervals[i - 1] # # Backward fill (in case the first entries were np.nan) # for i in range(len(mapped_intervals) - 2, -1, -1): # if np.isnan(mapped_intervals[i]): # mapped_intervals[i] = mapped_intervals[i + 1] # Now all intervals are assigned to one of the possible measurement_intervals. # Identify boundaries where interval changes change_points = np.where(mapped_intervals[:-1] != mapped_intervals[1:])[0] + 1 # Split ds into segments according to change_points offset = 1 if time_is_end_interval else 0 segments = np.split(np.arange(ds.sizes["time"]), change_points + offset) # Remove segments with less than min_block_size elements segments = [seg for seg in segments if len(seg) >= min_block_size] if len(segments) == 0: raise ValueError( f"No blocks of {min_block_size} consecutive timesteps with constant sampling interval are available.", ) # Define dataset indices for each sampling interva dict_sampling_interval_indices = {} used_indices = set() for seg in segments: # Define the assumed sampling interval of such segment start_idx = seg[0] segment_sampling_interval = int(mapped_intervals[start_idx]) # Remove any indices that have already been assigned to another interval seg_filtered = seg[~np.isin(seg, list(used_indices))] # Only keep segment if it still meets minimum size after filtering if len(seg_filtered) >= min_block_size: if segment_sampling_interval not in dict_sampling_interval_indices: dict_sampling_interval_indices[segment_sampling_interval] = [seg_filtered] else: dict_sampling_interval_indices[segment_sampling_interval].append(seg_filtered) # Mark these indices as used used_indices.update(seg_filtered) # Concatenate indices for each sampling interval dict_sampling_interval_indices = { k: np.concatenate(list_indices) for k, list_indices in dict_sampling_interval_indices.items() if list_indices # Only include if there are valid segments } # Define dictionary of datasets dict_ds = {int(k): ds.isel(time=indices) for k, indices in dict_sampling_interval_indices.items()} return dict_ds
####--------------------------------------------------------------------------------- #### Timesteps duplicates
[docs] def has_same_value_over_time(da): """ Check if a DataArray has the same value over all timesteps, considering NaNs as equal. Parameters ---------- da : xarray.DataArray The DataArray to check. Must have a 'time' dimension. Returns ------- bool True if the values are the same (or NaN in the same positions) across all timesteps, False otherwise. """ # Select the first timestep da_first = da.isel(time=0) # Create a boolean array that identifies where values are equal or both NaN equal_or_nan = (da == da_first) | (da.isnull() & da_first.isnull()) # noqa: PD003 # Check if all values match this condition across all dimensions return bool(equal_or_nan.all().item())
[docs] def remove_duplicated_timesteps(ds, ensure_variables_equality=True, logger=None, verbose=True): """Removes duplicated timesteps from a xarray dataset.""" # Check for duplicated timesteps timesteps, counts = np.unique(ds["time"].data, return_counts=True) duplicated_timesteps = timesteps[counts > 1] # If no duplicated timesteps, returns dataset as is if len(duplicated_timesteps) == 0: return ds # If there are duplicated timesteps # - First check for variables equality # - Keep first occurrence of duplicated timesteps if values are equals # - Drop duplicated timesteps where values are different different_duplicated_timesteps = [] equal_duplicated_timesteps = [] for t in duplicated_timesteps: # Select dataset at given duplicated timestep ds_duplicated = ds.sel(time=t) n_t = len(ds_duplicated["time"]) # Check raw_drop_number equality if not has_same_value_over_time(ds_duplicated["raw_drop_number"]): different_duplicated_timesteps.append(t) msg = ( f"Presence of {n_t} duplicated timesteps at {t}." "They have different 'raw_drop_number' values. These timesteps are dropped." ) log_warning(logger=logger, msg=msg, verbose=verbose) # Check other variables equality other_variables_to_check = [v for v in ds.data_vars if v != "raw_drop_number"] variables_with_different_values = [ var for var in other_variables_to_check if not has_same_value_over_time(ds_duplicated[var]) ] if len(variables_with_different_values) > 0: msg = ( f"Presence of {n_t} duplicated timesteps at {t}." f"The duplicated timesteps have different values in variables {variables_with_different_values}. " ) if ensure_variables_equality: different_duplicated_timesteps.append(t) msg = msg + "These timesteps are dropped." else: equal_duplicated_timesteps.append(t) msg = msg + ( "These timesteps are not dropped because 'raw_drop_number' values are equals." "'ensure_variables_equality' is False." ) log_warning(logger=logger, msg=msg, verbose=verbose) else: equal_duplicated_timesteps.append(t) # Ensure single occurrence of duplicated timesteps equal_duplicated_timesteps = np.unique(equal_duplicated_timesteps) different_duplicated_timesteps = np.unique(different_duplicated_timesteps) # - Keep first occurrence of equal_duplicated_timesteps if len(equal_duplicated_timesteps) > 0: indices_to_drop = [np.where(ds["time"] == t)[0][1:] for t in equal_duplicated_timesteps] indices_to_drop = np.concatenate(indices_to_drop) # Keep only indices not in indices_to_drop mask = ~np.isin(np.arange(ds["time"].size), indices_to_drop) ds = ds.isel(time=np.where(mask)[0]) # - Drop different_duplicated_timesteps if len(different_duplicated_timesteps) > 0: mask = np.isin(ds["time"], different_duplicated_timesteps, invert=True) ds = ds.isel(time=np.where(mask)[0]) return ds
####--------------------------------------------------------------------------------- #### Timesteps regularization
[docs] def get_problematic_timestep_indices(timesteps, sample_interval): """Identify timesteps with missing previous or following timesteps.""" previous_time = timesteps - pd.Timedelta(seconds=sample_interval) next_time = timesteps + pd.Timedelta(seconds=sample_interval) idx_previous_missing = np.where(~np.isin(previous_time, timesteps))[0][1:] idx_next_missing = np.where(~np.isin(next_time, timesteps))[0][:-1] idx_isolated_missing = np.intersect1d(idx_previous_missing, idx_next_missing) idx_previous_missing = idx_previous_missing[np.isin(idx_previous_missing, idx_isolated_missing, invert=True)] idx_next_missing = idx_next_missing[np.isin(idx_next_missing, idx_isolated_missing, invert=True)] return idx_previous_missing, idx_next_missing, idx_isolated_missing
[docs] def nearest_expected_times(times, expected_times): """Return index of nearest expected time.""" # both must be sorted ascending idx = np.searchsorted(expected_times, times) idx = np.clip(idx, 1, len(expected_times) - 1) # Compare distance to the previous vs next expected time prev = expected_times[idx - 1] next_ = expected_times[idx] choose_next = (times - prev) > (next_ - times) nearest = np.where(choose_next, next_, prev) return nearest
[docs] def regularize_timesteps(ds, sample_interval, robust=False, add_quality_flag=True, logger=None, verbose=True): """Ensure timesteps match with the sample_interval. This function: - drop dataset indices with duplicated timesteps, - but does not add missing timesteps to the dataset. """ # Check sorted by time and sort if necessary ds = ensure_sorted_by_time(ds) # Convert time to pandas.DatetimeIndex for easier manipulation times = pd.to_datetime(ds["time"].to_numpy()) # Determine the start and end times start_time = times[0].floor(f"{sample_interval}s") end_time = times[-1].ceil(f"{sample_interval}s") # Create the expected time grid expected_times = pd.date_range(start=start_time, end=end_time, freq=f"{sample_interval}s") # Convert to numpy arrays times = times.to_numpy(dtype="M8[s]") expected_times = expected_times.to_numpy(dtype="M8[s]") # Vectorized mapping of observed times → nearest expected times adjusted_times = nearest_expected_times(times, expected_times) # Map original times to the nearest expected times # # Calculate the difference between original times and expected times # time_deltas = np.abs(times - expected_times[:, None]).astype(int) # # Find the index of the closest expected time for each original time # nearest_indices = np.argmin(time_deltas, axis=0) # adjusted_times = expected_times[nearest_indices] # Check for duplicates in adjusted times unique_times, counts = np.unique(adjusted_times, return_counts=True) duplicates = unique_times[counts > 1] # Initialize time quality flag # - 0 when ok or just rounded to closest 00 # - 1 if previous timestep is missing # - 2 if next timestep is missing # - 3 if previous and next timestep is missing # - 4 if solved duplicated timesteps # - 5 if needed to drop duplicated timesteps and select the last flag_previous_missing = 1 flag_next_missing = 2 flag_isolated_timestep = 3 flag_solved_duplicated_timestep = 4 flag_dropped_duplicated_timestep = 5 qc_flag = np.zeros(adjusted_times.shape) # Initialize list with the duplicated timesteps index to drop # - We drop the first occurrence because is likely the shortest interval idx_to_drop = [] # Attempt to resolve for duplicates if duplicates.size > 0: # Handle duplicates for dup_time in duplicates: # Indices of duplicates dup_indices = np.where(adjusted_times == dup_time)[0] n_duplicates = len(dup_indices) # Define previous and following timestep prev_time = dup_time - pd.Timedelta(seconds=sample_interval) next_time = dup_time + pd.Timedelta(seconds=sample_interval) # Try to find missing slots before and after # - If more than 3 duplicates, impossible to solve ! count_solved = 0 # If the previous timestep is available, set that one if n_duplicates == 2: if prev_time not in adjusted_times: adjusted_times[dup_indices[0]] = prev_time qc_flag[dup_indices[0]] = flag_solved_duplicated_timestep count_solved += 1 elif next_time not in adjusted_times: adjusted_times[dup_indices[-1]] = next_time qc_flag[dup_indices[-1]] = flag_solved_duplicated_timestep count_solved += 1 else: pass elif n_duplicates == 3: if prev_time not in adjusted_times: adjusted_times[dup_indices[0]] = prev_time qc_flag[dup_indices[0]] = flag_solved_duplicated_timestep count_solved += 1 if next_time not in adjusted_times: adjusted_times[dup_indices[-1]] = next_time qc_flag[dup_indices[-1]] = flag_solved_duplicated_timestep count_solved += 1 if count_solved != n_duplicates - 1: idx_to_drop = np.append(idx_to_drop, dup_indices[0:-1]) qc_flag[dup_indices[-1]] = flag_dropped_duplicated_timestep msg = ( f"Cannot resolve {n_duplicates} duplicated timesteps " f"(after trailing seconds correction) around {dup_time}." ) log_warning(logger=logger, msg=msg, verbose=verbose) if robust: raise ValueError(msg) # Update the time coordinate (Convert to ns for xarray compatibility) ds = ds.assign_coords({"time": adjusted_times.astype("datetime64[ns]")}) # Update quality flag values for next and previous timestep is missing if add_quality_flag: idx_previous_missing, idx_next_missing, idx_isolated_missing = get_problematic_timestep_indices( adjusted_times, sample_interval, ) qc_flag[idx_previous_missing] = np.maximum(qc_flag[idx_previous_missing], flag_previous_missing) qc_flag[idx_next_missing] = np.maximum(qc_flag[idx_next_missing], flag_next_missing) qc_flag[idx_isolated_missing] = np.maximum(qc_flag[idx_isolated_missing], flag_isolated_timestep) # If the first timestep is at 00:00 and currently flagged as previous missing (1), reset to 0 # first_time = pd.to_datetime(adjusted_times[0]).time() # first_expected_time = pd.Timestamp("00:00:00").time() # if first_time == first_expected_time and qc_flag[0] == flag_previous_missing: # qc_flag[0] = 0 # # If the last timestep is flagged and currently flagged as next missing (2), reset it to 0 # last_time = pd.to_datetime(adjusted_times[-1]).time() # last_time_expected = (pd.Timestamp("00:00:00") - pd.Timedelta(30, unit="seconds")).time() # # Check if adding one interval would go beyond the end_time # if last_time == last_time_expected and qc_flag[-1] == flag_next_missing: # qc_flag[-1] = 0 # Add time quality flag variable ds["qc_time"] = xr.DataArray(qc_flag, dims="time") # Add CF attributes for qc_time ds["qc_time"].attrs = { "long_name": "time quality flag", "standard_name": "status_flag", "units": "1", "valid_range": [0, 5], "flag_values": [0, 1, 2, 3, 4, 5], "flag_meanings": ( "good_data " "previous_timestep_missing " "next_timestep_missing " "isolated_timestep " "solved_duplicated_timestep " "dropped_duplicated_timestep" ), "comment": ( "Quality flag for time coordinate. " "Flag 0: data is good or just rounded to nearest sampling interval. " "Flag 1: previous timestep is missing in the time series. " "Flag 2: next timestep is missing in the time series. " "Flag 3: both previous and next timesteps are missing (isolated timestep). " "Flag 4: timestep was moved from duplicate to fill missing timestep. " "Flag 5: duplicate timestep was dropped, keeping the last occurrence." ), } # Drop duplicated timesteps # - Using ds = ds.drop_isel({"time": idx_to_drop.astype(int)}) raise: # --> pandas.errors.InvalidIndexError: Reindexing only valid with uniquely valued Index objects # --> https://github.com/pydata/xarray/issues/6605 if len(idx_to_drop) > 0: idx_to_drop = idx_to_drop.astype(int) idx_valid_timesteps = np.arange(0, ds["time"].size) idx_valid_timesteps = np.delete(idx_valid_timesteps, idx_to_drop) ds = ds.isel(time=idx_valid_timesteps) # Return dataset return ds
[docs] def check_timesteps_regularity(ds, sample_interval, verbose=False, logger=None): """Check for the regularity of timesteps.""" # Check sorted by time and sort if necessary ds = ensure_sorted_by_time(ds) # Calculate number of timesteps n = len(ds["time"].data) # Calculate time differences in seconds deltadt = np.diff(ds["time"].data).astype("timedelta64[s]").astype(int) # Identify unique time intervals and their occurrences unique_deltadt, counts = np.unique(deltadt, return_counts=True) # Determine the most frequent time interval (mode) most_frequent_deltadt_idx = np.argmax(counts) most_frequent_deltadt = unique_deltadt[most_frequent_deltadt_idx] # Count fraction occurrence of deltadt fractions = np.round(counts / len(deltadt) * 100, 2) # Compute stats about expected deltadt mask = unique_deltadt == sample_interval sample_interval_counts = counts[mask].item() if mask.any() else 0 sample_interval_fraction = fractions[mask].item() if mask.any() else 0.0 # Compute stats about most frequent deltadt mask = unique_deltadt == most_frequent_deltadt most_frequent_deltadt_counts = counts[mask].item() if mask.any() else 0 most_frequent_deltadt_fraction = fractions[mask].item() if mask.any() else 0.0 # Compute stats about unexpected deltadt unexpected_intervals = unique_deltadt[unique_deltadt != sample_interval] unexpected_intervals_counts = counts[unique_deltadt != sample_interval] unexpected_intervals_fractions = fractions[unique_deltadt != sample_interval] frequent_unexpected_intervals = unexpected_intervals[unexpected_intervals_fractions > 5] # Report warning if the sampling_interval deltadt occurs less often than 60 % of times # -> TODO: maybe only report in stations where the disdro does not log only data when rainy if sample_interval_fraction < 60: msg = ( f"The expected (sampling) interval between observations occurs only " f"{sample_interval_counts}/{n} times ({sample_interval_fraction} %)." ) log_warning(logger=logger, msg=msg, verbose=verbose) # Report warning if a deltadt occurs more often then the sampling interval if most_frequent_deltadt != sample_interval: msg = ( f"The most frequent time interval between observations is {most_frequent_deltadt} s " f"(occurs {most_frequent_deltadt_counts}/{n} times) ({most_frequent_deltadt_fraction}%) " f"although the expected (sampling) interval is {sample_interval} s " f"and occurs {sample_interval_counts}/{n} times ({sample_interval_fraction}%)." ) log_warning(logger=logger, msg=msg, verbose=verbose) # Report with a warning all unexpected deltadt with frequency larger than 5 % if len(frequent_unexpected_intervals) > 0: msg = "The following time intervals between observations occur frequently: " for interval in frequent_unexpected_intervals: c = unexpected_intervals_counts[unexpected_intervals == interval].item() f = unexpected_intervals_fractions[unexpected_intervals == interval].item() msg = msg + f"{interval} s ({f}%) ({c}/{n})" log_warning(logger=logger, msg=msg, verbose=verbose) return ds
####----------------------------------------------------------------------------------------------. #### Wrapper
[docs] def finalize_l0c_dataset(ds, sample_interval, sensor_name, verbose=True, logger=None): """Finalize a L0C dataset with unique sampling interval. It adds the sampling_interval coordinate and it regularizes the timesteps for trailing seconds. """ # Add sample interval as coordinate ds = add_sample_interval(ds, sample_interval=sample_interval) # Regularize timesteps (for trailing seconds) # --> This remove time encoding ds = regularize_timesteps( ds, sample_interval=sample_interval, robust=False, # if True, raise error if an error occur during regularization add_quality_flag=True, verbose=verbose, logger=logger, ) # Performs checks about timesteps regularity # - Do not discard anything # - Just log warnings in the log file ds = check_timesteps_regularity(ds=ds, sample_interval=sample_interval, verbose=verbose, logger=logger) # Shift timesteps to ensure time correspond to start of measurement interval # TODO as function of sensor name # Set netCDF dimension order # --> Required for correct encoding ! ds = ds.transpose("time", "diameter_bin_center", ...) # Set encodings ds = set_l0b_encodings(ds=ds, sensor_name=sensor_name) # Update global attributes ds = set_disdrodb_attrs(ds, product="L0C") return ds
[docs] def create_l0c_datasets( event_info, measurement_intervals, ensure_variables_equality=True, logger=None, verbose=True, ): """ Create a single dataset by merging and processing data from multiple filepaths. Parameters ---------- event_info : dict Dictionary with start_time, end_time and filepaths keys. Returns ------- dict A dictionary with an xarray.Dataset for each measurement interval. Raises ------ ValueError If less than 5 timesteps are available for the specified day. Notes ----- - Data is loaded into memory and connections to source files are closed before returning the dataset. - Tolerance in input files is used around expected dataset start_time and end_time to account for imprecise logging times and ensuring correct definition of qc_time at files boundaries (e.g. 00:00). - Duplicated timesteps with different raw drop number values are dropped - First occurrence of duplicated timesteps with equal raw drop number values is kept. - Regularizes timesteps to handle trailing seconds. """ # ---------------------------------------------------------------------------------------. # Retrieve information start_time = np.array(event_info["start_time"], dtype="M8[s]") end_time = np.array(event_info["end_time"], dtype="M8[s]") filepaths = event_info["filepaths"] # Define expected dataset time coverage start_time_tol = start_time - np.array(TOLERANCE_SECONDS, dtype="m8[s]") end_time_tol = end_time + np.array(TOLERANCE_SECONDS, dtype="m8[s]") # ---------------------------------------------------------------------------------------. # Open files with data within the provided day and concatenate them # - If no timesteps available (for given time block, return empty dictionary) # - Can occur when raw files are already by block of months and e.g. here saving to daily blocks ! try: ds = open_netcdf_files( filepaths, start_time=start_time_tol, end_time=end_time_tol, chunks={}, parallel=False, compute=True, ) except ValueError as e: if str(e).startswith("No timesteps available"): log_warning( logger=logger, msg=f"No data between {start_time} and {end_time}.", ) return {} raise # If 1 or 2 timesteps per time block, return empty dictionary n_timesteps = len(ds["time"]) if n_timesteps < 3: log_info( logger=logger, msg=f"Only {n_timesteps} timesteps between {start_time} and {end_time}.", verbose=verbose, ) return {} # Create dictionary of L0C datasets for each measurement interval dict_ds = generate_l0c_datasets( ds=ds, measurement_intervals=measurement_intervals, ensure_variables_equality=ensure_variables_equality, logger=logger, verbose=verbose, ) # Subset to specified start_time and end_time dict_ds = { sample_interval: ds.sel({"time": slice(start_time, end_time)}) for sample_interval, ds in dict_ds.items() } return dict_ds
[docs] def generate_l0c_datasets(ds, measurement_intervals, ensure_variables_equality=True, logger=None, verbose=True): """Generate L0C datasets from L0B data separating timesteps with different measurement intervals. This function processes an L0B dataset by removing invalid and duplicated timesteps, splitting timesteps by measurement intervals, and finalizing each subset into L0C format. Parameters ---------- ds : xarray.Dataset Input L0B dataset to process. measurement_intervals : list or int List of expected measurement intervals (in seconds). ensure_variables_equality : bool, optional If True, drops duplicated timesteps where variables other than 'raw_drop_number' have different values. If False, keeps duplicated timesteps if 'raw_drop_number' values are equal, even if other variables differ. Default is True. logger : logging.Logger, optional Logger instance for logging warnings and information. Default is None. verbose : bool, optional If True, prints log messages to console in addition to logging. Default is True. Returns ------- dict Dictionary with measurements intervals (int, in seconds) as keys and processed xarray.Dataset objects as values. Each dataset contains data for a single measurement interval with regularized timesteps and quality flags. Notes ----- Processing steps: 1. Drops timesteps with invalid sample intervals (if 'sample_interval' variable exists) 2. Removes duplicated timesteps based on 'raw_drop_number' equality 3. Splits dataset by sampling intervals 4. Regularizes timesteps to handle trailing seconds 5. Adds quality control flags for time coordinate 6. Adds sample_interval coordinate and updates attributes Multiple sampling intervals may be present in a single input dataset, resulting in multiple output datasets. """ # Retrieve sensor name from dataset attributes if "sensor_name" not in ds.attrs: raise ValueError("Missing 'sensor_name' attribute in the dataset.") sensor_name = ds.attrs.get("sensor_name") # ---------------------------------------------------------------------------------------. # If sample interval is a dataset variable, drop timesteps with unexpected measurement intervals ! if "sample_interval" in ds: ds = drop_timesteps_with_invalid_sample_interval( ds=ds, measurement_intervals=measurement_intervals, verbose=verbose, logger=logger, ) n_timesteps = len(ds["time"]) if n_timesteps < 3: raise ValueError(f"Only {n_timesteps} timesteps left after removing those with unexpected sample interval.") # ---------------------------------------------------------------------------------------. # Remove duplicated timesteps (before correcting for trailing seconds) # - It checks that duplicated timesteps have the same raw_drop_number values # - If duplicated timesteps have different raw_drop_number values: # --> warning is raised # --> timesteps are dropped ds = remove_duplicated_timesteps( ds, ensure_variables_equality=ensure_variables_equality, logger=logger, verbose=verbose, ) # Raise error if less than 3 timesteps left n_timesteps = len(ds["time"]) if n_timesteps < 3: raise ValueError(f"{n_timesteps} timesteps left after removing duplicated.") # ---------------------------------------------------------------------------------------. # Split dataset by sampling intervals dict_ds = split_dataset_by_sampling_intervals( ds=ds, measurement_intervals=measurement_intervals, min_sample_interval=10, min_block_size=5, ) # Log a warning if two sampling intervals are present within a given time block if len(dict_ds) > 1: occuring_sampling_intervals = list(dict_ds) msg = f"The input files contains these sampling intervals: {occuring_sampling_intervals}." log_warning(logger=logger, msg=msg, verbose=verbose) # ---------------------------------------------------------------------------------------. # Finalize L0C datasets # - Add and ensure sample_interval coordinate has just 1 value (not varying with time) # - Regularize timesteps for trailing seconds dict_ds = { sample_interval: finalize_l0c_dataset( ds=ds, sample_interval=sample_interval, sensor_name=sensor_name, verbose=verbose, logger=logger, ) for sample_interval, ds in dict_ds.items() } return dict_ds
[docs] def generate_l0c(ds, measurement_interval, ensure_variables_equality=True, logger=None, verbose=True): """Generate a single L0C dataset for a specific measurement interval. This is a convenience wrapper around `generate_l0c_datasets` that returns only the dataset corresponding to the specified measurement interval. Parameters ---------- ds : xarray.Dataset Input L0B dataset to process. measurement_interval : int The expected measurement interval (in seconds) of the data. ensure_variables_equality : bool, optional If True, drops duplicated timesteps where variables other than 'raw_drop_number' have different values. If False, keeps duplicated timesteps if 'raw_drop_number' values are equal, even if other variables differ. Default is True. logger : logging.Logger, optional Logger instance for logging warnings and information. Default is None. verbose : bool, optional If True, prints log messages to console in addition to logging. Default is True. Returns ------- xarray.Dataset Processed L0C dataset for the specified measurement interval with regularized timesteps and quality flags. Notes ----- Processing steps: 1. Drops timesteps with invalid measurement interval (if 'sample_interval' variable exists) 2. Removes duplicated timesteps based on 'raw_drop_number' equality 4. Regularizes timesteps to handle trailing seconds 6. Adds sample_interval coordinate and updates attributes See Also -------- generate_l0c_datasets """ if not isinstance(measurement_interval, int): raise TypeError("measurement_interval must be an integer.") dict_ds = generate_l0c_datasets( ds, measurement_intervals=measurement_interval, ensure_variables_equality=ensure_variables_equality, logger=logger, verbose=verbose, ) return dict_ds[measurement_interval]