import os
import numpy as np
from disdrodb.api.checks import (
check_invalid_fields_policy,
check_product,
check_product_kwargs,
check_valid_fields,
)
from disdrodb.api.path import (
define_data_dir,
define_data_source_dir,
define_metadata_dir,
define_metadata_filepath,
define_station_dir,
)
from disdrodb.configs import get_data_archive_dir, get_metadata_archive_dir
from disdrodb.constants import PRODUCTS_REQUIREMENTS
from disdrodb.utils.dict import extract_product_kwargs
from disdrodb.utils.directories import contains_files, contains_netcdf_or_parquet_files, list_directories, list_files
from disdrodb.utils.yaml import read_yaml
####-------------------------------------------------------------------------
[docs]
def get_required_product(product):
    """Return the product required to process the given product.

    Parameters
    ----------
    product : str
        DISDRODB product name. It is validated with ``check_product``
        before being used as a lookup key.

    Returns
    -------
    object
        The entry associated with ``product`` in ``PRODUCTS_REQUIREMENTS``.
    """
    # Validate first so that an invalid name fails in check_product
    # rather than in the mapping lookup below.
    check_product(product)
    return PRODUCTS_REQUIREMENTS[product]
####-------------------------------------------------------------------------
#### List DISDRODB Metadata directories
[docs]
def list_data_sources(metadata_archive_dir, data_sources=None, invalid_fields_policy="raise"):
    """List the data source names found in the DISDRODB Metadata Archive.

    Parameters
    ----------
    metadata_archive_dir : str
        Root directory of the DISDRODB Metadata Archive. The data sources are
        the directories located in its ``METADATA`` subdirectory.
    data_sources : optional
        Data sources to restrict the listing to. If ``None`` (default),
        every data source found on disk is returned.
    invalid_fields_policy : str, optional
        Forwarded to ``check_valid_fields`` to decide how requested data
        sources that are not available are handled. The default is ``"raise"``.

    Returns
    -------
    list
        Unique data source names.
    """
    metadata_root = os.path.join(metadata_archive_dir, "METADATA")
    found_on_disk = sorted(list_directories(metadata_root, return_paths=False))

    # No selection requested: everything found on disk is returned
    if data_sources is None:
        return np.unique(found_on_disk).tolist()

    # Keep only the requested data sources (validated against the available ones)
    selected = check_valid_fields(
        fields=data_sources,
        available_fields=found_on_disk,
        field_name="data_sources",
        invalid_fields_policy=invalid_fields_policy,
    )
    return np.unique(selected).tolist()
def _list_campaign_names(metadata_archive_dir, data_source):
    """Return the sorted campaign directory names of a single data source."""
    # Campaigns are the directories found in the METADATA data source directory
    source_dir = define_data_source_dir(metadata_archive_dir, product="METADATA", data_source=data_source)
    return sorted(list_directories(source_dir, return_paths=False))
