Source code for hts.functions

from random import choice
from typing import Dict, List, Tuple

import numpy as np
import pandas

from hts._t import MethodT, NAryTreeT
from hts.hierarchy import make_iterable


def to_sum_mat(
    ntree: NAryTreeT = None, node_labels: List[str] = None
) -> Tuple[np.ndarray, List[str]]:
    """
    Build the summing matrix used by the bottom-up and optimal-combination
    reconciliation approaches (see Rob Hyndman's "Forecasting: principles
    and practice", Section 9.4).

    Parameters
    ----------
    ntree : NAryTreeT
        Hierarchy tree; used when ``node_labels`` is not given.
    node_labels : List[str]
        Per-level label lists (total level first, bottom level last), e.g.
        as produced by ``hts.functions.get_hierarchichal_df(...)``.

    Returns
    -------
    numpy.ndarray
        Summing matrix.
    List[str]
        Label of the hierarchy node represented by each row of the summing
        matrix, in row order.
    """
    if node_labels:
        n_bottom = len(node_labels[-1])
    elif ntree:
        counts_per_level = list(map(sum, ntree.level_order_traversal()))
        node_labels = ntree.get_level_order_labels()
        n_bottom = counts_per_level[-1]
    else:
        raise ValueError(
            "Must pass either ntree or node_labels to the function. Neither was received."
        )

    # Bottom-level labels; index i corresponds to column i of the matrix.
    bottom_labels = node_labels[-1]

    # One aggregate row per unique middle-level label (the total and bottom
    # levels are handled separately below).  A bottom-level series belongs
    # to an aggregate when every underscore-delimited component of the
    # aggregate label appears in the bottom label.
    middle_labels = []
    middle_rows = []
    for level in node_labels[1:-1]:
        for label in level:
            if label in middle_labels:
                # Duplicates specified in the tree are added only once.
                continue
            parts = label.split("_")
            middle_rows.append(
                np.array(
                    [1 if all(p in bl for p in parts) else 0 for bl in bottom_labels]
                )
            )
            middle_labels.append(label)

    # Row order: total on top, then middle levels deepest-first (matching
    # the historical prepend-and-reverse construction), then the identity
    # block for the bottom level.
    sum_mat = np.vstack(
        [np.ones(n_bottom)] + middle_rows[::-1] + [np.identity(n_bottom)]
    )
    sum_mat_labels = ["total"] + middle_labels[::-1] + bottom_labels
    return sum_mat, sum_mat_labels
def project(
    hat_mat: np.ndarray, sum_mat: np.ndarray, optimal_mat: np.ndarray
) -> np.ndarray:
    """
    Apply the reconciliation matrix to every forecast step.

    Each output row ``i`` equals ``optimal_mat @ hat_mat[i]``.  The previous
    implementation computed this with a Python-level loop over rows; the
    same result is obtained with a single matrix product.

    Parameters
    ----------
    hat_mat : np.ndarray
        Base forecasts, one row per forecast step, one column per series.
    sum_mat : np.ndarray
        Summing matrix; only its row count (number of output series) is used
        to size the result.
    optimal_mat : np.ndarray
        Reconciliation (projection) matrix.

    Returns
    -------
    np.ndarray
        Revised forecasts, shape ``(hat_mat.shape[0], sum_mat.shape[0])``.
    """
    new_mat = np.empty([hat_mat.shape[0], sum_mat.shape[0]])
    # Row-wise optimal_mat @ hat_mat[i] collapses to one matrix product; the
    # broadcast assignment preserves the original shape check against sum_mat.
    new_mat[:, :] = np.dot(hat_mat, np.transpose(optimal_mat))
    return new_mat
def y_hat_matrix(forecasts, keys=None):
    """
    Stack the ``yhat`` series of the given forecasts into a 2-D matrix.

    Parameters
    ----------
    forecasts : dict
        Mapping of series name to an object exposing a ``yhat`` sequence.
    keys : iterable, optional
        Subset (and order) of keys to include; defaults to all keys in
        ``forecasts``, in dict order.

    Returns
    -------
    np.ndarray
        Matrix with one column per key and one row per forecast step.

    Notes
    -----
    The previous implementation detected "first column" via an all-zeros
    placeholder (``np.all(y_hat_mat == 0)``), which silently dropped any
    leading forecast whose values were all zero.  Columns are now collected
    and concatenated directly.
    """
    if not keys:
        keys = forecasts.keys()
    columns = [np.array(forecasts[key].yhat)[:, np.newaxis] for key in keys]
    return np.concatenate(columns, axis=1)
