Source code for disdrodb.api.io

# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Routines to list and open DISDRODB products."""

import datetime
import functools
import os
import subprocess
import sys
from pathlib import Path

import numpy as np

from disdrodb.api.checks import (
    check_filepaths,
    check_start_end_time,
    check_time,
    get_current_utc_time,
)
from disdrodb.api.info import get_start_end_time_from_filepaths, group_filepaths
from disdrodb.api.path import (
    define_campaign_dir,
    define_data_dir,
    define_metadata_dir,
    define_station_dir,
)
from disdrodb.l0.l0_reader import define_readers_directory
from disdrodb.utils.dict import extract_product_kwargs
from disdrodb.utils.directories import list_files, remove_file_or_directories
from disdrodb.utils.logger import (
    log_info,
)

####----------------------------------------------------------------------------------
#### DISDRODB Search Product Files


def filter_filepaths(filepaths, debugging_mode):
    """Restrict ``filepaths`` to at most 3 entries when ``debugging_mode`` is enabled.

    Parameters
    ----------
    filepaths : list
        File paths to (possibly) restrict.
    debugging_mode : bool
        If ``True``, only the first 3 file paths (or fewer if less are available) are kept.

    Returns
    -------
    list
        The input file paths, truncated when ``debugging_mode`` is ``True``.
    """
    if not debugging_mode:
        return filepaths
    n_files = min(3, len(filepaths))
    return filepaths[:n_files]
def is_within_time_period(l_start_time, l_end_time, start_time, end_time):
    """Flag the files whose time coverage intersects the ``[start_time, end_time]`` period.

    A file covering ``[l_start_time, l_end_time]`` is selected when any of these holds:

    1. it starts at or before ``start_time`` and ends after ``start_time``;
    2. it lies entirely within ``[start_time, end_time]``;
    3. it starts before ``end_time`` and ends after ``end_time``.

    Parameters
    ----------
    l_start_time, l_end_time : array-like
        Start and end time of each file.
    start_time, end_time : datetime-like
        Bounds of the period of interest.

    Returns
    -------
    numpy.ndarray
        Boolean mask, ``True`` for the files to keep.
    """
    overlaps_start = np.logical_and(l_start_time <= start_time, l_end_time > start_time)
    inside_period = np.logical_and(l_start_time >= start_time, l_end_time <= end_time)
    overlaps_end = np.logical_and(l_start_time < end_time, l_end_time > end_time)
    return np.logical_or(np.logical_or(overlaps_start, inside_period), overlaps_end)
def filter_by_time(filepaths, start_time=None, end_time=None):
    """Filter filepaths by start_time and end_time.

    A file is kept when its time coverage, as returned by
    ``get_start_end_time_from_filepaths``, intersects the ``[start_time, end_time]``
    period (see ``is_within_time_period``).

    Parameters
    ----------
    filepaths : list or None
        List of filepaths.
    start_time : datetime.datetime, optional
        Start time. If ``None``, it is set to 1978-01-01 (a dummy early date).
    end_time : datetime.datetime, optional
        End time. If ``None``, it is set to the current UTC time.

    Returns
    -------
    filepaths : list
        List of valid filepaths. If no valid filepaths, returns an empty list !
    """
    # -------------------------------------------------------------------------.
    # Check filepaths
    if filepaths is None:
        return []
    filepaths = check_filepaths(filepaths)
    # NOTE: explicit len() check (not truthiness) in case an array is returned
    if len(filepaths) == 0:
        return []

    # -------------------------------------------------------------------------.
    # Check start_time and end_time
    if start_time is None:
        start_time = datetime.datetime(1978, 1, 1, 0, 0, 0)  # Dummy start
    if end_time is None:
        end_time = get_current_utc_time()  # Current time
    start_time, end_time = check_start_end_time(start_time, end_time)

    # -------------------------------------------------------------------------.
    # Retrieve files start_time and end_time
    l_start_time, l_end_time = get_start_end_time_from_filepaths(filepaths)

    # -------------------------------------------------------------------------.
    # Select granules with data within the start and end time
    idx_select = is_within_time_period(l_start_time, l_end_time, start_time=start_time, end_time=end_time)
    return np.array(filepaths)[idx_select].tolist()
