Source code for disdrodb.api.io

# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Routines to list and open DISDRODB products."""

import datetime
import functools
import os
import subprocess
import sys
from pathlib import Path

import numpy as np

from disdrodb.api.checks import (
    check_filepaths,
    check_start_end_time,
    check_time,
    get_current_utc_time,
)
from disdrodb.api.info import get_start_end_time_from_filepaths, group_filepaths
from disdrodb.api.path import (
    define_campaign_dir,
    define_data_dir,
    define_metadata_dir,
    define_station_dir,
)
from disdrodb.l0.l0_reader import define_readers_directory
from disdrodb.utils.dict import extract_product_kwargs
from disdrodb.utils.directories import list_files, remove_file_or_directories
from disdrodb.utils.logger import (
    log_info,
)

####----------------------------------------------------------------------------------
#### DISDRODB Search Product Files


[docs] def filter_filepaths(filepaths, debugging_mode): """Filter out filepaths if ``debugging_mode=True``.""" if debugging_mode: max_files = min(3, len(filepaths)) filepaths = filepaths[0:max_files] return filepaths
[docs] def is_within_time_period(l_start_time, l_end_time, start_time, end_time): """Assess which files are within the start and end time.""" # - Case 1 # s e # | | # ---------> (-------->) idx_select1 = np.logical_and(l_start_time <= start_time, l_end_time > start_time) # - Case 2 # s e # | | # ---------(-.) idx_select2 = np.logical_and(l_start_time >= start_time, l_end_time <= end_time) # - Case 3 # s e # | | # ------------- idx_select3 = np.logical_and(l_start_time < end_time, l_end_time > end_time) # - Get idx where one of the cases occur idx_select = np.logical_or.reduce([idx_select1, idx_select2, idx_select3]) return idx_select
[docs] def filter_by_time(filepaths, start_time=None, end_time=None): """Filter filepaths by start_time and end_time. Parameters ---------- filepaths : list List of filepaths. start_time : datetime.datetime Start time. If ``None``, will be set to 1997-01-01. end_time : datetime.datetime End time. If ``None`` will be set to current UTC time. Returns ------- filepaths : list List of valid filepaths. If no valid filepaths, returns an empty list ! """ # -------------------------------------------------------------------------. # Check filepaths if isinstance(filepaths, type(None)): return [] filepaths = check_filepaths(filepaths) if len(filepaths) == 0: return [] # -------------------------------------------------------------------------. # Check start_time and end_time if start_time is None: start_time = datetime.datetime(1978, 1, 1, 0, 0, 0) # Dummy start if end_time is None: end_time = get_current_utc_time() # Current time start_time, end_time = check_start_end_time(start_time, end_time) # -------------------------------------------------------------------------. # - Retrieve files start_time and end_time l_start_time, l_end_time = get_start_end_time_from_filepaths(filepaths) # -------------------------------------------------------------------------. # Select granules with data within the start and end time idx_select = is_within_time_period(l_start_time, l_end_time, start_time=start_time, end_time=end_time) return np.array(filepaths)[idx_select].tolist()
[docs] def find_files( data_source, campaign_name, station_name, product, debugging_mode: bool = False, data_archive_dir: str | None = None, metadata_archive_dir: str | None = None, glob_pattern=None, start_time=None, end_time=None, **product_kwargs, ): """Retrieve DISDRODB product files for a give station. Parameters ---------- data_source : str The name of the institution (for campaigns spanning multiple countries) or the name of the country (for campaigns or sensor networks within a single country). Must be provided in UPPER CASE. campaign_name : str The name of the campaign. Must be provided in UPPER CASE. station_name : str The name of the station. product : str The name DISDRODB product. debugging_mode : bool, optional If ``True``, it select maximum 3 files for debugging purposes. The default value is ``False``. data_archive_dir : str, optional The base directory of DISDRODB, expected in the format ``<...>/DISDRODB``. If not specified, the path specified in the DISDRODB active configuration will be used. glob_pattern: str, optional Glob pattern to search for raw data files. The default is "*". The argument is used only if product="RAW". Other Parameters ---------------- temporal_resolution : str, optional The temporal resolution of the product (e.g., "1MIN", "10MIN", "1H"). It must be specified only for product L1, L2E and L2M ! model_name : str The model name of the statistical distribution for the DSD. It must be specified only for product L2M ! Returns ------- filepaths : list List of file paths. """ from disdrodb.metadata import read_station_metadata # Retrieve data directory data_dir = define_data_dir( data_archive_dir=data_archive_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name, product=product, # Product options **product_kwargs, ) # For the DISDRODB RAW product, retrieve glob_pattern from metadata if not specified if product == "RAW" and glob_pattern is None: metadata = read_station_metadata( data_source=data_source, campaign_name=campaign_name, station_name=station_name, metadata_archive_dir=metadata_archive_dir, ) glob_pattern = metadata.get("raw_data_glob_pattern", "") # For the others DISDRODB products, define the correct glob pattern if product != "RAW": glob_pattern = "*.parquet" if product == "L0A" else "*.nc" # Retrieve files filepaths = list_files(data_dir, glob_pattern=glob_pattern, recursive=True) # Filter out filepaths if debugging_mode=True filepaths = filter_filepaths(filepaths, debugging_mode=debugging_mode) # If no file available, raise error if len(filepaths) == 0: msg = f"No {product} files are available in {data_dir}. Run {product} processing first." raise ValueError(msg) # Filter files by start_time and end_time if product != "RAW": filepaths = filter_by_time(filepaths=filepaths, start_time=start_time, end_time=end_time) if len(filepaths) == 0: msg = f"No {product} files are available between {start_time} and {end_time}." raise ValueError(msg) # Sort filepaths filepaths = sorted(filepaths) return filepaths
####---------------------------------------------------------------------------------- #### DISDRODB Open Product Files def _open_raw_files(filepaths, data_source, campaign_name, station_name, metadata_archive_dir): """Open raw files to DISDRODB L0A or L0B format. Raw text files are opened into a DISDRODB L0A pandas Dataframe. Raw netCDF files are opened into a DISDRODB L0B xarray Dataset. """ from disdrodb.issue import read_station_issue from disdrodb.l0 import generate_l0a, generate_l0b_from_nc, get_station_reader from disdrodb.metadata import read_station_metadata # Read station metadata metadata = read_station_metadata( data_source=data_source, campaign_name=campaign_name, station_name=station_name, metadata_archive_dir=metadata_archive_dir, ) sensor_name = metadata["sensor_name"] # Read station issue YAML file try: issue_dict = read_station_issue( data_source=data_source, campaign_name=campaign_name, station_name=station_name, metadata_archive_dir=metadata_archive_dir, ) except Exception: issue_dict = None # Get reader reader = get_station_reader( data_source=data_source, campaign_name=campaign_name, station_name=station_name, metadata_archive_dir=metadata_archive_dir, ) # Return DISDRODB L0A dataframe if raw text files if metadata["raw_data_format"] == "txt": df = generate_l0a( filepaths=filepaths, reader=reader, sensor_name=sensor_name, issue_dict=issue_dict, verbose=False, ) return df # Return DISDRODB L0B dataframe if raw netCDF files ds = generate_l0b_from_nc( filepaths=filepaths, reader=reader, sensor_name=sensor_name, metadata=metadata, issue_dict=issue_dict, verbose=False, ) return ds
[docs] def list_coordinates_names(ds): """List coordinates of a xarray.Dataset not CF decoded !.""" coords = set() for v in ds.variables: attrs = ds[v].attrs # auxiliary coordinates if "coordinates" in attrs: coords |= set(attrs["coordinates"].split()) # bounds variables if "bounds" in attrs: coords.add(attrs["bounds"]) # grid mapping if "grid_mapping" in attrs: coords.add(attrs["grid_mapping"]) return coords
[docs] def subset_variables(ds, variables): """Subset variables while keeping coordinates.""" # Ensure list variables = list(variables) # Always keep dimension variables dim_vars = list(ds.dims) # Variables referenced by CF relationships coords = list_coordinates_names(ds) # Union of everything we must keep keep = set(variables) | set(dim_vars) | coords # Only keep variables that exist keep = [v for v in keep if v in list(ds.variables)] return ds[keep]
[docs] def filter_dataset_by_time(ds, start_time=None, end_time=None): """Subset an xarray.Dataset by time, robust to duplicated/non-monotonic indices. NOTE: ds.sel(time=slice(start_time, end_time)) fails in presence of duplicated timesteps because time 'index is not monotonic increasing or decreasing'. Parameters ---------- ds : xr.Dataset Dataset with a `time` coordinate. start_time : np.datetime64 or None Inclusive start bound. If None, no lower bound is applied. end_time : np.datetime64 or None Inclusive end bound. If None, no upper bound is applied. Returns ------- xr.Dataset Subset dataset with the same ordering of timesteps (duplicates preserved). """ time = ds["time"].to_numpy() mask = np.ones(time.shape, dtype=bool) if start_time is not None: mask &= time >= np.array(start_time, dtype="datetime64[ns]") if end_time is not None: mask &= time <= np.array(end_time, dtype="datetime64[ns]") return ds.isel(time=np.where(mask)[0])
[docs] def open_parquet_files( filepaths, variables=None, start_time=None, end_time=None, time_col="time", use_threads=True, ): """Open Parquet files.""" import pyarrow.dataset as ds # Open dataset dataset = ds.dataset( filepaths, format="parquet", ) # Define filters filters = [] if start_time is not None: start_time = check_time(start_time) filters.append(ds.field(time_col) >= start_time) if end_time is not None: end_time = check_time(end_time) filters.append(ds.field(time_col) <= end_time) # Combine filters if any exist filter_expr = None if filters: filter_expr = filters[0] for f in filters[1:]: filter_expr = filter_expr & f # Read table and convert to pandas df = dataset.to_table( columns=variables, filter=filter_expr, use_threads=use_threads, ).to_pandas() return df
[docs] def ensure_safe_open_mfdataset(function): """Decorator to ensure safe xarray open_mfdataset. parallel argument is changed to False if: - dask threading or single-threaded is active - distributed multiprocessing with more than 1 thread per process parallel argument is allowed to be True only if: - distributed multiprocessing with only 1 thread per process """ import dask from disdrodb.utils.dask import check_parallel_validity @functools.wraps(function) def wrapper(*args, **kwargs): # Check if it must be a delayed function parallel = kwargs.get("parallel", False) parallel = check_parallel_validity(parallel) kwargs["parallel"] = parallel # If parallel is True at this stage, means being using # multiprocessing or dask.distributed with single thread if parallel: return function(*args, **kwargs) # Call function with single threading with dask.config.set(scheduler="single-threaded"): # "synchronous" result = function(*args, **kwargs) return result return wrapper
[docs] @ensure_safe_open_mfdataset def open_netcdf_files( filepaths, chunks=-1, start_time=None, end_time=None, variables=None, parallel=False, compute=True, engine="netcdf4", **open_kwargs, ): """Open DISDRODB netCDF files using xarray. Using data_vars="minimal", coords="minimal", compat="override" --> will only concatenate those variables with the time dimension, --> will skip any checking for variables that don't have a time dimension (simply pick the variable from the first file). https://github.com/pydata/xarray/issues/1385#issuecomment-1958761334 Using combine="nested" and join="outer" ensure that duplicated timesteps are not overwritten! When decode_cf=False --> lat,lon are data_vars and get concatenated without any checking or reading When decode_cf=True --> lat, lon are promoted to coords, then get checked for equality across all files For L0B product, if sample_interval variable is present and varies with time, this function concatenate the variable over time without problems. For L0C product, if sample_interval changes across listed files, only sample_interval of first file is reported. --> open_dataset take care of just providing filepaths of files with same sample interval. In L1 and L2 processing, only filepaths of files with same sample interval must be passed to this function. """ import xarray as xr # Ensure variables is a list if variables is not None: if isinstance(variables, str): variables = [variables] variables = np.unique(variables).tolist() # Define preprocessing function for parallel opening if parallel and variables is not None: def preprocess(ds): return subset_variables(ds, variables) else: preprocess = None # Open netcdf xr.set_options(use_new_combine_kwarg_defaults=True) ds = xr.open_mfdataset( filepaths, chunks=chunks, combine="nested", concat_dim="time", data_vars="minimal", # ["sample_interval"], "all" would concat all across time coords="minimal", join="outer", # "exact" compat="override", # "no_conflicts" slows down combine_attrs="override", preprocess=preprocess, # only if parallel=True engine=engine, parallel=parallel, decode_cf=False, # assume encoding do not vary across files (e.g. "time" units) decode_coords=False, # no effect if decode_cf=False decode_timedelta=False, cache=False, autoclose=True, **open_kwargs, ) # Decode CF # - Set to coordinates the variables # - latitude/longitude/altitude # - sample_interval # - diameter/velocity bin width/upper/lower ds = xr.decode_cf(ds, decode_times=True, decode_coords=True, decode_timedelta=False) # Subset variables # --> After decoding CF, when coordinates are properly set # --> Othewerwise, coordinate variables would be removed unless listed in variables if variables is not None and preprocess is None: variables = [var for var in variables if var in ds] ds = ds[variables] # Subset time if start_time is not None or end_time is not None: ds = filter_dataset_by_time(ds, start_time=start_time, end_time=end_time) # Ensure coordinates are already loaded in memory for coord in list(ds.coords): ds[coord] = ds[coord].load() # Update time coverage attributes ds.attrs["time_coverage_start"] = str(ds.disdrodb.start_time) ds.attrs["time_coverage_end"] = str(ds.disdrodb.end_time) # If compute=True, load in memory and close connections to files if compute: dataset = ds.compute() ds.close() dataset.close() del ds else: dataset = ds return dataset
[docs] def open_dataset( data_source, campaign_name, station_name, product, product_kwargs=None, debugging_mode: bool = False, data_archive_dir: str | None = None, metadata_archive_dir: str | None = None, chunks=-1, parallel=False, compute=False, start_time=None, end_time=None, variables=None, **open_kwargs, ): """Retrieve DISDRODB product files for a give station. Parameters ---------- data_source : str The name of the institution (for campaigns spanning multiple countries) or the name of the country (for campaigns or sensor networks within a single country). Must be provided in UPPER CASE. campaign_name : str The name of the campaign. Must be provided in UPPER CASE. station_name : str The name of the station. product : str The name DISDRODB product. debugging_mode : bool, optional If ``True``, it select maximum 3 files for debugging purposes. The default value is ``False``. data_archive_dir : str, optional The base directory of DISDRODB, expected in the format ``<...>/DISDRODB``. If not specified, the path specified in the DISDRODB active configuration will be used. **product_kwargs : optional DISDRODB product options It must be specified only for product L1, L2E and L2M products ! For L1, L2E and L2M products, temporal_resolution is required FOr L2M product, model_name is required. **open_kwargs : optional Additional keyword arguments passed to ``xarray.open_mfdataset()``. Returns ------- xarray.Dataset """ import xarray as xr # Extract product kwargs from open_kwargs product_kwargs = extract_product_kwargs(open_kwargs, product=product) # List product files filepaths = find_files( data_archive_dir=data_archive_dir, metadata_archive_dir=metadata_archive_dir, data_source=data_source, campaign_name=campaign_name, station_name=station_name, product=product, debugging_mode=debugging_mode, start_time=start_time, end_time=end_time, **product_kwargs, ) # Open RAW files # - For raw txt files return DISDRODB L0A dataframe # - For raw netCDF files return DISDRODB L0B dataframe if product == "RAW": obj = _open_raw_files( filepaths=filepaths, data_source=data_source, campaign_name=campaign_name, station_name=station_name, metadata_archive_dir=metadata_archive_dir, ) return obj # Open L0A Parquet files if product == "L0A": df = open_parquet_files( filepaths=filepaths, variables=variables, start_time=start_time, end_time=end_time, use_threads=parallel, ) return df # Open DISDRODB netCDF files using xarray # - Special handling for L0C product with possible multiple sample intervals if product == "L0C": dict_sample_intervals = group_filepaths(filepaths, groups="sample_interval") if len(dict_sample_intervals) > 1: # Open separately each sample interval list_ds = [ open_netcdf_files( filepaths=filepaths, chunks=chunks, start_time=start_time, end_time=end_time, variables=variables, parallel=parallel, compute=compute, **open_kwargs, ) for filepaths in dict_sample_intervals.values() ] # Expand sample_interval coordinate for each dataset list_ds = [ds.assign_coords(sample_interval=ds.sample_interval.expand_dims(time=ds.time)) for ds in list_ds] # Concatenate along time dimension and sort by time ds = xr.concat(list_ds, dim="time") ds.attrs["measurement_interval"] = list(dict_sample_intervals) ds = ds.sortby("time") # Update time coverage attributes ds.attrs["time_coverage_start"] = str(ds.disdrodb.start_time) ds.attrs["time_coverage_end"] = str(ds.disdrodb.end_time) return ds # Otherwise, open all files together ds = open_netcdf_files( filepaths=filepaths, chunks=chunks, start_time=start_time, end_time=end_time, variables=variables, parallel=parallel, compute=compute, **open_kwargs, ) return ds
####---------------------------------------------------------------------------------- #### DISDRODB Remove Product Files
[docs] def remove_product( data_archive_dir, product, data_source, campaign_name, station_name, logger=None, verbose=True, **product_kwargs, ): """Remove all product files of a specific station.""" if product.upper() == "RAW": raise ValueError("Removal of 'RAW' files is not allowed.") data_dir = define_data_dir( data_archive_dir=data_archive_dir, product=product, data_source=data_source, campaign_name=campaign_name, station_name=station_name, **product_kwargs, ) log_info(logger=logger, msg="Removal of {product} files started.", verbose=verbose) remove_file_or_directories(data_dir) log_info(logger=logger, msg="Removal of {product} files ended.", verbose=verbose)
####--------------------------------------------------------------------------. #### Open directories
[docs] def open_file_explorer(path): """Open the native file-browser showing 'path'.""" p = Path(path).resolve() if not p.exists(): raise FileNotFoundError(f"{p} does not exist") if sys.platform.startswith("win"): # Windows os.startfile(str(p)) elif sys.platform == "darwin": # macOS subprocess.run(["open", str(p)], check=False) else: # Linux (most desktop environments) subprocess.run(["xdg-open", str(p)], check=False)
[docs] def open_logs_directory( data_source, campaign_name, station_name=None, # noqa data_archive_dir=None, ): """Open the DISDRODB Data Archive logs directory of a station.""" from disdrodb.configs import get_data_archive_dir data_archive_dir = get_data_archive_dir(data_archive_dir) campaign_dir = define_campaign_dir( archive_dir=data_archive_dir, product="L0A", data_source=data_source, campaign_name=campaign_name, check_exists=True, ) logs_dir = os.path.join(campaign_dir, "logs") open_file_explorer(logs_dir)
[docs] def open_product_directory( product, data_source, campaign_name, station_name, data_archive_dir=None, ): """Open the DISDRODB Data Archive station product directory.""" from disdrodb.configs import get_data_archive_dir data_archive_dir = get_data_archive_dir(data_archive_dir) station_dir = define_station_dir( data_archive_dir=data_archive_dir, product=product, data_source=data_source, campaign_name=campaign_name, station_name=station_name, check_exists=True, ) open_file_explorer(station_dir)
[docs] def open_metadata_directory( data_source, campaign_name, station_name=None, # noqa metadata_archive_dir=None, ): """Open the DISDRODB Metadata Archive station(s) metadata directory.""" from disdrodb.configs import get_metadata_archive_dir metadata_archive_dir = get_metadata_archive_dir(metadata_archive_dir) metadata_dir = define_metadata_dir( metadata_archive_dir=metadata_archive_dir, data_source=data_source, campaign_name=campaign_name, check_exists=True, ) open_file_explorer(metadata_dir)
[docs] def open_readers_directory(): """Open the disdrodb software readers directory.""" readers_directory = define_readers_directory() open_file_explorer(readers_directory)
[docs] def open_metadata_archive( metadata_archive_dir=None, ): """Open the DISDRODB Metadata Archive.""" from disdrodb.configs import get_metadata_archive_dir metadata_archive_dir = get_metadata_archive_dir(metadata_archive_dir) open_file_explorer(metadata_archive_dir)
[docs] def open_data_archive( data_archive_dir=None, ): """Open the DISDRODB Data Archive.""" from disdrodb.configs import get_data_archive_dir data_archive_dir = get_data_archive_dir(data_archive_dir) open_file_explorer(data_archive_dir)