Source code for tensortrade.feed.api.float.window.expanding
"""expanding.py contains functions and classes for expanding stream operations."""fromtypingimportCallable,Listimportnumpyasnpfromtensortrade.feed.core.baseimportStreamfromtensortrade.feed.api.floatimportFloat
class ExpandingNode(Stream[float]):
    """Stream operator that aggregates the entire history of a stream.

    Parameters
    ----------
    func : `Callable[[List[float]], float]`
        The function used to aggregate the history of a stream. It is kept
        on the instance as ``self.func``.
    """

    def __init__(
        self,
        func: "Callable[[List[float]], float]",
    ) -> None:
        super().__init__()
        # Keep the aggregation callable available on the node.
        self.func = func
class ExpandingCount(ExpandingNode):
    """Stream operator that counts the number of non-missing values."""

    def __init__(self) -> None:
        def count_valid(values):
            # An entry is "valid" when it is not NaN; summing the boolean
            # mask yields how many such entries there are.
            is_valid = np.logical_not(np.isnan(values))
            return is_valid.sum()

        super().__init__(count_valid)
class Expanding(Stream[List[float]]):
    """A stream that generates the entire history of a stream at each step.

    Parameters
    ----------
    min_periods : int, default 1
        The number of periods to wait before producing values from the
        aggregation function. Stored as ``self.min_periods``.
    """

    generic_name = "expanding"

    def __init__(self, min_periods: int = 1) -> None:
        super().__init__()
        self.min_periods = min_periods
        # The history buffer starts out empty.
        self.history = []

    def agg(self, func: Callable[[List[float]], float]) -> "Stream[float]":
        """Compute an aggregation of the stream's history.

        Parameters
        ----------
        func : `Callable[[List[float]], float]`
            An aggregation function.

        Returns
        -------
        `Stream[float]`
            A stream producing aggregations of the stream history at each
            time step.
        """
        node = ExpandingNode(func)
        aggregated = node(self)
        return aggregated.astype("float")

    def count(self) -> "Stream[float]":
        """Compute an expanding count of the underlying stream.

        Returns
        -------
        `Stream[float]`
            An expanding count stream.
        """
        counter = ExpandingCount()
        counted = counter(self)
        return counted.astype("float")

    def sum(self) -> "Stream[float]":
        """Compute an expanding sum of the underlying stream.

        Returns
        -------
        `Stream[float]`
            An expanding sum stream.
        """
        total = self.agg(np.sum)
        return total.astype("float")

    def mean(self) -> "Stream[float]":
        """Compute an expanding mean of the underlying stream.

        Returns
        -------
        `Stream[float]`
            An expanding mean stream.
        """
        average = self.agg(np.mean)
        return average.astype("float")

    def var(self) -> "Stream[float]":
        """Compute an expanding variance of the underlying stream.

        The variance is computed with ``ddof=1``.

        Returns
        -------
        `Stream[float]`
            An expanding variance stream.
        """

        def sample_var(history):
            return np.var(history, ddof=1)

        variance = self.agg(sample_var)
        return variance.astype("float")

    def median(self) -> "Stream[float]":
        """Compute an expanding median of the underlying stream.

        Returns
        -------
        `Stream[float]`
            An expanding median stream.
        """
        middle = self.agg(np.median)
        return middle.astype("float")

    def std(self) -> "Stream[float]":
        """Compute an expanding standard deviation of the underlying stream.

        The standard deviation is computed with ``ddof=1``.

        Returns
        -------
        `Stream[float]`
            An expanding standard deviation stream.
        """

        def sample_std(history):
            return np.std(history, ddof=1)

        deviation = self.agg(sample_std)
        return deviation.astype("float")

    def min(self) -> "Stream[float]":
        """Compute an expanding minimum of the underlying stream.

        Returns
        -------
        `Stream[float]`
            An expanding minimum stream.
        """
        lowest = self.agg(np.min)
        return lowest.astype("float")

    def max(self) -> "Stream[float]":
        """Compute an expanding maximum of the underlying stream.

        Returns
        -------
        `Stream[float]`
            An expanding maximum stream.
        """
        highest = self.agg(np.max)
        return highest.astype("float")
@Float.register(["expanding"])
def expanding(s: "Stream[float]", min_periods: int = 1) -> "Stream[List[float]]":
    """Compute a stream that generates the entire history of a stream.

    The history is produced at each time step.

    Parameters
    ----------
    s : `Stream[float]`
        A float stream.
    min_periods : int, default 1
        The number of periods to wait before producing values from the
        aggregation function.

    Returns
    -------
    `Stream[List[float]]`
        The result of applying a new `Expanding` operator to `s`.
    """
    window = Expanding(min_periods=min_periods)
    return window(s)