Source code for disdrodb.utils.decorators
# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""DISDRODB decorators."""
import functools
import importlib
import uuid
import dask
[docs]
def create_dask_task_name(function_name: str, name=None) -> str | None:
"""
Create a custom dask task name.
Parameters
----------
function_name : str
Name of the function being delayed.
name : str, optional
Custom name for the task (e.g., filepath or ID).
If None, returns None so that Dask generates is own default name.
Returns
-------
str | None
Custom dask task name string if `name` is given,
otherwise None (use Dask's default naming).
"""
if name is None:
return None
return f"{function_name}.{name}-{uuid.uuid4()}"
[docs]
def delayed_if_parallel(function):
"""Decorator to make the function delayed if its ``parallel`` argument is ``True``."""
@functools.wraps(function)
def wrapper(*args, **kwargs):
# Check if it must be a delayed function
parallel = kwargs.get("parallel")
# If parallel is True
if parallel:
# Enforce verbose to be False
kwargs["verbose"] = False
# Define custom dask task name
if "logs_filename" in kwargs:
kwargs["dask_key_name"] = create_dask_task_name(
function_name=function.__name__,
name=kwargs["logs_filename"],
)
# Define the delayed task
result = dask.delayed(function)(*args, **kwargs)
else:
# Else run the function
result = function(*args, **kwargs)
return result
return wrapper
[docs]
def single_threaded_if_parallel(function):
"""Decorator to make a function use a single threadon delayed if its ``parallel`` argument is ``True``."""
@functools.wraps(function)
def wrapper(*args, **kwargs):
# Check if it must be a delayed function
parallel = kwargs.get("parallel")
# If parallel is True
if parallel:
# Call function with single thread
# with dask.config.set(scheduler='single-threaded'):
with dask.config.set(scheduler="single-threaded"): # "synchronous"
result = function(*args, **kwargs)
else:
# Else run the function as usual
result = function(*args, **kwargs)
return result
return wrapper
[docs]
def check_software_availability(software, conda_package):
"""A decorator to ensure that a software package is installed.
Parameters
----------
software : str
The package name as recognized by Python's import system.
conda_package : str
The package name as recognized by conda-forge.
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if not importlib.util.find_spec(software):
raise ImportError(
f"The '{software}' package is required but not found.\n"
"Please install it using conda:\n"
f" conda install -c conda-forge {conda_package}",
)
return func(*args, **kwargs)
return wrapper
return decorator
[docs]
def check_pytmatrix_availability(func):
"""Decorator to ensure that the 'pytmatrix' package is installed."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
if not importlib.util.find_spec("pytmatrix"):
raise ImportError(
"The 'pytmatrix' package is required but not found. \n"
"Please install the following software: \n"
" conda install conda-forge gfortran \n"
" conda install conda-forge meson \n"
" pip install git+https://github.com/ltelab/pytmatrix-lte.git@main \n",
)
return func(*args, **kwargs)
return wrapper