def find_files(
    data_source,
    campaign_name,
    station_name,
    product,
    debugging_mode: bool = False,
    data_archive_dir: str | None = None,
    metadata_archive_dir: str | None = None,
    glob_pattern=None,
    start_time=None,
    end_time=None,
    **product_kwargs,
):
    """Retrieve DISDRODB product files for a given station.

    Parameters
    ----------
    data_source : str
        The name of the institution (for campaigns spanning multiple countries) or
        the name of the country (for campaigns or sensor networks within a single country).
        Must be provided in UPPER CASE.
    campaign_name : str
        The name of the campaign. Must be provided in UPPER CASE.
    station_name : str
        The name of the station.
    product : str
        The name DISDRODB product.
    debugging_mode : bool, optional
        If ``True``, it selects at most 3 files for debugging purposes.
        The subset is taken after sorting and time filtering.
        The default value is ``False``.
    data_archive_dir : str, optional
        The base directory of DISDRODB, expected in the format ``<...>/DISDRODB``.
        If not specified, the path specified in the DISDRODB active configuration will be used.
    metadata_archive_dir : str, optional
        The DISDRODB Metadata Archive directory. Used only to read the station
        metadata when ``product="RAW"`` and ``glob_pattern`` is not specified.
    glob_pattern: str, optional
        Glob pattern to search for raw data files. Used only if ``product="RAW"``.
        If ``None``, the ``raw_data_glob_pattern`` of the station metadata is used
        (empty string if the metadata key is missing).
        For other products it is ignored: ``*.parquet`` is used for L0A and ``*.nc`` otherwise.
    start_time : datetime-like, optional
        Keep only files with data after this time. Not applied to RAW files.
    end_time : datetime-like, optional
        Keep only files with data before this time. Not applied to RAW files.

    Other Parameters
    ----------------
    temporal_resolution : str, optional
        The temporal resolution of the product (e.g., "1MIN", "10MIN", "1H").
        It must be specified only for product L1, L2E and L2M !
    model_name : str
        The model name of the statistical distribution for the DSD.
        It must be specified only for product L2M !

    Returns
    -------
    filepaths : list
        Sorted list of file paths.

    Raises
    ------
    ValueError
        If no product file is found, or none of them falls within ``[start_time, end_time]``.
    """
    from disdrodb.metadata import read_station_metadata

    # Retrieve data directory
    data_dir = define_data_dir(
        data_archive_dir=data_archive_dir,
        data_source=data_source,
        campaign_name=campaign_name,
        station_name=station_name,
        product=product,
        # Product options
        **product_kwargs,
    )

    # Define the glob pattern
    if product == "RAW":
        # For the DISDRODB RAW product, retrieve glob_pattern from metadata if not specified
        if glob_pattern is None:
            metadata = read_station_metadata(
                data_source=data_source,
                campaign_name=campaign_name,
                station_name=station_name,
                metadata_archive_dir=metadata_archive_dir,
            )
            glob_pattern = metadata.get("raw_data_glob_pattern", "")
    else:
        # For the others DISDRODB products, the file format is fixed
        glob_pattern = "*.parquet" if product == "L0A" else "*.nc"

    # Retrieve files
    filepaths = list_files(data_dir, glob_pattern=glob_pattern, recursive=True)

    # If no file available, raise error
    if len(filepaths) == 0:
        msg = f"No {product} files are available in {data_dir}. Run {product} processing first."
        raise ValueError(msg)

    # Sort first so that the debugging subset below is deterministic
    filepaths = sorted(filepaths)

    # Filter files by start_time and end_time
    if product != "RAW":
        filepaths = filter_by_time(filepaths=filepaths, start_time=start_time, end_time=end_time)
        if len(filepaths) == 0:
            msg = f"No {product} files are available between {start_time} and {end_time}."
            raise ValueError(msg)

    # Filter out filepaths if debugging_mode=True
    # - Done after time filtering, otherwise the (arbitrary) first 3 files could
    #   all fall outside the requested period and trigger a spurious error.
    filepaths = filter_filepaths(filepaths, debugging_mode=debugging_mode)
    return filepaths
