# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Define paths within the DISDRODB infrastructure."""
import os
from disdrodb.configs import get_data_archive_dir, get_metadata_archive_dir
from disdrodb.constants import ARCHIVE_VERSION
from disdrodb.utils.directories import check_directory_exists
from disdrodb.utils.time import (
ensure_sample_interval_in_seconds,
get_file_start_end_time,
seconds_to_temporal_resolution,
)
####--------------------------------------------------------------------------.
#### DISDRODB Metadata and Data Archive directories and file paths
[docs]
def define_disdrodb_path(
    archive_dir,
    product,
    data_source="",
    campaign_name="",
    check_exists=True,
):
    """Build a directory path inside the DISDRODB Metadata or Data Archive.

    The first path component below ``archive_dir`` depends on ``product``
    (compared case-insensitively):

    - ``"METADATA"`` -> ``<archive_dir>/METADATA``
    - ``"RAW"`` -> ``<archive_dir>/RAW``
    - anything else -> ``<archive_dir>/<ARCHIVE_VERSION>``

    ``data_source`` and ``campaign_name`` are then appended when provided, so
    the function returns the product, data source or campaign directory
    depending on how many of them are specified.

    Parameters
    ----------
    archive_dir : str
        The DISDRODB archive directory.
    product : str
        The DISDRODB product. See ``disdrodb.available_products()``.
        If "METADATA" is specified, the path points into the DISDRODB Metadata Archive.
    data_source : str, optional
        The data source. Must be specified if ``campaign_name`` is specified.
    campaign_name : str, optional
        The campaign name.
    check_exists : bool, optional
        Whether to check if the directory exists. The default value is ``True``.
        The check is delegated to ``check_directory_exists``.

    Returns
    -------
    dir_path : str
        Normalized directory path.

    Raises
    ------
    ValueError
        If ``campaign_name`` is given without ``data_source``.
    """
    # A campaign is always nested inside a data source directory.
    if len(campaign_name) > 0 and len(data_source) == 0:
        raise ValueError("If campaign_name is specified, data_source must be specified.")

    # METADATA and RAW have dedicated root folders; every other product
    # is stored under the archive version folder.
    product_key = product.upper()
    root_name = product_key if product_key in ("METADATA", "RAW") else ARCHIVE_VERSION
    dir_path = os.path.join(archive_dir, root_name, data_source, campaign_name)

    # The existence check runs on the un-normalized path.
    if check_exists:
        check_directory_exists(dir_path)
    return os.path.normpath(dir_path)
[docs]
def define_data_source_dir(
    archive_dir,
    product,
    data_source,
    check_exists=False,
):
    """Return the data source directory in the DISDRODB infrastructure.

    Thin wrapper around ``define_disdrodb_path`` that leaves the campaign name
    unspecified. If ``product="METADATA"``, the path points into the DISDRODB
    Metadata Archive, otherwise into the DISDRODB Data Archive.

    Parameters
    ----------
    archive_dir : str
        The DISDRODB archive directory. It is forwarded as-is.
    product : str
        The DISDRODB product. See ``disdrodb.available_products()``.
        If "METADATA" is specified, it returns the path in the DISDRODB Metadata Archive.
    data_source : str
        The data source.
    check_exists : bool, optional
        Whether to check if the directory exists. The default value is ``False``.

    Returns
    -------
    data_source_dir : str
        Data source directory path.
    """
    return str(
        define_disdrodb_path(
            archive_dir=archive_dir,
            product=product,
            data_source=data_source,
            check_exists=check_exists,
        ),
    )
[docs]
def define_campaign_dir(
    archive_dir,
    product,
    data_source,
    campaign_name,
    check_exists=False,
):
    """Return the campaign directory in the DISDRODB infrastructure.

    Thin wrapper around ``define_disdrodb_path`` with both ``data_source`` and
    ``campaign_name`` specified. If ``product="METADATA"``, the path points
    into the DISDRODB Metadata Archive, otherwise into the DISDRODB Data Archive.

    Parameters
    ----------
    archive_dir : str
        The DISDRODB archive directory. It is forwarded as-is.
    product : str
        The DISDRODB product. See ``disdrodb.available_products()``.
        If "METADATA" is specified, it returns the path in the DISDRODB Metadata Archive.
    data_source : str
        The data source.
    campaign_name : str
        The campaign name.
    check_exists : bool, optional
        Whether to check if the directory exists. The default value is ``False``.

    Returns
    -------
    campaign_dir : str
        Campaign directory path.
    """
    return str(
        define_disdrodb_path(
            archive_dir=archive_dir,
            product=product,
            data_source=data_source,
            campaign_name=campaign_name,
            check_exists=check_exists,
        ),
    )
