Channels Client
Implements a channel client for the Django Message Broker.
- class django_message_broker.server.channels_client.ChannelsClient(*args, ip_address: str = '127.0.0.1', port: int = 5556, **kwargs)
Client for Django Channels message server.
Implements a client to the network message server for Django channels using zero message queue (ZMQ) to support communication between channels registered within different processes.
The server opens two sequential ports:
Data port (base port, default=5556): Transmission of data messages between the client and server.
Signalling port (base port + 1, default=5557): Transmission of signalling messages between the client and server.
- class DataCommands
Create registry of data commands using decorator.
- class SignallingCommands
Create registry of signalling commands using decorator.
- stop()
Stops the server and flushes all queues.
- _flush_queues() None
Periodic callback to flush queues where there are no subscribers or messages.
- async _await_data_response(message_id: bytes) None
Creates an event which is set when a confirmation message is received on the data channel from the server.
- Parameters
message_id (bytes) – Message id of the sent message for which a response is required.
- async _await_signalling_response(message_id: bytes) None
Creates an event which is set when a confirmation message is received on the signalling channel from the server.
- Parameters
message_id (bytes) – Message id of the sent message for which a response is required.
- get_routing_id() str
Returns the routing id from the zmq.DEALER socket used to route message from the server to the client. The routing id is a 32-bit unsigned integer which is appended to the string zmq_id_ to provide a unique channels client identifier.
- Returns
Routing id
- Return type
str
- get_subscriber_id() bytes
Returns a unique subscriber id across all clients connected to the server. Uniqueness is obtained by combining the routing_id of the ZMQ socket with a uuid4 reference. This minimises the chance that any two subscribers will have the same reference.
- Returns
Unique subscriber reference.
- Return type
bytes
- async _subscribe(channel_name: bytes, subscriber_name: bytes) None
Subscribes to a channel to ensure that messages are delivered to the client.
- Parameters
channel_name (bytes) – Name of channel subscribed to.
- async _unsubscribe(channel_name: bytes, subscriber_name: bytes) None
Unsubscribes from a channel this drops all local messages and removes the subscription from the server. This may result in messages being lost.
- Parameters
channel_name (bytes) – Name of channel subscribed to.
- async _receive(subscriber_name: bytes) Dict
Receive the first message that arrives on the channel. If more than one coroutine waits on the same channel, a random one of the waiting coroutines will get the result.
- Parameters
channel (bytes) – Channel name
- Returns
Received message.
- Return type
Dict
- async _send(channel_name: bytes, message: Dict, time_to_live: float = 60, acknowledge=False) None
Sends a message to a channel.
- Parameters
channel (Union[str, bytes]) – Channel name
message (Dict) – Message to send (as a dictionary)
time_to_live (float, optional) – Time to live (seconds). Defaults to 60.
acknowledge (bool) – Await server has processed message. Defaults to False.
- async _send_to_group(group_name: bytes, message: Dict, time_to_live: float = 60) None
Sends a message to a group.
- Parameters
group (Union[str, bytes]) – Group name
message (Dict) – Message to send (as dictionary)
time_to_live (int, optional) – Time to live (seconds). Defaults to 60.
- async _group_add(group_name: bytes, channel_name: bytes)
Adds the channel to a group. If the group doesn’t exist then it is created. A subscription request is also sent to the server to ensure that messages are delivered locally.
- Parameters
group (bytes) – Name of the group to join.
channel (bytes) – Channel joining the group.
- async _group_discard(group_name: bytes, channel_name: bytes) None
Removes a channel from a group
- Parameters
group (bytes) – Group name
channel (bytes) – Channel name
- _receive_data(multipart_message: Union[_asyncio.Future, List]) None
Callback that receives data messages from the server and dispatches them to the relevant handler method.
- Parameters
multipart_message (Union[Future, List]) – zmq multipart message
- Raises
ImportError – Error raised if the multitpart message cannot be parsed to a valid data message.
MessageCommandUnknown – The command in the message is unknown.
- _delivery(message: django_message_broker.server.data_message.DataMessage) None
Receive a message for delivery to subscribers of a channel and pushes it onto a message queue for later collection by the client method.
- Parameters
message (DataMessage) – Data message.
- _data_task_complete(message: django_message_broker.server.data_message.DataMessage) None
Response from server indicating that the data command completed successfully.
Called when a data message confirms that actions have completed. This then sets an event to release the data send thread to execute subsequent instructions.
- Parameters
message (SignallingMessage) – Signalling message
- _data_task_exception(message: django_message_broker.server.data_message.DataMessage) None
Response from server indicating that the data command generated a caught exception.
- Parameters
message (SignallingMessage) – Signalling message
- _receive_signalling(multipart_message: Union[_asyncio.Future, List]) None
Callback that receives signalling messages from the server and dispatches them to the relevant handler method.
- Parameters
multipart_message (Union[Future, List]) – zmq multipart message
- Raises
ImportError – Error raised if the multitpart message cannot be parsed to a valid data message.
MessageCommandUnknown – The command in the message is unknown.
- _signalling_task_complete(message: django_message_broker.server.signalling_message.SignallingMessage) None
Response from server indicating that the signalling command completed successfully.
Called when a signalling message confirms that actions have completed. This then sets an event to release the signalling send thread to execute subsequent instructions.
- Parameters
message (SignallingMessage) – Signalling message
- _signalling_task_exception(message: django_message_broker.server.signalling_message.SignallingMessage) None
Response from server indicating that the signalling command generated a caught exception.
- Parameters
message (SignallingMessage) – Signalling message
- _flush_messages(message: django_message_broker.server.signalling_message.SignallingMessage) None
Response from server indicating that the client should flush message which have not yet been received.
This method does not implement any actions. It is intended that when a client sends a flush all message to the server, the server will send a flush messages command to all clients to remove any undelivered messages from the clients.
- Parameters
message (SignallingMessage) – Signalling message
- async _flush_all()
Resets the server by flushing all messages from the message store, and groups from the group store.