disdrodb.utils package

Contents

disdrodb.utils package#

Submodules#

disdrodb.utils.archiving module#

Utility function for DISDRODB product archiving.

disdrodb.utils.archiving.check_freq(freq: str) None[source][source]#

Check validity of freq argument.

disdrodb.utils.archiving.define_temporal_partitions(filepaths, strategy, parallel, strategy_options)[source][source]#

Define temporal file processing partitions.

Parameters:
  • filepaths (list) – List of files paths to be processed

  • strategy (str) –

    Which partitioning strategy to apply:

    • 'time_block' defines fixed time intervals (e.g. monthly) covering input files.

    • 'event' detect clusters of precipitation (“events”).

  • parallel (bool) – If True, parallel data loading is used to identify events.

  • strategy_options (dict) –

    Dictionary with strategy-specific parameters:

    If strategy == 'time_block', supported options are:

    • freq: Time unit for blocks. One of {‘year’, ‘season’, ‘month’, ‘day’}.

    See identify_time_partitions for more information.

    If strategy == 'event', supported options are:

    • min_drops : int Minimum number of drops to consider a timestep.

    • neighbor_min_size : int Minimum cluster size for merging neighboring events.

    • neighbor_time_interval : str Time window (e.g. “5MIN”) to merge adjacent clusters.

    • event_max_time_gap : str Maximum allowed gap (e.g. “6H”) within a single event.

    • event_min_duration : str Minimum total duration (e.g. “5MIN”) of an event.

    • event_min_size : int Minimum number of records in an event.

    See identify_events for more information.

Returns:

A list of dictionaries, each containing:

  • start_time (numpy.datetime64[s])

    Inclusive start of an event or time block.

  • end_time (numpy.datetime64[s])

    Inclusive end of an event or time block.

Return type:

list

Notes

  • The 'event' strategy requires loading data into memory to identify clusters.

  • The 'time_block' strategy can operate on metadata alone, without full data loading.

  • The 'event' strategy implicitly performs data selection on which files to process !

  • The 'time_block' strategy does not performs data selection on which files to process !

disdrodb.utils.archiving.generate_time_blocks(start_time: datetime64, end_time: datetime64, freq: str, inclusive_end_time: bool = True) ndarray[source][source]#

Generate time blocks between start_time and end_time for a given frequency.

Parameters:
  • start_time (numpy.datetime64) – Inclusive start of the overall time range.

  • end_time (numpy.datetime64) – End of the overall time range. Inclusive by default (see inclusive_end_time argument).

  • freq (str) – Frequency specifier. Accepted values are: - ‘none’ : return a single block [start_time, end_time] - ‘day’ : split into daily blocks - ‘month’ : split into calendar months - ‘quarter’ : split into calendar quarters - ‘year’ : split into calendar years - ‘season’ : split into meteorological seasons (MAM, JJA, SON, DJF)

  • inclusive_end_time (bool) – The default is True. If False, if the last block end_time is equal to input end_time, such block is removed.

Returns:

Array of shape (n, 2) with dtype datetime64[s], where each row is [block_start, block_end].

Return type:

numpy.ndarray

disdrodb.utils.archiving.get_files_partitions(list_partitions, filepaths, sample_interval, accumulation_interval, rolling)[source][source]#

Provide information about the required files for each event.

For each event in list_partitions, this function identifies the file paths from filepaths that overlap with the event period, adjusted by the accumulation_interval. The event period is extended backward or forward based on the rolling parameter.

Parameters:
  • list_partitions (list of dict) – List of events, where each event is a dictionary containing at least ‘start_time’ and ‘end_time’ keys with numpy.datetime64 values.

  • filepaths (list of str) – List of file paths corresponding to data files.

  • sample_interval (numpy.timedelta64 or int) – The sample interval of the input dataset.

  • accumulation_interval (numpy.timedelta64 or int) – Time interval to adjust the event period for accumulation. If an integer is provided, it is assumed to be in seconds.

  • rolling (bool) – If True, adjust the event period backward by accumulation_interval (rolling backward). If False, adjust forward (aggregate forward).

Returns:

A list where each element is a dictionary containing: - ‘start_time’: Adjusted start time of the event (datetime.datetime64). - ‘end_time’: Adjusted end time of the event (datetime.datetime64). - ‘filepaths’: List of file paths overlapping with the adjusted event period.

Return type:

list of dict

disdrodb.utils.archiving.get_files_per_time_block(filepaths, freq='day', tolerance_seconds=120)[source][source]#

Organize files by the days they cover based on their start and end times.

Parameters:

filepaths (list of str) – List of file paths to be processed.

Returns:

Dictionary where keys are days (as strings) and values are lists of file paths that cover those days.

Return type:

dict

Notes

This function adds a tolerance of 60 seconds to account for imprecise time logging by the sensors.

disdrodb.utils.archiving.identify_events(filepaths, parallel=False, min_drops=5, neighbor_min_size=2, neighbor_time_interval='5MIN', event_max_time_gap='6H', event_min_duration='5MIN', event_min_size=3)[source][source]#

Return a list of rainy events.

Rainy timesteps are defined when N > min_drops. Any rainy isolated timesteps (based on neighborhood criteria) is removed. Then, consecutive rainy timesteps are grouped into the same event if the time gap between them does not exceed event_max_time_gap. Finally, events that do not meet minimum size or duration requirements are filtered out.