[docs]
def define_issue_dir(
    data_source,
    campaign_name,
    metadata_archive_dir=None,
    check_exists=False,
):
    """Return the campaign ``issue`` directory in the DISDRODB Metadata Archive.

    Parameters
    ----------
    data_source : str
        The data source.
    campaign_name : str
        The campaign name.
    metadata_archive_dir : str, optional
        The DISDRODB Metadata Archive directory. The value is passed through
        ``get_metadata_archive_dir`` before use.
        NOTE(review): presumably ``None`` falls back to the active DISDRODB
        configuration - confirm against ``disdrodb.configs``.
    check_exists : bool, optional
        Whether to check if the campaign directory and the issue directory exist.
        The default value is ``False``.

    Returns
    -------
    issue_dir : str
        Path of the ``issue`` directory of the campaign.
    """
    resolved_archive_dir = get_metadata_archive_dir(metadata_archive_dir)
    metadata_campaign_dir = define_campaign_dir(
        archive_dir=resolved_archive_dir,
        product="METADATA",
        data_source=data_source,
        campaign_name=campaign_name,
        check_exists=check_exists,
    )
    # Issue files live in the "issue" subfolder of the metadata campaign directory.
    issue_dir = os.path.join(metadata_campaign_dir, "issue")
    if check_exists:
        check_directory_exists(issue_dir)
    return str(issue_dir)
[docs]
def define_issue_filepath(
    data_source,
    campaign_name,
    station_name,
    metadata_archive_dir=None,
    check_exists=False,
):
    """Return the station issue YAML filepath in the DISDRODB Metadata Archive.

    Parameters
    ----------
    data_source : str
        The data source.
    campaign_name : str
        The campaign name.
    station_name : str
        The station name. The file is named ``<station_name>.yml``.
    metadata_archive_dir : str, optional
        The DISDRODB Metadata Archive directory. The value is passed through
        ``get_metadata_archive_dir`` before use.
        NOTE(review): presumably ``None`` falls back to the active DISDRODB
        configuration - confirm against ``disdrodb.configs``.
    check_exists : bool, optional
        Whether to check if the issue file exists. The default value is ``False``.
        Only the file itself is checked, not the parent directories.

    Returns
    -------
    issue_filepath : str
        Path of the station issue file.

    Raises
    ------
    ValueError
        If ``check_exists=True`` and the issue file does not exist.
    """
    resolved_archive_dir = get_metadata_archive_dir(metadata_archive_dir)
    # Directory existence is deliberately not checked here: only the file matters.
    parent_dir = define_issue_dir(
        metadata_archive_dir=resolved_archive_dir,
        data_source=data_source,
        campaign_name=campaign_name,
        check_exists=False,
    )
    issue_filepath = os.path.join(parent_dir, f"{station_name}.yml")
    if check_exists:
        if not os.path.exists(issue_filepath):
            raise ValueError(f"The issue file for {station_name} at {issue_filepath} does not exists.")
    return str(issue_filepath)
####--------------------------------------------------------------------------.
#### DISDRODB software configuration directory
[docs]
def define_config_dir(product):
    """Define the config directory path of a given DISDRODB product.

    Parameters
    ----------
    product : str
        The DISDRODB product (case-insensitive). Only "RAW", "L0A" and "L0B"
        are supported; they all map to the ``l0/configs`` package directory.

    Returns
    -------
    config_dir : str
        Path ``<package_dir>/l0/configs``.

    Raises
    ------
    NotImplementedError
        If the product is not one of the supported ones.
    """
    # Imported at call time rather than at module import time.
    from disdrodb import package_dir

    if product.upper() not in ["RAW", "L0A", "L0B"]:
        raise NotImplementedError(f"Product {product} not implemented.")
    return os.path.join(package_dir, "l0", "configs")