####----------------------------------------------------------------------------------
#### DISDRODB Open Product Files


def _open_raw_files(filepaths, data_source, campaign_name, station_name, metadata_archive_dir):
    """Open raw station files into the DISDRODB L0A or L0B representation.

    - When the station metadata ``raw_data_format`` is ``"txt"``, the raw files are
      read into a DISDRODB L0A pandas DataFrame (via ``generate_l0a``).
    - Otherwise (raw netCDF files), they are read into a DISDRODB L0B xarray Dataset
      (via ``generate_l0b_from_nc``).

    The station issue YAML file is read on a best-effort basis: if reading it
    fails for any reason, no issue information is applied.
    """
    from disdrodb.issue import read_station_issue
    from disdrodb.l0 import generate_l0a, generate_l0b_from_nc, get_station_reader
    from disdrodb.metadata import read_station_metadata

    # Arguments shared by all station-level lookups
    station_args = {
        "data_source": data_source,
        "campaign_name": campaign_name,
        "station_name": station_name,
        "metadata_archive_dir": metadata_archive_dir,
    }

    metadata = read_station_metadata(**station_args)
    sensor_name = metadata["sensor_name"]

    try:
        issue_dict = read_station_issue(**station_args)
    except Exception:
        # Best-effort: proceed without issue information
        issue_dict = None

    reader = get_station_reader(**station_args)

    if metadata["raw_data_format"] == "txt":
        # Raw text files --> DISDRODB L0A DataFrame
        return generate_l0a(
            filepaths=filepaths,
            reader=reader,
            sensor_name=sensor_name,
            issue_dict=issue_dict,
            verbose=False,
        )

    # Raw netCDF files --> DISDRODB L0B Dataset
    return generate_l0b_from_nc(
        filepaths=filepaths,
        reader=reader,
        sensor_name=sensor_name,
        metadata=metadata,
        issue_dict=issue_dict,
        verbose=False,
    )
def list_coordinates_names(ds):
    """Collect variable names referenced as coordinates by a not CF-decoded dataset.

    For every variable, the names found in its ``coordinates`` attribute
    (whitespace-separated), ``bounds`` attribute and ``grid_mapping`` attribute
    are gathered.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset opened without CF decoding.

    Returns
    -------
    set of str
        Names of the referenced coordinate-like variables.
    """
    referenced = set()
    for name in ds.variables:
        attributes = ds[name].attrs
        # Auxiliary coordinates
        if "coordinates" in attributes:
            referenced.update(attributes["coordinates"].split())
        # Bounds variables and grid mapping
        for key in ("bounds", "grid_mapping"):
            if key in attributes:
                referenced.add(attributes[key])
    return referenced
def subset_variables(ds, variables):
    """Subset variables while keeping dimension and coordinate-like variables.

    The returned dataset contains the requested ``variables``, every variable named
    after a dimension, and every variable referenced through CF attributes
    (``coordinates``, ``bounds``, ``grid_mapping``; see ``list_coordinates_names``).
    Requested names not present in ``ds`` are ignored.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset (not CF decoded) to subset.
    variables : str or iterable of str
        Variable name(s) to keep.

    Returns
    -------
    xarray.Dataset
        Subset dataset, with variables in the same order as in ``ds``.
    """
    # A single name must not be split into characters
    if isinstance(variables, str):
        variables = [variables]

    # Union of everything we must keep (set for O(1) membership tests)
    keep = set(variables) | set(ds.dims) | list_coordinates_names(ds)

    # Only keep variables that exist, in a deterministic (dataset) order
    return ds[[name for name in ds.variables if name in keep]]
