disdrodb.utils package

Submodules

disdrodb.utils.archiving module

Utility functions for DISDRODB product archiving.

disdrodb.utils.archiving.check_freq(freq: str) -> None

Check the validity of the freq argument.

disdrodb.utils.archiving.define_temporal_partitions(filepaths, strategy, parallel, strategy_options)

Define temporal file processing partitions.

Parameters:
  • filepaths (list) – List of file paths to be processed.

  • strategy (str) –

    Partitioning strategy to apply.

    Supported values are:

    • 'time_block' defines fixed time intervals (e.g. monthly) covering input files.

    • 'event' detects clusters of precipitation (“events”).

  • parallel (bool) – If True, parallel data loading is used to identify events.

  • strategy_options (dict) –

    Dictionary with strategy-specific parameters:

    If strategy == 'time_block', supported options are:

    • freq: Time unit for blocks. One of {‘year’, ‘season’, ‘month’, ‘day’}.

    See the identify_time_partitions function for more information.

    If strategy == 'event', supported options are:

    • variable (str)

      Name of the variable to use to apply the event detection.

    • detection_threshold (int)

      Minimum number of drops to consider a timestep.

    • neighbor_min_size (int)

      Minimum cluster size for merging neighboring events.

    • neighbor_time_interval (str)

      Time window (e.g. “5MIN”) to merge adjacent clusters.

    • event_max_time_gap (str)

      Maximum allowed gap (e.g. “6H”) within a single event.

    • event_min_duration (str)

      Minimum total duration (e.g. “5MIN”) of an event.

    • event_min_size (int)

      Minimum number of records in an event.

    See the identify_events function for more information.

Returns:

A list of dictionaries, each containing:

  • start_time: numpy.datetime64[s]

    Inclusive start of an event or time block.

  • end_time: numpy.datetime64[s]

    Inclusive end of an event or time block.

Return type:

list

Notes

The 'event' strategy requires loading data into memory to identify clusters.

The 'time_block' strategy can operate on metadata alone, without full data loading.

The 'event' strategy implicitly selects which files are processed.

The 'time_block' strategy does not select which files are processed.
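The two strategy_options dictionaries described above might look as follows. This is only an illustrative sketch: the 'event' values simply mirror the defaults listed for identify_events, and the final call is left commented out because it needs real DISDRODB product files.

```python
# Options for strategy="time_block": one partition per calendar month.
time_block_options = {"freq": "month"}

# Options for strategy="event"; the values mirror the identify_events defaults.
event_options = {
    "variable": "N",
    "detection_threshold": 5,
    "neighbor_min_size": 2,
    "neighbor_time_interval": "5MIN",
    "event_max_time_gap": "6H",
    "event_min_duration": "5MIN",
    "event_min_size": 3,
}

# Hypothetical call (filepaths must point to existing DISDRODB files):
# from disdrodb.utils.archiving import define_temporal_partitions
# partitions = define_temporal_partitions(
#     filepaths, strategy="event", parallel=False, strategy_options=event_options
# )
```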

disdrodb.utils.archiving.generate_time_blocks(start_time: datetime64, end_time: datetime64, freq: str, inclusive_end_time: bool = True) -> ndarray

Generate time blocks between start_time and end_time for a given frequency.

Parameters:
  • start_time (numpy.datetime64) – Inclusive start of the overall time range.

  • end_time (numpy.datetime64) – End of the overall time range. Inclusive by default (see inclusive_end_time argument).

  • freq (str) –

    Frequency specifier. Accepted values are:

    • 'none' : return a single block [start_time, end_time]

    • 'day' : split into daily blocks

    • 'month' : split into calendar months

    • 'quarter' : split into calendar quarters

    • 'year' : split into calendar years

    • 'season' : split into meteorological seasons (MAM, JJA, SON, DJF)

  • inclusive_end_time (bool) – If False, the last block is removed when its end time equals the input end_time. The default is True.

Returns:

Array of shape (n, 2) with dtype datetime64[s], where each row is [block_start, block_end].

Return type:

numpy.ndarray
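As a rough illustration of the 'month' frequency, the sketch below builds calendar-month blocks with the standard library only. It is not the DISDRODB implementation: the helper name monthly_blocks is hypothetical, it uses datetime.datetime instead of numpy.datetime64, and it assumes that blocks are aligned to calendar boundaries and that each inclusive block end is one second before the next block start.

```python
from datetime import datetime, timedelta


def monthly_blocks(start_time, end_time):
    """Return [(block_start, block_end), ...] for each calendar month touched."""
    blocks = []
    # Start from the first second of the month containing start_time.
    current = datetime(start_time.year, start_time.month, 1)
    while current <= end_time:
        # First second of the following month.
        if current.month == 12:
            next_month = datetime(current.year + 1, 1, 1)
        else:
            next_month = datetime(current.year, current.month + 1, 1)
        # Assumption: the inclusive block end is one second before the next block.
        blocks.append((current, next_month - timedelta(seconds=1)))
        current = next_month
    return blocks


blocks = monthly_blocks(datetime(2024, 1, 15), datetime(2024, 3, 2))
for block_start, block_end in blocks:
    print(block_start, block_end)
```

The example range touches January, February and March 2024, so three blocks are produced.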

disdrodb.utils.archiving.group_files_by_temporal_partitions(temporal_partitions, filepaths, block_starts_offset=0, block_ends_offset=0)

Provide information about the required files for each event.

For each time block in temporal_partitions, the function identifies the filepaths that overlap such time period. The time blocks of temporal_partitions can be adjusted using block_starts_offset and block_ends_offset e.g. for resampling applications.

Parameters:
  • temporal_partitions (list of dict) – List of time blocks, where each time block is a dictionary containing at least 'start_time' and 'end_time' keys with numpy.datetime64 values.

  • filepaths (list of str) – List of file paths corresponding to data files.

  • block_starts_offset (int) – Optional offset (in seconds) to add to the time block starts. Provide a negative offset to go back in time.

  • block_ends_offset (int) – Optional offset (in seconds) to add to the time block ends. Provide a negative offset to go back in time.

Returns:

A list where each element is a dictionary containing:

  • 'start_time': Adjusted start time of the event (numpy.datetime64).

  • 'end_time': Adjusted end time of the event (numpy.datetime64).

  • 'filepaths': List of file paths overlapping with the adjusted event period.

Return type:

list of dict
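The overlap test behind this grouping can be sketched with plain Python. The helper group_by_overlap and the file_intervals mapping (path to start/end time) are hypothetical: the real function derives file times from the file paths and works with numpy.datetime64 values. The sketch also keeps blocks without any matching file, which may differ from the library's behavior.

```python
from datetime import datetime, timedelta


def group_by_overlap(partitions, file_intervals, starts_offset=0, ends_offset=0):
    """Attach to each time block the files whose time span overlaps it."""
    results = []
    for block in partitions:
        # Offsets are in seconds; negative values move the boundary back in time.
        start = block["start_time"] + timedelta(seconds=starts_offset)
        end = block["end_time"] + timedelta(seconds=ends_offset)
        # Two closed intervals overlap when each starts before the other ends.
        overlapping = [
            path
            for path, (file_start, file_end) in file_intervals.items()
            if file_start <= end and file_end >= start
        ]
        results.append({"start_time": start, "end_time": end, "filepaths": overlapping})
    return results


partitions = [
    {"start_time": datetime(2024, 1, 1), "end_time": datetime(2024, 1, 1, 23, 59, 59)},
    {"start_time": datetime(2024, 1, 2), "end_time": datetime(2024, 1, 2, 23, 59, 59)},
]
file_intervals = {
    "a.nc": (datetime(2024, 1, 1, 0, 0), datetime(2024, 1, 1, 12, 0)),
    "b.nc": (datetime(2024, 1, 1, 23, 0), datetime(2024, 1, 2, 1, 0)),
}
groups = group_by_overlap(partitions, file_intervals)
```

Here the file "b.nc" spans midnight, so it is listed for both daily blocks. Passing a negative starts_offset widens each block backwards, which is the kind of adjustment a resampling step would need.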

disdrodb.utils.archiving.group_files_by_time_block(filepaths, freq='day', tolerance_seconds=120)

Organize files by time blocks based on their start and end times.

If tolerance_seconds is specified, the tolerance is applied to each file's start and end time. Files starting or ending close to a time block boundary are therefore included in both adjacent time blocks. This is useful to deal with imprecise timestamps within files.

Parameters:
  • filepaths (list of str) – List of file paths to be processed.

  • freq (str) – Frequency of the time block. The default frequency is ‘day’.

  • tolerance_seconds (int) – Tolerance in seconds to subtract/add to files start time and end time.

Returns:

A list where each element is a dictionary containing:

  • 'start_time': Adjusted start time of the event (numpy.datetime64).

  • 'end_time': Adjusted end time of the event (numpy.datetime64).

  • 'filepaths': List of file paths overlapping with the adjusted event period.

Return type:

list of dict

Notes

In the DISDRODB L0C processing chain, a tolerance of 120 seconds is used to account for the possible imprecise/drifting time logged by the sensors before it is corrected.

disdrodb.utils.archiving.identify_events(filepaths, parallel=False, variable='N', detection_threshold=5, neighbor_min_size=2, neighbor_time_interval='5MIN', event_max_time_gap='6H', event_min_duration='5MIN', event_min_size=3)

Return a list of precipitating events.

Precipitating timesteps are those where variable > detection_threshold. Isolated timesteps with precipitation (based on neighborhood criteria) are removed. Then, consecutive rainy timesteps are grouped into the same event if the time gap between them does not exceed event_max_time_gap. Finally, events that do not meet minimum size or duration requirements are filtered out.

Parameters:
  • filepaths (list) – List of L1C file paths.

  • variable (str) – Name of the variable to use to apply the event detection. The default is “N”.

  • detection_threshold (int) – Minimum value of ‘variable’ to consider an event timestep.

  • parallel (bool) – Whether to load the files in parallel. Set parallel=True only in a multiprocessing environment. The default is False.

  • neighbor_time_interval (str) – The time interval around a given timestep defining the neighborhood. Only timesteps that fall within this time interval before or after a timestep are considered neighbors. The neighbor_time_interval must be at least equal to the dataset sampling interval (temporal resolution): for 1-minute data it should be at least 1MIN, for 5-minute data at least 5MIN, and so on.

  • neighbor_min_size (int, optional) –

    The minimum number of neighboring timesteps required within neighbor_time_interval for a timestep to be considered non-isolated. Isolated timesteps are removed.

    • If neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs.

    • If neighbor_min_size=1, the timestep must have at least one neighbor within neighbor_time_interval.

    • If neighbor_min_size=2, the timestep must have at least two neighbors within neighbor_time_interval.

    Defaults to 2.

  • event_max_time_gap (str) – The maximum time interval between two timesteps to be considered part of the same event. This parameter is used to group timesteps into events.

  • event_min_duration (str) – The minimum duration an event must span. Events shorter than this duration are discarded.

  • event_min_size (int, optional) – The minimum number of valid timesteps required for an event. Defaults to 3.

Returns:

A list of events, where each event is represented as a dictionary with keys:

  • "start_time": np.datetime64, start time of the event

  • "end_time": np.datetime64, end time of the event

  • "duration": np.timedelta64, duration of the event

  • "n_timesteps": int, number of valid timesteps in the event

Return type:

list of dict
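The four steps described for identify_events (thresholding, removal of isolated timesteps, grouping by time gap, filtering by size and duration) can be sketched in plain Python. This is a simplified illustration under stated assumptions, not the library code: the helper find_events is hypothetical, it takes in-memory timestamps and values instead of file paths, and it uses datetime.timedelta objects in place of strings such as "5MIN".

```python
from datetime import datetime, timedelta


def find_events(times, values, threshold, neighbor_min_size, neighbor_interval,
                max_gap, min_duration, min_size):
    # 1. Keep timesteps where the variable exceeds the detection threshold.
    candidates = sorted(t for t, v in zip(times, values) if v > threshold)

    # 2. Drop isolated timesteps: too few other candidates within the interval.
    def n_neighbors(t):
        return sum(
            1 for other in candidates
            if other != t and abs(other - t) <= neighbor_interval
        )

    kept = [t for t in candidates if n_neighbors(t) >= neighbor_min_size]

    # 3. Group remaining timesteps; a gap larger than max_gap starts a new event.
    groups = []
    for t in kept:
        if groups and t - groups[-1][-1] <= max_gap:
            groups[-1].append(t)
        else:
            groups.append([t])

    # 4. Discard events that are too short or contain too few timesteps.
    events = []
    for group in groups:
        duration = group[-1] - group[0]
        if len(group) >= min_size and duration >= min_duration:
            events.append({
                "start_time": group[0],
                "end_time": group[-1],
                "duration": duration,
                "n_timesteps": len(group),
            })
    return events


t0 = datetime(2024, 1, 1)
minutes = [0, 1, 2, 3, 4, 5, 6, 60, 600, 601, 602, 603]
values = [10, 10, 10, 10, 10, 0, 0, 10, 10, 10, 10, 10]
times = [t0 + timedelta(minutes=m) for m in minutes]
events = find_events(
    times, values, threshold=5, neighbor_min_size=1,
    neighbor_interval=timedelta(minutes=5), max_gap=timedelta(hours=6),
    min_duration=timedelta(minutes=4), min_size=3,
)
```

In this toy series the timestep at minute 60 has no neighbor within 5 minutes and is dropped as isolated, while the short burst starting at minute 600 lasts only 3 minutes and is filtered out by the minimum duration, leaving a single event.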

disdrodb.utils.archiving.identify_time_partitions(start_times, end_times, freq: str) -> list[dict]

Identify the set of time blocks covered by files.

The result is a minimal, sorted, and unique set of time partitions. The start_times and end_times arrays can be derived using get_start_end_time_from_filepaths.

Parameters:
  • start_times (numpy.ndarray) – Array of inclusive start times in datetime64[s] format for each file.

  • end_times (numpy.ndarray) – Array of inclusive end times in datetime64[s] format for each file.

  • freq (str) – Frequency determining the granularity of candidate blocks. Allowed values are {‘none’, ‘hour’, ‘day’, ‘month’, ‘quarter’, ‘season’, ‘year’}. See generate_time_blocks for more details.

Returns:

A list of dictionaries, each containing:

  • start_time (numpy.datetime64[s])

    Inclusive start of a time block.

  • end_time (numpy.datetime64[s])

    Inclusive end of a time block.

Only those blocks that overlap at least one file’s interval are returned. The list is sorted by start_time and contains no duplicate blocks.

Return type:

list of dict

disdrodb.utils.attrs module

DISDRODB netCDF4 attributes utilities.

disdrodb.utils.attrs.get_attrs_dict()

Get attributes dictionary for DISDRODB product variables and coordinates.

disdrodb.utils.attrs.set_attrs(ds, attrs_dict)

Set attributes to the variables and coordinates of the xr.Dataset.

disdrodb.utils.attrs.set_coordinate_attributes(ds)

Set coordinates attributes.

disdrodb.utils.attrs.set_disdrodb_attrs(ds, product: str)

Add DISDRODB processing information to the netCDF global attributes.

It assumes station metadata have already been added to the dataset.

Parameters:
Returns:

Dataset.

Return type:

xarray.Dataset

disdrodb.utils.attrs.update_disdrodb_attrs(ds, product: str)

Add DISDRODB processing information to the netCDF global attributes.

It assumes station metadata have already been added to the dataset.

Parameters:
Returns:

Dataset.

Return type:

xarray.Dataset

disdrodb.utils.cli module

DISDRODB command-line-interface scripts utilities.

disdrodb.utils.cli.click_data_archive_dir_option(function: object)

Add Click command line option for DISDRODB data archive directory.

Adds the --data_archive_dir option for specifying the DISDRODB data archive directory path.

Parameters:

function (callable) – Function to decorate with Click option.

Returns:

Decorated function with --data_archive_dir option added.

Return type:

callable

disdrodb.utils.cli.click_l0_archive_options(function: object)

Add Click command line options for L0 processing and archiving control.

Adds options to control which L0 processing levels to run (L0A, L0B, L0C) and whether to remove intermediate files.

Parameters:

function (callable) – Function to decorate with Click options.

Returns:

Decorated function with L0 archive options added.

Return type:

callable

disdrodb.utils.cli.click_metadata_archive_dir_option(function: object)

Add Click command line option for DISDRODB metadata archive directory.

Adds the --metadata_archive_dir option for specifying the DISDRODB metadata archive directory path.

Parameters:

function (callable) – Function to decorate with Click option.

Returns:

Decorated function with --metadata_archive_dir option added.

Return type:

callable

disdrodb.utils.cli.click_processing_options(function: object)

Add Click command line options for DISDRODB processing configuration.

Adds options for --parallel, --debugging_mode, --verbose, and --force to control processing behavior.

Parameters:

function (callable) – Function to decorate with Click options.

Returns:

Decorated function with processing configuration options added.

Return type:

callable

disdrodb.utils.cli.click_remove_l0a_option(function: object)

Add Click command line option for removing L0A files.

Adds the --remove_l0a boolean option to control whether L0A files should be removed after L0B processing.

Parameters:

function (callable) – Function to decorate with Click option.

Returns:

Decorated function with --remove_l0a option added.

Return type:

callable

disdrodb.utils.cli.click_remove_l0b_option(function: object)

Add Click command line option for removing L0B files.

Adds the --remove_l0b boolean option to control whether L0B files should be removed after L0C processing.

Parameters:

function (callable) – Function to decorate with Click option.

Returns:

Decorated function with --remove_l0b option added.

Return type:

callable

disdrodb.utils.cli.click_station_arguments(function: object)

Add Click command line arguments for DISDRODB station processing.

Adds three required arguments: data_source, campaign_name, and station_name.

Parameters:

function (callable) – Function to decorate with Click arguments.

Returns:

Decorated function with Click arguments added.

Return type:

callable

disdrodb.utils.cli.click_stations_options(function: object)

Add Click command line options for filtering DISDRODB stations.

Adds options for --data_sources, --campaign_names, and --station_names to filter which stations to process.

Parameters:

function (callable) – Function to decorate with Click options.

Returns:

Decorated function with station filtering options added.

Return type:

callable

disdrodb.utils.cli.execute_cmd(cmd: str | Sequence[str], raise_error: bool = False, cwd: str | None = None, **kwargs)

Execute a command, streaming output live in the Python console.

Ensures the current kernel environment’s bin/ (or Scripts/ on Windows) is added to PATH so console scripts installed in the active venv/conda environment can be found.

Parameters:
  • cmd (str or Sequence[str]) – Command to execute. Can be a string or list of arguments.

  • raise_error (bool, optional) – If True, raise CalledProcessError on non-zero exit code. Default is False.

  • cwd (str or None, optional) – Working directory for the command. Default is None.

  • **kwargs – Additional keyword arguments passed to subprocess.Popen.

Returns:

The return code of the executed command.

Return type:

int

Raises:

subprocess.CalledProcessError – If raise_error is True and the command returns a non-zero exit code.

disdrodb.utils.cli.get_command_argv(cmd)

Parse command into argument list.

Parameters:

cmd (str or Sequence[str]) – Command to parse. Can be a string (will be split using shell-like syntax) or a sequence of strings.

Returns:

List of command arguments.

Return type:

list

Raises:

ValueError – If the command is empty.
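The parsing described here can be approximated with the standard library's shlex module. The helper name command_to_argv and the error message are hypothetical; the sketch only illustrates shell-like splitting and the empty-command check.

```python
import shlex


def command_to_argv(cmd):
    """Turn a command string or a sequence of strings into a list of arguments."""
    # Strings are split with shell-like rules, so quoted arguments stay together.
    argv = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)
    if not argv:
        raise ValueError("The command is empty.")
    return argv


print(command_to_argv("disdrodb_run_l0 --help"))
# ['disdrodb_run_l0', '--help']
print(command_to_argv('echo "hello world"'))
# ['echo', 'hello world']
```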

disdrodb.utils.cli.parse_archive_dir(archive_dir: str)

Parse archive directory from command line argument.

Parameters:

archive_dir (str) – Archive directory path as string.

Returns:

Archive directory path, or None if archive_dir is empty or ‘None’.

Return type:

str or None

Examples

>>> parse_archive_dir('')
None
>>> parse_archive_dir('None')
None
>>> parse_archive_dir('/path/to/archive')
'/path/to/archive'

disdrodb.utils.cli.parse_arg_to_list(args)

Parse space-separated command line argument into a list.

Parameters:

args (str or any) – Space-separated string or other argument type.

Returns:

List of parsed arguments, or None if args is empty or ‘None’.

Return type:

list or None

Examples

>>> parse_arg_to_list('')
None
>>> parse_arg_to_list('None')
None
>>> parse_arg_to_list('variable')
['variable']
>>> parse_arg_to_list('variable1 variable2')
['variable1', 'variable2']

disdrodb.utils.cli.parse_empty_string_and_none(args)

Parse command line argument handling empty strings and ‘None’.

Parameters:

args (str or any) – Argument to parse.

Returns:

Returns None if args is '' or 'None', otherwise returns args unchanged.

Return type:

any or None

Examples

>>> parse_empty_string_and_none('')
None
>>> parse_empty_string_and_none('None')
None
>>> parse_empty_string_and_none('value')
'value'

disdrodb.utils.cli.retrieve_environment()

Retrieve environment variables with scripts directory added to PATH.

Creates a copy of the current environment and ensures that the Python environment’s scripts directory is on the PATH.

Returns:

Dictionary of environment variables with updated PATH.

Return type:

dict

disdrodb.utils.cli.retrieve_executable(command, env)

Resolve the full path to an executable command.

Parameters:
  • command (str) – The command name to resolve.

  • env (dict) – Environment variables dictionary containing PATH.

Returns:

Full path to the executable.

Return type:

str

Raises:

FileNotFoundError – If the command cannot be found on PATH.
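The PATH handling and executable lookup described for retrieve_environment and retrieve_executable could be sketched as below. The helper names are hypothetical, and the sketch assumes that console scripts live in the same directory as the running interpreter, which holds for typical POSIX venv/conda layouts; other layouts may need sysconfig.get_path("scripts") instead.

```python
import os
import shutil
import sys


def environment_with_scripts_dir():
    """Copy os.environ and put the interpreter's directory first on PATH."""
    env = os.environ.copy()
    # Assumption: console scripts sit next to the running interpreter.
    scripts_dir = os.path.dirname(sys.executable)
    paths = env.get("PATH", "").split(os.pathsep)
    if scripts_dir not in paths:
        env["PATH"] = os.pathsep.join([scripts_dir, *filter(None, paths)])
    return env


def resolve_executable(command, env):
    """Return the full path of command, searching the PATH stored in env."""
    path = shutil.which(command, path=env.get("PATH"))
    if path is None:
        raise FileNotFoundError(f"Command {command!r} not found on PATH.")
    return path


env = environment_with_scripts_dir()
```

Working on a copy of the environment keeps the change local to the spawned process and leaves os.environ of the running interpreter untouched.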

disdrodb.utils.cli.subprocess_run(cmd: str | Sequence[str], *, check: bool = True, capture_output: bool = False, text: bool = True, cwd: str | None = None, **kwargs) -> CompletedProcess

Run a command ensuring the current kernel’s env ‘bin/Scripts’ is on PATH.

This wrapper ensures subprocess can find and run console scripts from the current Jupyter kernel’s conda/venv by adding that environment’s bin/ (or Scripts/ on Windows) to PATH when the notebook starts with a system-only PATH.

Parameters:
  • cmd (str or Sequence[str]) – Command to execute. Can be a string or list of arguments. Example: ["disdrodb_run_l0", "--help"]

  • check (bool, optional) – If True, raise CalledProcessError if the process returns a non-zero exit code. Default is True.

  • capture_output (bool, optional) – If True, capture stdout and stderr. Default is False.

  • text (bool, optional) – If True, decode stdout/stderr as text. Default is True.

  • cwd (str or None, optional) – Working directory for the command. Default is None.

  • **kwargs – Additional keyword arguments passed to subprocess.run.

Returns:

Completed process object containing return code, stdout, and stderr.

Return type:

subprocess.CompletedProcess

Raises:

subprocess.CalledProcessError – If check is True and the process returns a non-zero exit code.
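The check, capture_output and text parameters behave as in the standard subprocess.run. The sketch below calls subprocess.run directly with the current interpreter as the command, so it runs without DISDRODB installed; it illustrates the semantics and is not the wrapper itself.

```python
import subprocess
import sys

# Run a short Python command and capture its output as text.
result = subprocess.run(
    [sys.executable, "-c", "print('hello')"],
    check=True,
    capture_output=True,
    text=True,
)
print(result.returncode, result.stdout.strip())

# With check=True, a non-zero exit code raises CalledProcessError.
exit_code = None
try:
    subprocess.run([sys.executable, "-c", "raise SystemExit(3)"], check=True)
except subprocess.CalledProcessError as error:
    exit_code = error.returncode
    print("command failed with exit code", exit_code)
```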

disdrodb.utils.compression module

DISDRODB raw data compression utility.

disdrodb.utils.compression.archive_station_data(metadata_filepath: str, data_archive_dir: str) -> str

Archive station data into a zip file for subsequent data upload.

It creates a zip file in a temporary directory.

Parameters:

metadata_filepath (str) – Metadata file path.

disdrodb.utils.compression.check_consistent_station_name(metadata_filepath, station_name)

Check consistent station_name between YAML file name and metadata key.

disdrodb.utils.compression.compress_file(filepath: str, method: str, skip: bool) -> str

Compress a file and delete the original.

If the file is already compressed, it is not compressed again.

Parameters:
  • filepath (str) – Path of the file to compress.

  • method (str) – Compression method. One of None, "zip", "gzip" or "bzip2".

  • skip (bool) – Whether to skip files that are already compressed. If True, no error is raised and the input filepath is returned. If False, an error is raised.

Returns:

Path of the compressed file. Same as input if no compression.

Return type:

str
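A minimal sketch of the "compress and delete the original" behavior for the gzip method, using only the standard library. The helper name gzip_and_remove, the extension-based check for already compressed files, and the error type are assumptions of this sketch rather than documented library behavior.

```python
import gzip
import os
import shutil
import tempfile

COMPRESSED_EXTENSIONS = (".zip", ".gz", ".bz2")  # assumption for this sketch


def gzip_and_remove(filepath, skip=True):
    """Gzip filepath, delete the original, and return the new path."""
    if filepath.endswith(COMPRESSED_EXTENSIONS):
        if skip:
            return filepath  # already compressed: return the input unchanged
        raise ValueError(f"{filepath} is already compressed.")
    compressed_filepath = filepath + ".gz"
    # Stream the bytes so large files are not loaded into memory at once.
    with open(filepath, "rb") as src, gzip.open(compressed_filepath, "wb") as dst:
        shutil.copyfileobj(src, dst)
    os.remove(filepath)
    return compressed_filepath


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "raw.txt")
    with open(path, "w") as f:
        f.write("data")
    new_path = gzip_and_remove(path)
    print(os.path.basename(new_path), os.path.exists(path))
    # raw.txt.gz False
```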

disdrodb.utils.compression.compress_file_bzip2(filepath: str, compressed_filepath: str) -> None

Compress a single file into a bzip2 archive.

Parameters:
  • filepath (str) – Path of the file to compress.

  • compressed_filepath (str) – Path of the compressed file.

disdrodb.utils.compression.compress_file_gzip(filepath: str, compressed_filepath: str) -> None

Compress a single file into a gzip archive.

Parameters:
  • filepath (str) – Path of the file to compress.

  • compressed_filepath (str) – Path of the compressed file.

disdrodb.utils.compression.compress_file_zip(filepath: str, compressed_filepath: str) -> None

Compress a single file into a zip archive.

Parameters:
  • filepath (str) – Path of the file to compress.

  • compressed_filepath (str) – Path of the compressed file.

disdrodb.utils.compression.compress_station_files(data_archive_dir: str, data_source: str, campaign_name: str, station_name: str, method: str = 'gzip', skip: bool = True) -> None

Compress each raw file of a station.

Parameters:
  • data_archive_dir (str) – DISDRODB Data Archive directory

  • data_source (str) – Name of data source of interest.

  • campaign_name (str) – Name of the campaign of interest.

  • station_name (str) – Station name of interest.

  • method (str) – Compression method. "zip", "gzip" or "bzip2".

  • skip (bool) – Whether to skip files that are already compressed. If True, no error is raised and the routine continues with the other files. If False, an error is raised and the compression routine stops. The default value is True.

disdrodb.utils.compression.unzip_file(filepath: str, dest_path: str) -> None

Unzip a file into a directory.

Parameters:
  • filepath (str) – Path of the file to unzip.

  • dest_path (str) – Path of the destination directory.

disdrodb.utils.compression.unzip_file_on_terminal(filepath: str, dest_path: str)

Unzip a file into a directory using the terminal command.

Parameters:
  • filepath (str) – Path of the file to unzip.

  • dest_path (str) – Path of the destination directory.

disdrodb.utils.coords module

DISDRODB coordinates utilities.

disdrodb.utils.coords.add_dataset_crs_coords(xr_obj)

Add a CF-compliant CRS (WGS84) to an xarray.Dataset.

disdrodb.utils.dask module

Utilities for Dask Distributed Computations.

disdrodb.utils.dask.check_parallel_validity(parallel)

Check validity of parallel option given Dask settings.

disdrodb.utils.dask.close_dask_cluster(cluster, client)

Close Dask Cluster.

disdrodb.utils.dask.execute_tasks_safely(list_tasks, parallel: bool, logs_dir: str, max_tasks_per_batch=5000)

Execute Dask tasks and skip failed ones.

Parameters:
  • list_tasks (list) – List of dask delayed objects or results.

  • parallel (bool) – Whether to execute in parallel with Dask or not.

  • logs_dir (str) – Directory to store FAILED_TASKS.log.

  • max_tasks_per_batch (int or None, optional) – Maximum number of tasks to submit to client.compute() at once. The default is 5000. Dask struggles if more than 10_000 tasks are submitted.

Returns:

list_logs – List of task results. For failed tasks, adds the path to FAILED_TASKS.log in place of the result.

Return type:

list
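The combination of batching and failure skipping can be illustrated without Dask. In the plain-Python analogy below, tasks are zero-argument callables executed sequentially; the helper run_in_batches and the failed_marker placeholder are hypothetical and only mimic the idea of replacing a failed result with the path to FAILED_TASKS.log.

```python
def run_in_batches(tasks, max_tasks_per_batch=5000, failed_marker="FAILED_TASKS.log"):
    """Run zero-argument callables batch by batch, recording failures."""
    if max_tasks_per_batch is None:
        max_tasks_per_batch = max(len(tasks), 1)
    results = []
    for start in range(0, len(tasks), max_tasks_per_batch):
        batch = tasks[start:start + max_tasks_per_batch]
        for task in batch:
            try:
                results.append(task())
            except Exception:
                # A failed task does not stop the others; a marker replaces its result.
                results.append(failed_marker)
    return results


tasks = [lambda: 1, lambda: 1 / 0, lambda: 3]
print(run_in_batches(tasks, max_tasks_per_batch=2))
# [1, 'FAILED_TASKS.log', 3]
```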

disdrodb.utils.dask.initialize_dask_cluster(minimum_memory=None)

Initialize Dask Cluster.

disdrodb.utils.dataframe module

Dataframe utilities.

disdrodb.utils.dataframe.compute_1d_histogram(df, column, variables=None, bins=10, labels=None, prefix_name=True, include_quantiles=False)

Compute conditional univariate statistics.

Parameters:
  • df (pandas.DataFrame) – Input dataframe

  • column (str) – Column name to be binned.

  • variables (str or list, optional) – Column names for which conditional statistics will be computed. If None, only counts are computed.

  • bins (int or array-like) – Number of bins or bin edges.

  • labels (array-like, optional) – Labels for the column bins. If None, uses bin centers.

Return type:

pandas.DataFrame

disdrodb.utils.dataframe.compute_2d_histogram(df, x, y, variables=None, x_bins=10, y_bins=10, x_labels=None, y_labels=None, prefix_name=True, include_quantiles=False)

Compute conditional bivariate statistics.

Parameters:
  • df (pandas.DataFrame) – Input dataframe

  • x (str) – Column name for x-axis binning (will be rounded to integers)

  • y (str) – Column name for y-axis binning

  • variables (str or list, optional) – Column names for which statistics will be computed. If None, only counts are computed.

  • x_bins (int or array-like) – Number of bins or bin edges for x

  • y_bins (int or array-like) – Number of bins or bin edges for y

  • x_labels (array-like, optional) – Labels for x bins. If None, uses bin centers

  • y_labels (array-like, optional) – Labels for y bins. If None, uses bin centers

Returns:

Dataset with dimensions corresponding to binned variables and data variables for each statistic

Return type:

xarray.Dataset

disdrodb.utils.dataframe.log_arange(start, stop, log_step=0.1, base=10)

Return numbers spaced evenly on a log scale (similar to np.arange but in log space).

Parameters:
  • start (float) – The starting value of the sequence (must be > 0).

  • stop (float) – The end value of the sequence (must be > 0).

  • log_step (float) – The step size in log-space (default is 0.1).

  • base (float) – The logarithmic base (default is 10).

Returns:

Array of values spaced in log scale.

Return type:

numpy.ndarray
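The idea of stepping in log space can be sketched in pure Python. The helper log_arange_sketch is hypothetical, returns a list rather than a numpy.ndarray, and assumes that stop is excluded (by analogy with np.arange); the library may handle the end point differently.

```python
import math


def log_arange_sketch(start, stop, log_step=0.1, base=10):
    """Values from start up to (but excluding) stop, evenly spaced in log space."""
    if start <= 0 or stop <= 0:
        raise ValueError("start and stop must be > 0.")
    log_start = math.log(start, base)
    log_stop = math.log(stop, base)
    # Number of steps of size log_step that fit before reaching log_stop.
    n_steps = max(math.ceil((log_stop - log_start) / log_step), 0)
    return [base ** (log_start + i * log_step) for i in range(n_steps)]


values = log_arange_sketch(1, 500, log_step=1.0)
print(values)
```

With log_step=1.0 and base 10, consecutive values differ by a factor of 10, so the example yields values close to 1, 10 and 100.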

disdrodb.utils.dataframe.log_linspace(start, stop, n_bins, base=10)

Return numbers spaced evenly on a log scale between start and stop.

Parameters:
  • start (float) – The starting value of the sequence (must be > 0).

  • stop (float) – The end value of the sequence (must be > 0).

  • n_bins (int) – The number of points to generate (including start and stop).

  • base (float) – The logarithmic base (default is 10).

Returns:

Array of values spaced evenly in log space.

Return type:

numpy.ndarray
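A pure-Python sketch of even spacing in log space with both end points included. The helper log_linspace_sketch is hypothetical and returns a list instead of a numpy.ndarray.

```python
import math


def log_linspace_sketch(start, stop, n_bins, base=10):
    """Return n_bins values from start to stop, evenly spaced in log space."""
    if n_bins == 1:
        return [float(start)]
    log_start = math.log(start, base)
    log_stop = math.log(stop, base)
    # Constant increment of the exponent between consecutive points.
    step = (log_stop - log_start) / (n_bins - 1)
    return [base ** (log_start + i * step) for i in range(n_bins)]


points = log_linspace_sketch(1, 100, 3)
print(points)
```

Even spacing in log space means that the ratio between consecutive values is constant; in the example each value is about 10 times the previous one.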

disdrodb.utils.decorators module

DISDRODB decorators.

disdrodb.utils.decorators.check_pytmatrix_availability(func)

Decorator to ensure that the ‘pytmatrix’ package is installed.

disdrodb.utils.decorators.check_software_availability(software, conda_package)

A decorator to ensure that a software package is installed.

Parameters:
  • software (str) – The package name as recognized by Python’s import system.

  • conda_package (str) – The package name as recognized by conda-forge.
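A decorator factory of this kind might be written as follows. This is a sketch, not the DISDRODB code: the name require_software and the wording of the error message are hypothetical, and the availability check relies on importlib.util.find_spec.

```python
import functools
import importlib.util


def require_software(software, conda_package):
    """Decorator factory: fail early if software cannot be imported."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # find_spec returns None when a top-level module is not installed.
            if importlib.util.find_spec(software) is None:
                raise ImportError(
                    f"'{software}' is required. Install it with: "
                    f"conda install -c conda-forge {conda_package}"
                )
            return func(*args, **kwargs)
        return wrapper
    return decorator


@require_software("json", "json")  # json ships with Python, so this passes
def dump(value):
    import json
    return json.dumps(value)


print(dump({"a": 1}))
# {"a": 1}
```

Checking inside the wrapper rather than at decoration time means the missing dependency is reported only when the decorated function is actually called.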

disdrodb.utils.decorators.create_dask_task_name(function_name: str, name=None) -> str | None

Create a custom dask task name.

Parameters:
  • function_name (str) – Name of the function being delayed.

  • name (str, optional) – Custom name for the task (e.g., filepath or ID). If None, returns None so that Dask generates its own default name.

Returns:

Custom dask task name string if name is given, otherwise None (use Dask’s default naming).

Return type:

str | None

disdrodb.utils.decorators.delayed_if_parallel(function)

Decorator to make the function delayed if its parallel argument is True.

disdrodb.utils.decorators.single_threaded_if_parallel(function)

Decorator to make a function use a single thread if its parallel argument is True.

disdrodb.utils.dict module

This module contains functions for manipulating dictionaries.

disdrodb.utils.dict.extract_dictionary(dictionary, keys)

Extract a subset of keys from the dictionary, removing them from the input dictionary.
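The "extract and remove" behavior can be sketched with dict.pop. The helper name extract_keys is hypothetical, and skipping keys that are absent is an assumption of this sketch.

```python
def extract_keys(dictionary, keys):
    """Pop the requested keys out of dictionary and return them as a new dict."""
    # Keys that are absent are skipped (an assumption of this sketch).
    return {key: dictionary.pop(key) for key in keys if key in dictionary}


kwargs = {"a": 1, "b": 2, "c": 3}
subset = extract_keys(kwargs, ["a", "c", "missing"])
print(subset, kwargs)
# {'a': 1, 'c': 3} {'b': 2}
```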

disdrodb.utils.dict.extract_product_kwargs(kwargs, product)

Infer product kwargs dictionary.

disdrodb.utils.directories module

Define utilities for Directory/File Checks/Creation/Deletion.

disdrodb.utils.directories.check_directory_exists(dir_path)

Check if the directory exists.

disdrodb.utils.directories.check_glob_pattern(pattern: str) -> None

Check if glob pattern is a string and is a valid pattern.

Parameters:

pattern (str) – String to be checked.

disdrodb.utils.directories.check_glob_patterns(patterns: str | list) -> list

Check if glob patterns are valid.

disdrodb.utils.directories.contains_files(dir_path: str) -> bool

Check (recursively) if a directory contains any file.

Under the hood, os.walk uses os.scandir. Combining the os.walk file generator with any() avoids the use of a while loop.

The function returns True as soon as one file is found (short-circuit); False otherwise.
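The os.walk plus any() pattern mentioned above can be sketched as follows; the helper name has_any_file is hypothetical.

```python
import os
import tempfile


def has_any_file(dir_path):
    """Return True as soon as any file is found below dir_path."""
    # os.walk yields (dirpath, dirnames, filenames) lazily; any() stops at the
    # first directory whose filenames list is non-empty.
    return any(filenames for _, _, filenames in os.walk(dir_path))


with tempfile.TemporaryDirectory() as tmp_dir:
    nested_dir = os.path.join(tmp_dir, "a", "b")
    os.makedirs(nested_dir)
    print(has_any_file(tmp_dir))  # False: only empty directories so far
    with open(os.path.join(nested_dir, "data.txt"), "w") as f:
        f.write("x")
    print(has_any_file(tmp_dir))  # True: a file exists in a nested directory
```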

disdrodb.utils.directories.contains_netcdf_or_parquet_files(dir_path: str) -> bool

Check (recursively) if a directory has any Parquet or netCDF file.

Under the hood, os.walk uses os.scandir. Combining the os.walk file generator with any() avoids the use of a while loop.

The function returns True as soon as one file is found (short-circuit); False otherwise.

disdrodb.utils.directories.copy_file(src_filepath, dst_filepath)

Copy a file from one location to another.

disdrodb.utils.directories.count_directories(dir_path, glob_pattern='*', recursive=False, skip_hidden=True)

Return the number of directories (exclude files).

disdrodb.utils.directories.count_files(dir_path, glob_pattern='*', recursive=False, skip_hidden=True)

Return the number of files (exclude directories).

disdrodb.utils.directories.create_directory(path: str, exist_ok=True) -> None

Create a directory at the provided path.

disdrodb.utils.directories.create_required_directory(dir_path, dir_name, exist_ok=True)

Create directory dir_name inside the dir_path directory.

disdrodb.utils.directories.ensure_string_path(path, msg, accepth_pathlib=False)

Ensure that the path is a string.

disdrodb.utils.directories.is_empty_directory(path, skip_hidden=True)

Check if a directory path is empty.

Return False if path is a file or non-empty directory. If the path does not exist, raise an error.

disdrodb.utils.directories.list_directories(dir_path, glob_pattern='*', recursive=False, skip_hidden=True, return_paths=True)

Return a list of directory paths (exclude file paths).

disdrodb.utils.directories.list_files(dir_path, glob_pattern='*', recursive=False, skip_hidden=True, return_paths=True)

Return a list of filepaths (exclude directory paths).

disdrodb.utils.directories.list_paths(dir_path, glob_pattern, recursive=False, skip_hidden=True)

Return a list of filepaths and directory paths.

This function also accepts a list of glob patterns.

disdrodb.utils.directories.remove_file_or_directories(path, logger=None)

Remove the file, or the directory and its subdirectory tree, at path.

Use this function with caution.

disdrodb.utils.directories.remove_if_exists(path: str, force: bool = False, logger=None) -> None

Remove the file or directory if it exists and force=True.

If the path exists and force=False, an error is raised.

disdrodb.utils.directories.remove_path_trailing_slash(path: str) -> str

Removes a trailing slash or backslash from a file path if it exists.

This function ensures that the provided file path is normalized by removing any trailing directory separator characters ('/' or '\\'). This is useful for maintaining consistency in path strings and for preparing paths for operations that may not expect a trailing slash.

Parameters:

path (str) – The file path to normalize.

Returns:

The normalized path without a trailing slash.

Return type:

str

Raises:

TypeError – If the input path is not a string.

Examples

>>> remove_path_trailing_slash("some/path/")
'some/path'
>>> remove_path_trailing_slash("another\\path\\")
'another\\path'

disdrodb.utils.directories.rmtree_windows(path)

Remove a directory tree on Windows.

disdrodb.utils.encoding module

DISDRODB netCDF4 encoding utilities.

disdrodb.utils.encoding.get_encodings_dict()

Get encoding dictionary for DISDRODB product variables and coordinates.

disdrodb.utils.encoding.get_time_encoding() -> dict

Create time encoding.

Returns:

Time encoding.

Return type:

dict

disdrodb.utils.encoding.rechunk_dataset(ds: Dataset, encodings_dict: dict) Dataset[source][source]#

Coerce the dataset arrays to have the chunk size specified in the encoding dictionary.

Parameters:
  • ds (xarray.Dataset) – Input xarray dataset

  • encodings_dict (dict) – Dictionary containing the encoding to write the xarray dataset as a netCDF.

Returns:

Output xarray dataset

Return type:

xarray.Dataset

disdrodb.utils.encoding.sanitize_encodings_dict(encodings_dict: dict, ds: Dataset) dict[source][source]#

Ensure that the chunk sizes are no larger than the array shape.

Parameters:
  • encodings_dict (dict) – Dictionary containing the variable encodings.

  • ds (xarray.Dataset) – Input dataset.

Returns:

Encoding dictionary.

Return type:

dict
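
To make the idea concrete, here is a minimal sketch that caps chunk sizes at the array shape using plain dictionaries and tuples. The helper clamp_chunksizes, the shapes mapping, and the variable name drop_number are hypothetical; the sketch assumes the chunk sizes live under the "chunksizes" encoding key that xarray uses for netCDF4.

```python
def clamp_chunksizes(encodings, shapes):
    """Hypothetical helper: cap each chunk size at the matching dimension length."""
    sanitized = {}
    for name, encoding in encodings.items():
        encoding = dict(encoding)  # copy so the input is not mutated
        chunks = encoding.get("chunksizes")
        if chunks is not None and name in shapes:
            encoding["chunksizes"] = tuple(
                min(chunk, size) for chunk, size in zip(chunks, shapes[name])
            )
        sanitized[name] = encoding
    return sanitized


# A one-day dataset (1440 timesteps) with a chunk size larger than the time axis
encodings = {"drop_number": {"chunksizes": (5000, 32, 32), "zlib": True}}
shapes = {"drop_number": (1440, 32, 32)}
sanitized = clamp_chunksizes(encodings, shapes)
```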

disdrodb.utils.encoding.set_encodings(ds: Dataset, encodings_dict: dict) Dataset[source][source]#

Apply the encodings to the xarray Dataset.

Parameters:
  • ds (xarray.Dataset) – Input xarray dataset.

  • encodings_dict (dict) – Dictionary with encodings specifications.

Returns:

Output xarray dataset.

Return type:

xarray.Dataset

disdrodb.utils.event module#

Functions for event definition.

disdrodb.utils.event.group_timesteps_into_event(timesteps, event_max_time_gap, event_min_size=0, event_min_duration='0S', neighbor_min_size=0, neighbor_time_interval='0S')[source][source]#

Group candidate timesteps into events based on temporal criteria.

This function groups valid candidate timesteps into events by considering how they cluster in time. Any isolated timesteps (based on neighborhood criteria) are first removed. Then, consecutive timesteps are grouped into the same event if the time gap between them does not exceed event_max_time_gap. Finally, events that do not meet minimum size or duration requirements are filtered out.

Note that neighbor_min_size and neighbor_time_interval are very sensitive to the actual sample interval of the data.

Parameters:
  • timesteps (numpy.ndarray) – Candidate timesteps to be grouped into events.

  • neighbor_time_interval (str) – The time interval around a given timestep defining the neighborhood. Only timesteps that fall within this time interval before or after a timestep are considered neighbors.

  • neighbor_min_size (int, optional) –

    The minimum number of neighboring timesteps required within neighbor_time_interval for a timestep to be considered non-isolated. Isolated timesteps are removed.

    • If neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs.

    • If neighbor_min_size=1, the timestep must have at least one neighbor within neighbor_time_interval.

    • If neighbor_min_size=2, the timestep must have at least two neighbors within neighbor_time_interval.

    Defaults to 0.

  • event_max_time_gap (str) – The maximum time interval between two timesteps to be considered part of the same event. This parameter is used to group timesteps into events.

  • event_min_duration (str) – The minimum duration an event must span. Events shorter than this duration are discarded.

  • event_min_size (int, optional) – The minimum number of valid timesteps required for an event. Defaults to 0.

Returns:

A list of events, where each event is represented as a dictionary with keys:

  • "start_time": np.datetime64, start time of the event

  • "end_time": np.datetime64, end time of the event

  • "duration": np.timedelta64, duration of the event

  • "n_timesteps": int, number of valid timesteps in the event

Return type:

list of dict

disdrodb.utils.event.group_timesteps_into_events(timesteps, event_max_time_gap)[source][source]#

Group valid timesteps into events based on a maximum allowed dry interval.

Parameters:
  • timesteps (array-like of numpy.datetime64) – Sorted array of valid timesteps.

  • event_max_time_gap (numpy.timedelta64) – Maximum time interval allowed between consecutive valid timesteps for them to be considered part of the same event.

Returns:

A list of events, where each event is an array of timesteps.

Return type:

list of numpy.ndarray
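
The grouping rule can be illustrated with a small standard-library sketch. It uses datetime objects instead of numpy.datetime64 arrays, and the helper name group_by_max_gap is an assumption for this example rather than the library code.

```python
from datetime import datetime, timedelta


def group_by_max_gap(timesteps, max_gap):
    """Hypothetical helper: split sorted timesteps wherever the gap exceeds max_gap."""
    events = []
    current = []
    for t in timesteps:
        # Start a new event when the dry interval is longer than allowed
        if current and (t - current[-1]) > max_gap:
            events.append(current)
            current = []
        current.append(t)
    if current:
        events.append(current)
    return events


t0 = datetime(2024, 1, 1, 0, 0)
timesteps = [t0 + timedelta(minutes=m) for m in (0, 1, 2, 400, 401)]
events = group_by_max_gap(timesteps, max_gap=timedelta(hours=6))
event_sizes = [len(event) for event in events]
```

Here the gap between minute 2 and minute 400 exceeds six hours, so the timesteps fall into two events.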

disdrodb.utils.event.remove_isolated_timesteps(timesteps, neighbor_min_size, neighbor_time_interval)[source][source]#

Remove isolated timesteps that do not have enough neighboring timesteps within a specified time gap.

A timestep is considered isolated (and thus removed) if it does not have at least neighbor_min_size other timesteps within the neighbor_time_interval before or after it. In other words, for each timestep, we count how many other timesteps fall into the time interval [t - neighbor_time_interval, t + neighbor_time_interval], excluding the timestep itself. If the count of such neighbors is less than neighbor_min_size, that timestep is removed.

Parameters:
  • timesteps (array-like of numpy.datetime64) – Sorted or unsorted array of valid timesteps.

  • neighbor_time_interval (numpy.timedelta64) – The time interval around a given timestep defining the neighborhood. Only timesteps that fall within this time interval before or after a timestep are considered neighbors.

  • neighbor_min_size (int) –

    The minimum number of neighboring timesteps required within neighbor_time_interval for a timestep to be considered non-isolated.

    • If neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs.

    • If neighbor_min_size=1, the timestep must have at least one neighbor within neighbor_time_interval.

    • If neighbor_min_size=2, the timestep must have at least two neighbors within neighbor_time_interval.

Returns:

Array of timesteps with isolated entries removed.

Return type:

numpy.ndarray
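
The neighbor-counting rule described above can be sketched with plain datetime objects. The helper drop_isolated is hypothetical and uses a simple quadratic scan for readability; it does not reflect how the library vectorizes the computation.

```python
from datetime import datetime, timedelta


def drop_isolated(timesteps, neighbor_min_size, neighbor_time_interval):
    """Hypothetical helper: keep timesteps with enough neighbors in [t - dt, t + dt]."""
    if neighbor_min_size == 0:
        return list(timesteps)  # no filtering requested
    kept = []
    for i, t in enumerate(timesteps):
        # Count the other timesteps inside the window, excluding t itself
        n_neighbors = sum(
            1
            for j, other in enumerate(timesteps)
            if j != i and abs(other - t) <= neighbor_time_interval
        )
        if n_neighbors >= neighbor_min_size:
            kept.append(t)
    return kept


t0 = datetime(2024, 1, 1)
timesteps = [t0 + timedelta(minutes=m) for m in (0, 1, 2, 30, 60, 61)]
window = timedelta(minutes=5)
kept_one = drop_isolated(timesteps, 1, window)  # drops only the timestep at minute 30
kept_two = drop_isolated(timesteps, 2, window)  # also drops the pair at minutes 60 and 61
```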

disdrodb.utils.event.split_into_events(ds, variable, *, threshold=None, neighbor_min_size=2, neighbor_time_interval='5MIN', event_max_time_gap='6H', event_min_duration='5MIN', event_min_size=3, sortby=None, sortby_order='decreasing')[source][source]#

Split a dataset into “events” and yield each event as a Dataset.

Events are detected from candidate timesteps and then grouped into contiguous events using group_timesteps_into_event. Candidate timesteps can be selected either by thresholding a numeric variable or by using a boolean variable.

Detection logic#

If threshold is not None, a timestep is a candidate when ds[variable] > threshold. If threshold is None, ds[variable] must be boolean; a timestep is a candidate when ds[variable] is True.

Neighborhood and grouping#

Candidate timesteps are first filtered for isolation: a candidate is kept only if it has at least neighbor_min_size candidates within neighbor_time_interval (before/after). Remaining candidates are grouped into events when consecutive candidates are separated by no more than event_max_time_gap. Events shorter than event_min_duration or with fewer than event_min_size timesteps are discarded.

Sorting#

Events are yielded in the grouping order (time order) unless sortby is provided:

  • sortby=None: yield events in time order (as returned by the grouping).

  • sortby="duration": sort by event duration.

  • sortby callable: sortby(ds_event) -> scalar used as sorting key.

Parameters:
  • ds (xarray.Dataset) – Input dataset with a time coordinate/dimension. The dataset is sorted by time internally.

  • variable (str) –

    Name of the variable used for event detection:

    • numeric variable if threshold is not None

    • boolean variable if threshold is None

  • threshold (int or float or None, optional) –

    Threshold used to define candidate timesteps.

    • If not None: candidates are where ds[variable] > threshold.

    • If None: ds[variable] must be boolean and candidates are where it is True.

    Default is None.

  • neighbor_time_interval (str) – The time interval around a given timestep defining the neighborhood. Only timesteps that fall within this time interval before or after a timestep are considered neighbors. The neighbor_time_interval must be at least equal to the dataset sampling interval (temporal resolution). That is, for 1-minute data, neighbor_time_interval should be at least 1MIN, for 5-minute data it should be at least 5MIN, etc.

  • neighbor_min_size (int, optional) –

    The minimum number of neighboring timesteps required within neighbor_time_interval for a timestep to be considered non-isolated. Isolated timesteps are removed.

    • If neighbor_min_size=0, then no timestep is considered isolated and no filtering occurs.

    • If neighbor_min_size=1, the timestep must have at least one neighbor within neighbor_time_interval.

    • If neighbor_min_size=2, the timestep must have at least two neighbors within neighbor_time_interval.

    Defaults to 2.

  • event_max_time_gap (str) – The maximum time interval between two timesteps to be considered part of the same event. This parameter is used to group timesteps into events.

  • event_min_duration (str) – The minimum duration an event must span. Events shorter than this duration are discarded.

  • event_min_size (int, optional) – The minimum number of valid timesteps required for an event. Defaults to 3.

  • sortby (None, str or callable) –

    Sorting key for events:

    • None: no sorting (time order)

    • “duration”: sort by event duration

    • callable: sortby(ds_event) -> scalar

  • sortby_order (str) – Sorting direction when sortby is not None. Default is “decreasing”. Valid values are “increasing” or “decreasing”.

Yields:

ds_event (xarray.Dataset) – A view of the input dataset restricted to the event time span

Notes

  • This function yields event datasets (generator). Use list(split_into_events(...)) to materialize all events.

  • Event detection uses > threshold (strictly greater).

Examples

Threshold-based detection on a numeric variable (timesteps with N > 10):

>>> events = list(split_into_events(ds, variable="N", threshold=10))

Boolean-based detection (precomputed mask):

>>> ds["is_rainy"] = (ds["R"] > 0.1) & (ds["Nbins"] > 2)
>>> events = list(split_into_events(ds, variable="is_rainy"))

Sort by duration (longest first):

>>> for ds_event in split_into_events(ds, variable="N", threshold=10,
...                                   sortby="duration", sortby_order="decreasing"):
...     print(ds_event.time.values[0], ds_event.time.values[-1])

Sort by a custom scalar (e.g., maximum R during the event):

>>> sortby_func = lambda ds_event: float(ds_event["R"].max(dim="time").item())
>>> for ds_event in split_into_events(ds, variable="N", threshold=10,
...                                   sortby=sortby_func, sortby_order="decreasing"):
...     print(float(ds_event["R"].max()))

disdrodb.utils.list module#

Utilities to work with lists.

disdrodb.utils.list.flatten_list(nested_list)[source][source]#

Flatten a nested list into a single-level list.
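
A minimal recursive sketch of list flattening is shown below. Whether the library function handles arbitrarily deep nesting is an assumption here; the helper name flatten is hypothetical.

```python
def flatten(nested):
    """Hypothetical helper: recursively flatten nested lists into one flat list."""
    flat = []
    for item in nested:
        if isinstance(item, list):
            flat.extend(flatten(item))  # descend into sub-lists
        else:
            flat.append(item)
    return flat


flat = flatten([1, [2, [3, 4]], [5]])
```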

disdrodb.utils.logger module#

DISDRODB logger utility.

disdrodb.utils.logger.close_logger(logger) None[source][source]#

Close the logger.

Parameters:

logger (logging.Logger) – Logger object.

disdrodb.utils.logger.create_logger_file(logs_dir, filename, parallel)[source][source]#

Create logger file.

disdrodb.utils.logger.create_product_logs(product, data_source, campaign_name, station_name, data_archive_dir=None, list_logs=None, **product_kwargs)[source][source]#

Create station summary and station problems log files.

The summary log selects only logged lines with root, WARNING, and ERROR keywords. The problems log file selects only logged lines with the ERROR keyword.

The logs directory structure is the following:

  • /logs/files/<product_name>/<station> (same structure as data directory, with logs for each processed file)

  • /logs/summary/SUMMARY.<PRODUCT_ACRONYM>.<CAMPAIGN_NAME>.<STATION_NAME>.log

  • /logs/problems/PROBLEMS.<PRODUCT_ACRONYM>.<CAMPAIGN_NAME>.<STATION_NAME>.log

Parameters:
  • product (str) – The DISDRODB product.

  • data_source (str) – The data source name.

  • campaign_name (str) – The campaign name.

  • station_name (str) – The station name.

  • data_archive_dir (str, optional) – The base directory path. Default is None.

  • sample_interval (str, optional) – The sample interval for L2E option. Default is None.

  • rolling (str, optional) – The rolling option for L2E. Default is None.

  • model_name (str, optional) – The model name for L2M. Default is None.

  • list_logs (list, optional) – List of log file paths. If None, the function will list the log files.

Return type:

None

disdrodb.utils.logger.log_debug(logger: logging.Logger, msg: str, verbose: bool = False) None[source][source]#

Add a debug entry to the log.

Parameters:
  • logger (logging.Logger) – Logger object.

  • msg (str) – Message.

  • verbose (bool, optional) – Whether to enable verbose output. The default value is False.

disdrodb.utils.logger.log_error(logger: logging.Logger, msg: str, verbose: bool = False) None[source][source]#

Add an error entry to the log.

Parameters:
  • logger (logging.Logger) – Logger object.

  • msg (str) – Message.

  • verbose (bool, optional) – Whether to enable verbose output. The default value is False.

disdrodb.utils.logger.log_info(logger: logging.Logger, msg: str, verbose: bool = False) None[source][source]#

Add an info entry to the log.

Parameters:
  • logger (logging.Logger) – Logger object.

  • msg (str) – Message.

  • verbose (bool, optional) – Whether to enable verbose output. The default value is False.

disdrodb.utils.logger.log_warning(logger: logging.Logger, msg: str, verbose: bool = False) None[source][source]#

Add a warning entry to the log.

Parameters:
  • logger (logging.Logger) – Logger object.

  • msg (str) – Message.

  • verbose (bool, optional) – Whether to enable verbose output. The default value is False.

disdrodb.utils.manipulations module#

Functions that help manipulate DISDRODB products.

disdrodb.utils.manipulations.compute_normalized_dsd_datarray(ds, Nc='Nw', Dc='Dm', d_min=0, d_max=6, d_step=0.001, method='linear')[source][source]#

Compute normalized DSD and remap to regular D/Dc dimension.

disdrodb.utils.manipulations.convert_from_decibel(x)[source][source]#

Convert from decibels (dB) to linear units.

disdrodb.utils.manipulations.convert_to_decibel(x)[source][source]#

Convert from linear units to decibels (dB).
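
For reference, the usual decibel conversion for power-like quantities is x_dB = 10 log10(x), with inverse x = 10^(x_dB / 10). The sketch below assumes that convention; the helper names to_decibel and from_decibel are hypothetical, and the scalar math functions stand in for whatever array operations the library applies.

```python
import math


def to_decibel(x):
    """Hypothetical helper: linear value to dB, assuming the 10*log10 convention."""
    return 10.0 * math.log10(x)


def from_decibel(x_db):
    """Hypothetical helper: dB back to a linear value (inverse of to_decibel)."""
    return 10.0 ** (x_db / 10.0)


z_db = to_decibel(100.0)                    # a linear value of 100 is 20 dB
roundtrip = from_decibel(to_decibel(3.5))   # recovers the linear value
```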

disdrodb.utils.manipulations.define_diameter_array(diameter_min=0, diameter_max=10, diameter_spacing=0.05)[source][source]#

Define an array of diameters and their corresponding bin properties.

Parameters:
  • diameter_min (float, optional) – The minimum diameter value. The default value is 0 mm.

  • diameter_max (float, optional) – The maximum diameter value. The default value is 10 mm.

  • diameter_spacing (float, optional) – The spacing between diameter values. The default value is 0.05 mm.

Returns:

A DataArray containing the center of each diameter bin, with coordinates for the bin width, lower bound, upper bound, and center.

Return type:

xarray.DataArray
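
The relationship between bin edges, centers, and widths can be sketched with plain lists. The helper define_bins and the dictionary keys are assumptions for this illustration; the documented function returns an xarray.DataArray with these quantities attached as coordinates.

```python
def define_bins(vmin=0.0, vmax=10.0, spacing=0.05):
    """Hypothetical helper: regular bins described by lower, upper, center and width."""
    n_bins = round((vmax - vmin) / spacing)
    edges = [vmin + i * spacing for i in range(n_bins + 1)]
    lower = edges[:-1]
    upper = edges[1:]
    center = [(lo + up) / 2 for lo, up in zip(lower, upper)]
    width = [up - lo for lo, up in zip(lower, upper)]
    return {"center": center, "lower": lower, "upper": upper, "width": width}


bins = define_bins(0.0, 10.0, 0.05)
n_bins = len(bins["center"])
```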

disdrodb.utils.manipulations.define_diameter_datarray(bounds, dim='diameter_bin_center')[source][source]#

Define diameter DataArray.

disdrodb.utils.manipulations.define_velocity_array(velocity_min=0, velocity_max=10, velocity_spacing=0.05)[source][source]#

Define an array of velocities and their corresponding bin properties.

Parameters:
  • velocity_min (float, optional) – The minimum velocity value. The default value is 0 m/s.

  • velocity_max (float, optional) – The maximum velocity value. The default value is 10 m/s.

  • velocity_spacing (float, optional) – The spacing between velocity values. The default value is 0.05 m/s.

Returns:

A DataArray containing the center of each velocity bin, with coordinates for the bin width, lower bound, upper bound, and center.

Return type:

xarray.DataArray

disdrodb.utils.manipulations.define_velocity_datarray(bounds, dim='velocity_bin_center')[source][source]#

Define velocity DataArray.

disdrodb.utils.manipulations.filter_diameter_bins(ds, minimum_diameter=None, maximum_diameter=None)[source][source]#

Filter the dataset to include only diameter bins within specified bounds.

Parameters:
  • ds (xarray.Dataset) – The dataset containing diameter bin data.

  • minimum_diameter (float, optional) – The minimum diameter to be included, in millimeters. Defaults to the minimum value in ds[“diameter_bin_lower”].

  • maximum_diameter (float, optional) – The maximum diameter to be included, in millimeters. Defaults to the maximum value in ds[“diameter_bin_upper”].

Returns:

The filtered dataset containing only the specified diameter bins.

Return type:

xarray.Dataset

disdrodb.utils.manipulations.filter_velocity_bins(ds, minimum_velocity=None, maximum_velocity=None)[source][source]#

Filter the dataset to include only velocity bins within specified bounds.

Parameters:
  • ds (xarray.Dataset) – The dataset containing velocity bin data.

  • minimum_velocity (float, optional) – The minimum velocity to include in the filter, in meters per second. Defaults to the minimum value in ds[“velocity_bin_lower”].

  • maximum_velocity (float, optional) – The maximum velocity to include in the filter, in meters per second. Defaults to the maximum value in ds[“velocity_bin_upper”].

Returns:

The filtered dataset containing only the specified velocity bins.

Return type:

xarray.Dataset

disdrodb.utils.manipulations.get_diameter_bin_edges(ds)[source][source]#

Retrieve diameter bin edges.

disdrodb.utils.manipulations.get_diameter_coords_dict_from_bin_edges(diameter_bin_edges)[source][source]#

Get dictionary with all relevant diameter coordinates.

disdrodb.utils.manipulations.get_velocity_bin_edges(ds)[source][source]#

Retrieve velocity bin edges.

disdrodb.utils.manipulations.remap_to_diameter(da, d_src, d_dst, dim, new_dim, method='linear')[source][source]#

Remap DataArray from source to destination diameter coordinate.

Parameters:
  • da (xr.DataArray) – DataArray with dimension dim and typically another dim (e.g., time).

  • d_src (xr.DataArray) – Source diameter coordinate (can be 2D, e.g., D/Dm (time, D)). Must share dimensions with da.

  • d_dst (xr.DataArray) – 1D target coordinate.

  • dim (str) – Original diameter dimension.

  • new_dim (str) – Name of output diameter dimension.

  • method ({"linear", "pchip"}) – Interpolation method used for remapping.

Return type:

xr.DataArray

disdrodb.utils.manipulations.resample_density(da_density, d_src, d_dst, dim, new_dim, dD_src, dD_dst, method='log_pchip')[source][source]#

Conservative resampling of density.

Parameters:
  • da_density (xr.DataArray) – Density defined per unit diameter.

  • d_src (xr.DataArray) – Source diameter centers (can be 2D: time, D).

  • d_dst (xr.DataArray) – Destination diameter centers (1D).

  • dim (str) – Source diameter dimension.

  • new_dim (str) – Destination dimension name.

  • dD_src (xr.DataArray) – Source bin widths (same dim as dim).

  • dD_dst (xr.DataArray) – Destination bin widths (same dim as new_dim).

  • method (str or callable) –

    Remapping strategy used within xr.apply_ufunc. If str, available methods are:

    • "constant": first-order conservative remapping (piecewise constant in source bins).

    • "log_pchip": conservative, smooth remapping using PCHIP in log-density space.

    If callable, it must have signature f(y_src, d_src, d_dst, dD_src, dD_dst) and return the remapped destination density.

Returns:

Remapped density conserving total number.

Return type:

xr.DataArray
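
To illustrate what first-order conservative remapping means, the sketch below redistributes a piecewise-constant density by bin overlap. It is a simplification: it works on bin edges rather than the centers and widths the documented function takes, and the helper name conservative_remap_constant is hypothetical.

```python
def conservative_remap_constant(density_src, edges_src, edges_dst):
    """Hypothetical helper: remap a per-unit-diameter density, conserving its integral."""
    density_dst = []
    for j in range(len(edges_dst) - 1):
        lo_dst, up_dst = edges_dst[j], edges_dst[j + 1]
        total = 0.0
        for i, value in enumerate(density_src):
            # Length of the overlap between source bin i and destination bin j
            overlap = min(up_dst, edges_src[i + 1]) - max(lo_dst, edges_src[i])
            if overlap > 0:
                total += value * overlap
        # Convert the accumulated number back to a density
        density_dst.append(total / (up_dst - lo_dst))
    return density_dst


edges_src = [0.0, 1.0, 2.0]
density_src = [4.0, 2.0]
edges_dst = [0.0, 0.5, 1.0, 1.5, 2.0]
density_dst = conservative_remap_constant(density_src, edges_src, edges_dst)

# Total number (density times bin width) before and after remapping
total_src = sum(v * (edges_src[i + 1] - edges_src[i]) for i, v in enumerate(density_src))
total_dst = sum(v * (edges_dst[j + 1] - edges_dst[j]) for j, v in enumerate(density_dst))
```

The two totals agree, which is the property the "conserving total number" wording refers to.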

disdrodb.utils.manipulations.resample_drop_number_concentration(drop_number_concentration, diameter_bin_edges, method='log_pchip')[source][source]#

Resample drop number concentration N(D) DataArray to high resolution diameter bins.

disdrodb.utils.manipulations.unstack_radar_variables(ds)[source][source]#

Unstack radar variables.

disdrodb.utils.pydantic module#

Definition of pydantic validation custom class.

class disdrodb.utils.pydantic.CustomBaseModel[source][source]#

Bases: BaseModel

Custom pydantic BaseModel.

Forbid extra keys. Hide URLs in error message. Simplify error message.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid', 'hide_error_urls': True}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

disdrodb.utils.pydantic.format_validation_error(validation_error: Exception) str[source][source]#

Format a Pydantic ValidationError for better readability.

disdrodb.utils.routines module#

Utilities for DISDRODB processing routines.

disdrodb.utils.routines.is_possible_product(temporal_resolution, sample_interval)[source][source]#

Assess if production is possible given the requested accumulation interval and source sample_interval.

disdrodb.utils.routines.run_product_generation(product: str, logs_dir: str, logs_filename: str, parallel: bool, verbose: bool, folder_partitioning: str, core_func: callable, core_func_kwargs: dict, pass_logger=False)[source][source]#

Generic wrapper for DISDRODB product generation.

Parameters:
  • product (str) – Product name (e.g., “L0A”, “L0B”, …).

  • logs_dir (str) – Logs directory.

  • logs_filename (str) – Logs filename.

  • parallel (bool) – Parallel flag (for logger).

  • verbose (bool) – Verbose logging flag.

  • folder_partitioning (str) – Partitioning scheme.

  • core_func (callable) – Function with signature core_func(logger) that does the product-specific work. Must return an xarray.Dataset or pandas.DataFrame (used to determine log subdir).

disdrodb.utils.routines.try_get_required_filepaths(product, data_archive_dir, data_source, campaign_name, station_name, debugging_mode, **product_kwargs)[source][source]#

Try to retrieve required filepaths for a product, or return None if unavailable.

disdrodb.utils.subsetting module#

This module contains functions for subsetting and aligning DISDRODB products.

disdrodb.utils.subsetting.align(*args)[source][source]#

Align DISDRODB products over time, velocity and diameter dimensions.

disdrodb.utils.subsetting.is_1d_non_dimensional_coord(xr_obj, coord)[source][source]#

Checks if a coordinate is a 1d, non-dimensional coordinate.

disdrodb.utils.subsetting.isel(xr_obj, indexers=None, drop=False, **indexers_kwargs)[source][source]#

Perform index-based dimension selection.

disdrodb.utils.subsetting.sel(xr_obj, indexers=None, drop=False, method=None, **indexers_kwargs)[source][source]#

Perform value-based coordinate selection.

Slices are treated as inclusive of both the start and stop values, unlike normal Python indexing. The disdrodb sel method can:

  • slice by disdrodb-id strings

  • slice by any xarray coordinate value

You can use string shortcuts for datetime coordinates (e.g., ‘2000-01’ to select all values in January 2000).

disdrodb.utils.time module#

This module contains utilities for processing temporal datasets.

disdrodb.utils.time.ensure_sample_interval_in_seconds(sample_interval)[source][source]#

Ensure the sample interval is in seconds.

Parameters:

sample_interval (int, numpy.ndarray, xarray.DataArray, or numpy.timedelta64) –

The sample interval to be converted to seconds. It can be:

  • An integer representing the interval in seconds.

  • A numpy array or xarray DataArray of integers representing intervals in seconds.

  • A numpy.timedelta64 object representing the interval.

  • A numpy array or xarray DataArray of numpy.timedelta64 objects representing intervals.

Returns:

The sample interval converted to seconds. The return type matches the input type:

  • If the input is an integer, the output is an integer.

  • If the input is a numpy array, the output is a numpy array of integers (unless NaN is present)

  • If the input is an xarray DataArray, the output is an xarray DataArray of integers (unless NaN is present).

Return type:

int, numpy.ndarray, or xarray.DataArray

disdrodb.utils.time.ensure_sorted_by_time(obj, time='time')[source][source]#

Ensure an xarray object or pandas DataFrame is sorted by time.

disdrodb.utils.time.ensure_timedelta_seconds(interval)[source][source]#

Convert a scalar value or array in seconds, or a timedelta object, to numpy.timedelta64 in seconds.

disdrodb.utils.time.get_dataframe_start_end_time(df: DataFrame, time_column='time')[source][source]#

Retrieves dataframe starting and ending time.

Parameters:
  • df (pandas.DataFrame) – Input dataframe

  • time_column (str) – Name of the time column. The default is “time”. The column must be of type datetime.

Returns:

(start_time, end_time) – File start and end time of type pandas.Timestamp.

Return type:

tuple

disdrodb.utils.time.get_dataset_start_end_time(ds: Dataset, time_dim='time')[source][source]#

Retrieves dataset starting and ending time.

Parameters:
  • ds (xarray.Dataset) – Input dataset

  • time_dim (str) – Name of the time dimension. The default is “time”.

Returns:

(start_time, end_time) – File start and end time of type pandas.Timestamp.

Return type:

tuple

disdrodb.utils.time.get_file_start_end_time(obj, time='time')[source][source]#

Retrieves object starting and ending time.

Parameters:
  • obj (xarray.Dataset or pandas.DataFrame) – Input object with time dimension or column respectively.

  • time (str) – Name of the time dimension or column. The default is “time”.

Returns:

(start_time, end_time) – File start and end time of type pandas.Timestamp.

Return type:

tuple

disdrodb.utils.time.get_sampling_information(temporal_resolution)[source][source]#

Extract resampling information from the temporal_resolution string.

Parameters:

temporal_resolution (str) – A string representing the product temporal resolution: e.g., “1H30MIN”, “ROLL1H30MIN”.

Returns:

sample_interval_seconds, rolling – Sample_interval in seconds and whether rolling is enabled.

Return type:

tuple

disdrodb.utils.time.infer_sample_interval(ds, robust=False, verbose=False, logger=None)[source][source]#

Infer the sample interval of a dataset.

Duplicated timesteps are removed before inferring the sample interval.

NOTE: This function is used only for the reader preparation.

disdrodb.utils.time.regularize_dataset(xr_obj, freq: str, time_dim: str = 'time', method: str | None = None, fill_value=None, start_time=None, end_time=None)[source][source]#

Regularize an xarray object along the time dimension to a uniform resolution.

Parameters:
  • xr_obj (xarray.Dataset or xarray.DataArray) – xarray object with time dimension.

  • time_dim (str, optional) – The time dimension in the xarray object. The default value is "time".

  • freq (str) – The freq string to pass to pd.date_range() to define the new time coordinates. Examples: freq="2min".

  • method (str, optional) – Method to use for filling missing timesteps. If None, fill with fill_value. The default value is None. For other possible methods, see xarray.Dataset.reindex().

  • fill_value (float or dict, optional) – Fill value to fill missing timesteps. If not specified, for float variables it uses dtypes.NA, while for integer variables it uses the maximum allowed integer value or, in case of undecoded variables, the _FillValue DataArray attribute.

Returns:

ds_reindexed – Regularized dataset.

Return type:

xarray.Dataset
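
Conceptually, regularization reindexes the data onto an evenly spaced time grid and fills the timesteps that were missing. The standard-library sketch below shows that idea on plain lists; the helper regularize_sketch is hypothetical and does not use xarray or pandas as the documented function does.

```python
from datetime import datetime, timedelta


def regularize_sketch(times, values, freq, fill_value=float("nan")):
    """Hypothetical helper: place values on a regular grid, filling the gaps."""
    lookup = dict(zip(times, values))
    start, end = min(times), max(times)
    grid = []
    t = start
    while t <= end:
        grid.append(t)
        t += freq
    # Timesteps absent from the input receive the fill value
    return grid, [lookup.get(t, fill_value) for t in grid]


t0 = datetime(2024, 1, 1)
times = [t0, t0 + timedelta(minutes=2), t0 + timedelta(minutes=6)]
values = [1.0, 2.0, 3.0]
grid, filled = regularize_sketch(times, values, freq=timedelta(minutes=2))
```

In this example the timestep at minute 4 is missing from the input and is filled with NaN.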

disdrodb.utils.time.seconds_to_temporal_resolution(seconds)[source][source]#

Convert a duration in seconds to a readable string format (e.g., “1H30MIN”, “1D2H”).

Parameters:

seconds (int) – The time duration in seconds.

Returns:

The duration as a string in a format like “30S”, “1MIN30S”, “1H30MIN”, or “1D2H”.

Return type:

str
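
The string formats listed above can be produced by successive integer division. The following is a sketch under assumptions: the helper name seconds_to_string is hypothetical, and returning "0S" for a zero duration is a choice made here, not documented behavior.

```python
def seconds_to_string(seconds):
    """Hypothetical helper: format a duration using D, H, MIN and S units."""
    units = (("D", 86400), ("H", 3600), ("MIN", 60), ("S", 1))
    parts = []
    remainder = int(seconds)
    for label, size in units:
        count, remainder = divmod(remainder, size)
        if count:
            parts.append(f"{count}{label}")  # skip units with a zero count
    return "".join(parts) if parts else "0S"


examples = [seconds_to_string(s) for s in (30, 90, 5400, 93600)]
```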

disdrodb.utils.time.temporal_resolution_to_seconds(temporal_resolution)[source][source]#

Extract the measurement interval in seconds from the temporal resolution string.

Parameters:

temporal_resolution (str) – A string representing the product measurement interval: e.g., “1H30MIN”, “ROLL1H30MIN”.

Returns:

Duration in seconds.

Return type:

int
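
Parsing a temporal resolution string such as “1H30MIN” or “ROLL1H30MIN” might look like the sketch below. The helper parse_temporal_resolution, the accepted unit set (D, H, MIN, S), and the ValueError on malformed input are assumptions for illustration; it returns both the duration in seconds and the rolling flag.

```python
import re

UNIT_SECONDS = {"D": 86400, "H": 3600, "MIN": 60, "S": 1}


def parse_temporal_resolution(temporal_resolution):
    """Hypothetical helper: return (seconds, rolling) for strings like 'ROLL1H30MIN'."""
    rolling = temporal_resolution.startswith("ROLL")
    body = temporal_resolution[4:] if rolling else temporal_resolution
    tokens = re.findall(r"(\d+)(D|H|MIN|S)", body)
    # Reject strings containing anything other than number-unit pairs
    if not tokens or "".join(number + unit for number, unit in tokens) != body:
        raise ValueError(f"Invalid temporal resolution: {temporal_resolution!r}")
    seconds = sum(int(number) * UNIT_SECONDS[unit] for number, unit in tokens)
    return seconds, rolling


plain = parse_temporal_resolution("1H30MIN")
rolled = parse_temporal_resolution("ROLL1H30MIN")
```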

disdrodb.utils.warnings module#

Warning utilities.

disdrodb.utils.warnings.suppress_warnings()[source][source]#

Context manager suppressing RuntimeWarnings and UserWarnings.
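
A context manager with this behavior can be built on the standard warnings module. The sketch below is one possible approach, not necessarily how the library implements it; the name suppress_warnings_sketch is hypothetical.

```python
import warnings
from contextlib import contextmanager


@contextmanager
def suppress_warnings_sketch():
    """Hypothetical helper: ignore RuntimeWarning and UserWarning inside the block."""
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", RuntimeWarning)
        warnings.simplefilter("ignore", UserWarning)
        yield  # filters are restored when the block exits


# Record warnings to show that only the one outside the context is emitted
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    with suppress_warnings_sketch():
        warnings.warn("ignored inside the context", UserWarning)
    warnings.warn("visible outside the context", UserWarning)

messages = [str(w.message) for w in caught]
```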

disdrodb.utils.writer module#

DISDRODB product writers.

disdrodb.utils.writer.finalize_product(ds, product=None) Dataset[source][source]#

Finalize DISDRODB product.

disdrodb.utils.writer.write_product(ds: Dataset, filepath: str, force: bool = False) None[source][source]#

Save the xarray dataset into a NetCDF file.

Parameters:
  • ds (xarray.Dataset) – Input xarray dataset.

  • filepath (str) – Output file path.

  • force (bool, optional) – Whether to overwrite existing data. If True, overwrite existing data in destination directories. If False, raise an error if there are already data in destination directories. This is the default.

disdrodb.utils.xarray module#

Xarray utilities.

disdrodb.utils.xarray.define_dataarray_fill_value(da)[source][source]#

Define the fill value for a numerical xarray.DataArray.

disdrodb.utils.xarray.define_dataarray_fill_value_dictionary(da)[source][source]#

Define fill values for numerical variables and coordinates of a xarray.DataArray.

Return a dict of fill values:
  • floating → NaN

  • integer → ds[var].attrs[“_FillValue”] if present, else np.iinfo(dtype).max

disdrodb.utils.xarray.define_dataset_fill_value_dictionary(ds)[source][source]#

Define fill values for numerical variables and coordinates of a xarray.Dataset.

Return a dict of per-variable fill values:
  • floating –> NaN

  • integer –> ds[var].attrs[“_FillValue”] if present, else the maximum allowed number.

disdrodb.utils.xarray.define_fill_value_dictionary(xr_obj)[source][source]#

Define fill values for numerical variables and coordinates of a xarray object.

Return a dict of per-variable fill values:
  • floating –> NaN

  • integer –> ds[var].attrs[“_FillValue”] if present, else the maximum allowed number.

disdrodb.utils.xarray.remap_numeric_array(arr, remapping_dict, fill_value=nan)[source][source]#

Remap the values of a numeric array.

disdrodb.utils.xarray.remove_diameter_coordinates(xr_obj)[source][source]#

Drop diameter coordinates from xarray object.

disdrodb.utils.xarray.remove_velocity_coordinates(xr_obj)[source][source]#

Drop velocity coordinates from xarray object.

disdrodb.utils.xarray.unstack_datarray_dimension(da, dim, coord_handling='keep', prefix='', suffix='')[source][source]#

Split a DataArray along a specified dimension into a Dataset with separate prefixed and suffixed variables.

Parameters:
  • da (xarray.DataArray) – The DataArray to split.

  • dim (str) – The dimension along which to split the DataArray.

  • coord_handling (str, optional) – Option to handle coordinates sharing the target dimension. Choices are ‘keep’, ‘drop’, or ‘unstack’. Defaults to ‘keep’.

  • prefix (str, optional) – String to prepend to each new variable name.

  • suffix (str, optional) – String to append to each new variable name.

Returns:

A Dataset with each variable split along the specified dimension. The Dataset variables are named “{prefix}{name}{suffix}{dim_value}”. Coordinates sharing the target dimension are handled based on coord_handling.

Return type:

xarray.Dataset

disdrodb.utils.xarray.xr_get_last_valid_idx(da_condition, dim, fill_value=None)[source][source]#

Get the index of the last True value along a specified dimension in an xarray DataArray.

This function finds the last index along the given dimension where the condition is True. If all values are False or NaN along that dimension, the function returns fill_value.

Parameters:
  • da_condition (xarray.DataArray) – A boolean DataArray where True indicates valid or desired values. Should have the dimension specified in dim.

  • dim (str) – The name of the dimension along which to find the last True index.

  • fill_value (int or float) – The fill value when all values are False or NaN along the specified dimension. The default value is dim_size - 1.

Returns:

last_idx – An array containing the index of the last True value along the specified dimension. If all values are False or NaN, the corresponding entry in last_idx will be NaN.

Return type:

xarray.DataArray

Notes

The function works by reversing the DataArray along the specified dimension and using argmax to find the first True value in the reversed array. It then calculates the corresponding index in the original array. To handle cases where all values are False or NaN (and argmax would return 0), the function checks if there is any True value along the dimension and assigns NaN to last_idx where appropriate.

Examples

>>> import xarray as xr
>>> da = xr.DataArray([[False, False, True], [False, False, False]], dims=["time", "my_dimension"])
>>> last_idx = xr_get_last_valid_idx(da, "my_dimension")
>>> print(last_idx)
<xarray.DataArray (time: 2)>
array([2., nan])
Dimensions without coordinates: time

In this example, for the first time step, the last True index is 2. For the second time step, all values are False, so the function returns NaN.
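
The reverse-and-search approach described in the Notes can be sketched for a one-dimensional list of booleans. The helper last_true_index is hypothetical; list.index(True) plays the role of argmax on a boolean array, and fill_value is passed explicitly rather than relying on any default.

```python
def last_true_index(condition, fill_value):
    """Hypothetical 1D helper: index of the last True, or fill_value if there is none."""
    n = len(condition)
    if not any(condition):
        return fill_value  # no True value along the dimension
    reversed_condition = condition[::-1]
    # Position of the first True in the reversed sequence
    first_true_in_reversed = reversed_condition.index(True)
    # Map the reversed position back to the original indexing
    return n - 1 - first_true_in_reversed


idx_a = last_true_index([False, False, True], fill_value=-1)
idx_b = last_true_index([True, False, True, False], fill_value=-1)
idx_c = last_true_index([False, False, False], fill_value=-1)
```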

disdrodb.utils.xarray.xr_remap_numeric_array(da, remapping_dict, fill_value=nan)[source][source]#

Remap values of a xr.DataArray.

disdrodb.utils.yaml module#

YAML utility.

disdrodb.utils.yaml.read_yaml(filepath: str) dict[source][source]#

Read a YAML file into a dictionary.

Parameters:

filepath (str) – Input YAML file path.

Returns:

Dictionary with the attributes read from the YAML file.

Return type:

dict

disdrodb.utils.yaml.write_yaml(dictionary, filepath, sort_keys=False)[source][source]#

Write a dictionary into a YAML file.

Parameters:

dictionary (dict) – Dictionary to write into a YAML file.

Module contents#

DISDRODB Utils Module.