disdrodb.utils package#
Submodules#
disdrodb.utils.archiving module#
Utility function for DISDRODB product archiving.
- disdrodb.utils.archiving.check_freq(freq: str) None[source][source]#
Check validity of freq argument.
- disdrodb.utils.archiving.define_temporal_partitions(filepaths, strategy, parallel, strategy_options)[source][source]#
Define temporal file processing partitions.
- Parameters:
filepaths (list) – List of files paths to be processed
strategy (str) –
Partitioning strategy to apply.
Supported values are:
'time_block'defines fixed time intervals (e.g. monthly) covering input files.'event'detect clusters of precipitation (“events”).
parallel (bool) – If True, parallel data loading is used to identify events.
strategy_options (dict) –
Dictionary with strategy-specific parameters:
If
strategy == 'time_block', supported options are:freq: Time unit for blocks. One of {‘year’, ‘season’, ‘month’, ‘day’}.
See the
identify_time_partitionsfunction for more information.If
strategy == 'event', supported options are:variablestrName of the variable to use to apply the event detection.
detection_thresholdintMinimum number of drops to consider a timestep.
neighbor_min_sizeintMinimum cluster size for merging neighboring events.
neighbor_time_intervalstrTime window (e.g. “5MIN”) to merge adjacent clusters.
event_max_time_gapstrMaximum allowed gap (e.g. “6H”) within a single event.
event_min_durationstrMinimum total duration (e.g. “5MIN”) of an event.
event_min_sizeintMinimum number of records in an event.
See the
identify_eventsfunction for more information.
- Returns:
A list of dictionaries, each containing:
start_time: numpy.datetime64[s]Inclusive start of an event or time block.
end_time: numpy.datetime64[s]Inclusive end of an event or time block.
- Return type:
Notes
The
'event'strategy requires loading data into memory to identify clusters.The
'time_block'strategy can operate on metadata alone, without full data loading.The
'event'strategy implicitly performs data selection on which files to process !The
'time_block'strategy does not performs data selection on which files to process !
- disdrodb.utils.archiving.generate_time_blocks(start_time: datetime64, end_time: datetime64, freq: str, inclusive_end_time: bool = True) ndarray[source][source]#
Generate time blocks between start_time and end_time for a given frequency.
- Parameters:
start_time (numpy.datetime64) – Inclusive start of the overall time range.
end_time (numpy.datetime64) – End of the overall time range. Inclusive by default (see inclusive_end_time argument).
freq (str) –
Frequency specifier. Accepted values are:
’none’ : return a single block [start_time, end_time]
’day’ : split into daily blocks
’month’ : split into calendar months
’quarter’ : split into calendar quarters
’year’ : split into calendar years
’season’ : split into meteorological seasons (MAM, JJA, SON, DJF)
inclusive_end_time (bool) – The default is True. If False, if the last block end_time is equal to input end_time, such block is removed.
- Returns:
Array of shape (n, 2) with dtype datetime64[s], where each row is [block_start, block_end].
- Return type:
- disdrodb.utils.archiving.group_files_by_temporal_partitions(temporal_partitions, filepaths, block_starts_offset=0, block_ends_offset=0)[source][source]#
Provide information about the required files for each event.
For each time block in temporal_partitions, the function identifies the filepaths that overlap such time period. The time blocks of temporal_partitions can be adjusted using block_starts_offset and block_ends_offset e.g. for resampling applications.
- Parameters:
temporal_partitions (list of dict) – List of time blocks, where each time blocks is a dictionary containing at least ‘start_time’ and ‘end_time’ keys with numpy.datetime64 values.
filepaths (list of str) – List of file paths corresponding to data files.
block_starts_offset (int) – Optional offset (in seconds) to add to time blocks starts. Provide negative offset to go back in time.
block_ends_offset (int) – Optional offset (in seconds) to add to time blocks ends. Provide negative offset to go back in time.
- Returns:
A list where each element is a dictionary containing:
’start_time’: Adjusted start time of the event (datetime.datetime64).
’end_time’: Adjusted end time of the event (datetime.datetime64).
’filepaths’: List of file paths overlapping with the adjusted event period.
- Return type:
- disdrodb.utils.archiving.group_files_by_time_block(filepaths, freq='day', tolerance_seconds=120)[source][source]#
Organize files by time blocks based on their start and end times.
If tolerance_seconds is specified, it adds some tolerance to files start and end_time. This means that files starting/ending next to the time blocks boundaries will be included in both time blocks. This can be useful to deal with imprecise time within files.
- Parameters:
- Returns:
A list where each element is a dictionary containing:
’start_time’: Adjusted start time of the event (datetime.datetime64).
’end_time’: Adjusted end time of the event (datetime.datetime64).
’filepaths’: List of file paths overlapping with the adjusted event period.
- Return type:
Notes
In the DISDRODB L0C processing chain, a tolerance of 120 seconds is used to account for the possible imprecise/drifting time logged by the sensors before it is corrected.
- disdrodb.utils.archiving.identify_events(filepaths, parallel=False, variable='N', detection_threshold=5, neighbor_min_size=2, neighbor_time_interval='5MIN', event_max_time_gap='6H', event_min_duration='5MIN', event_min_size=3)[source][source]#
Return a list of precipitating events.
Precipitating events are defined when ‘variable’ > detection_threshold. Any isolated timesteps with precipitation (based on neighborhood criteria) is removed. Then, consecutive rainy timesteps are grouped into the same event if the time gap between them does not exceed event_max_time_gap. Finally, events that do not meet minimum size or duration requirements are filtered out.
- Parameters:
filepaths (list) – List of L1C file paths.
variable (str) – Name of the variable to use to apply the event detection. The default is “N”.
detection_threshold (int) – Minimum value of ‘variable’ to consider an event timestep.
parallel (bool) – Whether to load the files in parallel. Set parallel=True only in a multiprocessing environment. The default is False.
neighbor_time_interval (str) – The time interval around a given a timestep defining the neighborhood. Only timesteps that fall within this time interval before or after a timestep are considered neighbors. The neighbor_time_interval must be at least equal to the dataset sampling interval (temporal resolution) ! That is for 1-minute data,
neighbor_time_intervalshould be at least1MIN, for 5-minute data it should be at least5MIN, etc.neighbor_min_size (int, optional) –
The minimum number of neighboring timesteps required within neighbor_time_interval for a timestep to be considered non-isolated. Isolated timesteps are removed !
If
neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs.If
neighbor_min_size=1, the timestep must have at least one neighbor within neighbor_time_interval.If
neighbor_min_size=2, the timestep must have at least two timesteps within neighbor_time_interval.
Defaults to 1.
event_max_time_gap (str) – The maximum time interval between two timesteps to be considered part of the same event. This parameters is used to group timesteps into events !
event_min_duration (str) – The minimum duration an event must span. Events shorter than this duration are discarded.
event_min_size (int, optional) – The minimum number of valid timesteps required for an event. Defaults to 1.
- Returns:
A list of events, where each event is represented as a dictionary with keys:
”start_time”: np.datetime64, start time of the event
”end_time”: np.datetime64, end time of the event
”duration”: np.timedelta64, duration of the event
”n_timesteps”: int, number of valid timesteps in the event
- Return type:
- disdrodb.utils.archiving.identify_time_partitions(start_times, end_times, freq: str) list[dict][source][source]#
Identify the set of time blocks covered by files.
The result is a minimal, sorted, and unique set of time partitions. ‘start_times’ and end_times can be derived using get_start_end_time_from_filepaths.
- Parameters:
start_times (numpy.ndarray) – Array of inclusive start times in datetime64[s] format for each file.
end_times (numpy.ndarray) – Array of inclusive end times in datetime64[s] format for each file.
freq (str) – Frequency determining the granularity of candidate blocks. Allowed values are {‘none’, ‘hour’, ‘day’, ‘month’, ‘quarter’, ‘season’, ‘year’}. See generate_time_blocks for more details.
- Returns:
A list of dictionaries, each containing:
- start_time (numpy.datetime64[s])
Inclusive start of a time block.
- end_time (numpy.datetime64[s])
Inclusive end of a time block.
Only those blocks that overlap at least one file’s interval are returned. The list is sorted by start_time and contains no duplicate blocks.
- Return type:
disdrodb.utils.attrs module#
DISDRODB netCDF4 attributes utilities.
- disdrodb.utils.attrs.get_attrs_dict()[source][source]#
Get attributes dictionary for DISDRODB product variables and coordinates.
- disdrodb.utils.attrs.set_attrs(ds, attrs_dict)[source][source]#
Set attributes to the variables and coordinates of the xr.Dataset.
- disdrodb.utils.attrs.set_disdrodb_attrs(ds, product: str)[source][source]#
Add DISDRODB processing information to the netCDF global attributes.
It assumes stations metadata are already added the dataset.
- Parameters:
ds (xarray.Dataset) – Dataset
product (str) – DISDRODB product.
- Returns:
Dataset.
- Return type:
- disdrodb.utils.attrs.update_disdrodb_attrs(ds, product: str)[source][source]#
Add DISDRODB processing information to the netCDF global attributes.
It assumes stations metadata are already added the dataset.
- Parameters:
ds (xarray.Dataset) – Dataset
product (str) – DISDRODB product.
- Returns:
Dataset.
- Return type:
disdrodb.utils.cli module#
DISDRODB command-line-interface scripts utilities.
- disdrodb.utils.cli.click_data_archive_dir_option(function: object)[source][source]#
Click command line argument for DISDRODB
data_archive_dir.- Parameters:
function (object) – Function.
- disdrodb.utils.cli.click_l0_archive_options(function: object)[source][source]#
Click command line arguments for L0 processing archiving of a station.
- Parameters:
function (object) – Function.
- disdrodb.utils.cli.click_metadata_archive_dir_option(function: object)[source][source]#
Click command line argument for DISDRODB
metadata_archive_dir.- Parameters:
function (object) – Function.
- disdrodb.utils.cli.click_processing_options(function: object)[source][source]#
Click command line default parameters for L0 processing options.
- Parameters:
function (object) – Function.
- disdrodb.utils.cli.click_remove_l0a_option(function: object)[source][source]#
Click command line argument for
remove_l0a.
- disdrodb.utils.cli.click_remove_l0b_option(function: object)[source][source]#
Click command line argument for
remove_l0b.
- disdrodb.utils.cli.click_station_arguments(function: object)[source][source]#
Click command line arguments for DISDRODB station processing.
- Parameters:
function (object) – Function.
- disdrodb.utils.cli.click_stations_options(function: object)[source][source]#
Click command line options for DISDRODB archive L0 processing.
- Parameters:
function (object) – Function.
- disdrodb.utils.cli.execute_cmd(cmd, raise_error=False)[source][source]#
Execute command in the terminal, streaming output in python console.
- disdrodb.utils.cli.parse_archive_dir(archive_dir: str)[source][source]#
Utility to parse archive directories provided by command line.
If
archive_dir = 'None'returnsNone. Ifarchive_dir = ''returnsNone.
- disdrodb.utils.cli.parse_arg_to_list(args)[source][source]#
Utility to pass list to command line scripts.
If
args = ''returnsNone. Ifargs = 'None'returnsNone. Ifargs = 'variable'returns[variable]. Ifargs = 'variable1 variable2'returns[variable1, variable2].
- disdrodb.utils.cli.parse_empty_string_and_none(args)[source][source]#
Utility to parse argument passed from the command line.
If
args = '', returns None. Ifargs = 'None'returns None. Otherwise returnargs.
- disdrodb.utils.cli.subprocess_run(argv: Sequence[str], *, check: bool = True, capture_output: bool = False, text: bool = True, cwd: str | None = None, **kwargs) CompletedProcess[source][source]#
Run a command ensuring the current kernel’s env ‘bin/Scripts’ is on PATH.
argv: like [“disdrodb_run_l0”, “–help”]
This wrapper ensures subprocess can find and run console scripts from the current Jupyter kernel’s conda/venv by adding that environment’s bin/ (or Scripts/ on Windows) to PATH when the notebook starts with a system-only PATH.
disdrodb.utils.compression module#
DISDRODB raw data compression utility.
- disdrodb.utils.compression.archive_station_data(metadata_filepath: str, data_archive_dir: str) str[source][source]#
Archive station data into a zip file for subsequent data upload.
It create a zip file into a temporary directory !
- Parameters:
metadata_filepath (str) – Metadata file path.
- disdrodb.utils.compression.check_consistent_station_name(metadata_filepath, station_name)[source][source]#
Check consistent station_name between YAML file name and metadata key.
- disdrodb.utils.compression.compress_file(filepath: str, method: str, skip: bool) str[source][source]#
Compress a file and delete the original.
If the file is already compressed, it is not compressed again.
- Parameters:
- Returns:
Path of the compressed file. Same as input if no compression.
- Return type:
- disdrodb.utils.compression.compress_file_bzip2(filepath: str, compressed_filepath: str) None[source][source]#
Compress a single file into a bzip2 archive.
- disdrodb.utils.compression.compress_file_gzip(filepath: str, compressed_filepath: str) None[source][source]#
Compress a single file into a gzip archive.
- disdrodb.utils.compression.compress_file_zip(filepath: str, compressed_filepath: str) None[source][source]#
Compress a single file into a zip archive.
- disdrodb.utils.compression.compress_station_files(data_archive_dir: str, data_source: str, campaign_name: str, station_name: str, method: str = 'gzip', skip: bool = True) None[source][source]#
Compress each raw file of a station.
- Parameters:
data_archive_dir (str) – DISDRODB Data Archive directory
data_source (str) – Name of data source of interest.
campaign_name (str) – Name of the campaign of interest.
station_name (str) – Station name of interest.
method (str) – Compression method.
"zip","gzip"or"bzip2".skip (bool) – Whether to raise an error if a file is already compressed. If
True, it does not raise an error and try to compress the other files. IfFalse, it raise an error and stop the compression routine. The default value isTrue.
disdrodb.utils.coords module#
DISDRODB coordinates utilities.
disdrodb.utils.dask module#
Utilities for Dask Distributed Computations.
- disdrodb.utils.dask.check_parallel_validity(parallel)[source][source]#
Check validity of parallel option given Dask settings.
- disdrodb.utils.dask.execute_tasks_safely(list_tasks, parallel: bool, logs_dir: str, max_tasks_per_batch=5000)[source][source]#
Execute Dask tasks and skip failed ones.
- Parameters:
list_tasks (list) – List of dask delayed objects or results.
parallel (bool) – Whether to execute in parallel with Dask or not.
logs_dir (str) – Directory to store FAILED_TASKS.log.
max_tasks_per_batch (int or None, optional) – Maximum number of tasks to submit to client.compute() at once. The default is 5000. Dask struggle if more than 10_000 tasks are submitted.
- Returns:
list_logs – List of task results. For failed tasks, adds the path to FAILED_TASKS.log in place of the result.
- Return type:
disdrodb.utils.dataframe module#
Dataframe utilities.
- disdrodb.utils.dataframe.compute_1d_histogram(df, column, variables=None, bins=10, labels=None, prefix_name=True, include_quantiles=False)[source][source]#
Compute conditional univariate statistics.
- Parameters:
df (pandas.DataFrame) – Input dataframe
column (str) – Column name to be binned.
variables (str or list, optional) – Column names for which conditional statistics will be computed. If None, only counts are computed.
bins (int or array-like) – Number of bins or bin edges.
labels (array-like, optional) – Labels for the column bins. If None, uses bin centers.
- Return type:
- disdrodb.utils.dataframe.compute_2d_histogram(df, x, y, variables=None, x_bins=10, y_bins=10, x_labels=None, y_labels=None, prefix_name=True, include_quantiles=False)[source][source]#
Compute conditional bivariate statistics.
- Parameters:
df (pandas.DataFrame) – Input dataframe
x (str) – Column name for x-axis binning (will be rounded to integers)
y (str) – Column name for y-axis binning
variables (str or list, optional) – Column names for which statistics will be computed. If None, only counts are computed.
x_bins (int or array-like) – Number of bins or bin edges for x
y_bins (int or array-like) – Number of bins or bin edges for y
x_labels (array-like, optional) – Labels for x bins. If None, uses bin centers
y_labels (array-like, optional) – Labels for y bins. If None, uses bin centers
- Returns:
Dataset with dimensions corresponding to binned variables and data variables for each statistic
- Return type:
- disdrodb.utils.dataframe.log_arange(start, stop, log_step=0.1, base=10)[source][source]#
Return numbers spaced evenly on a log scale (similar to np.arange but in log space).
- Parameters:
- Returns:
Array of values spaced in log scale.
- Return type:
disdrodb.utils.decorators module#
DISDRODB decorators.
- disdrodb.utils.decorators.check_pytmatrix_availability(func)[source][source]#
Decorator to ensure that the ‘pytmatrix’ package is installed.
- disdrodb.utils.decorators.check_software_availability(software, conda_package)[source][source]#
A decorator to ensure that a software package is installed.
- disdrodb.utils.decorators.create_dask_task_name(function_name: str, name=None) str | None[source][source]#
Create a custom dask task name.
- Parameters:
- Returns:
Custom dask task name string if name is given, otherwise None (use Dask’s default naming).
- Return type:
str | None
disdrodb.utils.dict module#
This module contains functions for manipulating dictionaries.
disdrodb.utils.directories module#
Define utilities for Directory/File Checks/Creation/Deletion.
- disdrodb.utils.directories.check_directory_exists(dir_path)[source][source]#
Check if the directory exists.
- disdrodb.utils.directories.check_glob_pattern(pattern: str) None[source][source]#
Check if glob pattern is a string and is a valid pattern.
- Parameters:
pattern (str) – String to be checked.
- disdrodb.utils.directories.check_glob_patterns(patterns: str | list) list[source][source]#
Check if glob patterns are valids.
- disdrodb.utils.directories.contains_files(dir_path: str) bool[source][source]#
Check (recursively) if a directory contains any file.
os.walk under the hood uses os.scandir os.walk file generator + any() avoid use of while loop
The function returns True as soon as one file is found (short-circuit); False otherwise.
- disdrodb.utils.directories.contains_netcdf_or_parquet_files(dir_path: str) bool[source][source]#
Check (recursively) if a directory has any Parquet or netCDF file.
os.walk under the hood uses os.scandir os.walk file generator + any() avoid use of while loop
The function returns True as soon as one file is found (short-circuit)^; False otherwise.
- disdrodb.utils.directories.copy_file(src_filepath, dst_filepath)[source][source]#
Copy a file from a location to another.
- disdrodb.utils.directories.count_directories(dir_path, glob_pattern='*', recursive=False, skip_hidden=True)[source][source]#
Return the number of files (exclude directories).
- disdrodb.utils.directories.count_files(dir_path, glob_pattern='*', recursive=False, skip_hidden=True)[source][source]#
Return the number of files (exclude directories).
- disdrodb.utils.directories.create_directory(path: str, exist_ok=True) None[source][source]#
Create a directory at the provided path.
- disdrodb.utils.directories.create_required_directory(dir_path, dir_name, exist_ok=True)[source][source]#
Create directory
dir_nameinside thedir_pathdirectory.
- disdrodb.utils.directories.ensure_string_path(path, msg, accepth_pathlib=False)[source][source]#
Ensure that the path is a string.
- disdrodb.utils.directories.is_empty_directory(path, skip_hidden=True)[source][source]#
Check if a directory path is empty.
Return
Falseif path is a file or non-empty directory. If the path does not exist, raise an error.
- disdrodb.utils.directories.list_directories(dir_path, glob_pattern='*', recursive=False, skip_hidden=True, return_paths=True)[source][source]#
Return a list of directory paths (exclude file paths).
- disdrodb.utils.directories.list_files(dir_path, glob_pattern='*', recursive=False, skip_hidden=True, return_paths=True)[source][source]#
Return a list of filepaths (exclude directory paths).
- disdrodb.utils.directories.list_paths(dir_path, glob_pattern, recursive=False, skip_hidden=True)[source][source]#
Return a list of filepaths and directory paths.
This function accept also a list of glob patterns !
- disdrodb.utils.directories.remove_file_or_directories(path, logger=None)[source][source]#
Return the file/directory or subdirectories tree of
path.Use this function with caution.
- disdrodb.utils.directories.remove_if_exists(path: str, force: bool = False, logger=None) None[source][source]#
Remove file or directory if exists and
force=True.If
force=False, it raises an error.
- disdrodb.utils.directories.remove_path_trailing_slash(path: str) str[source][source]#
Removes a trailing slash or backslash from a file path if it exists.
This function ensures that the provided file path is normalized by removing any trailing directory separator characters (
'/'or'\\'). This is useful for maintaining consistency in path strings and for preparing paths for operations that may not expect a trailing slash.- Parameters:
path (str) – The file path to normalize.
- Returns:
The normalized path without a trailing slash.
- Return type:
- Raises:
TypeError – If the input path is not a string.
Examples
>>> remove_trailing_slash("some/path/") 'some/path' >>> remove_trailing_slash("another\\path\\") 'another\\path'
disdrodb.utils.encoding module#
DISDRODB netCDF4 encoding utilities.
- disdrodb.utils.encoding.get_encodings_dict()[source][source]#
Get encoding dictionary for DISDRODB product variables and coordinates.
- disdrodb.utils.encoding.get_time_encoding() dict[source][source]#
Create time encoding.
- Returns:
Time encoding.
- Return type:
- disdrodb.utils.encoding.rechunk_dataset(ds: Dataset, encodings_dict: dict) Dataset[source][source]#
Coerce the dataset arrays to have the chunk size specified in the encoding dictionary.
- Parameters:
ds (xarray.Dataset) – Input xarray dataset
encodings_dict (dict) – Dictionary containing the encoding to write the xarray dataset as a netCDF.
- Returns:
Output xarray dataset
- Return type:
- disdrodb.utils.encoding.sanitize_encodings_dict(encodings_dict: dict, ds: Dataset) dict[source][source]#
Ensure chunk size to be smaller than the array shape.
- Parameters:
encodings_dict (dict) – Dictionary containing the variable encodings.
ds (xarray.Dataset) – Input dataset.
- Returns:
Encoding dictionary.
- Return type:
- disdrodb.utils.encoding.set_encodings(ds: Dataset, encodings_dict: dict) Dataset[source][source]#
Apply the encodings to the xarray Dataset.
- Parameters:
ds (xarray.Dataset) – Input xarray dataset.
encodings_dict (dict) – Dictionary with encodings specifications.
- Returns:
Output xarray dataset.
- Return type:
disdrodb.utils.event module#
Functions for event definition.
- disdrodb.utils.event.group_timesteps_into_event(timesteps, event_max_time_gap, event_min_size=0, event_min_duration='0S', neighbor_min_size=0, neighbor_time_interval='0S')[source][source]#
Group candidate timesteps into events based on temporal criteria.
This function groups valid candidate timesteps into events by considering how they cluster in time. Any isolated timesteps (based on neighborhood criteria) are first removed. Then, consecutive timesteps are grouped into the same event if the time gap between them does not exceed event_max_time_gap. Finally, events that do not meet minimum size or duration requirements are filtered out.
Please note that neighbor_min_size and neighbor_time_interval are very sensitive to the actual sample interval of the data !
- Parameters:
timesteps (numpy.ndarray) – Candidate timesteps to be grouped into events.
neighbor_time_interval (str) – The time interval around a given a timestep defining the neighborhood. Only timesteps that fall within this time interval before or after a timestep are considered neighbors.
neighbor_min_size (int, optional) –
The minimum number of neighboring timesteps required within neighbor_time_interval for a timestep to be considered non-isolated. Isolated timesteps are removed !
If
neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs.If
neighbor_min_size=1, the timestep must have at least one neighbor withinneighbor_time_interval.If
neighbor_min_size=2, the timestep must have at least two timesteps withinneighbor_time_interval.
Defaults to 1.
event_max_time_gap (str) – The maximum time interval between two timesteps to be considered part of the same event. This parameters is used to group timesteps into events !
event_min_duration (str) – The minimum duration an event must span. Events shorter than this duration are discarded.
event_min_size (int, optional) – The minimum number of valid timesteps required for an event. Defaults to 1.
- Returns:
A list of events, where each event is represented as a dictionary with keys:
”start_time”: np.datetime64, start time of the event
”end_time”: np.datetime64, end time of the event
”duration”: np.timedelta64, duration of the event
”n_timesteps”: int, number of valid timesteps in the event
- Return type:
- disdrodb.utils.event.group_timesteps_into_events(timesteps, event_max_time_gap)[source][source]#
Group valid timesteps into events based on a maximum allowed dry interval.
- Parameters:
timesteps (array-like of numpy.datetime64) – Sorted array of valid timesteps.
event_max_time_gap (numpy.timedelta64) – Maximum time interval allowed between consecutive valid timesteps for them to be considered part of the same event.
- Returns:
A list of events, where each event is an array of timesteps.
- Return type:
- disdrodb.utils.event.remove_isolated_timesteps(timesteps, neighbor_min_size, neighbor_time_interval)[source][source]#
Remove isolated timesteps that do not have enough neighboring timesteps within a specified time gap.
A timestep is considered isolated (and thus removed) if it does not have at least neighbor_min_size other timesteps within the neighbor_time_interval before or after it. In other words, for each timestep, we look for how many other timesteps fall into the time interval [t - neighbor_time_interval, t + neighbor_time_interval], excluding it itself. If the count of such neighbors is less than neighbor_min_size, that timestep is removed.
- Parameters:
timesteps (array-like of numpy.datetime64) – Sorted or unsorted array of valid timesteps.
neighbor_time_interval (numpy.timedelta64) – The time interval around a given a timestep defining the neighborhood. Only timesteps that fall within this time interval before or after a timestep are considered neighbors.
neighbor_min_size (int, optional) –
The minimum number of neighboring timesteps required within neighbor_time_interval for a timestep to be considered non-isolated.
If
neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs.If
neighbor_min_size=1, the timestep must have at least one neighbor withinneighbor_time_interval.If
neighbor_min_size=2, the timestep must have at least two timesteps withinneighbor_time_interval.
Defaults to 1.
- Returns:
Array of timesteps with isolated entries removed.
- Return type:
- disdrodb.utils.event.split_into_events(ds, variable, *, threshold=None, neighbor_min_size=2, neighbor_time_interval='5MIN', event_max_time_gap='6H', event_min_duration='5MIN', event_min_size=3, sortby=None, sortby_order='decreasing')[source][source]#
Split a dataset into “events” and yield each event as a Dataset.
Events are detected from candidate timesteps and then grouped into contiguous events using group_timesteps_into_event. Candidate timesteps can be selected either by thresholding a numeric variable or by using a boolean variable.
Detection logic#
If
thresholdis not None, a timestep is a candidate whends[variable] > threshold. Ifthresholdis None,ds[variable]must be boolean; a timestep is a candidate whends[variable]is True.Neighborhood and grouping#
Candidate timesteps are first filtered for isolation: a candidate is kept only if it has at least
neighbor_min_sizecandidates withinneighbor_time_interval(before/after). Remaining candidates are grouped into events when consecutive candidates are separated by no more thanevent_max_time_gap. Events shorter thanevent_min_durationor with fewer thanevent_min_sizetimesteps are discarded.Sorting#
Events are yielded in the grouping order (time order) unless
sortbyis provided: -sortby=None: yield events in time order (as returned by the grouping). -sortby="duration": sort by event duration. -sortbycallable:sortby(ds_event) -> scalarused as sorting key.- param ds:
Input dataset with a
timecoordinate/dimension. The dataset is sorted by time internally.- type ds:
xarray.Dataset
- param variable:
Name of the variable used for event detection:
numeric variable if
thresholdis not Noneboolean variable if
thresholdis None
- type variable:
str
- thresholdint or float or None, optional
Threshold used to define candidate timesteps.
If not None: candidates are where
ds[variable] > threshold.If None:
ds[variable]must be boolean and candidates are where it is True.
Default is None.
- neighbor_time_intervalstr
The time interval around a given a timestep defining the neighborhood. Only timesteps that fall within this time interval before or after a timestep are considered neighbors. The neighbor_time_interval must be at least equal to the dataset sampling interval (temporal resolution)! That is for 1-minute data,
neighbor_time_intervalshould be at least1MIN, for 5-minute data it should be at least5MIN, etc.- neighbor_min_sizeint, optional
The minimum number of neighboring timesteps required within neighbor_time_interval for a timestep to be considered non-isolated. Isolated timesteps are removed !
If
neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs.If
neighbor_min_size=1, the timestep must have at least one neighbor withinneighbor_time_interval.If
neighbor_min_size=2, the timestep must have at least two timesteps withinneighbor_time_interval.
Defaults to 2.
- event_max_time_gap: str
The maximum time interval between two timesteps to be considered part of the same event. This parameters is used to group timesteps into events !
- event_min_durationstr
The minimum duration an event must span. Events shorter than this duration are discarded.
- event_min_sizeint, optional
The minimum number of valid timesteps required for an event. Defaults to 3.
- sortby: None, str or callable
Sorting key for events:
None: no sorting (time order)
“duration”: sort by event duration
callable:
sortby(ds_event) -> scalar
- sortby_order: str
Sorting direction when
sortbyis not None. Default is “decreasing”. Valid values are “increasing” or “decreasing”.
- Yields:
ds_event (xarray.Dataset) – A view of the input dataset restricted to the event time span
Notes
This function yields event datasets (generator). Use
list(split_into_events(...))to materialize all events.Event detection uses
> threshold(strictly greater).
Examples
Threshold-based detection (numeric variable) (timesteps with N > 10) >>> events = list(split_into_events(ds, variable=”N”, threshold=10))
Boolean-based detection (precomputed mask) >>> ds[“is_rainy”] = (ds[“R”] > 0.1) & (ds[“Nbins”] > 2) >>> events = list(split_into_events(ds, variable=”is_rainy”))
Sort by duration (longest first) >>> for ds_event in split_into_events(ds, variable=”N”, threshold=10,
sortby=”duration”, sortby_order=”decreasing”)
… print(ds_event.time.values[0], ds_event.time.values[-1])
Sort by a custom scalar (e.g., maximum R during the event) >>> sortby_func = lambda ds_event: float(ds_event[“R”].max(dim=”time”).item()) >>> for ds_event in split_into_events(ds, variable=”N”, threshold=10,
sortby=sortby_func, sortby_order=”decreasing”):
… print(float(ds_event[“R”].max()))
disdrodb.utils.list module#
Utilities to work with lists.
disdrodb.utils.logger module#
DISDRODB logger utility.
- disdrodb.utils.logger.close_logger(logger) None[source][source]#
Close the logger.
- Parameters:
logger (logging.Logger) – Logger object.
- disdrodb.utils.logger.create_logger_file(logs_dir, filename, parallel)[source][source]#
Create logger file.
- disdrodb.utils.logger.create_product_logs(product, data_source, campaign_name, station_name, data_archive_dir=None, list_logs=None, **product_kwargs)[source][source]#
Create station summary and station problems log files.
The summary log selects only logged lines with
root,WARNING, andERRORkeywords. The problems log file selects only logged lines with theERRORkeyword.The logs directory structure is the following -
/logs/files/<product_name>/<station>(same structure as data directory, with logs for each processed file) -/logs/summary/SUMMARY.<PRODUCT_ACRONYM>.<CAMPAIGN_NAME>.<STATION_NAME>.log-/logs/problems/PROBLEMS.<PRODUCT_ACRONYM>.<CAMPAIGN_NAME>.<STATION_NAME>.log- Parameters:
product (str) – The DISDRODB product.
data_source (str) – The data source name.
campaign_name (str) – The campaign name.
station_name (str) – The station name.
data_archive_dir (str, optional) – The base directory path. Default is None.
sample_interval (str, optional) – The sample interval for L2E option. Default is None.
rolling (str, optional) – The rolling option for L2E. Default is None.
model_name (str, optional) – The model name for L2M. Default is None.
list_logs (list, optional) – List of log file paths. If None, the function will list the log files.
- Return type:
None
- disdrodb.utils.logger.log_debug(logger: <Logger asyncio (WARNING)>, msg: str, verbose: bool = False) None[source][source]#
Include debug entry into log.
- Parameters:
logger (logging.Logger) – Log object.
msg (str) – Message.
verbose (bool, optional) – Whether to verbose the processing. The default value is
False.
- disdrodb.utils.logger.log_error(logger: <Logger asyncio (WARNING)>, msg: str, verbose: bool = False) None[source][source]#
Include error entry into log.
- Parameters:
logger (logging.Logger) – Log object.
msg (str) – Message.
verbose (bool, optional) – Whether to verbose the processing. The default value is
False.
- disdrodb.utils.logger.log_info(logger: <Logger asyncio (WARNING)>, msg: str, verbose: bool = False) None[source][source]#
Include info entry into log.
- Parameters:
logger (logging.Logger) – Log object.
msg (str) – Message.
verbose (bool, optional) – Whether to verbose the processing. The default value is
False.
- disdrodb.utils.logger.log_warning(logger: <Logger asyncio (WARNING)>, msg: str, verbose: bool = False) None[source][source]#
Include warning entry into log.
- Parameters:
logger (logging.Logger) – Log object.
msg (str) – Message.
verbose (bool, optional) – Whether to verbose the processing. The default value is
False.
disdrodb.utils.manipulations module#
Include functions helping for DISDRODB product manipulations.
- disdrodb.utils.manipulations.compute_normalized_dsd_datarray(ds, Nc='Nw', Dc='Dm', d_min=0, d_max=6, d_step=0.001, method='linear')[source][source]#
Compute normalized DSD and remap to regular D/Dc dimension.
- disdrodb.utils.manipulations.define_diameter_array(diameter_min=0, diameter_max=10, diameter_spacing=0.05)[source][source]#
Define an array of diameters and their corresponding bin properties.
- Parameters:
- Returns:
A DataArray containing the center of each diameter bin, with coordinates for the bin width, lower bound, upper bound, and center.
- Return type:
- disdrodb.utils.manipulations.define_diameter_datarray(bounds, dim='diameter_bin_center')[source][source]#
Define diameter DataArray.
- disdrodb.utils.manipulations.define_velocity_array(velocity_min=0, velocity_max=10, velocity_spacing=0.05)[source][source]#
Define an array of velocities and their corresponding bin properties.
- Parameters:
- Returns:
A DataArray containing the center of each velocity bin, with coordinates for the bin width, lower bound, upper bound, and center.
- Return type:
- disdrodb.utils.manipulations.define_velocity_datarray(bounds, dim='velocity_bin_center')[source][source]#
Define velocity DataArray.
- disdrodb.utils.manipulations.filter_diameter_bins(ds, minimum_diameter=None, maximum_diameter=None)[source][source]#
Filter the dataset to include only diameter bins within specified bounds.
- Parameters:
ds (xarray.Dataset) – The dataset containing diameter bin data.
minimum_diameter (float, optional) – The minimum diameter to be included, in millimeters. Defaults to the minimum value in ds[“diameter_bin_lower”].
maximum_diameter (float, optional) – The maximum diameter to be included, in millimeters. Defaults to the maximum value in ds[“diameter_bin_upper”].
- Returns:
The filtered dataset containing only the specified diameter bins.
- Return type:
- disdrodb.utils.manipulations.filter_velocity_bins(ds, minimum_velocity=None, maximum_velocity=None)[source][source]#
Filter the dataset to include only velocity bins within specified bounds.
- Parameters:
ds (xarray.Dataset) – The dataset containing velocity bin data.
minimum_velocity (float, optional) – The minimum velocity to include in the filter, in meters per second. Defaults to the minimum value in ds[“velocity_bin_lower”].
maximum_velocity (float, optional) – The maximum velocity to include in the filter, in meters per second. Defaults to the maximum value in ds[“velocity_bin_upper”].
- Returns:
The filtered dataset containing only the specified velocity bins.
- Return type:
- disdrodb.utils.manipulations.get_diameter_bin_edges(ds)[source][source]#
Retrieve diameter bin edges.
- disdrodb.utils.manipulations.get_diameter_coords_dict_from_bin_edges(diameter_bin_edges)[source][source]#
Get dictionary with all relevant diameter coordinates.
- disdrodb.utils.manipulations.get_velocity_bin_edges(ds)[source][source]#
Retrieve velocity bin edges.
- disdrodb.utils.manipulations.remap_to_diameter(da, d_src, d_dst, dim, new_dim, method='linear')[source][source]#
Remap DataArray from source to destination diameter coordinate.
- Parameters:
da (xr.DataArray) – DataArray with dimension dim and typically another dim (e.g., time).
d_src (xr.DataArray) – Source diameter coordinate (can be 2D, e.g., D/Dm (time, D)). Must share dimensions with da.
d_dst (xr.DataArray) – 1D target coordinate.
dim (str) – Original diameter dimension.
new_dim (str) – Name of output diameter dimension.
method ({"linear", "pchip"}) – Interpolation method used for remapping.
- Return type:
xr.DataArray
- disdrodb.utils.manipulations.resample_density(da_density, d_src, d_dst, dim, new_dim, dD_src, dD_dst, method='log_pchip')[source][source]#
Conservative resampling of density.
- Parameters:
da_density (xr.DataArray) – Density defined per unit diameter.
d_src (xr.DataArray) – Source diameter centers (can be 2D: time, D).
d_dst (xr.DataArray) – Destination diameter centers (1D).
dim (str) – Source diameter dimension.
new_dim (str) – Destination dimension name.
dD_src (xr.DataArray) – Source bin widths (same dim as dim).
dD_dst (xr.DataArray) – Destination bin widths (same dim as new_dim).
method (str or callable) – Remapping strategy used within
xr.apply_ufunc. If str, available methods are: -"constant": first-order conservative remapping (piecewise constant in source bins). -"log_pchip": conservative, smooth remapping using PCHIP in log-density space. If callable, it must have signaturef(y_src, d_src, d_dst, dD_src, dD_dst)and return remapped destination density.
- Returns:
Remapped density conserving total number.
- Return type:
xr.DataArray
disdrodb.utils.pydantic module#
Definition of pydantic validation custom class.
- class disdrodb.utils.pydantic.CustomBaseModel[source][source]#
Bases:
BaseModelCustom pydantic BaseModel.
Forbid extra keys. Hide URLs in error message. Simplify error message.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'hide_error_urls': True}#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
disdrodb.utils.routines module#
Utilities for DISDRODB processing routines.
- disdrodb.utils.routines.is_possible_product(temporal_resolution, sample_interval)[source][source]#
Assess if production is possible given the requested accumulation interval and source sample_interval.
- disdrodb.utils.routines.run_product_generation(product: str, logs_dir: str, logs_filename: str, parallel: bool, verbose: bool, folder_partitioning: str, core_func: callable, core_func_kwargs: dict, pass_logger=False)[source][source]#
Generic wrapper for DISDRODB product generation.
- Parameters:
product (str) – Product name (e.g., “L0A”, “L0B”, …).
logs_dir (str) – Logs directory.
logs_filename (str) – Logs filename.
parallel (bool) – Parallel flag (for logger).
verbose (bool) – Verbose logging flag.
folder_partitioning (str) – Partitioning scheme.
core_func (callable) – Function with signature core_func(logger) that does the product-specific work. Must return an xarray.Dataset or pandas.DataFrame (used to determine log subdir).
disdrodb.utils.subsetting module#
This module contains functions for subsetting and aligning DISDRODB products.
- disdrodb.utils.subsetting.align(*args)[source][source]#
Align DISDRODB products over time, velocity and diameter dimensions.
- disdrodb.utils.subsetting.is_1d_non_dimensional_coord(xr_obj, coord)[source][source]#
Checks if a coordinate is a 1d, non-dimensional coordinate.
- disdrodb.utils.subsetting.isel(xr_obj, indexers=None, drop=False, **indexers_kwargs)[source][source]#
Perform index-based dimension selection.
- disdrodb.utils.subsetting.sel(xr_obj, indexers=None, drop=False, method=None, **indexers_kwargs)[source][source]#
Perform value-based coordinate selection.
Slices are treated as inclusive of both the start and stop values, unlike normal Python indexing. The disdrodb sel method is empowered to:
slice by disdrodb-id strings !
slice by any xarray coordinate value !
You can use string shortcuts for datetime coordinates (e.g., ‘2000-01’ to select all values in January 2000).
disdrodb.utils.time module#
This module contains utilities related to the processing of temporal dataset.
- disdrodb.utils.time.ensure_sample_interval_in_seconds(sample_interval)[source][source]#
Ensure the sample interval is in seconds.
- Parameters:
sample_interval (int, numpy.ndarray, xarray.DataArray, or numpy.timedelta64) –
The sample interval to be converted to seconds. It can be:
An integer representing the interval in seconds.
A numpy array or xarray DataArray of integers representing intervals in seconds.
A numpy.timedelta64 object representing the interval.
A numpy array or xarray DataArray of numpy.timedelta64 objects representing intervals.
- Returns:
The sample interval converted to seconds. The return type matches the input type:
If the input is an integer, the output is an integer.
If the input is a numpy array, the output is a numpy array of integers (unless NaN is present)
If the input is an xarray DataArray, the output is an xarray DataArray of integers (unless NaN is present).
- Return type:
- disdrodb.utils.time.ensure_sorted_by_time(obj, time='time')[source][source]#
Ensure a xarray object or pandas Dataframe is sorted by time.
- disdrodb.utils.time.ensure_timedelta_seconds(interval)[source][source]#
Return an a scalar value/array in seconds or timedelta object as numpy.timedelta64 in seconds.
- disdrodb.utils.time.get_dataframe_start_end_time(df: DataFrame, time_column='time')[source][source]#
Retrieves dataframe starting and ending time.
- Parameters:
df (pandas.DataFrame) – Input dataframe
time_column (str) – Name of the time column. The default is “time”. The column must be of type datetime.
- Returns:
(start_time, end_time) – File start and end time of type pandas.Timestamp.
- Return type:
- disdrodb.utils.time.get_dataset_start_end_time(ds: Dataset, time_dim='time')[source][source]#
Retrieves dataset starting and ending time.
- Parameters:
ds (xarray.Dataset) – Input dataset
time_dim (str) – Name of the time dimension. The default is “time”.
- Returns:
(start_time, end_time) – File start and end time of type pandas.Timestamp.
- Return type:
- disdrodb.utils.time.get_file_start_end_time(obj, time='time')[source][source]#
Retrieves object starting and ending time.
- Parameters:
obj (xarray.Dataset or pandas.DataFrame) – Input object with time dimension or column respectively.
time (str) – Name of the time dimension or column. The default is “time”.
- Returns:
(start_time, end_time) – File start and end time of type pandas.Timestamp.
- Return type:
- disdrodb.utils.time.get_sampling_information(temporal_resolution)[source][source]#
Extract resampling information from the temporal_resolution string.
- disdrodb.utils.time.infer_sample_interval(ds, robust=False, verbose=False, logger=None)[source][source]#
Infer the sample interval of a dataset.
Duplicated timesteps are removed before inferring the sample interval.
NOTE: This function is used only for the reader preparation.
- disdrodb.utils.time.regularize_dataset(xr_obj, freq: str, time_dim: str = 'time', method: str | None = None, fill_value=None, start_time=None, end_time=None)[source][source]#
Regularize a xarray object across time dimension with uniform resolution.
- Parameters:
xr_obj (xarray.Dataset or xarray.DataArray) – xarray object with time dimension.
time_dim (str, optional) – The time dimension in the xarray object. The default value is
"time".freq (str) – The
freqstring to pass to pd.date_range() to define the new time coordinates. Examples:freq="2min".method (str, optional) – Method to use for filling missing timesteps. If
None, fill withfill_value. The default value isNone. For other possible methods, see xarray.Dataset.reindex()`.fill_value (float or dict, optional) – Fill value to fill missing timesteps. If not specified, for float variables it uses
dtypes.NAwhile for for integers variables it uses the maximum allowed integer value or, in case of undecoded variables, the_FillValueDataArray attribute.
- Returns:
ds_reindexed – Regularized dataset.
- Return type:
- disdrodb.utils.time.seconds_to_temporal_resolution(seconds)[source][source]#
Convert a duration in seconds to a readable string format (e.g., “1H30”, “1D2H”).
disdrodb.utils.warnings module#
Warning utilities.
disdrodb.utils.writer module#
DISDRODB product writers.
- disdrodb.utils.writer.finalize_product(ds, product=None) Dataset[source][source]#
Finalize DISDRODB product.
- disdrodb.utils.writer.write_product(ds: Dataset, filepath: str, force: bool = False) None[source][source]#
Save the xarray dataset into a NetCDF file.
- Parameters:
ds (xarray.Dataset) – Input xarray dataset.
filepath (str) – Output file path.
force (bool, optional) – Whether to overwrite existing data. If
True, overwrite existing data in destination directories. IfFalse, raise an error if there are already data in destination directories. This is the default.
disdrodb.utils.xarray module#
Xarray utilities.
- disdrodb.utils.xarray.define_dataarray_fill_value(da)[source][source]#
Define the fill value for a numerical xarray.DataArray.
- disdrodb.utils.xarray.define_dataarray_fill_value_dictionary(da)[source][source]#
Define fill values for numerical variables and coordinates of a xarray.DataArray.
- Return a dict of fill values:
floating → NaN
integer → ds[var].attrs[“_FillValue”] if present, else np.iinfo(dtype).max
- disdrodb.utils.xarray.define_dataset_fill_value_dictionary(ds)[source][source]#
Define fill values for numerical variables and coordinates of a xarray.Dataset.
- Return a dict of per-variable fill values:
floating –> NaN
integer –> ds[var].attrs[“_FillValue”] if present, else the maximum allowed number.
- disdrodb.utils.xarray.define_fill_value_dictionary(xr_obj)[source][source]#
Define fill values for numerical variables and coordinates of a xarray object.
- Return a dict of per-variable fill values:
floating –> NaN
integer –> ds[var].attrs[“_FillValue”] if present, else the maximum allowed number.
- disdrodb.utils.xarray.remap_numeric_array(arr, remapping_dict, fill_value=nan)[source][source]#
Remap the values of a numeric array.
- disdrodb.utils.xarray.remove_diameter_coordinates(xr_obj)[source][source]#
Drop diameter coordinates from xarray object.
- disdrodb.utils.xarray.remove_velocity_coordinates(xr_obj)[source][source]#
Drop velocity coordinates from xarray object.
- disdrodb.utils.xarray.unstack_datarray_dimension(da, dim, coord_handling='keep', prefix='', suffix='')[source][source]#
Split a DataArray along a specified dimension into a Dataset with separate prefixed and suffixed variables.
- Parameters:
da (xarray.DataArray) – The DataArray to split.
dim (str) – The dimension along which to split the DataArray.
coord_handling (str, optional) – Option to handle coordinates sharing the target dimension. Choices are ‘keep’, ‘drop’, or ‘unstack’. Defaults to ‘keep’.
prefix (str, optional) – String to prepend to each new variable name.
suffix (str, optional) – String to append to each new variable name.
- Returns:
A Dataset with each variable split along the specified dimension. The Dataset variables are named “{prefix}{name}{suffix}{dim_value}”. Coordinates sharing the target dimension are handled based on coord_handling.
- Return type:
- disdrodb.utils.xarray.xr_get_last_valid_idx(da_condition, dim, fill_value=None)[source][source]#
Get the index of the last True value along a specified dimension in an xarray DataArray.
This function finds the last index along the given dimension where the condition is True. If all values are False or NaN along that dimension, the function returns
fill_value.- Parameters:
da_condition (xarray.DataArray) – A boolean DataArray where True indicates valid or desired values. Should have the dimension specified in dim.
dim (str) – The name of the dimension along which to find the last True index.
fill_value (int or float) – The fill value when all values are False or NaN along the specified dimension. The default value is
dim_size - 1.
- Returns:
last_idx – An array containing the index of the last True value along the specified dimension. If all values are False or NaN, the corresponding entry in last_idx will be NaN.
- Return type:
Notes
The function works by reversing the DataArray along the specified dimension and using argmax to find the first True value in the reversed array. It then calculates the corresponding index in the original array. To handle cases where all values are False or NaN (and argmax would return 0), the function checks if there is any True value along the dimension and assigns NaN to last_idx where appropriate.
Examples
>>> import xarray as xr >>> da = xr.DataArray([[False, False, True], [False, False, False]], dims=["time", "my_dimension"]) >>> last_idx = xr_get_last_valid_idx(da, "my_dimension") >>> print(last_idx) <xarray.DataArray (time: 2)> array([2., nan]) Dimensions without coordinates: time
In this example, for the first time step, the last True index is 2. For the second time step, all values are False, so the function returns NaN.
disdrodb.utils.yaml module#
YAML utility.
Module contents#
DISDRODB Utils Module.