def filter_dataset_by_time(ds, start_time=None, end_time=None):
    """Select the timesteps of ``ds`` within ``[start_time, end_time]`` (bounds inclusive).

    Unlike ``ds.sel(time=slice(start_time, end_time))``, this works when the time
    index contains duplicated or non-monotonic timesteps (``sel`` with a slice
    fails because the 'index is not monotonic increasing or decreasing').

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset with a ``time`` coordinate.
    start_time : str, numpy.datetime64 or None
        Inclusive lower bound. If ``None``, no lower bound is applied.
    end_time : str, numpy.datetime64 or None
        Inclusive upper bound. If ``None``, no upper bound is applied.

    Returns
    -------
    xarray.Dataset
        Subset dataset keeping the original timestep ordering (duplicates preserved).
    """
    timesteps = ds["time"].to_numpy()
    keep = np.full(timesteps.shape, True)
    if start_time is not None:
        lower = np.array(check_time(start_time), dtype="datetime64[ns]")
        keep &= timesteps >= lower
    if end_time is not None:
        upper = np.array(check_time(end_time), dtype="datetime64[ns]")
        keep &= timesteps <= upper
    return ds.isel(time=np.nonzero(keep)[0])
def open_parquet_files(
    filepaths,
    variables=None,
    start_time=None,
    end_time=None,
    time_col="time",
    use_threads=True,
):
    """Open (DISDRODB L0A) Apache Parquet files into a single pandas DataFrame.

    Parameters
    ----------
    filepaths : str or list of str
        Parquet file path(s).
    variables : str, list of str or None, optional
        Column(s) to read. If ``None`` (default), all columns are read.
    start_time : datetime-like or None, optional
        If specified, only rows with ``time_col >= start_time`` are read.
    end_time : datetime-like or None, optional
        If specified, only rows with ``time_col <= end_time`` are read.
    time_col : str, optional
        Name of the time column used for filtering. The default is ``"time"``.
    use_threads : bool, optional
        Whether pyarrow may read with multiple threads. The default is ``True``.

    Returns
    -------
    pandas.DataFrame
        The (filtered) content of the Parquet files.
    """
    import operator

    import pyarrow.dataset as pads

    # Accept a single column name, consistently with open_netcdf_files
    if isinstance(variables, str):
        variables = [variables]

    # Open dataset (lazily)
    dataset = pads.dataset(filepaths, format="parquet")

    # Define time filters (pushed down to the Parquet reader)
    filters = []
    if start_time is not None:
        filters.append(pads.field(time_col) >= check_time(start_time))
    if end_time is not None:
        filters.append(pads.field(time_col) <= check_time(end_time))

    # Combine filters with a logical AND, if any exist
    filter_expr = functools.reduce(operator.and_, filters) if filters else None

    # Read table and convert to pandas
    return dataset.to_table(
        columns=variables,
        filter=filter_expr,
        use_threads=use_threads,
    ).to_pandas()
def ensure_safe_open_mfdataset(function):
    """Decorator to ensure safe xarray open_mfdataset.

    The ``parallel`` argument of the decorated function is validated with
    ``disdrodb.utils.dask.check_parallel_validity``.

    parallel argument is changed to False if:
    - dask threading or single-threaded is active
    - distributed multiprocessing with more than 1 thread per process

    parallel argument is allowed to be True only if:
    - distributed multiprocessing with only 1 thread per process

    When ``parallel`` ends up ``False``, the decorated function runs under dask's
    ``"single-threaded"`` scheduler. ``parallel`` is honoured whether it is passed
    by keyword or positionally.
    """
    import inspect

    import dask

    from disdrodb.utils.dask import check_parallel_validity

    signature = inspect.signature(function)
    accepts_parallel = "parallel" in signature.parameters

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        if accepts_parallel:
            # Bind the arguments so that a positionally-passed ``parallel`` is found
            # and replaced. Injecting it into kwargs would instead raise
            # "got multiple values for argument 'parallel'".
            bound = signature.bind(*args, **kwargs)
            parallel = check_parallel_validity(bound.arguments.get("parallel", False))
            bound.arguments["parallel"] = parallel
            args, kwargs = bound.args, bound.kwargs
        else:
            parallel = check_parallel_validity(kwargs.get("parallel", False))
            kwargs["parallel"] = parallel

        # If parallel is True at this stage, means being using
        # multiprocessing or dask.distributed with single thread
        if parallel:
            return function(*args, **kwargs)

        # Call function with single threading
        with dask.config.set(scheduler="single-threaded"):  # "synchronous"
            return function(*args, **kwargs)

    return wrapper
