#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import math
import numpy as np
from zoo.chronos.detector.anomaly.abstract import AnomalyDetector
from abc import ABC, abstractmethod
class Distance(ABC):
"""
The Base Distance Class.
"""
@abstractmethod
def abs_dist(self, x, y):
"""
Calculate the distance between x and y. a and b should be in same shape.
:param x: the first tensor
:param y: the second tensor
:return: the absolute distance between x and y
"""
pass
class EuclideanDistance(Distance):
"""
Euclidean Distance Measure
"""
def __init__(self):
pass
def abs_dist(self, x, y):
return np.linalg.norm(x - y)
def estimate_th(y,
yhat,
mode="default",
ratio=0.01,
dist_measure=EuclideanDistance()):
"""
Estimate the threshold based on y and yhat
:param y: actual values
:param yhat: predicted values
:param mode: types of ways to find threshold
"default" : fit data to a uniform distribution (the percentile way)
"gaussian": fit data to a gaussian distribution
:param ratio: the ratio of anomaly to consider as anomaly.
:param dist_measure: measure of distance
:return: the threshold
"""
assert y.shape == yhat.shape
diff = [dist_measure.abs_dist(m, n) for m, n in zip(y, yhat)]
if mode == "default":
threshold = np.percentile(diff, (1 - ratio) * 100)
return threshold
elif mode == "gaussian":
from scipy.stats import norm
mu, sigma = norm.fit(diff)
t = norm.ppf(1 - ratio)
return t * sigma + mu
else:
raise ValueError("Does not support", mode)
def detect_all(y, yhat, th, dist_measure):
anomaly_scores = np.zeros_like(y)
anomaly_indexes = []
for i, (y_i, yhat_i) in enumerate(zip(y, yhat)):
if dist_measure.abs_dist(y_i, yhat_i) > th:
anomaly_indexes.append(i)
anomaly_scores[i] = 1
return anomaly_indexes, anomaly_scores
def detect_range(y, th):
# use threshold (-1, 1) for each dimension
threshold_min = np.full_like(y, fill_value=th[0])
threshold_max = np.full_like(y, fill_value=th[1])
return detect_range_arr(y, (threshold_min, threshold_max))
def detect_range_arr(y, th_arr):
min_diff = y - th_arr[0]
max_diff = y - th_arr[1]
anomaly_indexes = np.logical_or(min_diff < 0, max_diff > 0)
anomaly_scores = np.zeros_like(y)
anomaly_scores[anomaly_indexes] = 1
# anomaly_index.update(np.where(max_diff > 0)[0])
return list(set(np.where(anomaly_scores > 0)[0])), anomaly_scores
def detect_anomaly(y,
yhat=None,
th=math.inf,
dist_measure=EuclideanDistance()):
"""
Detect anomalies. Each sample can have 1 or more dimensions.
:param y: the values to detect. shape could be 1-D (num_samples,)
or 2-D array (num_samples, features)
:param yhat: the estimated values, a tensor with same shape as y,
could be None when threshold is a tuple
:param th: threshold, could be
1. a single value - absolute distance threshold, same for all samples
2. a tuple (min, max) - min and max are either int/float or tensors in same shape as y,
yhat is ignored in this case
:param dist_measure: measure of distance
:return: the anomaly values indexes in the samples, i.e. num_samples dimension.
"""
if isinstance(th, int) or isinstance(th, float):
if yhat is None:
raise ValueError("Please specify a threshold range (min,max) ",
"if forecast values are not available")
return detect_all(y, yhat, th, dist_measure)
elif isinstance(th, tuple) and len(th) == 2:
# min max values are scalars
if (isinstance(th[0], int) or isinstance(th[0], float)) \
and (isinstance(th[1], int) or isinstance(th[1], float)):
if th[0] > th[1]:
raise ValueError(
"In threshold (min,max), max should be larger than min")
return detect_range(y, th)
# min max values are arrays
elif th[0].shape == y.shape and th[-1].shape == y.shape:
if np.any((th[1] - th[0]) < 0):
raise ValueError("In threshold (min,max) ",
"each data point in max tensor should be larger than min")
return detect_range_arr(y, th)
else:
raise ValueError("Threshold format", str(th), "is not supported")
else:
raise ValueError(
"Threshold format", str(th),
"is not supported")
[docs]class ThresholdDetector(AnomalyDetector):
"""
Example:
>>> #The dataset is split into x_train, x_test, y_train, y_test
>>> forecaster = Forecaster(...)
>>> forecaster.fit(x=x_train, y=y_train, ...)
>>> y_pred = forecaster.predict(x_test)
>>> td = ThresholdDetector()
>>> td.set_params(threshold=10)
>>> td.fit(y_test, y_pred)
>>> anomaly_scores = td.score()
>>> anomaly_indexes = td.anomaly_indexes()
"""
def __init__(self):
"""
Initialize a ThresholdDetector.
"""
self.th = math.inf
self.ratio = 0.01
self.dist_measure = EuclideanDistance()
self.mode = "default"
self.anomaly_indexes_ = None
self.anomaly_scores_ = None
[docs] def set_params(self,
mode="default",
ratio=0.01,
threshold=math.inf,
dist_measure=EuclideanDistance()):
"""
Set parameters for ThresholdDetector
:param mode: mode can be "default" or "gaussian".
"default" : fit data according to a uniform distribution
"gaussian": fit data according to a gaussian distribution
:param ratio: the ratio of anomaly to consider as anomaly.
:param threshold: threshold, could be
1. a single value - absolute distance threshold, same for all samples
2. a tuple (min, max) - min and max are either int/float or tensors in same shape as y,
yhat is ignored in this case
:param dist_measure: measure of distance
"""
self.ratio = ratio
self.dist_measure = dist_measure
self.mode = mode
self.th = threshold
[docs] def fit(self, y, y_pred=None):
"""
Fit the model
:param y: the values to detect. shape could be 1-D (num_samples,)
or 2-D array (num_samples, features)
:param y_pred: the estimated values, a tensor with same shape as y
could be None when threshold is a tuple
"""
if y_pred is not None and self.th == math.inf:
self.th = estimate_th(y,
y_pred,
mode=self.mode,
ratio=self.ratio,
dist_measure=self.dist_measure)
# calculate anomalies in advance in case score does not specify input
anomalies = detect_anomaly(y, y_pred, self.th, self.dist_measure)
self.anomaly_indexes_ = anomalies[0]
self.anomaly_scores_ = anomalies[1]
[docs] def score(self, y=None, y_pred=None):
"""
Gets the anomaly scores for each sample. Each anomaly score is either 0 or 1,
where 1 indicates an anomaly.
:param y: new time series to detect anomaly. if y is None, returns anomalies
in the fit input, y_pred is ignored in this case
:param y_pred: forecasts corresponding to y
:return: anomaly score for each sample, in an array format with the same size as input
"""
if y is None:
if self.anomaly_scores_ is None:
raise RuntimeError("please call fit before calling score")
return self.anomaly_scores_
else:
return detect_anomaly(y,
y_pred,
self.th,
self.dist_measure)[1]
[docs] def anomaly_indexes(self):
"""
Gets the indexes of the anomalies.
:return: the indexes of the anomalies.
"""
if self.anomaly_indexes_ is None:
raise RuntimeError("Please call fit first")
return self.anomaly_indexes_