Source code for pybeehive.asyn.core
import asyncio
from abc import abstractmethod
from ..core import Event, Listener as SyncListener, Streamer as SyncStreamer
from .utils import AsyncContextManager
[docs]class Listener(SyncListener):
[docs] @abstractmethod
async def on_event(self, event):
raise NotImplementedError # pragma: nocover
[docs] async def notify(self, event):
if event:
try:
event = await self.on_event(event)
event = Event(event)
except Exception as e:
self.on_exception(e)
await asyncio.gather(*[
bee.notify(event) for bee in self.chained_bees
])
[docs] async def setup(self):
pass
[docs] async def teardown(self):
pass
[docs]class Streamer(SyncStreamer):
_event_class = asyncio.Event
[docs] @abstractmethod
async def stream(self):
raise NotImplementedError # pragma: nocover
[docs] async def setup(self):
pass
[docs] async def teardown(self):
pass
[docs] async def run(self):
self._assert_queue_is_set()
while self.alive:
try:
async with AsyncContextManager(self.stream()) as stream:
async for data in stream:
event = Event(data, topic=self.topic)
await self._q.put(event)
# break long running streams if the kill event is set
if not self.alive:
break
except Exception as e:
self.on_exception(e)