@ensure_safe_open_mfdataset
def open_netcdf_files(
    filepaths,
    chunks=-1,
    start_time=None,
    end_time=None,
    variables=None,
    parallel=False,
    compute=True,
    engine="netcdf4",
    **open_kwargs,
):
    """Open DISDRODB NetCDF files using xarray.

    This function opens and concatenates multiple NetCDF files using
    ``xarray.open_mfdataset`` with settings optimized for time-based concatenation
    and minimal variable checking.

    The function uses ``data_vars="minimal"``, ``coords="minimal"``, and
    ``compat="override"`` to:

    - Concatenate only variables that depend on the time dimension.
    - Skip consistency checks for variables without a time dimension, taking them
      from the first file instead.

    See: https://github.com/pydata/xarray/issues/1385#issuecomment-1958761334

    Using ``combine="nested"`` and ``join="outer"`` ensures that duplicated
    timesteps are preserved and not overwritten.

    Files are always opened with ``decode_cf=False``, so ``lat`` and ``lon`` are
    treated as data variables and concatenated without validation. CF decoding
    (``xarray.decode_cf``) is then applied once to the combined dataset, which sets
    variables such as latitude/longitude/altitude, ``sample_interval`` and the
    diameter/velocity bin variables as coordinates.

    Special handling of ``sample_interval``:

    - For L0B products, if ``sample_interval`` varies with time, it is safely concatenated.
    - For L0C products, if ``sample_interval`` differs across files, only the value
      from the first file is retained.
    - For L1 and L2 processing, only files with identical ``sample_interval`` values
      should be passed to this function.

    Parameters
    ----------
    filepaths : str or sequence of str
        Path(s) to NetCDF files to open.
    chunks : int, dict, or None, optional
        Chunking strategy passed to xarray for dask-backed arrays.
        Use ``-1`` to load data into a single chunk (default).
    start_time : str or datetime-like or None, optional
        Inclusive start time for temporal subsetting.
    end_time : str or datetime-like or None, optional
        Inclusive end time for temporal subsetting.
    variables : str, sequence of str or None, optional
        Subset of variables to retain. Duplicates are dropped while preserving the
        requested order. Names not present in the files are ignored.
    parallel : bool, optional
        Whether to open files in parallel using dask. The default is ``False``.
    compute : bool, optional
        Whether to immediately compute the dataset when using dask.
        The default is ``True``.
    engine : str, optional
        Backend engine used by xarray to read NetCDF files. The default is "netcdf4".
    **open_kwargs
        Additional keyword arguments passed to ``xarray.open_mfdataset``.

    Returns
    -------
    xarray.Dataset
        The opened and concatenated dataset.

    See Also
    --------
    xarray.open_mfdataset

    Notes
    -----
    This function is decorated with ``ensure_safe_open_mfdataset`` to protect
    against unsafe or incompatible combinations of arguments.
    """
    import xarray as xr

    # Ensure variables is a list of unique names (requested order preserved)
    if variables is not None:
        if isinstance(variables, str):
            variables = [variables]
        variables = list(dict.fromkeys(variables))

    # Define preprocessing function for parallel opening
    if parallel and variables is not None:

        def preprocess(ds):
            return subset_variables(ds, variables)

    else:
        preprocess = None

    # Open netcdf
    # NOTE: set_options is not used as a context manager here, so this option
    # remains active globally after this call.
    xr.set_options(use_new_combine_kwarg_defaults=True)
    ds = xr.open_mfdataset(
        filepaths,
        chunks=chunks,
        combine="nested",
        concat_dim="time",
        data_vars="minimal",  # ["sample_interval"], "all" would concat all across time
        coords="minimal",
        join="outer",  # "exact"
        compat="override",  # "no_conflicts" slows down
        combine_attrs="override",
        preprocess=preprocess,  # only if parallel=True
        engine=engine,
        parallel=parallel,
        decode_cf=False,  # assume encoding do not vary across files (e.g. "time" units)
        decode_coords=False,  # no effect if decode_cf=False
        decode_timedelta=False,
        cache=False,
        autoclose=True,
        **open_kwargs,
    )

    # Decode CF
    # - Set to coordinates the variables
    #   - latitude/longitude/altitude
    #   - sample_interval
    #   - diameter/velocity bin width/upper/lower
    ds = xr.decode_cf(ds, decode_times=True, decode_coords=True, decode_timedelta=False)

    # Subset variables
    # --> After decoding CF, when coordinates are properly set
    # --> Otherwise, coordinate variables would be removed unless listed in variables
    if variables is not None and preprocess is None:
        variables = [var for var in variables if var in ds]
        ds = ds[variables]

    # Subset time
    if start_time is not None or end_time is not None:
        ds = filter_dataset_by_time(ds, start_time=start_time, end_time=end_time)

    # Ensure coordinates are already loaded in memory
    for coord in list(ds.coords):
        ds[coord] = ds[coord].load()

    # Update time coverage attributes
    ds.attrs["time_coverage_start"] = str(ds.disdrodb.start_time)
    ds.attrs["time_coverage_end"] = str(ds.disdrodb.end_time)

    # If compute=True, load in memory and close connections to files
    if compute:
        dataset = ds.compute()
        ds.close()
        dataset.close()
        del ds
    else:
        dataset = ds
    return dataset