####--------------------------------------------------------------------------.
#### Directory/Filepaths L0A and L0B products
[docs]
def define_partitioning_tree(time, folder_partitioning):
    """Define the time directory tree given a timestep.

    Parameters
    ----------
    time : datetime.datetime
        Timestep.
    folder_partitioning : str or None
        Define the subdirectory structure where saving files.
        Allowed values are:

        - None or "": Files are saved directly in data_dir.
        - "year": Files are saved under a subdirectory for the year.
        - "year/month": Files are saved under subdirectories for year and month.
        - "year/month/day": Files are saved under subdirectories for year, month and day
        - "year/month_name": Files are stored under subdirectories by year and month name
        - "year/quarter": Files are saved under subdirectories for year and quarter.

    Returns
    -------
    str
        A time partitioned directory tree. An empty string when no
        partitioning is requested.

    Raises
    ------
    NotImplementedError
        If ``folder_partitioning`` is not one of the allowed values.
    """
    # No partitioning: None is documented as equivalent to "".
    if folder_partitioning is None or folder_partitioning == "":
        return ""
    if folder_partitioning == "year":
        return str(time.year)
    if folder_partitioning == "year/month":
        return os.path.join(str(time.year), str(time.month).zfill(2))
    if folder_partitioning == "year/month/day":
        return os.path.join(
            str(time.year),
            str(time.month).zfill(2),
            str(time.day).zfill(2),
        )
    if folder_partitioning == "year/month_name":
        # "%B" yields the full month name of the current locale.
        return os.path.join(str(time.year), time.strftime("%B"))
    if folder_partitioning == "year/quarter":
        # Months 1-3 => Q1, 4-6 => Q2, 7-9 => Q3, 10-12 => Q4.
        quarter = (time.month - 1) // 3 + 1
        return os.path.join(str(time.year), f"Q{quarter}")
    raise NotImplementedError(f"Unrecognized '{folder_partitioning}' folder partitioning scheme.")
[docs]
def define_file_folder_path(obj, dir_path, folder_partitioning):
    """Define the folder path where saving a file based on the object's starting time.

    Parameters
    ----------
    obj : xarray.Dataset or pandas.DataFrame
        The object containing time information. Its start time is retrieved
        with ``get_file_start_end_time``.
    dir_path : str
        Directory within the DISDRODB Data Archive where DISDRODB product files are to be saved.
        It can be a product directory or a logs directory.
    folder_partitioning : str
        Define the subdirectory structure where saving files.
        The value is validated by ``check_folder_partitioning``.
        The schemes handled by ``define_partitioning_tree`` are:

        - "": Files are saved directly in ``dir_path``.
        - "year": Files are saved under a subdirectory for the year.
        - "year/month": Files are saved under subdirectories for year and month.
        - "year/month/day": Files are saved under subdirectories for year, month and day
        - "year/month_name": Files are stored under subdirectories by year and month name
        - "year/quarter": Files are saved under subdirectories for year and quarter.

    Returns
    -------
    str
        A complete, normalized directory path where the file should be saved.
    """
    from disdrodb.api.checks import check_folder_partitioning

    # Validation only: the value returned by the checker is not used.
    check_folder_partitioning(folder_partitioning)

    # Only the start time drives the partitioning; the end time is ignored.
    start_time, _end_time = get_file_start_end_time(obj)
    time_subtree = define_partitioning_tree(time=start_time, folder_partitioning=folder_partitioning)

    full_path = os.path.join(dir_path, time_subtree)
    return os.path.normpath(full_path)
