#!/usr/bin/env python3
# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2023 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Define utilities for Directory/File Checks/Creation/Deletion."""
import glob
import logging
import os
import pathlib
import shutil
import subprocess
from typing import Union
from disdrodb.utils.list import flatten_list
from disdrodb.utils.logger import log_info
# Module-level logger named after this module; used by ``copy_file`` to report successful copies.
logger = logging.getLogger(__name__)
[docs]
def ensure_string_path(path, msg, accepth_pathlib=False):
    """Validate the type of ``path`` and return it as a string.

    Parameters
    ----------
    path : str or pathlib.PurePath
        Path to validate. ``pathlib`` objects are accepted only when
        ``accepth_pathlib=True``.
    msg : str
        Message of the ``TypeError`` raised when ``path`` has an invalid type.
    accepth_pathlib : bool, optional
        If ``True``, ``pathlib.PurePath`` instances are accepted too.
        The default is ``False``.

    Returns
    -------
    str
        ``path`` converted with ``str()``.

    Raises
    ------
    TypeError
        If ``path`` is not an instance of an accepted type.
    """
    allowed_types = str
    if accepth_pathlib:
        allowed_types = (str, pathlib.PurePath)
    if isinstance(path, allowed_types):
        return str(path)
    raise TypeError(msg)
[docs]
def contains_netcdf_or_parquet_files(dir_path: str) -> bool:
    """Check (recursively) whether a directory holds any netCDF or Parquet file.

    The directory tree is traversed lazily with ``os.walk`` and the search
    stops at the first file whose name ends with ``.nc`` or ``.parquet``.

    Parameters
    ----------
    dir_path : str
        Root directory of the search.

    Returns
    -------
    bool
        ``True`` as soon as one matching file is found; ``False`` otherwise.
    """
    for _, _, filenames in os.walk(dir_path):
        for filename in filenames:
            if filename.endswith((".nc", ".parquet")):
                return True
    return False
[docs]
def contains_files(dir_path: str) -> bool:
    """Check (recursively) whether a directory contains at least one file.

    The directory tree is traversed lazily with ``os.walk`` and the search
    stops at the first directory level that lists a file.

    Parameters
    ----------
    dir_path : str
        Root directory of the search.

    Returns
    -------
    bool
        ``True`` as soon as one file is found; ``False`` otherwise.
    """
    for _, _, filenames in os.walk(dir_path):
        if filenames:
            return True
    return False
[docs]
def check_glob_pattern(pattern: str) -> str:
    """Check if glob pattern is a string and is a valid pattern.

    Parameters
    ----------
    pattern : str
        String to be checked.

    Returns
    -------
    str
        The unchanged pattern, if valid.

    Raises
    ------
    TypeError
        If ``pattern`` is not a string.
    ValueError
        If ``pattern`` is empty, starts with ``/``, contains ``//``
        or contains a backslash.
    """
    if not isinstance(pattern, str):
        raise TypeError("Expect pattern as a string.")
    # Reject the empty pattern explicitly instead of failing with an IndexError
    if not pattern:
        raise ValueError("glob_pattern should not be an empty string.")
    if pattern.startswith("/"):
        raise ValueError("glob_pattern should not start with /")
    if "//" in pattern:
        raise ValueError("glob_pattern expects path with single separators: /, not //")
    if "\\" in pattern:
        raise ValueError("glob_pattern expects path separators to be /, not \\")
    return pattern
[docs]
def check_glob_patterns(patterns: Union[str, list]) -> list:
"""Check if glob patterns are valids."""
if not isinstance(patterns, (str, list)):
raise ValueError("'glob_patterns' must be a str or list of strings.")
if isinstance(patterns, str):
patterns = [patterns]
patterns = [check_glob_pattern(pattern) for pattern in patterns]
return patterns
def _recursive_glob(dir_path, glob_pattern):
    """Return, as strings, the paths matching ``glob_pattern`` in ``dir_path`` and all its subdirectories."""
    # Path.rglob searches the directory itself and every subdirectory recursively
    root = pathlib.Path(dir_path)
    return list(map(str, root.rglob(glob_pattern)))
def _is_hidden(path):
    """Return True if at least one ``os.sep``-separated component of ``path`` starts with a dot."""
    for component in path.split(os.sep):
        if component.startswith("."):
            return True
    return False
