Channels Client

Implements a channel client for the Django Message Broker.

class django_message_broker.server.channels_client.ChannelsClient(*args, ip_address: str = '127.0.0.1', port: int = 5556, **kwargs)

Client for Django Channels message server.

Implements a client to the network message server for Django Channels using ZeroMQ (ZMQ) to support communication between channels registered in different processes.

The server opens two sequential ports:

  • Data port (base port, default=5556): Transmission of data messages between the client and server.

  • Signalling port (base port + 1, default=5557): Transmission of signalling messages between the client and server.
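
As a minimal sketch of the port layout above, the two endpoints can be derived from the base port. The helper name `broker_endpoints` and the `tcp://` transport are assumptions made for illustration; they are not part of the documented API.

```python
from typing import Tuple


def broker_endpoints(ip_address: str = "127.0.0.1", port: int = 5556) -> Tuple[str, str]:
    """Return (data_endpoint, signalling_endpoint) for a given base port.

    Hypothetical helper: assumes a TCP transport.
    """
    data_endpoint = f"tcp://{ip_address}:{port}"            # base port
    signalling_endpoint = f"tcp://{ip_address}:{port + 1}"  # base port + 1
    return data_endpoint, signalling_endpoint
```

With the defaults, the signalling endpoint uses port 5557, one above the data port.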

class DataCommands

Create registry of data commands using decorator.

class SignallingCommands

Create registry of signalling commands using decorator.
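
A decorator-based command registry of the kind described for DataCommands and SignallingCommands might look roughly like the sketch below. The class name `CommandRegistry`, the `register` method, and the command name `b"DELIVERY"` are hypothetical and chosen only to illustrate the pattern.

```python
from typing import Callable, Dict


class CommandRegistry:
    """Maps command names (bytes) to handler functions (illustrative only)."""

    def __init__(self) -> None:
        self.callables: Dict[bytes, Callable] = {}

    def register(self, command: bytes) -> Callable[[Callable], Callable]:
        """Return a decorator that records the function under `command`."""
        def decorator(func: Callable) -> Callable:
            self.callables[command] = func
            return func  # the function itself is left unchanged
        return decorator


data_commands = CommandRegistry()


@data_commands.register(b"DELIVERY")  # hypothetical command name
def handle_delivery(message: dict) -> str:
    return f"delivered {message['body']}"
```

A receive callback can then look up the handler by the command carried in an incoming message.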

stop()

Stops the server and flushes all queues.

_flush_queues() → None

Periodic callback to flush queues where there are no subscribers or messages.

async _await_data_response(message_id: bytes) → None

Creates an event which is set when a confirmation message is received on the data channel from the server.

Parameters

message_id (bytes) – Message id of the sent message for which a response is required.

async _await_signalling_response(message_id: bytes) → None

Creates an event which is set when a confirmation message is received on the signalling channel from the server.

Parameters

message_id (bytes) – Message id of the sent message for which a response is required.
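
The wait-for-confirmation pattern used by the two coroutines above can be sketched with one asyncio.Event per message id. The class `ResponseWaiter`, its method names, and the `timeout` parameter are assumptions for illustration; the client's own bookkeeping may differ.

```python
import asyncio
from typing import Dict


class ResponseWaiter:
    """Tracks one asyncio.Event per outstanding message id (illustrative only)."""

    def __init__(self) -> None:
        self._events: Dict[bytes, asyncio.Event] = {}

    async def await_response(self, message_id: bytes, timeout: float = 1.0) -> None:
        """Block until confirm() is called for message_id, or time out."""
        event = self._events.setdefault(message_id, asyncio.Event())
        try:
            await asyncio.wait_for(event.wait(), timeout)
        finally:
            # Always drop the event so the registry does not grow unbounded.
            self._events.pop(message_id, None)

    def confirm(self, message_id: bytes) -> None:
        """Called by the receive callback when the server confirms a message."""
        event = self._events.get(message_id)
        if event is not None:
            event.set()
```

In this sketch an unconfirmed message raises asyncio.TimeoutError after `timeout` seconds rather than waiting indefinitely.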

get_routing_id() → str

Returns the routing id of the zmq.DEALER socket, which is used to route messages from the server to the client. The routing id is a 32-bit unsigned integer which is appended to the string zmq_id_ to provide a unique channels client identifier.

Returns

Routing id

Return type

str
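
The identifier format described above (the prefix zmq_id_ followed by a 32-bit unsigned integer) can be illustrated as follows. How the client actually obtains the integer is not stated here, so drawing it at random is an assumption, as is the helper name `make_routing_id`.

```python
import random


def make_routing_id() -> str:
    """Build an identifier of the form 'zmq_id_<n>' (illustrative only).

    Assumption: the 32-bit unsigned integer is drawn at random.
    """
    number = random.getrandbits(32)  # 0 <= number < 2**32
    return f"zmq_id_{number}"
```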

get_subscriber_id() → bytes

Returns a unique subscriber id across all clients connected to the server. Uniqueness is obtained by combining the routing_id of the ZMQ socket with a uuid4 reference. This minimises the chance that any two subscribers will have the same reference.

Returns

Unique subscriber reference.

Return type

bytes
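
Combining a routing id with a uuid4 value could be done along these lines. The underscore separator, the hex form of the UUID, the UTF-8 encoding, and the helper name `make_subscriber_id` are assumptions for illustration only.

```python
import uuid


def make_subscriber_id(routing_id: str) -> bytes:
    """Combine a routing id with a fresh uuid4 (illustrative only).

    Assumptions: '_' as separator, hex UUID, UTF-8 encoded result.
    """
    return f"{routing_id}_{uuid.uuid4().hex}".encode("utf-8")
```

Two calls with the same routing id yield different values because each call draws a new uuid4.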

async _subscribe(channel_name: bytes, subscriber_name: bytes) → None

Subscribes to a channel to ensure that messages are delivered to the client.

Parameters

channel_name (bytes) – Name of channel subscribed to.

async _unsubscribe(channel_name: bytes, subscriber_name: bytes) → None

Unsubscribes from a channel. This drops all local messages and removes the subscription from the server, which may result in messages being lost.

Parameters

channel_name (bytes) – Name of channel subscribed to.

async _receive(subscriber_name: bytes) → Dict

Receive the first message that arrives on the channel. If more than one coroutine waits on the same channel, a random one of the waiting coroutines will get the result.

Parameters

subscriber_name (bytes) – Channel name

Returns

Received message.

Return type

Dict
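
The "only one waiting coroutine gets the message" behaviour can be demonstrated with a plain asyncio.Queue standing in for the client's local message queue. This is a sketch under that assumption; note that asyncio.Queue wakes waiters in arrival order, so it illustrates the single-winner property rather than the random selection described above.

```python
import asyncio
from typing import Dict, List


async def demo_competing_receivers() -> List[str]:
    """Two coroutines wait on one queue; exactly one receives the message."""
    queue: asyncio.Queue = asyncio.Queue()
    winners: List[str] = []

    async def receiver(name: str) -> None:
        message: Dict = await queue.get()  # blocks until a message arrives
        winners.append(name)

    tasks = [asyncio.ensure_future(receiver(n)) for n in ("first", "second")]
    await asyncio.sleep(0)              # let both receivers start waiting
    await queue.put({"type": "hello"})  # a single message is delivered
    await asyncio.sleep(0.05)           # give the woken receiver time to run
    for task in tasks:
        task.cancel()                   # the losing receiver is still waiting
    await asyncio.gather(*tasks, return_exceptions=True)
    return winners
```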

async _send(channel_name: bytes, message: Dict, time_to_live: float = 60, acknowledge=False) → None

Sends a message to a channel.

Parameters
  • channel_name (bytes) – Channel name

  • message (Dict) – Message to send (as a dictionary)

  • time_to_live (float, optional) – Time to live (seconds). Defaults to 60.

  • acknowledge (bool) – If True, wait until the server has processed the message. Defaults to False.
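
To make the meaning of time_to_live concrete, the sketch below models a message that expires a fixed number of seconds after creation. How the broker actually tracks expiry is not described here; the `PendingMessage` class and its fields are hypothetical.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class PendingMessage:
    """A message with an expiry window (illustrative only)."""

    body: Dict
    time_to_live: float = 60.0  # seconds, mirroring the documented default
    created: float = field(default_factory=time.monotonic)

    def is_expired(self, now: Optional[float] = None) -> bool:
        """Return True once more than time_to_live seconds have elapsed."""
        now = time.monotonic() if now is None else now
        return now - self.created > self.time_to_live
```

A store built on this idea would skip or discard any message for which `is_expired()` returns True.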

async _send_to_group(group_name: bytes, message: Dict, time_to_live: float = 60) → None

Sends a message to a group.

Parameters
  • group_name (bytes) – Group name

  • message (Dict) – Message to send (as a dictionary)

  • time_to_live (float, optional) – Time to live (seconds). Defaults to 60.

async _group_add(group_name: bytes, channel_name: bytes)

Adds the channel to a group. If the group doesn’t exist then it is created. A subscription request is also sent to the server to ensure that messages are delivered locally.

Parameters
  • group_name (bytes) – Name of the group to join.

  • channel_name (bytes) – Channel joining the group.

async _group_discard(group_name: bytes, channel_name: bytes) → None

Removes a channel from a group.

Parameters
  • group_name (bytes) – Group name

  • channel_name (bytes) – Channel name
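
Group membership as described for _group_add and _group_discard can be pictured as a mapping from group names to sets of channel names. The `GroupStore` class below is a hypothetical in-memory sketch, not the broker's group store; removing a group once it becomes empty is an assumption.

```python
from collections import defaultdict
from typing import DefaultDict, Set


class GroupStore:
    """In-memory mapping of group name to member channels (illustrative only)."""

    def __init__(self) -> None:
        self._groups: DefaultDict[bytes, Set[bytes]] = defaultdict(set)

    def group_add(self, group_name: bytes, channel_name: bytes) -> None:
        # defaultdict creates the group the first time it is referenced.
        self._groups[group_name].add(channel_name)

    def group_discard(self, group_name: bytes, channel_name: bytes) -> None:
        channels = self._groups.get(group_name)
        if channels is None:
            return  # unknown group: nothing to remove
        channels.discard(channel_name)
        if not channels:
            del self._groups[group_name]  # assumption: drop empty groups

    def members(self, group_name: bytes) -> Set[bytes]:
        """Return a copy of the group's members without creating the group."""
        return set(self._groups.get(group_name, set()))
```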

_receive_data(multipart_message: Union[_asyncio.Future, List]) → None

Callback that receives data messages from the server and dispatches them to the relevant handler method.

Parameters

multipart_message (Union[Future, List]) – zmq multipart message

Raises
  • ImportError – Error raised if the multipart message cannot be parsed to a valid data message.

  • MessageCommandUnknown – The command in the message is unknown.
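
The receive-and-dispatch step, including both documented error cases, can be sketched as below. The frame layout (command in the first frame), the `HANDLERS` table, the command name `b"DELIVERY"`, and the local `MessageCommandUnknown` stand-in class are all assumptions for illustration; the real wire format is not described here.

```python
from typing import Callable, Dict, List


class MessageCommandUnknown(Exception):
    """Local stand-in for the broker's exception of the same name."""


# Hypothetical handler table keyed by command name.
HANDLERS: Dict[bytes, Callable[[List[bytes]], str]] = {
    b"DELIVERY": lambda frames: "delivery:" + frames[1].decode(),
}


def dispatch(multipart_message: List[bytes]) -> str:
    """Route a multipart message to its handler (illustrative only)."""
    if not multipart_message:
        # Mirrors the documented ImportError for unparseable messages.
        raise ImportError("empty multipart message")
    command = multipart_message[0]  # assumption: first frame is the command
    try:
        handler = HANDLERS[command]
    except KeyError:
        raise MessageCommandUnknown(command) from None
    return handler(multipart_message)
```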

_delivery(message: django_message_broker.server.data_message.DataMessage) → None

Receives a message for delivery to subscribers of a channel and pushes it onto a message queue for later collection by the client method.

Parameters

message (DataMessage) – Data message.

_data_task_complete(message: django_message_broker.server.data_message.DataMessage) → None

Response from server indicating that the data command completed successfully.

Called when a data message confirms that actions have completed. This then sets an event to release the data send thread to execute subsequent instructions.

Parameters

message (DataMessage) – Data message

_data_task_exception(message: django_message_broker.server.data_message.DataMessage) → None

Response from server indicating that the data command generated a caught exception.

Parameters

message (DataMessage) – Data message

_receive_signalling(multipart_message: Union[_asyncio.Future, List]) → None

Callback that receives signalling messages from the server and dispatches them to the relevant handler method.

Parameters

multipart_message (Union[Future, List]) – zmq multipart message

Raises
  • ImportError – Error raised if the multipart message cannot be parsed to a valid signalling message.

  • MessageCommandUnknown – The command in the message is unknown.

_signalling_task_complete(message: django_message_broker.server.signalling_message.SignallingMessage) → None

Response from server indicating that the signalling command completed successfully.

Called when a signalling message confirms that actions have completed. This then sets an event to release the signalling send thread to execute subsequent instructions.

Parameters

message (SignallingMessage) – Signalling message

_signalling_task_exception(message: django_message_broker.server.signalling_message.SignallingMessage) → None

Response from server indicating that the signalling command generated a caught exception.

Parameters

message (SignallingMessage) – Signalling message

_flush_messages(message: django_message_broker.server.signalling_message.SignallingMessage) → None

Response from server indicating that the client should flush messages that have not yet been received.

This method does not implement any actions. The intent is that when a client sends a flush-all message to the server, the server sends a flush-messages command to all clients so that they remove any undelivered messages.

Parameters

message (SignallingMessage) – Signalling message

async _flush_all()

Resets the server by flushing all messages from the message store, and groups from the group store.