[docs]
def define_product_dir_tree(
    product,
    **product_kwargs,
):
    """Return the product-specific directory tree.

    - RAW, L0A, L0B and L0C have no product subtree (empty string).
    - L1 and L2E are organized by ``<temporal_resolution>``.
    - Any other validated product (i.e. L2M) is organized by
      ``<model_name>/<temporal_resolution>``.

    Parameters
    ----------
    product : str
        The DISDRODB product. See ``disdrodb.available_products()``.
    temporal_resolution : str, optional
        The temporal resolution of the product.
        It must be specified only for product L1, L2E and L2M !
    model_name : str
        The custom model name of the fitted statistical distribution.
        It must be specified only for product L2M !

    Returns
    -------
    product_dir_tree : str
        Relative directory tree of the product (possibly empty).
    """
    from disdrodb.api.checks import check_product, check_product_kwargs, check_temporal_resolution

    product = check_product(product)
    product_kwargs = check_product_kwargs(product, product_kwargs)

    # Products without any product-specific subdirectory.
    if product.upper() in ["RAW", "L0A", "L0B", "L0C"]:
        return ""

    # All remaining products are organized by temporal resolution.
    temporal_resolution = product_kwargs.get("temporal_resolution")
    check_temporal_resolution(temporal_resolution)
    if product in ["L1", "L2E"]:
        return temporal_resolution

    # Remaining case: L2M, which adds a leading model name level.
    return os.path.join(product_kwargs.get("model_name"), temporal_resolution)
[docs]
def define_logs_dir(
    product,
    data_source,
    campaign_name,
    station_name,
    data_archive_dir=None,
    check_exists=False,
    **product_kwargs,
):
    """Return the station log directory in the DISDRODB infrastructure.

    The directory has the form
    ``<campaign_dir>/logs/files/<product>/<product_dir_tree>/<station_name>``.

    Parameters
    ----------
    product : str
        The DISDRODB product. See ``disdrodb.available_products()``.
    data_source : str
        The data source.
    campaign_name : str
        The campaign name.
    station_name : str
        The station name.
    data_archive_dir : str, optional
        The base directory of DISDRODB, expected in the format ``<...>/DISDRODB``.
        If not specified, the path specified in the DISDRODB active configuration will be used.
    check_exists : bool, optional
        Whether to check if the campaign directory and the logs directory exist.
        The default value is ``False``.
    **product_kwargs
        Product options (e.g. ``temporal_resolution``, ``model_name``)
        forwarded to ``define_product_dir_tree``.

    Returns
    -------
    logs_dir : str
        Station logs directory path.
    """
    # Resolve the archive directory as define_station_dir does. Without this,
    # the default None would reach os.path.join and raise a TypeError.
    data_archive_dir = get_data_archive_dir(data_archive_dir)
    campaign_dir = define_campaign_dir(
        archive_dir=data_archive_dir,
        product=product,
        data_source=data_source,
        campaign_name=campaign_name,
        check_exists=check_exists,
    )
    product_dir_tree = define_product_dir_tree(
        product=product,
        **product_kwargs,
    )
    # normpath collapses the empty component left by products without a subtree.
    logs_dir = os.path.normpath(os.path.join(campaign_dir, "logs", "files", product, product_dir_tree, station_name))
    if check_exists:
        check_directory_exists(logs_dir)
    return str(logs_dir)
[docs]
def define_station_dir(
    product,
    data_source,
    campaign_name,
    station_name,
    data_archive_dir=None,
    check_exists=False,
):
    """Return the station product directory in the DISDRODB infrastructure.

    For the RAW product the station directory is ``<campaign_dir>/data/<station_name>``;
    for any other product it is ``<campaign_dir>/<product>/<station_name>``.

    Parameters
    ----------
    product : str
        The DISDRODB product. See ``disdrodb.available_products()``.
    data_source : str
        The data source.
    campaign_name : str
        The campaign name.
    station_name : str
        The station name.
    data_archive_dir : str, optional
        The base directory of DISDRODB, expected in the format ``<...>/DISDRODB``.
        If not specified, the path specified in the DISDRODB active configuration will be used.
    check_exists : bool, optional
        Whether to check if the campaign directory and the station directory exist.
        The default value is ``False``.

    Returns
    -------
    station_dir : str
        Station data directory path.
    """
    resolved_archive_dir = get_data_archive_dir(data_archive_dir)
    campaign_dir = define_disdrodb_path(
        archive_dir=resolved_archive_dir,
        product=product,
        data_source=data_source,
        campaign_name=campaign_name,
        check_exists=check_exists,
    )
    # RAW stations are stored in a generic "data" folder instead of a product folder.
    product_folder = "data" if product.upper() == "RAW" else product
    station_dir = os.path.join(campaign_dir, product_folder, station_name)
    if check_exists:
        check_directory_exists(station_dir)
    return str(station_dir)