def optimal_combination(
    forecasts: Dict[str, pandas.DataFrame],
    sum_mat: np.ndarray,
    method: str,
    mse: Dict[str, float],
):
    """
    Produce the optimal combination of forecasts by trace minimization, as
    described by Wickramasuriya, Athanasopoulos & Hyndman in "Optimal
    Forecast Reconciliation for Hierarchical and Grouped Time Series Through
    Trace Minimization".

    Parameters
    ----------
    forecasts : dict
        Dictionary of pandas.DataFrames containing the future predictions.
    sum_mat : np.ndarray
        The summing matrix.
    method : str
        One of:
            - OLS (ordinary least squares)
            - WLSS (structurally weighted least squares)
            - WLSV (variance weighted least squares)
    mse : dict
        Per-series MSE, used only by the WLSV weighting.

    Returns
    -------
    np.ndarray
        Reconciled forecasts.
    """
    hat_mat = y_hat_matrix(forecasts)
    s_transpose = np.transpose(sum_mat)

    if method == MethodT.OLS.name:
        # P = S (S'S)^-1 S'
        ols_mat = sum_mat @ np.linalg.inv(s_transpose @ sum_mat) @ s_transpose
        return project(hat_mat=hat_mat, sum_mat=sum_mat, optimal_mat=ols_mat)

    if method == MethodT.WLSS.name:
        # Structural weights: each series weighted by the number of
        # bottom-level series it aggregates.
        weights = np.diag(np.transpose(np.sum(sum_mat, axis=1)))
    elif method == MethodT.WLSV.name:
        # Variance weights from per-series MSE; the epsilon keeps the
        # weight matrix invertible when an MSE is zero.
        variances = [mse[key] for key in mse.keys()]
        weights = np.diag(np.flip(np.hstack(variances) + 0.0000001, 0))
    else:
        raise ValueError("Invalid method")

    # P = S (S' W^-1 S)^-1 S' W^-1
    w_inv = np.linalg.inv(weights)
    optimal_mat = (
        sum_mat @ np.linalg.inv(s_transpose @ w_inv @ sum_mat) @ s_transpose @ w_inv
    )
    return project(hat_mat=hat_mat, sum_mat=sum_mat, optimal_mat=optimal_mat)
def proportions(nodes, forecasts, sum_mat, method=MethodT.PHA.name):
    # Top-down disaggregation: compute bottom-level proportions from history
    # and apply them to the top-level forecast.
    #   AHP = Average of Historical Proportions
    #   PHA = Proportion of Historical Averages
    n_cols = len(list(forecasts.keys()))
    # First forecast in the dict is taken as the top (total) series.
    # NOTE(review): relies on dict insertion order — confirm against caller.
    fcst = forecasts[list(forecasts.keys())[0]].yhat
    # Reshape to a column vector; assumes yhat supports numpy-style tuple
    # indexing (i.e. is an ndarray) — TODO confirm.
    fcst = fcst[:, np.newaxis]
    num_bts = sum_mat.shape[1]
    # Keys of the bottom-level nodes only: the last `num_bts` entries of the
    # root-plus-traversal node list.
    cols = [n.key for n in [nodes] + nodes.traversal_level()][
        (n_cols - num_bts) : n_cols
    ]
    # Historical observations of the bottom-level series.
    bts_dat = nodes.to_pandas()[cols]
    if method == MethodT.AHP.name:
        # AHP: per-period ratio of each bottom series to the top series,
        # averaged over time.
        divs = np.divide(np.transpose(np.array(bts_dat)), np.array(nodes.get_series()))
        props = divs.mean(1)
        props = props[:, np.newaxis]
    elif method == MethodT.PHA.name:
        # PHA: total of each bottom series divided by the total of the top
        # series over the whole history.
        bts_sum = bts_dat.sum(0)
        top_sum = sum(nodes.get_series())
        props = bts_sum / top_sum
        # NOTE(review): tuple indexing on a pandas Series — behavior is
        # pandas-version sensitive; verify on the pinned pandas version.
        props = props[:, np.newaxis]
    else:
        raise ValueError("Invalid method")
    # Outer product: one disaggregated column per bottom-level series.
    return np.dot(np.array(fcst), np.transpose(props))
