import logging
import tempfile
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union
import numpy
import pandas
from sklearn.base import BaseEstimator, RegressorMixin
from hts import defaults
from hts import model as hts_models
from hts._t import ExogT, MethodT, ModelT, NodesT, TimeSeriesModelT, Transform
from hts.core.exceptions import InvalidArgumentException, MissingRegressorException
from hts.core.result import HTSResult
from hts.core.utils import _do_fit, _do_predict, _model_mapping_to_iterable
from hts.functions import to_sum_mat
from hts.hierarchy import HierarchyTree
from hts.hierarchy.utils import make_iterable
from hts.model.base import TimeSeriesModel
from hts.revision import RevisionMethod
from hts.utilities.distribution import DistributorBaseClass
logger = logging.getLogger(__name__)
[docs]class HTSRegressor(BaseEstimator, RegressorMixin):
"""
Main regressor class for scikit-hts. Likely the only import you'll need for using
this project. It takes a pandas dataframe, the nodes specifying the hierarchies, model kind, revision
method, and a few other parameters. See Examples to get an idea of how to use it.
Attributes
----------
transform : Union[NamedTuple[str, Callable], bool]
Function transform to be applied to input and outputs. If True, it will use ``scipy.stats.boxcox``
and ``scipy.special._ufuncs.inv_boxcox`` on input and output data
sum_mat : array_like
The summing matrix, explained in depth in `Forecasting <https://otexts.com/fpp2/gts.html>`_
nodes : Dict[str, List[str]]
Nodes representing node, edges of the hierarchy. Keys are nodes, values are list of edges.
df : pandas.DataFrame
The dataframe containing the nodes and edges specified above
revision_method : str
One of: ``"OLS", "WLSS", "WLSV", "FP", "PHA", "AHP", "BU", "NONE"``
models : dict
Dictionary that holds the trained models
mse : dict
Dictionary that holds the mse scores for the trained models
residuals : dict
Dictionary that holds the mse residual for the trained models
forecasts : dict
Dictionary that holds the forecasts for the trained models
model_instance : TimeSeriesModel
Reference to the class implementing the actual time series model
"""
[docs] def __init__(
self,
model: str = defaults.MODEL,
revision_method: str = defaults.REVISION,
transform: Optional[Union[Transform, bool]] = False,
n_jobs: int = defaults.N_PROCESSES,
low_memory: bool = defaults.LOW_MEMORY,
**kwargs: Any,
):
"""
Parameters
----------
model : str
One of the models supported by ``hts``. These can be found
revision_method : str
The revision method to be used. One of: ``"OLS", "WLSS", "WLSV", "FP", "PHA", "AHP", "BU", "NONE"``
transform : Boolean or NamedTuple
If True, ``scipy.stats.boxcox`` and ``scipy.special._ufuncs.inv_boxcox`` will be applied prior and after
fitting.
If False (default), no transform is applied.
If you desired to use custom functions, use a NamedTuple like:
.. highlight:: python
.. code-block:: python
from collections import namedtuple
Transform = namedtuple('Transform', ['func', 'inv_func']
transform = Transform(func=numpy.exp, inv_func=numpy.log)
ht = HTSRegressor(transform=transform, ...)
The signatures for the ``func`` as well as ``inv_func`` parameters must both be
``Callable[[numpy.ndarry], numpy.ndarray]``, i.e. they must take an array and return an array, both of equal
dimensions
n_jobs : int
Number of parallel jobs to run the forecasting on
low_memory : Bool
If True, models will be fit, serialized, and released from memory. Usually a good idea if
you are dealing with a large amount of nodes
kwargs
Keyword arguments to be passed to the underlying model to be instantiated
"""
self.model = model
self.method: str = revision_method
self.n_jobs: int = n_jobs
self.low_memory: bool = low_memory
if self.low_memory:
self.tmp_dir: Optional[str] = tempfile.mkdtemp(prefix="hts_")
else:
self.tmp_dir = None
self.transform = transform
self.sum_mat: Optional[numpy.ndarray] = None
self.nodes: Optional[NodesT] = None
self.model_instance: Optional[TimeSeriesModelT] = None
self.exogenous: bool = False
self.revision_method: Optional[RevisionMethod] = None
self.hts_result: HTSResult = HTSResult()
self.model_args = kwargs
def __init_hts(
self,
nodes: Optional[NodesT] = None,
df: Optional[pandas.DataFrame] = None,
tree: Optional[HierarchyTree] = None,
root: str = "root",
exogenous: Optional[List[str]] = None,
):
if not nodes and not df:
if not tree:
raise InvalidArgumentException(
"Either nodes and df must be passed, or a pre-built hierarchy tree"
)
else:
self.nodes = tree
else:
self.nodes = HierarchyTree.from_nodes(
nodes=nodes, df=df, exogenous=exogenous, root=root
)
self.exogenous = exogenous
self.sum_mat, sum_mat_labels = to_sum_mat(self.nodes)
self._set_model_instance()
self._init_revision()
def _init_revision(self):
self.revision_method = RevisionMethod(
sum_mat=self.sum_mat, transformer=self.transform, name=self.method
)
def _set_model_instance(self):
try:
self.model_instance = hts_models.MODEL_MAPPING[self.model]
except KeyError:
raise InvalidArgumentException(
f'Model {self.model} not valid. Pick one of: {" ".join(ModelT.names())}'
)
[docs] def fit(
self,
df: Optional[pandas.DataFrame] = None,
nodes: Optional[NodesT] = None,
tree: Optional[HierarchyTree] = None,
exogenous: Optional[ExogT] = None,
root: str = "total",
distributor: Optional[DistributorBaseClass] = None,
disable_progressbar=defaults.DISABLE_PROGRESSBAR,
show_warnings=defaults.SHOW_WARNINGS,
**fit_kwargs: Any,
) -> "HTSRegressor":
"""
Fit hierarchical model to dataframe containing hierarchical data as specified in the ``nodes`` parameter.
Exogenous can also be passed as a dict of (string, list), where string is the specific node key and the list
contains the names of the columns to be used as exogenous variables for that node.
Alternatively, a pre-built HierarchyTree can be passed without specifying the node and df. See more at
:class:`hts.hierarchy.HierarchyTree`
Parameters
----------
df : pandas.DataFrame
A Dataframe of time series with a DateTimeIndex. Each column represents a node in the hierarchy. Ignored if
tree argument is passed
nodes : Dict[str, List[str]]
The hierarchy defined as a dict of (string, list), as specified in
:py:func:`HierarchyTree.from_nodes <hts.hierarchy.HierarchyTree.from_nodes>`
tree : HierarchyTree
A pre-built HierarchyTree. Ignored if df and nodes are passed, as the tree will be built from thise
distributor : Optional[DistributorBaseClass]
A distributor, for parallel/distributed processing
exogenous : Dict[str, List[str]] or None
Node key mapping to columns that contain the exogenous variable for that node
root : str
The name of the root node
disable_progressbar : Bool
Disable or enable progressbar
show_warnings : Bool
Disable warnings
fit_kwargs : Any
Any arguments to be passed to the underlying forecasting model's fit function
Returns
-------
HTSRegressor
The fitted HTSRegressor instance
"""
self.__init_hts(nodes=nodes, df=df, tree=tree, root=root, exogenous=exogenous)
nodes = make_iterable(self.nodes, prop=None)
fit_function_kwargs = {
"fit_kwargs": fit_kwargs,
"low_memory": self.low_memory,
"tmp_dir": self.tmp_dir,
"model_instance": self.model_instance,
"model_args": self.model_args,
"transform": self.transform,
}
fitted_models = _do_fit(
nodes=nodes,
function_kwargs=fit_function_kwargs,
n_jobs=self.n_jobs,
disable_progressbar=disable_progressbar,
show_warnings=show_warnings,
distributor=distributor,
)
for model in fitted_models:
if isinstance(model, tuple):
self.hts_result.models = model
else:
self.hts_result.models = (model.node.key, model)
return self
def __validate_exogenous(
self, exogenous_df: pandas.DataFrame
) -> Optional[pandas.DataFrame]:
if exogenous_df is not None:
if self.model not in [ModelT.prophet.value, ModelT.auto_arima.value]:
logger.warning(
"Providing `exogenous_df` with a model that is not `prophet` or `auto_arima` has no effect"
)
if self.exogenous and exogenous_df is None:
raise MissingRegressorException(
"Exogenous variables were provided at fit step, hence are required at "
"predict step. Please pass the 'exogenous_df' variable to predict "
"function"
)
return exogenous_df
def __validate_steps_ahead(
self, exogenous_df: pandas.DataFrame, steps_ahead: int
) -> int:
if exogenous_df is None and not steps_ahead:
logger.info(
"No arguments passed for 'steps_ahead', defaulting to predicting 1-step-ahead"
)
steps_ahead = 1
elif exogenous_df is not None:
steps_ahead = len(exogenous_df)
for node in make_iterable(self.nodes, prop=None):
exog_cols = node.exogenous
try:
_ = exogenous_df[exog_cols]
except KeyError:
raise MissingRegressorException(
f"Node {node.key} has as exogenous variables {node.exogenous} but "
f"these columns were not found in 'exogenous_df'"
)
return steps_ahead
[docs] def predict(
self,
exogenous_df: pandas.DataFrame = None,
steps_ahead: int = None,
distributor: Optional[DistributorBaseClass] = None,
disable_progressbar: bool = defaults.DISABLE_PROGRESSBAR,
show_warnings: bool = defaults.SHOW_WARNINGS,
**predict_kwargs,
) -> pandas.DataFrame:
"""
Parameters
----------
distributor : Optional[DistributorBaseClass]
A distributor, for parallel/distributed processing
disable_progressbar : Bool
Disable or enable progressbar
show_warnings : Bool
Disable warnings
predict_kwargs : Any
Any arguments to be passed to the underlying forecasting model's predict function
exogenous_df : pandas.DataFrame
A dataframe of length == steps_ahead containing the exogenous data for each of the nodes.
Only required when using ``prophet`` or ``auto_arima`` models. See
`fbprophet's additional regression docs <https://facebook.github.io/prophet/docs/seasonality,_holiday_effects,_and_regressors.html#additional-regressors>`_
and
`AutoARIMA's exogenous handling docs <https://alkaline-ml.com/pmdarima/modules/generated/pmdarima.arima.AutoARIMA.html>`_
for more information.
Other models do not require additional regressors at predict time.
steps_ahead : int
The number of forecasting steps for which to produce a forecast
Returns
-------
Revised Forecasts, as a pandas.DataFrame in the same format as the one passed for fitting, extended by `steps_ahead`
time steps`
"""
exogenous_df = self.__validate_exogenous(exogenous_df)
steps_ahead = self.__validate_steps_ahead(
exogenous_df=exogenous_df, steps_ahead=steps_ahead
)
if exogenous_df is not None:
predict_kwargs["exogenous_df"] = exogenous_df
predict_function_kwargs = {
"fit_kwargs": predict_kwargs,
"steps_ahead": steps_ahead,
"low_memory": self.low_memory,
"tmp_dir": self.tmp_dir,
"predict_kwargs": predict_kwargs,
}
fit_models = _model_mapping_to_iterable(self.hts_result.models, self.nodes)
results = _do_predict(
models=fit_models,
function_kwargs=predict_function_kwargs,
n_jobs=self.n_jobs,
disable_progressbar=disable_progressbar,
show_warnings=show_warnings,
distributor=distributor,
)
for key, forecast, error, residual in results:
self.hts_result.forecasts = (key, forecast)
self.hts_result.errors = (key, error)
self.hts_result.residuals = (key, residual)
return self._revise(steps_ahead=steps_ahead)
def _revise(self, steps_ahead: int = 1) -> pandas.DataFrame:
logger.info(f"Reconciling forecasts using {self.revision_method}")
revised = self.revision_method.revise(
forecasts=self.hts_result.forecasts,
mse=self.hts_result.errors,
nodes=self.nodes,
)
revised_columns = list(make_iterable(self.nodes))
revised_index = self._get_predict_index(steps_ahead=steps_ahead)
return pandas.DataFrame(revised, index=revised_index, columns=revised_columns)
def _get_predict_index(self, steps_ahead=1) -> Any:
freq = getattr(self.nodes.item.index, "freq", 1) or 1
try:
start = self.nodes.item.index[-1] + timedelta(freq)
end = self.nodes.item.index[-1] + timedelta(steps_ahead * freq)
future = pandas.date_range(start=start, end=end)
except TypeError:
start = self.nodes.item.index[-1] + freq
end = self.nodes.item.index[-1] + (steps_ahead * freq)
future = pandas.date_range(freq=freq, start=start, end=end)
return self.nodes.item.index.append(future)