[docs]
def define_data_dir(
    product,
    data_source,
    campaign_name,
    station_name,
    data_archive_dir=None,
    check_exists=False,
    **product_kwargs,
):
    """Return the station product data directory in the DISDRODB infrastructure.

    The data directory is the station directory extended with the
    product-specific directory tree returned by ``define_product_dir_tree``.

    Parameters
    ----------
    product : str
        The DISDRODB product. See ``disdrodb.available_products()``.
    data_source : str
        The data source.
    campaign_name : str
        The campaign name.
    station_name : str
        The station name.
    data_archive_dir : str, optional
        The base directory of DISDRODB, expected in the format ``<...>/DISDRODB``.
        If not specified, the path specified in the DISDRODB active configuration will be used.
    check_exists : bool, optional
        Whether to check if the directory exists. The default value is ``False``.
    temporal_resolution : str, optional
        The temporal resolution of the product.
        It must be specified only for product L1, L2E and L2M !
    model_name : str
        The name of the fitted statistical distribution for the DSD.
        It must be specified only for product L2M !

    Returns
    -------
    data_dir : str
        Station data directory path.
    """
    from disdrodb.api.checks import check_product, check_product_kwargs

    product = check_product(product)
    product_kwargs = check_product_kwargs(product, product_kwargs)

    # <campaign_dir>/<product>/<station_name>
    base_dir = define_station_dir(
        data_archive_dir=data_archive_dir,
        product=product,
        data_source=data_source,
        campaign_name=campaign_name,
        station_name=station_name,
        check_exists=check_exists,
    )

    # Extra levels below the station directory (non-empty for L1, L2E and L2M).
    product_subtree = define_product_dir_tree(product, **product_kwargs)

    # normpath drops the trailing separator produced by an empty subtree.
    data_dir = os.path.normpath(os.path.join(base_dir, product_subtree))
    if check_exists:
        check_directory_exists(data_dir)
    return str(data_dir)
####--------------------------------------------------------------------------.
#### Temporal resolution of DISDRODB products
[docs]
def define_temporal_resolution(seconds, rolling):
    """Define the DISDRODB product temporal resolution.

    Parameters
    ----------
    seconds : int
        Measurement interval in seconds, converted with
        ``seconds_to_temporal_resolution``.
    rolling : bool
        If ``True``, the resolution string is prefixed with ``ROLL``.

    Returns
    -------
    temporal_resolution : str
        The temporal resolution string.
    """
    base_resolution = seconds_to_temporal_resolution(seconds)
    return f"ROLL{base_resolution}" if rolling else base_resolution
