Server Queue

Implements a message queue within the Django Message Broker server.

Includes:

  • Endpoint - A class containing the ZeroMQ routing information needed to send messages back to the subscriber’s client.

  • RoundRobinDict - Implements a dictionary of messages over which the channel queue iterates, returning to the first item in the list when the end is reached.

  • ChannelQueue - Implements a channel queue within the server.

class django_message_broker.server.server_queue.Endpoint(socket: typing.Union[zmq.sugar.socket.Socket, zmq.eventloop.zmqstream.ZMQStream], dealer: typing.List = <factory>, subscriber_name: bytes = b'', is_process_channel: bool = False, time_to_live: typing.Optional[int] = 86400, expiry: typing.Optional[datetime.datetime] = None)

Provides routing information for subscribers.

socket

zmq.ROUTER socket or stream to which messages are sent.

Type

zmq.Socket, ZMQStream

dealer

Identity of zmq.DEALER to which messages are sent.

Type

bytes

is_process_channel

The channel is a process channel (can have only one delivery endpoint).

Type

bool

time_to_live

Number of seconds before the endpoint is discarded.

Type

int

expiry

Time when the endpoint will expire.

Type

Optional[datetime]

property id: FrozenSet

Returns a unique endpoint id from the routing information and channel name.

Returns

Unique endpoint id.

Return type

FrozenSet
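The following is a minimal sketch of how a dataclass with these fields could derive an expiry time and a hashable id. It is not the library's implementation: SketchEndpoint is a hypothetical stand-in, the socket field is omitted, and both the expiry calculation and the composition of the id (dealer frames plus subscriber name) are assumptions.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import FrozenSet, List, Optional


@dataclass
class SketchEndpoint:
    """Hypothetical stand-in for Endpoint; the socket field is omitted."""

    dealer: List[bytes] = field(default_factory=list)
    subscriber_name: bytes = b""
    is_process_channel: bool = False
    time_to_live: Optional[int] = 86400
    expiry: Optional[datetime] = None

    def __post_init__(self) -> None:
        # Assumption: expiry is derived from time_to_live when not supplied.
        if self.expiry is None and self.time_to_live is not None:
            self.expiry = datetime.now() + timedelta(seconds=self.time_to_live)

    @property
    def id(self) -> FrozenSet:
        # Assumption: the id combines the dealer frames and the subscriber name.
        return frozenset([*self.dealer, self.subscriber_name])


a = SketchEndpoint(dealer=[b"dealer-1"], subscriber_name=b"chat")
b = SketchEndpoint(dealer=[b"dealer-1"], subscriber_name=b"chat")
c = SketchEndpoint(dealer=[b"dealer-2"], subscriber_name=b"chat")
same_route = a.id == b.id       # identical routing gives identical ids
different_route = a.id != c.id  # a different dealer gives a different id
```

Because a frozenset is hashable, an id of this kind can be used directly as a dictionary key when storing endpoints.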

class django_message_broker.server.server_queue.RoundRobinDict(*args, **kwargs)

Dictionary of endpoints that returns the next endpoint when a next value is requested and continues from the start of the list when the end of the list is reached.

Notes:

  1. Returns the value of the dictionary entry, not the key.

  2. StopIteration exception is not raised when the dictionary rolls over to the start.

__next__() Optional[django_message_broker.server.server_queue.Endpoint]

Gets the value of the next entry in the dictionary, returning to the first entry once the end of the dictionary is reached. Values are returned in insertion order.

Returns

Value of the next dictionary entry.

Return type

Optional[Endpoint]

first_key() Optional[Any]

Returns the first key in the dictionary by insertion order.

Returns

The first key in the dictionary, by insertion order.

Return type

Optional[Any]
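The behaviour described above can be sketched with a small dict subclass. SketchRoundRobinDict is a hypothetical illustration, not the library's code; returning None for an empty dictionary is an assumption based on the Optional return types.

```python
from typing import Any, Optional


class SketchRoundRobinDict(dict):
    """Hypothetical illustration of a dictionary with wrap-around iteration."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._index = 0  # position of the next value to return

    def __next__(self) -> Optional[Any]:
        if not self:
            return None  # assumption: an empty dictionary yields None
        values = list(self.values())
        if self._index >= len(values):
            self._index = 0  # wrap around instead of raising StopIteration
        value = values[self._index]
        self._index += 1
        return value

    def first_key(self) -> Optional[Any]:
        # Dictionaries preserve insertion order, so the first key is the oldest.
        return next(iter(self), None)


rr = SketchRoundRobinDict(a="A", b="B", c="C")
sequence = [next(rr) for _ in range(5)]  # ['A', 'B', 'C', 'A', 'B']
```

Note that the values come back, not the keys, and that the fourth call returns to the first entry without raising StopIteration.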

class django_message_broker.server.server_queue.ChannelQueue(channel_name: bytes = b'', max_length: Optional[int] = None)

Provides a managed queue for storing and forwarding messages to endpoints. Messages are pulled from the queue and delivered to endpoints according to a round robin algorithm. The ordering of the round robin list is updated each time an endpoint subscribes to the queue, with the most recently updated endpoints moving to the end of the list.

Channel queues are identified as Process Channels by the inclusion of an ‘!’ within the channel name. These channels have only one endpoint. Attempts to change the endpoint (channel_name or dealer_id) or to add further endpoints raise a SubscriptionError.

Once initialised, the queue starts an event loop which monitors the queue and sends messages in the queue once there are subscribers.

Two additional periodic callbacks are created. The first scans the unsent messages in the queue and removes those that have passed their expiry time. The second scans the subscribers and removes those that have passed their expiry time.

Raises

SubscriptionError – Attempt to add multiple endpoints or change the endpoint of a Process Channel

stop()

Stop periodic callbacks and remove future tasks from the event queue. This should be called prior to deleting the object.

_set_subscribers_available() None

Set (or clear) asyncio event if there are subscribers to the queue.

_set_messages_available() None

Set (or clear) asyncio event if there are messages in the queue.

is_empty() bool

Returns true if there are no subscribers and no messages.

Returns

True if the queue is empty and has no subscribers.

Return type

bool

async event_loop() None

Queue event loop. Waits for subscribers and, if there are messages in the queue, pulls them from the queue and sends them to the subscribers.
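As a rough sketch of this gating pattern, the loop below waits on two asyncio.Event objects before sending. The function name, its parameters and the stop_after counter are hypothetical and exist only to keep the example finite; the real event loop is not shown in this document.

```python
import asyncio
from collections import deque


async def sketch_event_loop(queue, subscribers_available, messages_available,
                            sent, stop_after):
    """Send queued messages only while both events are set."""
    while len(sent) < stop_after:
        await subscribers_available.wait()
        await messages_available.wait()
        sent.append(queue.popleft())  # stands in for sending to a subscriber
        if not queue:
            messages_available.clear()  # nothing left to send


async def demo():
    queue = deque()
    subscribers_available = asyncio.Event()
    messages_available = asyncio.Event()
    sent = []
    task = asyncio.create_task(
        sketch_event_loop(queue, subscribers_available, messages_available, sent, 2)
    )
    queue.extend(["m1", "m2"])
    messages_available.set()
    await asyncio.sleep(0.01)
    before = list(sent)  # still empty: there are no subscribers yet
    subscribers_available.set()
    await task
    return before, sent


before, after = asyncio.run(demo())
```

Messages pushed before any subscriber exists stay in the queue and are released as soon as the subscriber event is set.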

subscribe(endpoint: django_message_broker.server.server_queue.Endpoint) None

Add subscribers to the channel. Subscribers are identified by the dealer that transmits the request. Multiple endpoints may be registered for normal channels (without the ! type indicator), but only one endpoint may be registered for a process channel (indicated by the ! type indicator).

Parameters

endpoint (Endpoint) – Endpoint defining the receiver.

Raises

SubscriptionError – Attempt to change or add multiple endpoints for a Process Channel
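The subscription rule can be illustrated with a plain dictionary keyed by dealer identity. This is a sketch under assumptions: SketchSubscriptionError and sketch_subscribe are hypothetical names, and the real method takes an Endpoint object rather than a channel name and a dealer id.

```python
class SketchSubscriptionError(Exception):
    """Hypothetical stand-in for the broker's SubscriptionError."""


def sketch_subscribe(subscribers: dict, channel_name: bytes, dealer_id: bytes) -> None:
    """Register a dealer, allowing only one on a process channel."""
    is_process_channel = b"!" in channel_name
    if is_process_channel and subscribers and dealer_id not in subscribers:
        raise SketchSubscriptionError("process channel already has an endpoint")
    # Re-subscribing moves the dealer to the end of the round robin order.
    subscribers.pop(dealer_id, None)
    subscribers[dealer_id] = True


normal = {}
for dealer in (b"d1", b"d2", b"d1"):
    sketch_subscribe(normal, b"chat", dealer)
order = list(normal)  # b"d1" re-subscribed last, so it is now at the end

process = {}
sketch_subscribe(process, b"worker!abc", b"d1")
try:
    sketch_subscribe(process, b"worker!abc", b"d2")
    rejected = False
except SketchSubscriptionError:
    rejected = True
```

Removing and re-inserting the key relies on dictionaries preserving insertion order, which is one simple way to keep the most recently updated subscriber at the end.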

push(message: django_message_broker.server.data_message.DataMessage, time_to_live: Optional[float] = 60)

Pushes a message onto the message queue.

Parameters
  • message (DataMessage) – Message to send to channel

  • time_to_live (float) – Number of seconds until message expires

async pull_and_send() None

Pulls a message from the queue and sends it to an endpoint. Endpoints are chosen using a round-robin algorithm. If the message cannot be sent, it is added back to the end of the queue and the queue is penalised by 100 ms to reduce the rate of outbound messaging.
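A minimal sketch of the requeue-and-penalise behaviour is shown below. The function and its send callback are hypothetical, itertools.cycle stands in for the round-robin dictionary, and treating ConnectionError as the failure signal is an assumption; the exception the broker actually handles is not stated here.

```python
import asyncio
from collections import deque
from itertools import cycle


async def sketch_pull_and_send(queue, endpoints, send, penalty=0.1):
    """Send the oldest message to the next endpoint; requeue it on failure."""
    message = queue.popleft()
    endpoint = next(endpoints)
    try:
        send(endpoint, message)
        return True
    except ConnectionError:  # assumption: how a failed send is reported
        queue.append(message)         # back to the end of the queue
        await asyncio.sleep(penalty)  # slow down outbound messaging
        return False


delivered = []


def flaky_send(endpoint, message):
    # Endpoint "b" always fails in this demonstration.
    if endpoint == "b":
        raise ConnectionError("endpoint unavailable")
    delivered.append((endpoint, message))


async def demo():
    queue = deque(["m1", "m2", "m3"])
    endpoints = cycle(["a", "b"])
    while queue:
        await sketch_pull_and_send(queue, endpoints, flaky_send, penalty=0.01)


asyncio.run(demo())
```

In this run m2 fails twice on endpoint "b" and is delivered last, which shows that requeueing at the end of the queue can reorder messages.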

discard(endpoint: django_message_broker.server.server_queue.Endpoint)

Discards an endpoint from the queue.

Parameters

endpoint (Endpoint) – Endpoint to discard from the queue.

_flush_messages() None

Flush expired messages from the queue.

_flush_subscribers() None

Flush expired subscribers from the queue.
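Both flush methods follow the same idea: each item carries an expiry time and a periodic scan drops whatever has passed it. The sketch below assumes items are stored as (expiry, payload) pairs in a deque; the function names and the storage layout are hypothetical, not taken from the library.

```python
from collections import deque
from datetime import datetime, timedelta


def sketch_push(queue, payload, time_to_live=60, now=None):
    """Store the payload together with the time at which it expires."""
    now = now or datetime.now()
    queue.append((now + timedelta(seconds=time_to_live), payload))


def sketch_flush_expired(queue, now=None):
    """Remove expired items in place and return how many were removed."""
    now = now or datetime.now()
    kept = [(expiry, payload) for expiry, payload in queue if expiry > now]
    removed = len(queue) - len(kept)
    queue.clear()
    queue.extend(kept)
    return removed


queue = deque()
start = datetime(2024, 1, 1, 12, 0, 0)
sketch_push(queue, "short-lived", time_to_live=5, now=start)
sketch_push(queue, "long-lived", time_to_live=60, now=start)

# Ten seconds later only the message with the 60 second lifetime remains.
removed = sketch_flush_expired(queue, now=start + timedelta(seconds=10))
remaining = [payload for _, payload in queue]
```

Passing the current time as a parameter keeps the sketch deterministic; a periodic callback would simply call the flush function with the default.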