Source code for hts.utilities.load_data

import logging
import os
from io import StringIO

import numpy
import pandas

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ``requests`` is only needed by the loaders that download a dataset. When it
# is not installed an error is logged and the name ``requests`` stays
# undefined, so the import of this module itself still succeeds.
try:
    import requests
except ImportError:  # pragma: no cover
    logger.error(
        "Some loading functions might be impaired, install requests "
        "with: \npip install requests\n if you'd like to use them"
    )

# Download locations of the sample datasets fetched by the loaders below.
MOBILITY_URL = "https://hierarchical-sample-data.s3.amazonaws.com/mobility.csv"
GEO_EVENTS_URL = "https://osf.io/v8qax/download"


def get_data_home(data_home=None):
    """
    Return the path of the scikit-hts data dir, creating it if needed.

    This folder is used by some large dataset loaders to avoid
    downloading the data several times.

    By default the data dir is set to a folder named 'scikit_hts_data'
    in the user home folder. Alternatively, it can be set by the
    'SCIKIT_HTS_DATA' environment variable or programmatically by giving
    an explicit folder path. The '~' symbol is expanded to the user home
    folder.

    If the folder does not already exist, it is automatically created.

    Parameters
    ----------
    data_home : str | None
        The path to scikit-hts data dir. When ``None``, the
        'SCIKIT_HTS_DATA' environment variable is used, falling back to
        '~/scikit_hts_data'.

    Returns
    -------
    data_home : str
        The user-expanded path of the data dir.

    Raises
    ------
    FileExistsError
        If the path already exists but is not a directory.
    """
    if data_home is None:
        data_home = os.environ.get(
            "SCIKIT_HTS_DATA", os.path.join("~", "scikit_hts_data")
        )
    data_home = os.path.expanduser(data_home)
    # ``exist_ok=True`` replaces a separate ``os.path.exists`` pre-check: with
    # check-then-create, another process creating the directory in between
    # would make ``makedirs`` raise.
    os.makedirs(data_home, exist_ok=True)
    return data_home
def partition_column(column, n=3):
    """
    Randomly split every value of ``column`` into ``n`` shares.

    Each value is multiplied by its own draw from a flat Dirichlet
    distribution (all ``n`` concentration parameters equal to 1). The
    weights of one draw sum to one, so the ``n`` shares of a value add
    back up to that value (up to floating point error).

    Parameters
    ----------
    column : pandas.Series
        The values to partition.
    n : int
        Number of shares each value is split into.

    Returns
    -------
    shares : list of list
        ``n`` lists, each as long as ``column``; ``shares[j][i]`` is the
        j-th share of the i-th value.
    """
    values = numpy.asarray(column)
    # A single vectorised draw for the whole column instead of one Python
    # level call per row; row ``i`` holds the weights applied to value ``i``.
    weights = numpy.random.dirichlet(numpy.ones(n), size=len(values))
    shares = weights * values[:, None]
    return [list(shares[:, j]) for j in range(n)]
