#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pandas as pd
import numpy as np
import functools
from zoo.chronos.data.utils.feature import generate_dt_features, generate_global_features
from zoo.chronos.data.utils.impute import impute_timeseries_dataframe
from zoo.chronos.data.utils.deduplicate import deduplicate_timeseries_dataframe
from zoo.chronos.data.utils.roll import roll_timeseries_dataframe
from zoo.chronos.data.utils.scale import unscale_timeseries_numpy
from zoo.chronos.data.utils.resample import resample_timeseries_dataframe
from zoo.chronos.data.utils.split import split_timeseries_dataframe
from tsfresh.utilities.dataframe_functions import roll_time_series
from tsfresh.utilities.dataframe_functions import impute as impute_tsfresh
from tsfresh import extract_features
from tsfresh.feature_extraction import ComprehensiveFCParameters,\
MinimalFCParameters, EfficientFCParameters

DEFAULT_PARAMS = {"comprehensive": ComprehensiveFCParameters(),
"minimal": MinimalFCParameters(),
"efficient": EfficientFCParameters()}
_DEFAULT_ID_COL_NAME = "id"
_DEFAULT_ID_PLACEHOLDER = "0"


class TSDataset:
def __init__(self, data, **schema):
'''
        TSDataset is an abstraction of a time series dataset.
        Cascaded calls are supported for most of the transform methods.
'''
self.df = data
self.id_col = schema["id_col"]
self.dt_col = schema["dt_col"]
self.feature_col = schema["feature_col"].copy()
self.target_col = schema["target_col"].copy()
self.numpy_x = None
self.numpy_y = None
self.roll_feature = None
self.roll_target = None
self.roll_feature_df = None
        self.roll_additional_feature = None
self.scaler = None
        self.scaler_index = list(range(len(self.target_col)))
self.id_sensitive = None
self._check_basic_invariants()
self._id_list = list(np.unique(self.df[self.id_col]))
self._is_pd_datetime = pd.api.types.is_datetime64_any_dtype(self.df[self.dt_col].dtypes)

    @staticmethod
def from_pandas(df,
dt_col,
target_col,
id_col=None,
extra_feature_col=None,
with_split=False,
val_ratio=0,
test_ratio=0.1,
largest_look_back=0,
largest_horizon=1):
'''
        Initialize tsdataset(s) from a pandas dataframe.
        :param df: a pandas dataframe for your raw time series data.
        :param dt_col: a str that indicates the col name of the datetime
               column in the input data frame.
        :param target_col: a str or list that indicates the col name(s) of the target
               column(s) in the input data frame.
        :param id_col: (optional) a str that indicates the col name of the dataframe id.
               If it is not explicitly stated, then the data is interpreted as only
               containing a single id.
        :param extra_feature_col: (optional) a str or list that indicates the col name(s)
               of the extra feature columns that are needed to predict the target column.
        :param with_split: (optional) bool, whether to split the dataframe into
               train, validation and test sets. The value defaults to False.
        :param val_ratio: (optional) float, validation ratio. Only effective when
               with_split is set to True. The value defaults to 0.
        :param test_ratio: (optional) float, test ratio. Only effective when with_split
               is set to True. The value defaults to 0.1.
        :param largest_look_back: (optional) int, the largest length to look back.
               Only effective when with_split is set to True. The value defaults to 0.
        :param largest_horizon: (optional) int, the largest number of steps to look
               forward. Only effective when with_split is set to True. The value defaults
               to 1.
:return: a TSDataset instance when with_split is set to False,
three TSDataset instances when with_split is set to True.
Create a tsdataset instance by:
>>> # Here is a df example:
>>> # id datetime value "extra feature 1" "extra feature 2"
>>> # 00 2019-01-01 1.9 1 2
>>> # 01 2019-01-01 2.3 0 9
>>> # 00 2019-01-02 2.4 3 4
>>> # 01 2019-01-02 2.6 0 2
>>> tsdataset = TSDataset.from_pandas(df, dt_col="datetime",
>>> target_col="value", id_col="id",
>>> extra_feature_col=["extra feature 1",
>>> "extra feature 2"])
'''
_check_type(df, "df", pd.DataFrame)
tsdataset_df = df.copy(deep=True)
target_col = _to_list(target_col, name="target_col")
feature_col = _to_list(extra_feature_col, name="extra_feature_col")
if id_col is None:
tsdataset_df[_DEFAULT_ID_COL_NAME] = _DEFAULT_ID_PLACEHOLDER
id_col = _DEFAULT_ID_COL_NAME
if with_split:
tsdataset_dfs = split_timeseries_dataframe(df=tsdataset_df,
id_col=id_col,
val_ratio=val_ratio,
test_ratio=test_ratio,
look_back=largest_look_back,
horizon=largest_horizon)
return [TSDataset(data=tsdataset_dfs[i],
id_col=id_col,
dt_col=dt_col,
target_col=target_col,
feature_col=feature_col) for i in range(3)]
return TSDataset(data=tsdataset_df,
id_col=id_col,
dt_col=dt_col,
target_col=target_col,
feature_col=feature_col)

    def impute(self, mode="last", const_num=0):
'''
Impute the tsdataset by imputing each univariate time series
distinguished by id_col and feature_col.
        :param mode: imputation mode, select from "last", "const" or "linear".
            "last": impute by propagating the last non-N/A number to the following
            N/A values. If there is no non-N/A number ahead, 0 is filled instead.
            "const": impute by a constant value given by the user.
            "linear": impute by linear interpolation.
        :param const_num: indicates the constant number to fill, which is only effective
            when mode is set to "const".
:return: the tsdataset instance.
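        impute() can be called by (assuming tsdata is a TSDataset instance):
        >>> tsdata.impute(mode="linear")
        >>> # or fill N/A values with a constant, e.g. 0
        >>> tsdata.impute(mode="const", const_num=0)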
'''
df_list = [impute_timeseries_dataframe(df=self.df[self.df[self.id_col] == id_name],
dt_col=self.dt_col,
mode=mode,
const_num=const_num)
for id_name in self._id_list]
self.df = pd.concat(df_list)
return self

    def deduplicate(self):
'''
        Remove duplicated records which have exactly the same values in each feature_col
        for each multivariate timeseries distinguished by id_col.
:return: the tsdataset instance.
'''
df_list = [deduplicate_timeseries_dataframe(df=self.df[self.df[self.id_col] == id_name],
dt_col=self.dt_col)
for id_name in self._id_list]
self.df = pd.concat(df_list)
return self

    def resample(self, interval, start_time=None, end_time=None, merge_mode="mean"):
'''
Resample on a new interval for each univariate time series distinguished
by id_col and feature_col.
        :param interval: a pandas offset alias, indicating the time interval of the
            output dataframe.
        :param start_time: start time of the output dataframe.
        :param end_time: end time of the output dataframe.
        :param merge_mode: if the original interval is smaller than the output interval,
            the values need to be merged by this mode. "max", "min", "mean"
            and "sum" are supported for now.
:return: the tsdataset instance.
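        resample() can be called by (a minimal sketch, assuming tsdata is a TSDataset
        instance and "2H" is the desired output interval):
        >>> tsdata.resample(interval="2H", merge_mode="mean")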
'''
df_list = []
for id_name in self._id_list:
df_id = resample_timeseries_dataframe(df=self.df[self.df[self.id_col] == id_name]
.drop(self.id_col, axis=1),
dt_col=self.dt_col,
interval=interval,
start_time=start_time,
end_time=end_time,
merge_mode=merge_mode)
df_id[self.id_col] = id_name
df_list.append(df_id.copy())
self.df = pd.concat(df_list)
return self

    def gen_dt_feature(self):
'''
        | Generate datetime features for each row. Currently the following features are generated:
| "MINUTE": The minute of the time stamp.
| "DAY": The day of the time stamp.
| "DAYOFYEAR": The ordinal day of the year of the time stamp.
| "HOUR": The hour of the time stamp.
| "WEEKDAY": The day of the week of the time stamp, Monday=0, Sunday=6.
| "WEEKOFYEAR": The ordinal week of the year of the time stamp.
| "MONTH": The month of the time stamp.
| "IS_AWAKE": Bool value indicating whether it belongs to awake hours for the time stamp,
| True for hours between 6A.M. and 1A.M.
| "IS_BUSY_HOURS": Bool value indicating whether it belongs to busy hours for the time
| stamp, True for hours between 7A.M. and 10A.M. and hours between 4P.M. and 8P.M.
| "IS_WEEKEND": Bool value indicating whether it belongs to weekends for the time stamp,
| True for Saturdays and Sundays.
:return: the tsdataset instance.
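        gen_dt_feature() can be called by (assuming tsdata is a TSDataset instance;
        the generated feature names are appended to feature_col):
        >>> tsdata.gen_dt_feature()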
'''
df_list = [generate_dt_features(input_df=self.df[self.df[self.id_col] == id_name],
dt_col=self.dt_col)
for id_name in self._id_list]
self.df = pd.concat(df_list)
from zoo.chronos.data.utils.feature import TIME_FEATURE, \
ADDITIONAL_TIME_FEATURE_HOUR, ADDITIONAL_TIME_FEATURE_WEEKDAY
        increased_attributes = list(TIME_FEATURE) + \
            list(ADDITIONAL_TIME_FEATURE_HOUR) + \
            list(ADDITIONAL_TIME_FEATURE_WEEKDAY)
        self.feature_col += [attr + "({})".format(self.dt_col)
                             for attr in increased_attributes]
return self

    def gen_global_feature(self, settings="comprehensive", full_settings=None):
'''
        Generate per-time-series features for each time series.
        This method is implemented with tsfresh.
        TODO: relationship with scale should be figured out.
        :param settings: str or dict. If a string is set, then it must be one of
            "comprehensive", "minimal" and "efficient". If a dict is set, then it
            should follow the instructions for default_fc_parameters in tsfresh.
            The value defaults to "comprehensive".
        :param full_settings: dict. It should follow the instructions for
            kind_to_fc_parameters in tsfresh. The value defaults to None.
:return: the tsdataset instance.
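        gen_global_feature() can be called by (a minimal sketch, assuming tsdata is
        a TSDataset instance):
        >>> tsdata.gen_global_feature(settings="minimal")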
'''
        if full_settings is not None:
            self.df, additional_feature = \
                generate_global_features(input_df=self.df,
                                         column_id=self.id_col,
                                         column_sort=self.dt_col,
                                         kind_to_fc_parameters=full_settings)
            self.feature_col += additional_feature
            return self
if isinstance(settings, str):
            assert settings in ["comprehensive", "minimal", "efficient"], \
                f"settings str should be one of \"comprehensive\", \"minimal\", " \
                f"\"efficient\", but found {settings}."
default_fc_parameters = DEFAULT_PARAMS[settings]
else:
default_fc_parameters = settings
        self.df, additional_feature = \
            generate_global_features(input_df=self.df,
                                     column_id=self.id_col,
                                     column_sort=self.dt_col,
                                     default_fc_parameters=default_fc_parameters)
        self.feature_col += additional_feature
return self

    def gen_rolling_feature(self,
                            window_size,
                            settings="comprehensive",
                            full_settings=None):
'''
        Generate aggregation features for each sample.
        This method is implemented with tsfresh.
        TODO: relationship with scale should be figured out.
        :param window_size: int, the window size of the rolling; features are
            generated according to the rolling result.
        :param settings: str or dict. If a string is set, then it must be one of
            "comprehensive", "minimal" and "efficient". If a dict is set, then it
            should follow the instructions for default_fc_parameters in tsfresh.
            The value defaults to "comprehensive".
        :param full_settings: dict. It should follow the instructions for
            kind_to_fc_parameters in tsfresh. The value defaults to None.
:return: the tsdataset instance.
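        gen_rolling_feature() can be called by (a minimal sketch, assuming tsdata is
        a TSDataset instance; the window size is just for illustration):
        >>> tsdata.gen_rolling_feature(window_size=10, settings="minimal")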
'''
if isinstance(settings, str):
            assert settings in ["comprehensive", "minimal", "efficient"], \
                f"settings str should be one of \"comprehensive\", \"minimal\", " \
                f"\"efficient\", but found {settings}."
default_fc_parameters = DEFAULT_PARAMS[settings]
else:
default_fc_parameters = settings
df_rolled = roll_time_series(self.df,
column_id=self.id_col,
column_sort=self.dt_col,
max_timeshift=window_size-1,
min_timeshift=window_size-1)
if not full_settings:
self.roll_feature_df = extract_features(df_rolled,
column_id=self.id_col,
column_sort=self.dt_col,
default_fc_parameters=default_fc_parameters)
else:
self.roll_feature_df = extract_features(df_rolled,
column_id=self.id_col,
column_sort=self.dt_col,
kind_to_fc_parameters=full_settings)
impute_tsfresh(self.roll_feature_df)
self.feature_col += list(self.roll_feature_df.columns)
        self.roll_additional_feature = list(self.roll_feature_df.columns)
return self

    def roll(self,
             lookback,
             horizon,
             feature_col=None,
             target_col=None,
             id_sensitive=False):
'''
Sampling by rolling for machine learning/deep learning models.
:param lookback: int, lookback value.
        :param horizon: int or list,
               if `horizon` is an int, we will sample `horizon` steps
               continuously after the forecasting point.
               if `horizon` is a list, we will sample discretely according
               to the input list.
               Specially, when `horizon` is set to 0, the ground truth will be
               generated as None.
        :param feature_col: str or list, indicates the feature col name. Default to None,
               where we will take all available features in rolling.
        :param target_col: str or list, indicates the target col name. Default to None,
               where we will take all targets in rolling. It should be a subset of the
               target_col you used to initialize the tsdataset.
        :param id_sensitive: bool,
               if `id_sensitive` is False, we will roll on each id's sub-dataframe
               and fuse the samplings.
               The shape of the rolling result will be
               x: (num_sample, lookback, num_feature_col + num_target_col)
               y: (num_sample, horizon, num_target_col)
               where num_sample is the summation of the sample numbers of each
               sub-dataframe.
               if `id_sensitive` is True, we will roll on the wide dataframe whose
               columns are the cartesian product of id_col and feature_col.
               The shape of the rolling result will be
               x: (num_sample, lookback, new_num_feature_col + new_num_target_col)
               y: (num_sample, horizon, new_num_target_col)
               where num_sample is the sample number of the wide dataframe,
               new_num_feature_col is the product of the number of ids and the number
               of feature_col, and new_num_target_col is the product of the number of
               ids and the number of target_col.
:return: the tsdataset instance.
roll() can be called by:
>>> # Here is a df example:
>>> # id datetime value "extra feature 1" "extra feature 2"
>>> # 00 2019-01-01 1.9 1 2
>>> # 01 2019-01-01 2.3 0 9
>>> # 00 2019-01-02 2.4 3 4
>>> # 01 2019-01-02 2.6 0 2
>>> tsdataset = TSDataset.from_pandas(df, dt_col="datetime",
>>> target_col="value", id_col="id",
>>> extra_feature_col=["extra feature 1",
>>> "extra feature 2"])
>>> horizon, lookback = 1, 1
>>> tsdataset.roll(lookback=lookback, horizon=horizon, id_sensitive=False)
>>> x, y = tsdataset.to_numpy()
>>> print(x, y) # x = [[[1.9, 1, 2 ]], [[2.3, 0, 9 ]]] y = [[[ 2.4 ]], [[ 2.6 ]]]
>>> print(x.shape, y.shape) # x.shape = (2, 1, 3) y.shape = (2, 1, 1)
>>> tsdataset.roll(lookback=lookback, horizon=horizon, id_sensitive=True)
>>> x, y = tsdataset.to_numpy()
>>> print(x, y) # x = [[[ 1.9, 2.3, 1, 2, 0, 9 ]]] y = [[[ 2.4, 2.6]]]
>>> print(x.shape, y.shape) # x.shape = (1, 1, 6) y.shape = (1, 1, 2)
'''
feature_col = _to_list(feature_col, "feature_col") if feature_col is not None \
else self.feature_col
target_col = _to_list(target_col, "target_col") if target_col is not None \
else self.target_col
        if self.roll_additional_feature:
            additional_feature_col = \
                list(set(feature_col).intersection(set(self.roll_additional_feature)))
            feature_col = \
                list(set(feature_col) - set(self.roll_additional_feature))
            self.roll_feature = feature_col + additional_feature_col
        else:
            additional_feature_col = None
            self.roll_feature = feature_col
self.roll_target = target_col
num_id = len(self._id_list)
num_feature_col = len(self.roll_feature)
num_target_col = len(self.roll_target)
self.id_sensitive = id_sensitive
roll_feature_df = None if self.roll_feature_df is None \
else self.roll_feature_df[additional_feature_col]
rolling_result =\
self.df.groupby([self.id_col])\
.apply(lambda df: roll_timeseries_dataframe(df=df,
roll_feature_df=roll_feature_df,
lookback=lookback,
horizon=horizon,
feature_col=feature_col,
target_col=target_col))
# concat the result on required axis
concat_axis = 2 if id_sensitive else 0
self.numpy_x = np.concatenate([rolling_result[i][0]
for i in self._id_list],
axis=concat_axis).astype(np.float64)
if horizon != 0:
self.numpy_y = np.concatenate([rolling_result[i][1]
for i in self._id_list],
axis=concat_axis).astype(np.float64)
else:
self.numpy_y = None
# target first
if self.id_sensitive:
feature_start_idx = num_target_col*num_id
reindex_list = [list(range(i*num_target_col, (i+1)*num_target_col)) +
list(range(feature_start_idx+i*num_feature_col,
feature_start_idx+(i+1)*num_feature_col))
for i in range(num_id)]
reindex_list = functools.reduce(lambda a, b: a+b, reindex_list)
sorted_index = sorted(range(len(reindex_list)), key=reindex_list.__getitem__)
self.numpy_x = self.numpy_x[:, :, sorted_index]
# scaler index
num_roll_target = len(self.roll_target)
repeat_factor = len(self._id_list) if self.id_sensitive else 1
scaler_index = [self.target_col.index(self.roll_target[i])
for i in range(num_roll_target)] * repeat_factor
self.scaler_index = scaler_index
return self

    def to_numpy(self):
'''
        Export the rolling result in the form of a tuple of numpy ndarrays (x, y).
        :return: a 2-element tuple. Each item is a 3-dim numpy ndarray,
            cast to float64.
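        to_numpy() can be called by (assuming tsdata has been rolled by roll()):
        >>> x, y = tsdata.to_numpy()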
'''
        if self.numpy_x is None:
            raise RuntimeError("Please call \"roll\" method "
                               "before transforming a TSDataset to numpy ndarray!")
return self.numpy_x, self.numpy_y

    def to_pandas(self):
'''
Export the pandas dataframe.
        :return: a copy of the internal dataframe.
'''
return self.df.copy()

    def scale(self, scaler, fit=True):
'''
        Scale the time series dataset's feature columns and target columns.
        :param scaler: sklearn scaler instance, StandardScaler, MaxAbsScaler,
            MinMaxScaler and RobustScaler are supported.
        :param fit: if we need to fit the scaler. Typically, the value should
            be set to True for the training set, while False for the validation
            and test sets. The value defaults to True.
        :return: the tsdataset instance.
        Assume there is a training set tsdata and a test set tsdata_test.
        scale() should be called first on the training set with the default fit=True,
        then on the test set with the same scaler and fit=False.
>>> from sklearn.preprocessing import StandardScaler
>>> scaler = StandardScaler()
>>> tsdata.scale(scaler, fit=True)
>>> tsdata_test.scale(scaler, fit=False)
'''
        feature_col = self.feature_col
        if self.roll_additional_feature:
            feature_col = [feature for feature in self.feature_col
                           if feature not in self.roll_additional_feature]
if fit:
self.df[self.target_col + feature_col] = \
scaler.fit_transform(self.df[self.target_col + feature_col])
else:
self.df[self.target_col + feature_col] = \
scaler.transform(self.df[self.target_col + feature_col])
self.scaler = scaler
return self

    def unscale(self):
'''
        Unscale the time series dataset's feature columns and target columns.
:return: the tsdataset instance.
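        unscale() can be called by (assuming tsdata has been scaled by scale()):
        >>> tsdata.unscale()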
'''
        feature_col = self.feature_col
        if self.roll_additional_feature:
            feature_col = [feature for feature in self.feature_col
                           if feature not in self.roll_additional_feature]
self.df[self.target_col + feature_col] = \
self.scaler.inverse_transform(self.df[self.target_col + feature_col])
return self

    def unscale_numpy(self, data):
'''
Unscale the time series forecaster's numpy prediction result/ground truth.
        :param data: a numpy ndarray with 3 dims whose shape should be exactly
            the same as self.numpy_y.
:return: the unscaled numpy ndarray.
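        unscale_numpy() is typically used to unscale a forecaster's prediction and
        the ground truth before evaluation (a minimal sketch; y_pred is assumed to
        have the same shape as self.numpy_y):
        >>> y_pred_unscaled = tsdata.unscale_numpy(y_pred)
        >>> y_unscaled = tsdata.unscale_numpy(y)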
'''
return unscale_timeseries_numpy(data, self.scaler, self.scaler_index)

    def _check_basic_invariants(self):
        '''
        This function contains a bunch of assertions to make sure the strict rules
        (the invariants) for the internal dataframe (self.df) hold. If they do not,
        a clear and user-friendly error or warning message is provided to the users.
        This function will be called after each method (e.g. impute, deduplicate, ...).
        '''
# check type
_check_type(self.df, "df", pd.DataFrame)
_check_type(self.id_col, "id_col", str)
_check_type(self.dt_col, "dt_col", str)
_check_type(self.target_col, "target_col", list)
_check_type(self.feature_col, "feature_col", list)
# check valid name
_check_col_within(self.df, self.id_col)
_check_col_within(self.df, self.dt_col)
for target_col_name in self.target_col:
_check_col_within(self.df, target_col_name)
for feature_col_name in self.feature_col:
            if self.roll_additional_feature and \
                    feature_col_name in self.roll_additional_feature:
continue
_check_col_within(self.df, feature_col_name)
# check no n/a in critical col
_check_col_no_na(self.df, self.dt_col)
_check_col_no_na(self.df, self.id_col)


def _to_list(item, name, expect_type=str):
    # Normalize a single item or None into a list.
    if isinstance(item, list):
        return item
    if item is None:
        return []
    _check_type(item, name, expect_type)
    return [item]


def _check_type(item, name, expect_type):
    # Assert that item has the expected type.
    assert isinstance(item, expect_type),\
        f"a {str(expect_type)} is expected for {name} but found {type(item)}"


def _check_col_within(df, col_name):
    # Assert that the column exists in the dataframe.
    assert col_name in df.columns,\
        f"{col_name} is expected in dataframe while not found"


def _check_col_no_na(df, col_name):
    # Assert that the column exists and contains no N/A values.
    _check_col_within(df, col_name)
    assert df[col_name].isna().sum() == 0,\
        f"{col_name} column should not have N/A."