Source code for disdrodb.l0.check_configs

# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Check configuration files."""

import os
from typing import Optional, Union

import numpy as np
from pydantic import Field, field_validator, model_validator

from disdrodb.api.configs import available_sensor_names, get_sensor_configs_dir, read_config_file
from disdrodb.l0.standards import (
    get_diameter_bin_center,
    get_diameter_bin_lower,
    get_diameter_bin_upper,
    get_diameter_bin_width,
    get_velocity_bin_center,
    get_velocity_bin_lower,
    get_velocity_bin_upper,
    get_velocity_bin_width,
)
from disdrodb.utils.directories import list_files
from disdrodb.utils.pydantic import CustomBaseModel

CONFIG_FILES_LIST = [
    "bins_diameter.yml",
    "bins_velocity.yml",
    "raw_data_format.yml",
    "l0a_encodings.yml",
    "l0b_encodings.yml",
    "l0b_cf_attrs.yml",
]


[docs] def check_yaml_files_exists(sensor_name: str) -> None: """Check if all L0 config YAML files exist. Parameters ---------- sensor_name : str Name of the sensor. """ config_dir = get_sensor_configs_dir(sensor_name, product="L0A") filepaths = list_files(config_dir, glob_pattern="*.yml", recursive=False) filenames = [os.path.split(i)[-1] for i in filepaths] missing_keys = set(CONFIG_FILES_LIST).difference(set(filenames)) if missing_keys: missing_keys_text = ",".join(missing_keys) raise FileNotFoundError(f"Missing YAML files {missing_keys_text} in {config_dir} for sensor {sensor_name}.")
[docs] def check_variable_consistency(sensor_name: str) -> None: """ Check variable consistency across config files. The variables specified within l0b_encoding.yml must be defined also in the other config files. The raw_data_format.yml can contain some extra variables ! Parameters ---------- sensor_name : str Name of the sensor. Raises ------ ValueError If the keys are not consistent. """ list_variables = read_config_file(sensor_name, product="L0A", filename="l0b_cf_attrs.yml").keys() filenames = [ "raw_data_format.yml", "l0a_encodings.yml", "l0b_encodings.yml", ] for filename in filenames: keys_to_check = read_config_file(sensor_name, product="L0A", filename=filename).keys() missing_keys = set(list_variables).difference(set(keys_to_check)) extra_keys = set(keys_to_check).difference(set(list_variables)) if missing_keys: msg = f"The {sensor_name} {filename} file does not have the following keys {missing_keys}" if extra_keys: if filename == "raw_data_format.yml": msg = msg + f"FYI the file has the following extra keys {extra_keys}." else: msg = msg + f"and it has the following extra keys {extra_keys}." raise ValueError(msg) if extra_keys and filename != "raw_data_format.yml": msg = f"The {sensor_name} {filename} has the following extra keys {extra_keys}." raise ValueError(msg)
def _schema_error(object_to_validate: Union[str, list], schema: CustomBaseModel, message) -> bool: """Function that validate the schema of a given object with a given schema. Parameters ---------- object_to_validate : Union[str,list] Object to validate. schema : CustomBaseModel Base model. """ try: schema(**object_to_validate) except ValueError as e: raise ValueError(f"{message} {e!s}") from None
[docs] class L0BEncodingSchema(CustomBaseModel): """Pydantic model for DISDRODB netCDF encodings.""" contiguous: bool dtype: str zlib: bool complevel: int shuffle: bool fletcher32: bool FillValue: Optional[Union[int, float]] = Field(default=None, alias="_FillValue") chunksizes: Optional[Union[int, list[int]]] add_offset: Optional[float] = None scale_factor: Optional[float] = None # if contiguous=False, chunksizes specified, otherwise should be not !
[docs] @model_validator(mode="before") def check_chunksizes_and_zlib(cls, values): """Check the chunksizes validity.""" contiguous = values.get("contiguous") chunksizes = values.get("chunksizes") if not contiguous and not chunksizes: raise ValueError("'chunksizes' must be defined if 'contiguous' is False") return values
# if contiguous = True, then zlib must be set to False
[docs] @model_validator(mode="before") def check_contiguous_and_zlib(cls, values): """Check the the compression value validity.""" contiguous = values.get("contiguous") zlib = values.get("zlib") if contiguous and zlib: raise ValueError("'zlib' must be set to False if 'contiguous' is True") return values
# if contiguous = True, then fletcher32 must be set to False
[docs] @model_validator(mode="before") def check_contiguous_and_fletcher32(cls, values): """Check the fletcher value validity.""" contiguous = values.get("contiguous") fletcher32 = values.get("fletcher32") if contiguous and fletcher32: raise ValueError("'fletcher32' must be set to False if 'contiguous' is True") return values
# if dtype is integer/unsigned integer, _FillValue must be specified and valid
[docs] @model_validator(mode="before") def check_integer_fillvalue(cls, values): """Check that integer dtypes have valid _FillValue.""" dtype = values.get("dtype") fill_value = values.get("_FillValue", None) integer_types = ["int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64"] # Check if dtype is an integer type if dtype in integer_types: # _FillValue must be specified for integer types if fill_value is None: raise ValueError(f"'_FillValue' must be specified for integer dtype '{dtype}'") # Check that _FillValue is within valid range for the dtype dtype_info = np.iinfo(dtype) max_value = dtype_info.max # min_value = dtype_info.min # if not (min_value <= fill_value <= max_value): # raise ValueError( # f"'_FillValue' ({fill_value}) is out of range for dtype '{dtype}'. " # f"Valid range is [{min_value}, {max_value}]", # ) # Check that _FillValue corresponds to the maximum allowed value if fill_value != max_value: raise ValueError( f"'_FillValue' ({fill_value}) should be set to the maximum allowed value " f"({max_value}) for integer dtype '{dtype}'", ) return values
[docs] @model_validator(mode="after") def remove_offset_and_scale_if_not_int(cls, values): """Ensure add_offset and scale_factor only apply to integer/uint dtypes.""" integer_types = ["int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64"] if values.dtype not in integer_types: # drop those keys to keep model clean values.add_offset = None values.scale_factor = None return values
[docs] def check_l0b_encoding(sensor_name: str) -> None: """Check ``l0b_encodings.yml`` file based on the schema defined in the class ``L0BEncodingSchema``. Parameters ---------- sensor_name : str Name of the sensor. """ data = read_config_file(sensor_name, product="L0B", filename="l0b_encodings.yml") # check that the second level of the dictionary match the schema for key, value in data.items(): _schema_error( object_to_validate=value, schema=L0BEncodingSchema, message=f"Invalid {sensor_name} L0B Encoding YAML file '{key}' settings.", )
[docs] def check_l0a_encoding(sensor_name: str) -> None: """Check ``l0a_encodings.yml`` file. Parameters ---------- sensor_name : str Name of the sensor. Raises ------ ValueError Error raised if the value of a key is not in the list of accepted values. """ data = read_config_file(sensor_name, product="L0A", filename="l0a_encodings.yml") numeric_field = ["uint8", "uint16", "uint32", "uint64", "int8", "int16", "int32", "int64", "float32", "float64"] text_field = ["str"] for key, value in data.items(): if value not in text_field + numeric_field: raise ValueError(f"Wrong value for {key} in l0a_encodings.yml for sensor {sensor_name}.") if not isinstance(key, str): raise TypeError(f"Expecting a string for {key} in l0a_encodings.yml for sensor {sensor_name}.")
[docs] class RawDataFormatSchema(CustomBaseModel): """Pydantic model for the DISDRODB RAW Data Format YAML files.""" n_digits: Optional[int] n_characters: Optional[int] n_decimals: Optional[int] n_naturals: Optional[int] data_range: Optional[list[float]] nan_flags: Optional[Union[int, float, str]] = None valid_values: Optional[list[float]] = None dimension_order: Optional[list[str]] = None n_values: Optional[int] = None field_number: Optional[str] = None
[docs] @field_validator("data_range") def check_list_length(cls, value): """Check the data_range validity.""" if value: if len(value) != 2: raise ValueError(f"data_range must have exactly 2 keys, {len(value)} element have been provided.") return value return None
[docs] def check_raw_data_format(sensor_name: str) -> None: """Check ``raw_data_format.yml`` file based on the schema defined in the class ``RawDataFormatSchema``. Parameters ---------- sensor_name : str Name of the sensor. """ data = read_config_file(sensor_name, product="L0A", filename="raw_data_format.yml") # check that the second level of the dictionary match the schema for key, value in data.items(): _schema_error( object_to_validate=value, schema=RawDataFormatSchema, message=f"Invalid {sensor_name} RawDataFormat YAML file '{key}' settings.", )
[docs] def check_cf_attributes(sensor_name: str) -> None: """Check that the ``l0b_cf_attrs.yml`` description, long_name and units values are strings. Parameters ---------- sensor_name : str Name of the sensor. """ cf_dict = read_config_file(sensor_name, product="L0A", filename="l0b_cf_attrs.yml") for var, attrs_dict in cf_dict.items(): for key, value in attrs_dict.items(): if not isinstance(value, str): raise ValueError(f"Wrong value for {key} in {var} for sensor {sensor_name}.")
[docs] def check_bin_consistency(sensor_name: str) -> None: """Check bin consistency from config file. Do not check the first and last bin ! Parameters ---------- sensor_name : str Name of the sensor. """ diameter_bin_lower = np.array(get_diameter_bin_lower(sensor_name)) diameter_bin_upper = np.array(get_diameter_bin_upper(sensor_name)) diameter_bin_center = np.array(get_diameter_bin_center(sensor_name)) diameter_bin_width = np.array(get_diameter_bin_width(sensor_name)) expected_diameter_width = diameter_bin_upper - diameter_bin_lower np.testing.assert_allclose(expected_diameter_width[1:-1], diameter_bin_width[1:-1], atol=1e-3, rtol=1e-4) expected_diameter_center = diameter_bin_lower + diameter_bin_width / 2 np.testing.assert_allclose(expected_diameter_center[1:-1], diameter_bin_center[1:-1], atol=1e-3, rtol=1e-4) expected_diameter_center = diameter_bin_upper - diameter_bin_width / 2 np.testing.assert_allclose(expected_diameter_center[1:-1], diameter_bin_center[1:-1], atol=1e-3, rtol=1e-4) velocity_bin_lower = np.array(get_velocity_bin_lower(sensor_name)) velocity_bin_upper = np.array(get_velocity_bin_upper(sensor_name)) velocity_bin_center = np.array(get_velocity_bin_center(sensor_name)) velocity_bin_width = np.array(get_velocity_bin_width(sensor_name)) if all(arr.size > 1 for arr in [velocity_bin_center, velocity_bin_lower, velocity_bin_upper, velocity_bin_width]): np.testing.assert_allclose(velocity_bin_upper - velocity_bin_lower, velocity_bin_width, atol=1e-3, rtol=1e-4) np.testing.assert_allclose( velocity_bin_lower + velocity_bin_width / 2, velocity_bin_center, atol=1e-3, rtol=1e-4, ) np.testing.assert_allclose( velocity_bin_upper - velocity_bin_width / 2, velocity_bin_center, atol=1e-3, rtol=1e-4, )
[docs] def check_raw_array(sensor_name: str) -> None: """Check raw array consistency from config file. Parameters ---------- sensor_name : str Name of the sensor. Raises ------ ValueError Error if the chunksizes are not consistent. """ raw_data_format = read_config_file(sensor_name, product="L0A", filename="raw_data_format.yml") # Get keys in raw_data_format where the value is "dimension_order" dict_keys_with_dimension_order = { key: value.get("dimension_order") for key, value in raw_data_format.items() if "dimension_order" in value } l0b_encodings = read_config_file(sensor_name, product="L0A", filename="l0b_encodings.yml") for key, list_velocity_or_diameter in dict_keys_with_dimension_order.items(): expected_length = len(list_velocity_or_diameter) + 1 current_length = len(l0b_encodings.get(key).get("chunksizes")) if expected_length != current_length: raise ValueError(f"Wrong chunksizes for {key} in l0b_encodings.yml for sensor {sensor_name}.") # Get chunksizes in l0b_encoding.yml and check that if len > 1, has dimension_order key in raw_data_format list_attributes_l0b_encodings = [ i for i in l0b_encodings if isinstance(l0b_encodings.get(i).get("chunksizes"), list) and len(l0b_encodings.get(i).get("chunksizes")) > 1 ] list_attributes_from_raw_data_format = [ i for i in raw_data_format if raw_data_format.get(i).get("dimension_order") is not None ] if not sorted(list_attributes_l0b_encodings) == sorted(list_attributes_from_raw_data_format): raise ValueError(f"Chunksizes in l0b_encodings and raw_data_format for sensor {sensor_name} does not match.")
[docs] def check_sensor_configs(sensor_name: str) -> None: """Check validity of sensor configuration YAML files. Parameters ---------- sensor_name : str Name of the sensor. """ check_yaml_files_exists(sensor_name) check_variable_consistency(sensor_name) check_l0b_encoding(sensor_name=sensor_name) check_l0a_encoding(sensor_name=sensor_name) check_raw_data_format(sensor_name=sensor_name) check_cf_attributes(sensor_name=sensor_name) check_bin_consistency(sensor_name=sensor_name) check_raw_array(sensor_name=sensor_name)
[docs] def check_all_sensors_configs() -> None: """Check all sensors configuration YAML files.""" for sensor_name in available_sensor_names(): check_sensor_configs(sensor_name=sensor_name)