"""
ewm.py contains functions and classes for exponential weighted moving stream
operations.
"""
from typing import List, Tuple
import numpy as np
from tensortrade.feed.core.base import Stream
from tensortrade.feed.api.float import Float
[docs]
class ExponentialWeightedMovingAverage(Stream[float]):
r"""A stream operator that computes an exponential weighted moving average
on a given float stream.
Parameters
----------
alpha : float
The smoothing factor :math:`\alpha` directly,
:math:`0 < \alpha \leq 1`.
adjust : bool
Divide by decaying adjustment factor in beginning periods to account
for imbalance in relative weightings (viewing EWMA as a moving average).
ignore_na : bool
Ignore missing values when calculating weights.
min_periods : int
Minimum number of observations in window required to have a value
(otherwise result is NA).
References
----------
.. [1] https://github.com/pandas-dev/pandas/blob/d9fff2792bf16178d4e450fe7384244e50635733/pandas/_libs/window/aggregations.pyx#L1801
"""
def __init__(self,
alpha: float,
adjust: bool,
ignore_na: bool,
min_periods: int) -> None:
super().__init__()
self.alpha = alpha
self.adjust = adjust
self.ignore_na = ignore_na
self.min_periods = max(min_periods, 1)
self.i = 0
self.n = 0
self.avg = None
self.factor = 1 - alpha
self.new_wt = 1 if self.adjust else self.alpha
self.old_wt = 1
[docs]
def forward(self) -> float:
value = self.inputs[0].value
if self.avg is None:
is_observation = (value == value)
self.n += int(is_observation)
self.avg = value
return self.avg if self.n >= self.min_periods else np.nan
is_observation = (value == value)
self.n += is_observation
if self.avg == self.avg:
if is_observation or not self.ignore_na:
self.old_wt *= self.factor
if is_observation:
# avoid numerical errors on constant series
if self.avg != value:
num = self.old_wt * self.avg + self.new_wt * value
den = self.old_wt + self.new_wt
self.avg = num / den
if self.adjust:
self.old_wt += self.new_wt
else:
self.old_wt = 1
elif is_observation:
self.avg = value
return self.avg if self.n >= self.min_periods else np.nan
[docs]
def has_next(self) -> bool:
return True
[docs]
def reset(self) -> None:
self.i = 0
self.n = 0
self.avg = None
self.old_wt = 1
super().reset()
[docs]
class ExponentialWeightedMovingCovariance(Stream[float]):
r"""A stream operator that computes an exponential weighted moving average
on a given float stream.
Parameters
----------
alpha : float
The smoothing factor :math:`\alpha` directly,
:math:`0 < \alpha \leq 1`.
adjust : bool
Divide by decaying adjustment factor in beginning periods to account
for imbalance in relative weightings (viewing EWMA as a moving average).
ignore_na : bool
Ignore missing values when calculating weights.
min_periods : int
Minimum number of observations in window required to have a value
(otherwise result is NA).
bias : bool
Use a standard estimation bias correction
"""
def __init__(self,
alpha: float,
adjust: bool,
ignore_na: bool,
min_periods: int,
bias: bool) -> None:
super().__init__()
self.alpha = alpha
self.adjust = adjust
self.ignore_na = ignore_na
self.min_periods = min_periods
self.bias = bias
self.i = 0
self.n = 0
self.minp = max(self.min_periods, 1)
self.avg = None
self.factor = 1 - alpha
self.new_wt = 1 if self.adjust else self.alpha
self.old_wt = 1
self.mean_x = None
self.mean_y = None
self.cov = 0
self.sum_wt = 1
self.sum_wt2 = 1
self.old_wt = 1
[docs]
def forward(self) -> float:
v1 = self.inputs[0].value
v2 = self.inputs[1].value
if self.mean_x is None and self.mean_y is None:
self.mean_x = v1
self.mean_y = v2
is_observation = (self.mean_x == self.mean_x) and (self.mean_y == self.mean_y)
self.n += int(is_observation)
if not is_observation:
self.mean_x = np.nan
self.mean_y = np.nan
return (0. if self.bias else np.nan) if self.n >= self.minp else np.nan
is_observation = (v1 == v1) and (v2 == v2)
self.n += is_observation
if self.mean_x == self.mean_x:
if is_observation or not self.ignore_na:
self.sum_wt *= self.factor
self.sum_wt2 *= (self.factor * self.factor)
self.old_wt *= self.factor
if is_observation:
old_mean_x = self.mean_x
old_mean_y = self.mean_y
# avoid numerical errors on constant streams
wt_sum = self.old_wt + self.new_wt
if self.mean_x != v1:
self.mean_x = ((self.old_wt * old_mean_x) + (self.new_wt * v1)) / wt_sum
# avoid numerical errors on constant series
if self.mean_y != v2:
self.mean_y = ((self.old_wt * old_mean_y) + (self.new_wt * v2)) / wt_sum
d1 = old_mean_x - self.mean_x
d2 = old_mean_y - self.mean_y
d3 = v1 - self.mean_x
d4 = v2 - self.mean_y
t1 = self.old_wt * (self.cov + d1 * d2)
t2 = self.new_wt * d3 * d4
self.cov = (t1 + t2) / wt_sum
self.sum_wt += self.new_wt
self.sum_wt2 += self.new_wt * self.new_wt
self.old_wt += self.new_wt
if not self.adjust:
self.sum_wt /= self.old_wt
self.sum_wt2 /= self.old_wt * self.old_wt
self.old_wt = 1
elif is_observation:
self.mean_x = v1
self.mean_y = v2
if self.n >= self.minp:
if not self.bias:
numerator = self.sum_wt * self.sum_wt
denominator = numerator - self.sum_wt2
if denominator > 0:
output = ((numerator / denominator) * self.cov)
else:
output = np.nan
else:
output = self.cov
else:
output = np.nan
return output
[docs]
def has_next(self) -> bool:
return True
[docs]
def reset(self) -> None:
self.avg = None
self.new_wt = 1 if self.adjust else self.alpha
self.old_wt = 1
self.mean_x = None
self.mean_y = None
self.cov = 0
self.sum_wt = 1
self.sum_wt2 = 1
self.old_wt = 1
super().reset()
[docs]
class EWM(Stream[List[float]]):
r"""Provide exponential weighted (EW) functions.
Exactly one parameter: `com`, `span`, `halflife`, or `alpha` must be
provided.
Parameters
----------
com : float, optional
Specify decay in terms of center of mass,
:math:`\alpha = 1 / (1 + com)`, for :math:`com \geq 0`.
span : float, optional
Specify decay in terms of span,
:math:`\alpha = 2 / (span + 1)`, for :math:`span \geq 1`.
halflife : float, str, timedelta, optional
Specify decay in terms of half-life,
:math:`\alpha = 1 - \exp\left(-\ln(2) / halflife\right)`, for
:math:`halflife > 0`.
If ``times`` is specified, the time unit (str or timedelta) over which an
observation decays to half its value. Only applicable to ``mean()``
and halflife value will not apply to the other functions.
alpha : float, optional
Specify smoothing factor :math:`\alpha` directly,
:math:`0 < \alpha \leq 1`.
min_periods : int, default 0
Minimum number of observations in window required to have a value
(otherwise result is NA).
adjust : bool, default True
Divide by decaying adjustment factor in beginning periods to account
for imbalance in relative weightings (viewing EWMA as a moving average).
- When ``adjust=True`` (default), the EW function is calculated using weights
:math:`w_i = (1 - \alpha)^i`. For example, the EW moving average of the series
[:math:`x_0, x_1, ..., x_t`] would be:
.. math::
y_t = \frac{x_t + (1 - \alpha)x_{t-1} + (1 - \alpha)^2 x_{t-2} + ... + (1 -
\alpha)^t x_0}{1 + (1 - \alpha) + (1 - \alpha)^2 + ... + (1 - \alpha)^t}
- When ``adjust=False``, the exponentially weighted function is calculated
recursively:
.. math::
\begin{split}
y_0 &= x_0\\
y_t &= (1 - \alpha) y_{t-1} + \alpha x_t,
\end{split}
ignore_na : bool, default False
Ignore missing values when calculating weights.
- When ``ignore_na=False`` (default), weights are based on absolute positions.
- When ``ignore_na=True``, weights are based on relative positions.
See Also
--------
.. [1] https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.ewm.html
References
----------
.. [1] https://github.com/pandas-dev/pandas/blob/d9fff2792bf16178d4e450fe7384244e50635733/pandas/core/window/ewm.py#L65
"""
def __init__(
self,
com: float = None,
span: float = None,
halflife: float = None,
alpha: float = None,
min_periods: int = 0,
adjust: bool = True,
ignore_na: bool = False):
super().__init__()
self.com = com
self.span = span
self.halflife = halflife
self.min_periods = min_periods
self.adjust = adjust
self.ignore_na = ignore_na
if alpha:
assert 0 < alpha <= 1
self.alpha = alpha
elif com:
assert com >= 0
self.alpha = 1 / (1 + com)
elif span:
assert span >= 1
self.alpha = 2 / (1 + span)
elif halflife:
assert halflife > 0
self.alpha = 1 - np.exp(np.log(0.5) / halflife)
self.history = []
self.weights = []
[docs]
def forward(self) -> "Tuple[List[float], List[float]]":
value = self.inputs[0].value
if self.ignore_na:
if not np.isnan(value):
self.history += [value]
# Compute weights
if not self.adjust and len(self.weights) > 0:
self.weights[-1] *= self.alpha
self.weights += [(1 - self.alpha) ** len(self.history)]
else:
self.history += [value]
# Compute weights
if not self.adjust and len(self.weights) > 0:
self.weights[-1] *= self.alpha
self.weights += [(1 - self.alpha)**len(self.history)]
return self.history, self.weights
[docs]
def has_next(self) -> bool:
return True
[docs]
def mean(self) -> "Stream[float]":
"""Computes the exponential weighted moving average.
Returns
-------
`Stream[float]`
The exponential weighted moving average stream based on the
underlying stream of values.
"""
return ExponentialWeightedMovingAverage(
alpha=self.alpha,
min_periods=self.min_periods,
adjust=self.adjust,
ignore_na=self.ignore_na
)(self.inputs[0]).astype("float")
[docs]
def var(self, bias=False) -> "Stream[float]":
"""Computes the exponential weighted moving variance.
Returns
-------
`Stream[float]`
The exponential weighted moving variance stream based on the
underlying stream of values.
"""
return ExponentialWeightedMovingCovariance(
alpha=self.alpha,
adjust=self.adjust,
ignore_na=self.ignore_na,
min_periods=self.min_periods,
bias=bias
)(self.inputs[0], self.inputs[0]).astype("float")
[docs]
def std(self, bias=False) -> "Stream[float]":
"""Computes the exponential weighted moving standard deviation.
Returns
-------
`Stream[float]`
The exponential weighted moving standard deviation stream based on
the underlying stream of values.
"""
return self.var(bias).sqrt()
[docs]
def reset(self) -> None:
self.history = []
self.weights = []
super().reset()
[docs]
@Float.register(["ewm"])
def ewm(s: "Stream[float]",
com: float = None,
span: float = None,
halflife: float = None,
alpha: float = None,
min_periods: int = 0,
adjust: bool = True,
ignore_na: bool = False) -> "Stream[Tuple[List[float], List[float]]]":
r"""Computes the weights and values in order to perform an exponential
weighted moving operation.
Parameters
----------
s : `Stream[float]`
A float stream.
com : float, optional
Specify decay in terms of center of mass,
:math:`\alpha = 1 / (1 + com)`, for :math:`com \geq 0`.
span : float, optional
Specify decay in terms of span,
:math:`\alpha = 2 / (span + 1)`, for :math:`span \geq 1`.
halflife : float, optional
Specify decay in terms of half-life,
:math:`\alpha = 1 - \exp\left(-\ln(2) / halflife\right)`, for
:math:`halflife > 0`.
alpha : float, optional
Specify smoothing factor :math:`\alpha` directly,
:math:`0 < \alpha \leq 1`.
min_periods : int, default 0
Minimum number of observations in window required to have a value
(otherwise result is NA).
adjust : bool, default True
Divide by decaying adjustment factor in beginning periods to account
for imbalance in relative weightings (viewing EWMA as a moving average).
ignore_na : bool, default False
Ignore missing values when calculating weights.
Returns
-------
`Stream[Tuple[List[float], List[float]]]`
A stream of weights and values to be used for computation of exponential
weighted moving operations.
"""
return EWM(
com=com,
span=span,
halflife=halflife,
alpha=alpha,
min_periods=min_periods,
adjust=adjust,
ignore_na=ignore_na
)(s)