[docs]
def list_campaign_names(
    metadata_archive_dir,
    data_sources=None,
    campaign_names=None,
    invalid_fields_policy="raise",
    return_tuple=False,
):
    """List the campaign names found in the DISDRODB Metadata Archive.

    Parameters
    ----------
    metadata_archive_dir : str
        Root directory of the DISDRODB Metadata Archive.
    data_sources : optional
        Data sources to restrict the search to. ``None`` (default) means all.
    campaign_names : optional
        Campaign names to restrict the search to. ``None`` (default) means all.
    invalid_fields_policy : str, optional
        Forwarded to ``check_valid_fields`` (also through ``list_data_sources``).
        The default is ``"raise"``.
    return_tuple : bool, optional
        If True, return ``(data_source, campaign_name)`` tuples.
        If False (default), return the unique campaign names.

    Returns
    -------
    list
        Either a list of ``(data_source, campaign_name)`` tuples or a list of
        unique campaign names, depending on ``return_tuple``.
    """
    # Data sources to inspect (already filtered by the optional selection)
    valid_data_sources = list_data_sources(
        metadata_archive_dir,
        data_sources=data_sources,
        invalid_fields_policy=invalid_fields_policy,
    )

    # Collect every (data_source, campaign_name) pair
    pairs = []
    for source in valid_data_sources:
        source_campaigns = _list_campaign_names(metadata_archive_dir=metadata_archive_dir, data_source=source)
        for campaign in source_campaigns:
            pairs.append((source, campaign))

    # Restrict to the requested campaigns, if any
    if campaign_names is not None:
        campaign_names = check_valid_fields(
            fields=campaign_names,
            available_fields=[pair[1] for pair in pairs],
            field_name="campaign_names",
            invalid_fields_policy=invalid_fields_policy,
        )
        pairs = [(source, campaign) for source, campaign in pairs if campaign in campaign_names]

    if return_tuple:
        return pairs

    # A campaign name may occur under several data sources: return unique names
    return np.unique([campaign for _, campaign in pairs]).tolist()
def _list_station_names(metadata_archive_dir, data_source, campaign_name):
    """Return the station names of a campaign, derived from its metadata files.

    Parameters
    ----------
    metadata_archive_dir : str
        Root directory of the DISDRODB Metadata Archive.
    data_source : str
        Data source name.
    campaign_name : str
        Campaign name.

    Returns
    -------
    list of str
        Names of the ``*.yml`` files found in the campaign metadata directory,
        sorted by filename and stripped of their file extension.
    """
    metadata_dir = define_metadata_dir(
        metadata_archive_dir=metadata_archive_dir,
        data_source=data_source,
        campaign_name=campaign_name,
    )
    metadata_filenames = sorted(list_files(metadata_dir, glob_pattern="*.yml", return_paths=False))
    # Strip only the trailing extension. Replacing ".yml"/".yaml" anywhere in
    # the filename would corrupt station names that contain such a substring.
    return [os.path.splitext(fname)[0] for fname in metadata_filenames]
[docs]
def list_station_names(
    metadata_archive_dir,
    data_sources=None,
    campaign_names=None,
    station_names=None,
    invalid_fields_policy="raise",
    return_tuple=False,
):
    """List the station names found in the DISDRODB Metadata Archive.

    Parameters
    ----------
    metadata_archive_dir : str
        Root directory of the DISDRODB Metadata Archive.
    data_sources : optional
        Data sources to restrict the search to. ``None`` (default) means all.
    campaign_names : optional
        Campaign names to restrict the search to. ``None`` (default) means all.
    station_names : optional
        Station names to restrict the search to. ``None`` (default) means all.
    invalid_fields_policy : str, optional
        Forwarded to ``check_valid_fields`` (also through ``list_campaign_names``).
        The default is ``"raise"``.
    return_tuple : bool, optional
        If True, return ``(data_source, campaign_name, station_name)`` tuples.
        If False (default), return the unique station names.

    Returns
    -------
    list
        Either a list of ``(data_source, campaign_name, station_name)`` tuples
        or a list of unique station names, depending on ``return_tuple``.
    """
    # (data_source, campaign_name) pairs to inspect
    campaign_pairs = list_campaign_names(
        metadata_archive_dir,
        data_sources=data_sources,
        campaign_names=campaign_names,
        invalid_fields_policy=invalid_fields_policy,
        return_tuple=True,
    )

    # Collect every (data_source, campaign_name, station_name) triplet
    triplets = []
    for source, campaign in campaign_pairs:
        campaign_stations = _list_station_names(
            metadata_archive_dir=metadata_archive_dir,
            data_source=source,
            campaign_name=campaign,
        )
        for station in campaign_stations:
            triplets.append((source, campaign, station))

    # Restrict to the requested stations, if any
    if station_names is not None:
        station_names = check_valid_fields(
            fields=station_names,
            available_fields=[station for _, _, station in triplets],
            field_name="station_names",
            invalid_fields_policy=invalid_fields_policy,
        )
        triplets = [(source, campaign, station) for source, campaign, station in triplets if station in station_names]

    if return_tuple:
        return triplets

    # A station name may occur in several campaigns: return unique names
    return np.unique([station for _, _, station in triplets]).tolist()
