Source code for tensortrade.feed.core.base

# Copyright 2024 The TensorTrade and TensorTrade-NG Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import inspect
import itertools
from abc import abstractmethod
from typing import (
    Generic,
    Iterable,
    TypeVar,
    Dict,
    Any,
    Callable,
    List,
    Tuple
)

from tensortrade.core import Observable
from tensortrade.feed.core.accessors import CachedAccessor
from tensortrade.feed.core.mixins import DataTypeMixin

T = TypeVar("T")


class Named:
    """A class for controlling the naming of objects.

    The purpose of this class is to control the naming of objects with respect
    to the `NameSpace` to which they belong. This prevents conflicts that
    arise in the naming of similar objects under different contexts.

    Parameters
    ----------
    name : str, optional
        The name of the object. When omitted or empty, `generic_name` of the
        concrete class is used instead.

    Attributes
    ----------
    name : str
        The (possibly de-duplicated) name of the object.
    """

    generic_name: str = "generic"
    # Stack of active namespace names; the last entry is the innermost one.
    namespaces: List[str] = []
    # Registry shared by every `Named` object: base name -> number of times
    # that base name has been requested again after its first use.
    names: Dict[str, int] = {}

    def __init__(self, name: str = None):
        if not name:
            name = self.generic_name

        # The registry lives on `Named` itself (not on a subclass) so that the
        # base class does not depend on classes defined later in the module.
        registry = Named.names
        if name in registry:
            # Repeated base name: bump the counter and append a numeric
            # suffix, giving "x", "x:/0", "x:/1", ...
            count = registry[name] + 1
            registry[name] = count
            name = name + ":/" + str(count - 1)
        else:
            registry[name] = 0

        self.name = name

    def rename(self, name: str, sep: str = ":/") -> "Named":
        """Renames the instance with respect to the current `NameSpace`.

        Parameters
        ----------
        name : str
            The new name to give to the instance.
        sep : str
            The separator to put between the name of the `NameSpace` and the
            new name of the instance (e.g. ns:/example).

        Returns
        -------
        `Named`
            The instance that was renamed.
        """
        if Named.namespaces:
            # Prefix with the innermost active namespace only.
            name = Named.namespaces[-1] + sep + name
        self.name = name
        return self
class NameSpace(Named):
    """A context manager that provides a scope in which names are created.

    Useful when several `Named` objects want to use the same name in
    different contexts: while the namespace is active, `Named.rename`
    prefixes new names with the namespace's name, which resolves naming
    conflicts inside a `DataFeed`.

    Parameters
    ----------
    name : str
        The name for the `NameSpace`.
    """

    def __init__(self, name: str) -> None:
        super().__init__(name)

    def __enter__(self) -> None:
        # Push this namespace onto the shared stack; nothing is bound by
        # `with ... as`, since the method returns None.
        Named.namespaces.append(self.name)

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Pop the innermost namespace; exceptions are not suppressed.
        Named.namespaces.pop()