def open_dataset(
    data_source,
    campaign_name,
    station_name,
    product,
    product_kwargs=None,
    debugging_mode: bool = False,
    data_archive_dir: str | None = None,
    metadata_archive_dir: str | None = None,
    chunks=-1,
    parallel=False,
    compute=False,
    start_time=None,
    end_time=None,
    variables=None,
    **open_kwargs,
):
    """Open the DISDRODB product files of a given station.

    Parameters
    ----------
    data_source : str
        The name of the institution (for campaigns spanning multiple countries) or
        the name of the country (for campaigns or sensor networks within a single country).
        Must be provided in UPPER CASE.
    campaign_name : str
        The name of the campaign. Must be provided in UPPER CASE.
    station_name : str
        The name of the station.
    product : str
        The name DISDRODB product.
    product_kwargs : dict, optional
        DISDRODB product options. They can alternatively be passed as keyword
        arguments (they are extracted from ``**open_kwargs``); entries of this dict
        take precedence over the extracted ones.
        For L1, L2E and L2M products, ``temporal_resolution`` is required.
        For L2M product, ``model_name`` is required.
    debugging_mode : bool, optional
        If ``True``, it selects at most 3 files for debugging purposes.
        The default value is ``False``.
    data_archive_dir : str, optional
        The base directory of DISDRODB, expected in the format ``<...>/DISDRODB``.
        If not specified, the path specified in the DISDRODB active configuration will be used.
    metadata_archive_dir : str, optional
        The DISDRODB Metadata Archive directory. Used for RAW products.
    chunks : int, dict or None, optional
        Chunking passed to ``open_netcdf_files``. The default is ``-1``.
    parallel : bool, optional
        Open NetCDF files in parallel (see ``open_netcdf_files``). For L0A
        Parquet files, it controls whether pyarrow uses threads.
        The default is ``False``.
    compute : bool, optional
        Whether to load the NetCDF data into memory. The default is ``False``.
    start_time, end_time : datetime-like, optional
        Time period to select. Not applied to RAW products.
    variables : str or list of str, optional
        Variables to retain. Not applied to RAW products.
    **open_kwargs : optional
        Additional keyword arguments passed to ``xarray.open_mfdataset()``.

    Returns
    -------
    pandas.DataFrame or xarray.Dataset
        A DataFrame for L0A products and for RAW text files,
        an xarray.Dataset otherwise.
    """
    import xarray as xr

    # Extract product kwargs from open_kwargs and merge the explicit dict on top
    # NOTE: previously an explicit ``product_kwargs`` dict was silently discarded.
    extracted_kwargs = extract_product_kwargs(open_kwargs, product=product)
    product_kwargs = {**extracted_kwargs, **(product_kwargs or {})}

    # List product files
    filepaths = find_files(
        data_archive_dir=data_archive_dir,
        metadata_archive_dir=metadata_archive_dir,
        data_source=data_source,
        campaign_name=campaign_name,
        station_name=station_name,
        product=product,
        debugging_mode=debugging_mode,
        start_time=start_time,
        end_time=end_time,
        **product_kwargs,
    )

    # Open RAW files
    # - For raw txt files return DISDRODB L0A dataframe
    # - For raw netCDF files return DISDRODB L0B dataset
    if product == "RAW":
        return _open_raw_files(
            filepaths=filepaths,
            data_source=data_source,
            campaign_name=campaign_name,
            station_name=station_name,
            metadata_archive_dir=metadata_archive_dir,
        )

    # Open L0A Parquet files
    if product == "L0A":
        return open_parquet_files(
            filepaths=filepaths,
            variables=variables,
            start_time=start_time,
            end_time=end_time,
            use_threads=parallel,
        )

    # Open DISDRODB netCDF files using xarray
    # - Special handling for L0C product with possible multiple sample intervals
    if product == "L0C":
        dict_sample_intervals = group_filepaths(filepaths, groups="sample_interval")
        if len(dict_sample_intervals) > 1:
            # Open separately each sample interval
            list_ds = [
                open_netcdf_files(
                    filepaths=interval_filepaths,
                    chunks=chunks,
                    start_time=start_time,
                    end_time=end_time,
                    variables=variables,
                    parallel=parallel,
                    compute=compute,
                    **open_kwargs,
                )
                for interval_filepaths in dict_sample_intervals.values()
            ]
            # Expand sample_interval coordinate along time for each dataset
            list_ds = [ds.assign_coords(sample_interval=ds.sample_interval.expand_dims(time=ds.time)) for ds in list_ds]
            # Concatenate along time dimension and sort by time
            ds = xr.concat(list_ds, dim="time")
            ds.attrs["measurement_interval"] = list(dict_sample_intervals)
            ds = ds.sortby("time")
            # Update time coverage attributes
            ds.attrs["time_coverage_start"] = str(ds.disdrodb.start_time)
            ds.attrs["time_coverage_end"] = str(ds.disdrodb.end_time)
            return ds

    # Otherwise, open all files together
    return open_netcdf_files(
        filepaths=filepaths,
        chunks=chunks,
        start_time=start_time,
        end_time=end_time,
        variables=variables,
        parallel=parallel,
        compute=compute,
        **open_kwargs,
    )
