Understanding AMPS Queuing
Last updated
Last updated
Copyright 2013-2024 60East Technologies, Inc.
AMPS message queues take advantage of the full historical and transactional power of the AMPS engine. Each queue is implemented as a view over an underlying topic or set of topics. Each of the underlying topics must be recorded in a transaction log. Publishers publish to the underlying topic, and the messages are recorded in the transaction log. Consumers simply subscribe to the queue. AMPS tracks which messages have been delivered to subscribers and which messages have been processed by subscribers. AMPS delivers the next available message to the next subscriber.
Unlike traditional queues, which require consumers to poll for messages, AMPS queues use a subscription model. In this model, each queue consumer requests that AMPS provide messages from the queue. The consumer can also request a maximum number of messages to have outstanding from the queue at any given time, referred to as the backlog for that consumer. When a message is available, and the consumer has fewer messages outstanding than the backlog for that consumer, AMPS delivers the message to the consumer. This improves latency and conserves bandwidth, since there is no need for consumers to repeatedly poll the queue to see if work is available. In addition, the server maintains an overall view of the consumers, which allows the server to control message distribution strategies to optimize for latency, optimize to deliver to clients with the most unused capacity, or optimize for general fairness.
The following diagram presents a simplified view of an AMPS queue:
As the diagram indicates, a queue tracks a set of messages in the transaction log. The messages the queue is currently tracking are considered to be in the queue. When the queue delivers a message, it marks the message as having been delivered (shown as leased in the diagram above). Messages that have been processed are no longer tracked by the queue (for example, the message for the order 1 in the diagram above). When a message has been delivered and processed, that event is recorded in the transaction log to ensure that the queue meets the delivery guarantees even across restarts of AMPS.
Since queues are implemented as views over underlying topics, AMPS allows you to create any number of queues over the same underlying topic. Each queue tracks messages to the topic independently, and can have different policies for delivery and fairness. When a queue topic has a different name than the underlying topic, you can subscribe to the underlying topic directly, and that subscription is to the underlying (non-queue) topic. When a queue topic has the same name as the underlying topic (the default), all subscriptions to that topic are to the queue. (Notice that bookmark subscriptions to a queue are pub/sub subscriptions that replay from the underlying topic in the transaction log, so the behavior in that case is the same as if the subscription was directly to the underlying topic.)
Likewise, AMPS queues work seamlessly with the AMPS entitlement system. Permissions to queues are managed the same way permissions are managed to any other topic, as described in the Entitlement section, except that read permission to a queue also grants the ability to acknowledge messages (though not to publish messages to the queue).
While a message is in a queue, AMPS does not retain an extra copy of the message. Instead, AMPS retains in memory a small data structure indicating the state of the message and the position of the message in the transaction log. The amount of memory consumed by a queue is approximately 200 bytes per message, regardless of the size of the messages.
AMPS queues provide a variety of options to help you tailor the behavior of each queue to meet your application's needs.
AMPS queues deliver a message to a single subscriber at a time. In the most common case, a message is delivered to exactly one subscriber, and that subscriber processes the message.
In the case that a subscriber does not successfully process a message, AMPS provides two delivery semantics to precisely specify the handling of the unprocessed message:
With at-least-once
delivery, AMPS delivers the message to one subscriber at a time, and expects that subscriber to explicitly remove the message from the queue when the message has been received and processed. With this guarantee, each message from the queue must be processed within a specified timeout, or lease period. AMPS tracks the amount of time since the message was sent to the subscriber. If the subscriber has not responded by removing the message within the lease period, AMPS revokes the lease and the message is available to another subscriber. AMPS allows you to set limits on the number of times a message is made available to another subscriber, using the MaxCancels
and MaxDeliveries
configuration options.
In this model, receiving a message is the equivalent of a non-destructive get from a traditional queue. To acknowledge and remove the message, a subscriber uses the sow_delete
command with the bookmark of the message.
Leases are broken and messages are returned to the queue if the lease holder disconnects from AMPS. This ensures that, if a message processor fails or loses its connection to AMPS, the message can immediately be processed by another message processor.
With at-most-once
delivery, AMPS removes the message from the queue as soon as the message is sent to a subscriber. However, the subscriber still needs to acknowledge that the message was processed, so that AMPS can track the subscription backlog, as described below.
In this model, receiving a message is the equivalent of a destructive get from a traditional queue. The message is immediately removed by AMPS, and is no longer available in the queue.
Regardless of the delivery semantics you choose, during typical message processing from a queue, AMPS delivers each message in the queue to a single subscriber, with no duplicates or redelivery.
The difference between at-most-once
and at-least-once
semantics is important in cases where a failure happens. With at-most-once
delivery, the message will not be redelivered even if the subscriber fails to process the message. With at-least-once
delivery, the message will be redelivered to subscribers until the message is either explicitly acknowledged, explicitly removed, or expired based on the queue policy for expiration or retries.
With either setting, a message is delivered exactly once, to a single subscriber, during normal processing.
Consider the following recommendations when deciding how you would like AMPS to handle cases where the subscriber that receives the message fails to process the message:
For ephemeral or lower-value data, where message loss in the case of failure is preferable to duplicating message delivery, consider at-most-once
semantics. For example, processing a stream of events from sensors might fall into this category: each message should be processed once, in order, during normal operation, but missing a single data point from time to time may be less disruptive than reconciling duplicates.
For higher value data, where duplicate message delivery in the event of failure is preferable to message loss, consider at-least-once
semantics. For example, if inserting the same data into a database twice would simply be an in-place update of the same record with the same information, having a duplicate insert happen occasionally in the event of failure may be preferable to losing a message when a failure occurs.
For higher value data, where manual intervention is required to reconcile messages when there is a question as to whether the message has been correctly processed, consider using at-least-once
message delivery with a MaxDeliveries
setting and an action to move failed messages to a dead-letter queue for manual reconciliation.
For higher value data that is part of a transactional dataflow, where processors maintain state, it can be useful to include information about the message processed in the transaction. This can be used with at-least-once
messaging to guarantee that the message is only processed once: if a processor receives a message that is already marked as processed in the transactional store, the processor knows that the message has already been acted on and should be acknowledged without further processing. This is the most common pattern used to produce the result that each message is processed "exactly once". This pattern involves additional overhead and processing, trading off increased work in the application for stronger guarantees that a message is only processed one time.
For efficiency, queues in AMPS use a push model of delivery, providing messages to consumers when the message becomes available rather than requiring the consumer to poll the queue. To manage the workload among consumers, AMPS queues keep track of a subscription backlog. This backlog is the number of messages that have been provided to an individual subscription that have not yet been acknowledged. This backlog helps AMPS provide strong delivery guarantees while still optimizing for high throughput processing. AMPS calculates the subscription backlog for each subscription by calculating the minimum of the following:
The minimum MaxPerSubscriptionBacklog
setting for the queues matched by the subscription
or
The max_backlog
specified on the subscribe command
Notice that, if a subscriber does not provide a max_backlog
on a subscription, AMPS defaults to a max_backlog
of 1
. In practical terms, this means that an application must explicitly specify a backlog to be able to receive more than one message from a queue at a time, regardless of the queue configuration.
Subscribers request a max_backlog
by adding the request to the options string of the subscribe
command. For example, to request a max_backlog
of 10, a subscriber includes max_backlog=10
in the options for the command.
To improve concurrency for subscribers, 60East recommends using a backlog of at least 2
. This allows efficient pipelined delivery, as the consumer can be processing one message while the previous message is being acknowledged. With a max_backlog
higher than 1
, the consumer never needs to be stopped waiting for the next message from the queue.
When a queue provides at-least-once
delivery, AMPS provides three different algorithms for distributing messages among subscribers. Each algorithm has different performance and fairness guarantees. For at-most-once
delivery, AMPS supports only the round-robin
method of distributing messages.
AMPS defaults to proportional
delivery for at-least-once
queues and defaults to round-robin
(the only valid delivery model) for at-most-once
queues.
Each instance of AMPS manages delivery strategy from among the messages that it currently owns and the subscriptions that are present on that instance.
Delivery strategies apply only to a single instance, and are not applied across instances.
Subscribers must acknowledge each message to indicate to AMPS that a message has been processed. The point at which a subscriber acknowledges a message depends on the exact processing that the subscriber performs and the processing guarantees for the application. In general, applications acknowledge messages at the point at which the processing has a result that is durable and which would require an explicit action (such as another message) to change.
Some common points at which to acknowledge a message are:
When processing is fully completed.
When work is performed that would require a compensating action (that is, when information is committed to a database or forwarded to a downstream system).
When work is submitted to a processor that is guaranteed to either succeed or explicitly indicate failure.
To acknowledge a message, the subscriber typically uses the acknowledge convenience methods in the AMPS client. These commands issue a sow_delete
command with the bookmark from the message to acknowledge. AMPS allows subscribers to acknowledge multiple messages simultaneously by providing a comma-delimited list of bookmarks in the sow_delete
command: the AMPS clients provide facilities to batch acknowledgments for efficiency.
AMPS allows an application to acknowledge messages by providing a filter on a sow_delete
command. In this case, the sow_delete
acknowledges all messages that match the filter, regardless of whether the application that sends the command has a current lease on a given message or not. (The Leasing
parameter on the queue specifies whether AMPS allows a client to successfully acknowledge messages that it does not currently have leased.)
For queues that use the at-least-once
delivery model, there are two additional options available for acknowledging messages.
To return a message to the queue without processing it, the subscriber provides the cancel
option on the acknowledgment. In this case, AMPS returns the message to the queue just as though the lease had expired. If the message is eligible for redelivery (that is, it has not exceeded the maximum time or maximum cancels configured for the queue), it is redelivered. Otherwise, the message is expired from the queue. The MaxCancels
option allows you to configure how many times a message can be returned to the queue for redelivery before AMPS expires the message.
To immediately remove a message from the queue, the subscriber provides the expire
option. In this case, AMPS does not return the message to the queue for redelivery, but instead immediately expires the message from the queue and triggers any configured amps-action-on-sow-expire-message
actions that monitor the queue. The expire
option can be especially helpful if your application can determine that the message cannot be successfully processed (for example, the message is unparseable or contains invalid data).
Options for acknowledging messages delivered from at-least-once
queues:
For at-most-once
queues, these options to acknowledgments have no effect, since the message is guaranteed not to be redelivered even if processing fails.
A queue can, optionally, save the state of the queue as metadata in the journal directory. This can reduce the active memory footprint required for large queues, since the state of the queue can be paged out if necessary. This can also potentially improve recovery time if there are a large number of unacknowledged messages in the queue when AMPS starts. To persist metadata for a queue, set the FileBackedMetadata
option to enabled
for the queue.
When FileBackedMetadata
is enabled, queue state that is not currently needed can be paged out to the file. This can substantially reduce the active memory needed for large queues that are infrequently used.
Notice that the state will only be paged out as necessary. While this option increases AMPS ability to manage queue state in low-memory situations, AMPS will still retain state in memory if memory is available to improve performance.
AMPS queues provide the following optional message delivery behaviors:
A Priority
can be specified for a queue, in which case AMPS delivers messages in priority order rather than the order in which the instance received the messages.
A BarrierExpression
can be specified to provide a synchronization point. When a message matches the expression, all previous messages must be acknowledged before AMPS will deliver the message that matches the expression, or subsequent messages.
These options are described in more detail in the Advanced Queue Configuration section.
The message flow for AMPS queues is as follows. The message flow differs depending on whether the queue is configured for at-most-once
delivery or at-least-once
delivery.
When the queue is configured for at-most-once
delivery:
A publisher publishes a message to an underlying topic.
The message becomes available in the queue.
For a message that is not a barrier message, the message is published to a subscriber when:
There is a subscription that matches the message, and the subscriber is entitled to see the message
The message is the oldest message in the queue that matches the subscription (if no Priority
is specified for the queue) or it has the highest priority value for messages in the queue and is the oldest message at that priority (if Priority
is specified).
The subscription has remaining capacity in its backlog
The subscription is the next subscription to receive the message as determined by the delivery fairness for the queue
AMPS removes the message from the queue when the message is published.
For a message that is a barrier message, AMPS does not deliver the message until all previous messages have been acknowledged. AMPS does not deliver messages after the barrier message until the barrier message has been delivered.
If no subscription has requested the message, and the message has been in the queue longer than the Expiration
time, AMPS removes the message from the queue. With AMPS queues, message expiration is considered to be a normal way for the message to leave the queue, and is not considered an error.
The subscriber processes the message, and acknowledges the message when processing is finished to indicate to AMPS that the subscriber has capacity for another message.
When the queue is configured for at-least-once
delivery:
A publisher publishes a message to an underlying topic.
The message becomes available in the queue.
For a message that is not a barrier message, the message is published to a subscriber when:
There is a subscription that matches the message
The message is the oldest message in the queue that matches the subscription (if no Priority
is specified for the queue) or it has the highest priority value for messages in the queue and is the oldest message at that priority (if Priority
is specified).
The subscription has remaining capacity in its backlog
The subscription is the next subscription to receive a message as determined by the delivery fairness for the queue
AMPS calculates the lease time for the message and provides that time to the subscriber along with the message.
For a message that is a barrier message (that is, the queue has a BarrierExpression
specified and the message matches that expression), AMPS does not deliver the message until all previous messages have been acknowledged. AMPS will not deliver messages after the barrier message until the barrier message is delivered.
If the message has been in the queue longer than the Expiration
time, and there is no current lease on the message, AMPS removes the message from the queue.
If a subscriber has received the message, but has not removed the message from the queue at the time the lease expires, AMPS returns the message to the queue if the message has been in the queue less than the Expiration
time, and it has been delivered fewer than the MaxDeliveries
limit. If the message has been in the queue longer than the Expiration
time or has been delivered more than the number of times specified by MaxDeliveries
, AMPS removes the message from the queue when the lease expires.
The subscriber processes the message, and removes the message from the queue by acknowledging the message (which is translated by the AMPS client into the appropriate sow_delete
command).
When the metadata for a queue is not persisted to a file, or if that file is not fully up to date with the transaction log, AMPS recovers the current state of a queue from the transaction log when the instance starts.
The point at which AMPS chooses to start recovery for a given queue is as follows:
If this is a new queue, or the configured RecoveryPoint
for the queue has changed since the last time AMPS started, begin queue recovery at the configured recovery point. (This defaults to the beginning of the transaction log if no explicit RecoveryPoint
is configured for the queue.)
Otherwise, start from the most recent point in the transaction log at which all previous messages have been acknowledged. This recovery point is stored in the queues.ack
file in the journal directory for all queues in the instance. If that recovery point is older than the journals currently available, begin from the beginning of the transaction log.
To recover the state of the queue, AMPS proceeds through the journal file from the recovery point and restores the state of the queue based on the operations in the transaction log for that queue.
AMPS determines the oldest recovery point for all of the queues in the instance, and reads journals from that point forward to the end of the transaction log. AMPS rebuilds the state of each queue as the replay progresses past the recovery point for that queue.
Algorithm | Description |
---|---|
fast
This strategy optimizes for the lowest latency.
AMPS delivers the message to the first subscription found that does not have a full backlog. With this algorithm, AMPS tries to minimize the time spent determining which subscription receives the message without attempting to distribute messages fairly across subscriptions.
round-robin
This strategy optimizes for general fairness across subscriptions.
AMPS delivers the message to the next available subscription that does not have a full backlog. With this algorithm, AMPS delivers messages evenly among the subscribers that have space in their backlog.
proportional
This strategy optimizes for delivery to subscriptions with the most unused capacity.
AMPS delivers the message to the subscription that has the highest proportion of backlog capacity unused. AMPS determines this by taking the ratio of unacknowledged messages to the maximum backlog.
For example, if there are three active subscribers for the queue, with backlog settings and outstanding messages as follows:
Subscriber Inky
: max_backlog=2
, currently leased 1 message
Subscriber Blinky
: max_backlog=4
, currently leased 3 messages
Subscriber Clyde
: max_backlog=10
, currently leased 4 messages
In this case, with proportional
delivery, a new message for the queue will be delivered to Clyde, since that subscriber has only filled 40% of the backlog, as compared with 50% for Inky and 75% for Blinky.
If more than one subscription has the same unused capacity, AMPS delivers the message to the first subscription found with that capacity.
Option
Result
<none>
Message is considered to be successfully processed and removed from the queue.
cancel
Message is returned to the queue.
The count of cancels for this message is incremented, and if the count is greater than the configured MaxCancels
for the queue, the message is expired.
expire
Message is immediately expired from the queue.