"""
This module contains the Distributor classes. Such objects are used to distribute the calculation of features.
Essentially, a Distributor organizes the application of feature calculators to data chunks.
Design of this module by Nils Braun
"""
import itertools
import math
import warnings
from functools import partial
from multiprocessing import Pool

try:
    # The abstract base classes live in ``collections.abc``; the aliases in
    # ``collections`` were removed in Python 3.10.
    from collections.abc import Iterable
except ImportError:  # pragma: no cover - fallback for very old interpreters
    from collections import Iterable

from tqdm import tqdm
def _function_with_partly_reduce(chunk_list, map_function, kwargs):
    """
    Small helper function to call a function (map_function) on every element
    of a list of data chunks (chunk_list) and collect the results in a list.

    This function is used to send chunks of data with a size larger than 1 to
    the workers in parallel and process these on the worker.

    Parameters
    ----------
    chunk_list : List
        A list of data chunks to process.
    map_function : Callable
        Function applied to every element of chunk_list. It is called as
        ``map_function(chunk, kwargs)``: the keyword arguments are handed over
        as one positional dictionary and are not unpacked.
    kwargs : dict
        Keyword arguments to be passed to the map_function. ``None`` (or any
        other falsy value) is replaced by an empty dictionary.

    Returns
    -------
    List
        One result per element of chunk_list, in the same order. The individual
        results are not flattened here.
    """
    kwargs = kwargs or {}
    return [map_function(chunk, kwargs) for chunk in chunk_list]
def initialize_warnings_in_workers(show_warnings):  # pragma: no cover
    """
    Small helper function to initialize warnings module in multiprocessing workers.

    On Windows, Python spawns fresh processes which do not inherit from warnings
    state, so warnings must be enabled/disabled before running computations.

    Parameters
    ----------
    show_warnings : bool
        If truthy, install the ``"default"`` warnings filter, otherwise install
        the ``"ignore"`` filter.
    """
    # A bare ``warnings.catch_warnings()`` call (outside of a ``with`` statement)
    # only builds a context manager object and has no effect, so it is omitted.
    warnings.simplefilter("default" if show_warnings else "ignore")
class DistributorBaseClass:
    """
    The distributor abstract base class.

    The main purpose of the instances of the DistributorBaseClass subclasses is to evaluate a function
    (called map_function) on a list of data items (called data).

    This is done on chunks of the data, meaning, that the DistributorBaseClass classes will chunk the data into chunks,
    distribute the chunks and apply the map_function to the items of each chunk.

    Dependent on the implementation of the distribute function, this is done in parallel or using a cluster of nodes.
    """

    @staticmethod
    def partition(data, chunk_size):
        """
        This generator chunks a list of data into slices of length chunk_size. If the chunk_size is not a divider of the
        data length, the last slice will be shorter than chunk_size.

        Parameters
        ----------
        data : List
            The data to chunk
        chunk_size : int
            Each chunks size. The last chunk may be smaller.

        Returns
        -------
        Generator
            A generator producing the chunks of data.
        """
        iterable = iter(data)
        while True:
            next_chunk = list(itertools.islice(iterable, chunk_size))
            if not next_chunk:
                # the underlying iterator is exhausted
                return
            yield next_chunk

    def __init__(self):
        """
        Constructs the DistributorBaseClass class
        """
        self.n_workers = None
        self.disable_progressbar = False
        self.progressbar_title = None

    def calculate_best_chunk_size(self, data_length):
        """
        Calculates the best chunk size for a list of length data_length. The current implemented formula is more or
        less an empirical result for multiprocessing case on one machine.

        Parameters
        ----------
        data_length : int
            A length which defines how many calculations there need to be.

        Returns
        -------
        int
            The calculated chunk size. It is always at least 1, also for data_length == 0.
        """
        # aim for five chunks per worker, rounding the chunk size up
        chunk_size, extra = divmod(data_length, self.n_workers * 5)
        if extra:
            chunk_size += 1
        # map_reduce divides by the chunk size, so it must never be 0 (which the
        # formula above yields for empty data)
        return max(chunk_size, 1)

    def map_reduce(
        self,
        map_function,
        data,
        function_kwargs=None,
        chunk_size=None,
        data_length=None,
    ):
        """
        This method contains the core functionality of the DistributorBaseClass class.

        It maps the map_function to each element of the data and reduces the results to return a flattened list.

        How the jobs are calculated, is determined by the class'
        :func:`hts.utilities.distribution.DistributorBaseClass.distribute` method,
        which can distribute the jobs in multiple threads, across multiple processing units etc.

        To not transport each element of the data individually, the data is split into chunks, according to the chunk
        size (or an empirical guess if none is given). By this, worker processes not tiny but adequate sized parts of
        the data.

        Parameters
        ----------
        map_function : Callable
            Function to apply to each data item.
        data : List
            The data to use in the calculation
        function_kwargs : Dict
            Parameters for the map function
        chunk_size : int
            If given, chunk the data according to this size. If not given, use an empirical value.
        data_length : int
            If the data is a generator, you have to set the length here. If it is none, the
            length is deduced from the len of the data.

        Returns
        -------
        List
            The calculated results
        """
        if data_length is None:
            data_length = len(data)
        if not chunk_size:
            chunk_size = self.calculate_best_chunk_size(data_length)

        chunk_generator = self.partition(data, chunk_size=chunk_size)
        map_kwargs = {"map_function": map_function, "kwargs": function_kwargs}

        # one result (a list of per-item results) is expected per chunk
        total_number_of_expected_results = math.ceil(data_length / chunk_size)

        result = tqdm(
            self.distribute(_function_with_partly_reduce, chunk_generator, map_kwargs),
            total=total_number_of_expected_results,
            desc=self.progressbar_title,
            disable=self.disable_progressbar,
        )

        # flatten the per-chunk result lists into one list
        result = list(itertools.chain.from_iterable(result))
        return result

    def distribute(self, func, partitioned_chunks, kwargs):
        """
        This abstract base function distributes the work among workers, which can be threads or nodes in a cluster.
        Must be implemented in the derived classes.

        Parameters
        ----------
        func : Callable
            Function to send to each worker.
        partitioned_chunks : List
            List of data chunks, each chunk is processed by one worker
        kwargs : Dict
            Parameters for the map function

        Raises
        ------
        NotImplementedError
        """
        raise NotImplementedError

    def close(self):
        """
        Abstract base function to clean the DistributorBaseClass after use, e.g. close the connection to a DaskScheduler
        """
        pass