Parameters:
  • filepaths (list) – List of L1C file paths.

  • parallel (bool) – Whether to load the files in parallel. Set parallel=True only in a multiprocessing environment. The default is False.

  • neighbor_time_interval (str) – The time interval around a given a timestep defining the neighborhood. Only timesteps that fall within this time interval before or after a timestep are considered neighbors.

  • neighbor_min_size (int, optional) – The minimum number of neighboring timesteps required within neighbor_time_interval for a timestep to be considered non-isolated. Isolated timesteps are removed ! - If neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs. - If `neighbor_min_size=1, the timestep must have at least one neighbor within neighbor_time_interval. - If neighbor_min_size=2, the timestep must have at least two timesteps within neighbor_time_interval. Defaults to 1.

  • event_max_time_gap (str) – The maximum time interval between two timesteps to be considered part of the same event. This parameters is used to group timesteps into events !

  • event_min_duration (str) – The minimum duration an event must span. Events shorter than this duration are discarded.

  • event_min_size (int, optional) – The minimum number of valid timesteps required for an event. Defaults to 1.

Returns:

A list of events, where each event is represented as a dictionary with keys: - “start_time”: np.datetime64, start time of the event - “end_time”: np.datetime64, end time of the event - “duration”: np.timedelta64, duration of the event - “n_timesteps”: int, number of valid timesteps in the event

Return type:

list of dict

disdrodb.utils.archiving.identify_time_partitions(start_times, end_times, freq: str) list[dict][source][source]#

Identify the set of time blocks covered by files.

The result is a minimal, sorted, and unique set of time partitions. ‘start_times’ and end_times can be derived using get_start_end_time_from_filepaths.

Parameters:
  • start_times (numpy.ndarray of datetime64[s]) – Array of inclusive start times for each file.

  • end_times (numpy.ndarray of datetime64[s]) – Array of inclusive end times for each file.

  • freq ({'none', 'hour', 'day', 'month', 'quarter', 'season', 'year'}) – Frequency determining the granularity of candidate blocks. See generate_time_blocks for more details.

Returns:

A list of dictionaries, each containing:

  • start_time (numpy.datetime64[s])

    Inclusive start of a time block.

  • end_time (numpy.datetime64[s])

    Inclusive end of a time block.

Only those blocks that overlap at least one file’s interval are returned. The list is sorted by start_time and contains no duplicate blocks.

Return type:

list of dict

disdrodb.utils.attrs module#

DISDRODB netCDF4 attributes utilities.

disdrodb.utils.attrs.get_attrs_dict()[source][source]#

Get attributes dictionary for DISDRODB product variables and coordinates.

disdrodb.utils.attrs.set_attrs(ds, attrs_dict)[source][source]#

Set attributes to the variables and coordinates of the xr.Dataset.

disdrodb.utils.attrs.set_coordinate_attributes(ds)[source][source]#

Set coordinates attributes.

disdrodb.utils.attrs.set_disdrodb_attrs(ds, product: str)[source][source]#

Add DISDRODB processing information to the netCDF global attributes.

It assumes stations metadata are already added the dataset.

Parameters:
Returns:

Dataset.

Return type:

xarray dataset

disdrodb.utils.attrs.update_disdrodb_attrs(ds, product: str)[source][source]#

Add DISDRODB processing information to the netCDF global attributes.

It assumes stations metadata are already added the dataset.

Parameters:
  • ds (xarray dataset.) – Dataset

  • product (str) – DISDRODB product.

Returns:

Dataset.

Return type:

xarray dataset

disdrodb.utils.cli module#

DISDRODB command-line-interface scripts utilities.

disdrodb.utils.cli.click_data_archive_dir_option(function: object)[source][source]#

Click command line argument for DISDRODB data_archive_dir.

Parameters:

function (object) – Function.

disdrodb.utils.cli.click_l0_archive_options(function: object)[source][source]#

Click command line arguments for L0 processing archiving of a station.

Parameters:

function (object) – Function.

disdrodb.utils.cli.click_metadata_archive_dir_option(function: object)[source][source]#

Click command line argument for DISDRODB metadata_archive_dir.

Parameters:

function (object) – Function.

disdrodb.utils.cli.click_processing_options(function: object)[source][source]#

Click command line default parameters for L0 processing options.

Parameters:

function (object) – Function.

disdrodb.utils.cli.click_remove_l0a_option(function: object)[source][source]#

Click command line argument for remove_l0a.

disdrodb.utils.cli.click_remove_l0b_option(function: object)[source][source]#

Click command line argument for remove_l0b.

disdrodb.utils.cli.click_station_arguments(function: object)[source][source]#

Click command line arguments for DISDRODB station processing.

Parameters:

function (object) – Function.

disdrodb.utils.cli.click_stations_options(function: object)[source][source]#

Click command line options for DISDRODB archive L0 processing.

Parameters:

function (object) – Function.

disdrodb.utils.cli.execute_cmd(cmd, raise_error=False)[source][source]#

Execute command in the terminal, streaming output in python console.

disdrodb.utils.cli.parse_archive_dir(archive_dir: str)[source][source]#

Utility to parse archive directories provided by command line.

If archive_dir = 'None' returns None. If archive_dir = '' returns None.

disdrodb.utils.cli.parse_arg_to_list(args)[source][source]#

Utility to pass list to command line scripts.

If args = '' returns None. If args = 'None' returns None. If args = 'variable' returns [variable]. If args = 'variable1 variable2' returns [variable1, variable2].

disdrodb.utils.cli.parse_empty_string_and_none(args)[source][source]#

Utility to parse argument passed from the command line.

If args = '', returns None. If args = 'None' returns None. Otherwise return args.

disdrodb.utils.compression module#

DISDRODB raw data compression utility.

disdrodb.utils.compression.archive_station_data(metadata_filepath: str, data_archive_dir: str) str[source][source]#

Archive station data into a zip file for subsequent data upload.

It create a zip file into a temporary directory !

Parameters:

metadata_filepath (str) – Metadata file path.

disdrodb.utils.compression.check_consistent_station_name(metadata_filepath, station_name)[source][source]#

Check consistent station_name between YAML file name and metadata key.

disdrodb.utils.compression.compress_station_files(data_archive_dir: str, data_source: str, campaign_name: str, station_name: str, method: str = 'gzip', skip: bool = True) None[source][source]#

Compress each raw file of a station.

Parameters:
  • data_archive_dir (str) – DISDRODB Data Archive directory

  • data_source (str) – Name of data source of interest.

  • campaign_name (str) – Name of the campaign of interest.

  • station_name (str) – Station name of interest.

  • method (str) – Compression method. "zip", "gzip" or "bzip2".

  • skip (bool) – Whether to raise an error if a file is already compressed. If True, it does not raise an error and try to compress the other files. If False, it raise an error and stop the compression routine. The default value is True.

disdrodb.utils.compression.unzip_file(filepath: str, dest_path: str) None[source][source]#

Unzip a file into a directory.

Parameters:
  • filepath (str) – Path of the file to unzip.

  • dest_path (str) – Path of the destination directory.

disdrodb.utils.compression.unzip_file_on_terminal(filepath: str, dest_path: str) str[source][source]#

Unzip a file into a directory using the terminal command.

Parameters:
  • filepath (str) – Path of the file to unzip.

  • dest_path (str) – Path of the destination directory.

disdrodb.utils.dask module#

Utilities for Dask Distributed Computations.

disdrodb.utils.dask.check_parallel_validity(parallel)[source][source]#

Check validity of parallel option given Dask settings.

disdrodb.utils.dask.close_dask_cluster(cluster, client)[source][source]#

Close Dask Cluster.

disdrodb.utils.dask.execute_tasks_safely(list_tasks, parallel: bool, logs_dir: str)[source][source]#

Execute Dask tasks and skip failed ones.

Parameters:
  • list_tasks (list) – List of dask delayed objects or results.

  • parallel (bool) – Whether to execute in parallel with Dask or not.

  • logs_dir (str) – Directory to store FAILED_TASKS.log.

Returns:

list_logs – List of task results. For failed tasks, adds the path to FAILED_TASKS.log in place of the result.

Return type:

list

disdrodb.utils.dask.initialize_dask_cluster(minimum_memory=None)[source][source]#

Initialize Dask Cluster.

disdrodb.utils.dataframe module#

Dataframe utilities.

disdrodb.utils.dataframe.compute_1d_histogram(df, column, variables=None, bins=10, labels=None, prefix_name=True, include_quantiles=False)[source][source]#

Compute conditional univariate statistics.

Parameters:
  • df (pandas.DataFrame) – Input dataframe

  • column (str) – Column name to be binned.

  • variables (str or list, optional) – Column names for which conditional statistics will be computed. If None, only counts are computed.

  • bins (int or array-like) – Number of bins or bin edges.

  • labels (array-like, optional) – Labels for the column bins. If None, uses bin centers.

Return type:

pandas.DataFrame

disdrodb.utils.dataframe.compute_2d_histogram(df, x, y, variables=None, x_bins=10, y_bins=10, x_labels=None, y_labels=None, prefix_name=True, include_quantiles=False)[source][source]#

Compute conditional bivariate statistics.

Parameters:
  • df (pandas.DataFrame) – Input dataframe

  • x (str) – Column name for x-axis binning (will be rounded to integers)

  • y (str) – Column name for y-axis binning

  • variables (str or list, optional) – Column names for which statistics will be computed. If None, only counts are computed.

  • x_bins (int or array-like) – Number of bins or bin edges for x

  • y_bins (int or array-like) – Number of bins or bin edges for y

  • x_labels (array-like, optional) – Labels for x bins. If None, uses bin centers

  • y_labels (array-like, optional) – Labels for y bins. If None, uses bin centers

Returns:

Dataset with dimensions corresponding to binned variables and data variables for each statistic

Return type:

xarray.Dataset

disdrodb.utils.dataframe.log_arange(start, stop, log_step=0.1, base=10)[source][source]#

Return numbers spaced evenly on a log scale (similar to np.arange but in log space).

Parameters:
  • start (float) – The starting value of the sequence (must be > 0).

  • stop (float) – The end value of the sequence (must be > 0).

  • log_step (float) – The step size in log-space (default is 0.1).

  • base (float) – The logarithmic base (default is 10).

Returns:

Array of values spaced in log scale.

Return type:

np.ndarray

disdrodb.utils.dataframe.log_linspace(start, stop, n_bins, base=10)[source][source]#

Return numbers spaced evenly on a log scale between start and stop.

Parameters:
  • start (float) – The starting value of the sequence (must be > 0).

  • stop (float) – The end value of the sequence (must be > 0).

  • n_bins (int) – The number of points to generate (including start and stop).

  • base (float) – The logarithmic base (default is 10).

Returns:

Array of values spaced evenly in log space.

Return type:

np.ndarray

disdrodb.utils.decorators module#

DISDRODB decorators.

disdrodb.utils.decorators.check_pytmatrix_availability(func)[source][source]#

Decorator to ensure that the ‘pytmatrix’ package is installed.

disdrodb.utils.decorators.check_software_availability(software, conda_package)[source][source]#

A decorator to ensure that a software package is installed.

Parameters:
  • software (str) – The package name as recognized by Python’s import system.

  • conda_package (str) – The package name as recognized by conda-forge.

disdrodb.utils.decorators.create_dask_task_name(function_name: str, name=None) str | None[source][source]#

Create a custom dask task name.

Parameters:
  • function_name (str) – Name of the function being delayed.

  • name (str, optional) – Custom name for the task (e.g., filepath or ID). If None, returns None so that Dask generates is own default name.

Returns:

Custom dask task name string if name is given, otherwise None (use Dask’s default naming).

Return type:

str | None

disdrodb.utils.decorators.delayed_if_parallel(function)[source][source]#

Decorator to make the function delayed if its parallel argument is True.

disdrodb.utils.decorators.single_threaded_if_parallel(function)[source][source]#

Decorator to make a function use a single threadon delayed if its parallel argument is True.

disdrodb.utils.directories module#

Define utilities for Directory/File Checks/Creation/Deletion.

disdrodb.utils.directories.check_directory_exists(dir_path)[source][source]#

Check if the directory exists.

disdrodb.utils.directories.check_glob_pattern(pattern: str) None[source][source]#

Check if glob pattern is a string and is a valid pattern.

Parameters:

pattern (str) – String to be checked.

disdrodb.utils.directories.check_glob_patterns(patterns: str | list) list[source][source]#

Check if glob patterns are valids.

disdrodb.utils.directories.contains_files(dir_path: str) bool[source][source]#

Check (recursively) if a directory contains any file.

os.walk under the hood uses os.scandir os.walk file generator + any() avoid use of while loop

The function returns True as soon as one file is found (short-circuit); False otherwise.

disdrodb.utils.directories.contains_netcdf_or_parquet_files(dir_path: str) bool[source][source]#

Check (recursively) if a directory has any Parquet or netCDF file.

os.walk under the hood uses os.scandir os.walk file generator + any() avoid use of while loop

The function returns True as soon as one file is found (short-circuit)^; False otherwise.

disdrodb.utils.directories.copy_file(src_filepath, dst_filepath)[source][source]#

Copy a file from a location to another.

disdrodb.utils.directories.count_directories(dir_path, glob_pattern='*', recursive=False, skip_hidden=True)[source][source]#

Return the number of files (exclude directories).

disdrodb.utils.directories.count_files(dir_path, glob_pattern='*', recursive=False, skip_hidden=True)[source][source]#

Return the number of files (exclude directories).

disdrodb.utils.directories.create_directory(path: str, exist_ok=True) None[source][source]#

Create a directory at the provided path.

disdrodb.utils.directories.create_required_directory(dir_path, dir_name, exist_ok=True)[source][source]#

Create directory dir_name inside the dir_path directory.

disdrodb.utils.directories.ensure_string_path(path, msg, accepth_pathlib=False)[source][source]#

Ensure that the path is a string.

disdrodb.utils.directories.is_empty_directory(path, skip_hidden=True)[source][source]#

Check if a directory path is empty.

Return False if path is a file or non-empty directory. If the path does not exist, raise an error.

disdrodb.utils.directories.list_directories(dir_path, glob_pattern='*', recursive=False, skip_hidden=True, return_paths=True)[source][source]#

Return a list of directory paths (exclude file paths).

disdrodb.utils.directories.list_files(dir_path, glob_pattern='*', recursive=False, skip_hidden=True, return_paths=True)[source][source]#

Return a list of filepaths (exclude directory paths).

disdrodb.utils.directories.list_paths(dir_path, glob_pattern, recursive=False, skip_hidden=True)[source][source]#

Return a list of filepaths and directory paths.

This function accept also a list of glob patterns !

disdrodb.utils.directories.remove_if_exists(path: str, force: bool = False, logger=None) None[source][source]#

Remove file or directory if exists and force=True.

If force=False, it raises an error.

disdrodb.utils.directories.remove_path_trailing_slash(path: str) str[source][source]#

Removes a trailing slash or backslash from a file path if it exists.

This function ensures that the provided file path is normalized by removing any trailing directory separator characters ('/' or '\\'). This is useful for maintaining consistency in path strings and for preparing paths for operations that may not expect a trailing slash.

Parameters:

path (str) – The file path to normalize.

Returns:

The normalized path without a trailing slash.

Return type:

str

Raises:

TypeError – If the input path is not a string.

Examples

>>> remove_trailing_slash("some/path/")
'some/path'
>>> remove_trailing_slash("another\\path\\")
'another\\path'
disdrodb.utils.directories.rmtree_windows(path)[source][source]#

Remove a directory tree on Windows.

disdrodb.utils.encoding module#

DISDRODB netCDF4 encoding utilities.

disdrodb.utils.encoding.get_encodings_dict()[source][source]#

Get encoding dictionary for DISDRODB product variables and coordinates.

disdrodb.utils.encoding.get_time_encoding() dict[source][source]#

Create time encoding.

Returns:

Time encoding.

Return type:

dict

disdrodb.utils.encoding.rechunk_dataset(ds: Dataset, encodings_dict: dict) Dataset[source][source]#

Coerce the dataset arrays to have the chunk size specified in the encoding dictionary.

Parameters:
  • ds (xarray.Dataset) – Input xarray dataset

  • encodings_dict (dict) – Dictionary containing the encoding to write the xarray dataset as a netCDF.

Returns:

Output xarray dataset

Return type:

xarray.Dataset

disdrodb.utils.encoding.sanitize_encodings_dict(encodings_dict: dict, ds: Dataset) dict[source][source]#

Ensure chunk size to be smaller than the array shape.

Parameters:
  • encodings_dict (dict) – Dictionary containing the variable encodings.

  • ds (xarray.Dataset) – Input dataset.

Returns:

Encoding dictionary.

Return type:

dict

disdrodb.utils.encoding.set_encodings(ds: Dataset, encodings_dict: dict) Dataset[source][source]#

Apply the encodings to the xarray Dataset.

Parameters:
  • ds (xarray.Dataset) – Input xarray dataset.

  • encodings_dict (dict) – Dictionary with encodings specifications.

Returns:

Output xarray dataset.

Return type:

xarray.Dataset

disdrodb.utils.event module#

Functions for event definition.

disdrodb.utils.event.group_timesteps_into_event(timesteps, event_max_time_gap, event_min_size=0, event_min_duration='0S', neighbor_min_size=0, neighbor_time_interval='0S')[source][source]#

Group candidate timesteps into events based on temporal criteria.

This function groups valid candidate timesteps into events by considering how they cluster in time. Any isolated timesteps (based on neighborhood criteria) are first removed. Then, consecutive timesteps are grouped into the same event if the time gap between them does not exceed event_max_time_gap. Finally, events that do not meet minimum size or duration requirements are filtered out.

Please note that neighbor_min_size and neighbor_time_interval are very sensitive to the actual sample interval of the data !

Parameters:
  • timesteps (np.ndarray) – Candidate timesteps to be grouped into events.

  • neighbor_time_interval (str) – The time interval around a given a timestep defining the neighborhood. Only timesteps that fall within this time interval before or after a timestep are considered neighbors.

  • neighbor_min_size (int, optional) – The minimum number of neighboring timesteps required within neighbor_time_interval for a timestep to be considered non-isolated. Isolated timesteps are removed ! - If neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs. - If `neighbor_min_size=1, the timestep must have at least one neighbor within neighbor_time_interval. - If neighbor_min_size=2, the timestep must have at least two timesteps within neighbor_time_interval. Defaults to 1.

  • event_max_time_gap (str) – The maximum time interval between two timesteps to be considered part of the same event. This parameters is used to group timesteps into events !

  • event_min_duration (str) – The minimum duration an event must span. Events shorter than this duration are discarded.

  • event_min_size (int, optional) – The minimum number of valid timesteps required for an event. Defaults to 1.