class Stream(Generic[T], Named, Observable):
    """A class responsible for creating the inputs necessary to work in a
    `DataFeed`.

    Parameters
    ----------
    name : str, optional
        The name of the stream.
    dtype : str, optional
        The data type of the stream. A registered mixin is only applied at
        construction time when `dtype` is passed as a keyword argument.

    Methods
    -------
    source(iterable, dtype=None)
        Creates a stream from an iterable.
    group(streams)
        Creates a group of streams.
    sensor(obj,func,dtype=None)
        Creates a stream from observing a value from an object.
    select(streams,func)
        Selects a stream satisfying particular criteria from a list of
        streams.
    constant(value,dtype)
        Creates a stream to generate a constant value.
    placeholder(dtype=None)
        Creates a stream whose value is provided later.
    astype(dtype)
        Converts the data type to `dtype`.
    """

    # dtype name -> mixin class registered through `register_mixin`.
    _mixins: "Dict[str, DataTypeMixin]" = {}
    # Names of the accessors registered through `register_accessor`.
    _accessors: "List[str]" = []
    generic_name: str = "stream"

    def __new__(cls, *args, **kwargs):
        # Only a keyword `dtype` is inspected here: subclasses have different
        # positional signatures, so positional arguments cannot be interpreted.
        dtype = kwargs.get("dtype")
        instance = super().__new__(cls)
        if dtype in Stream._mixins:
            mixin = Stream._mixins[dtype]
            instance = Stream.extend_instance(instance, mixin)
        return instance

    def __init__(self, name: str = None, dtype: str = None):
        Named.__init__(self, name)
        Observable.__init__(self)
        self.dtype = dtype
        self.inputs = []
        self.value = None

    def __call__(self, *inputs) -> "Stream[T]":
        """Connects the inputs to this stream.

        Parameters
        ----------
        *inputs : positional arguments
            The positional arguments, each a stream to be connected as an
            input to this stream.

        Returns
        -------
        `Stream[T]`
            The current stream inputs are being connected to.
        """
        self.inputs = inputs
        return self

    def run(self) -> None:
        """Computes the next value via `forward`, stores it in `value` and
        passes it to `on_next` of every listener."""
        self.value = self.forward()
        for listener in self.listeners:
            listener.on_next(self.value)

    @abstractmethod
    def forward(self) -> T:
        """Generates the next value from the underlying data streams.

        Returns
        -------
        `T`
            The next value in the stream.
        """
        raise NotImplementedError()

    @abstractmethod
    def has_next(self) -> bool:
        """Checks if there is another value.

        Returns
        -------
        bool
            If there is another value or not.
        """
        raise NotImplementedError()

    def astype(self, dtype: str) -> "Stream[T]":
        """Converts the data type to `dtype`.

        Parameters
        ----------
        dtype : str
            The data type to be converted to.

        Returns
        -------
        `Stream[T]`
            The same stream with the new underlying data type `dtype`.

        Raises
        ------
        KeyError
            Raised if no mixin has been registered for `dtype`. Note that
            `self.dtype` has already been updated at that point.
        """
        self.dtype = dtype
        mixin = Stream._mixins[dtype]
        return Stream.extend_instance(self, mixin)

    def reset(self) -> None:
        """Resets all inputs to and listeners of the stream and sets stream
        value to None."""
        for listener in self.listeners:
            # Listeners are not required to be resettable.
            if hasattr(listener, "reset"):
                listener.reset()

        for stream in self.inputs:
            stream.reset()

        self.value = None

    def gather(self) -> "List[Tuple[Stream, Stream]]":
        """Gathers all the edges of the DAG connected in ancestry with this
        stream.

        Returns
        -------
        `List[Tuple[Stream, Stream]]`
            The list of edges connected through ancestry to this stream.
        """
        return self._gather(self, [], [])

    @staticmethod
    def source(iterable: "Iterable[T]", dtype: str = None) -> "Stream[T]":
        """Creates a stream from an iterable.

        Parameters
        ----------
        iterable : `Iterable[T]`
            The iterable to create the stream from.
        dtype : str, optional
            The data type of the stream.

        Returns
        -------
        `Stream[T]`
            The stream with the data type `dtype` created from `iterable`.
        """
        return IterableStream(iterable, dtype=dtype)

    @staticmethod
    def group(streams: "List[Stream[T]]") -> "Stream[dict]":
        """Creates a group of streams.

        Parameters
        ----------
        streams : `List[Stream[T]]`
            Streams to be grouped together.

        Returns
        -------
        `Stream[dict]`
            A stream of dictionaries with each stream as a key/value in the
            dictionary being generated.
        """
        return Group()(*streams)

    @staticmethod
    def sensor(obj: "Any",
               func: "Callable[[Any], T]",
               dtype: str = None) -> "Stream[T]":
        """Creates a stream from observing a value from an object.

        Parameters
        ----------
        obj : `Any`
            An object to observe values from.
        func : `Callable[[Any], T]`
            A function to extract the data to be observed from the object
            being watched.
        dtype : str, optional
            The data type of the stream.

        Returns
        -------
        `Stream[T]`
            The stream of values being observed from the object.
        """
        return Sensor(obj, func, dtype=dtype)

    @staticmethod
    def select(streams: "List[Stream[T]]",
               func: "Callable[[Stream[T]], bool]") -> "Stream[T]":
        """Selects a stream satisfying particular criteria from a list of
        streams.

        Parameters
        ----------
        streams : `List[Stream[T]]`
            A list of streams to select from.
        func : `Callable[[Stream[T]], bool]`
            The criteria to be used for finding the particular stream.

        Returns
        -------
        `Stream[T]`
            The first stream, in list order, for which `func` is truthy.

        Raises
        ------
        AttributeError
            Raised if no stream is found to satisfy the given criteria.
        """
        for s in streams:
            if func(s):
                return s
        raise AttributeError("No stream satisfies selector condition.")

    @staticmethod
    def constant(value: "T", dtype: str = None) -> "Stream[T]":
        """Creates a stream to generate a constant value.

        Parameters
        ----------
        value : `T`
            The constant value to be streamed.
        dtype : str, optional
            The data type of the value.

        Returns
        -------
        `Stream[T]`
            A stream of the constant value.
        """
        return Constant(value, dtype=dtype)

    @staticmethod
    def placeholder(dtype: str = None) -> "Stream[T]":
        """Creates a placeholder stream for data to be provided at a later
        date.

        Parameters
        ----------
        dtype : str
            The data type that will be provided.

        Returns
        -------
        `Stream[T]`
            A stream representing a placeholder.
        """
        return Placeholder(dtype=dtype)

    @staticmethod
    def _gather(stream: "Stream",
                vertices: "List[Stream]",
                edges: "List[Tuple[Stream, Stream]]") -> "List[Tuple[Stream, Stream]]":
        """Gathers all the edges relating back to this particular node.

        Both `vertices` and `edges` are extended in place while the graph is
        walked depth-first through `inputs`.

        Parameters
        ----------
        stream : `Stream`
            The stream to inspect the connections of.
        vertices : `List[Stream]`
            The list of streams that have already been inspected.
        edges : `List[Tuple[Stream, Stream]]`
            The connections that have been found to be in the graph at the
            moment not including `stream`.

        Returns
        -------
        `List[Tuple[Stream, Stream]]`
            The updated list of edges after inspecting `stream`.
        """
        if stream not in vertices:
            vertices += [stream]

            # Record every (input -> stream) edge before descending, so the
            # edges of one node stay contiguous in the result.
            for s in stream.inputs:
                edges += [(s, stream)]

            for s in stream.inputs:
                Stream._gather(s, vertices, edges)

        return edges

    @staticmethod
    def toposort(edges: "List[Tuple[Stream, Stream]]") -> "List[Stream]":
        """Sorts the order in which streams should be run.

        Parameters
        ----------
        edges : `List[Tuple[Stream, Stream]]`
            The connections that have been found in the DAG.

        Returns
        -------
        `List[Stream]`
            The list of streams sorted with respect to the order in which they
            should be run.
        """
        src = set([s for s, t in edges])
        tgt = set([t for s, t in edges])

        # Nodes that are never a target have no unprocessed inputs.
        starting = list(src.difference(tgt))
        process = starting.copy()

        while len(starting) > 0:
            start = starting.pop()

            # Drop the outgoing edges of the node just handled; this rebinds
            # the local name only, the caller's list is left untouched.
            edges = list(filter(lambda e: e[0] != start, edges))

            src = set([s for s, t in edges])
            tgt = set([t for s, t in edges])

            # Sources of the remaining edges that are no longer targets have
            # become runnable.
            starting += [v for v in src.difference(tgt) if v not in starting]

            if start not in process:
                process += [start]

        return process

    @classmethod
    def register_accessor(cls, name: str):
        """A class decorator that registers an accessor providing useful
        methods for a particular data type.

        Sets the data type accessor to be an attribute of this class.

        Parameters
        ----------
        name : str
            The name of the data type.
        """
        def wrapper(accessor):
            setattr(cls, name, CachedAccessor(name, accessor))
            cls._accessors += [name]
            return accessor
        return wrapper

    @classmethod
    def register_mixin(cls, dtype: str):
        """A class decorator that registers a data type mixin providing useful
        methods directly to the instance of the class.

        Parameters
        ----------
        dtype : str
            The name of the data type the mixin is being registered for.
        """
        def wrapper(mixin):
            cls._mixins[dtype] = mixin
            return mixin
        return wrapper

    @classmethod
    def register_generic_method(cls, names: "List[str]"):
        """A function decorator that registers the decorated function with the
        names provided as a method to the `Stream` class.

        These methods can be used for any instance of `Stream`. The decorated
        name itself is rebound to the generated method.

        Parameters
        ----------
        names : `List[str]`
            The list of names to be used as aliases for the same method.
        """
        def wrapper(func):
            # Keep the wrapped function's name/docstring on the generated
            # method so help() and documentation tools stay meaningful.
            @functools.wraps(func)
            def method(self, *args, **kwargs):
                return func(self, *args, **kwargs)

            # Always attached to `Stream` itself, regardless of `cls`.
            for name in names:
                setattr(Stream, name, method)
            return method
        return wrapper

    @staticmethod
    def extend_instance(instance: "Stream[T]",
                        mixin: "DataTypeMixin") -> "Stream[T]":
        """Apply mix-ins to a class instance after creation.

        Parameters
        ----------
        instance : `Stream[T]`
            An instantiation of `Stream` to be injected with mixin methods.
        mixin : `DataTypeMixin`
            The mixin holding the methods to be injected into the `instance`.

        Returns
        -------
        `Stream[T]`
            The `instance` with the injected methods provided by the `mixin`.
        """
        base_cls = instance.__class__
        base_cls_name = instance.__class__.__name__
        # Swap the instance's class for a dynamically created subclass that
        # keeps the original class name and adds the mixin to its bases.
        instance.__class__ = type(base_cls_name, (base_cls, mixin), {})
        return instance
