Source code for disdrodb.utils.dask
#!/usr/bin/env python3
# -----------------------------------------------------------------------------.
# Copyright (c) 2021-2023 DISDRODB developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------.
"""Utilities for Dask Distributed computations."""
import logging
import os
[docs]
def initialize_dask_cluster():
"""Initialize Dask Cluster."""
import dask
from dask.distributed import Client, LocalCluster
# Set HDF5_USE_FILE_LOCKING to avoid going stuck with HDF
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
# Retrieve the number of process to run
available_workers = os.cpu_count() - 2 # if not set, all CPUs
num_workers = dask.config.get("num_workers", available_workers)
# Silence dask warnings
dask.config.set({"logging.distributed": "error"})
# dask.config.set({"distributed.admin.system-monitor.gil.enabled": False})
# Create dask.distributed local cluster
cluster = LocalCluster(
n_workers=num_workers,
threads_per_worker=1,
processes=True,
# memory_limit='8GB',
# silence_logs=False,
)
client = Client(cluster)
return cluster, client
[docs]
def close_dask_cluster(cluster, client):
"""Close Dask Cluster."""
logger = logging.getLogger()
# Backup current log level
original_level = logger.level
logger.setLevel(logging.CRITICAL + 1) # Set level to suppress all logs
# Close cluster
# - Avoid log 'distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.'
try:
cluster.close()
client.close()
finally:
# Restore the original log level
logger.setLevel(original_level)