Returns:

A list of events, where each event is represented as a dictionary with keys: - “start_time”: np.datetime64, start time of the event - “end_time”: np.datetime64, end time of the event - “duration”: np.timedelta64, duration of the event - “n_timesteps”: int, number of valid timesteps in the event

Return type:

list of dict

disdrodb.utils.event.group_timesteps_into_events(timesteps, event_max_time_gap)[source][source]#

Group valid timesteps into events based on a maximum allowed dry interval.

Parameters:
  • timesteps (array-like of np.datetime64) – Sorted array of valid timesteps.

  • event_max_time_gap (np.timedelta64) – Maximum time interval allowed between consecutive valid timesteps for them to be considered part of the same event.

Returns:

A list of events, where each event is an array of timesteps.

Return type:

list of np.ndarray

disdrodb.utils.event.remove_isolated_timesteps(timesteps, neighbor_min_size, neighbor_time_interval)[source][source]#

Remove isolated timesteps that do not have enough neighboring timesteps within a specified time gap.

A timestep is considered isolated (and thus removed) if it does not have at least neighbor_min_size other timesteps within the neighbor_time_interval before or after it. In other words, for each timestep, we look for how many other timesteps fall into the time interval [t - neighbor_time_interval, t + neighbor_time_interval], excluding it itself. If the count of such neighbors is less than neighbor_min_size, that timestep is removed.