class MapDistributor(DistributorBaseClass):
    """
    Distributor using the python build-in map, which calculates each job sequentially one after the other.
    """

    def __init__(self, disable_progressbar=False, progressbar_title="Fitting Models"):
        """
        Creates a new MapDistributor instance

        Parameters
        ----------
        disable_progressbar : bool
            Disables tqdm's progressbar
        progressbar_title : str
            Title of progressbar
        """
        super().__init__()
        self.disable_progressbar = disable_progressbar
        self.progressbar_title = progressbar_title

    def distribute(self, func, partitioned_chunks, kwargs):
        """
        Applies func to the chunks in a sequential fashion by pythons map command

        Parameters
        ----------
        func : Callable
            Function to send to each worker.
        partitioned_chunks : List
            List of data chunks, each chunk is processed by one worker
        kwargs : Dict
            Parameters for the map function; they are bound to func as keyword arguments.

        Returns
        -------
        Iterator
            The lazy ``map`` iterator over the results - each item is the result of the application of func
            to a single chunk. Nothing is calculated before the iterator is consumed.
        """
        return map(partial(func, **kwargs), partitioned_chunks)

    def calculate_best_chunk_size(self, data_length):
        """
        For the map command, which calculates the jobs sequentially, a chunk_size of 1 will be used.

        Parameters
        ----------
        data_length : int
            A length which defines how many calculations there need to be. It is ignored here.

        Returns
        -------
        int
            Always 1
        """
        return 1
class LocalDaskDistributor(DistributorBaseClass):
    """
    Distributor using a local dask cluster and inproc communication.
    """

    def __init__(self, n_workers):
        """
        Initiates a LocalDaskDistributor instance.

        Parameters
        ----------
        n_workers : int
            How many workers should the local dask cluster have?
        """
        super().__init__()

        import tempfile

        from distributed import Client, LocalCluster

        # attribute .local_dir_ is the path where the local dask workers store temporary files
        self.local_dir_ = tempfile.mkdtemp()
        # Keep a handle on the cluster so that close() can shut it down again.
        # NOTE(review): ``local_directory`` is the current name of the keyword formerly
        # spelled ``local_dir`` - confirm against the pinned distributed version.
        self.cluster = LocalCluster(
            n_workers=n_workers, processes=False, local_directory=self.local_dir_
        )
        self.client = Client(self.cluster)
        self.n_workers = n_workers

    def distribute(self, func, partitioned_chunks, kwargs):
        """
        Applies func to the chunks in a parallel fashion by distributing the map command to the dask workers on a
        local machine

        Parameters
        ----------
        func : Callable
            Function to send to each worker.
        partitioned_chunks : List
            List of data chunks, each chunk is processed by one worker
        kwargs : Dict
            Parameters for the map function; they are bound to func as keyword arguments.

        Returns
        -------
        List
            The gathered results of the calculation - each item is the result of the application of func
            to a single chunk.
        """
        if isinstance(partitioned_chunks, Iterable):
            # since dask 2.0.0 client map no longer accepts iterables
            partitioned_chunks = list(partitioned_chunks)
        futures = self.client.map(partial(func, **kwargs), partitioned_chunks)
        return self.client.gather(futures)

    def close(self):
        """
        Closes the connection to the local Dask Scheduler, shuts down the local cluster created in the constructor
        and removes the temporary directory of the workers.
        """
        import shutil

        self.client.close()
        self.cluster.close()
        # best effort: the directory may already be gone or still hold open files
        shutil.rmtree(self.local_dir_, ignore_errors=True)
