class DataFeed(Stream[dict]):
    """A stream that compiles other streams together so that they can be
    run in an organized manner.

    Parameters
    ----------
    streams : `List[Stream]`
        A list of streams to be used in the data feed.
    """

    def __init__(self, streams: "List[Stream]") -> None:
        super().__init__()

        # The processing order is unknown until `compile` has been called.
        self.compiled = False
        self.process = None

        if not streams:
            return

        # Hand the given streams to this stream's own `__call__`.
        self.__call__(*streams)

    def compile(self) -> None:
        """Compiles all the given streams together.

        Gathers the edges between the streams and orders them with
        `toposort`, storing the result in `process`. The feed is then
        marked as compiled and reset.
        """
        self.process = self.toposort(self.gather())
        self.compiled = True
        self.reset()

    def run(self) -> None:
        """Runs every stream in processing order, then this feed itself.

        The feed is compiled first if that has not happened yet.
        """
        if not self.compiled:
            self.compile()

        for stream in self.process:
            stream.run()

        super().run()
class PushFeed(DataFeed):
    """A data feed for working with live data in an online manner.

    All sources of data to be used with this feed must be a `Placeholder`.
    This ensures that the user can wait until all of their data has been
    loaded for the next time step.

    Parameters
    ----------
    streams : `List[Stream]`
        A list of streams to be used in the data feed.
    """

    def __init__(self, streams: "List[Stream]") -> None:
        super().__init__(streams)

        self.compile()
        edges = self.gather()

        # Nodes that appear as the first element of some edge but never as
        # the second element of any edge.
        sources = {s for s, _ in edges}
        targets = {t for _, t in edges}

        # Only placeholders among those nodes are fed by `push`.
        self.start = [s for s in sources - targets if isinstance(s, Placeholder)]

    @property
    def is_loaded(self) -> bool:
        """bool: Whether every starting placeholder currently holds a
        value other than `None`."""
        return all(s.value is not None for s in self.start)

    def push(self, data: dict) -> dict:
        """Generates the values from the data feed based on the values being
        provided in `data`.

        Parameters
        ----------
        data : dict
            The data to be pushed to each of the placeholders in the feed,
            keyed by placeholder name.

        Returns
        -------
        dict
            The next data point generated from the feed based on `data`.

        Raises
        ------
        KeyError
            If `data` has no entry for one of the starting placeholders.
        """
        try:
            for s in self.start:
                s.push(data[s.name])
            return self.next()
        finally:
            # Always clear the placeholders, even when a key is missing or
            # the run fails, so the feed never looks loaded with values
            # left over from an aborted push.
            for s in self.start:
                s.value = None

    def next(self) -> dict:
        """Runs the feed on the currently loaded data and returns its value.

        Returns
        -------
        dict
            The value of the feed after running.

        Raises
        ------
        Exception
            If not every starting placeholder has been given a value.
        """
        if not self.is_loaded:
            raise Exception("No data has been pushed to the feed.")
        self.run()
        return self.value