####--------------------------------------------------------------------------.
#### Filenames for DISDRODB products
[docs]
def define_filename(
    product: str,
    campaign_name: str,
    station_name: str,
    # Filename options
    start_time=None,
    end_time=None,
    add_version=True,
    add_time_period=True,
    add_extension=True,
    # Prefix
    prefix="",
    suffix="",
    # Product options
    **product_kwargs,
) -> str:
    """Define DISDRODB products filename.

    The filename is assembled from dot-separated components in this order::

        [<prefix>.]<product_name>.<campaign_name>.<station_name>
        [.s<start_time>.e<end_time>][.<ARCHIVE_VERSION>][.<extension>][.<suffix>]

    where ``<product_name>`` is ``<product>`` for products without temporal
    resolution, ``<product>.<temporal_resolution>`` for L1 and L2E and
    ``L2M_<model_name>.<temporal_resolution>`` for L2M.

    Parameters
    ----------
    product : str
        The DISDRODB product. See ``disdrodb.available_products()``.
    campaign_name : str
        Name of the campaign.
    station_name : str
        Name of the station.
    start_time : datetime.datetime, optional
        Start time.
        Required if add_time_period = True.
    end_time : datetime.datetime, optional
        End time.
        Required if add_time_period = True.
    add_version : bool, optional
        Whether to append ``ARCHIVE_VERSION``. The default value is ``True``.
    add_time_period : bool, optional
        Whether to append the ``s<start>.e<end>`` time period formatted
        as ``%Y%m%d%H%M%S``. The default value is ``True``.
    add_extension : bool, optional
        Whether to append the file extension: ``parquet`` for L0A, ``nc`` otherwise.
        The default value is ``True``.
    prefix : str, optional
        String prepended to the filename. The default is no prefix.
    suffix : str, optional
        String appended at the very end, after the extension. The default is no suffix.
    temporal_resolution : str, optional
        The temporal resolution of the product.
        It must be specified only for product L1, L2E and L2M !
    model_name : str
        The model name of the fitted statistical distribution for the DSD.
        It must be specified only for product L2M !

    Returns
    -------
    str
        The product file name.

    Raises
    ------
    ValueError
        If ``add_time_period=True`` and ``start_time`` or ``end_time`` is missing.
    """
    from disdrodb.api.checks import check_product, check_product_kwargs, check_temporal_resolution

    product = check_product(product)
    product_kwargs = check_product_kwargs(product, product_kwargs)

    if add_time_period and (start_time is None or end_time is None):
        raise ValueError("If add_time_period=True, specify start_time and end_time.")

    # Product name. L0C is not handled here: its sample interval is known only per-file.
    if product in ["L1", "L2E", "L2M"]:
        temporal_resolution = product_kwargs.get("temporal_resolution")
        check_temporal_resolution(temporal_resolution)
        if product == "L2M":
            model_name = product_kwargs.get("model_name")
            product_name = f"L2M_{model_name}.{temporal_resolution}"
        else:
            product_name = f"{product}.{temporal_resolution}"
    else:
        product_name = f"{product}"

    # Base filename, optionally preceded by the prefix.
    filename = f"{product_name}.{campaign_name}.{station_name}"
    if prefix != "":
        filename = f"{prefix}.{filename}"

    # Time period covered by the file.
    if add_time_period:
        time_format = "%Y%m%d%H%M%S"
        start_str = start_time.strftime(time_format)
        end_str = end_time.strftime(time_format)
        filename += f".s{start_str}.e{end_str}"

    # Archive version.
    if add_version:
        filename += f".{ARCHIVE_VERSION}"

    # File extension: L0A is stored as parquet, the other products as netCDF.
    if add_extension:
        extension = "parquet" if product == "L0A" else "nc"
        filename += f".{extension}"

    # The suffix goes last, i.e. after the extension.
    if suffix != "":
        filename += f".{suffix}"
    return filename
[docs]
def define_l0a_filename(df, campaign_name: str, station_name: str) -> str:
    """Define L0A file name.

    The time period is derived from ``df`` with ``get_file_start_end_time``;
    version, time period and extension are all included.

    Parameters
    ----------
    df : pandas.DataFrame
        L0A DataFrame.
    campaign_name : str
        Name of the campaign.
    station_name : str
        Name of the station.

    Returns
    -------
    str
        L0A file name.
    """
    first_time, last_time = get_file_start_end_time(df)
    return define_filename(
        product="L0A",
        campaign_name=campaign_name,
        station_name=station_name,
        start_time=first_time,
        end_time=last_time,
        add_version=True,
        add_time_period=True,
        add_extension=True,
    )
[docs]
def define_l0b_filename(ds, campaign_name: str, station_name: str) -> str:
    """Define L0B file name.

    The time period is derived from ``ds`` with ``get_file_start_end_time``;
    version, time period and extension are all included.

    Parameters
    ----------
    ds : xarray.Dataset
        L0B dataset (assumed from the ``ds`` naming - TODO confirm).
    campaign_name : str
        Name of the campaign.
    station_name : str
        Name of the station.

    Returns
    -------
    str
        L0B file name.
    """
    first_time, last_time = get_file_start_end_time(ds)
    return define_filename(
        product="L0B",
        campaign_name=campaign_name,
        station_name=station_name,
        start_time=first_time,
        end_time=last_time,
        add_version=True,
        add_time_period=True,
        add_extension=True,
    )
