# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2023 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Implements ProcessingOption class for DISDRODB routines."""
import json
import os
import disdrodb
from disdrodb.api.checks import check_product, check_sensor_name, check_temporal_resolution
from disdrodb.api.info import group_filepaths
from disdrodb.configs import get_products_configs_dir
from disdrodb.utils.archiving import define_temporal_partitions, group_files_by_temporal_partitions
from disdrodb.utils.list import flatten_list
from disdrodb.utils.routines import is_possible_product
from disdrodb.utils.time import ensure_timedelta_seconds, get_sampling_information
from disdrodb.utils.yaml import read_yaml
# TODO: Test (and ensure) that the "product_options" key is updated recursively,
#       rather than replacing the whole "product_options" dict!
# get_product_options(product="L2E", temporal_resolution="10MIN")
# get_product_options(product="L2M", temporal_resolution="10MIN")
# get_product_options(product="L1")
# get_product_options(product="L1", temporal_resolution="1MIN")
# get_product_options(product="L1", temporal_resolution="1MIN", sensor_name="PARSIVEL")
# TODO: Test that a list is returned.
# get_product_temporal_resolutions(product="L1")
# get_product_temporal_resolutions(product="L2E")
# get_product_temporal_resolutions(product="L2M")
[docs]
def get_product_options(product, temporal_resolution=None, sensor_name=None):
    """Return DISDRODB product options.

    The options are assembled in layers, each one overriding the previous:

    1. ``<configs_dir>/<product>/global.yaml`` (always read).
    2. ``<configs_dir>/<product>/<temporal_resolution>.yaml`` (optional).
    3. ``<configs_dir>/<product>/<sensor_name>/<temporal_resolution>.yaml``
       (optional, only for ``product="L1"``; only its ``product_options``
       key is used).

    Parameters
    ----------
    product : str
        DISDRODB product name. Validated with ``check_product``.
    temporal_resolution : str, optional
        If ``None`` (default), the global product options are returned
        (including the ``temporal_resolutions`` key, if present).
        Otherwise the ``temporal_resolutions`` key is dropped and the custom
        options for that temporal resolution are merged in, if specified.
    sensor_name : str, optional
        If ``product="L1"`` and ``sensor_name`` is specified, the product
        options are also customized by sensor. Ignored for other products.

    Returns
    -------
    dict
        The product options. ``radar_enabled`` (if present) is forced to
        ``False`` when pytmatrix is not available, on every return path.
    """
    # Retrieve products configuration directory
    products_configs_dir = get_products_configs_dir()

    # Validate DISDRODB products configuration
    validate_product_configuration(products_configs_dir)

    # Check product
    check_product(product)

    # Retrieve global product options (returned as is when no temporal resolution !)
    product_dir = os.path.join(products_configs_dir, product)
    options = read_yaml(os.path.join(product_dir, "global.yaml"))
    if temporal_resolution is None:
        return check_availability_radar_simulations(options)

    # Check temporal resolution
    check_temporal_resolution(temporal_resolution)

    # If a temporal resolution is specified, drop the 'temporal_resolutions' key
    options.pop("temporal_resolutions", None)

    # Merge custom options for the specific temporal resolution (if any)
    # - A missing file simply means "no overrides". It must NOT short-circuit
    #   the radar availability check nor the sensor customization below.
    # - 'product_options' is merged key by key; other keys are replaced.
    custom_options_path = os.path.join(product_dir, f"{temporal_resolution}.yaml")
    if os.path.exists(custom_options_path):
        custom_options = read_yaml(custom_options_path)
        if "product_options" in custom_options:
            options.setdefault("product_options", {}).update(custom_options.pop("product_options"))
        options.update(custom_options)

    # Check availability of radar simulations
    options = check_availability_radar_simulations(options)

    # Customize product options by sensor if L1 product
    if product == "L1" and sensor_name is not None:
        check_sensor_name(sensor_name)
        sensor_options_path = os.path.join(product_dir, sensor_name, f"{temporal_resolution}.yaml")
        if os.path.exists(sensor_options_path):
            sensor_options = read_yaml(sensor_options_path)
            if "product_options" in sensor_options:
                options.setdefault("product_options", {}).update(sensor_options.pop("product_options"))
    return options
