Source code for disdrodb.api.info

#!/usr/bin/env python3

# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2023 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Retrieve file information from DISDRODB products file names and filepaths."""

import os
from collections import defaultdict
from pathlib import Path

import numpy as np
from trollsift import Parser

from disdrodb.utils.time import acronym_to_seconds

####---------------------------------------------------------------------------
########################
#### FNAME PATTERNS ####
########################
# Trailing part shared by every DISDRODB product file name:
# <campaign_name>.<station_name>.s<start_time>.e<end_time>.<version>.<data_format>
_DISDRODB_FNAME_COMMON_TAIL = (
    "{campaign_name:s}.{station_name:s}.s{start_time:%Y%m%d%H%M%S}.e{end_time:%Y%m%d%H%M%S}"
    ".{version:s}.{data_format:s}"
)

# L0A and L0B products: <product>.<common tail>
DISDRODB_FNAME_L0_PATTERN = "{product:s}." + _DISDRODB_FNAME_COMMON_TAIL

# L2E products (also L0C and L1 --> accumulation_acronym = sample_interval):
# <product>.<accumulation_acronym>.<common tail>
DISDRODB_FNAME_L2E_PATTERN = "{product:s}.{accumulation_acronym}." + _DISDRODB_FNAME_COMMON_TAIL

# L2M products: <product>_<subproduct>.<accumulation_acronym>.<common tail>
DISDRODB_FNAME_L2M_PATTERN = "{product:s}_{subproduct:s}.{accumulation_acronym}." + _DISDRODB_FNAME_COMMON_TAIL

####---------------------------------------------------------------------------.
##########################
#### Filename parsers ####
##########################


def _parse_filename(filename):
    """Parse a DISDRODB product file name with trollsift.

    The file name prefix selects which of the module-level file name
    patterns is used for parsing.

    Parameters
    ----------
    filename : str
        Base name of the product file.

    Returns
    -------
    dict
        Fields extracted from the file name by the trollsift parser.

    Raises
    ------
    ValueError
        If the file name does not start with a known product prefix.
    """
    if filename.startswith(("L0A", "L0B")):
        pattern = DISDRODB_FNAME_L0_PATTERN
    elif filename.startswith(("L2E", "L1", "L0C")):
        pattern = DISDRODB_FNAME_L2E_PATTERN
    elif filename.startswith("L2M"):
        pattern = DISDRODB_FNAME_L2M_PATTERN
    else:
        raise ValueError("Not a DISDRODB product file.")
    return Parser(pattern).parse(filename)


def _get_info_from_filename(filename):
    """Retrieve the file information dictionary from a file name.

    Parameters
    ----------
    filename : str
        Base name of a DISDRODB product file.

    Returns
    -------
    dict
        Fields parsed from the file name. When the file name carries an
        ``accumulation_acronym``, the dictionary also contains a
        ``sample_interval`` entry computed with ``acronym_to_seconds``.

    Raises
    ------
    ValueError
        If the file name can not be parsed.
    """
    # Try to parse the filename
    try:
        info_dict = _parse_filename(filename)
    except ValueError as err:
        # Chain the original error so the underlying parsing failure stays
        # visible in the traceback.
        raise ValueError(f"{filename} can not be parsed. Report the issue.") from err

    # Add additional information to info dictionary
    if "accumulation_acronym" in info_dict:
        info_dict["sample_interval"] = acronym_to_seconds(info_dict["accumulation_acronym"])

    # Return info dictionary
    return info_dict


def get_info_from_filepath(filepath):
    """Retrieve the file information dictionary from a filepath.

    Parameters
    ----------
    filepath : str
        Path of a DISDRODB product file. Only its base name is parsed.

    Returns
    -------
    dict
        Information extracted from the file name.

    Raises
    ------
    TypeError
        If ``filepath`` is not a string.
    """
    if isinstance(filepath, str):
        return _get_info_from_filename(os.path.basename(filepath))
    raise TypeError("'filepath' must be a string.")
def get_key_from_filepath(filepath, key):
    """Extract the value of a specific information key from a single filepath.

    Parameters
    ----------
    filepath : str
        Path of a DISDRODB product file.
    key : str
        Name of the information field to retrieve.

    Returns
    -------
    object
        Value stored under ``key`` in the file information dictionary.
    """
    info = get_info_from_filepath(filepath)
    return info[key]
def get_key_from_filepaths(filepaths, key):
    """Extract the value of a specific information key from each filepath.

    Parameters
    ----------
    filepaths : list or str
        Filepaths to inspect. A single string is treated as a one-element list.
    key : str
        Name of the information field to retrieve.

    Returns
    -------
    list
        One value per filepath, in input order.
    """
    paths = [filepaths] if isinstance(filepaths, str) else filepaths
    return [get_key_from_filepath(path, key=key) for path in paths]