def _list_paths(dir_path, glob_pattern, recursive=False, skip_hidden=True):
    """Return the file and directory paths matching a single glob pattern.

    Parameters
    ----------
    dir_path : str
        Directory in which to search.
    glob_pattern : str
        Glob pattern, relative to ``dir_path``.
    recursive : bool, optional
        If ``True``, search also in the subdirectories. The option is ignored
        when the pattern contains ``/`` but not ``**``. The default is ``False``.
    skip_hidden : bool, optional
        If ``True``, discard the matches that have a component starting
        with a dot in their path relative to ``dir_path``. The default is ``True``.

    Returns
    -------
    list
        List of matching paths.
    """
    # A pattern that spells out subdirectories (without "**") is searched only from dir_path
    use_recursion = recursive and ("**" in glob_pattern or "/" not in glob_pattern)
    if use_recursion:
        found = _recursive_glob(dir_path, glob_pattern)
    else:
        found = glob.glob(os.path.join(dir_path, glob_pattern))
    if not skip_hidden:
        return found
    # Keep only the matches without hidden components below dir_path
    visible = []
    for candidate in found:
        relative = os.path.relpath(candidate, dir_path)
        if not _is_hidden(relative):
            visible.append(candidate)
    return visible
[docs]
def list_paths(dir_path, glob_pattern, recursive=False, skip_hidden=True):
    """Return a list of filepaths and directory paths.

    Parameters
    ----------
    dir_path : str
        Directory in which to search.
    glob_pattern : str or list
        A single glob pattern or a list of glob patterns.
    recursive : bool, optional
        If ``True``, search also in the subdirectories. The default is ``False``.
    skip_hidden : bool, optional
        If ``True``, discard paths with hidden components. The default is ``True``.

    Returns
    -------
    list
        Flattened list of the paths matched by each pattern.
    """
    # Validate the pattern(s) and always work with a list of patterns
    patterns = check_glob_patterns(glob_pattern)
    # Collect the matches pattern by pattern
    matches_per_pattern = []
    for pattern in patterns:
        matches = _list_paths(dir_path=dir_path, glob_pattern=pattern, recursive=recursive, skip_hidden=skip_hidden)
        matches_per_pattern.append(matches)
    return flatten_list(matches_per_pattern)
[docs]
def list_files(dir_path, glob_pattern="*", recursive=False, skip_hidden=True, return_paths=True):
    """Return a list of filepaths (directories are excluded).

    Parameters
    ----------
    dir_path : str
        Directory in which to search.
    glob_pattern : str or list, optional
        Glob pattern(s) to match. The default is ``"*"``.
    recursive : bool, optional
        If ``True``, search also in the subdirectories. The default is ``False``.
    skip_hidden : bool, optional
        If ``True``, discard paths with hidden components. The default is ``True``.
    return_paths : bool, optional
        If ``False``, return only the file names instead of the paths.
        The default is ``True``.

    Returns
    -------
    list
        List of file paths (or file names).
    """
    candidates = list_paths(dir_path, glob_pattern, recursive=recursive, skip_hidden=skip_hidden)
    if return_paths:
        return [candidate for candidate in candidates if os.path.isfile(candidate)]
    # Only the file names are requested
    return [os.path.basename(candidate) for candidate in candidates if os.path.isfile(candidate)]
[docs]
def list_directories(dir_path, glob_pattern="*", recursive=False, skip_hidden=True, return_paths=True):
    """Return a list of directory paths (files are excluded).

    Parameters
    ----------
    dir_path : str
        Directory in which to search.
    glob_pattern : str or list, optional
        Glob pattern(s) to match. The default is ``"*"``.
    recursive : bool, optional
        If ``True``, search also in the subdirectories. The default is ``False``.
    skip_hidden : bool, optional
        If ``True``, discard paths with hidden components. The default is ``True``.
    return_paths : bool, optional
        If ``False``, return only the directory names instead of the paths.
        The default is ``True``.

    Returns
    -------
    list
        List of directory paths (or directory names).
    """
    candidates = list_paths(dir_path, glob_pattern, recursive=recursive, skip_hidden=skip_hidden)
    directories = list(filter(os.path.isdir, candidates))
    if return_paths:
        return directories
    # Only the directory names are requested
    return list(map(os.path.basename, directories))
[docs]
def count_files(dir_path, glob_pattern="*", recursive=False, skip_hidden=True):
    """Count the files (directories excluded) matching ``glob_pattern`` in ``dir_path``."""
    filepaths = list_files(dir_path, glob_pattern, recursive=recursive, skip_hidden=skip_hidden)
    return len(filepaths)
