Server Queue
Implements a message queue within the Django Message Broker server.
Includes:
Endpoint - A class containing zero message queue routing back to the subscriber’s client.
RoundRobinDict - Implements a dictionary of messages over which the channel queue iterates, returning to the first item in the list when the end is reached.
ChannelQueue - Implements a channel queue within the server.
- class django_message_broker.server.server_queue.Endpoint(socket: typing.Union[zmq.sugar.socket.Socket, zmq.eventloop.zmqstream.ZMQStream], dealer: typing.List = <factory>, subscriber_name: bytes = b'', is_process_channel: bool = False, time_to_live: typing.Optional[int] = 86400, expiry: typing.Optional[datetime.datetime] = None)
Provides routing information for subscribers
- socket
zmq.ROUTER socket or stream to which messages are sent.
- Type
zmq.Socket, ZMQStream
- dealer
Identity of zmq.DEALER to which messages are sent.
- Type
bytes
- is_process_channel
The channel is a process channel (can have only one delivery endpoint).
- Type
bool
- time_to_live
Number of seconds before the endpoint is discarded.
- Type
int
- expiry
Time when the endpoint will expire
- Type
Optional[datetime]
- property id: FrozenSet
Returns a unique endpoint id from the routing information and channel name.
- Returns
Unique endpoint id.
- Return type
FrozenSet
- class django_message_broker.server.server_queue.RoundRobinDict(*args, **kwargs)
Dictionary of endpoints that returns the next endpoint when a next value is requested and continues from the start of the list when the end of the list is reached.
Notes:
Returns the value of the dictionary entry, not the key
StopIteration exception is not raised when the dictionary rolls over to the start.
- __next__() Optional[django_message_broker.server.server_queue.Endpoint]
Gets the next value of the next entry in the dictionary, returns to the first entry once the end of the dictionary is reached. Values are returned by insertion order.
- Returns
Value of the next dictionary entry.
- Return type
Optional[Endpoint]
- first_key() Optional[Any]
Returns the first key in the dictionary by insertion order.
- Returns
Value of the first key in the dictionary.
- Return type
Optional[int]
- class django_message_broker.server.server_queue.ChannelQueue(channel_name: bytes = b'', max_length: Optional[int] = None)
Provides a managed queue for storing and forwarding messages to endpoints. Messages are pulled from the queue and delivered to endpoints according to a round robin algorithm. The ordering of the round robin list is updated each time endpoints subscribe to the queue, with most recently updated channels going to the end of the list.
Channel queues may be identified as Process Channels by the inclusion of an ‘!’ within the channel name. These channels only have one endpoint. Attempts to change the endpoint (channel_name or dealer_id) or add additional endpoints result in a Subscription Exception.
Once initialised, the queue starts an event loop which monitors the queue and sends messages in the queue once there are subscribers.
Two additional callbacks are created, one to periodically scan unsent messages in the queue and remove those messages which are unsent after their expiry time. The second periodically scans subscribers and removes those who have passed their expiry time.
- Raises
SubscriptionError – Attempt to add multiple endpoints or change the endpoint of a Process Channel
- stop()
Stop periodic callbacks and remove future tasks from the event queue. This should be called prior to deleting the object.
- _set_subscribers_available() None
Set (or clear) asyncio event if there are subscribers to the queue.
- _set_messages_available() None
Set (or clear) asyncio event if there are messages in the queue.
- is_empty() bool
Returns true if there are no subscribers and no messages.
- Returns
True if the queue is empty and has no subscribers.
- Return type
bool
- async event_loop() None
Queue event loop. Waits for subscribers, and if there are message in the queue pull them from the queue and send them to the subscriber.
- subscribe(endpoint: django_message_broker.server.server_queue.Endpoint) None
Add subscribers to the channel. Subscribers are identified by the dealer that transmits the request. Multiple endpoints may be registered for normal channels (without the ! type indicator), only one endpoint may be registered for a process channel (indicated by the ! type indicator).
- Parameters
endpoint (Endpoint) – Endpoint defining the receiver.
- Raises
SubscriptionError – Attempt to change or add multiple endpoints for a Process Channel
- push(message: django_message_broker.server.data_message.DataMessage, time_to_live: Optional[float] = 60)
Pushes a message onto the message queue.
- Parameters
message (DataMessage) – Message to send to channel
time_to_live (float) – Number of seconds until message expires
- async pull_and_send() None
Pulls a message from the queue and sends to an endpoints. Endpoints are chosen based upon a round-robin algorithm. If the mesage cannot be sent then it is add back to the end of the queue and the queue is penalised 100mSec to reduce rate of outbound messaging.
- discard(endpoint: django_message_broker.server.server_queue.Endpoint)
Discards an endpoint from the queue.
- Parameters
endpoint (Endpoint) – id of the dealer to discard.
- _flush_messages() None
Flush expired messsages from the queue.
- _flush_subscribers() None
Flush expired subscribers from the queue.