[docs]
def get_product_temporal_resolutions(product):
    """Return the temporal resolutions configured for a DISDRODB product.

    The value is read from the ``temporal_resolutions`` key of the global
    product options (i.e. ``get_product_options`` called without a
    temporal resolution). A ``KeyError`` is raised if the key is missing.
    """
    # NOTE(review): the original comment here read "Check only L2E and L2M";
    # no such restriction is enforced by this function.
    global_options = get_product_options(product)
    return global_options["temporal_resolutions"]
[docs]
def get_model_options(product, model_name):
    """Return the options of a DISDRODB L2M product model.

    The options are read from
    ``<configs_dir>/<product>/MODELS/<model_name>.yaml``.
    """
    configs_dir = get_products_configs_dir()
    yaml_path = os.path.join(configs_dir, product, "MODELS", f"{model_name}.yaml")
    return read_yaml(yaml_path)
[docs]
def check_availability_radar_simulations(options):
    """Disable radar simulations in ``options`` if pytmatrix is unavailable.

    If ``options`` has a ``radar_enabled`` key and
    ``disdrodb.is_pytmatrix_available()`` is falsy, the key is set to
    ``False``. The dictionary is modified in place and also returned.
    """
    # Nothing to check when the product does not define the radar flag
    if "radar_enabled" not in options:
        return options
    if not disdrodb.is_pytmatrix_available():
        options["radar_enabled"] = False
    return options
[docs]
def validate_product_configuration(products_configs_dir):
    """Validate the DISDRODB products configuration files.

    Currently a placeholder: no validation is performed and ``None`` is
    returned regardless of ``products_configs_dir``.
    """
    # TODO: Implement validation of the DISDRODB products configuration files with pydantic.
    # TODO: Raise a warning if the L1 temporal resolutions do not include all
    #       temporal resolutions of the L2 products.
    # TODO: Raise a warning if the L2E temporal resolutions do not include all
    #       temporal resolutions of the L2M products.
    # TODO: If strategy is "event", check neighbor_time_interval >= sample_interval. Sketch:
    #   if temporal_resolution_to_seconds(neighbor_time_interval) < temporal_resolution_to_seconds(sample_interval):
    #       msg = "'neighbor_time_interval' must be at least equal to the dataset sample interval ({sample_interval})"
    #       raise ValueError(msg)
    return None
def _define_blocks_offsets(sample_interval, temporal_resolution):
    """Define the offsets applied to time partition blocks for resampling.

    Returns a ``(block_starts_offset, block_ends_offset)`` tuple. The start
    offset is always ``0``. The end offset is ``0`` unless the temporal
    resolution is rolling and the accumulation interval differs from the
    sample interval, in which case it is their difference.
    """
    # Retrieve the accumulation interval and the rolling flag
    accumulation_interval, rolling = get_sampling_information(temporal_resolution)

    # Normalize both intervals so they can be compared and subtracted
    # (presumably to timedelta objects in seconds; see ensure_timedelta_seconds)
    accumulation_interval = ensure_timedelta_seconds(accumulation_interval)
    sample_interval = ensure_timedelta_seconds(sample_interval)

    # If rolling, files of the next time block must be searched too,
    # so the block end is extended
    if rolling and sample_interval != accumulation_interval:
        ends_offset = accumulation_interval - sample_interval
    else:
        ends_offset = 0
    starts_offset = 0
    return starts_offset, ends_offset
