Socket Manager

Wrapper around Zero Message Queue to manage sockets.

class django_message_broker.server.socket_manager.SocketManager(context: Optional[zmq.sugar.context.Context] = None, bind_ip_address: str = '', port: int = 5555, is_server: bool = False, io_loop: Optional[tornado.ioloop.IOLoop] = None)

Creates a zero-messsage-queue (ZMQ) socket.

get_socket() zmq.sugar.socket.Socket

Get the ZMQ socket.


ChannelsServerError – Raises error if there is no socket.


ZMQ socket.

Return type


get_stream() zmq.eventloop.zmqstream.ZMQStream

Get the ZMQ stream.


ChannelsServerError – Raises error if there is no stream.


ZMQ stream.

Return type


get_routing_id() Optional[bytes]

Returns the routing ID if the sockets is a ZMQ.Dealer (client socket).


Routing ID of the socket.

Return type


start() None

Opens the socket, sets up streams and configures callbacks.

stop() None

Closes the socket, streams and clears configured callbacks.

set_receive_callback(callback: Callable[[Union[_asyncio.Future, List[Any]]], None]) None

Sets the callback when a message is received on the socket.


callback (Callable[[Union[Future, List[Any]]], None]) – Callback method accepting Future or List

Note: The callback needs to accept a multipart_message which could be either of the following types:

  • List: Multi-part message with each frame expressed as an element in the list.

  • asyncio Future: An asyncio Future containing the above list.

The callback function should test whether the returned multipart message is a Future and then extract the multipart list as follow:

from asyncio.futures import Future

def callback(multipart_message: Union[Future, List]):
    if isinstance(multipart_message, Future):
        multipart_list = multipart_message.result()
        multipart_list = multipart_message
clear_receive_callback() None

Clear the callback that is called when a message is received.