Source code for pybeehive.socket

from queue import Empty, Queue
from threading import Thread
import time
import zmq
from .core import Streamer, Listener, Event, Killable


[docs]class Server(Killable): def __init__(self, address): super(Server, self).__init__() self.address = address self.queue = Queue() self.context = zmq.Context() self.socket = self.context.socket(zmq.PULL) self._listener_thread = None def _receive_into_queue(self): while self.alive: try: data = self.socket.recv(flags=zmq.NOBLOCK) except zmq.error.ZMQError: time.sleep(1e-6) else: self.queue.put(data)
[docs] def iter_messages(self): while not self.kill_event.is_set(): try: msg = self.queue.get(timeout=0.001) yield msg except Empty: continue
[docs] def start(self): self.socket.bind("tcp://%s:%s" % self.address) self._listener_thread = Thread(target=self._receive_into_queue) self._listener_thread.start()
[docs] def shutdown(self): self.kill() try: self._listener_thread.join() except AttributeError: pass # there was an error in start, so _listener_thread is None self.socket.close(linger=0)
[docs]class Client(Killable): def __init__(self, address): super(Client, self).__init__() self.address = address self.context = zmq.Context() self.socket = self.context.socket(zmq.PUSH)
[docs] def send(self, data): while self.alive: return self.socket.send(data, flags=zmq.NOBLOCK)
[docs] def connect(self): self.socket.connect('tcp://%s:%s' % self.address)
[docs] def shutdown(self): self.kill_event.set() self.socket.close(linger=0)
[docs]class SocketStreamer(Streamer): """ :param address: :param topic: """ def __init__(self, address, topic=None): super(SocketStreamer, self).__init__(topic=topic) self.server = Server(address)
[docs] def setup(self): self.server.start()
[docs] def teardown(self): self.server.shutdown()
[docs] def stream(self): for msg in self.server.iter_messages(): event = Event.fromstring(msg) yield Event( event.data, topic=self.topic, created_at=event.created_at )
[docs]class SocketListener(Listener): """ :param address: :param filters: """ def __init__(self, address, filters=None): super(SocketListener, self).__init__(filters=filters) self.client = Client(address)
[docs] def setup(self): self.client.connect()
[docs] def teardown(self): self.client.shutdown()
[docs] def on_event(self, event): result = self.parse_event(event) self.client.send(result.tostring())
[docs] def parse_event(self, event): """ :param event: :return: """ return event # pragma: nocover