def load_hierarchical_sine_data(start, end, n=10000):
    """
    Generate a random hierarchical dataset built on a noisy sine wave.

    ``n`` timestamps are drawn uniformly between ``start`` and ``end`` and
    sorted. The ``total`` column is a sine wave sampled in steps of 0.01
    with Gaussian noise (standard deviation 5) added. ``total`` is then
    randomly partitioned into ``a``, ``b``, ``c``; each of those into
    ``*_x`` and ``*_y``; and each of those into ``*_1`` and ``*_2``.

    Parameters
    ----------
    start : datetime-like
        Lower bound of the generated timestamps.
    end : datetime-like
        Upper bound of the generated timestamps; ``end - start`` must
        provide ``total_seconds()``.
    n : int
        Number of rows to generate.

    Returns
    -------
    df : pandas.DataFrame
        Indexed by the sorted random timestamps, with the columns ``total``,
        ``a``, ``b``, ``c``, ``a_x`` ... ``c_y`` and ``a_x_1`` ... ``c_y_2``.
    """
    dts = (end - start).total_seconds()
    dti = pandas.DatetimeIndex(
        [start + pandas.Timedelta(numpy.random.uniform(0, dts), "s") for _ in range(n)]
    ).sort_values()

    # One sample per timestamp. Building ``arange(0, len(dti), 0.01)`` and
    # slicing afterwards would compute (and draw noise for) 100x more points
    # than are kept.
    time = numpy.arange(len(dti)) * 0.01
    amplitude = numpy.sin(time) * 10
    amplitude += numpy.random.normal(2 * amplitude + 2, 5)
    df = pandas.DataFrame(index=dti, data={"total": amplitude})

    # Level 1: split the total into three series.
    df["a"], df["b"], df["c"] = partition_column(df["total"], n=3)

    # Level 2: split every level-1 series in two.
    level_one = ("a", "b", "c")
    for parent in level_one:
        df[parent + "_x"], df[parent + "_y"] = partition_column(df[parent], n=2)

    # Level 3: split every level-2 series in two.
    level_two = [parent + suffix for parent in level_one for suffix in ("_x", "_y")]
    for parent in level_two:
        df[parent + "_1"], df[parent + "_2"] = partition_column(df[parent], n=2)

    return df
def load_mobility_data(data_home=None):
    """
    Load the mobility sample dataset, downloading it on first use.

    The CSV is fetched from ``MOBILITY_URL`` and cached as ``mobility.csv``
    in the scikit-hts data dir; when that file already exists it is read
    instead of downloading again.

    Original dataset: https://www.kaggle.com/pronto/cycle-share-dataset

    Parameters
    ----------
    data_home : str | None
        The data dir, resolved through ``get_data_home``.

    Returns
    -------
    df : pandas.DataFrame
        Indexed by the ``starttime`` column, parsed as dates.

    Raises
    ------
    requests.exceptions.RequestException
        If the download fails (connection error, timeout or HTTP error
        status).
    """
    csv_path = os.path.join(get_data_home(data_home), "mobility.csv")
    read_options = {"index_col": "starttime", "parse_dates": ["starttime"]}

    if os.path.exists(csv_path):
        return pandas.read_csv(csv_path, **read_options)

    # A timeout keeps an unresponsive server from blocking forever.
    response = requests.get(MOBILITY_URL, timeout=60)
    # Fail on an HTTP error instead of handing an error page to the CSV parser.
    response.raise_for_status()
    df = pandas.read_csv(StringIO(response.content.decode("utf-8")), **read_options)
    # ``starttime`` is the index here; move it back to a column so the cached
    # file can be read with the same options.
    df.reset_index().to_csv(csv_path, index=False)
    return df
def load_geo_events_data(data_home=None):
    """
    Load the geo events sample dataset, downloading it on first use.

    The CSV is fetched from ``GEO_EVENTS_URL`` and cached as ``power.csv``
    in the scikit-hts data dir; when that file already exists it is read
    instead of downloading again.

    Parameters
    ----------
    data_home : str | None
        The data dir, resolved through ``get_data_home``.

    Returns
    -------
    df : pandas.DataFrame
        Indexed by the ``event_ts`` column, parsed as dates.

    Raises
    ------
    requests.exceptions.RequestException
        If the download fails (connection error, timeout or HTTP error
        status).
    """
    csv_path = os.path.join(get_data_home(data_home), "power.csv")
    read_options = {"parse_dates": ["event_ts"], "index_col": "event_ts"}

    if os.path.exists(csv_path):
        return pandas.read_csv(csv_path, **read_options)

    # A timeout keeps an unresponsive server from blocking forever.
    response = requests.get(GEO_EVENTS_URL, timeout=60)
    # Fail on an HTTP error instead of handing an error page to the CSV parser.
    response.raise_for_status()
    df = pandas.read_csv(StringIO(response.content.decode("utf-8")), **read_options)
    # ``event_ts`` is the index here; move it back to a column so the cached
    # file can be read with the same options.
    df.reset_index().to_csv(csv_path, index=False)
    return df