# Source code for pybeehive.core

import pickle
from abc import ABC, abstractmethod
from threading import Event as _Event
from time import time


class Event:
    """A payload tagged with a topic, a creation timestamp and an id.

    :param data: the payload to carry, or an existing :class:`Event` to
        re-wrap. When an ``Event`` is given its payload is unwrapped and its
        topic, timestamp and id are inherited unless overridden.
    :param topic: optional topic; listeners match their filters against it.
    :param created_at: optional creation timestamp. Defaults to
        ``time.time()`` for new events, or to the wrapped event's timestamp
        when re-wrapping.
    """

    def __init__(self, data, topic=None, created_at=None):
        if isinstance(data, Event):
            # Re-wrapping an existing event: never nest events inside events.
            self.data = data.data
            self.topic = topic or data.topic
            # Compare against None so that an explicit timestamp of 0 is
            # honoured instead of being treated as "not provided".
            if created_at is not None:
                self.created_at = created_at
                self.id = self.create_id(self.data, created_at)
            else:
                self.created_at = data.created_at
                self.id = data.id
        else:
            self.data = data
            self.topic = topic
            self.created_at = created_at if created_at is not None else time()
            self.id = self.create_id(self.data, self.created_at)

    def __eq__(self, other):
        return isinstance(other, Event) \
            and self.id == other.id \
            and self.topic == other.topic

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Equal events share an id, so hashing on the id alone is consistent
        # with __eq__ (which additionally compares the topic).
        return self.id

    def __str__(self):
        # The payload is truncated to 100 characters to keep output short.
        return 'Event(created_at={}, data={})'.format(
            int(self.created_at), str(self.data)[:100]
        )

    @staticmethod
    def create_id(data, time_created):
        """Derive an event id from a payload and a timestamp.

        :param data: the event payload; only its ``str()`` form is used.
        :param time_created: the creation timestamp; only its ``str()`` form
            is used.
        :return: the built-in ``hash`` of the two string forms concatenated.
        """
        # NOTE: ``hash`` of a str is salted per interpreter process unless
        # PYTHONHASHSEED is fixed, so recomputed ids differ between processes.
        return hash(str(data) + str(time_created))

    @staticmethod
    def fromstring(string):
        """Rebuild an object from the bytes produced by :meth:`tostring`.

        :param string: a pickle byte string.
        :return: whatever object the pickle describes.
        """
        # SECURITY: pickle.loads can execute arbitrary code while loading.
        # Only feed this data that comes from a trusted source.
        return pickle.loads(string)

    def tostring(self):
        """Serialize this event with :mod:`pickle`.

        :return: the pickled bytes of this event.
        """
        return pickle.dumps(self)
class Listener(ABC):
    """Base class for objects that react to events and forward them on.

    :param filters: optional topic or iterable of topics this listener
        accepts. When empty or omitted, every event is accepted.
    """

    def __init__(self, filters=None):
        super(Listener, self).__init__()
        self.chained_bees = []
        # A bare string would otherwise be split into single characters by
        # set(); treat it as one topic.
        if isinstance(filters, str):
            filters = [filters]
        self.filters = set(filters or [])

    def __str__(self):
        return "%s(filters=%s)" % (self.__class__.__name__, self.filters)

    def chain(self, bee):
        """Register ``bee`` to be notified after this listener.

        May also be called unbound with a list as the first argument,
        ``Listener.chain([a, b], bee)``, to chain ``bee`` after several
        listeners at once.

        :param bee: the listener to notify downstream.
        :return: ``bee``, so that calls can be chained.
        """
        # static usage with list for many-to-one
        if isinstance(self, list):
            for other in self:
                other.chain(bee)
        else:
            self.chained_bees.append(bee)
        return bee

    def filter(self, event):
        """Tell whether this listener accepts ``event``.

        :param event: an object exposing a ``topic`` attribute.
        :return: ``True`` if no filters are defined or the event's topic is
            one of them.
        """
        # If no filters are defined then listens to all events
        return not self.filters or event.topic in self.filters

    def notify(self, event):
        """Handle ``event`` and propagate it to the chained listeners.

        A truthy event that passes :meth:`filter` is given to
        :meth:`on_event`; a non-``None`` result is wrapped in a new
        :class:`Event` and replaces the event that is forwarded. Any exception
        raised while handling or forwarding is passed to
        :meth:`on_exception`.

        A falsy event is not processed: it is forwarded unchanged to every
        chained listener and then :meth:`teardown` is called.

        :param event: the event to handle, or a falsy value.
        """
        if event and self.filter(event):
            try:
                result = self.on_event(event)
                if result is not None:
                    event = Event(result)
                # Only propagate events that this listener can accept
                for bee in self.chained_bees:
                    bee.notify(event)
            except Exception as e:
                self.on_exception(e)
        if not event:
            try:
                for bee in self.chained_bees:
                    bee.notify(event)
            finally:
                # Run our own teardown even if a downstream listener raised.
                self.teardown()

    @abstractmethod
    def on_event(self, event):
        """Process a single accepted event; must be overridden.

        :param event: the event to process.
        :return: ``None`` to forward the same event downstream, or a value
            to forward in its place.
        """
        raise NotImplementedError  # pragma: nocover

    def on_exception(self, exception):
        """Hook called with any exception caught in :meth:`notify`.

        The default implementation ignores the exception.

        :param exception: the exception that was caught.
        """
        pass

    def setup(self):
        """Hook for subclasses; the default implementation does nothing."""
        pass

    def teardown(self):
        """Hook called by :meth:`notify` after a falsy event is forwarded.

        The default implementation does nothing.
        """
        pass
