"""
The Zeroless module API.
.. data:: log
A global Logger object. To use it, just add an Handler object
and set an appropriate logging level.
"""
import zmq
import logging
from time import sleep
from warnings import warn
from functools import partial
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
[docs]def connect(port, ip='127.0.0.1'):
"""
Returns a connector socket object.
:param port: port number from 1024 up to 65535
:type port: int
:param ip: ip address to connect (default=127.0.0.1)
:type ip: str or unicode
:rtype: ConnectSock
"""
return ConnectSock(ip, port)
[docs]def bind(port, interface='*'):
"""
Returns a binding socket object.
:param port: port number from 1024 up to 65535
:type port: int
:param interface: interface to bind (default=*)
:type interface: str or unicode
:rtype: BindSock
"""
return BindSock(interface, port)
[docs]class Sock:
def __init__(self):
pass
def __sock(self, pattern):
sock = zmq.Context().instance().socket(pattern)
self._setup(sock)
log.info('Ready...')
return sock
def __send(self, sock):
while True:
data = (yield)
sock.send_multipart(data)
log.debug('Sending: {0}'.format(data))
def __send_with_prefix(self, sock, prefix_frames):
while True:
data = prefix_frames + (yield)
sock.send_multipart(data)
log.debug('Sending: {0}'.format(data))
def __recv(self, sock):
while True:
frames = sock.recv_multipart()
log.debug('Receiving: {0}'.format(frames))
yield frames if len(frames) > 1 else frames[0]
def __send_function(self, sock, topic=None):
if topic:
gen = self.__send_with_prefix(sock, topic)
else:
gen = self.__send(sock)
gen.send(None)
func = lambda sender, *data: sender(data)
return partial(func, gen.send)
def __recv_generator(self, sock):
return self.__recv(sock)
# PubSub pattern
[docs] def pub(self, topic=b''):
"""
Returns a callable that can be used to transmit a message, with a given
``topic``, in a publisher-subscriber fashion. Note that the sender
function has a ``print`` like signature, with an infinite number of
arguments. Each one being a part of the complete message.
:param topic: the topic that will be published to (default=b'')
:type topic: bytes
:rtype: function
"""
sock = self.__sock(zmq.PUB)
return self.__send_function(sock, (topic,))
[docs] def sub(self, topics=(b'',)):
"""
Returns an iterable that can be used to iterate over incoming messages,
that were published with one of the topics specified in ``topics``. Note
that the iterable returns as many parts as sent by subscribed publishers.
:param topics: a list of topics to subscribe to (default=b'')
:type topics: list of bytes
:rtype: generator
"""
sock = self.__sock(zmq.SUB)
for topic in topics:
sock.setsockopt(zmq.SUBSCRIBE, topic)
return self.__recv_generator(sock)
# PushPull pattern
[docs] def push(self):
"""
Returns a callable that can be used to transmit a message in a push-pull
fashion. Note that the sender function has a ``print`` like signature,
with an infinite number of arguments. Each one being a part of the
complete message.
:rtype: a function
"""
sock = self.__sock(zmq.PUSH)
return self.__send_function(sock)
[docs] def pull(self):
"""
Returns an iterable that can be used to iterate over incoming messages,
that were pushed by a push socket. Note that the iterable returns as
many parts as sent by pushers.
:rtype: generator
"""
sock = self.__sock(zmq.PULL)
return self.__recv_generator(sock)
# ReqRep pattern
[docs] def request(self, *data):
"""
Returns a callable and an iterable respectively. Those can be used to
both transmit a message and/or iterate over incoming messages,
that were replied by a reply socket. Note that the iterable returns
as many parts as sent by repliers. Also, the sender function has a
``print`` like signature, with an infinite number of arguments. Each one
being a part of the complete message.
:rtype: (function, generator)
"""
sock = self.__sock(zmq.REQ)
return self.__send_function(sock), self.__recv_generator(sock)
[docs] def reply(self):
"""
Returns a callable and an iterable respectively. Those can be used to
both transmit a message and/or iterate over incoming messages,
that were requested by a request socket. Note that the iterable returns
as many parts as sent by requesters. Also, the sender function has a
``print`` like signature, with an infinite number of arguments. Each one
being a part of the complete message.
:rtype: (function, generator)
"""
sock = self.__sock(zmq.REP)
return self.__send_function(sock), self.__recv_generator(sock)
# Pair pattern
[docs] def pair(self, *data):
"""
Returns a callable and an iterable respectively. Those can be used to
both transmit a message and/or iterate over incoming messages, that were
sent by a pair socket. Note that the iterable returns as many parts as
sent by a pair. Also, the sender function has a ``print`` like signature,
with an infinite number of arguments. Each one being a part of the
complete message.
:rtype: (function, generator)
"""
sock = self.__sock(zmq.PAIR)
return self.__send_function(sock), self.__recv_generator(sock)
def _setup(self):
raise NotImplementedError()
[docs]class ConnectSock(Sock):
"""
A socket that will connect to a binding socket.
"""
def __init__(self, ip, port):
"""
Constructor of the connector socket.
:param port: port number from 1024 up to 65535
:type port: int
:param ip: ip address to connect
:type ip: str or unicode
"""
self._ip = ip
self._port = port
Sock.__init__(self)
def _setup(self, sock):
log.info('Connecting to {0} on port {1}'.format(self._ip,
self._port))
sock.connect('tcp://' + self._ip + ':' + str(self._port))
[docs]class BindSock(Sock):
"""
A socket that will bind for others to connect.
"""
def __init__(self, interface, port):
"""
Constructor of the binder object.
:param port: port number from 1024 up to 65535
:type port: int
:param interface: interface to bind
:type interface: str or unicode
"""
self._interface = interface
self._port = port
Sock.__init__(self)
def _setup(self, sock):
if sock.socket_type == zmq.SUB:
warning = 'SUB sockets that bind will not get any message before '
warning += 'they first ask for via the provided generator, so '
warning += 'prefer to bind PUB sockets if missing some messages '
warning += 'is not an option'
warn(warning)
log.info('Binding to interface {0} on port {1}'.format(self._interface,
self._port))
sock.bind('tcp://' + self._interface + ':' + str(self._port))