Source code for pybeehive.asyn.hive
from contextlib import contextmanager
import asyncio
import inspect
from ..hive import Hive as SyncHive
from .core import Listener, Streamer
from .utils import AsyncGenerator
try:
from .socket import SocketListener, SocketStreamer
# This is tested, just not by patching imports
except ImportError: # pragma: nocover
SocketListener, SocketStreamer = None, None # pragma: nocover
async def _loop_async(event_queue, listeners, kill_event):
    """Dispatch queued events to every listener until the kill event is set.

    The queue is polled with ``get_nowait`` rather than awaited with
    ``get`` so that control returns to the event loop at short, regular
    intervals; this lets the coroutine notice ``kill_event`` promptly
    and shut down gracefully even while the queue stays empty.

    :param event_queue: queue exposing ``get_nowait()`` that raises
        ``asyncio.QueueEmpty`` when there is nothing to fetch
    :param listeners: iterable of objects with an awaitable
        ``notify(event)`` method
    :param kill_event: object whose ``is_set()`` turns true when the
        loop should stop
    """
    while True:
        if kill_event.is_set():
            return
        try:
            pending = event_queue.get_nowait()
        except asyncio.QueueEmpty:
            # Nothing queued: back off briefly and re-check the kill flag.
            await asyncio.sleep(1e-3)
            continue
        # Fan the event out to all listeners and wait for every one of
        # them before fetching the next event.
        notifications = [listener.notify(pending) for listener in listeners]
        await asyncio.gather(*notifications)
class Hive(SyncHive):
    """Hive variant that runs its streamers and listeners on asyncio.

    Subclasses the synchronous ``Hive`` and swaps in asyncio-based
    building blocks (event, listener, streamer and the optional socket
    classes), an ``asyncio.Queue`` for events, and an event loop that is
    resolved when the hive is run.
    """

    # asyncio counterparts of the classes configured on the base class.
    # NOTE(review): presumably the base class instantiates these when it
    # creates the kill event and wraps streamers/listeners -- confirm there.
    _event_class = asyncio.Event
    _listener_class = Listener
    _streamer_class = Streamer
    # Both names are None when the optional socket module cannot be imported.
    _socket_listener_class = SocketListener
    _socket_streamer_class = SocketStreamer

    def __init__(self):
        """Initialise the base hive and create the asyncio event queue."""
        super().__init__()
        # This is set at runtime depending on the run context
        self.loop = None
        self._event_queue = asyncio.Queue()

    def _wrap_stream(self, stream_func):
        """Adapt ``stream_func`` into a one-argument stream factory.

        :param stream_func: either a coroutine function (wrapped in
            ``AsyncGenerator``) or a callable that already returns an
            async generator when called with no arguments
        :return: callable taking one argument, which it ignores
            (presumably the streamer it gets attached to -- confirm in
            the base class), and returning the stream object
        """
        def stream(s):
            # pre-python3.6 wrapping for async function
            if inspect.iscoroutinefunction(stream_func):
                return AsyncGenerator(stream_func)
            # python3.6 or closure usage: return async generator
            return stream_func()
        return stream

    def _run(self):
        """Run the hive on the event loop until it finishes or is interrupted.

        Resolves the loop, sets up streamers and listeners, then drives
        the streamer jobs together with the event-dispatch coroutine.
        ``KeyboardInterrupt`` is swallowed so Ctrl-C leads to a normal
        shutdown; any other exception propagates after cleanup.
        """
        self._set_loop()
        with self._setup_teardown_streamers() as jobs:
            with self._setup_teardown_listeners():
                task = asyncio.ensure_future(asyncio.gather(
                    *jobs,
                    _loop_async(
                        self._event_queue, self.listeners, self.kill_event
                    )
                ))
                try:
                    self.logger.info("The hive is now live!")
                    self.loop.run_until_complete(task)
                except KeyboardInterrupt:
                    pass  # Need explicit catch here
                finally:
                    self.logger.info("Shutting down hive...")
                    task.cancel()
                    self.close()

    def _set_loop(self):
        """Bind ``self.loop`` to the current event loop, creating one if needed."""
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # Create new event loop when called from a thread.
            # asyncio.new_event_loop() is the direct equivalent of asking
            # the policy for a new loop, without touching the policy API.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)

    @contextmanager
    def _setup_teardown_listeners(self):
        """Context manager running listener ``setup`` on entry and ``teardown`` on exit.

        The awaitables returned by ``call_method_recursively`` are run to
        completion on ``self.loop``; exceptions they raise are collected
        by ``gather`` (``return_exceptions=True``) rather than propagated.
        Teardown runs even if the ``with`` body raises.
        """
        setup_futures = self.listeners.call_method_recursively('setup')
        if setup_futures:
            self.loop.run_until_complete(asyncio.gather(
                *setup_futures, return_exceptions=True))
        try:
            yield
        finally:
            # Always tear listeners down, including when the body failed.
            teardown_futures = self.listeners.call_method_recursively(
                'teardown')
            if teardown_futures:
                self.loop.run_until_complete(asyncio.gather(
                    *teardown_futures, return_exceptions=True))

    @contextmanager
    def _setup_teardown_streamers(self):
        """Context manager setting streamers up on entry and tearing them down on exit.

        Yields the list of objects returned by each streamer's ``run()``
        (one per streamer, in ``self.streamers`` order). Teardown runs
        even if the ``with`` body raises.
        """
        jobs = self.loop.run_until_complete(asyncio.gather(
            *[self._setup_streamer(s) for s in self.streamers]
        ))
        try:
            yield jobs
        finally:
            # Always kill and tear streamers down, including on failure.
            self.loop.run_until_complete(asyncio.gather(
                *[self._teardown_streamer(s) for s in self.streamers]
            ))

    async def _setup_streamer(self, streamer):
        """Await ``streamer.setup()`` and return the result of ``streamer.run()``.

        A failing setup is logged with its traceback and does not
        prevent ``run()`` from being called.
        """
        try:
            await streamer.setup()
        except Exception as e:
            self.logger.exception("setup %s - %s", streamer, repr(e))
        else:
            self.logger.debug("setup %s - OK", streamer)
        return streamer.run()

    async def _teardown_streamer(self, streamer):
        """Kill ``streamer`` and await its ``teardown()``, logging the outcome.

        Exceptions raised by ``teardown()`` are logged with their
        traceback and suppressed.
        """
        streamer.kill()
        try:
            await streamer.teardown()
        except Exception as e:
            self.logger.exception("teardown %s - %s", str(streamer), repr(e))
        else:
            # Success is a debug-level message, mirroring _setup_streamer;
            # logger.exception here would report an error with no exception.
            self.logger.debug("teardown %s - OK", str(streamer))