#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from zoo.chronos.model.tcmf_model import TCMFNdarrayModelWrapper, \
TCMFXshardsModelWrapper
from zoo.orca.data import SparkXShards
from zoo.chronos.forecaster.abstract import Forecaster
class TCMFForecaster(Forecaster):
    """
    Forecaster backed by a TCMF model: a global matrix-factorization model combined
    with temporal convolution networks (see the constructor parameters).

    The concrete model wrapper is chosen on the first call to ``fit`` (or in ``load``)
    according to the input type: a dict of ndarray gives a local ndarray model, a
    SparkXShards of dict of ndarray gives an XShards-distributed model.

    Example:
        >>> import numpy as np
        >>> model = TCMFForecaster()
        >>> fit_params = dict(val_len=12,
        ...                   start_date="2020-1-1",
        ...                   freq="5min",
        ...                   y_iters=1,
        ...                   init_FX_epoch=1,
        ...                   max_FX_epoch=1,
        ...                   max_TCN_epoch=1,
        ...                   alt_iters=2)
        >>> ndarray_input = {'id': np.arange(300), 'y': np.random.rand(300, 480)}
        >>> model.fit(ndarray_input, **fit_params)
        >>> horizon = np.random.randint(1, 50)
        >>> yhat = model.predict(horizon=horizon)
        >>> model.save({tempdirname})
        >>> loaded_model = TCMFForecaster.load({tempdirname}, is_xshards_distributed=False)
        >>> data_new = np.random.rand(300, horizon)
        >>> model.evaluate(target_value={"y": data_new}, metric=['mse'])
        >>> model.fit_incremental({"y": data_new})
        >>> yhat_incr = model.predict(horizon=horizon)
    """

    def __init__(self,
                 vbsize=128,
                 hbsize=256,
                 num_channels_X=None,
                 num_channels_Y=None,
                 kernel_size=7,
                 dropout=0.1,
                 rank=64,
                 kernel_size_Y=7,
                 learning_rate=0.0005,
                 normalize=False,
                 use_time=True,
                 svd=True,):
        """
        Build a TCMF Forecast Model.

        :param vbsize: int, default is 128.
               Vertical batch size, which is the number of cells per batch.
        :param hbsize: int, default is 256.
               Horizontal batch size, which is the number of time series per batch.
        :param num_channels_X: list or None, default is None, which means
               [32, 32, 32, 32, 32, 1].
               List containing channel progression of temporal convolution network for
               local model.
        :param num_channels_Y: list or None, default is None, which means
               [16, 16, 16, 16, 16, 1].
               List containing channel progression of temporal convolution network for
               hybrid model.
        :param kernel_size: int, default is 7.
               Kernel size for local models.
        :param dropout: float, default is 0.1.
               Dropout rate during training.
        :param rank: int, default is 64.
               The rank in matrix factorization of global model.
        :param kernel_size_Y: int, default is 7.
               Kernel size of hybrid model.
        :param learning_rate: float, default is 0.0005.
        :param normalize: boolean, default is False.
               Whether to normalize input data for training.
        :param use_time: boolean, default is True.
               Whether to use time covariates.
        :param svd: boolean, default is True.
               Whether factor matrices are initialized by NMF.
               NOTE(review): the flag is named "svd" but was documented as NMF
               initialization -- confirm against the underlying model.
        """
        # Create fresh default lists per call so instances never share (and can
        # never accidentally mutate) one list object stored in the signature.
        if num_channels_X is None:
            num_channels_X = [32, 32, 32, 32, 32, 1]
        if num_channels_Y is None:
            num_channels_Y = [16, 16, 16, 16, 16, 1]
        # The model wrapper is created lazily in fit()/load(), once the input type
        # (dict of ndarray vs. SparkXShards) is known.
        self.internal = None
        self.config = {
            "vbsize": vbsize,
            "hbsize": hbsize,
            "num_channels_X": num_channels_X,
            "num_channels_Y": num_channels_Y,
            "kernel_size": kernel_size,
            "dropout": dropout,
            "rank": rank,
            "kernel_size_Y": kernel_size_Y,
            "learning_rate": learning_rate,
            "normalize": normalize,
            "use_time": use_time,
            "svd": svd,
        }

    def fit(self,
            x,
            val_len=24,
            start_date="2020-4-1",
            freq="1H",
            covariates=None,
            dti=None,
            period=24,
            y_iters=10,
            init_FX_epoch=100,
            max_FX_epoch=300,
            max_TCN_epoch=300,
            alt_iters=10,
            num_workers=None):
        """
        Fit the model on x from scratch. Full training can only be run once per
        forecaster; use fit_incremental afterwards.

        :param x: the input for fit. Only dict of ndarray and SparkXShards of dict of ndarray
               are supported. Example: {'id': id_arr, 'y': data_ndarray}, and data_ndarray
               is of shape (n, T), where n is the number of target time series and T is the
               number of time steps.
        :param val_len: int, default is 24.
               Validation length. We will use the last val_len time points as validation data.
        :param start_date: str or datetime-like, default is "2020-4-1".
               Start date time for the time-series. e.g. "2020-01-01"
        :param freq: str or DateOffset, default is "1H".
               Frequency of data.
        :param covariates: 2-D ndarray or None. The shape of ndarray should be (r, T), where r is
               the number of covariates and T is the number of time points.
               Global covariates for all time series. If None, only default time covariates will
               be used while use_time is True. If not, the time covariates used is the stack of
               input covariates and default time covariates.
        :param dti: DatetimeIndex or None.
               If None, use default fixed frequency DatetimeIndex generated with start_date and
               freq.
        :param period: int, default is 24.
               Periodicity of input time series, leave it out if not known.
        :param y_iters: int, default is 10.
               Number of iterations while training the hybrid model.
        :param init_FX_epoch: int, default is 100.
               Number of iterations while initializing factors.
        :param max_FX_epoch: int, default is 300.
               Max number of iterations while training factors.
        :param max_TCN_epoch: int, default is 300.
               Max number of iterations while training the local model.
        :param alt_iters: int, default is 10.
               Number of iterations while alternate training.
        :param num_workers: the number of workers you want to use for fit. If None, it defaults to
               num_ray_nodes in the created RayContext or 1 if there is no active RayContext.
        :raises Exception: if the model has already been fully trained.
        :raises ValueError: if x is neither a dict nor a SparkXShards.
        """
        if self.internal is not None:
            raise Exception("This model has already been fully trained, "
                            "you can only run full training once.")

        # Pick the wrapper matching the input container type.
        if isinstance(x, SparkXShards):
            self.internal = TCMFXshardsModelWrapper(self.config)
        elif isinstance(x, dict):
            self.internal = TCMFNdarrayModelWrapper(self.config)
        else:
            raise ValueError("value of x should be a dict of ndarray or "
                             "an xShards of dict of ndarray")

        try:
            self.internal.fit(x,
                              num_workers=num_workers,
                              val_len=val_len,
                              start_date=start_date,
                              freq=freq,
                              covariates=covariates,
                              dti=dti,
                              period=period,
                              y_iters=y_iters,
                              init_FX_epoch=init_FX_epoch,
                              max_FX_epoch=max_FX_epoch,
                              max_TCN_epoch=max_TCN_epoch,
                              alt_iters=alt_iters,
                              )
        except Exception:
            # Drop the half-initialized wrapper so a failed fit can be retried,
            # then propagate the original error with its traceback intact.
            self.internal = None
            raise

    def fit_incremental(self, x_incr, covariates_incr=None, dti_incr=None):
        """
        Incrementally fit the model. Note that we only incrementally fit X_seq (TCN in global
        model).

        :param x_incr: incremental data to be fitted. It should be of the same format as input x
               in fit, which is a dict of ndarray or SparkXShards of dict of ndarray.
               Example: {'id': id_arr, 'y': incr_ndarray}, and incr_ndarray is of shape
               (n, T_incr), where n is the number of target time series, T_incr is the number
               of time steps incremented. You can choose not to input 'id' in x_incr, but if
               you do, the elements of id in x_incr should be the same as id in x of fit.
        :param covariates_incr: covariates corresponding to x_incr. 2-D ndarray or None.
               The shape of ndarray should be (r, T_incr), where r is the number of covariates.
               Global covariates for all time series. If None, only default time covariates will
               be used while use_time is True. If not, the time covariates used is the stack of
               input covariates and default time covariates.
        :param dti_incr: dti corresponding to the x_incr. DatetimeIndex or None.
               If None, use default fixed frequency DatetimeIndex generated with the last date
               of x in fit and freq.
        :raises Exception: if fit has not been run yet.
        """
        if self.internal is None:
            raise Exception("You should run fit before calling fit_incremental()")
        self.internal.fit_incremental(x_incr,
                                      covariates_incr=covariates_incr,
                                      dti_incr=dti_incr)

    def evaluate(self,
                 target_value,
                 metric=None,
                 target_covariates=None,
                 target_dti=None,
                 num_workers=None,
                 ):
        """
        Evaluate the model.

        :param target_value: target value for evaluation. We interpret its second dimension
               as the horizon length for evaluation.
        :param metric: the metrics. A list of metric names. Default is None, which means
               ['mae'].
        :param target_covariates: covariates corresponding to target_value.
               2-D ndarray or None.
               The shape of ndarray should be (r, horizon), where r is the number of covariates.
               Global covariates for all time series. If None, only default time covariates will
               be used while use_time is True. If not, the time covariates used is the stack of
               input covariates and default time covariates.
        :param target_dti: dti corresponding to target_value.
               DatetimeIndex or None.
               If None, use default fixed frequency DatetimeIndex generated with the last date
               of x in fit and freq.
        :param num_workers: the number of workers to use in evaluate. If None, it defaults to
               num_ray_nodes in the created RayContext or 1 if there is no active RayContext.
        :return: A list of evaluation results. Each item represents a metric.
        :raises Exception: if fit has not been run yet.
        """
        if self.internal is None:
            raise Exception("You should run fit before calling evaluate()")
        # Fresh list per call instead of a shared mutable default.
        if metric is None:
            metric = ['mae']
        return self.internal.evaluate(y=target_value,
                                      metric=metric,
                                      target_covariates=target_covariates,
                                      target_dti=target_dti,
                                      num_workers=num_workers)

    def predict(self,
                horizon=24,
                future_covariates=None,
                future_dti=None,
                num_workers=None,
                ):
        """
        Predict using a trained forecaster.

        :param horizon: int, default is 24. Horizon length to look forward.
        :param future_covariates: covariates corresponding to future horizon steps data to
               predict. 2-D ndarray or None.
               The shape of ndarray should be (r, horizon), where r is the number of covariates.
               Global covariates for all time series. If None, only default time covariates will
               be used while use_time is True. If not, the time covariates used is the stack of
               input covariates and default time covariates.
        :param future_dti: dti corresponding to future horizon steps data to predict.
               DatetimeIndex or None.
               If None, use default fixed frequency DatetimeIndex generated with the last date
               of x in fit and freq.
        :param num_workers: the number of workers to use in predict. If None, it defaults to
               num_ray_nodes in the created RayContext or 1 if there is no active RayContext.
        :return: A numpy ndarray with shape of (nd, horizon), where nd is the same number
                 of time series as input x in fit.
        :raises Exception: if fit has not been run yet.
        """
        if self.internal is None:
            raise Exception("You should run fit before calling predict()")
        return self.internal.predict(horizon,
                                     future_covariates=future_covariates,
                                     future_dti=future_dti,
                                     num_workers=num_workers)

    def save(self, path):
        """
        Save the forecaster.

        :param path: Path to target saved file.
        :raises Exception: if fit has not been run yet.
        """
        if self.internal is None:
            raise Exception("You should run fit before calling save()")
        self.internal.save(path)

    def is_xshards_distributed(self):
        """
        Check whether model is distributed by input xshards.

        :return: True if the model is distributed by input xshards.
        :raises ValueError: if fit has not been run yet.
        """
        if self.internal is None:
            raise ValueError(
                "You should run fit before calling is_xshards_distributed()")
        return self.internal.is_xshards_distributed()

    @classmethod
    def load(cls, path, is_xshards_distributed=False, minPartitions=None):
        """
        Load a saved model.

        :param path: The location the forecaster was saved to.
        :param is_xshards_distributed: Whether the model is distributed trained with
               input of dict of SparkXshards.
        :param minPartitions: The minimum partitions for the XShards. Only used when
               is_xshards_distributed is True.
        :return: the model loaded
        """
        # Use cls so that subclasses load as instances of themselves.
        loaded_model = cls()
        if is_xshards_distributed:
            loaded_model.internal = TCMFXshardsModelWrapper(loaded_model.config)
            loaded_model.internal.load(path, minPartitions=minPartitions)
        else:
            loaded_model.internal = TCMFNdarrayModelWrapper(loaded_model.config)
            loaded_model.internal.load(path)
        return loaded_model