####---------------------------------------------------------------------------------- #### DISDRODB Remove Product Files
def remove_product(
    data_archive_dir,
    product,
    data_source,
    campaign_name,
    station_name,
    logger=None,
    verbose=True,
    **product_kwargs,
):
    """Remove all product files of a specific station.

    The whole product data directory (as defined by ``define_data_dir``) is removed.

    Parameters
    ----------
    data_archive_dir : str
        The base directory of DISDRODB.
    product : str
        The DISDRODB product whose files are removed.
    data_source, campaign_name, station_name : str
        Station identifiers.
    logger : logging.Logger, optional
        Logger used to report the removal.
    verbose : bool, optional
        Whether to also print the log messages. The default is ``True``.
    **product_kwargs
        DISDRODB product options (e.g. ``temporal_resolution``).

    Raises
    ------
    ValueError
        If ``product`` is ``"RAW"`` (case-insensitive): raw data are never removed.
    """
    if product.upper() == "RAW":
        raise ValueError("Removal of 'RAW' files is not allowed.")
    data_dir = define_data_dir(
        data_archive_dir=data_archive_dir,
        product=product,
        data_source=data_source,
        campaign_name=campaign_name,
        station_name=station_name,
        **product_kwargs,
    )
    # f-strings: previously "{product}" was logged literally
    log_info(logger=logger, msg=f"Removal of {product} files started.", verbose=verbose)
    remove_file_or_directories(data_dir)
    log_info(logger=logger, msg=f"Removal of {product} files ended.", verbose=verbose)