class ClusterDaskDistributor(DistributorBaseClass):
    """
    Distributor using a dask cluster, meaning that the calculation is spread over a cluster
    """

    def __init__(self, address):
        """
        Sets up a distributor that connects to a Dask Scheduler to distribute the calculation

        Parameters
        ----------
        address : str
            The ip address and port number of the Dask Scheduler
        """
        super().__init__()

        from distributed import Client

        self.client = Client(address=address)

    def calculate_best_chunk_size(self, data_length):
        """
        Uses the number of dask workers in the cluster (during execution time, meaning when you start the extraction)
        to find the optimal chunk_size.

        Parameters
        ----------
        data_length: int
            A length which defines how many calculations there need to be.

        Returns
        -------
        int
            The calculated chunk size. It is always at least 1.
        """
        n_workers = len(self.client.scheduler_info()["workers"])
        # A scheduler that reports no workers would otherwise cause a division by
        # zero; treat it like a single worker.
        chunk_size, extra = divmod(data_length, max(n_workers, 1) * 5)
        if extra:
            chunk_size += 1
        # never hand out a chunk size of 0 (the formula yields it for empty data)
        return max(chunk_size, 1)

    def distribute(self, func, partitioned_chunks, kwargs):
        """
        Applies func to the chunks in a parallel fashion by distributing the map command to the dask workers on a
        cluster

        Parameters
        ----------
        func : Callable
            Function to send to each worker.
        partitioned_chunks : List
            List of data chunks, each chunk is processed by one worker
        kwargs : Dict
            Parameters for the map function; they are bound to func as keyword arguments.

        Returns
        -------
        List
            The gathered results of the calculation - each item is the result of the application of func
            to a single chunk.
        """
        if isinstance(partitioned_chunks, Iterable):
            # since dask 2.0.0 client map no longer accepts iterables
            partitioned_chunks = list(partitioned_chunks)
        futures = self.client.map(partial(func, **kwargs), partitioned_chunks)
        return self.client.gather(futures)

    def close(self):
        """
        Closes the connection to the Dask Scheduler
        """
        self.client.close()
class MultiprocessingDistributor(DistributorBaseClass):
    """
    Distributor using a multiprocessing Pool to calculate the jobs in parallel on the local machine.
    """

    def __init__(
        self,
        n_workers,
        disable_progressbar=False,
        progressbar_title="Feature Extraction",
        show_warnings=True,
    ):
        """
        Creates a new MultiprocessingDistributor instance

        Parameters
        ----------
        n_workers : int
            How many workers should the multiprocessing pool have?
        disable_progressbar : bool
            Disables tqdm's progressbar
        progressbar_title : str
            Title of progressbar
        show_warnings : bool
            Handed to ``initialize_warnings_in_workers`` when each worker process starts, to switch
            warnings on or off inside the workers.
        """
        super().__init__()
        self.pool = Pool(
            processes=n_workers,
            initializer=initialize_warnings_in_workers,
            initargs=(show_warnings,),
        )
        self.n_workers = n_workers
        self.disable_progressbar = disable_progressbar
        self.progressbar_title = progressbar_title

    def distribute(self, func, partitioned_chunks, kwargs):
        """
        Applies func to the chunks in a parallel fashion by distributing the map command to the process pool

        Parameters
        ----------
        func : Callable
            Function to send to each worker.
        partitioned_chunks : List
            List of data chunks, each chunk is processed by one worker
        kwargs : Dict
            Parameters for the map function; they are bound to func as keyword arguments.

        Returns
        -------
        Iterator
            The iterator returned by ``Pool.imap_unordered`` - each item is the result of the application of func
            to a single chunk. The order of the results is arbitrary.
        """
        return self.pool.imap_unordered(partial(func, **kwargs), partitioned_chunks)

    def close(self):
        """
        Closes and terminates the process pool and waits for the worker processes to exit.
        """
        self.pool.close()
        self.pool.terminate()
        self.pool.join()