####--------------------------------------------------------------------------.
###################################
#### DISDRODB File Information ####
###################################


def _get_version_from_filepath(filepath):
    """Return the ``version`` field parsed from the file name of ``filepath``."""
    return get_key_from_filepath(filepath, key="version")
def get_version_from_filepaths(filepaths):
    """Return the DISDRODB product version of the specified files.

    Parameters
    ----------
    filepaths : list or str
        Filepaths to inspect. A single string is treated as a one-element list.

    Returns
    -------
    list
        Product version of each file, in input order.
    """
    paths = [filepaths] if isinstance(filepaths, str) else filepaths
    return [_get_version_from_filepath(path) for path in paths]
def get_campaign_name_from_filepaths(filepaths):
    """Return the DISDRODB campaign name of the specified files.

    Parameters
    ----------
    filepaths : list or str
        Filepaths to inspect.

    Returns
    -------
    list
        Campaign name of each file, in input order.
    """
    return get_key_from_filepaths(filepaths, key="campaign_name")
def get_station_name_from_filepaths(filepaths):
    """Return the DISDRODB station name of the specified files.

    Parameters
    ----------
    filepaths : list or str
        Filepaths to inspect.

    Returns
    -------
    list
        Station name of each file, in input order.
    """
    return get_key_from_filepaths(filepaths, key="station_name")
def get_product_from_filepaths(filepaths):
    """Return the DISDRODB product name of the specified files.

    Parameters
    ----------
    filepaths : list or str
        Filepaths to inspect.

    Returns
    -------
    list
        Product name of each file, in input order.
    """
    return get_key_from_filepaths(filepaths, key="product")
def get_start_time_from_filepaths(filepaths):
    """Return the start time of the specified files.

    Parameters
    ----------
    filepaths : list or str
        Filepaths to inspect.

    Returns
    -------
    list
        Start time parsed from each file name, in input order.
    """
    return get_key_from_filepaths(filepaths, key="start_time")
def get_end_time_from_filepaths(filepaths):
    """Return the end time of the specified files.

    Parameters
    ----------
    filepaths : list or str
        Filepaths to inspect.

    Returns
    -------
    list
        End time parsed from each file name, in input order.
    """
    return get_key_from_filepaths(filepaths, key="end_time")
def get_start_end_time_from_filepaths(filepaths):
    """Return the start and end time of the specified files.

    Parameters
    ----------
    filepaths : list or str
        Filepaths to inspect.

    Returns
    -------
    tuple
        ``(start_time, end_time)`` numpy arrays cast to ``"M8[s]"``.
    """
    # Parse both keys first, then cast each list to datetime64 at second resolution
    parsed_times = [get_key_from_filepaths(filepaths, key=key) for key in ("start_time", "end_time")]
    start_time, end_time = (np.array(values).astype("M8[s]") for values in parsed_times)
    return start_time, end_time
def get_sample_interval_from_filepaths(filepaths):
    """Return the sample interval of the specified files.

    The ``accumulation_acronym`` of each file name is converted with
    ``acronym_to_seconds``.

    Parameters
    ----------
    filepaths : list or str
        Filepaths to inspect.

    Returns
    -------
    list
        Sample interval of each file, in input order.
    """
    acronyms = get_key_from_filepaths(filepaths, key="accumulation_acronym")
    return list(map(acronym_to_seconds, acronyms))
####--------------------------------------------------------------------------. ################################### #### DISDRODB Tree Components #### ###################################
def infer_disdrodb_tree_path_components(path: str) -> list:
    """Return a list with the components of a DISDRODB path.

    Parameters
    ----------
    path : str
        Directory or file path within the DISDRODB archive.

    Returns
    -------
    list
        Path elements of the DISDRODB archive.
        Format: [``data_archive_dir``, ``product_version``, ``data_source``, ``campaign_name``, ...]

    Raises
    ------
    ValueError
        If no path element is named ``DISDRODB``.
    """
    # Split the path into its elements (os-specific)
    parts = [str(part) for part in Path(path).parts]

    # Locate every element named "DISDRODB"
    matches = [index for index, part in enumerate(parts) if part == "DISDRODB"]
    if not matches:
        raise ValueError(f"The DISDRODB directory is not present in the path '{path}'")

    # The rightmost occurrence closes the archive directory;
    # everything after it belongs to the archive tree
    split_at = matches[-1] + 1
    archive_dir = os.path.join(*parts[:split_at])
    return [archive_dir, *parts[split_at:]]
