Socket Manager
Wrapper around Zero Message Queue to manage sockets.
- class django_message_broker.server.socket_manager.SocketManager(context: Optional[zmq.sugar.context.Context] = None, bind_ip_address: str = '127.0.0.1', port: int = 5555, is_server: bool = False, io_loop: Optional[tornado.ioloop.IOLoop] = None)
Creates a zero-messsage-queue (ZMQ) socket.
- get_socket() zmq.sugar.socket.Socket
Get the ZMQ socket.
- Raises
ChannelsServerError – Raises error if there is no socket.
- Returns
ZMQ socket.
- Return type
zmq.Socket
- get_stream() zmq.eventloop.zmqstream.ZMQStream
Get the ZMQ stream.
- Raises
ChannelsServerError – Raises error if there is no stream.
- Returns
ZMQ stream.
- Return type
ZMQStream
- get_routing_id() Optional[bytes]
Returns the routing ID if the sockets is a ZMQ.Dealer (client socket).
- Returns
Routing ID of the socket.
- Return type
Optional[bytes]
- start() None
Opens the socket, sets up streams and configures callbacks.
- stop() None
Closes the socket, streams and clears configured callbacks.
- set_receive_callback(callback: Callable[[Union[_asyncio.Future, List[Any]]], None]) None
Sets the callback when a message is received on the socket.
- Parameters
callback (Callable[[Union[Future, List[Any]]], None]) – Callback method accepting Future or List
Note: The callback needs to accept a multipart_message which could be either of the following types:
List: Multi-part message with each frame expressed as an element in the list.
asyncio Future: An asyncio Future containing the above list.
The callback function should test whether the returned multipart message is a Future and then extract the multipart list as follow:
from asyncio.futures import Future def callback(multipart_message: Union[Future, List]): if isinstance(multipart_message, Future): multipart_list = multipart_message.result() else: multipart_list = multipart_message
- clear_receive_callback() None
Clear the callback that is called when a message is received.