Source code for disdrodb.l0.l0a_processing

# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2026 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Functions to process raw text files into DISDRODB L0A Apache Parquet."""

import logging
import os

import numpy as np
import pandas as pd

from disdrodb.l0.check_standards import check_l0a_column_names, check_l0a_standards
from disdrodb.l0.l0b_processing import infer_split_str
from disdrodb.l0.standards import (
    get_data_range_dict,
    get_l0a_dtype,
    get_nan_flags_dict,
    get_valid_values_dict,
)
from disdrodb.utils.directories import create_directory, remove_if_exists

# Logger
from disdrodb.utils.logger import (
    log_error,
    log_info,
    log_warning,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Process-wide pandas option: turn off the chained-assignment check.
pd.set_option("mode.chained_assignment", None)  # Avoid SettingWithCopyWarning
# https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#evaluation-order-matters


####---------------------------------------------------------------------------.
#### Raw file readers


def preprocess_reader_kwargs(reader_kwargs: dict) -> dict:
    """Preprocess arguments required to read raw text file into Pandas.

    The input dictionary is never modified: a shallow copy is cleaned and returned.

    Parameters
    ----------
    reader_kwargs : dict
        Initial parameter dictionary. It must contain the ``"delimiter"`` key.

    Returns
    -------
    dict
        Copy of ``reader_kwargs`` without the ``"dtype"``, ``"blocksize"``
        and ``"assume_missing"`` keys.

    Raises
    ------
    ValueError
        If the ``"delimiter"`` key is not specified.
    """
    # Check delimiter is specified !
    if "delimiter" not in reader_kwargs:
        raise ValueError("The 'delimiter' key must be specified in reader_kwargs dictionary!")

    # Work on a copy BEFORE removing any key, so the caller's dictionary is left untouched
    reader_kwargs = reader_kwargs.copy()

    # Remove dtype key
    # - The dtype is enforced in the read function !
    reader_kwargs.pop("dtype", None)

    # Remove kwargs expected by dask dataframe read_csv
    reader_kwargs.pop("blocksize", None)
    reader_kwargs.pop("assume_missing", None)
    return reader_kwargs
def check_matching_column_number(df, column_names):
    """Ensure the dataframe has exactly as many columns as there are column names.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe whose columns are counted.
    column_names : list
        Expected column names (only the length is used).

    Raises
    ------
    ValueError
        If the two counts differ.
    """
    n_found = len(df.columns)
    n_wanted = len(column_names)
    if n_found == n_wanted:
        return
    raise ValueError(f"The dataframe has {n_found} columns, while {n_wanted} are expected !.")
def read_raw_text_file(
    filepath: str,
    column_names: list,
    reader_kwargs: dict,
    logger=None,  # noqa
) -> pd.DataFrame:
    """Load a raw text file into a dataframe of string columns.

    Parameters
    ----------
    filepath : str
        Raw file path.
    column_names : list
        Column names passed to ``pd.read_csv`` through ``names``.
        If not ``None``, the number of read columns is checked against it.
    reader_kwargs : dict
        Pandas ``pd.read_csv`` arguments. They are first cleaned with
        ``preprocess_reader_kwargs``.
    logger : logging.Logger
        Logger object. The default is ``None``. It is not used in this function.

    Returns
    -------
    pandas.DataFrame
        Pandas dataframe where every column is read with dtype ``"str"``.

    Raises
    ------
    ValueError
        If the file is empty, if no row is read, or if the number of columns
        does not match ``column_names``.
    """
    # Clean up the reader arguments
    reader_kwargs = preprocess_reader_kwargs(reader_kwargs)

    # Read the file, enforcing dtype 'str' for every column
    try:
        df = pd.read_csv(filepath, names=column_names, dtype="str", **reader_kwargs)
    except pd.errors.EmptyDataError:
        raise ValueError(f"The following file is empty: {filepath}")

    # A file yielding no rows is considered empty too
    if df.shape[0] == 0:
        raise ValueError(f"The following file is empty: {filepath}")

    # Verify the number of columns when names are provided
    if column_names is not None:
        check_matching_column_number(df, column_names)

    return df
####---------------------------------------------------------------------------.
#### L0A checks and homogenization
def remove_rows_with_missing_time(df: pd.DataFrame, logger=logger, verbose: bool = False):
    """Discard the dataframe rows whose ``"time"`` value is missing (``NaT``).

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe.
    logger : logging.Logger
        Logger passed to ``log_warning``. The default is the object bound to the
        module-level name ``logger`` when the function is defined.
    verbose : bool
        Whether to verbose the processing. The default is ``False``.

    Returns
    -------
    pandas.DataFrame
        Dataframe with valid timesteps.

    Raises
    ------
    ValueError
        If no row is left after dropping the missing timesteps.
    """
    n_before = len(df)

    # Keep only rows with a non-missing "time"
    df = df.dropna(subset="time", axis=0)
    n_after = len(df.index)

    # Nothing left: the file is unusable
    if n_after == 0:
        raise ValueError("There are not valid timestep.")

    # Report how many rows have been discarded (if any)
    n_discarded = n_before - n_after
    if n_discarded > 0:
        log_warning(
            logger=logger,
            msg=f"{n_discarded} rows had invalid timesteps and were discarded.",
            verbose=verbose,
        )
    return df
def remove_duplicated_timesteps(df: pd.DataFrame, logger=None, verbose: bool = False):
    """Drop rows whose ``"time"`` value already occurred, keeping the first occurrence.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe.
    logger : logging.Logger
        Logger passed to ``log_warning``. The default is ``None``.
    verbose : bool
        Whether to verbose the processing. The default is ``False``.

    Returns
    -------
    pandas.DataFrame
        Dataframe with valid unique timesteps.
    """
    # Count how many times each timestep occurs
    unique_times, occurrences = np.unique(df["time"], return_counts=True)
    values_duplicates = unique_times[occurrences > 1].astype("M8[s]")

    # Nothing to do if every timestep is unique
    if len(values_duplicates) == 0:
        return df

    # TODO: raise error if duplicated timesteps have different values !
    # Keep only the first occurrence of each timestep
    df = df.drop_duplicates(subset="time", keep="first")

    # Report which timesteps were duplicated
    msg = f"The following timesteps occurred more than once: {values_duplicates}. Only the first occurrence selected."
    log_warning(logger=logger, msg=msg, verbose=verbose)
    return df
def drop_timesteps(df, timesteps):
    """Remove the rows whose ``"time"`` value is listed in ``timesteps``.

    Raises
    ------
    ValueError
        If no row is left after the removal.
    """
    is_problematic = df["time"].isin(timesteps)
    df = df[~is_problematic]

    # At least one row must survive
    if len(df) == 0:
        raise ValueError(
            "No rows left after removing problematic timesteps. Maybe you need to adjust the issue YAML file.",
        )
    return df
def drop_time_periods(df, time_periods):
    """Remove the rows whose ``"time"`` falls inside any of the given periods.

    Each period is indexed as ``(start_time, end_time)`` and both bounds are
    removed as well (only rows strictly before the start or strictly after
    the end are kept).

    Raises
    ------
    ValueError
        If no row is left after the removal.
    """
    for period in time_periods:
        # Once the dataframe is empty there is nothing more to filter
        if len(df) == 0:
            break
        is_before = df["time"] < period[0]
        is_after = df["time"] > period[1]
        df = df[is_before | is_after]

    # At least one row must survive
    if len(df) == 0:
        raise ValueError(
            "No rows left after removing problematic time_periods. Maybe you need to adjust the issue YAML file.",
        )
    return df
def remove_issue_timesteps(df, issue_dict, logger=None, verbose=False):
    """Remove the dataframe rows flagged as problematic in the issue dictionary.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe.
    issue_dict : dict
        Issue dictionary. The ``"timesteps"`` and ``"time_periods"`` keys are read;
        a missing key or a ``None`` value means nothing to remove.
    logger : logging.Logger
        Logger passed to ``log_info``. The default is ``None``.
    verbose : bool
        Whether to verbose the processing. The default is ``False``.

    Returns
    -------
    pandas.DataFrame
        Dataframe with problematic timesteps removed.
    """
    n_rows_before = len(df)

    # Read the issue specifications (None is treated as "nothing specified")
    timesteps = issue_dict.get("timesteps")
    if timesteps is None:
        timesteps = []
    time_periods = issue_dict.get("time_periods")
    if time_periods is None:
        time_periods = []

    # Remove single timesteps first, then whole periods
    if len(timesteps) > 0:
        df = drop_timesteps(df=df, timesteps=timesteps)
    if len(time_periods) > 0:
        df = drop_time_periods(df, time_periods=time_periods)

    # Report how many rows have been removed (if any)
    n_rows_dropped = n_rows_before - len(df)
    if n_rows_dropped > 0:
        log_info(
            logger=logger,
            msg=f"{n_rows_dropped} rows were dropped following the issue YAML file content.",
            verbose=verbose,
        )
    return df
def cast_column_dtypes(df: pd.DataFrame, sensor_name: str) -> pd.DataFrame:
    """Cast the dataframe columns to the DISDRODB L0A dtype standards.

    The input dataframe is not modified: the casting is applied on a deep copy.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe.
    sensor_name : str
        Name of the sensor.

    Returns
    -------
    pandas.DataFrame
        Dataframe with corrected columns types.

    Raises
    ------
    ValueError
        If a column can not be cast to its target dtype.
    """
    # Target dtype of each L0A variable for this sensor
    dtype_dict = get_l0a_dtype(sensor_name)
    dtype_dict.update(
        {
            # Ensure time column is saved with seconds resolution
            "time": "M8[s]",
            # Add latitude, longitude and altitude for mobile disdrometers
            "latitude": "float64",
            "longitude": "float64",
            "altitude": "float64",
        },
    )

    # Cast on a deep copy to avoid changing the dtypes of the input dataframe
    df = df.copy(deep=True)
    for column in list(df.columns):
        target_dtype = dtype_dict[column]
        try:
            df[column] = df[column].astype(target_dtype)
        except ValueError as e:
            raise ValueError(f"ValueError: The column {column} has {e}")
    return df
def coerce_corrupted_values_to_nan(df: pd.DataFrame, sensor_name: str) -> pd.DataFrame:
    """Convert numeric columns with ``pd.to_numeric``, turning unparsable values into ``np.nan``.

    A column is considered numeric when its L0A dtype name contains
    ``"float"`` or ``"int"``. The columns are reassigned on the input dataframe.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe.
    sensor_name : str
        Name of the sensor.

    Returns
    -------
    pandas.DataFrame
        Dataframe whose numeric columns have corrupted values coerced to ``np.nan``.
    """
    # Names of the variables expected to be numeric
    dtype_dict = get_l0a_dtype(sensor_name)
    numeric_names = {name for name, dtype in dtype_dict.items() if "float" in dtype or "int" in dtype}

    # Coerce only the numeric columns actually present in the dataframe
    columns_to_coerce = [column for column in df.columns if column in numeric_names]
    for column in columns_to_coerce:
        df[column] = pd.to_numeric(df[column], errors="coerce")
    return df
def strip_string_spaces(df: pd.DataFrame, sensor_name: str) -> pd.DataFrame:
    """Remove leading/trailing whitespace from the dataframe string columns.

    A column is considered a string column when its L0A dtype is ``"str"``.
    The columns are reassigned on the input dataframe.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe.
    sensor_name : str
        Name of the sensor.

    Returns
    -------
    pandas.DataFrame
        Dataframe with string columns without leading/trailing spaces.

    Raises
    ------
    AttributeError
        If a column expected to hold strings does not support the ``.str`` accessor.
    """
    # Names of the variables expected to be strings
    dtype_dict = get_l0a_dtype(sensor_name)
    string_names = {name for name, dtype in dtype_dict.items() if dtype == "str"}

    # Strip only the string columns actually present in the dataframe
    columns_to_strip = [column for column in df.columns if column in string_names]
    for column in columns_to_strip:
        try:
            df[column] = df[column].str.strip()
        except AttributeError:
            raise AttributeError(f"The column {column} is not a string/object dtype.")
    return df
def strip_delimiter(string):
    """Strip the inferred delimiter from both ends of a string.

    The delimiter is inferred with ``infer_split_str`` and removed with
    ``str.strip``, i.e. every leading and trailing character belonging to the
    delimiter is removed. Non-string inputs are returned unchanged.
    """
    if isinstance(string, str):
        delimiter = infer_split_str(string=string)
        return string.strip(delimiter)
    return string
def strip_delimiter_from_raw_arrays(df):
    """Strip the delimiter at the start and end of the raw array fields.

    ``strip_delimiter`` is applied element-wise to each of the raw array
    columns present in the dataframe. The columns are reassigned on the
    input dataframe, which is also returned.
    """
    # Raw array fields that may be present in the dataframe
    raw_array_fields = (
        "raw_drop_number",
        "raw_drop_concentration",
        "raw_drop_average_velocity",
    )

    # Process the available fields, following the dataframe column order
    for field in [column for column in df.columns if column in raw_array_fields]:
        df[field] = df[field].apply(strip_delimiter)
    return df
def is_raw_array_string_not_corrupted(string):
    """Return whether a raw array string is NOT corrupted.

    - Non-string inputs are considered corrupted (``False``).
    - The strings ``""``, ``"NAN"`` and ``"NaN"`` are accepted (``True``).
    - Otherwise the string is split with the delimiter inferred by
      ``infer_split_str`` and it is valid only if every element can be
      converted to a non-NaN number.
    """
    if not isinstance(string, str):
        return False
    if string in ("", "NAN", "NaN"):
        return True

    # Split the array and try to convert each element to a number
    delimiter = infer_split_str(string=string)
    numbers = pd.to_numeric(string.split(delimiter), errors="coerce")

    # Elements that failed the conversion are NaN
    return ~np.isnan(numbers).any()
def remove_corrupted_rows(df):
    """Discard the rows whose raw array fields can not be converted to numeric.

    Note: The raw array must be stripped away from delimiter at start and end !

    Raises
    ------
    ValueError
        If zero rows or only a single row remain after the checks.
    """
    # Raw array fields that may be present in the dataframe
    raw_array_fields = (
        "raw_drop_number",
        "raw_drop_concentration",
        "raw_drop_average_velocity",
    )
    available_fields = [column for column in df.columns if column in raw_array_fields]

    # Keep only the rows where each available field is valid
    for field in available_fields:
        if len(df) == 0:
            break  # nothing left to check
        is_not_corrupted = df[field].apply(is_raw_array_string_not_corrupted)
        df = df[is_not_corrupted]

    # Too few rows left: raise an error
    n_remaining = len(df)
    if n_remaining == 0:
        raise ValueError("No remaining rows after data corruption checks.")
    if n_remaining == 1:
        raise ValueError("Only 1 row remains after data corruption checks. Check the raw file and maybe delete it.")
    return df
def replace_nan_flags(df, sensor_name, logger=None, verbose=False):
    """Replace the values listed as ``nan_flags`` with ``np.nan``.

    The replacement is done on the input dataframe, which is also returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe.
    sensor_name : str
        Name of the sensor.
    logger : logging.Logger
        Logger passed to ``log_info``. The default is ``None``.
    verbose : bool
        Whether to verbose the processing. The default is ``False``.

    Returns
    -------
    pandas.DataFrame
        Dataframe without nan_flags values.
    """
    for var, nan_flags in get_nan_flags_dict(sensor_name).items():
        # Skip variables not available in the dataframe
        if var not in df:
            continue

        # Locate the values equal to one of the nan flags
        is_a_nan_flag = df[var].isin(nan_flags)
        n_nan_flags_values = np.sum(is_a_nan_flag)
        if n_nan_flags_values > 0:
            msg = f"In variable {var}, {n_nan_flags_values} values were nan_flags and were replaced to np.nan."
            log_info(logger=logger, msg=msg, verbose=verbose)
            df.loc[is_a_nan_flag, var] = np.nan
    return df
def set_nan_outside_data_range(df, sensor_name, logger=None, verbose=False):
    """Mask with ``np.nan`` the values falling outside the variable data range.

    The range bounds are inclusive. The columns are reassigned on the input
    dataframe, which is also returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe.
    sensor_name : str
        Name of the sensor.
    logger : logging.Logger
        Logger passed to ``log_info``. The default is ``None``.
    verbose : bool
        Whether to verbose the processing. The default is ``False``.

    Returns
    -------
    pandas.DataFrame
        Dataframe without values outside the expected data range.
    """
    for var, data_range in get_data_range_dict(sensor_name).items():
        # Skip variables not available in the dataframe
        if var not in df:
            continue

        # A value is valid if within [min, max] or if already np.nan
        lower_bound, upper_bound = data_range[0], data_range[1]
        is_within_range = (df[var] >= lower_bound) & (df[var] <= upper_bound)
        is_valid = is_within_range | df[var].isna()

        # Mask the invalid values (if any)
        n_invalid = np.sum(~is_valid)
        if n_invalid > 0:
            msg = f"{n_invalid} {var} values were outside the data range and were set to np.nan."
            log_info(logger=logger, msg=msg, verbose=verbose)
            df[var] = df[var].where(is_valid)
    return df
def set_nan_invalid_values(df, sensor_name, logger=None, verbose=False):
    """Mask with ``np.nan`` the values not belonging to the set of valid (class) values.

    The columns are reassigned on the input dataframe, which is also returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe.
    sensor_name : str
        Name of the sensor.
    logger : logging.Logger
        Logger passed to ``log_info``. The default is ``None``.
    verbose : bool
        Whether to verbose the processing. The default is ``False``.

    Returns
    -------
    pandas.DataFrame
        Dataframe without invalid values.
    """
    for var, valid_values in get_valid_values_dict(sensor_name).items():
        # Skip variables not available in the dataframe
        if var not in df:
            continue

        # A value is valid if listed among the valid values or if already np.nan
        is_listed = df[var].isin(valid_values)
        is_valid_values = is_listed | df[var].isna()

        # Mask the invalid values (if any)
        n_invalid_values = np.sum(~is_valid_values)
        if n_invalid_values > 0:
            msg = f"{n_invalid_values} {var} values were invalid and were replaced to np.nan."
            log_info(logger=logger, msg=msg, verbose=verbose)
            df[var] = df[var].where(is_valid_values)
    return df
def sanitize_df(
    df,
    sensor_name,
    verbose=True,
    issue_dict=None,
    logger=None,
):
    """Clean and homogenize a raw dataframe into a L0A dataframe.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw dataframe to sanitize.
    sensor_name : str
        Name of the sensor.
    verbose : bool
        Whether to verbose the processing. The default is ``True``.
    issue_dict : dict
        Issue dictionary providing information on timesteps to remove.
        The default is ``None``, which is treated as an empty dictionary ``{}``.
        Valid issue_dict key are ``'timesteps'`` and ``'time_periods'``.
        Valid issue_dict values are list of datetime64 values (with second accuracy).
        To correctly format and check the validity of the ``issue_dict``, use
        the ``disdrodb.l0.issue.check_issue_dict`` function.
    logger : logging.Logger
        Logger forwarded to the processing steps. The default is ``None``.

    Returns
    -------
    pandas.DataFrame
        Dataframe sorted by time, with a fresh index.
    """
    # Arguments shared by the steps that report what they do
    log_kwargs = {"logger": logger, "verbose": verbose}

    # Define the issue dictionary (None means no issue specified)
    if issue_dict is None:
        issue_dict = {}

    # - Remove rows with time NaT
    df = remove_rows_with_missing_time(df, **log_kwargs)

    # - Remove duplicated timesteps
    df = remove_duplicated_timesteps(df, **log_kwargs)

    # - Filter out problematic timesteps reported in the issue YAML file
    df = remove_issue_timesteps(df, issue_dict=issue_dict, **log_kwargs)

    # - Coerce numeric columns corrupted values to np.nan
    df = coerce_corrupted_values_to_nan(df, sensor_name=sensor_name)

    # - Strip trailing/leading space from string columns
    df = strip_string_spaces(df, sensor_name=sensor_name)

    # - Strip first and last delimiter from the raw arrays
    df = strip_delimiter_from_raw_arrays(df)

    # TODO:
    # - Ensure raw drop number variable max value is < 999
    # - Raise error otherwise

    # - Remove corrupted rows
    df = remove_corrupted_rows(df)

    # - Cast dataframe to dtypes
    df = cast_column_dtypes(df, sensor_name=sensor_name)

    # - Replace nan flags values with np.nan
    df = replace_nan_flags(df, sensor_name=sensor_name, **log_kwargs)

    # - Set values outside the data range to np.nan
    df = set_nan_outside_data_range(df, sensor_name=sensor_name, **log_kwargs)

    # - Replace invalid values with np.nan
    df = set_nan_invalid_values(df, sensor_name=sensor_name, **log_kwargs)

    # - Sort by time and drop the index
    df = df.sort_values("time").reset_index(drop=True)

    # ------------------------------------------------------.
    # - Check column names agrees to DISDRODB standards
    check_l0a_column_names(df, sensor_name=sensor_name)

    # - Check the dataframe respects the DISDRODB standards
    check_l0a_standards(df=df, sensor_name=sensor_name, verbose=verbose)

    # ------------------------------------------------------.
    # Return the L0A dataframe
    return df
####---------------------------------------------------------------------------.
#### L0A Apache Parquet Writer
def write_l0a(
    df: pd.DataFrame,
    filepath: str,
    force: bool = False,
    logger=None,
    verbose: bool = False,
):
    """Save the dataframe into an Apache Parquet file.

    Parameters
    ----------
    df : pandas.DataFrame
        Input dataframe.
    filepath : str
        Output file path.
    force : bool, optional
        Whether to overwrite existing data.
        The value is forwarded to ``remove_if_exists``, which is expected to
        remove an existing file if ``True`` and to raise an error if ``False``.
        The default is ``False``.
    logger : logging.Logger, optional
        Logger passed to ``remove_if_exists`` and ``log_info``. The default is ``None``.
    verbose : bool, optional
        Whether to verbose the processing. The default is ``False``.

    Raises
    ------
    ValueError
        The input dataframe can not be written as an Apache Parquet file.
        The original exception is attached as ``__cause__``.
    """
    # -------------------------------------------------------------------------.
    # Create station directory if does not exist
    create_directory(os.path.dirname(filepath))

    # Check if the file already exists
    # - If force=True --> Remove it
    # - If force=False --> Raise error
    remove_if_exists(filepath, force=force, logger=logger)

    # -------------------------------------------------------------------------.
    # Define writing options
    compression = "snappy"  # 'gzip', 'brotli, 'lz4', 'zstd'
    row_group_size = 100000
    engine = "pyarrow"

    # -------------------------------------------------------------------------.
    # Save dataframe to Apache Parquet
    # - Only the writing is guarded, so that a failure elsewhere (e.g. in the logging)
    #   is not misreported as a Parquet writing error
    try:
        df.to_parquet(
            filepath,
            engine=engine,
            compression=compression,
            row_group_size=row_group_size,
        )
    except Exception as e:
        msg = f"The Pandas DataFrame cannot be written as an Apache Parquet file. The error is: \n {e}."
        raise ValueError(msg) from e

    # Report the successful writing
    msg = f"The Pandas Dataframe has been written as an Apache Parquet file to {filepath}."
    log_info(logger=logger, msg=msg, verbose=verbose)
# -------------------------------------------------------------------------.
####--------------------------------------------------------------------------.
#### DISDRODB L0A product reader
def concatenate_dataframe(list_df: list, logger=None, verbose: bool = False) -> pd.DataFrame:
    """Stack a list of dataframes into a single dataframe sorted by ``"time"``.

    A list with a single dataframe is returned as is (no sorting is applied).

    Parameters
    ----------
    list_df : list
        List of dataframes.
    logger : logging.Logger, optional
        Logger passed to ``log_info``. The default is ``None``.
    verbose : bool, optional
        If ``True``, print messages.
        If ``False``, no print.

    Returns
    -------
    pandas.DataFrame
        Concatenated dataframe.

    Raises
    ------
    ValueError
        Concatenation can not be done.
    """
    # Nothing to concatenate with a single element
    if len(list_df) == 1:
        single_df = list_df[0]
        if isinstance(single_df, pd.DataFrame):
            return single_df
        raise ValueError("Only pd.DataFrame objects are valid.")

    log_info(logger=logger, msg="Concatenation of dataframes started.", verbose=verbose)

    # Concatenate and sort by increasing time
    try:
        df = pd.concat(list_df, axis=0, ignore_index=True).sort_values(by="time")
    except (AttributeError, TypeError) as e:
        raise ValueError(f"Can not concatenate the files. \n Error: {e}")

    log_info(logger=logger, msg="Concatenation of dataframes has finished.", verbose=verbose)
    return df
def read_l0a_dataframe(
    filepaths: str | list,
    debugging_mode: bool = False,
) -> pd.DataFrame:
    """Open DISDRODB L0A Apache Parquet file(s) as a time-sorted dataframe.

    Parameters
    ----------
    filepaths : str or list
        Either a list or a single filepath.
    debugging_mode : bool
        If ``True``, it reduces the amount of data to process.
        Only the first 3 filepaths are read and at most 100 rows are
        randomly sampled from them.
        The default is ``False``.

    Returns
    -------
    pandas.DataFrame
        L0A Dataframe, with ``"time"`` cast to ``M8[ns]``, sorted by time
        and with a fresh index.

    Raises
    ------
    TypeError
        If ``filepaths`` is neither a string nor a list.
    """
    from disdrodb.api.io import open_parquet_files

    # ----------------------------------------
    # Ensure filepaths is a list (a single string is wrapped)
    if isinstance(filepaths, str):
        filepaths = [filepaths]
    elif not isinstance(filepaths, list):
        raise TypeError("Expecting filepaths to be a string or a list of strings.")

    # ---------------------------------------------------
    # If debugging_mode=True, it reads only the first 3 filepaths
    if debugging_mode:
        filepaths = filepaths[:3]

    # ---------------------------------------------------
    # Open the files
    df = open_parquet_files(filepaths, use_threads=False)

    # If debugging_mode=True, keep at most 100 sampled rows
    if debugging_mode:
        df = df.sample(n=min(100, len(df)))

    # Ensure time is in nanoseconds
    df["time"] = df["time"].astype("M8[ns]")

    # Ensure sorted by time and without index
    df = df.sort_values(by="time").reset_index(drop=True)

    # ---------------------------------------------------
    # Return dataframe
    return df
####---------------------------------------------------------------------------.
#### L0A Utility
def generate_l0a(
    filepaths: list | str,
    reader,
    sensor_name,
    issue_dict=None,
    verbose=True,
    logger=None,
) -> pd.DataFrame:
    """Read and parse a list of raw files and generate a DISDRODB L0A dataframe.

    Each file is read with ``reader`` and cleaned with ``sanitize_df``.
    A file whose processing raises an exception is skipped (the error is logged)
    and the remaining dataframes are concatenated.

    Parameters
    ----------
    filepaths : Union[list,str]
        File(s) path(s)
    reader:
        DISDRODB reader function.
        Format: reader(filepath, logger=None)
    sensor_name : str
        Name of the sensor.
    issue_dict : dict, optional
        Issue dictionary providing information on timesteps to remove.
        The default is ``None``, which is forwarded as is to ``sanitize_df``.
        Valid issue_dict key are ``'timesteps'`` and ``'time_periods'``.
        Valid issue_dict values are list of datetime64 values (with second accuracy).
        To correctly format and check the validity of the ``issue_dict``, use
        the ``disdrodb.l0.issue.check_issue_dict`` function.
    verbose : bool
        Whether to verbose the processing. The default is ``True``.
    logger : logging.Logger, optional
        Logger forwarded to the reader and to the processing steps. The default is ``None``.

    Returns
    -------
    pandas.DataFrame
        Dataframe with the ``"time"`` column cast to ``M8[ns]``.

    Raises
    ------
    ValueError
        Input parameters can not be used or no raw file can be processed.
    """
    # ------------------------------------------------------.
    # Check input list
    if isinstance(filepaths, str):
        filepaths = [filepaths]
    if len(filepaths) == 0:
        raise ValueError("'filepaths' must contains at least 1 filepath.")

    # ------------------------------------------------------.
    # Loop over all raw files
    n_files = len(filepaths)
    processed_file_counter = 0
    list_skipped_files_msg = []
    list_df = []
    for filepath in filepaths:
        # Try read the raw text file
        try:
            df = reader(filepath, logger=logger)
            # Sanitize the dataframe
            df = sanitize_df(
                df=df,
                sensor_name=sensor_name,
                issue_dict=issue_dict,
                logger=logger,
                verbose=verbose,
            )
            # Append dataframe to the list
            list_df.append(df)
            # Update the logger
            processed_file_counter += 1
            msg = f"Raw file '{filepath}' processed successfully ({processed_file_counter}/{n_files})."
            log_info(logger=logger, msg=msg, verbose=verbose)

        # Skip the file if the processing fails
        # - Deliberately broad: a single bad raw file must not stop the processing of the others
        except Exception as e:
            # Update the logger
            msg = f"{filepath} has been skipped. The error is: {e}."
            log_error(logger=logger, msg=msg, verbose=verbose)
            list_skipped_files_msg.append(msg)

    # Update logger
    msg = f"{len(list_skipped_files_msg)} of {n_files} have been skipped."
    log_info(logger=logger, msg=msg, verbose=verbose)

    ##----------------------------------------------------------------.
    # Concatenate the dataframe
    if len(list_df) == 0:
        # Every file has been skipped
        raise ValueError("No raw file could be read!")
    df = concatenate_dataframe(list_df, verbose=verbose, logger=logger)

    # ------------------------------------------------------.
    # Enforce output time to be [ns]
    # --> For compatibility with xarray
    df["time"] = df["time"].astype("M8[ns]")

    # Return the dataframe
    return df