Data Message
Implements a data message as a structured dataclass. Used to pass data messages between the Django Message Broker clients and server.
- class django_message_broker.server.data_message.DataMessageCommands
Valid message commands for data messages.
Client to server:
SUBSCRIBE (b"SUBCHANX") - Subscribe to a channel.
UNSUBSCRIBE (b"USUBCHAN") - Unsubscribe from a channel.
SEND_TO_CHANNEL (b"SENDCHAN") - Send this message to a channel.
SEND_TO_GROUP (b"SENDGRPX") - Send this message to a group.
Server to client:
DELIVERY (b"DELIVERY") - Deliver this message to a client.
COMPLETE (b"COMPLETE") - Command complete.
EXCEPTION (b"EXCEPTON") - Command raised an exception.
SUBSCRIPTION_ERROR (b"ESUBCHAN") - Error subscribing to channel.
Both ways:
KEEPALIVE (b"HARTBEAT") - Intermittent message confirming client/server alive.
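As a rough illustration of how these command values might be grouped by direction, the sketch below collects them as constants. The class name `Commands`, the set `CLIENT_TO_SERVER` and the helper `is_client_command` are hypothetical stand-ins and not part of the library; only the byte values come from the list above.

```python
class Commands:
    """Hypothetical stand-in holding the command byte strings listed above."""

    # Client to server
    SUBSCRIBE = b"SUBCHANX"
    UNSUBSCRIBE = b"USUBCHAN"
    SEND_TO_CHANNEL = b"SENDCHAN"
    SEND_TO_GROUP = b"SENDGRPX"
    # Server to client
    DELIVERY = b"DELIVERY"
    COMPLETE = b"COMPLETE"
    EXCEPTION = b"EXCEPTON"
    SUBSCRIPTION_ERROR = b"ESUBCHAN"
    # Both ways
    KEEPALIVE = b"HARTBEAT"


# Commands that only a client is expected to send (grouping taken from the list above).
CLIENT_TO_SERVER = {
    Commands.SUBSCRIBE,
    Commands.UNSUBSCRIBE,
    Commands.SEND_TO_CHANNEL,
    Commands.SEND_TO_GROUP,
}


def is_client_command(command: bytes) -> bool:
    """Return True if a client may send this command; keepalives travel both ways."""
    return command in CLIENT_TO_SERVER or command == Commands.KEEPALIVE
```

Every command is exactly eight bytes long, which keeps the command frame a fixed width.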
- class django_message_broker.server.data_message.DataMessage(endpoints: typing.List = <factory>, channel_name: bytes = b'Default', command: bytes = b'XXXXXXXX', id: bytes = <factory>, sequence: int = 0, properties: typing.Dict = <factory>, encoded_body: typing.Optional[bytes] = b'{}', body: typing.Optional[typing.Dict] = None)
The message is formatted on the wire as n + 1 + 6 frames, where n is the number of routing IDs added to the front of the message on zmq.DEALER to zmq.ROUTER messages; 1 is the null frame that separates the routing IDs from the message; and 6 is the number of frames in the message itself.
frames -n to -1: Routing IDs (list of zmq.DEALER identities)
frame 0: b"" blank frame
frame 1: channel_name - Name of channel or group to which message is sent
frame 2: command - Message type (publish to channel, to group, subscribe to channel)
frame 3: id - Universally unique identifier shared by messages in the same sequence of communication.
frame 4: sequence - Sequence number of this message from this publisher.
frame 5: properties - Properties appended to message
frame 6: body (serialisable object) - Body of message (as dictionary). Maximum size 1MB.
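The frame layout above can be sketched as a plain list of byte strings. This is only an illustration: the function `build_frames` is hypothetical, and the encodings used here for the sequence, properties and body frames (ASCII digits and JSON) are assumptions chosen to keep the example self-contained, not the library's wire encoding.

```python
import json
from typing import Dict, List


def build_frames(
    routing_ids: List[bytes],
    channel_name: bytes,
    command: bytes,
    message_id: bytes,
    sequence: int,
    properties: Dict,
    body: Dict,
) -> List[bytes]:
    """Lay out n routing frames, one blank frame and six message frames."""
    return [
        *routing_ids,                      # frames -n to -1: zmq.DEALER identities
        b"",                               # frame 0: blank delimiter
        channel_name,                      # frame 1
        command,                           # frame 2
        message_id,                        # frame 3
        str(sequence).encode("ascii"),     # frame 4 (encoding is an assumption)
        json.dumps(properties).encode(),   # frame 5 (JSON is an assumption)
        json.dumps(body).encode(),         # frame 6 (JSON is an assumption)
    ]


frames = build_frames(
    [b"client-1", b"router-7"], b"chat", b"SENDCHAN", b"msg-0001",
    3, {"ttl": 60}, {"text": "hello"},
)
# The first empty frame marks the boundary between routing IDs and the message.
delimiter = frames.index(b"")
```

With two routing IDs the list holds 2 + 1 + 6 frames, and everything after the delimiter is the six-frame message.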
- __getitem__(key: Union[int, str]) Any
Returns the value stored under the given key in the properties frame of the message.
- Parameters
key (Union[int, str]) – Property key
- Returns
Value stored in the property.
- Return type
Optional[Any]
- Raises
KeyError – If the key doesn’t exist.
- get(key: Union[int, str], default: Optional[Any] = None) Optional[Any]
Returns the value stored under the given key in the properties frame of the message, or a default if the key is missing.
- Parameters
key (Union[int, str]) – Property key.
default (Any, optional) – Default value if key doesn’t exist.
- Returns
Property value if the key exists; otherwise the default, which is None unless specified.
- Return type
Optional[Any]
- __setitem__(key: Union[int, str], value: Any) None
Sets the value of a property in the properties frame of the message.
- Parameters
key (Union[int, str]) – Property key
value (Any) – Value of the property (must be serialisable by msgspec)
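The three property accessors behave like dictionary access on the properties frame. The sketch below mirrors that behaviour with a hypothetical stand-in class, `PropertyEnvelope`, which is not the library's `DataMessage` and carries only a `properties` dictionary.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Union


@dataclass
class PropertyEnvelope:
    """Hypothetical stand-in showing only the property accessors."""

    properties: Dict = field(default_factory=dict)

    def __getitem__(self, key: Union[int, str]) -> Any:
        # Raises KeyError if the key does not exist.
        return self.properties[key]

    def get(self, key: Union[int, str], default: Optional[Any] = None) -> Optional[Any]:
        # Falls back to the default (None unless given) for a missing key.
        return self.properties.get(key, default)

    def __setitem__(self, key: Union[int, str], value: Any) -> None:
        self.properties[key] = value


message = PropertyEnvelope()
message["ttl"] = 60                       # __setitem__
ttl = message["ttl"]                      # __getitem__
priority = message.get("priority", 5)     # get with an explicit default
```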
- get_body() Dict
Returns the body of the message, decoding it first if it is still encoded.
Note: The default is NOT to decode the body of the message when it is received. This reduces the number of times the body (which can be up to 1MB in size) is decoded and encoded when traversing servers and routers. Accessing the body through the message attribute is potentially unsafe if the body has not been decoded. Therefore, the body should be accessed using the get_body() method.
- Returns
Body of the message.
- Return type
Dict
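The decode-on-demand behaviour described in the note can be sketched as follows. `LazyBodyMessage` is a hypothetical stand-in with only the `encoded_body` and `body` fields, and JSON is used for decoding purely as an assumption to keep the example runnable with the standard library.

```python
import json
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class LazyBodyMessage:
    """Hypothetical stand-in that decodes the body only when asked."""

    encoded_body: Optional[bytes] = b"{}"
    body: Optional[Dict] = None

    def get_body(self) -> Dict:
        # Decode once, on first access, then reuse the decoded object.
        if self.body is None:
            self.body = json.loads(self.encoded_body or b"{}")  # JSON is an assumption
        return self.body


received = LazyBodyMessage(encoded_body=b'{"text": "hello"}')
undecoded = received.body        # still None: nothing has been decoded yet
decoded = received.get_body()    # decodes and caches the body
```

Reading the `body` attribute directly before the first `get_body()` call yields None here, which is the hazard the note warns about.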
- __repr__() str
Returns a printable string of data message contents.
The string contains:
Endpoint list
Channel name
Unique message id
Command
Size of data body
Properties
Data body
- Returns
Data message contents.
- Return type
str
- quick_copy() django_message_broker.server.data_message.DataMessage
Returns a new instance of the message with all new envelope items and a body object shared between instances. The purpose of providing a new envelope with a shared body is to support copying the message for transmission to multiple addressees, where the envelope must change but the body of the message does not. Sharing the body object reduces copying time and memory utilisation.
- Returns
Copy of the message with shared body object.
- Return type
DataMessage
- copy() django_message_broker.server.data_message.DataMessage
Returns a new instance of the message, deep-copying all objects. This returns a copy of the message which is independent of the original message.
- Returns
Independent deep copy of the message.
- Return type
DataMessage
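The difference between the two copy methods can be illustrated with a small stand-in. The class `Envelope` below is hypothetical and much simpler than `DataMessage`; it assumes that "new envelope items" means fresh container objects for the endpoints and properties, while the body object is passed through unchanged.

```python
from copy import deepcopy
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Envelope:
    """Hypothetical stand-in with a small envelope and a body."""

    endpoints: List[bytes] = field(default_factory=list)
    properties: Dict = field(default_factory=dict)
    body: Dict = field(default_factory=dict)

    def quick_copy(self) -> "Envelope":
        # New envelope containers, same body object.
        return Envelope(
            endpoints=list(self.endpoints),
            properties=dict(self.properties),
            body=self.body,
        )

    def copy(self) -> "Envelope":
        # Fully independent copy, including the body.
        return deepcopy(self)


original = Envelope([b"client-1"], {"ttl": 60}, {"text": "hello"})
quick = original.quick_copy()
deep = original.copy()

quick.endpoints.append(b"client-2")   # does not affect the original envelope
```

A change made to the body through `quick` would be visible through `original`, so the shared-body copy suits fan-out where only the addressing changes.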
- async send(socket: Union[zmq.sugar.socket.Socket, zmq.eventloop.zmqstream.ZMQStream]) None
Sends a message on the defined socket or stream.
- Parameters
socket (Union[zmq.Socket, ZMQStream]) – Socket or stream on which the message is sent.
- Raises
MessageTooLarge – The body of the message exceeds the maximum permissible size.
ChannelsSocketClosed – Socket or stream closed.
MessageFormatException – Error occurred sending the message.
- classmethod recv(socket: zmq.sugar.socket.Socket) django_message_broker.server.data_message.DataMessage
Reads a message from the socket and returns a new message instance.
- classmethod from_msg(multipart_message: Union[_asyncio.Future, List], unpack_body=True) django_message_broker.server.data_message.DataMessage
Returns a new message instance from zmq multipart message frames.
The body of the message is decoded by default, though this can be overridden if the body does not need to be decoded. This may be advantageous where the message is being stored and forwarded to its final destination (e.g. in a server or router).
- Parameters
multipart_message (Union[Future, List]) – A list (or list in a Future) of zmq multipart frames.
unpack_body (bool, optional) – Decode the body. Defaults to True.
- Raises
MessageFormatException – If the number of frames in the multipart message doesn’t match the schema.
- Returns
New instance of the message.
- Return type
DataMessage
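The frame-count check and the optional body decoding described for from_msg can be sketched without zmq. The function `split_frames` is hypothetical, the `MessageFormatException` class below is a local stand-in for the exception named above, and JSON decoding of the body is an assumption made to keep the example self-contained.

```python
import json
from typing import Dict, List, Optional, Tuple


class MessageFormatException(Exception):
    """Local stand-in for the exception named above."""


def split_frames(
    multipart_message: List[bytes], unpack_body: bool = True
) -> Tuple[List[bytes], List[bytes], Optional[Dict]]:
    """Split frames into routing IDs and message frames, optionally decoding the body."""
    try:
        delimiter = multipart_message.index(b"")
    except ValueError:
        raise MessageFormatException("No blank delimiter frame found.") from None
    endpoints = multipart_message[:delimiter]
    message_frames = multipart_message[delimiter + 1:]
    if len(message_frames) != 6:
        raise MessageFormatException(
            f"Expected 6 message frames, got {len(message_frames)}."
        )
    # Leave the body encoded when the message is only being stored and forwarded.
    body = json.loads(message_frames[5]) if unpack_body else None
    return endpoints, message_frames, body


frames = [
    b"client-1", b"", b"chat", b"DELIVERY", b"msg-0001", b"1", b"{}",
    b'{"text": "hi"}',
]
endpoints, message_frames, body = split_frames(frames)
_, _, forwarded_body = split_frames(frames, unpack_body=False)
```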