def infer_path_info_dict(path: str) -> dict:
    """Return the ``data_archive_dir``, ``data_source`` and ``campaign_name`` of a DISDRODB path.

    Parameters
    ----------
    path : str
        Directory or file path within the DISDRODB archive.

    Returns
    -------
    dict
        Dictionary with the path elements of the DISDRODB archive.
        Valid keys: ``"data_archive_dir"``, ``"data_source"``, ``"campaign_name"``

    Raises
    ------
    ValueError
        If the path has too few components to include the campaign name.
    """
    components = infer_disdrodb_tree_path_components(path=path)
    if len(components) < 4:
        raise ValueError(f"Impossible to determine data_source and campaign_name from {path}")
    return {
        "data_archive_dir": components[0],
        "data_source": components[2],
        "campaign_name": components[3],
    }
def infer_path_info_tuple(path: str) -> tuple:
    """Return the ``data_archive_dir``, ``data_source`` and ``campaign_name`` of a DISDRODB path.

    Parameters
    ----------
    path : str
        Directory or file path within the DISDRODB archive.

    Returns
    -------
    tuple
        ``(data_archive_dir, data_source, campaign_name)``
    """
    info = infer_path_info_dict(path)
    return tuple(info[key] for key in ("data_archive_dir", "data_source", "campaign_name"))
def infer_disdrodb_tree_path(path: str) -> str:
    """Return the directory tree path from the archive directory.

    Current assumption: no ``data_source``, ``campaign_name``, ``station_name`` or file contain the word DISDRODB!

    Parameters
    ----------
    path : str
        Directory or file path within the DISDRODB archive.

    Returns
    -------
    str
        Path inside the DISDRODB archive.
        Format: ``DISDRODB/RAW/<DATA_SOURCE>/<CAMPAIGN_NAME>/...``
        Format: ``DISDRODB/<ARCHIVE_VERSION>/<DATA_SOURCE>/<CAMPAIGN_NAME>/...``
    """
    # Drop the archive directory and re-root the remaining components at "DISDRODB"
    _, *tree_components = infer_disdrodb_tree_path_components(path=path)
    return os.path.join("DISDRODB", *tree_components)
def infer_archive_dir_from_path(path: str) -> str:
    """Return the DISDRODB base directory from a file or directory path.

    Assumption: no data_source, campaign_name, station_name or file contain the word DISDRODB!

    Parameters
    ----------
    path : str
        Directory or file path within the DISDRODB archive.

    Returns
    -------
    str
        Path of the DISDRODB directory.
    """
    archive_dir, *_ = infer_disdrodb_tree_path_components(path=path)
    return archive_dir
def infer_campaign_name_from_path(path: str) -> str:
    """Return the campaign name from a file or directory path.

    Assumption: no ``data_source``, ``campaign_name``, ``station_name`` or file contain the word DISDRODB!

    Parameters
    ----------
    path : str
        Directory or file path within the DISDRODB archive.

    Returns
    -------
    str
        Name of the campaign.

    Raises
    ------
    ValueError
        If the path has too few components to include the campaign name.
    """
    components = infer_disdrodb_tree_path_components(path)
    if len(components) > 3:
        return components[3]
    raise ValueError(f"Impossible to determine campaign_name from {path}")
def infer_data_source_from_path(path: str) -> str:
    """Return the data source from a file or directory path.

    Assumption: no ``data_source``, ``campaign_name``, ``station_name`` or file contain the word DISDRODB!

    Parameters
    ----------
    path : str
        Directory or file path within the DISDRODB archive.

    Returns
    -------
    str
        Name of the data source.

    Raises
    ------
    ValueError
        If the path has too few components to include the data source.
    """
    components = infer_disdrodb_tree_path_components(path)
    if len(components) > 2:
        return components[2]
    raise ValueError(f"Impossible to determine data_source from {path}")
####--------------------------------------------------------------------------.
#######################
#### Group utility ####
#######################

# Group keys looked up in the file information dictionary
FILE_KEYS = [
    "product",
    "subproduct",
    "campaign_name",
    "station_name",
    "start_time",
    "end_time",
    "data_format",
    "accumulation_acronym",
    "sample_interval",
]

