Data Message

Implements a data message as a structured dataclass. Used to pass data messages between the Django Message Broker clients and server.

class django_message_broker.server.data_message.DataMessageCommands

Valid message commands for data messages.

Client to server:

  • SUBSCRIBE (b”SUBCHANX”) - Subscribe to a channel.

  • UNSUBSCRIBE (b”USUBCHAN”) - Unsubscribe from a channel.

  • SEND_TO_CHANNEL (b”SENDCHAN”) - Send this message to a channel.

  • SEND_TO_GROUP (b”SENDGRPX”) - Send this message to a group.

Server to client:

  • DELIVERY (b”DELIVERY”) - Deliver this message to a client.

  • COMPLETE (b”COMPLETE”) - Command complete

  • EXCEPTION (b”EXCEPTON”) - Command raised an exception.

  • SUBSCRIPTION_ERROR (b”ESUBCHAN”) - Error subscribing to channel.

Both ways:

  • KEEPALIVE (b”HARTBEAT”) - Intermittent message confirming client/server alive.

class django_message_broker.server.data_message.DataMessage(endpoints: typing.List = <factory>, channel_name: bytes = b'Default', command: bytes = b'XXXXXXXX', id: bytes = <factory>, sequence: int = 0, properties: typing.Dict = <factory>, encoded_body: typing.Optional[bytes] = b'{}', body: typing.Optional[typing.Dict] = None)

Message is formatted on wire as n + 1 + 6 frames. Where n is the number of routing_ids added to the front of a message on zmq.DEALER to zmq.ROUTER messages; 1 is the null frame to separate routing_ids from the message; and 6 is the number of frames in the message.

  • frame -1:-n: Routing IDs (list of zmq.DEALER identities)

  • frame 0: b”” blank frame

  • frame 1: channel_name - Name of channel or group to which message is sent

  • frame 2: command - Message type (publish to channel, to group, subscribe to channel)

  • frame 3: id - Unique universal identifier identifying messages in the same sequence of communication.

  • frame 4: sequence - Sequence number of this message from this publisher.

  • frame 5: properties - Properties appended to message

  • frame 6: body (serialisable object) - Body of message (as dictionary) Maximum size 1MB.

__getitem__(key: Union[int, str]) Any

Returns values stored in the properties frame of the message.

Parameters

key (Union[int, str]) – Property key

Returns

Value stored in the property.

Return type

Optional[Any]

Raises

KeyError – If the key doesn’t exist.

get(key: Union[int, str], default: Optional[Any] = None) Optional[Any]

Returns values stored in the properties frame of the message.

Parameters
  • key (Union[int, str]) – Property key.

  • default (Any, optional) – Default value if key doesn’t exist.

Returns

Property value if key exists, default or None if it doesn’t.

Return type

Optional[Any]

__setitem__(key: Union[int, str], value: Any) None

Sets the value of a property in the properties frame of the message.

Parameters
  • key (Union[int, str]) – Property key

  • value (Any) – Value of the property (must be serialisable by msgspec)

get_body() Dict

Returns the value of the message, unpacking the message if the body is encoded.

Note: The default is NOT to decode the body of the message when it is received. This reduces the number of times the body (which can be up to 1MB in size) is decoded/encoded when traversing servers and routers. Accessing the body of the message using the message attribute is potentially unsafe if the body of the message has not be decoded. Therefore, accessing the body should be using the get_body() method.

Returns

Body of the message.

Return type

Dict

__repr__() str

Returns a printable string of data message contents.

Returns:

  • Endpoint list

  • Channel name

  • Unique message id

  • Command

  • Size of data body

  • Properties

  • Data body

Returns

Data message contents.

Return type

str

quick_copy() django_message_broker.server.data_message.DataMessage

Returns a new instance of the message with all new envelope items and shared (between instances) body object. The purpose of providing a new envelope with shared body is to support copying the message for transmission to multiple addressees where the envelope must change but the body of the message does not. Sharing the body object reduces copying time, memory utilisation.

Returns

Copy of the message with shared body object.

Return type

DataMessage

copy() django_message_broker.server.data_message.DataMessage

Returns a new instance of the message deepcopying all objects. This returns an copy of the message which is independent of the original message.

Returns

[description]

Return type

DataMessage

async send(socket: Union[zmq.sugar.socket.Socket, zmq.eventloop.zmqstream.ZMQStream]) None

Sends a message on the defined socket or stream.

Parameters

socket (Union[zmq.Socket, ZMQStream]) – Socket or stream on which message sent.

Raises
classmethod recv(socket: zmq.sugar.socket.Socket) django_message_broker.server.data_message.DataMessage

Reads message from socket, returns new message instance.

classmethod from_msg(multipart_message: Union[_asyncio.Future, List], unpack_body=True) django_message_broker.server.data_message.DataMessage

Returns a new message instance from zmq multipart message frames.

The body of the message is decoded by default, though this can be over-ridden if the body does not need decoded. This may be advantageous where the message is being stored and forwarded to the final destination (e.g. in a server or router).

Parameters
  • multipart_message (Union[Future, List]) – A list (or list in a Future) of zmq multipart frames.

  • unpack_body (bool, optional) – Decode the body. Defaults to True.

Raises

MessageFormatException – If the number of frames in the multipart message doesn’t match the schema.

Returns

New instance of the message.

Return type

DataMessage