Parameters:
  • timesteps (array-like of np.datetime64) – Sorted or unsorted array of valid timesteps.

  • neighbor_time_interval (np.timedelta64) – The time interval around a given a timestep defining the neighborhood. Only timesteps that fall within this time interval before or after a timestep are considered neighbors.

  • neighbor_min_size (int, optional) – The minimum number of neighboring timesteps required within neighbor_time_interval for a timestep to be considered non-isolated. - If neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs. - If `neighbor_min_size=1, the timestep must have at least one neighbor within neighbor_time_interval. - If neighbor_min_size=2, the timestep must have at least two timesteps within neighbor_time_interval. Defaults to 1.

Returns:

Array of timesteps with isolated entries removed.

Return type:

np.ndarray

disdrodb.utils.list module#

Utilities to work with lists.

disdrodb.utils.list.flatten_list(nested_list)[source][source]#

Flatten a nested list into a single-level list.

disdrodb.utils.logger module#

DISDRODB logger utility.

disdrodb.utils.logger.close_logger(logger) None[source][source]#

Close the logger.

Parameters:

logger (logging.Logger) – Logger object.

disdrodb.utils.logger.create_logger_file(logs_dir, filename, parallel)[source][source]#

Create logger file.

disdrodb.utils.logger.create_product_logs(product, data_source, campaign_name, station_name, data_archive_dir=None, list_logs=None, **product_kwargs)[source][source]#

Create station summary and station problems log files.

The summary log selects only logged lines with root, WARNING, and ERROR keywords. The problems log file selects only logged lines with the ERROR keyword.

The logs directory structure is the follow: /logs - /files/<product_name>/<station> (same structure as data … a log for each processed file) - /summary

–> SUMMARY.<PRODUCT_ACRONYM>.<CAMPAIGN_NAME>.<STATION_NAME>.log

  • /problems –> PROBLEMS.<PRODUCT_ACRONYM>.<CAMPAIGN_NAME>.<STATION_NAME>.log

Parameters:
  • product (str) – The DISDRODB product.

  • data_source (str) – The data source name.

  • campaign_name (str) – The campaign name.

  • station_name (str) – The station name.

  • data_archive_dir (str, optional) – The base directory path. Default is None.

  • sample_interval (str, optional) – The sample interval for L2E option. Default is None.

  • rolling (str, optional) – The rolling option for L2E. Default is None.

  • model_name (str, optional) – The model name for L2M. Default is None.

  • list_logs (list, optional) – List of log file paths. If None, the function will list the log files.

Return type:

None

disdrodb.utils.logger.log_debug(logger: <Logger asyncio (WARNING)>, msg: str, verbose: bool = False) None[source][source]#

Include debug entry into log.

Parameters:
  • logger (logging.Logger) – Log object.

  • msg (str) – Message.

  • verbose (bool, optional) – Whether to verbose the processing. The default value is False.

disdrodb.utils.logger.log_error(logger: <Logger asyncio (WARNING)>, msg: str, verbose: bool = False) None[source][source]#

Include error entry into log.

Parameters:
  • logger (logging.Logger) – Log object.

  • msg (str) – Message.

  • verbose (bool, optional) – Whether to verbose the processing. The default value is False.

disdrodb.utils.logger.log_info(logger: <Logger asyncio (WARNING)>, msg: str, verbose: bool = False) None[source][source]#

Include info entry into log.

Parameters:
  • logger (logging.Logger) – Log object.

  • msg (str) – Message.

  • verbose (bool, optional) – Whether to verbose the processing. The default value is False.

disdrodb.utils.logger.log_warning(logger: <Logger asyncio (WARNING)>, msg: str, verbose: bool = False) None[source][source]#

Include warning entry into log.

Parameters:
  • logger (logging.Logger) – Log object.

  • msg (str) – Message.

  • verbose (bool, optional) – Whether to verbose the processing. The default value is False.

disdrodb.utils.manipulations module#

Include functions helping for DISDRODB product manipulations.

disdrodb.utils.manipulations.convert_from_decibel(x)[source][source]#

Convert dB to unit.

disdrodb.utils.manipulations.convert_to_decibel(x)[source][source]#

Convert unit to dB.

disdrodb.utils.manipulations.get_diameter_bin_edges(ds)[source][source]#

Retrieve diameter bin edges.

disdrodb.utils.manipulations.get_diameter_coords_dict_from_bin_edges(diameter_bin_edges)[source][source]#

Get dictionary with all relevant diameter coordinates.

disdrodb.utils.manipulations.resample_drop_number_concentration(drop_number_concentration, diameter_bin_edges, method='linear')[source][source]#

Resample drop number concentration N(D) DataArray to high resolution diameter bins.

disdrodb.utils.manipulations.unstack_radar_variables(ds)[source][source]#

Unstack radar variables.

disdrodb.utils.routines module#

Utilities for DISDRODB processing routines.

disdrodb.utils.routines.is_possible_product(accumulation_interval, sample_interval, rolling)[source][source]#

Assess if production is possible given the requested accumulation interval and source sample_interval.

disdrodb.utils.routines.run_product_generation(product: str, logs_dir: str, logs_filename: str, parallel: bool, verbose: bool, folder_partitioning: str, core_func: callable, core_func_kwargs: dict, pass_logger=False)[source][source]#

Generic wrapper for DISDRODB product generation.

Parameters:
  • product (str) – Product name (e.g., “L0A”, “L0B”, …).

  • logs_dir (str) – Logs directory.

  • logs_filename (str) – Logs filename.

  • parallel (bool) – Parallel flag (for logger).

  • verbose (bool) – Verbose logging flag.

  • folder_partitioning (str) – Partitioning scheme.

  • core_func (callable) – Function with signature core_func(logger) that does the product-specific work. Must return an xarray.Dataset or pandas.DataFrame (used to determine log subdir).

disdrodb.utils.routines.try_get_required_filepaths(product, data_archive_dir, data_source, campaign_name, station_name, debugging_mode, **product_kwargs)[source][source]#

Try to retrieve required filepaths for a product, or return None if unavailable.

disdrodb.utils.subsetting module#

This module contains functions for subsetting and aligning DISDRODB products.

disdrodb.utils.subsetting.align(*args)[source][source]#

Align DISDRODB products over time, velocity and diameter dimensions.

disdrodb.utils.subsetting.is_1d_non_dimensional_coord(xr_obj, coord)[source][source]#

Checks if a coordinate is a 1d, non-dimensional coordinate.

disdrodb.utils.subsetting.isel(xr_obj, indexers=None, drop=False, **indexers_kwargs)[source][source]#

Perform index-based dimension selection.

disdrodb.utils.subsetting.sel(xr_obj, indexers=None, drop=False, method=None, **indexers_kwargs)[source][source]#

Perform value-based coordinate selection.

Slices are treated as inclusive of both the start and stop values, unlike normal Python indexing. The disdrodb sel method is empowered to:

  • slice by disdrodb-id strings !

  • slice by any xarray coordinate value !

You can use string shortcuts for datetime coordinates (e.g., ‘2000-01’ to select all values in January 2000).

disdrodb.utils.time module#

This module contains utilities related to the processing of temporal dataset.

disdrodb.utils.time.ensure_sample_interval_in_seconds(sample_interval)[source][source]#

Ensure the sample interval is in seconds.

Parameters:

sample_interval (int, numpy.ndarray, xarray.DataArray, or numpy.timedelta64) – The sample interval to be converted to seconds. It can be: - An integer representing the interval in seconds. - A numpy array or xarray DataArray of integers representing intervals in seconds. - A numpy.timedelta64 object representing the interval. - A numpy array or xarray DataArray of numpy.timedelta64 objects representing intervals.

Returns:

The sample interval converted to seconds. The return type matches the input type: - If the input is an integer, the output is an integer. - If the input is a numpy array, the output is a numpy array of integers (unless NaN is present) - If the input is an xarray DataArray, the output is an xarray DataArray of integers (unless NaN is present).

Return type:

int, numpy.ndarray, or xarray.DataArray

disdrodb.utils.time.ensure_sorted_by_time(obj, time='time')[source][source]#

Ensure a xarray object or pandas Dataframe is sorted by time.

disdrodb.utils.time.ensure_timedelta_seconds(interval)[source][source]#

Return an a scalar value/array in seconds or timedelta object as numpy.timedelta64 in seconds.

disdrodb.utils.time.get_dataframe_start_end_time(df: DataFrame, time_column='time')[source][source]#

Retrieves dataframe starting and ending time.

Parameters:
  • df (pandas.DataFrame) – Input dataframe

  • time_column (str) – Name of the time column. The default is “time”. The column must be of type datetime.

Returns:

(start_time, end_time) – File start and end time of type pandas.Timestamp.

Return type:

tuple

disdrodb.utils.time.get_dataset_start_end_time(ds: Dataset, time_dim='time')[source][source]#

Retrieves dataset starting and ending time.

Parameters:
  • ds (xarray.Dataset) – Input dataset

  • time_dim (str) – Name of the time dimension. The default is “time”.

Returns:

(start_time, end_time) – File start and end time of type pandas.Timestamp.

Return type:

tuple

disdrodb.utils.time.get_file_start_end_time(obj, time='time')[source][source]#

Retrieves object starting and ending time.

Parameters:
  • obj (xarray.Dataset or pandas.DataFrame) – Input object with time dimension or column respectively.

  • time (str) – Name of the time dimension or column. The default is “time”.

Returns:

(start_time, end_time) – File start and end time of type pandas.Timestamp.

Return type:

tuple

disdrodb.utils.time.get_sampling_information(temporal_resolution)[source][source]#

Extract resampling information from the temporal_resolution string.

Parameters:

temporal_resolution (str) – A string representing the product temporal resolution: e.g., “1H30MIN”, “ROLL1H30MIN”.

Returns:

sample_interval_seconds, rolling – Sample_interval in seconds and whether rolling is enabled.

Return type:

tuple

disdrodb.utils.time.infer_sample_interval(ds, robust=False, verbose=False, logger=None)[source][source]#

Infer the sample interval of a dataset.

Duplicated timesteps are removed before inferring the sample interval.

NOTE: This function is used only for the reader preparation.

disdrodb.utils.time.regularize_dataset(xr_obj, freq: str, time_dim: str = 'time', method: str | None = None, fill_value=None)[source][source]#

Regularize a dataset across time dimension with uniform resolution.

Parameters:
  • xr_obj (xarray.Dataset or xr.DataArray) – xarray object with time dimension.

  • time_dim (str, optional) – The time dimension in the xarray object. The default value is "time".

  • freq (str) – The freq string to pass to pd.date_range() to define the new time coordinates. Examples: freq="2min".

  • method (str, optional) – Method to use for filling missing timesteps. If None, fill with fill_value. The default value is None. For other possible methods, see xarray.Dataset.reindex()`.

  • fill_value ((float, dict), optional) – Fill value to fill missing timesteps. If not specified, for float variables it uses dtypes.NA while for for integers variables it uses the maximum allowed integer value or, in case of undecoded variables, the _FillValue DataArray attribute..