def forecast_proportions(forecasts, nodes):
    """
    Reconcile forecasts top-down using the forecasted proportions of each
    level.

    Cons: Produces biased revised forecasts even if base forecasts are
    unbiased.

    Parameters
    ----------
    forecasts : dict
        Mapping of node key to an object exposing a ``yhat`` sequence; must
        contain a ``"total"`` entry for the root series.
    nodes : NAryTreeT
        The hierarchy tree.

    Returns
    -------
    np.ndarray
        Revised forecasts, one column per node in level order.

    Notes
    -----
    Two fixes versus the previous implementation: a leftover debug
    ``print`` was removed, and the first column was seeded from a randomly
    chosen forecast key (``random.choice``), which made the output
    nondeterministic — it now uses ``forecasts["total"]``, consistent with
    the root branch below.
    """
    n_cols = len(forecasts)
    levels = nodes.get_height()
    column = 0
    first_node = 1
    top_fcst = np.array(forecasts["total"].yhat)
    new_mat = np.empty([len(top_fcst), n_cols])
    new_mat[:, 0] = top_fcst
    as_iterable = make_iterable(nodes, prop=None)
    for level in range(levels - 1):
        # level_order_traversal()[level] yields the child count of each node
        # at this depth.
        for num_children in nodes.level_order_traversal()[level]:
            last_node = first_node + num_children
            # Base forecasts of this node's children, one row per child.
            base_fcst = np.array(
                [forecasts[k.key].yhat[:] for k in as_iterable[first_node:last_node]]
            )
            fore_sum = np.sum(base_fcst, axis=0)[:, np.newaxis]
            # Parent series to disaggregate: the already-revised parent
            # column, or the raw total for the root.
            if column == 0:
                rev_top = np.array(forecasts["total"].yhat)[:, np.newaxis]
            else:
                rev_top = np.array(new_mat[:, column])[:, np.newaxis]
            # Distribute the parent forecast across children proportionally
            # to their base forecasts.
            new_mat[:, first_node:last_node] = np.divide(
                np.multiply(np.transpose(base_fcst), rev_top), fore_sum
            )
            column += 1
            first_node += num_children
    return new_mat
def get_agg_series(df: pandas.DataFrame, levels: List[List[str]]) -> List[str]:
    """
    Build the names of all aggregate-level series.

    Parameters
    ----------
    df : pandas.DataFrame
        Tabular data.
    levels : List[List[str]]
        One list of column names per desired aggregation level.

    Returns
    -------
    List[str]
        Underscore-joined names of the unique value combinations found at
        each aggregation level, in first-occurrence order.
    """
    names: List[str] = []
    for level_cols in levels:
        unique_rows = df[level_cols].drop_duplicates().values
        names.extend("_".join(row) for row in unique_rows)
    return names
[docs]def _create_bl_str_col(df: pandas.DataFrame, level_names: List[str]) -> List[str]: """ Concatenate the column values of all the specified level_names by row into a single column. Parameters ---------- df : pandas.DataFrame Tabular data. level_names : List[str] Levels in the hierarchy. Returns ------- List[str] Concatendated column values by row. """ return list("_".join(x for x in y) for y in df[level_names].values)
def add_agg_series_to_df(
    df: pandas.DataFrame, grouped_levels: List[str], bottom_levels: List[str]
) -> pandas.DataFrame:
    """
    Append one column per aggregate series to a wide dataframe.

    Parameters
    ----------
    df : pandas.DataFrame
        Wide dataframe containing the bottom-level series.
    grouped_levels : List[str]
        Grouped level, underscore delimited, column names.
    bottom_levels : List[str]
        Bottom level, underscore delimited, column names.

    Returns
    -------
    pandas.DataFrame
        Wide dataframe with all series in the hierarchy (mutated in place
        and returned).
    """
    # Each aggregate column is the row-wise sum of its bottom-level members.
    component_cols = _get_bl(grouped_levels, bottom_levels)
    for grouped_name, members in zip(grouped_levels, component_cols):
        df[grouped_name] = df[members].sum(axis=1)
    return df