[docs]
class L1ProcessingOptions:
    """Define L1 product processing options.

    After initialization the instance exposes:

    - ``temporal_resolutions``: the temporal resolutions for which at least
      one files partition could be defined.
    - ``dict_files_partitions``: files partitions per temporal resolution.
    - ``dict_product_options``: product options (without
      ``archive_options``) per temporal resolution.
    - ``dict_folder_partitioning``: folder partitioning per temporal resolution.
    """

    def __init__(self, filepaths, parallel, sensor_name, temporal_resolutions=None):
        """Define DISDRODB L1 product processing options.

        Parameters
        ----------
        filepaths : list
            Source filepaths. They are grouped by sample interval with
            ``group_filepaths`` and each group is processed separately.
        parallel : bool
            Forwarded to ``define_temporal_partitions``.
        sensor_name : str
            Forwarded to ``get_product_options`` to customize options by sensor.
        temporal_resolutions : str or list of str, optional
            Temporal resolutions for which to retrieve processing options.
            If ``None``, the ones configured for the L1 product are used.
        """
        product = "L1"

        # ---------------------------------------------------------------------.
        # Define temporal resolutions for which to retrieve processing options
        if temporal_resolutions is None:
            temporal_resolutions = get_product_temporal_resolutions(product)
        elif isinstance(temporal_resolutions, str):
            temporal_resolutions = [temporal_resolutions]
        for temporal_resolution in temporal_resolutions:
            check_temporal_resolution(temporal_resolution)

        # ---------------------------------------------------------------------.
        # Get product options at the various temporal resolutions
        options_by_resolution = {}
        for temporal_resolution in temporal_resolutions:
            options_by_resolution[temporal_resolution] = get_product_options(
                product=product,
                temporal_resolution=temporal_resolution,
                sensor_name=sensor_name,
            )

        # ---------------------------------------------------------------------.
        # Group filepaths by source sample interval
        # - Typically the sample interval is fixed and there is just one
        # - Some stations might change the sample interval along the years
        # - Each sample interval must be processed separately
        dict_filepaths = group_filepaths(filepaths, groups="sample_interval")

        # ---------------------------------------------------------------------.
        # Retrieve processing information for each temporal resolution
        dict_product_options = {}
        dict_folder_partitioning = {}
        dict_files_partitions = {}
        # Temporal partitions are cached by archive options, so that they are
        # not recomputed when several temporal resolutions share the same options
        partitions_cache: dict[str, dict] = {}
        for temporal_resolution in temporal_resolutions:
            # Split the product options from the archive options
            product_options = options_by_resolution[temporal_resolution].copy()
            archive_options = product_options.pop("archive_options")
            dict_product_options[temporal_resolution] = product_options

            # Define folder partitioning (fallback to the DISDRODB configuration)
            if "folder_partitioning" in archive_options:
                folder_partitioning = archive_options.pop("folder_partitioning")
            else:
                folder_partitioning = disdrodb.config.get("folder_partitioning")
            dict_folder_partitioning[temporal_resolution] = folder_partitioning

            # Define the temporal partitions for each source sample interval
            # - The remaining archive options are forwarded as keyword arguments
            cache_key = json.dumps(archive_options, sort_keys=True)
            if cache_key not in partitions_cache:
                partitions_cache[cache_key] = {
                    sample_interval: define_temporal_partitions(
                        interval_filepaths,
                        parallel=parallel,
                        **archive_options,
                    )
                    for sample_interval, interval_filepaths in dict_filepaths.items()
                }
            dict_temporal_partitions = partitions_cache[cache_key].copy()  # To avoid in-place replacement

            # Group filepaths by temporal partitions
            # - Done separately for each source sample interval, so files within
            #   each files partition share the same sample interval
            # - Sample intervals not compatible with the temporal resolution are skipped
            # - The block offsets allow to also select files located after the
            #   partition end time, as needed when resampling
            # - NOTE: 'group_files_by_temporal_partitions' resolves here to the
            #   module-level function, not to the method of this class
            grouped_partitions = []
            for sample_interval, temporal_partitions in dict_temporal_partitions.items():
                if not is_possible_product(
                    temporal_resolution=temporal_resolution,
                    sample_interval=sample_interval,
                ):
                    continue
                block_starts_offset, block_ends_offset = _define_blocks_offsets(
                    sample_interval=sample_interval,
                    temporal_resolution=temporal_resolution,
                )
                grouped_partitions.append(
                    group_files_by_temporal_partitions(
                        temporal_partitions=temporal_partitions,
                        filepaths=dict_filepaths[sample_interval],
                        block_starts_offset=block_starts_offset,
                        block_ends_offset=block_ends_offset,
                    ),
                )
            dict_files_partitions[temporal_resolution] = flatten_list(grouped_partitions)

        # ---------------------------------------------------------------------.
        # Keep only the temporal resolutions for which products can be defined
        # - Removes e.g. accumulation intervals not compatible with the source sample interval
        temporal_resolutions = [
            temporal_resolution
            for temporal_resolution in temporal_resolutions
            if len(dict_files_partitions[temporal_resolution]) > 0
        ]

        # ---------------------------------------------------------------------.
        # Add attributes
        self.temporal_resolutions = temporal_resolutions
        self.dict_files_partitions = dict_files_partitions
        self.dict_product_options = dict_product_options
        self.dict_folder_partitioning = dict_folder_partitioning

    def group_files_by_temporal_partitions(self, temporal_resolution):
        """Return the files partitions of the L1 product at the given temporal resolution."""
        return self.dict_files_partitions[temporal_resolution]

    def get_product_options(self, temporal_resolution):
        """Return the product options of the L1 product at the given temporal resolution."""
        return self.dict_product_options[temporal_resolution]

    def get_folder_partitioning(self, temporal_resolution):
        """Return the folder partitioning of the L1 product at the given temporal resolution."""
        # To be used for logs and files !
        return self.dict_folder_partitioning[temporal_resolution]
