Source code for disdrodb.l0.l0c_processing

#!/usr/bin/env python3

# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2023 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Functions to process DISDRODB L0B files into DISDRODB L0C netCDF files."""
import logging

import numpy as np
import pandas as pd
import xarray as xr

from disdrodb.api.io import open_netcdf_files
from disdrodb.l0.l0b_processing import set_l0b_encodings
from disdrodb.l1.resampling import add_sample_interval
from disdrodb.utils.attrs import set_disdrodb_attrs
from disdrodb.utils.logger import log_info, log_warning
from disdrodb.utils.time import ensure_sorted_by_time

logger = logging.getLogger(__name__)

# L0C processing requires searching for data (per time blocks) into neighbouring files:
# - to account for possible trailing seconds in previous/next files
# - to get information if at the edges of the time blocks previous/next timesteps are available
# - to shift the time to ensure reported L0C time is the start of the measurement interval
TOLERANCE_SECONDS = 60 * 3

####---------------------------------------------------------------------------------
#### Measurement intervals


[docs] def drop_timesteps_with_invalid_sample_interval(ds, measurement_intervals, verbose=True, logger=None): """Drop timesteps with unexpected sample intervals.""" sample_interval = ds["sample_interval"].to_numpy() timesteps = ds["time"].to_numpy() is_valid_sample_interval = np.isin(sample_interval.data, measurement_intervals) indices_invalid_sample_interval = np.where(~is_valid_sample_interval)[0] if len(indices_invalid_sample_interval) > 0: # Log information for each invalid timestep invalid_timesteps = pd.to_datetime(timesteps[indices_invalid_sample_interval]).strftime("%Y-%m-%d %H:%M:%S") invalid_sample_intervals = sample_interval[indices_invalid_sample_interval] for tt, ss in zip(invalid_timesteps, invalid_sample_intervals): msg = f"Unexpected sampling interval ({ss} s) at {tt}. The measurement has been dropped." log_warning(logger=logger, msg=msg, verbose=verbose) # Remove timesteps with invalid sample intervals indices_valid_sample_interval = np.where(is_valid_sample_interval)[0] ds = ds.isel(time=indices_valid_sample_interval) return ds
[docs] def split_dataset_by_sampling_intervals( ds, measurement_intervals, min_sample_interval=10, min_block_size=5, time_is_end_interval=True, ): """ Split a dataset into subsets where each subset has a consistent sampling interval. Notes ----- - Does not modify timesteps (regularization is left to `regularize_timesteps`). - Assumes no duplicated timesteps in the dataset. - If only one measurement interval is specified, no timestep-diff checks are performed. - If multiple measurement intervals are specified: * Raises an error if *none* of the expected intervals appear. * Splits where interval changes. - Segments shorter than `min_block_size` are discarded. Parameters ---------- ds : xarray.Dataset The input dataset with a 'time' dimension. measurement_intervals : list or array-like A list of possible primary sampling intervals (in seconds) that the dataset might have. min_sample_interval : int, optional The minimum expected sampling interval in seconds. Defaults to 10s. This is used to deal with possible trailing seconds errors. min_block_size : float, optional The minimum number of timesteps with a given sampling interval to be considered. Otherwise such portion of data is discarded ! Defaults to 5 timesteps. time_is_end_interval: bool Whether time refers to the end of the measurement interval. The default is True. Returns ------- dict[int, xr.Dataset] A dictionary where keys are the identified sampling intervals (in seconds), and values are xarray.Datasets containing only data from those sampling intervals. """ # Define array of possible measurement intervals measurement_intervals = np.array(measurement_intervals) # Check sorted by time and sort if necessary ds = ensure_sorted_by_time(ds) # If a single measurement interval expected, return dictionary with input dataset if len(measurement_intervals) == 1: dict_ds = {int(measurement_intervals[0]): ds} return dict_ds # If sample_interval is a dataset variable, use it to define dictionary of datasets if "sample_interval" in ds: return {int(interval): ds.isel(time=ds["sample_interval"] == interval) for interval in measurement_intervals} # ---------------------------------------------------------------------------------------. # Otherwise exploit difference between timesteps to identify change point # Calculate time differences in seconds deltadt = np.abs(np.diff(ds["time"].data)).astype("timedelta64[s]").astype(int) # Round each delta to the nearest multiple of 5 (because the smallest possible sample interval is 10 s) # - This account for possible trailing seconds of the logger # Example: for sample_interval = 10, deltat values like 8, 9, 11, 12 become 10 ... # Example: for sample_interval = 10, deltat values like 6, 7 or 13, 14 become respectively 5 and 15 ... # Example: for sample_interval = 30, deltat values like 28,29,30,31,32 deltat become 30 ... # Example: for sample_interval = 30, deltat values like 26, 27 or 33, 34 become respectively 25 and 35 ... min_half_sample_interval = min_sample_interval / 2 deltadt = np.round(deltadt / min_half_sample_interval) * min_half_sample_interval # Map each delta to one of the possible_measurement_intervals if exact match, otherwise np.nan mapped_intervals = np.where(np.isin(deltadt, measurement_intervals), deltadt, np.nan) if np.all(np.isnan(mapped_intervals)): raise ValueError("Impossible to identify timesteps with expected sampling intervals.") # Check which measurements intervals are occurring in the dataset uniques = np.unique(mapped_intervals) uniques_intervals = uniques[~np.isnan(uniques)] n_different_intervals_occurring = len(uniques_intervals) if n_different_intervals_occurring == 1: dict_ds = {int(k): ds for k in uniques_intervals} return dict_ds # Fill NaNs: decide whether to attach to previous or next interval for i in range(len(mapped_intervals)): if np.isnan(mapped_intervals[i]): # If next exists and is NaN → forward fill if i + 1 < len(mapped_intervals) and np.isnan(mapped_intervals[i + 1]): mapped_intervals[i] = mapped_intervals[i - 1] if i > 0 else mapped_intervals[i + 1] # Otherwise → backward fill (attach to next valid) else: mapped_intervals[i] = ( mapped_intervals[i + 1] if i + 1 < len(mapped_intervals) else mapped_intervals[i - 1] ) # Infill np.nan values by using neighbor intervals # Forward fill # for i in range(1, len(mapped_intervals)): # if np.isnan(mapped_intervals[i]): # mapped_intervals[i] = mapped_intervals[i - 1] # # Backward fill (in case the first entries were np.nan) # for i in range(len(mapped_intervals) - 2, -1, -1): # if np.isnan(mapped_intervals[i]): # mapped_intervals[i] = mapped_intervals[i + 1] # Now all intervals are assigned to one of the possible measurement_intervals. # Identify boundaries where interval changes change_points = np.where(mapped_intervals[:-1] != mapped_intervals[1:])[0] + 1 # Split ds into segments according to change_points offset = 1 if time_is_end_interval else 0 segments = np.split(np.arange(ds.sizes["time"]), change_points + offset) # Remove segments with less than min_block_size elements segments = [seg for seg in segments if len(seg) >= min_block_size] if len(segments) == 0: raise ValueError( f"No blocks of {min_block_size} consecutive timesteps with constant sampling interval are available.", ) # Define dataset indices for each sampling interva dict_sampling_interval_indices = {} used_indices = set() for seg in segments: # Define the assumed sampling interval of such segment start_idx = seg[0] segment_sampling_interval = int(mapped_intervals[start_idx]) # Remove any indices that have already been assigned to another interval seg_filtered = seg[~np.isin(seg, list(used_indices))] # Only keep segment if it still meets minimum size after filtering if len(seg_filtered) >= min_block_size: if segment_sampling_interval not in dict_sampling_interval_indices: dict_sampling_interval_indices[segment_sampling_interval] = [seg_filtered] else: dict_sampling_interval_indices[segment_sampling_interval].append(seg_filtered) # Mark these indices as used used_indices.update(seg_filtered) # Concatenate indices for each sampling interval dict_sampling_interval_indices = { k: np.concatenate(list_indices) for k, list_indices in dict_sampling_interval_indices.items() if list_indices # Only include if there are valid segments } # Define dictionary of datasets dict_ds = {int(k): ds.isel(time=indices) for k, indices in dict_sampling_interval_indices.items()} return dict_ds
####--------------------------------------------------------------------------------- #### Timesteps duplicates
[docs] def has_same_value_over_time(da): """ Check if a DataArray has the same value over all timesteps, considering NaNs as equal. Parameters ---------- da : xarray.DataArray The DataArray to check. Must have a 'time' dimension. Returns ------- bool True if the values are the same (or NaN in the same positions) across all timesteps, False otherwise. """ # Select the first timestep da_first = da.isel(time=0) # Create a boolean array that identifies where values are equal or both NaN equal_or_nan = (da == da_first) | (da.isnull() & da_first.isnull()) # noqa: PD003 # Check if all values match this condition across all dimensions return bool(equal_or_nan.all().item())
[docs] def remove_duplicated_timesteps(ds, ensure_variables_equality=True, logger=None, verbose=True): """Removes duplicated timesteps from a xarray dataset.""" # Check for duplicated timesteps timesteps, counts = np.unique(ds["time"].data, return_counts=True) duplicated_timesteps = timesteps[counts > 1] # If no duplicated timesteps, returns dataset as is if len(duplicated_timesteps) == 0: return ds # If there are duplicated timesteps # - First check for variables equality # - Keep first occurrence of duplicated timesteps if values are equals # - Drop duplicated timesteps where values are different different_duplicated_timesteps = [] equal_duplicated_timesteps = [] for t in duplicated_timesteps: # Select dataset at given duplicated timestep ds_duplicated = ds.sel(time=t) n_t = len(ds_duplicated["time"]) # Check raw_drop_number equality if not has_same_value_over_time(ds_duplicated["raw_drop_number"]): different_duplicated_timesteps.append(t) msg = ( f"Presence of {n_t} duplicated timesteps at {t}." "They have different 'raw_drop_number' values. These timesteps are dropped." ) log_warning(logger=logger, msg=msg, verbose=verbose) # Check other variables equality other_variables_to_check = [v for v in ds.data_vars if v != "raw_drop_number"] variables_with_different_values = [ var for var in other_variables_to_check if not has_same_value_over_time(ds_duplicated[var]) ] if len(variables_with_different_values) > 0: msg = ( f"Presence of {n_t} duplicated timesteps at {t}." f"The duplicated timesteps have different values in variables {variables_with_different_values}. " ) if ensure_variables_equality: different_duplicated_timesteps.append(t) msg = msg + "These timesteps are dropped." else: equal_duplicated_timesteps.append(t) msg = msg + ( "These timesteps are not dropped because 'raw_drop_number' values are equals." "'ensure_variables_equality' is False." ) log_warning(logger=logger, msg=msg, verbose=verbose) else: equal_duplicated_timesteps.append(t) # Ensure single occurrence of duplicated timesteps equal_duplicated_timesteps = np.unique(equal_duplicated_timesteps) different_duplicated_timesteps = np.unique(different_duplicated_timesteps) # - Keep first occurrence of equal_duplicated_timesteps if len(equal_duplicated_timesteps) > 0: indices_to_drop = [np.where(ds["time"] == t)[0][1:] for t in equal_duplicated_timesteps] indices_to_drop = np.concatenate(indices_to_drop) # Keep only indices not in indices_to_drop mask = ~np.isin(np.arange(ds["time"].size), indices_to_drop) ds = ds.isel(time=np.where(mask)[0]) # - Drop different_duplicated_timesteps if len(different_duplicated_timesteps) > 0: mask = np.isin(ds["time"], different_duplicated_timesteps, invert=True) ds = ds.isel(time=np.where(mask)[0]) return ds
####--------------------------------------------------------------------------------- #### Timesteps regularization
[docs] def get_problematic_timestep_indices(timesteps, sample_interval): """Identify timesteps with missing previous or following timesteps.""" previous_time = timesteps - pd.Timedelta(seconds=sample_interval) next_time = timesteps + pd.Timedelta(seconds=sample_interval) idx_previous_missing = np.where(~np.isin(previous_time, timesteps))[0][1:] idx_next_missing = np.where(~np.isin(next_time, timesteps))[0][:-1] idx_isolated_missing = np.intersect1d(idx_previous_missing, idx_next_missing) idx_previous_missing = idx_previous_missing[np.isin(idx_previous_missing, idx_isolated_missing, invert=True)] idx_next_missing = idx_next_missing[np.isin(idx_next_missing, idx_isolated_missing, invert=True)] return idx_previous_missing, idx_next_missing, idx_isolated_missing
[docs] def regularize_timesteps(ds, sample_interval, robust=False, add_quality_flag=True, logger=None, verbose=True): """Ensure timesteps match with the sample_interval. This function: - drop dataset indices with duplicated timesteps, - but does not add missing timesteps to the dataset. """ # Check sorted by time and sort if necessary ds = ensure_sorted_by_time(ds) # Convert time to pandas.DatetimeIndex for easier manipulation times = pd.to_datetime(ds["time"].to_numpy()) # Determine the start and end times start_time = times[0].floor(f"{sample_interval}s") end_time = times[-1].ceil(f"{sample_interval}s") # Create the expected time grid expected_times = pd.date_range(start=start_time, end=end_time, freq=f"{sample_interval}s") # Convert to numpy arrays times = times.to_numpy(dtype="M8[s]") expected_times = expected_times.to_numpy(dtype="M8[s]") # Map original times to the nearest expected times # Calculate the difference between original times and expected times time_deltas = np.abs(times - expected_times[:, None]).astype(int) # Find the index of the closest expected time for each original time nearest_indices = np.argmin(time_deltas, axis=0) adjusted_times = expected_times[nearest_indices] # Check for duplicates in adjusted times unique_times, counts = np.unique(adjusted_times, return_counts=True) duplicates = unique_times[counts > 1] # Initialize time quality flag # - 0 when ok or just rounded to closest 00 # - 1 if previous timestep is missing # - 2 if next timestep is missing # - 3 if previous and next timestep is missing # - 4 if solved duplicated timesteps # - 5 if needed to drop duplicated timesteps and select the last flag_previous_missing = 1 flag_next_missing = 2 flag_isolated_timestep = 3 flag_solved_duplicated_timestep = 4 flag_dropped_duplicated_timestep = 5 qc_flag = np.zeros(adjusted_times.shape) # Initialize list with the duplicated timesteps index to drop # - We drop the first occurrence because is likely the shortest interval idx_to_drop = [] # Attempt to resolve for duplicates if duplicates.size > 0: # Handle duplicates for dup_time in duplicates: # Indices of duplicates dup_indices = np.where(adjusted_times == dup_time)[0] n_duplicates = len(dup_indices) # Define previous and following timestep prev_time = dup_time - pd.Timedelta(seconds=sample_interval) next_time = dup_time + pd.Timedelta(seconds=sample_interval) # Try to find missing slots before and after # - If more than 3 duplicates, impossible to solve ! count_solved = 0 # If the previous timestep is available, set that one if n_duplicates == 2: if prev_time not in adjusted_times: adjusted_times[dup_indices[0]] = prev_time qc_flag[dup_indices[0]] = flag_solved_duplicated_timestep count_solved += 1 elif next_time not in adjusted_times: adjusted_times[dup_indices[-1]] = next_time qc_flag[dup_indices[-1]] = flag_solved_duplicated_timestep count_solved += 1 else: pass elif n_duplicates == 3: if prev_time not in adjusted_times: adjusted_times[dup_indices[0]] = prev_time qc_flag[dup_indices[0]] = flag_solved_duplicated_timestep count_solved += 1 if next_time not in adjusted_times: adjusted_times[dup_indices[-1]] = next_time qc_flag[dup_indices[-1]] = flag_solved_duplicated_timestep count_solved += 1 if count_solved != n_duplicates - 1: idx_to_drop = np.append(idx_to_drop, dup_indices[0:-1]) qc_flag[dup_indices[-1]] = flag_dropped_duplicated_timestep msg = ( f"Cannot resolve {n_duplicates} duplicated timesteps " f"(after trailing seconds correction) around {dup_time}." ) log_warning(logger=logger, msg=msg, verbose=verbose) if robust: raise ValueError(msg) # Update the time coordinate (Convert to ns for xarray compatibility) ds = ds.assign_coords({"time": adjusted_times.astype("datetime64[ns]")}) # Update quality flag values for next and previous timestep is missing if add_quality_flag: idx_previous_missing, idx_next_missing, idx_isolated_missing = get_problematic_timestep_indices( adjusted_times, sample_interval, ) qc_flag[idx_previous_missing] = np.maximum(qc_flag[idx_previous_missing], flag_previous_missing) qc_flag[idx_next_missing] = np.maximum(qc_flag[idx_next_missing], flag_next_missing) qc_flag[idx_isolated_missing] = np.maximum(qc_flag[idx_isolated_missing], flag_isolated_timestep) # If the first timestep is at 00:00 and currently flagged as previous missing (1), reset to 0 # first_time = pd.to_datetime(adjusted_times[0]).time() # first_expected_time = pd.Timestamp("00:00:00").time() # if first_time == first_expected_time and qc_flag[0] == flag_previous_missing: # qc_flag[0] = 0 # # If the last timestep is flagged and currently flagged as next missing (2), reset it to 0 # last_time = pd.to_datetime(adjusted_times[-1]).time() # last_time_expected = (pd.Timestamp("00:00:00") - pd.Timedelta(30, unit="seconds")).time() # # Check if adding one interval would go beyond the end_time # if last_time == last_time_expected and qc_flag[-1] == flag_next_missing: # qc_flag[-1] = 0 # Assign time quality flag coordinate ds["time_qc"] = xr.DataArray(qc_flag, dims="time") ds = ds.set_coords("time_qc") # Add CF attributes for time_qc ds["time_qc"].attrs = { "long_name": "time quality flag", "standard_name": "status_flag", "units": "1", "valid_range": [0, 5], "flag_values": [0, 1, 2, 3, 4, 5], "flag_meanings": ( "good_data " "previous_timestep_missing " "next_timestep_missing " "isolated_timestep " "solved_duplicated_timestep " "dropped_duplicated_timestep" ), "comment": ( "Quality flag for time coordinate. " "Flag 0: data is good or just rounded to nearest sampling interval. " "Flag 1: previous timestep is missing in the time series. " "Flag 2: next timestep is missing in the time series. " "Flag 3: both previous and next timesteps are missing (isolated timestep). " "Flag 4: timestep was moved from duplicate to fill missing timestep. " "Flag 5: duplicate timestep was dropped, keeping the last occurrence." ), } # Drop duplicated timesteps # - Using ds = ds.drop_isel({"time": idx_to_drop.astype(int)}) raise: # --> pandas.errors.InvalidIndexError: Reindexing only valid with uniquely valued Index objects # --> https://github.com/pydata/xarray/issues/6605 if len(idx_to_drop) > 0: idx_to_drop = idx_to_drop.astype(int) idx_valid_timesteps = np.arange(0, ds["time"].size) idx_valid_timesteps = np.delete(idx_valid_timesteps, idx_to_drop) ds = ds.isel(time=idx_valid_timesteps) # Return dataset return ds
[docs] def check_timesteps_regularity(ds, sample_interval, verbose=False, logger=None): """Check for the regularity of timesteps.""" # Check sorted by time and sort if necessary ds = ensure_sorted_by_time(ds) # Calculate number of timesteps n = len(ds["time"].data) # Calculate time differences in seconds deltadt = np.diff(ds["time"].data).astype("timedelta64[s]").astype(int) # Identify unique time intervals and their occurrences unique_deltadt, counts = np.unique(deltadt, return_counts=True) # Determine the most frequent time interval (mode) most_frequent_deltadt_idx = np.argmax(counts) most_frequent_deltadt = unique_deltadt[most_frequent_deltadt_idx] # Count fraction occurrence of deltadt fractions = np.round(counts / len(deltadt) * 100, 2) # Compute stats about expected deltadt mask = unique_deltadt == sample_interval sample_interval_counts = counts[mask].item() if mask.any() else 0 sample_interval_fraction = fractions[mask].item() if mask.any() else 0.0 # Compute stats about most frequent deltadt mask = unique_deltadt == most_frequent_deltadt most_frequent_deltadt_counts = counts[mask].item() if mask.any() else 0 most_frequent_deltadt_fraction = fractions[mask].item() if mask.any() else 0.0 # Compute stats about unexpected deltadt unexpected_intervals = unique_deltadt[unique_deltadt != sample_interval] unexpected_intervals_counts = counts[unique_deltadt != sample_interval] unexpected_intervals_fractions = fractions[unique_deltadt != sample_interval] frequent_unexpected_intervals = unexpected_intervals[unexpected_intervals_fractions > 5] # Report warning if the sampling_interval deltadt occurs less often than 60 % of times # -> TODO: maybe only report in stations where the disdro does not log only data when rainy if sample_interval_fraction < 60: msg = ( f"The expected (sampling) interval between observations occurs only " f"{sample_interval_counts}/{n} times ({sample_interval_fraction} %)." ) log_warning(logger=logger, msg=msg, verbose=verbose) # Report warning if a deltadt occurs more often then the sampling interval if most_frequent_deltadt != sample_interval: msg = ( f"The most frequent time interval between observations is {most_frequent_deltadt} s " f"(occurs {most_frequent_deltadt_counts}/{n} times) ({most_frequent_deltadt_fraction}%) " f"although the expected (sampling) interval is {sample_interval} s " f"and occurs {sample_interval_counts}/{n} times ({sample_interval_fraction}%)." ) log_warning(logger=logger, msg=msg, verbose=verbose) # Report with a warning all unexpected deltadt with frequency larger than 5 % if len(frequent_unexpected_intervals) > 0: msg = "The following time intervals between observations occur frequently: " for interval in frequent_unexpected_intervals: c = unexpected_intervals_counts[unexpected_intervals == interval].item() f = unexpected_intervals_fractions[unexpected_intervals == interval].item() msg = msg + f"{interval} s ({f}%) ({c}/{n})" log_warning(logger=logger, msg=msg, verbose=verbose) return ds
####----------------------------------------------------------------------------------------------. #### Wrapper def _finalize_l0c_dataset(ds, sample_interval, sensor_name, verbose=True, logger=None): """Finalize a L0C dataset with unique sampling interval. It adds the sampling_interval coordinate and it regularizes the timesteps for trailing seconds. """ # Add sample interval as coordinate ds = add_sample_interval(ds, sample_interval=sample_interval) # Regularize timesteps (for trailing seconds) ds = regularize_timesteps( ds, sample_interval=sample_interval, robust=False, # if True, raise error if an error occur during regularization add_quality_flag=True, verbose=verbose, logger=logger, ) # Performs checks about timesteps regularity # - Do not discard anything # - Just log warnings in the log file ds = check_timesteps_regularity(ds=ds, sample_interval=sample_interval, verbose=verbose, logger=logger) # Shift timesteps to ensure time correspond to start of measurement interval # TODO as function of sensor name # Set netCDF dimension order # --> Required for correct encoding ! ds = ds.transpose("time", "diameter_bin_center", ...) # Set encodings ds = set_l0b_encodings(ds=ds, sensor_name=sensor_name) # Update global attributes ds = set_disdrodb_attrs(ds, product="L0C") return ds
[docs] def create_l0c_datasets( event_info, measurement_intervals, sensor_name, ensure_variables_equality=True, logger=None, verbose=True, ): """ Create a single dataset by merging and processing data from multiple filepaths. Parameters ---------- event_info : dict Dictionary with start_time, end_time and filepaths keys. Returns ------- dict A dictionary with an xarray.Dataset for each measurement interval. Raises ------ ValueError If less than 5 timesteps are available for the specified day. Notes ----- - Data is loaded into memory and connections to source files are closed before returning the dataset. - Tolerance in input files is used around expected dataset start_time and end_time to account for imprecise logging times and ensuring correct definition of qc_time at files boundaries (e.g. 00:00). - Duplicated timesteps with different raw drop number values are dropped - First occurrence of duplicated timesteps with equal raw drop number values is kept. - Regularizes timesteps to handle trailing seconds. """ # ---------------------------------------------------------------------------------------. # Retrieve information start_time = np.array(event_info["start_time"], dtype="M8[s]") end_time = np.array(event_info["end_time"], dtype="M8[s]") filepaths = event_info["filepaths"] # Define expected dataset time coverage start_time_tol = start_time - np.array(TOLERANCE_SECONDS, dtype="m8[s]") end_time_tol = end_time + np.array(TOLERANCE_SECONDS, dtype="m8[s]") # ---------------------------------------------------------------------------------------. # Open files with data within the provided day and concatenate them ds = open_netcdf_files( filepaths, start_time=start_time_tol, end_time=end_time_tol, chunks={}, parallel=False, compute=True, ) # If not data for that time block, return empty dictionary # - Can occur when raw files are already by block of months and e.g. here saving to daily blocks ! if ds.sizes["time"] == 0: log_info(logger=logger, msg=f"No data between {start_time} and {end_time}.", verbose=verbose) return {} # ---------------------------------------------------------------------------------------. # If sample interval is a dataset variable, drop timesteps with unexpected measurement intervals ! if "sample_interval" in ds: ds = drop_timesteps_with_invalid_sample_interval( ds=ds, measurement_intervals=measurement_intervals, verbose=verbose, logger=logger, ) n_timesteps = len(ds["time"]) if n_timesteps < 3: raise ValueError(f"Only {n_timesteps} timesteps left after removing those with unexpected sample interval.") # ---------------------------------------------------------------------------------------. # Remove duplicated timesteps (before correcting for trailing seconds) # - It checks that duplicated timesteps have the same raw_drop_number values # - If duplicated timesteps have different raw_drop_number values: # --> warning is raised # --> timesteps are dropped ds = remove_duplicated_timesteps( ds, ensure_variables_equality=ensure_variables_equality, logger=logger, verbose=verbose, ) # Raise error if less than 3 timesteps left n_timesteps = len(ds["time"]) if n_timesteps < 3: raise ValueError(f"{n_timesteps} timesteps left after removing duplicated.") # ---------------------------------------------------------------------------------------. # Split dataset by sampling intervals dict_ds = split_dataset_by_sampling_intervals( ds=ds, measurement_intervals=measurement_intervals, min_sample_interval=10, min_block_size=5, ) # Log a warning if two sampling intervals are present within a given time block if len(dict_ds) > 1: occuring_sampling_intervals = list(dict_ds) msg = f"The input files contains these sampling intervals: {occuring_sampling_intervals}." log_warning(logger=logger, msg=msg, verbose=verbose) # ---------------------------------------------------------------------------------------. # Finalize L0C datasets # - Add and ensure sample_interval coordinate has just 1 value (not varying with time) # - Regularize timesteps for trailing seconds dict_ds = { sample_interval: _finalize_l0c_dataset( ds=ds, sample_interval=sample_interval, sensor_name=sensor_name, verbose=verbose, logger=logger, ).sel({"time": slice(start_time, end_time)}) for sample_interval, ds in dict_ds.items() } return dict_ds