[docs]def _get_bl(grouped_levels: List[str], bottom_levels: List[str]) -> List[List[str]]: """ Get bottom level columns required to sum to create grouped columns. Parameters ---------- grouped_levels : List[str] Grouped level, underscore delimited, column names. bottom_levels : List[str] Bottom level, underscore delimited, column names. Returns ------- List[List[str]] Bottom level column names that make up each individual aggregated node in the hierarchy. """ # Split groupings by "_" b/c this makes it possible to search column names grouped_levels_split = [lev.split("_") for lev in grouped_levels] bottom_levels_split = [lev.split("_") for lev in bottom_levels] cols_to_add = [] for lev in grouped_levels_split: group_bl_cols = [ bl_col for bl_col in bottom_levels_split if set(lev).issubset(bl_col) ] cols_to_add.append(["_".join(lev) for lev in group_bl_cols]) return cols_to_add
def get_hierarchichal_df(
    df: pandas.DataFrame,
    level_names: List[str],
    hierarchy: List[List[str]],
    date_colname: str,
    val_colname: str,
) -> Tuple[pandas.DataFrame, np.array, List[str]]:
    """
    Pivot a tabular dataframe into a wide dataframe holding every series in
    the requested hierarchy, and build the matching summing matrix.

    Parameters
    ----------
    df : pd.DataFrame
        Tabular dataframe.
    level_names : List[str]
        Levels in the hierarchy.
    hierarchy : List[List[str]]
        Desired levels in your hierarchy.
    date_colname : str
        Date column name.
    val_colname : str
        Name of column containing series values.

    Returns
    -------
    pd.DataFrame
        Wide dataframe with levels of specified aggregation.
    np.array
        Summing matrix.
    List[str]
        Summing matrix labels.

    Examples
    --------
    >>> import hts.functions
    >>> hier_df = pandas.DataFrame(
            data={
                'ds': ['2020-01', '2020-02'] * 5,
                "lev1": ['A', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'B'],
                "lev2": ['X', 'X', 'Y', 'Y', 'Z', 'Z', 'X', 'X', 'Y', 'Y'],
                "val": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
            }
        )
    >>> level_names = ['lev1', 'lev2']
    >>> hierarchy = [['lev1'], ['lev2']]
    >>> wide_df, sum_mat, sum_mat_labels = hts.functions.get_hierarchichal_df(
            hier_df, level_names=level_names, hierarchy=hierarchy,
            date_colname='ds', val_colname='val'
        )
    >>> wide_df
    lev1_lev2  A_X  A_Y  A_Z  B_X  B_Y  total   A   B   X   Y  Z
    ds
    2020-01      1    3    5    7    9     25   9  16   8  12  5
    2020-02      2    4    6    8   10     30  12  18  10  14  6
    """
    # Name of the column holding the underscore-joined bottom-level
    # identifier of each row, e.g. "lev1_lev2" -> "A_X".
    bottom_col = "_".join(level_names)
    df[bottom_col] = _create_bl_str_col(df, level_names)

    # Pivot to one column per bottom-level series; every higher level is a
    # sum of these.
    forecast_df = df.pivot(
        index=date_colname, columns=bottom_col, values=val_colname
    )
    forecast_df["total"] = forecast_df.sum(axis=1)

    bottom_levels = list(df[bottom_col].unique())
    grouped_levels = get_agg_series(df, hierarchy)
    sum_mat, sum_mat_labels = to_sum_mat(
        ntree=None, node_labels=[["total"], grouped_levels, bottom_levels]
    )
    forecast_df = add_agg_series_to_df(forecast_df, grouped_levels, bottom_levels)
    return forecast_df, sum_mat, sum_mat_labels