hts.utilities

hts.utilities.distribution

This module contains the Distributor class; such objects are used to distribute the calculation of features. Essentially, a Distributor organizes the application of feature calculators to data chunks. Design of this module by Nils Braun.

class hts.utilities.distribution.ClusterDaskDistributor(address)[source]

Bases: hts.utilities.distribution.DistributorBaseClass

Distributor using a dask cluster, meaning that the calculation is spread over a cluster

__init__(address)[source]

Sets up a distributor that connects to a Dask Scheduler to distribute the calculation of the features

Parameters:address (str) – The IP address and port number of the Dask Scheduler
calculate_best_chunk_size(data_length)[source]

Uses the number of Dask workers in the cluster (at execution time, i.e. when you start the extraction) to find the optimal chunk_size.

Parameters:data_length (int) – A length which defines how many calculations there need to be.
close()[source]

Closes the connection to the Dask Scheduler

distribute(func, partitioned_chunks, kwargs)[source]

Calculates the features in a parallel fashion by distributing the map command to the dask workers on a cluster

Parameters:
  • func (Callable) – Function to send to each worker.
  • partitioned_chunks (List) – List of data chunks; each chunk is processed by one worker
  • kwargs (Dict) – Parameters for the map function
Returns:

The result of the calculation as a list - each item should be the result of the application of func to a single element

Return type:

List
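
A minimal usage sketch, assuming a Dask scheduler with workers is already reachable; the address and the toy map function below are placeholders, not part of the library. The map function returns a one-element list per item so that map_reduce can flatten the results:

    from hts.utilities.distribution import ClusterDaskDistributor

    def square(item):
        # return a list per item so that map_reduce can flatten the results
        return [item ** 2]

    # placeholder address of an already running Dask scheduler
    distributor = ClusterDaskDistributor(address="127.0.0.1:8786")
    try:
        results = distributor.map_reduce(square, data=list(range(100)))
    finally:
        distributor.close()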

class hts.utilities.distribution.DistributorBaseClass[source]

Bases: object

The distributor abstract base class. The main purpose of the instances of the DistributorBaseClass subclasses is to evaluate a function (called map_function) on a list of data items (called data). This is done on chunks of the data, meaning that the DistributorBaseClass subclasses split the data into chunks, distribute the chunks, and apply the calculator functions to them. Depending on the implementation of the distribute function, this is done in parallel or on a cluster of nodes.

__init__()[source]

Constructs the DistributorBaseClass class

calculate_best_chunk_size(data_length)[source]

Calculates the best chunk size for a list of length data_length. The currently implemented formula is more or less an empirical result for the multiprocessing case on a single machine.

Parameters:data_length (int) – A length which defines how many calculations there need to be.
Returns:The calculated chunk size
Return type:int
close()[source]

Abstract base function to clean up the DistributorBaseClass after use, e.g. close the connection to a Dask Scheduler

distribute(func, partitioned_chunks, kwargs)[source]

This abstract base function distributes the work among workers, which can be threads or nodes in a cluster. Must be implemented in the derived classes.

Parameters:
  • func (Callable) – Function to send to each worker.
  • partitioned_chunks (List) – List of data chunks; each chunk is processed by one worker
  • kwargs (Dict) – Parameters for the map function
Raises:

NotImplementedError
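
For illustration, a minimal sketch of a hypothetical subclass that fulfils this contract by evaluating the chunks one after the other in the calling process (the class name is not part of the library):

    from hts.utilities.distribution import DistributorBaseClass

    class SerialDistributor(DistributorBaseClass):
        """Hypothetical distributor that evaluates every chunk serially."""

        def distribute(self, func, partitioned_chunks, kwargs):
            # apply func to each chunk in turn; map_reduce flattens the results
            return [func(chunk, **kwargs) for chunk in partitioned_chunks]

        def close(self):
            # nothing to clean up for a purely local distributor
            pass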

map_reduce(map_function, data, function_kwargs=None, chunk_size=None, data_length=None)[source]

This method contains the core functionality of the DistributorBaseClass class. It maps the map_function to each element of the data and reduces the results to return a flattened list. How the jobs are calculated is determined by the class’ hts.utilities.distribution.DistributorBaseClass.distribute() method, which can distribute the jobs across multiple threads, multiple processing units, etc. To avoid transporting each element of the data individually, the data is split into chunks according to the chunk size (or an empirical guess if none is given). This way, each worker processes an adequately sized part of the data rather than tiny pieces.

Parameters:
  • map_function (Callable) – Function to apply to each data item.
  • data (List) – The data to use in the calculation
  • function_kwargs (Dict) – Parameters for the map function
  • chunk_size (int) – If given, chunk the data according to this size. If not given, use an empirical value.
  • data_length (int) – If the data is a generator, you have to set the length here. If it is None, the length is deduced from the length of the data.
Returns:

The calculated results

Return type:

List
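
A minimal sketch of the map_reduce pattern, here with the sequential MapDistributor; the map function is a placeholder, and it returns a one-element list per data item so the results can be flattened:

    from hts.utilities.distribution import MapDistributor

    def double(item, offset=0):
        # one result per data item, wrapped in a list for the flattening step
        return [2 * item + offset]

    distributor = MapDistributor(disable_progressbar=True)
    results = distributor.map_reduce(
        double,
        data=list(range(10)),
        function_kwargs={"offset": 1},
        chunk_size=3,
    )
    # results is a flat list with one entry per input item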

static partition(data, chunk_size)[source]

This generator chunks a list of data into slices of length chunk_size. If chunk_size is not a divisor of the data length, the last slice will be shorter than chunk_size.

Parameters:
  • data (List) – The data to chunk
  • chunk_size (int) – The size of each chunk. The last chunk may be smaller.
Returns:

A generator producing the chunks of data.

Return type:

Generator
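
For example, since partition is a static method it can be called on the class directly; a small sketch:

    from hts.utilities.distribution import DistributorBaseClass

    for chunk in DistributorBaseClass.partition(list(range(7)), chunk_size=3):
        # prints two chunks of three items, then a final chunk with the one remaining item
        print(chunk)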

class hts.utilities.distribution.LocalDaskDistributor(n_workers)[source]

Bases: hts.utilities.distribution.DistributorBaseClass

Distributor using a local Dask cluster and in-process (“inproc”) communication.

__init__(n_workers)[source]

Initializes a LocalDaskDistributor instance.

Parameters:n_workers (int) – How many workers should the local dask cluster have?
close()[source]

Closes the connection to the local Dask Scheduler

distribute(func, partitioned_chunks, kwargs)[source]

Calculates the features in a parallel fashion by distributing the map command to the Dask workers on the local machine

Parameters:
  • func (Callable) – Function to send to each worker.
  • partitioned_chunks (List) – List of data chunks; each chunk is processed by one worker
  • kwargs (Dict) – Parameters for the map function
Returns:

The result of the calculation as a list - each item should be the result of the application of func to a single element.

Return type:

List
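
A usage sketch, assuming dask and distributed are installed; the worker count and the map function are placeholders:

    from hts.utilities.distribution import LocalDaskDistributor

    def negate(item):
        # a list per item so map_reduce can flatten the results
        return [-item]

    distributor = LocalDaskDistributor(n_workers=2)
    try:
        results = distributor.map_reduce(negate, data=list(range(20)))
    finally:
        distributor.close()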

class hts.utilities.distribution.MapDistributor(disable_progressbar=False, progressbar_title='Fitting Models')[source]

Bases: hts.utilities.distribution.DistributorBaseClass

Distributor using Python’s built-in map, which calculates each job sequentially, one after the other.

__init__(disable_progressbar=False, progressbar_title='Fitting Models')[source]

Creates a new MapDistributor instance

Parameters:
  • disable_progressbar (bool) – Disables tqdm’s progressbar
  • progressbar_title (str) – Title of progressbar
calculate_best_chunk_size(data_length)[source]

For the map command, which calculates the features sequentially, a chunk_size of 1 will be used.

Parameters:data_length (int) – A length which defines how many calculations there need to be.
distribute(func, partitioned_chunks, kwargs)[source]

Calculates the features in a sequential fashion by Python’s map command

Parameters:
  • func (Callable) – Function to send to each worker.
  • partitioned_chunks (List) – List of data chunks; each chunk is processed by one worker
  • kwargs (Dict) – Parameters for the map function
Returns:

The result of the calculation as a list - each item should be the result of the application of func to a single element.

Return type:

List

class hts.utilities.distribution.MultiprocessingDistributor(n_workers, disable_progressbar=False, progressbar_title='Feature Extraction', show_warnings=True)[source]

Bases: hts.utilities.distribution.DistributorBaseClass

Distributor using a multiprocessing Pool to calculate the jobs in parallel on the local machine.

__init__(n_workers, disable_progressbar=False, progressbar_title='Feature Extraction', show_warnings=True)[source]

Creates a new MultiprocessingDistributor instance

Parameters:
  • n_workers (int) – How many workers should the multiprocessing pool have?
  • disable_progressbar (bool) – Disables tqdm’s progressbar
  • progressbar_title (str) – Title of progressbar
  • show_warnings (bool) – Whether to show warnings in the worker processes
close()[source]

Collects the result from the workers and closes the pool of worker processes.

distribute(func, partitioned_chunks, kwargs)[source]

Calculates the features in a parallel fashion by distributing the map command to a pool of worker processes

Parameters:
  • func (Callable) – Function to send to each worker.
  • partitioned_chunks (List) – List of data chunks; each chunk is processed by one worker
  • kwargs (Dict) – Parameters for the map function
Returns:

The result of the calculation as a list - each item should be the result of the application of func to a single element.

Return type:

List
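
A usage sketch; the map function must be defined at module level so it can be pickled and sent to the worker processes, and the arguments below are placeholders:

    from hts.utilities.distribution import MultiprocessingDistributor

    def cube(item):
        # module-level function: picklable, so it can be shipped to the workers
        return [item ** 3]

    if __name__ == "__main__":
        distributor = MultiprocessingDistributor(
            n_workers=4, disable_progressbar=True, show_warnings=False
        )
        try:
            results = distributor.map_reduce(cube, data=list(range(1000)), chunk_size=50)
        finally:
            distributor.close()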

hts.utilities.distribution._function_with_partly_reduce(chunk_list, map_function, kwargs)[source]

Small helper function to call a function (map_function) on a list of data chunks (chunk_list) and convert the results into a flattened list. This function is used to send chunks of data with a size larger than 1 to the workers in parallel and to process them on the workers.

Parameters:
  • chunk_list (List) – A list of data chunks to process.
  • map_function (Callable) – Function to apply to each data chunk in the list
  • kwargs (dict) – Keyword arguments to be passed to the map_function
Returns:

A list of the results of the function evaluated on each chunk and flattened.

Return type:

List
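
Conceptually the helper behaves like the following sketch (illustrative only, not the actual implementation): map_function is evaluated on each chunk in the list and the individual result lists are chained into one flat list.

    from itertools import chain

    def partly_reduce_sketch(chunk_list, map_function, kwargs):
        # hypothetical re-implementation for illustration
        kwargs = kwargs or {}
        results = (map_function(chunk, **kwargs) for chunk in chunk_list)
        return list(chain.from_iterable(results))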

hts.utilities.distribution.initialize_warnings_in_workers(show_warnings)[source]

Small helper function to initialize the warnings module in multiprocessing workers. On Windows, Python spawns fresh processes which do not inherit the warnings state, so warnings must be enabled/disabled before running computations.
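
This mirrors the standard multiprocessing Pool initializer pattern; a sketch (the pool arguments are illustrative):

    from multiprocessing import Pool

    from hts.utilities.distribution import initialize_warnings_in_workers

    if __name__ == "__main__":
        # every freshly spawned worker runs the initializer once, so the desired
        # warnings behaviour (here: suppressed) is in place before any computation
        pool = Pool(
            processes=4,
            initializer=initialize_warnings_in_workers,
            initargs=(False,),
        )
        pool.close()
        pool.join()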

hts.utilities.load_data

hts.utilities.load_data.get_data_home(data_home=None)[source]

Return the path of the scikit-hts data dir.

This folder is used by some large dataset loaders to avoid downloading the data several times.

By default the data dir is set to a folder named ‘scikit_hts_data’ in the user home folder. Alternatively, it can be set by the ‘SCIKIT_HTS_DATA’ environment variable or programmatically by giving an explicit folder path. The ‘~’ symbol is expanded to the user home folder. If the folder does not already exist, it is automatically created.

Parameters:data_home (str | None) – The path to the scikit-hts data dir.
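
For example (the explicit path is a placeholder):

    from hts.utilities.load_data import get_data_home

    # default location: a 'scikit_hts_data' folder in the user home directory
    default_dir = get_data_home()

    # or point the dataset loaders at an explicit, user-chosen folder ('~' is expanded)
    custom_dir = get_data_home("~/my_hts_datasets")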

hts.utilities.load_data.load_geo_events_data(data_home=None)[source]
Returns:df
Return type:pandas.DataFrame
hts.utilities.load_data.load_hierarchical_sine_data(start, end, n=10000)[source]
hts.utilities.load_data.load_mobility_data(data_home=None)[source]

Original dataset: https://www.kaggle.com/pronto/cycle-share-dataset

Returns:df
Return type:pandas.DataFrame
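
A usage sketch:

    from hts.utilities.load_data import load_mobility_data

    df = load_mobility_data()  # returns a pandas.DataFrame
    print(df.head())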

hts.utilities.load_data.partition_column(column, n=3)[source]

hts.utilities.utils

class hts.utilities.utils.suppress_stdout_stderr[source]

Bases: object

A context manager for doing a “deep suppression” of stdout and stderr in Python, i.e. it will suppress all print output, even if the print originates in a compiled C/Fortran sub-function.

This will not suppress raised exceptions, since exceptions are printed to stderr just before a script exits, and after the context manager has exited (at least, I think that is why it lets exceptions through).
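
A usage sketch:

    from hts.utilities.utils import suppress_stdout_stderr

    with suppress_stdout_stderr():
        # anything written to stdout or stderr inside this block is swallowed,
        # including output produced by compiled C/Fortran extensions
        print("this never reaches the terminal")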