Source code for tensortrade.feed.api.generic.reduce
"""reduce.py contains functions and classes for reducing multiple streamsinto a single stream."""fromtypingimportList,Callableimportnumpyasnpfromtensortrade.feed.core.baseimportStream,T
[docs]classAggregate(Stream[T]):"""A multi-stream operator for aggregating multiple streams into a single stream. Parameters ---------- func : `Callable[[List[Stream]], T]` A function for aggregating the value of multiple streams. """generic_name="reduce"def__init__(self,func:Callable[[List[T]],T]):super().__init__()self.func=func
[docs]classReduce(Stream[list]):"""A stream for reducing multiple streams of the same type. Parameters ---------- dtype : str, optional The data type of the aggregated stream. """def__init__(self,dtype:str=None):super().__init__(dtype=dtype)
[docs]defagg(self,func:"Callable[[List[T]], T]")->"Stream[T]":"""Computes the aggregation of the input streams. Returns ------- `Stream[T]` An aggregated stream of the input streams. """returnAggregate(func)(*self.inputs).astype(self.inputs[0].dtype)
[docs]defsum(self)->"Stream[T]":"""Computes the reduced sum of the input streams. Returns ------- `Stream[T]` A reduced sum stream. """returnself.agg(np.sum)
[docs]defmin(self)->"Stream[T]":"""Computes the reduced minimum of the input streams. Returns ------- `Stream[T]` A reduced minimum stream. """returnself.agg(np.min)
[docs]defmax(self)->"Stream[T]":"""Computes the reduced maximum of the input streams. Returns ------- `Stream[T]` A reduced maximum stream. """returnself.agg(np.max)
[docs]defprod(self)->"Stream[T]":"""Computes the reduced product of the input streams. Returns ------- `Stream[T]` A reduced product stream. """returnself.agg(np.prod)
@Stream.register_generic_method(["reduce"])defreduce(streams:"List[Stream[T]]")->"Stream[List[T]]":"""Creates a reduce stream from given input streams. Parameters ---------- streams : `List[Stream[T]]` A list of input streams to be aggregated. Returns ------- `Stream[List[T]] A reduce stream that generates a list of values all from the input streams. """returnReduce()(*streams)