####-------------------------------------------------------------------------
#### Filtering utilities for available_stations
def _finalize_output(list_info, return_tuple, metadata_archive_dir, filter_kwargs):
# Filter stations if metadata filtering values are specified
if len(filter_kwargs) != 0:
list_info = select_stations_matching_metadata_values(
metadata_archive_dir=metadata_archive_dir,
list_info=list_info,
filter_kwargs=filter_kwargs,
)
# - Return the (data_source, campaign_name, station_name) tuple
if return_tuple:
return list_info
# - Return list with the name of the available stations
return [info[2] for info in list_info]
def _raise_an_error_if_no_stations(list_info, raise_error_if_empty, msg):
if len(list_info) == 0 and raise_error_if_empty:
raise ValueError(msg)
[docs]
def is_disdrodb_data_url_specified(metadata_filepath):
    """Check whether ``disdrodb_data_url`` is specified in a metadata file.

    Parameters
    ----------
    metadata_filepath : str
        Path of the metadata YAML file to read.

    Returns
    -------
    bool
        True if the ``disdrodb_data_url`` value is a string longer than one
        character. False otherwise (missing key, non-string value, or a
        string of at most one character).
    """
    metadata = read_yaml(metadata_filepath)
    url = metadata.get("disdrodb_data_url", "")
    # Non-string values (e.g. None) count as "not specified"
    if not isinstance(url, str):
        return False
    return len(url) > 1
[docs]
def select_stations_with_disdrodb_data_url(metadata_archive_dir, list_info):
    """Keep only the stations whose metadata file specifies ``disdrodb_data_url``.

    Parameters
    ----------
    metadata_archive_dir : str
        Root directory of the DISDRODB Metadata Archive.
    list_info : list of tuple
        ``(data_source, campaign_name, station_name)`` tuples.

    Returns
    -------
    list of tuple
        The tuples of ``list_info`` (in the same order) for which
        ``is_disdrodb_data_url_specified`` returns True.
    """

    def _has_data_url(data_source, campaign_name, station_name):
        # Locate the station metadata file and inspect its disdrodb_data_url
        metadata_filepath = define_metadata_filepath(
            metadata_archive_dir=metadata_archive_dir,
            data_source=data_source,
            campaign_name=campaign_name,
            station_name=station_name,
        )
        return is_disdrodb_data_url_specified(metadata_filepath)

    return [
        (data_source, campaign_name, station_name)
        for data_source, campaign_name, station_name in list_info
        if _has_data_url(data_source, campaign_name, station_name)
    ]
def _matches(metadata_value, expected_value):
"""Return True if metadata_value matches expected_value."""
# Case 1: both lists → check any intersection
if isinstance(metadata_value, list) and isinstance(expected_value, list):
return any(v in metadata_value for v in expected_value)
# Case 2: metadata is list → check membership
if isinstance(metadata_value, list):
return expected_value in metadata_value
# Case 3: expected is list → check membership
if isinstance(expected_value, list):
return metadata_value in expected_value
# Case 4: both scalars → direct equality
return metadata_value == expected_value
[docs]
def select_stations_with_product_directory(data_archive_dir, product, list_info):
    """Keep only the stations whose product station directory exists on disk.

    Parameters
    ----------
    data_archive_dir : str
        Root directory of the local DISDRODB Data Archive.
    product : str
        DISDRODB product name.
    list_info : list of tuple
        ``(data_source, campaign_name, station_name)`` tuples.

    Returns
    -------
    list of tuple
        The tuples of ``list_info`` (in the same order) for which the directory
        returned by ``define_station_dir`` is an existing directory.
    """

    def _station_dir_exists(data_source, campaign_name, station_name):
        # check_exists=False: existence is tested here with os.path.isdir
        station_dir = define_station_dir(
            data_archive_dir=data_archive_dir,
            product=product,
            data_source=data_source,
            campaign_name=campaign_name,
            station_name=station_name,
            check_exists=False,
        )
        return os.path.isdir(station_dir)

    return [
        (data_source, campaign_name, station_name)
        for data_source, campaign_name, station_name in list_info
        if _station_dir_exists(data_source, campaign_name, station_name)
    ]
