# Source code for pybeehive.asyn.socket
from zmq.asyncio import Context, Poller
import asyncio
import zmq
from ..core import Event, Killable
from .core import Streamer, Listener
from .utils import AsyncGenerator
class Server(Killable):
    """Asynchronous receiver built on a ZeroMQ ``PULL`` socket.

    After :meth:`start`, a background task polls the socket and places every
    received payload on ``self.queue``. :meth:`iter_messages` exposes those
    payloads through an ``AsyncGenerator`` wrapper.

    ``self.alive`` and ``self.kill()`` are inherited from ``Killable``
    (defined outside this module).

    Args:
        address: ``(host, port)`` pair; it is interpolated into
            ``'tcp://%s:%s'`` when the socket is bound.
    """

    _event_class = asyncio.Event

    def __init__(self, address):
        super(Server, self).__init__()
        self.address = address
        # Buffer between the background receive task and iter_messages().
        self.queue = asyncio.Queue()
        self.context = Context.instance()
        self.socket = self.context.socket(zmq.PULL)
        self.poller = Poller()
        # Handle of the background receive task; None until start() runs.
        self._listen_future = None

    async def _receive_into_queue(self):
        """Poll the socket and enqueue incoming payloads while alive."""
        while self.alive:
            try:
                # NOTE(review): pyzmq poll timeouts are expressed in
                # milliseconds, so 1e-4 is an extremely short wait and this
                # loop spins quickly -- confirm that this is intentional.
                events = await self.poller.poll(timeout=1e-4)
                if self.socket in dict(events):
                    data = await self.socket.recv()
                    await self.queue.put(data)
            except zmq.error.ZMQError:
                # Back off briefly before polling again.
                await asyncio.sleep(1e-4)

    async def start(self):
        """Bind the socket, register it for polling and start receiving."""
        self.socket.bind('tcp://%s:%s' % self.address)
        self.poller.register(self.socket, zmq.POLLIN)
        self._listen_future = asyncio.ensure_future(self._receive_into_queue())
        # Yield once so the newly scheduled task gets a chance to run.
        await asyncio.sleep(0)

    async def shutdown(self):
        """Stop receiving and close the socket.

        Safe to call when :meth:`start` never ran (or failed before the
        socket was registered) and safe to call more than once.
        """
        self.kill()
        try:
            self.poller.unregister(self.socket)
        except KeyError:
            # The socket was never registered or was already unregistered.
            # Carry on so the socket below is still closed instead of
            # aborting the shutdown half-way.
            pass
        if self._listen_future is not None and not self._listen_future.done():
            self._listen_future.cancel()
        self.socket.close(linger=0)
        # Yield once so a pending cancellation can be delivered.
        await asyncio.sleep(0)

    def iter_messages(self):
        """Return an ``AsyncGenerator`` wrapping a "next message" coroutine.

        Each call of the wrapped coroutine polls ``self.queue`` until a
        payload is available and returns it. Once the server is no longer
        alive the coroutine falls out of its loop and returns ``None``.
        """
        async def wrapped():
            while self.alive:
                try:
                    result = self.queue.get_nowait()
                except asyncio.QueueEmpty:
                    # Nothing queued yet; yield briefly and retry.
                    await asyncio.sleep(1e-6)
                else:
                    await asyncio.sleep(0)
                    return result

        return AsyncGenerator(wrapped)
class Client(Killable):
    """Asynchronous sender built on a ZeroMQ ``PUSH`` socket.

    ``self.alive`` and ``self.kill()`` are inherited from ``Killable``
    (defined outside this module).

    Args:
        address: ``(host, port)`` pair; it is interpolated into
            ``'tcp://%s:%s'`` when the socket is connected.
    """

    _event_class = asyncio.Event

    def __init__(self, address):
        super().__init__()
        self.address = address
        self.context = Context.instance()
        self.socket = self.context.socket(zmq.PUSH)

    async def connect(self):
        """Connect the socket to the configured TCP endpoint."""
        endpoint = 'tcp://%s:%s' % self.address
        self.socket.connect(endpoint)
        # Yield control to the event loop once before returning.
        await asyncio.sleep(0)

    async def send(self, data):
        """Send ``data`` without blocking if the client is still alive.

        Returns the result of the socket's ``send`` call, or ``None`` when
        the client has already been killed (nothing is sent in that case).
        """
        if not self.alive:
            return None
        return await self.socket.send(data, flags=zmq.NOBLOCK)

    async def shutdown(self):
        """Mark the client as killed and close the socket without linger."""
        self.kill()
        self.socket.close(linger=0)
        # Yield control to the event loop once before returning.
        await asyncio.sleep(0)
class SocketStreamer(Streamer):
    """Streamer that emits events received by an internal ``Server``.

    Args:
        address: Passed straight to ``Server``.
        topic: Forwarded to ``Streamer.__init__``; also stamped on every
            event produced by :meth:`stream`.
    """

    def __init__(self, address, topic=None):
        super().__init__(topic=topic)
        server = Server(address)
        # Share this streamer's kill event with the server it owns.
        server.kill_event = self.kill_event
        self.server = server

    async def setup(self):
        """Start the underlying server."""
        await self.server.start()

    async def teardown(self):
        """Shut the underlying server down."""
        await self.server.shutdown()

    def stream(self):
        """Return an ``AsyncGenerator`` wrapping a "next event" coroutine.

        While the streamer is alive, each call of the wrapped coroutine
        awaits the next server message, decodes it with ``Event.fromstring``
        and returns a new ``Event`` carrying the decoded data and creation
        time under this streamer's topic. When the streamer is not alive it
        returns ``None``.
        """
        messages = self.server.iter_messages().__aiter__()

        async def wrapped():
            if not self.alive:
                return None
            raw = await messages.__anext__()
            received = Event.fromstring(raw)
            return Event(
                received.data,
                topic=self.topic,
                created_at=received.created_at,
            )

        return AsyncGenerator(wrapped)
class SocketListener(Listener):
    """Listener that forwards each event it handles through a ``Client``.

    Args:
        address: Passed straight to ``Client``.
        filters: Forwarded to ``Listener.__init__``.
    """

    def __init__(self, address, filters=None):
        super().__init__(filters=filters)
        self.client = Client(address)

    async def setup(self):
        """Connect the underlying client."""
        await self.client.connect()

    async def teardown(self):
        """Shut the underlying client down."""
        await self.client.shutdown()

    async def on_event(self, event):
        """Run ``event`` through :meth:`parse_event` and send the result."""
        parsed = await self.parse_event(event)
        payload = parsed.tostring()
        await self.client.send(payload)

    async def parse_event(self, event):
        """Hook applied to each event before sending; returns it unchanged."""
        return event  # pragma: nocover