Returns:

ds_reindexed – Regularized dataset.

Return type:

xarray.Dataset

disdrodb.utils.time.seconds_to_temporal_resolution(seconds)[source][source]#

Convert a duration in seconds to a readable string format (e.g., “1H30”, “1D2H”).

Parameters:

(int) (- seconds) –

Returns:

- str

Return type:

The duration as a string in a format like “30S”, “1MIN30S”, “1H30MIN”, or “1D2H”.

disdrodb.utils.time.temporal_resolution_to_seconds(temporal_resolution)[source][source]#

Extract the measurement interval in seconds from the temporal resolution string.

Parameters:

temporal_resolution (str) – A string representing the product measurement interval: e.g., “1H30MIN”, “ROLL1H30MIN”.

Returns:

Duration in seconds.

Return type:

seconds

disdrodb.utils.warnings module#

Warning utilities.

disdrodb.utils.warnings.suppress_warnings()[source][source]#

Context manager suppressing RuntimeWarnings and UserWarnings.

disdrodb.utils.writer module#

DISDRODB product writers.

disdrodb.utils.writer.finalize_product(ds, product=None) Dataset[source][source]#

Finalize DISDRODB product.

disdrodb.utils.writer.write_product(ds: Dataset, filepath: str, force: bool = False) None[source][source]#

