class Apply(Stream[K]):
    """Operator stream that applies a function to the values of a stream.

    Parameters
    ----------
    func : `Callable[[T], K]`
        The function to apply to each value of the stream.
    dtype : str, optional
        The data type of the values once the function has been applied.
    """

    def __init__(self, func: Callable[[T], K], dtype: str = None) -> None:
        super().__init__(dtype=dtype)
        # Stored on the instance; it is not invoked during construction.
        self.func = func
class Lag(Stream[T]):
    """Operator stream that yields the lagged value of a stream.

    Parameters
    ----------
    lag : int
        How many steps to lag behind by. Defaults to 1.
    dtype : str, optional
        The data type of the stream.
    """

    generic_name = "lag"

    def __init__(self, lag: int = 1, dtype: str = None) -> None:
        super().__init__(dtype=dtype)
        self.lag = lag
        # Bookkeeping starts empty: a zero run counter and no recorded values.
        # NOTE(review): the code that updates these is outside this view.
        self.runs = 0
        self.history = []
class Accumulator(Stream[T]):
    """An operator stream that accumulates values of a given stream.

    Parameters
    ----------
    func : `Callable[[T, T], T]`
        An accumulator function.
    dtype : str, optional
        The data type of the accumulated value.
    """

    def __init__(self, func: "Callable[[T, T], T]", dtype: str = None) -> None:
        # Pass `dtype` by keyword, as the sibling operator streams do, so it
        # cannot be bound to a different leading positional parameter of the
        # base constructor.
        super().__init__(dtype=dtype)
        self.func = func
        # No previously accumulated value exists at construction time.
        self.past = None
class Copy(Stream[T]):
    """Stream operator that copies the values of a given stream."""

    generic_name = "copy"

    def __init__(self) -> None:
        # Takes no arguments; the base stream is initialised with its defaults.
        super().__init__()
class Freeze(Stream[T]):
    """Stream operator that freezes the value of a stream and generates it.

    The frozen value is held in ``freeze_value``, which is ``None`` on a
    newly constructed instance.
    """

    generic_name = "freeze"

    def __init__(self) -> None:
        super().__init__()
        # Nothing has been frozen yet.
        self.freeze_value = None
class BinOp(Stream[T]):
    """Stream operator that merges the values of two streams into one value.

    The combined value has the same type as the values of the two streams.

    Parameters
    ----------
    op : `Callable[[T, T], T]`
        The binary operation to apply.
    dtype : str, optional
        The data type of the stream.
    """

    generic_name = "bin_op"

    def __init__(self, op: Callable[[T, T], T], dtype: str = None) -> None:
        super().__init__(dtype=dtype)
        # Stored on the instance; it is not invoked during construction.
        self.op = op