Source code for disdrodb.utils.dask
#!/usr/bin/env python3
# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2023 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Utilities for Dask Distributed Computations."""
import logging
import os
import numpy as np
[docs]
def check_parallel_validity(parallel):
    """Check validity of the ``parallel`` option given the active Dask settings.

    The decision depends on the ``scheduler`` entry of the Dask configuration:

    - not set: ``parallel`` is returned unchanged;
    - synchronous or threaded scheduler (any alias, case-insensitive): ``False``;
    - distributed scheduler: ``False`` if all workers report the same process id
      or if any worker has a number of threads different from 1,
      otherwise ``parallel`` is returned unchanged;
    - anything else: ``parallel`` is returned unchanged.

    Parameters
    ----------
    parallel : bool
        Parallel option requested by the user.

    Returns
    -------
    bool
        ``False`` when the Dask settings are not suited for parallel processing,
        otherwise the value of ``parallel``.
    """
    import dask

    scheduler = dask.config.get("scheduler", None)
    if scheduler is None:
        return parallel

    # Dask resolves scheduler names case-insensitively and accepts several
    # aliases for the same scheduler, so normalise before comparing.
    scheduler_name = scheduler.lower() if isinstance(scheduler, str) else scheduler

    # Single-process schedulers (synchronous and threaded) and their aliases
    if scheduler_name in ("sync", "synchronous", "single-threaded", "threads", "threading"):
        return False

    if scheduler_name in ("distributed", "dask.distributed"):
        from dask.distributed import default_client

        client = default_client()
        info = client.scheduler_info()

        # If ThreadWorker, only 1 pid
        pids = list(client.run(os.getpid).values())
        if len(set(pids)) == 1:
            return False

        # If ProcessWorker
        # - Check single thread per worker to avoid locks
        single_threaded = all(worker["nthreads"] == 1 for worker in info["workers"].values())
        if not single_threaded:
            print(
                "To open netCDFs in parallel with dask distributed (processes=True), please set threads_per_worker=1 !",
            )
            return False

    # Otherwise let the user choose
    return parallel
[docs]
def initialize_dask_cluster(minimum_memory=None):
    """Initialize a local Dask distributed cluster of single-threaded processes.

    As a side effect, the environment variable ``HDF5_USE_FILE_LOCKING`` is set
    to ``"FALSE"`` in the current process.

    The number of workers is taken from the ``num_workers`` entry of the Dask
    configuration. If that entry is not set, it defaults to the number of CPUs
    minus 2 (but at least 1).

    Parameters
    ----------
    minimum_memory : str or int, optional
        Minimum amount of memory per worker, in a format accepted by
        ``dask.utils.parse_bytes`` (e.g. ``"4GB"``).
        When specified, the number of workers is capped at
        ``total system memory // minimum_memory`` (but at least 1).
        The default is ``None`` (no memory constraint).

    Returns
    -------
    cluster : dask.distributed.LocalCluster
        The created local cluster.
    client : dask.distributed.Client
        The client connected to ``cluster``.
    """
    import dask
    import psutil
    from dask.distributed import Client, LocalCluster
    from dask.utils import parse_bytes

    # Set HDF5_USE_FILE_LOCKING to avoid going stuck with HDF
    os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"

    # Default number of processes: all CPUs minus 2.
    # - os.cpu_count() may return None when the CPU count is undetermined
    # - On machines with 2 CPUs or less, keep at least 1 worker
    cpu_count = os.cpu_count() or 1
    available_workers = max(1, cpu_count - 2)
    num_workers = dask.config.get("num_workers", available_workers)

    # If memory limit specified, ensure correct amount of workers
    if minimum_memory is not None:
        # Compute available memory (in bytes)
        total_memory = psutil.virtual_memory().total
        # Get minimum memory per worker (in bytes)
        memory_per_worker = parse_bytes(minimum_memory)
        # Determine number of workers constrained by memory
        maximum_workers_allowed = max(1, total_memory // memory_per_worker)
        # Respect both CPU and memory requirements
        num_workers = min(maximum_workers_allowed, num_workers)

    # Create dask.distributed local cluster
    cluster = LocalCluster(
        n_workers=num_workers,
        threads_per_worker=1,
        processes=True,
        silence_logs=logging.ERROR,
    )
    client = Client(cluster)
    return cluster, client
[docs]
def close_dask_cluster(cluster, client):
    """Close a Dask cluster and its client while silencing shutdown logs.

    The level of the root logger is temporarily raised above ``CRITICAL``
    and restored afterwards, also if closing fails.
    The client is closed even when closing the cluster raises an exception.

    Parameters
    ----------
    cluster : object
        Cluster to shut down. Only its ``close()`` method is used.
    client : object
        Client to shut down. Only its ``close()`` method is used.

    Raises
    ------
    Exception
        Any exception raised by ``cluster.close()`` or ``client.close()``
        is propagated after the cleanup has been performed.
    """
    logger = logging.getLogger()
    # Backup current log level
    original_level = logger.level
    # Loggers inheriting their level from the root logger emit nothing above CRITICAL
    # - Avoid log 'distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.'
    logger.setLevel(logging.CRITICAL + 1)
    try:
        try:
            cluster.close()
        finally:
            # Do not leak the client if closing the cluster failed
            client.close()
    finally:
        # Restore the original log level
        logger.setLevel(original_level)