[docs]
def select_stations_with_product_data(data_archive_dir, product, list_info, **product_kwargs):
    """Keep only the stations having data for the given product.

    Parameters
    ----------
    data_archive_dir : str
        Root directory of the local DISDRODB Data Archive.
    product : str
        DISDRODB product name. For ``"RAW"`` the data directory is checked with
        ``contains_files``; for any other product it is checked with
        ``contains_netcdf_or_parquet_files``.
    list_info : list of tuple
        ``(data_source, campaign_name, station_name)`` tuples.
    **product_kwargs
        Additional arguments forwarded to ``define_data_dir``.

    Returns
    -------
    list of tuple
        The tuples of ``list_info`` (in the same order) whose data directory
        passes the check.
    """
    # Select how the presence of data is verified
    if product == "RAW":
        has_data = contains_files
    else:
        has_data = contains_netcdf_or_parquet_files

    def _data_dir(data_source, campaign_name, station_name):
        # check_exists=False: a missing directory must not raise here
        return define_data_dir(
            data_archive_dir=data_archive_dir,
            product=product,
            data_source=data_source,
            campaign_name=campaign_name,
            station_name=station_name,
            check_exists=False,
            **product_kwargs,
        )

    return [
        (data_source, campaign_name, station_name)
        for data_source, campaign_name, station_name in list_info
        if has_data(_data_dir(data_source, campaign_name, station_name))
    ]