Save the xarray dataset into a NetCDF file.

Parameters:
  • ds (xarray.Dataset) – Input xarray dataset.

  • filepath (str) – Output file path.

  • force (bool, optional) – Whether to overwrite existing data. If True, overwrite existing data into destination directories. If False, raise an error if there are already data into destination directories. This is the default.

disdrodb.utils.xarray module#

Xarray utilities.

disdrodb.utils.xarray.define_dataarray_fill_value(da)[source][source]#

Define the fill value for a numerical xarray.DataArray.

disdrodb.utils.xarray.define_dataarray_fill_value_dictionary(da)[source][source]#

Define fill values for numerical variables and coordinates of a xarray.DataArray.

Return a dict of fill values:
  • floating → NaN

  • integer → ds[var].attrs[“_FillValue”] if present, else np.iinfo(dtype).max

disdrodb.utils.xarray.define_dataset_fill_value_dictionary(ds)[source][source]#

Define fill values for numerical variables and coordinates of a xarray.Dataset.

Return a dict of per-variable fill values:
  • floating –> NaN

  • integer –> ds[var].attrs[“_FillValue”] if present, else the maximum allowed number.

disdrodb.utils.xarray.define_fill_value_dictionary(xr_obj)[source][source]#

Define fill values for numerical variables and coordinates of a xarray object.