####--------------------------------------------------------------------------. #### Open directories
def open_file_explorer(path):
    """Show ``path`` in the operating system's native file browser.

    Uses ``os.startfile`` on Windows, ``open`` on macOS and ``xdg-open`` elsewhere.
    The exit status of ``open``/``xdg-open`` is not checked.

    Raises
    ------
    FileNotFoundError
        If ``path`` does not exist.
    """
    target = Path(path).resolve()
    if not target.exists():
        raise FileNotFoundError(f"{target} does not exist")

    location = str(target)
    if sys.platform.startswith("win"):
        # Windows
        os.startfile(location)
        return

    # macOS uses "open"; Linux desktop environments use "xdg-open"
    opener = "open" if sys.platform == "darwin" else "xdg-open"
    subprocess.run([opener, location], check=False)
def open_logs_directory(
    data_source,
    campaign_name,
    station_name=None,  # noqa
    data_archive_dir=None,
):
    """Open the ``logs`` directory of a campaign in the DISDRODB Data Archive.

    The directory opened is ``<L0A campaign directory>/logs``. ``station_name``
    is accepted but currently not used.
    """
    from disdrodb.configs import get_data_archive_dir

    campaign_dir = define_campaign_dir(
        archive_dir=get_data_archive_dir(data_archive_dir),
        product="L0A",
        data_source=data_source,
        campaign_name=campaign_name,
        check_exists=True,
    )
    open_file_explorer(os.path.join(campaign_dir, "logs"))
def open_product_directory(
    product,
    data_source,
    campaign_name,
    station_name,
    data_archive_dir=None,
):
    """Open the station directory of a DISDRODB product in the native file browser.

    The directory must exist (``check_exists=True``).
    """
    from disdrodb.configs import get_data_archive_dir

    open_file_explorer(
        define_station_dir(
            data_archive_dir=get_data_archive_dir(data_archive_dir),
            product=product,
            data_source=data_source,
            campaign_name=campaign_name,
            station_name=station_name,
            check_exists=True,
        ),
    )
def open_metadata_directory(
    data_source,
    campaign_name,
    station_name=None,  # noqa
    metadata_archive_dir=None,
):
    """Open a campaign's metadata directory of the DISDRODB Metadata Archive.

    ``station_name`` is accepted but currently not used: the campaign-level
    metadata directory is opened.
    """
    from disdrodb.configs import get_metadata_archive_dir

    open_file_explorer(
        define_metadata_dir(
            metadata_archive_dir=get_metadata_archive_dir(metadata_archive_dir),
            data_source=data_source,
            campaign_name=campaign_name,
            check_exists=True,
        ),
    )
def open_readers_directory():
    """Show the disdrodb software readers directory in the native file browser."""
    open_file_explorer(define_readers_directory())
def open_products_options():
    """Show the disdrodb products configuration directory in the native file browser."""
    from disdrodb.configs import get_products_configs_dir

    open_file_explorer(get_products_configs_dir())
def open_metadata_archive(
    metadata_archive_dir=None,
):
    """Show the DISDRODB Metadata Archive in the native file browser.

    If ``metadata_archive_dir`` is ``None``, it is resolved by ``get_metadata_archive_dir``.
    """
    from disdrodb.configs import get_metadata_archive_dir

    open_file_explorer(get_metadata_archive_dir(metadata_archive_dir))
def open_data_archive(
    data_archive_dir=None,
):
    """Show the DISDRODB Data Archive in the native file browser.

    If ``data_archive_dir`` is ``None``, it is resolved by ``get_data_archive_dir``.
    """
    from disdrodb.configs import get_data_archive_dir

    open_file_explorer(get_data_archive_dir(data_archive_dir))