####-------------------------------------------------------------------------
#### DISDRODB Search Routines
[docs]
def available_stations(
    product=None,
    data_sources=None,
    campaign_names=None,
    station_names=None,
    return_tuple=True,
    available_data=False,
    raise_error_if_empty=False,
    invalid_fields_policy="raise",
    data_archive_dir=None,
    metadata_archive_dir=None,
    **filter_kwargs,
):
    """Return stations information for which metadata or product data are available on disk.

    This function queries the DISDRODB Metadata Archive and, optionally, the
    local DISDRODB Data Archive to identify stations that satisfy the specified
    filters.

    If the DISDRODB product is not specified, it lists the stations present
    in the DISDRODB Metadata Archive given the specified filtering criteria.
    If the DISDRODB product is specified, it lists the stations present
    in the local DISDRODB Data Archive given the specified filtering criteria.

    Parameters
    ----------
    product : str or None, optional
        Name of the product to filter on (e.g., "RAW", "L0A", "L1").
        If the DISDRODB product is not specified (default),
        it lists the stations present in the DISDRODB Metadata Archive given the specified filtering criteria.
        If the DISDRODB product is specified,
        it lists the stations present in the local DISDRODB Data Archive given the specified filtering criteria.
        The default is None.
    data_sources : str or list of str, optional
        One or more data source identifiers to filter stations by.
        The name(s) must be UPPER CASE.
        If None, no filtering on data source is applied. The default is ``None``.
    campaign_names : str or list of str, optional
        One or more campaign names to filter stations by.
        The name(s) must be UPPER CASE.
        If None, no filtering on campaign is applied. The default is ``None``.
    station_names : str or list of str, optional
        One or more station names to include.
        If None, all stations matching other filters are considered. The default is ``None``.
    return_tuple : bool, optional
        If True, return a list of tuples ``(data_source, campaign_name, station_name)``.
        If False, return only a list of station names.
        The default is True.
    available_data : bool, optional
        If ``product`` is not specified:

        - if ``available_data=False``, return stations present in the DISDRODB Metadata Archive.
        - if ``available_data=True``, return stations with data available on the online DISDRODB Decentralized Data Archive (i.e., stations with the disdrodb_data_url in the metadata).

        If ``product`` is specified:

        - if ``available_data=False``, return stations where the product directory exists in the local DISDRODB Data Archive.
        - if ``available_data=True``, return stations where product data exists in the local DISDRODB Data Archive.

        The default is False.
    raise_error_if_empty : bool, optional
        If True and no stations satisfy the criteria, raise a ``ValueError``.
        If False, return an empty list. The default is False.
    invalid_fields_policy : str, optional
        How to handle invalid filter values for ``data_sources``, ``campaign_names``,
        or ``station_names`` that are not present in the metadata archive:

        - 'raise' : raise a ``ValueError`` (default)
        - 'warn'  : emit a warning, then ignore invalid entries
        - 'ignore': silently drop invalid entries

    data_archive_dir : str or Path-like, optional
        Path to the root of the local DISDRODB Data Archive.
        Required only if ``product`` is specified.
        If None, the default data archive base directory is used. Default is None.
    metadata_archive_dir : str or Path-like, optional
        Path to the root of the DISDRODB Metadata Archive.
        If None, the default metadata base directory is used. Default is None.
    **filter_kwargs : dict, optional
        Additional keyword arguments.
        When ``product`` is specified, the product arguments are extracted from these
        keyword arguments with ``extract_product_kwargs``.
        For L1, L2E and L2M products, ``temporal_resolution`` is required.
        For L2M product, ``model_name`` is required.
        If not empty, the keyword arguments are also used as metadata filtering values by
        ``select_stations_matching_metadata_values``
        (NOTE(review): presumably only the non-product arguments remain at that point;
        confirm against ``extract_product_kwargs``).

    Returns
    -------
    list
        If ``return_tuple=True``, return a list of tuples ``(data_source, campaign_name, station_name)``.
        If ``return_tuple=False``, return a list of station names.

    Raises
    ------
    ValueError
        If ``raise_error_if_empty=True`` and no station satisfies the criteria.

    Examples
    --------
    >>> # List all stations present in the DISDRODB Metadata Archive
    >>> stations = available_stations()
    >>> # List all stations present in the online DISDRODB Data Archive
    >>> stations = available_stations(available_data=True)
    >>> # List stations with raw data available in the local DISDRODB Data Archive
    >>> raw_stations = available_stations(product="RAW", available_data=True)
    >>> # List stations of specific data sources
    >>> stations = available_stations(data_sources=["NASA", "EPFL"])
    """  # noqa: E501
    # Retrieve the DISDRODB Metadata Archive directory and validate the inputs
    metadata_archive_dir = get_metadata_archive_dir(metadata_archive_dir)
    product = check_product(product) if product is not None else None
    invalid_fields_policy = check_invalid_fields_policy(invalid_fields_policy)

    # Extract product_kwargs from filter_kwargs
    product_kwargs = extract_product_kwargs(filter_kwargs, product=product) if product is not None else {}

    # Retrieve the candidate stations from the Metadata Archive
    list_info = list_station_names(
        metadata_archive_dir=metadata_archive_dir,
        data_sources=data_sources,
        campaign_names=campaign_names,
        station_names=station_names,
        invalid_fields_policy=invalid_fields_policy,
        return_tuple=True,
    )

    # Narrow down the candidates according to (product, available_data)
    # and define the error message used if no station is left.
    if product is None:
        if available_data:
            # Stations in the Metadata Archive with the disdrodb_data_url specified
            list_info = select_stations_with_disdrodb_data_url(metadata_archive_dir, list_info)
            msg = "No station has the disdrodb_data_url specified in the metadata."
        else:
            # Stations in the Metadata Archive
            msg = "No station available in the DISDRODB Metadata Archive."
    else:
        # The local Data Archive is only needed when a product is specified
        # (product has already been validated above)
        data_archive_dir = get_data_archive_dir(data_archive_dir)
        if available_data:
            # Stations with product data
            product_kwargs = check_product_kwargs(product, product_kwargs)
            list_info = select_stations_with_product_data(
                data_archive_dir=data_archive_dir,
                product=product,
                list_info=list_info,
                **product_kwargs,
            )
            product_kwargs = product_kwargs if product_kwargs else ""  # if empty, set as ""
            msg = f"No station has {product} {product_kwargs} data available in the local DISDRODB Data Archive."
        else:
            # Stations with the product directory (the presence of data is not checked)
            list_info = select_stations_with_product_directory(
                data_archive_dir=data_archive_dir,
                product=product,
                list_info=list_info,
            )
            msg = f"No station product {product} directory available in the local DISDRODB Data Archive."

    _raise_an_error_if_no_stations(list_info, raise_error_if_empty=raise_error_if_empty, msg=msg)

    # Apply the optional metadata filters and shape the output
    output = _finalize_output(
        list_info,
        return_tuple=return_tuple,
        metadata_archive_dir=metadata_archive_dir,
        filter_kwargs=filter_kwargs,
    )
    # The metadata filters may discard every remaining station:
    # honour raise_error_if_empty also in that case.
    _raise_an_error_if_no_stations(
        output,
        raise_error_if_empty=raise_error_if_empty,
        msg="No station satisfies the specified metadata filtering criteria.",
    )
    return output