[docs]
def define_l0c_filename(ds, campaign_name: str, station_name: str) -> str:
    """Define L0C file name.

    Unlike the other products, the name is built here directly (not through
    ``define_filename``) because it embeds the temporal resolution derived
    from the ``sample_interval`` variable of the dataset::

        L0C.<temporal_resolution>.<campaign_name>.<station_name>.s<start>.e<end>.<ARCHIVE_VERSION>.nc

    Parameters
    ----------
    ds : xarray.Dataset
        L0C dataset providing a ``sample_interval`` variable.
        NOTE(review): ``.data.item()`` implies a single-valued
        ``sample_interval`` - confirm.
    campaign_name : str
        Name of the campaign.
    station_name : str
        Name of the station.

    Returns
    -------
    str
        L0C file name.
    """
    # TODO: add sample_interval as function argument
    interval_seconds = int(ensure_sample_interval_in_seconds(ds["sample_interval"]).data.item())
    temporal_resolution = define_temporal_resolution(interval_seconds, rolling=False)

    first_time, last_time = get_file_start_end_time(ds)
    time_format = "%Y%m%d%H%M%S"
    start_str = first_time.strftime(time_format)
    end_str = last_time.strftime(time_format)

    return (
        f"L0C.{temporal_resolution}.{campaign_name}.{station_name}"
        f".s{start_str}.e{end_str}.{ARCHIVE_VERSION}.nc"
    )
[docs]
def define_l1_filename(ds, campaign_name, station_name: str, temporal_resolution: str) -> str:
    """Define L1 file name.

    The time period is derived from ``ds`` with ``get_file_start_end_time``;
    version, time period and extension are all included.

    Parameters
    ----------
    ds : xarray.Dataset
        L1 dataset (assumed from the ``ds`` naming - TODO confirm).
    campaign_name : str
        Name of the campaign.
    station_name : str
        Name of the station.
    temporal_resolution : str
        The temporal resolution of the product.

    Returns
    -------
    str
        L1 file name.
    """
    first_time, last_time = get_file_start_end_time(ds)
    return define_filename(
        product="L1",
        campaign_name=campaign_name,
        station_name=station_name,
        start_time=first_time,
        end_time=last_time,
        add_version=True,
        add_time_period=True,
        add_extension=True,
        temporal_resolution=temporal_resolution,
    )
[docs]
def define_l2e_filename(ds, campaign_name: str, station_name: str, temporal_resolution: str) -> str:
    """Define L2E file name.

    The time period is derived from ``ds`` with ``get_file_start_end_time``;
    version, time period and extension are all included.

    Parameters
    ----------
    ds : xarray.Dataset
        L2E dataset (assumed from the ``ds`` naming - TODO confirm).
    campaign_name : str
        Name of the campaign.
    station_name : str
        Name of the station.
    temporal_resolution : str
        The temporal resolution of the product.

    Returns
    -------
    str
        L2E file name.
    """
    first_time, last_time = get_file_start_end_time(ds)
    return define_filename(
        product="L2E",
        campaign_name=campaign_name,
        station_name=station_name,
        start_time=first_time,
        end_time=last_time,
        add_version=True,
        add_time_period=True,
        add_extension=True,
        temporal_resolution=temporal_resolution,
    )
[docs]
def define_l2m_filename(
    ds,
    campaign_name: str,
    station_name: str,
    temporal_resolution: str,
    model_name: str,
) -> str:
    """Define L2M file name.

    The time period is derived from ``ds`` with ``get_file_start_end_time``;
    version, time period and extension are all included.

    Parameters
    ----------
    ds : xarray.Dataset
        L2M dataset (assumed from the ``ds`` naming - TODO confirm).
    campaign_name : str
        Name of the campaign.
    station_name : str
        Name of the station.
    temporal_resolution : str
        The temporal resolution of the product.
    model_name : str
        The model name of the fitted statistical distribution for the DSD.

    Returns
    -------
    str
        L2M file name.
    """
    first_time, last_time = get_file_start_end_time(ds)
    return define_filename(
        product="L2M",
        campaign_name=campaign_name,
        station_name=station_name,
        start_time=first_time,
        end_time=last_time,
        add_version=True,
        add_time_period=True,
        add_extension=True,
        temporal_resolution=temporal_resolution,
        model_name=model_name,
    )