Return a dict of per-variable fill values:
  • floating –> NaN

  • integer –> ds[var].attrs[“_FillValue”] if present, else the maximum allowed number.

disdrodb.utils.xarray.remove_diameter_coordinates(xr_obj)[source][source]#

Drop diameter coordinates from xarray object.

disdrodb.utils.xarray.remove_velocity_coordinates(xr_obj)[source][source]#

Drop velocity coordinates from xarray object.

disdrodb.utils.xarray.unstack_datarray_dimension(da, dim, coord_handling='keep', prefix='', suffix='')[source][source]#

Split a DataArray along a specified dimension into a Dataset with separate prefixed and suffixed variables.

Parameters:
  • da (xarray.DataArray) – The DataArray to split.

  • dim (str) – The dimension along which to split the DataArray.

  • coord_handling (str, optional) – Option to handle coordinates sharing the target dimension. Choices are ‘keep’, ‘drop’, or ‘unstack’. Defaults to ‘keep’.

  • prefix (str, optional) – String to prepend to each new variable name.

  • suffix (str, optional) – String to append to each new variable name.

Returns:

A Dataset with each variable split along the specified dimension. The Dataset variables are named “{prefix}{name}{suffix}{dim_value}”. Coordinates sharing the target dimension are handled based on coord_handling.