[docs]
def available_data_sources(
    product=None,
    campaign_names=None,
    station_names=None,
    available_data=False,
    raise_error_if_empty=False,
    invalid_fields_policy="raise",
    data_archive_dir=None,
    metadata_archive_dir=None,
    **kwargs,
):
    """Return the data sources for which stations are available.

    All arguments are forwarded to ``available_stations`` (with no restriction
    on the data sources); refer to that function for their meaning.

    Returns
    -------
    list
        Unique data source names of the stations returned by ``available_stations``.
    """
    stations_info = available_stations(
        product=product,
        data_sources=None,
        campaign_names=campaign_names,
        station_names=station_names,
        return_tuple=True,
        available_data=available_data,
        raise_error_if_empty=raise_error_if_empty,
        invalid_fields_policy=invalid_fields_policy,
        data_archive_dir=data_archive_dir,
        metadata_archive_dir=metadata_archive_dir,
        **kwargs,
    )
    # The data source is the first element of each station tuple
    return np.unique([entry[0] for entry in stations_info]).tolist()
[docs]
def available_campaigns(
    product=None,
    data_sources=None,
    station_names=None,
    available_data=False,
    raise_error_if_empty=False,
    invalid_fields_policy="raise",
    data_archive_dir=None,
    metadata_archive_dir=None,
    **kwargs,
):
    """Return the campaign names for which stations are available.

    All arguments are forwarded to ``available_stations`` (with no restriction
    on the campaign names); refer to that function for their meaning.

    Returns
    -------
    list
        Unique campaign names of the stations returned by ``available_stations``.
    """
    stations_info = available_stations(
        product=product,
        data_sources=data_sources,
        campaign_names=None,
        station_names=station_names,
        return_tuple=True,
        available_data=available_data,
        raise_error_if_empty=raise_error_if_empty,
        invalid_fields_policy=invalid_fields_policy,
        data_archive_dir=data_archive_dir,
        metadata_archive_dir=metadata_archive_dir,
        **kwargs,
    )
    # The campaign name is the second element of each station tuple
    return np.unique([entry[1] for entry in stations_info]).tolist()
####-------------------------------------------------------------------------