# Group keys computed from the file start time
TIME_KEYS = [
    "year",
    "month",
    "month_name",
    "quarter",
    "season",
    "day",
    "doy",
    "dow",
    "hour",
    "minute",
    "second",
]
def check_groups(groups):
    """Check the validity of the group keys.

    Parameters
    ----------
    groups : list or str
        Group keys. A single string is treated as a one-element list.

    Returns
    -------
    list
        The validated group keys.

    Raises
    ------
    TypeError
        If ``groups`` is neither a list nor a string.
    ValueError
        If any group key is not listed in ``FILE_KEYS`` or ``TIME_KEYS``.
    """
    if isinstance(groups, str):
        groups = [groups]
    elif not isinstance(groups, list):
        raise TypeError("'groups' must be a list (or a string if a single group is specified.")

    valid_keys = FILE_KEYS + TIME_KEYS
    groups_array = np.array(groups)

    # Flag every key that is not among the valid ones
    is_invalid = ~np.isin(groups_array, valid_keys)
    if is_invalid.any():
        invalid_keys = groups_array[is_invalid]
        raise ValueError(f"The following group keys are invalid: {invalid_keys}. Valid values are {valid_keys}.")
    return groups_array.tolist()
def get_season(time):
    """Get the season from a `datetime.datetime` or `datetime.date` object.

    Parameters
    ----------
    time : datetime.datetime or datetime.date
        Object exposing a ``month`` attribute.

    Returns
    -------
    str
        ``"DJF"``, ``"MAM"``, ``"JJA"`` or ``"SON"``.
    """
    season_by_month = {
        # Winter (December, January, February)
        12: "DJF",
        1: "DJF",
        2: "DJF",
        # Spring (March, April, May)
        3: "MAM",
        4: "MAM",
        5: "MAM",
        # Summer (June, July, August)
        6: "JJA",
        7: "JJA",
        8: "JJA",
    }
    # Any other month falls back to autumn (September, October, November)
    return season_by_month.get(time.month, "SON")
def get_time_component(time, component):
    """Get a time component from a `datetime.datetime` object.

    Parameters
    ----------
    time : datetime.datetime
        Time from which the component is extracted.
    component : str
        One of ``year``, ``month``, ``day``, ``doy``, ``dow``, ``hour``, ``minute``,
        ``second``, ``month_name``, ``quarter`` or ``season``.

    Returns
    -------
    str
        The requested component converted to a string.

    Raises
    ------
    KeyError
        If ``component`` is not one of the supported names.
    """
    # Components that map directly onto an attribute of the same name
    plain_attributes = ("year", "month", "day", "hour", "minute", "second")
    if component in plain_attributes:
        return str(getattr(time, component))

    # Components that need to be computed
    derived_components = {
        "doy": lambda t: t.timetuple().tm_yday,  # Day of year
        "dow": lambda t: t.weekday(),  # Day of week (0=Monday, 6=Sunday)
        "month_name": lambda t: t.strftime("%B"),  # Full month name
        "quarter": lambda t: (t.month - 1) // 3 + 1,  # Quarter (1-4)
        "season": lambda t: get_season(t),  # Season (DJF, MAM, JJA, SON)
    }
    return str(derived_components[component](time))
def _get_groups_value(groups, filepath):
    """Return the value associated to the group keys of a filepath.

    If multiple keys are specified, the value returned is a string of format:
    ``<group_value_1>/<group_value_2>/...``

    If a single key is specified, its value is returned without string
    conversion. For ``start_time`` or ``end_time`` this is the value parsed
    from the file name (a :py:class:`datetime.datetime` object).

    Keys missing from the file information dictionary yield ``"<key>=None"``.
    """
    info = get_info_from_filepath(filepath)
    reference_time = info["start_time"]

    def _value_of(key):
        # Time components are always computed from the file start time
        if key in TIME_KEYS:
            return get_time_component(reference_time, component=key)
        return info.get(key, f"{key}=None")

    if len(groups) == 1:
        return _value_of(groups[0])
    return "/".join(str(_value_of(key)) for key in groups)
def group_filepaths(filepaths, groups=None):
    """
    Group filepaths in a dictionary if groups are specified.

    Parameters
    ----------
    filepaths : list
        List of filepaths.
    groups: list or str
        The group keys by which to group the filepaths.
        Valid group keys are ``product``, ``subproduct``, ``campaign_name``, ``station_name``,
        ``start_time``, ``end_time``, ``accumulation_acronym``, ``sample_interval``,
        ``data_format``,
        ``year``, ``month``, ``day``, ``doy``, ``dow``, ``hour``, ``minute``, ``second``,
        ``month_name``, ``quarter``, ``season``.
        The time components are extracted from ``start_time`` !
        If groups is ``None`` returns the input filepaths list.
        The default value is ``None``.

    Returns
    -------
    dict or list
        Either a dictionary of format ``{<group_value>: <list_filepaths>}``,
        or the original input filepaths (if ``groups=None``).
    """
    if groups is None:
        return filepaths

    groups = check_groups(groups)
    grouped = {}
    for filepath in filepaths:
        group_value = _get_groups_value(groups, filepath)
        grouped.setdefault(group_value, []).append(filepath)
    return grouped