# Source code for tensortrade.feed.api.float.window.rolling
"""rolling.py contains functions and classes for rolling stream operations."""fromtypingimportList,Callableimportnumpyasnpfromtensortrade.feed.core.baseimportStreamfromtensortrade.feed.api.floatimportFloat
class RollingNode(Stream[float]):
    """A stream operator for aggregating a rolling window of a stream.

    Parameters
    ----------
    func : `Callable[[List[float]], float]`
        A function that aggregates a rolling window.
    """

    # NOTE(review): only the constructor is visible in this extract; the
    # logic that applies `func` to a window presumably lives in methods of
    # the full source -- confirm before relying on this class as shown.

    def __init__(self, func: "Callable[[List[float]], float]") -> None:
        super().__init__(dtype="float")
        self.func = func
        self.n = 0
class RollingCount(RollingNode):
    """A stream operator that counts the number of non-missing values in the rolling window."""

    def __init__(self) -> None:
        # `~np.isnan(w)` is a boolean mask of the non-NaN entries of the
        # window; summing the mask yields how many values are present.
        super().__init__(lambda w: (~np.isnan(w)).sum())
class Rolling(Stream[List[float]]):
    """A stream that generates a rolling window of values from a stream.

    Parameters
    ----------
    window : int
        The size of the rolling window.
    min_periods : int, default 1
        The number of periods to wait before producing values from the
        aggregation function.
    """

    generic_name = "rolling"

    # NOTE(review): no forward()/has_next()/reset() is visible in this
    # extract even though the constructor initialises `n`, `nan` and
    # `history`; presumably they exist in the full source -- confirm.

    def __init__(self, window: int, min_periods: int = 1) -> None:
        super().__init__()
        # Kept as an assert so the failure type (AssertionError) is unchanged
        # for existing callers; only a message was added.
        assert min_periods <= window, "min_periods must not exceed window"
        self.window = window
        self.min_periods = min_periods
        self.n = 0
        self.nan = 0
        self.history = []

    def agg(self, func: "Callable[[List[float]], float]") -> "Stream[float]":
        """Computes an aggregation of a rolling window of values.

        Parameters
        ----------
        func : `Callable[[List[float]], float]`
            An aggregation function.

        Returns
        -------
        `Stream[float]`
            A stream producing aggregations of a rolling window of values.
        """
        return RollingNode(func)(self).astype("float")

    def count(self) -> "Stream[float]":
        """Computes a rolling count from the underlying stream.

        Returns
        -------
        `Stream[float]`
            A rolling count stream.
        """
        return RollingCount()(self).astype("float")

    def sum(self) -> "Stream[float]":
        """Computes a rolling sum from the underlying stream.

        Uses the NaN-ignoring `np.nansum` when `min_periods < window`,
        otherwise plain `np.sum`.

        Returns
        -------
        `Stream[float]`
            A rolling sum stream.
        """
        func = np.nansum if self.min_periods < self.window else np.sum
        return self.agg(func).astype("float")

    def mean(self) -> "Stream[float]":
        """Computes a rolling mean from the underlying stream.

        Uses the NaN-ignoring `np.nanmean` when `min_periods < window`,
        otherwise plain `np.mean`.

        Returns
        -------
        `Stream[float]`
            A rolling mean stream.
        """
        func = np.nanmean if self.min_periods < self.window else np.mean
        return self.agg(func).astype("float")

    def var(self) -> "Stream[float]":
        """Computes a rolling variance from the underlying stream.

        The variance is computed with `ddof=1`. The NaN-ignoring
        `np.nanvar` is used when `min_periods < window`, otherwise plain
        `np.var`.

        Returns
        -------
        `Stream[float]`
            A rolling variance stream.
        """

        def nan_sample_var(x):
            return np.nanvar(x, ddof=1)

        def sample_var(x):
            return np.var(x, ddof=1)

        func = nan_sample_var if self.min_periods < self.window else sample_var
        return self.agg(func).astype("float")

    def median(self) -> "Stream[float]":
        """Computes a rolling median from the underlying stream.

        Uses the NaN-ignoring `np.nanmedian` when `min_periods < window`,
        otherwise plain `np.median`.

        Returns
        -------
        `Stream[float]`
            A rolling median stream.
        """
        func = np.nanmedian if self.min_periods < self.window else np.median
        return self.agg(func).astype("float")

    def std(self) -> "Stream[float]":
        """Computes a rolling standard deviation from the underlying stream.

        Defined as the square root of the rolling variance stream returned
        by `var`.

        Returns
        -------
        `Stream[float]`
            A rolling standard deviation stream.
        """
        return self.var().sqrt()

    def min(self) -> "Stream[float]":
        """Computes a rolling minimum from the underlying stream.

        Uses the NaN-ignoring `np.nanmin` when `min_periods < window`,
        otherwise plain `np.min`.

        Returns
        -------
        `Stream[float]`
            A rolling minimum stream.
        """
        func = np.nanmin if self.min_periods < self.window else np.min
        return self.agg(func).astype("float")

    def max(self) -> "Stream[float]":
        """Computes a rolling maximum from the underlying stream.

        Uses the NaN-ignoring `np.nanmax` when `min_periods < window`,
        otherwise plain `np.max`.

        Returns
        -------
        `Stream[float]`
            A rolling maximum stream.
        """
        func = np.nanmax if self.min_periods < self.window else np.max
        return self.agg(func).astype("float")
@Float.register(["rolling"])
def rolling(s: "Stream[float]",
            window: int,
            min_periods: int = 1) -> "Stream[List[float]]":
    """Creates a stream that generates a rolling window of values from a stream.

    Parameters
    ----------
    s : `Stream[float]`
        A float stream.
    window : int
        The size of the rolling window.
    min_periods : int, default 1
        The number of periods to wait before producing values from the
        aggregation function.

    Returns
    -------
    `Stream[List[float]]`
        A `Rolling` stream applied to `s`.
    """
    return Rolling(window=window, min_periods=min_periods)(s)