[docs]
class L2ProcessingOptions:
    """Define L2 products processing options.

    After initialization the instance exposes ``folder_partitioning``,
    ``files_partitions`` and ``product_options`` (the latter without the
    ``archive_options`` key).
    """

    def __init__(self, product, filepaths, parallel, temporal_resolution):
        """Define DISDRODB L2 products processing options.

        Parameters
        ----------
        product : str
            DISDRODB product name, forwarded to ``get_product_options``.
        filepaths : list
            Source filepaths to be grouped by temporal partitions.
        parallel : bool
            Forwarded to ``define_temporal_partitions``.
        temporal_resolution : str
            Temporal resolution of the product. Validated with
            ``check_temporal_resolution``.
        """
        import disdrodb

        # Check temporal resolution
        check_temporal_resolution(temporal_resolution)

        # Split the product options from the archive options
        options = get_product_options(product, temporal_resolution=temporal_resolution)
        archive_options = options.pop("archive_options")

        # Define folder partitioning (fallback to the DISDRODB configuration)
        if "folder_partitioning" in archive_options:
            partitioning = archive_options.pop("folder_partitioning")
        else:
            partitioning = disdrodb.config.get("folder_partitioning")

        # Define files temporal partitions
        # - The remaining archive options are forwarded as keyword arguments
        # - Per the original notes: strategy "event" requires loading data into
        #   memory to identify events (and filters what to process), while
        #   strategy "time_block" does not
        partitions = define_temporal_partitions(filepaths, parallel=parallel, **archive_options)

        # Group filepaths by the start_time and end_time of the temporal partitions
        grouped = group_files_by_temporal_partitions(
            temporal_partitions=partitions,
            filepaths=filepaths,
        )

        # ------------------------------------------------------------------.
        # Add attributes
        self.folder_partitioning = partitioning
        self.files_partitions = flatten_list(grouped)
        self.product_options = options