[docs]
def count_directories(dir_path, glob_pattern="*", recursive=False, skip_hidden=True):
    """Return the number of directories (exclude files).

    Parameters
    ----------
    dir_path : str
        Directory in which to search.
    glob_pattern : str or list, optional
        Glob pattern(s) to match. The default is ``"*"``.
    recursive : bool, optional
        If ``True``, search also in the subdirectories. The default is ``False``.
    skip_hidden : bool, optional
        If ``True``, discard paths with hidden components. The default is ``True``.

    Returns
    -------
    int
        Number of directories listed by ``list_directories``.
    """
    return len(list_directories(dir_path, glob_pattern, recursive=recursive, skip_hidden=skip_hidden))
[docs]
def check_directory_exists(dir_path):
    """Raise an error if ``dir_path`` is not an existing directory.

    Parameters
    ----------
    dir_path : str
        Path to check.

    Raises
    ------
    ValueError
        If ``dir_path`` does not exist, or exists but is not a directory.
    """
    if os.path.isdir(dir_path):
        return
    # Not a directory: tell apart an existing non-directory from a missing path
    if os.path.exists(dir_path):
        raise ValueError(f"{dir_path} is not a directory.")
    raise ValueError(f"{dir_path} directory does not exist.")
[docs]
def create_directory(path: str, exist_ok=True) -> None:
    """Create a directory at the provided path.

    Missing intermediate directories are created too (``os.makedirs``).

    Parameters
    ----------
    path : str or pathlib.PurePath
        Path of the directory to create.
    exist_ok : bool, optional
        Passed to ``os.makedirs``. The default is ``True``.

    Raises
    ------
    TypeError
        If ``path`` is neither a string nor a ``pathlib`` path.
    FileNotFoundError
        If the directory can not be created. The original exception is
        attached as ``__cause__``.
    """
    path = ensure_string_path(path, msg="'path' must be a string", accepth_pathlib=True)
    try:
        os.makedirs(path, exist_ok=exist_ok)
    except Exception as e:
        dir_path = os.path.dirname(path)
        dir_name = os.path.basename(path)
        msg = f"Can not create directory {dir_name} inside {dir_path}. Error: {e}"
        # Chain the original exception to keep its traceback available
        raise FileNotFoundError(msg) from e
[docs]
def create_required_directory(dir_path, dir_name, exist_ok=True):
    """Create the directory ``dir_name`` inside the ``dir_path`` directory.

    Parameters
    ----------
    dir_path : str or pathlib.PurePath
        Parent directory.
    dir_name : str
        Name of the directory to create inside ``dir_path``.
    exist_ok : bool, optional
        Forwarded to ``create_directory``. The default is ``True``.
    """
    parent_dir = ensure_string_path(dir_path, msg="'path' must be a string", accepth_pathlib=True)
    create_directory(path=os.path.join(parent_dir, dir_name), exist_ok=exist_ok)
[docs]
def is_empty_directory(path, skip_hidden=True):
    """Check if a directory path is empty.

    Parameters
    ----------
    path : str
        Path to check.
    skip_hidden : bool, optional
        If ``True``, entries whose name starts with a dot are ignored.
        The default is ``True``.

    Returns
    -------
    bool
        ``True`` if ``path`` is a directory without (non-ignored) entries.
        ``False`` if ``path`` is a file or a non-empty directory.

    Raises
    ------
    OSError
        If ``path`` does not exist.
    """
    if not os.path.exists(path):
        raise OSError(f"{path} does not exist.")
    if not os.path.isdir(path):
        return False
    for entry in os.listdir(path):
        # Hidden entries do not count when skip_hidden is True
        if skip_hidden and entry.startswith("."):
            continue
        return False
    return True
