hts.utilities
hts.utilities.distribution
This module contains the Distributor class; such objects are used to distribute the calculation of features. Essentially, a Distributor organizes the application of feature calculators to data chunks. Design of this module by Nils Braun.
class hts.utilities.distribution.ClusterDaskDistributor(address)
Bases: hts.utilities.distribution.DistributorBaseClass
Distributor using a Dask cluster, meaning that the calculation is spread over the nodes of a cluster.
__init__(address)
Sets up a distributor that connects to a Dask scheduler to distribute the calculation of the features.
Parameters: address (str) – The IP address and port number of the Dask scheduler.
calculate_best_chunk_size(data_length)
Uses the number of Dask workers in the cluster (at execution time, i.e. when the extraction starts) to find the optimal chunk_size.
Parameters: data_length (int) – A length which defines how many calculations there need to be.
distribute(func, partitioned_chunks, kwargs)
Calculates the features in a parallel fashion by distributing the map command to the Dask workers on a cluster.
Parameters:
- func (Callable) – Function to send to each worker.
- partitioned_chunks (List) – List of data chunks; each chunk is processed by one worker.
- kwargs (Dict) – Parameters for the map function.
Returns: The result of the calculation as a list – each item should be the result of the application of func to a single element.
Return type: List
class hts.utilities.distribution.DistributorBaseClass
Bases: object
The distributor abstract base class. The main purpose of instances of DistributorBaseClass subclasses is to evaluate a function (called map_function) on a list of data items (called data). This is done on chunks of the data, meaning that the DistributorBaseClass classes will split the data into chunks, distribute the chunks, and apply the map function to each of them. Depending on the implementation of the distribute function, this is done in parallel or using a cluster of nodes.
calculate_best_chunk_size(data_length)
Calculates the best chunk size for a list of length data_length. The currently implemented formula is more or less an empirical result for the multiprocessing case on one machine.
Parameters: data_length (int) – A length which defines how many calculations there need to be.
Returns: The calculated chunk size
Return type: int
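The formula itself is not reproduced here; as a hypothetical sketch (the target of five chunks per worker and the `n_workers` parameter are illustrative assumptions, not the library's actual constants), such a heuristic might look like:

```python
def calculate_best_chunk_size(data_length, n_workers):
    # Hypothetical heuristic: aim for roughly five chunks per worker so
    # that slow chunks can be balanced across the pool; round up so no
    # data item is left without a chunk.
    chunk_size, remainder = divmod(data_length, n_workers * 5)
    if remainder:
        chunk_size += 1
    return max(chunk_size, 1)

print(calculate_best_chunk_size(100, 4))  # 100 items on 4 workers -> 5
```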
close()
Abstract base function to clean up the DistributorBaseClass after use, e.g. close the connection to a Dask scheduler.
distribute(func, partitioned_chunks, kwargs)
This abstract base function distributes the work among workers, which can be threads or nodes in a cluster. Must be implemented in the derived classes.
Parameters:
- func (Callable) – Function to send to each worker.
- partitioned_chunks (List) – List of data chunks; each chunk is processed by one worker.
- kwargs (Dict) – Parameters for the map function.
Raises: NotImplementedError
map_reduce(map_function, data, function_kwargs=None, chunk_size=None, data_length=None)
This method contains the core functionality of the DistributorBaseClass class. It maps the map_function to each element of the data and reduces the results to return a flattened list. How the jobs are calculated is determined by the class's hts.utilities.distribution.DistributorBaseClass.distribute() method, which can distribute the jobs in multiple threads, across multiple processing units, etc. To avoid transporting each element of the data individually, the data is split into chunks according to the chunk size (or an empirical guess if none is given). This way, each worker processes adequately sized parts of the data instead of tiny ones.
Parameters:
- map_function (Callable) – Function to apply to each data item.
- data (List) – The data to use in the calculation.
- function_kwargs (Dict) – Parameters for the map function.
- chunk_size (int) – If given, chunk the data according to this size. If not given, use an empirical value.
- data_length (int) – If the data is a generator, you have to set the length here. If it is None, the length is deduced from the len of the data.
Returns: The calculated results
Return type: List
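The chunk–map–flatten flow described above can be sketched as a plain-Python pipeline (a simplified illustrative re-implementation, not the library's actual code; the default chunk size of 2 is an arbitrary example value):

```python
from functools import partial

def map_reduce(map_function, data, function_kwargs=None, chunk_size=2):
    # 1. Bind the keyword arguments to the map function once.
    func = partial(map_function, **(function_kwargs or {}))
    # 2. Split the data into chunks so workers receive adequately
    #    sized parts instead of single items.
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    # 3. "Distribute": apply the function to every item of every chunk.
    chunk_results = [[func(item) for item in chunk] for chunk in chunks]
    # 4. Reduce: flatten the per-chunk results into one list.
    return [item for chunk in chunk_results for item in chunk]

powers = map_reduce(lambda x, exponent=2: x ** exponent,
                    [1, 2, 3, 4, 5], {"exponent": 2})
print(powers)  # [1, 4, 9, 16, 25]
```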
static partition(data, chunk_size)
This generator chunks a list of data into slices of length chunk_size. If chunk_size does not evenly divide the data length, the last slice will be shorter than chunk_size.
Parameters:
- data (List) – The data to chunk.
- chunk_size (int) – The size of each chunk. The last chunk may be smaller.
Returns: A generator producing the chunks of data.
Return type: Generator
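A minimal sketch of such a chunking generator (an illustrative re-implementation, assuming the data supports slicing):

```python
def partition(data, chunk_size):
    # Yield consecutive slices; the final slice is shorter whenever
    # chunk_size does not evenly divide len(data).
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

chunks = list(partition([1, 2, 3, 4, 5], 2))
print(chunks)  # [[1, 2], [3, 4], [5]]
```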
class hts.utilities.distribution.LocalDaskDistributor(n_workers)
Bases: hts.utilities.distribution.DistributorBaseClass
Distributor using a local Dask cluster and inproc communication.
__init__(n_workers)
Initializes a LocalDaskDistributor instance.
Parameters: n_workers (int) – How many workers the local Dask cluster should have.
distribute(func, partitioned_chunks, kwargs)
Calculates the features in a parallel fashion by distributing the map command to the Dask workers on a local machine.
Parameters:
- func (Callable) – Function to send to each worker.
- partitioned_chunks (List) – List of data chunks; each chunk is processed by one worker.
- kwargs (Dict) – Parameters for the map function.
Returns: The result of the calculation as a list – each item should be the result of the application of func to a single element.
Return type: List
class hts.utilities.distribution.MapDistributor(disable_progressbar=False, progressbar_title='Fitting Models')
Bases: hts.utilities.distribution.DistributorBaseClass
Distributor using Python's built-in map, which calculates each job sequentially, one after the other.
__init__(disable_progressbar=False, progressbar_title='Fitting Models')
Creates a new MapDistributor instance.
Parameters:
- disable_progressbar (bool) – Disables tqdm's progressbar.
- progressbar_title (str) – Title of the progressbar.
calculate_best_chunk_size(data_length)
For the map command, which calculates the features sequentially, a chunk_size of 1 is used.
Parameters: data_length (int) – A length which defines how many calculations there need to be.
distribute(func, partitioned_chunks, kwargs)
Calculates the features in a sequential fashion using Python's map command.
Parameters:
- func (Callable) – Function to send to each worker.
- partitioned_chunks (List) – List of data chunks; each chunk is processed by one worker.
- kwargs (Dict) – Parameters for the map function.
Returns: The result of the calculation as a list – each item should be the result of the application of func to a single element.
Return type: List
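A sketch of what a sequential distribute can look like (illustrative only; the real implementation presumably also drives the tqdm progress bar, which is omitted here):

```python
from functools import partial

def distribute(func, partitioned_chunks, kwargs):
    # Sequential evaluation: the built-in map walks the chunks one
    # after the other in the current process.
    return list(map(partial(func, **kwargs), partitioned_chunks))

def scale_chunk(chunk, factor):
    # Example worker function: scale every item in the chunk.
    return [x * factor for x in chunk]

print(distribute(scale_chunk, [[1, 2], [3]], {"factor": 10}))  # [[10, 20], [30]]
```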
class hts.utilities.distribution.MultiprocessingDistributor(n_workers, disable_progressbar=False, progressbar_title='Feature Extraction', show_warnings=True)
Bases: hts.utilities.distribution.DistributorBaseClass
Distributor using a multiprocessing Pool to calculate the jobs in parallel on the local machine.
__init__(n_workers, disable_progressbar=False, progressbar_title='Feature Extraction', show_warnings=True)
Creates a new MultiprocessingDistributor instance.
Parameters:
- n_workers (int) – How many workers the multiprocessing pool should have.
- disable_progressbar (bool) – Disables tqdm's progressbar.
- progressbar_title (str) – Title of the progressbar.
- show_warnings (bool) – Whether to show warnings in the worker processes.
distribute(func, partitioned_chunks, kwargs)
Calculates the features in a parallel fashion by distributing the map command to the process pool.
Parameters:
- func (Callable) – Function to send to each worker.
- partitioned_chunks (List) – List of data chunks; each chunk is processed by one worker.
- kwargs (Dict) – Parameters for the map function.
Returns: The result of the calculation as a list – each item should be the result of the application of func to a single element.
Return type: List
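A sketch of pool-based distribution. To keep the example runnable anywhere, it uses `multiprocessing.dummy.Pool`, a thread-backed stand-in that shares the `Pool` API; the real class uses a process-backed `multiprocessing.Pool`:

```python
from functools import partial
from multiprocessing.dummy import Pool  # thread-backed, same API as multiprocessing.Pool

def distribute(func, partitioned_chunks, kwargs, n_workers=2):
    # Bind the keyword arguments once, then let the pool map the
    # function over the chunks in parallel.
    with Pool(n_workers) as pool:
        return pool.map(partial(func, **kwargs), partitioned_chunks)

def summed(chunk, offset):
    # Example worker function: reduce a chunk to one number.
    return sum(chunk) + offset

print(distribute(summed, [[1, 2], [3, 4]], {"offset": 10}))  # [13, 17]
```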
hts.utilities.distribution._function_with_partly_reduce(chunk_list, map_function, kwargs)
Small helper function to call a function (map_function) on a list of data chunks (chunk_list) and convert the results into a flattened list. It is used to send chunks of data with a size larger than 1 to the workers in parallel and process them there.
Parameters:
- chunk_list (List) – A list of data chunks to process.
- map_function (Callable) – Function to be partially applied.
- kwargs (dict) – Keyword arguments to be passed to the map_function.
Returns: A list of the results of the function evaluated on each chunk, flattened.
Return type: List
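Its behavior can be sketched as follows (an illustrative re-implementation, assuming map_function returns an iterable per chunk):

```python
from itertools import chain

def function_with_partly_reduce(chunk_list, map_function, kwargs):
    # Apply map_function to each chunk, then flatten the per-chunk
    # iterables of results into a single list.
    kwargs = kwargs or {}
    results = (map_function(chunk, **kwargs) for chunk in chunk_list)
    return list(chain.from_iterable(results))

out = function_with_partly_reduce(
    [[1, 2], [3]],
    lambda chunk, power=2: [x ** power for x in chunk],
    {"power": 2},
)
print(out)  # [1, 4, 9]
```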
hts.utilities.distribution.initialize_warnings_in_workers(show_warnings)
Small helper function to initialize the warnings module in multiprocessing workers. On Windows, Python spawns fresh processes which do not inherit the warnings state, so warnings must be enabled/disabled before running the computations.
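A sketch of such an initializer (an illustrative version built on the standard warnings filters; the library's exact filter actions may differ):

```python
import warnings

def initialize_warnings_in_workers(show_warnings):
    # Runs once in each freshly spawned worker process: switch the
    # warnings machinery on or off before any computation starts.
    if show_warnings:
        warnings.simplefilter("default")
    else:
        warnings.simplefilter("ignore")
```

Such a function would typically be passed as the `initializer` argument of `multiprocessing.Pool`, so every worker configures itself on start-up.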
hts.utilities.load_data
hts.utilities.load_data.get_data_home(data_home=None)
Return the path of the scikit-hts data dir.
This folder is used by some large dataset loaders to avoid downloading the data several times.
By default the data dir is set to a folder named ‘scikit_hts_data’ in the user home folder. Alternatively, it can be set by the ‘SCIKIT_HTS_DATA’ environment variable or programmatically by giving an explicit folder path. The ‘~’ symbol is expanded to the user home folder. If the folder does not already exist, it is automatically created.
Parameters: data_home (str | None) – The path to the scikit-hts data dir.
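The documented resolution order can be sketched as a simplified re-implementation of the behavior described above (illustrative, not the library's actual code):

```python
import os

def get_data_home(data_home=None):
    # Resolution order: explicit argument, then the SCIKIT_HTS_DATA
    # environment variable, then ~/scikit_hts_data; '~' is expanded
    # and the folder is created if it does not exist yet.
    if data_home is None:
        data_home = os.environ.get(
            "SCIKIT_HTS_DATA", os.path.join("~", "scikit_hts_data")
        )
    data_home = os.path.expanduser(data_home)
    os.makedirs(data_home, exist_ok=True)
    return data_home
```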
hts.utilities.load_data.load_geo_events_data(data_home=None)
Returns: df
Return type: pandas.DataFrame
hts.utilities.load_data.load_mobility_data(data_home=None)
Original dataset: https://www.kaggle.com/pronto/cycle-share-dataset
Returns: df
Return type: pandas.DataFrame
hts.utilities.utils
class hts.utilities.utils.suppress_stdout_stderr
Bases: object
A context manager for doing a “deep suppression” of stdout and stderr in Python, i.e. it will suppress all printing, even if the print originates in a compiled C/Fortran sub-function.
This will not suppress raised exceptions, since exceptions are printed to stderr just before a script exits, and after the context manager has exited (at least, I think that is why it lets exceptions through).
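A common way to implement such deep suppression is to redirect the OS-level file descriptors 1 and 2 rather than the Python-level sys.stdout/sys.stderr objects; a sketch of that technique (illustrative, not necessarily the library's exact implementation):

```python
import os

class suppress_stdout_stderr:
    """Silence fds 1 and 2, so even compiled extensions print nothing."""

    def __enter__(self):
        # Open /dev/null twice and remember the real stdout/stderr fds.
        self.null_fds = [os.open(os.devnull, os.O_RDWR) for _ in range(2)]
        self.saved_fds = [os.dup(1), os.dup(2)]
        os.dup2(self.null_fds[0], 1)  # stdout -> /dev/null
        os.dup2(self.null_fds[1], 2)  # stderr -> /dev/null
        return self

    def __exit__(self, *exc_info):
        # Restore the original descriptors; exceptions still propagate.
        os.dup2(self.saved_fds[0], 1)
        os.dup2(self.saved_fds[1], 2)
        for fd in self.null_fds + self.saved_fds:
            os.close(fd)
        return False
```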