#!/usr/bin/env python3
# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2023 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Utilities for DISDRODB processing routines."""
import os
import shutil
import tempfile
from disdrodb.api.io import find_files
from disdrodb.api.path import define_file_folder_path, define_temporal_resolution
from disdrodb.utils.logger import (
close_logger,
create_logger_file,
log_error,
log_info,
)
[docs]
def is_possible_product(accumulation_interval, sample_interval, rolling):
"""Assess if production is possible given the requested accumulation interval and source sample_interval."""
# Avoid rolling product generation at source sample interval
if rolling and accumulation_interval == sample_interval:
return False
# Avoid product generation if the accumulation_interval is less than the sample interval
if accumulation_interval < sample_interval:
return False
# Avoid producti generation if accumulation_interval is not multiple of sample_interval
return accumulation_interval % sample_interval == 0
[docs]
def try_get_required_filepaths(
product,
data_archive_dir,
data_source,
campaign_name,
station_name,
debugging_mode,
**product_kwargs,
):
"""Try to retrieve required filepaths for a product, or return None if unavailable."""
try:
filepaths = find_files(
data_archive_dir=data_archive_dir,
data_source=data_source,
campaign_name=campaign_name,
station_name=station_name,
product=product,
debugging_mode=debugging_mode,
**product_kwargs,
)
return filepaths
# If no files available, print informative message
except Exception as e:
temporal_resolution = ""
if "sample_interval" in product_kwargs and "rolling" in product_kwargs:
temporal_resolution = define_temporal_resolution(
seconds=product_kwargs["sample_interval"],
rolling=product_kwargs["rolling"],
)
print(str(e))
msg = (
f"{product} processing of {data_source} {campaign_name} {station_name} "
f"has not been launched because of missing {product} {temporal_resolution} data."
)
print(msg)
return None
[docs]
def run_product_generation(
product: str,
logs_dir: str,
logs_filename: str,
parallel: bool,
verbose: bool,
folder_partitioning: str,
core_func: callable,
core_func_kwargs: dict,
pass_logger=False,
):
"""
Generic wrapper for DISDRODB product generation.
Parameters
----------
product : str
Product name (e.g., "L0A", "L0B", ...).
logs_dir : str
Logs directory.
logs_filename : str
Logs filename.
parallel : bool
Parallel flag (for logger).
verbose : bool
Verbose logging flag.
folder_partitioning : str
Partitioning scheme.
core_func : callable
Function with signature `core_func(logger)` that does the product-specific work.
Must return an xarray.Dataset or pandas.DataFrame (used to determine log subdir).
"""
with tempfile.TemporaryDirectory() as tmpdir:
# Initialize log file
logger, tmp_logger_filepath = create_logger_file(
logs_dir=tmpdir,
filename=logs_filename,
parallel=parallel,
)
# Inform that product creation has started
log_info(logger, f"{product} processing of {logs_filename} has started.", verbose=verbose)
# Initialize object
obj = None # if None, means the product creation failed
# Add logger to core_func_kwargs if specified
if pass_logger:
core_func_kwargs["logger"] = logger
# Try product creation
try:
# Run product creation
obj = core_func(**core_func_kwargs)
# Inform that product creation has ended
log_info(logger, f"{product} processing of {logs_filename} has ended.", verbose=verbose)
# Report error if the case
except Exception as e:
log_error(logger, f"{type(e).__name__}: {e}", verbose=verbose)
finally:
# Close logger
close_logger(logger)
# Move log file to final logs directory
success_flag = obj is not None
if success_flag: # and "time" in obj and len(obj["time"]) > 0:
logs_dir = define_file_folder_path(obj, dir_path=logs_dir, folder_partitioning=folder_partitioning)
os.makedirs(logs_dir, exist_ok=True)
if tmp_logger_filepath is not None: # (when running pytest, tmp_logger_filepath is None)
logger_filepath = os.path.join(logs_dir, os.path.basename(tmp_logger_filepath))
shutil.move(tmp_logger_filepath, logger_filepath)
else:
logger_filepath = None
# Free memory
del obj
# Return logger filepath
return logger_filepath