def _remove_file_or_directories(path, logger=None):
    """Remove the file/directory or subdirectories tree of ``path``.

    Use this function with caution.

    Parameters
    ----------
    path : str
        Path of the file or directory to delete.
    logger : optional
        Logger forwarded to ``log_info``. The default is ``None``.

    Raises
    ------
    OSError
        If ``path`` does not exist (raised by ``is_empty_directory``).
    """
    # If file
    if os.path.isfile(path):
        os.remove(path)
        log_info(logger, msg=f"Deleted the file {path}")
    # If empty directory
    # - Hidden entries must count here: os.rmdir fails on a directory
    #   that still contains hidden files or directories.
    elif is_empty_directory(path, skip_hidden=False):
        os.rmdir(path)
        log_info(logger, msg=f"Deleted the empty directory {path}")
    # If not empty directory
    else:
        # If not Windows, use shutil.rmtree
        if os.name != "nt":
            shutil.rmtree(path)
        else:
            rmtree_windows(path)
        log_info(logger, msg=f"Deleted directories within {path}")
[docs]
def rmtree_windows(path):
    """Remove a directory tree on Windows.

    Parameters
    ----------
    path : str
        Directory to delete together with all its content.

    Raises
    ------
    FileNotFoundError
        If ``path`` is not an existing directory.
    subprocess.CalledProcessError
        If the ``rd`` command exits with a non-zero status.
    """
    is_directory = os.path.isdir(path)
    if not is_directory:
        raise FileNotFoundError(f"{path!r} is not a valid directory")
    # rd (alias rmdir): /S removes all subdirectories and files, /Q is the quiet mode
    command = ["cmd", "/c", "rd", "/S", "/Q", path]
    subprocess.check_call(command)
[docs]
def remove_if_exists(path: str, force: bool = False, logger=None) -> None:
    """Remove file or directory if exists and ``force=True``.

    Parameters
    ----------
    path : str
        Path of the file or directory to delete.
    force : bool, optional
        If ``True``, delete the existing file or directory (with all its content).
        If ``False``, raise an error when ``path`` exists. The default is ``False``.
    logger : optional
        Logger forwarded to the deletion routine. The default is ``None``.

    Raises
    ------
    ValueError
        If ``path`` exists and ``force=False``, or if the deletion fails.
        In the latter case the original exception is attached as ``__cause__``.
    """
    # If the path does not exist, do nothing
    if not os.path.exists(path):
        return

    # If the path exists and force=False, raise Error
    if not force:
        msg = f"--force is False and a file already exists at: {path}"
        raise ValueError(msg)

    # If force=True, remove the file/directory or subdirectories and files !
    try:
        _remove_file_or_directories(path, logger=logger)
    except Exception as e:
        msg = f"Can not delete file(s) at {path}. The error is: {e}"
        # Chain the original exception to keep its traceback available
        raise ValueError(msg) from e
[docs]
def copy_file(src_filepath, dst_filepath):
    """Copy a file from a location to another.

    Parameters
    ----------
    src_filepath : str
        Path of the file to copy.
    dst_filepath : str
        Destination path, passed as is to ``shutil.copy``.

    Raises
    ------
    ValueError
        If the copy fails. The original exception is attached as ``__cause__``.
    """
    filename = os.path.basename(src_filepath)
    try:
        shutil.copy(src_filepath, dst_filepath)
    except Exception as e:
        dst_dir = os.path.dirname(dst_filepath)
        msg = f"Something went wrong when copying {filename} into {dst_dir}.\n The error is: {e}."
        # Chain the original exception to keep its traceback available
        raise ValueError(msg) from e
    else:
        # Log only once the copy succeeded; logging stays outside the guarded call
        msg = f"{filename} copied at {dst_filepath}."
        logger.info(msg)
[docs]
def remove_path_trailing_slash(path: str) -> str:
    r"""
    Removes trailing slashes or backslashes from a file path if present.

    This function ensures that the provided file path is normalized by removing
    any trailing directory separator characters (``'/'`` or ``'\\'``).
    This is useful for maintaining consistency in path strings and for
    preparing paths for operations that may not expect a trailing slash.

    All consecutive trailing separators are stripped. As a consequence, a path
    made only of separators (e.g. ``"/"``) is returned as an empty string.

    Parameters
    ----------
    path : str or pathlib.PurePath
        The file path to normalize.

    Returns
    -------
    str
        The normalized path without trailing separators.

    Raises
    ------
    TypeError
        If the input path is neither a string nor a ``pathlib`` path.

    Examples
    --------
    >>> remove_path_trailing_slash("some/path/")
    'some/path'
    >>> remove_path_trailing_slash("another\\path\\")
    'another\\path'
    """
    path = ensure_string_path(path, msg="Expecting a string 'path'", accepth_pathlib=True)
    # Remove trailing slash or backslash (if present)
    path = path.rstrip("/\\")
    return path