class IterableStream(Stream[T]):
    """A private class used by the `Stream` class for creating data sources.

    The stream always holds the next value to emit in `current`, which is how
    `has_next` can report exhaustion before `forward` is called.

    Parameters
    ----------
    source : `Iterable[T]`
        The iterable to be used for providing the data. A generator function
        is also accepted; it is called again on every reset.
    dtype : str, optional
        The data type of the source.
    """

    generic_name = "stream"

    def __init__(self, source: "Iterable[T]", dtype: str = None):
        super().__init__(dtype=dtype)

        self.is_gen = False
        self.iterable = None

        if inspect.isgeneratorfunction(source):
            self.gen_fn = source
            self.is_gen = True
            self.generator = self.gen_fn()
        else:
            self.iterable = source
            self.generator = iter(source)

        self.stop = False
        # Defined up front so that `forward` on an empty source returns None
        # instead of failing on a missing attribute.
        self.current = None

        try:
            self.current = next(self.generator)
        except StopIteration:
            self.stop = True

        self._random_start = 0

    def forward(self) -> T:
        """Returns the buffered value and pre-fetches the following one.

        Returns
        -------
        `T`
            The value that was buffered in `current` before this call.
        """
        v = self.current
        try:
            self.current = next(self.generator)
        except StopIteration:
            self.stop = True
        return v

    def has_next(self) -> bool:
        """Checks whether the source has not been exhausted yet."""
        return not self.stop

    def reset(self, random_start=0):
        """Rewinds the source and then resets listeners and inputs.

        Parameters
        ----------
        random_start : int
            Offset into the iterable to restart from. A non-zero value is
            remembered and reused by later calls that pass 0. Ignored for
            generator-function sources, which always restart from scratch.
        """
        if random_start != 0:
            self._random_start = random_start

        if self.is_gen:
            self.generator = self.gen_fn()
        else:
            try:
                remaining = self.iterable[self._random_start:]
            except TypeError:
                # Source cannot be sliced (e.g. deque, set): skip the leading
                # items lazily instead of failing.
                remaining = itertools.islice(
                    self.iterable, self._random_start, None
                )
            self.generator = iter(remaining)

        self.stop = False

        try:
            self.current = next(self.generator)
        except StopIteration:
            self.stop = True

        super().reset()