Return type:

xarray.Dataset

disdrodb.utils.xarray.xr_get_last_valid_idx(da_condition, dim, fill_value=None)[source][source]#

Get the index of the last True value along a specified dimension in an xarray DataArray.

This function finds the last index along the given dimension where the condition is True. If all values are False or NaN along that dimension, the function returns fill_value.

Parameters:
  • da_condition (xarray.DataArray) – A boolean DataArray where True indicates valid or desired values. Should have the dimension specified in dim.

  • dim (str) – The name of the dimension along which to find the last True index.

  • fill_value (int or float) – The fill value when all values are False or NaN along the specified dimension. The default value is dim_size - 1.

Returns:

last_idx – An array containing the index of the last True value along the specified dimension. If all values are False or NaN, the corresponding entry in last_idx will be NaN.

Return type:

xarray.DataArray

Notes

The function works by reversing the DataArray along the specified dimension and using argmax to find the first True value in the reversed array. It then calculates the corresponding index in the original array. To handle cases where all values are False or NaN (and argmax would return 0), the function checks if there is any True value along the dimension and assigns NaN to last_idx where appropriate.

Examples

>>> import xarray as xr
>>> da = xr.DataArray([[False, False, True], [False, False, False]], dims=["time", "my_dimension"])
>>> last_idx = xr_get_last_valid_idx(da, "my_dimension")
>>> print(last_idx)
<xarray.DataArray (time: 2)>
array([2., nan])
Dimensions without coordinates: time

In this example, for the first time step, the last True index is 2. For the second time step, all values are False, so the function returns NaN.

disdrodb.utils.yaml module#

YAML utility.

disdrodb.utils.yaml.read_yaml(filepath: str) dict[source][source]#

Read a YAML file into a dictionary.

Parameters:

filepath (str) – Input YAML file path.

Returns:

Dictionary with the attributes read from the YAML file.

Return type:

dict

disdrodb.utils.yaml.write_yaml(dictionary, filepath, sort_keys=False)[source][source]#

Write a dictionary into a YAML file.

Parameters:

dictionary (dict) – Dictionary to write into a YAML file.

Module contents#

DISDRODB Utils Module.