class Killable:
    """Mixin giving an object a one-way "killed" flag.

    The flag is an instance of ``_event_class`` stored in ``kill_event``.
    Subclasses may override ``_event_class`` with another class exposing
    ``set()`` and ``is_set()``.
    """

    # Class used to build the kill flag; instantiated with no arguments.
    _event_class = _Event

    def __init__(self):
        self.kill_event = self._event_class()

    @property
    def alive(self):
        """Whether :meth:`kill` has not been called yet.

        :return: ``True`` while the kill event is unset, ``False`` afterwards.
        """
        return not self.kill_event.is_set()

    def kill(self):
        """Set the kill event, making :attr:`alive` return ``False``."""
        self.kill_event.set()
class Streamer(Killable, ABC):
    """Base class for event sources that push data onto a queue.

    :param topic: optional topic attached to every event this streamer
        produces.
    """

    def __init__(self, topic=None):
        super(Streamer, self).__init__()
        self.topic = topic
        self._q = None  # this is set when a streamer is added to a hive

    def __str__(self):
        return "%s(topic=%s)" % (self.__class__.__name__, self.topic)

    @abstractmethod
    def stream(self):
        """Produce the data to publish; must be overridden.

        :return: an iterable of payloads. :meth:`run` iterates over it and
            calls this method again once it is exhausted.
        """
        raise NotImplementedError  # pragma: nocover

    def on_exception(self, exception):
        """Hook called with any exception caught in :meth:`run`.

        The default implementation ignores the exception.

        :param exception: the exception that was caught.
        """
        pass

    def setup(self):
        """Hook for subclasses; the default implementation does nothing."""
        pass

    def teardown(self):
        """Hook for subclasses; the default implementation does nothing."""
        pass

    def set_queue(self, q):
        """Set the queue that :meth:`run` publishes events to.

        :param q: an object exposing ``put(item)``.
        """
        self._q = q

    def run(self):
        """Publish streamed data as events until this streamer is killed.

        Each item yielded by :meth:`stream` is wrapped in an :class:`Event`
        carrying this streamer's topic and put on the queue. Exceptions
        raised while streaming or publishing are passed to
        :meth:`on_exception`, after which streaming restarts.

        :raises AssertionError: if no queue was set with :meth:`set_queue`.
        """
        self._assert_queue_is_set()
        while self.alive:
            try:
                for data in self.stream():
                    self._q.put(Event(data, topic=self.topic))
                    # break long running streams if the kill event is set
                    if not self.alive:
                        break
            except Exception as e:
                self.on_exception(e)

    def _assert_queue_is_set(self):
        # Raised explicitly rather than with an ``assert`` statement, which
        # is removed under ``python -O``. Without this check, run() would
        # swallow the resulting AttributeError and loop forever.
        if self._q is None:
            raise AssertionError(
                "You must first set the output queue with "
                "Streamer.set_queue before running a Streamer"
            )