import inspect
from collections import defaultdict
from contextlib import contextmanager
# from .utils import Queue # TODO: Optimize queue
from queue import Queue
from queue import Empty
from threading import Thread
from .core import Listener, Streamer, Event, Killable
from .logging import create_logger, debug_handler, default_handler
try:
from .socket import SocketListener, SocketStreamer
# This is tested, just not by patching imports
except ImportError: # pragma: nocover
SocketListener, SocketStreamer = None, None # pragma: nocover
def _loop(event_queue, listeners, kill_event):
while not kill_event.is_set():
# Timeout on get and Empty catch ensure threads
# are not waiting forever for an item in the queue
try:
event = event_queue.get(timeout=0.001)
for bee in listeners:
bee.notify(event)
except Empty:
continue
except KeyboardInterrupt:
break
[docs]class Hive(Killable):
"""
"""
_listener_class = Listener
_streamer_class = Streamer
_socket_listener_class = SocketListener
_socket_streamer_class = SocketStreamer
def __init__(self):
super(Hive, self).__init__()
self.streamers = []
self.listeners = _ListenerTree()
self._event_queue = Queue()
self.logger = create_logger(handler=default_handler)
[docs] def add(self, *bees):
"""
:param bees:
"""
for bee in bees:
try:
assert isinstance(bee, Listener), \
'Bee must be an instance of Listener or Streamer'
self.listeners.add_listener(bee)
except AssertionError:
assert isinstance(bee, Streamer), \
'Bee must be an instance of Listener or Streamer'
bee.set_queue(self._event_queue)
self.streamers.append(bee)
[docs] def listener(self, chain=None, filters=None, **kwargs):
"""
:param chain:
:param filters:
:param kwargs:
:return:
"""
# for single decorator usage 'chain' is the on_event function
if inspect.isfunction(chain):
self._create_listener(chain, **kwargs)
else:
if chain:
self.listeners.validate_chain(chain)
if filters:
if not isinstance(filters, (list, set)):
filters = [filters]
self.listeners.validate_filters(filters)
def wrapper(f):
self._create_listener(f, chain, filters, **kwargs)
return wrapper
[docs] def streamer(self, topic=None, **kwargs):
"""
:param topic:
:param kwargs:
:return:
"""
# for single decorator usage 'topic' is the stream function
if inspect.isfunction(topic):
self._create_streamer(topic, **kwargs)
else:
if topic:
self.listeners.validate_filters([topic])
def wrapper(f):
self._create_streamer(f, topic=topic, **kwargs)
return wrapper
[docs] def socket_listener(self, address, chain=None, filters=None):
"""
:param address:
:param chain:
:param filters:
:return:
"""
if self._socket_listener_class is None:
raise RuntimeError('pyzmq required to create pybeehive sockets')
def wrapped(f):
return self.listener(
chain=chain, filters=filters,
klass=self._socket_listener_class, klass_args=(address,),
method_name='parse_event'
)(f)
return wrapped
[docs] def socket_streamer(self, address, topic=None):
"""
:param address:
:param topic:
:return:
"""
if self._socket_streamer_class is None:
raise RuntimeError('pyzmq required to create pybeehive sockets')
def wrapped(f):
return self.streamer(
topic=topic,
klass=self._socket_streamer_class, klass_args=(address,),
method_name=None
)(f)
return wrapped
[docs] def submit_event(self, event):
"""
:param event:
:return:
"""
assert isinstance(event, Event), "Can only submit Events to the Hive"
self._event_queue.put_nowait(event)
[docs] def run(self, threaded=False, debug=False):
"""
:param threaded:
:param debug:
:return:
"""
if debug:
self.logger.addHandler(debug_handler)
if threaded:
worker = Thread(target=self._run)
worker.start()
return worker
else:
self._run()
[docs] def close(self):
"""
:return:
"""
self.kill()
for streamer in self.streamers:
streamer.kill()
def _wrap_stream(self, stream_func):
return lambda s: stream_func()
def _create_listener(self, func, chain=None, filters=None,
klass=None, klass_args=(), method_name='on_event'):
_Listener = type(func.__name__, (klass or self._listener_class,), {
method_name: lambda s, e: func(e)
})
self.listeners.add_listener(
_Listener(*klass_args, filters=filters), chain=chain
)
def _create_streamer(self, func, topic=None,
klass=None, klass_args=(), method_name='stream'):
if method_name == 'stream':
klass_dict = {method_name: self._wrap_stream(func)}
else:
klass_dict = {}
klass = klass or self._streamer_class
_Streamer = type(func.__name__, (klass,), klass_dict)
self.add(_Streamer(*klass_args, topic=topic))
def _run(self):
with self._setup_teardown_streamers():
with self._setup_teardown_listeners():
self.logger.info("The hive is now live!")
try:
_loop(self._event_queue, self.listeners, self.kill_event)
finally:
self.logger.info("Shutting down hive...")
self.close()
@contextmanager
def _setup_teardown_listeners(self):
self.listeners.call_method_recursively('setup')
yield
self.listeners.call_method_recursively('teardown')
@contextmanager
def _setup_teardown_streamers(self):
for streamer in self.streamers:
self._setup_streamer(streamer)
yield
for streamer in self.streamers:
self._teardown_streamer(streamer)
def _setup_streamer(self, streamer):
try:
streamer.setup()
except Exception as e:
self.logger.exception("setup %s - %s", str(streamer), repr(e))
else:
self.logger.debug("setup %s - OK", str(streamer))
streamer.thread = Thread(target=streamer.run)
streamer.thread.start()
def _teardown_streamer(self, streamer):
try:
streamer.kill()
streamer.teardown()
streamer.thread.join()
self.logger.debug("teardown %s - OK", str(streamer))
except Exception as e:
self.logger.exception("teardown %s - %s", str(streamer), repr(e))
class _ListenerTree:
def __init__(self):
self._listeners = defaultdict(list)
self.logger = create_logger(name='pybeehive.hive.listeners')
def __iter__(self):
for v in self._listeners.values():
for l in v:
yield l
def __len__(self):
length = 0
for _ in self:
length += 1
return length
def add_listener(self, listener, chain=None):
if chain:
self.chain(listener, chain)
else:
self._listeners[listener.__class__.__name__].append(listener)
def call_method_recursively(self, method_name, *args, **kwargs):
results = []
for bee in self._recursive_iter():
try:
result = bee.__getattribute__(method_name)(*args, **kwargs)
except Exception as e:
self.logger.exception(
"%s %s - %s", method_name, str(bee), repr(e))
else:
self.logger.debug("%s %s - OK", method_name, str(bee))
results.append(result)
return results
def chain(self, listener, chain):
bees_to_chain = []
if isinstance(chain, str):
bees_to_chain.extend(self.listeners_by_name(chain))
else:
for c in chain:
bees_to_chain.extend(self.listeners_by_name(c))
for bee in bees_to_chain:
bee.chain(listener)
def listeners_by_name(self, name):
listeners = []
for listener in self._recursive_iter():
if listener.__class__.__name__ == name:
listeners.append(listener)
return listeners
@staticmethod
def validate_chain(c):
if isinstance(c, str):
return
elif isinstance(c, list) and all(isinstance(i, str) for i in c):
return
else:
raise TypeError("chain must be a string or a list of strings")
@staticmethod
def validate_filters(items):
for item in items:
try:
hash(item)
except TypeError:
raise TypeError("filter %s is not hashable" % str(item))
def _recursive_iter(self, listeners=None, visited=None):
visited = visited or []
if listeners is None:
listeners = self
for listener in listeners:
if listener not in visited:
visited.append(listener)
yield listener
for _l in self._recursive_iter(listener.chained_bees, visited):
if _l not in visited:
visited.append(_l)
yield _l