class Group(Stream[T]):
    """A stream that groups together other streams into a dictionary."""

    def __init__(self):
        super().__init__()
        # Name -> stream lookup, filled in when inputs are connected. Created
        # here so that indexing an unconnected group raises KeyError rather
        # than AttributeError.
        self.streams = {}

    def __call__(self, *inputs):
        """Connects `inputs` and indexes them by name.

        Returns
        -------
        `Group`
            This group, to allow chaining.
        """
        self.inputs = inputs
        self.streams = {s.name: s for s in inputs}
        return self

    def forward(self) -> "Dict[str, T]":
        """Collects the current `value` of every input, keyed by its name."""
        return {s.name: s.value for s in self.inputs}

    def __getitem__(self, name) -> "Stream[T]":
        """Returns the connected input stream called `name`.

        Raises
        ------
        KeyError
            Raised if no connected input has that name.
        """
        return self.streams[name]

    def has_next(self) -> bool:
        """A group never reports exhaustion itself; always returns True."""
        return True
class Sensor(Stream[T]):
    """A stream that watches and generates from a particular object.

    Parameters
    ----------
    obj : Any
        The object being observed.
    func : Callable
        Called with `obj` on every step; its result is the stream's value.
    dtype : str, optional
        The data type of the stream.
    """

    generic_name = "sensor"

    def __init__(self, obj, func, dtype=None):
        super().__init__(dtype=dtype)
        self.func = func
        self.obj = obj

    def forward(self) -> T:
        """Evaluates the extraction function on the watched object."""
        observe = self.func
        return observe(self.obj)

    def has_next(self):
        """A sensor can always be sampled again; always returns True."""
        return True
