Socket Manager

A wrapper around ZeroMQ (ZMQ) that manages sockets.

class django_message_broker.server.socket_manager.SocketManager(context: Optional[zmq.sugar.context.Context] = None, bind_ip_address: str = '127.0.0.1', port: int = 5555, is_server: bool = False, io_loop: Optional[tornado.ioloop.IOLoop] = None)

Creates a ZeroMQ (ZMQ) socket.

get_socket() zmq.sugar.socket.Socket

Get the ZMQ socket.

Raises

ChannelsServerError – If there is no socket.

Returns

ZMQ socket.

Return type

zmq.Socket

get_stream() zmq.eventloop.zmqstream.ZMQStream

Get the ZMQ stream.

Raises

ChannelsServerError – If there is no stream.

Returns

ZMQ stream.

Return type

ZMQStream

get_routing_id() Optional[bytes]

Returns the routing ID if the socket is a ZMQ DEALER socket (client socket).

Returns

Routing ID of the socket.

Return type

Optional[bytes]

start() None

Opens the socket, sets up streams and configures callbacks.

stop() None

Closes the socket, streams and clears configured callbacks.

set_receive_callback(callback: Callable[[Union[_asyncio.Future, List[Any]]], None]) None

Sets the callback that is invoked when a message is received on the socket.

Parameters

callback (Callable[[Union[Future, List[Any]]], None]) – Callback method accepting Future or List

Note: The callback must accept a multipart_message, which can be either of the following types:

  • List: Multi-part message with each frame expressed as an element in the list.

  • asyncio Future: An asyncio Future containing the above list.

The callback function should test whether the multipart message it receives is a Future and, if so, extract the multipart list as follows:

from asyncio import Future
from typing import List, Union

def callback(multipart_message: Union[Future, List]):
    # A Future wraps the list of frames; unwrap it before use.
    if isinstance(multipart_message, Future):
        multipart_list = multipart_message.result()
    else:
        multipart_list = multipart_message

clear_receive_callback() None

Clear the callback that is called when a message is received.
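The Future-or-list handling described for set_receive_callback can be tried without opening a socket. The following is a minimal sketch that uses only the standard library. The helper unwrap_multipart, the callback on_message, the received list and the frame contents are illustrative assumptions, not part of django_message_broker.

```python
import asyncio
from asyncio import Future
from typing import Any, List, Union


def unwrap_multipart(multipart_message: Union[Future, List[Any]]) -> List[Any]:
    """Return the list of frames, unwrapping a completed Future if needed."""
    if isinstance(multipart_message, Future):
        return multipart_message.result()
    return multipart_message


# Collects the frames of every message seen by the callback (illustrative only).
received: List[List[Any]] = []


def on_message(multipart_message: Union[Future, List[Any]]) -> None:
    """Hypothetical receive callback that records each message's frames."""
    received.append(unwrap_multipart(multipart_message))


async def demo() -> None:
    # Case 1: the frames arrive directly as a list.
    on_message([b"frame-1", b"frame-2"])

    # Case 2: the frames arrive inside an already-completed Future.
    future = asyncio.get_running_loop().create_future()
    future.set_result([b"frame-3", b"frame-4"])
    on_message(future)


asyncio.run(demo())
```

Under these assumptions, a callback such as on_message would be the function passed to set_receive_callback. Keeping the unwrapping in one helper means the rest of the callback only ever deals with a plain list of frames.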