class Constant(Stream[T]):
    """A stream that generates a constant value.

    Parameters
    ----------
    value : Any
        The value returned by every call to `forward`.
    dtype : str, optional
        The data type of the stream.
    """

    generic_name = "constant"

    def __init__(self, value, dtype: str = None):
        super().__init__(dtype=dtype)
        self.constant = value

    def forward(self):
        """Returns the stored constant."""
        return self.constant

    def has_next(self):
        """A constant never runs out; always returns True."""
        return True
class Placeholder(Stream[T]):
    """A stream that acts as a placeholder for data to be provided at a later
    date.

    Parameters
    ----------
    dtype : str, optional
        The data type of the values that will be pushed.
    """

    generic_name = "placeholder"

    def __init__(self, dtype: str = None) -> None:
        super().__init__(dtype=dtype)

    def push(self, value: 'T') -> None:
        """Stores `value` so that the next `forward` returns it."""
        self.value = value

    def forward(self) -> 'T':
        """Returns the most recently pushed value (None if nothing was
        pushed since construction or the last reset)."""
        return self.value

    def has_next(self) -> bool:
        """A placeholder never reports exhaustion; always returns True."""
        return True

    def reset(self) -> None:
        """Clears the stored value.

        NOTE(review): unlike `Stream.reset`, listeners and inputs are not
        reset here - confirm this is intended.
        """
        self.value = None