Understanding AMPS Queuing

AMPS message queues take advantage of the full historical and transactional power of the AMPS engine. Each queue is implemented as a view over an underlying topic or set of topics. Each of the underlying topics must be recorded in a transaction log. Publishers publish to the underlying topic, and the messages are recorded in the transaction log. Consumers simply subscribe to the queue. AMPS tracks which messages have been delivered to subscribers and which messages have been processed by subscribers. AMPS delivers the next available message to the next subscriber.

Unlike traditional queues, which require consumers to poll for messages, AMPS queues use a subscription model. In this model, each queue consumer requests that AMPS provide messages from the queue. The consumer can also request a maximum number of messages to have outstanding from the queue at any given time, referred to as the backlog for that consumer. When a message is available, and the consumer has fewer messages outstanding than the backlog for that consumer, AMPS delivers the message to the consumer. This improves latency and conserves bandwidth, since there is no need for consumers to repeatedly poll the queue to see if work is available. In addition, the server maintains an overall view of the consumers, which allows the server to control message distribution strategies to optimize for latency, optimize to deliver to clients with the most unused capacity, or optimize for general fairness.

The following diagram presents a simplified view of an AMPS queue:

As the diagram indicates, a queue tracks a set of messages in the transaction log. The messages the queue is currently tracking are considered to be in the queue. When the queue delivers a message, it marks the message as having been delivered (shown as leased in the diagram above). Messages that have been processed are no longer tracked by the queue (for example, the message for order 1 in the diagram above). When a message has been delivered and processed, that event is recorded in the transaction log to ensure that the queue meets the delivery guarantees even across restarts of AMPS.

Since queues are implemented as views over underlying topics, AMPS allows you to create any number of queues over the same underlying topic. Each queue tracks messages to the topic independently, and can have different policies for delivery and fairness. When a queue topic has a different name than the underlying topic, you can subscribe to the underlying topic directly, and that subscription is to the underlying (non-queue) topic. When a queue topic has the same name as the underlying topic (the default), all subscriptions to that topic are to the queue. (Notice that bookmark subscriptions to a queue are pub/sub subscriptions that replay from the underlying topic in the transaction log, so the behavior in that case is the same as if the subscription were directly to the underlying topic.)

Likewise, AMPS queues work seamlessly with the AMPS entitlement system. Permissions to queues are managed the same way permissions are managed to any other topic, as described in the Entitlement section, except that read permission to a queue also grants the ability to acknowledge messages (though not to publish messages to the queue).

While a message is in a queue, AMPS does not retain an extra copy of the message. Instead, AMPS retains in memory a small data structure indicating the state of the message and the position of the message in the transaction log. The amount of memory consumed by a queue is approximately 200 bytes per message, regardless of the size of the messages.
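As a rough sizing aid, the per-message figure above can be turned into a back-of-the-envelope estimate. The sketch below assumes the approximate 200 bytes per message quoted above; the function name is illustrative and is not part of AMPS.

```python
# Approximate per-message tracking cost quoted above; an estimate, not a guarantee.
BYTES_PER_QUEUED_MESSAGE = 200


def estimate_queue_memory_bytes(messages_in_queue: int) -> int:
    """Estimate in-memory queue tracking overhead, independent of message size."""
    if messages_in_queue < 0:
        raise ValueError("messages_in_queue must be non-negative")
    return messages_in_queue * BYTES_PER_QUEUED_MESSAGE


# Example: 5 million tracked messages is roughly 1 GB of queue state.
approx_gb = estimate_queue_memory_bytes(5_000_000) / 1e9
```

Because the estimate depends only on the message count, a queue of large messages and a queue of small messages with the same depth would be sized the same way.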

AMPS queues provide a variety of options to help you tailor the behavior of each queue to meet your application's needs.

Delivery Semantics

AMPS queues deliver a message to a single subscriber at a time. In the most common case, a message is delivered to exactly one subscriber, and that subscriber processes the message.

In the case that a subscriber does not successfully process a message, AMPS provides two delivery semantics to precisely specify the handling of the unprocessed message:

  • With at-least-once delivery, AMPS delivers the message to one subscriber at a time, and expects that subscriber to explicitly remove the message from the queue when the message has been received and processed. With this guarantee, each message from the queue must be processed within a specified timeout, or lease period. AMPS tracks the amount of time since the message was sent to the subscriber. If the subscriber has not responded by removing the message within the lease period, AMPS revokes the lease and the message is available to another subscriber. AMPS allows you to set limits on the number of times a message is made available to another subscriber, using the MaxCancels and MaxDeliveries configuration options.

    In this model, receiving a message is the equivalent of a non-destructive get from a traditional queue. To acknowledge and remove the message, a subscriber uses the sow_delete command with the bookmark of the message.

    Leases are broken and messages are returned to the queue if the lease holder disconnects from AMPS. This ensures that, if a message processor fails or loses its connection to AMPS, the message can immediately be processed by another message processor.

  • With at-most-once delivery, AMPS removes the message from the queue as soon as the message is sent to a subscriber. However, the subscriber still needs to acknowledge that the message was processed, so that AMPS can track the subscription backlog, as described below.

    In this model, receiving a message is the equivalent of a destructive get from a traditional queue. The message is immediately removed by AMPS, and is no longer available in the queue.

Choosing Delivery Semantics to Handle Failures

Regardless of the delivery semantics you choose, during typical message processing from a queue, AMPS delivers each message in the queue to a single subscriber, with no duplicates or redelivery.

The difference between at-most-once and at-least-once semantics is important in cases where a failure happens. With at-most-once delivery, the message will not be redelivered even if the subscriber fails to process the message. With at-least-once delivery, the message will be redelivered to subscribers until the message is either explicitly acknowledged, explicitly removed, or expired based on the queue policy for expiration or retries.

With either setting, a message is delivered exactly once, to a single subscriber, during normal processing.

Consider the following recommendations when deciding how you would like AMPS to handle cases where the subscriber that receives the message fails to process the message:

  1. For ephemeral or lower-value data, where message loss in the case of failure is preferable to duplicating message delivery, consider at-most-once semantics. For example, processing a stream of events from sensors might fall into this category: each message should be processed once, in order, during normal operation, but missing a single data point from time to time may be less disruptive than reconciling duplicates.

  2. For higher value data, where duplicate message delivery in the event of failure is preferable to message loss, consider at-least-once semantics. For example, if inserting the same data into a database twice would simply be an in-place update of the same record with the same information, having a duplicate insert happen occasionally in the event of failure may be preferable to losing a message when a failure occurs.

  3. For higher value data, where manual intervention is required to reconcile messages when there is a question as to whether the message has been correctly processed, consider using at-least-once message delivery with a MaxDeliveries setting and an action to move failed messages to a dead-letter queue for manual reconciliation.

  4. For higher value data that is part of a transactional dataflow, where processors maintain state, it can be useful to include information about the message processed in the transaction. This can be used with at-least-once messaging to guarantee that the message is only processed once: if a processor receives a message that is already marked as processed in the transactional store, the processor knows that the message has already been acted on and should be acknowledged without further processing. This is the most common pattern used to produce the result that each message is processed "exactly once". This pattern involves additional overhead and processing, trading off increased work in the application for stronger guarantees that a message is only processed one time.
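The fourth recommendation above can be sketched with any transactional store. The example below uses an in-memory SQLite database purely for illustration; the table names, the `process_once` helper, and the bookmark strings are hypothetical and do not reflect an AMPS API or the real bookmark format.

```python
import sqlite3


def open_store() -> sqlite3.Connection:
    """Create an in-memory store with a work table and a processed-message table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT)")
    conn.execute("CREATE TABLE processed (bookmark TEXT PRIMARY KEY)")
    return conn


def process_once(conn: sqlite3.Connection, bookmark: str, order_id: int, status: str) -> bool:
    """Apply a message a single time; return True if work was done.

    The caller acknowledges the message to the queue in either case.
    """
    with conn:  # one transaction: commit on success, roll back on exception
        seen = conn.execute(
            "SELECT 1 FROM processed WHERE bookmark = ?", (bookmark,)
        ).fetchone()
        if seen:
            return False  # redelivery: already acted on, so only acknowledge
        conn.execute(
            "INSERT OR REPLACE INTO orders (order_id, status) VALUES (?, ?)",
            (order_id, status),
        )
        # Recording the bookmark in the same transaction as the work is the key step.
        conn.execute("INSERT INTO processed (bookmark) VALUES (?)", (bookmark,))
        return True
```

If the processor fails after the commit but before acknowledging, the redelivered message finds its bookmark in the `processed` table and is acknowledged without repeating the work.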

Subscription Backlog

For efficiency, queues in AMPS use a push model of delivery, providing messages to consumers when the message becomes available rather than requiring the consumer to poll the queue. To manage the workload among consumers, AMPS queues keep track of a subscription backlog. This backlog is the number of messages that have been provided to an individual subscription and have not yet been acknowledged. This backlog helps AMPS provide strong delivery guarantees while still optimizing for high throughput processing. AMPS sets the maximum backlog for each subscription to the smaller of the following:

  • The minimum MaxPerSubscriptionBacklog setting for the queues matched by the subscription

    or

  • The max_backlog specified on the subscribe command

Notice that, if a subscriber does not provide a max_backlog on a subscription, AMPS defaults to a max_backlog of 1. In practical terms, this means that an application must explicitly specify a backlog to be able to receive more than one message from a queue at a time, regardless of the queue configuration.

Subscribers request a max_backlog by adding the request to the options string of the subscribe command. For example, to request a max_backlog of 10, a subscriber includes max_backlog=10 in the options for the command.

To improve concurrency for subscribers, 60East recommends using a backlog of at least 2. This allows efficient pipelined delivery, as the consumer can be processing one message while the previous message is being acknowledged. With a max_backlog higher than 1, the consumer never needs to be stopped waiting for the next message from the queue.
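The backlog calculation described in this section can be sketched as follows. This is a model of the rule as stated here, not AMPS code: the function names are illustrative, and the only details taken from the text are the default of 1 and the `max_backlog=N` form of the option.

```python
from typing import Iterable, Optional

DEFAULT_MAX_BACKLOG = 1  # applied when the subscribe command gives no max_backlog


def effective_backlog(queue_limits: Iterable[int], requested: Optional[int] = None) -> int:
    """Return the smaller of the lowest per-queue limit and the subscriber's request.

    queue_limits holds the MaxPerSubscriptionBacklog of each queue the
    subscription matches and must contain at least one value.
    """
    subscriber_limit = DEFAULT_MAX_BACKLOG if requested is None else requested
    return min(min(queue_limits), subscriber_limit)


def subscribe_options(max_backlog: int) -> str:
    """Build the options-string fragment that requests a backlog."""
    return f"max_backlog={max_backlog}"
```

For instance, a subscription that matches queues with limits of 5 and 8 and requests `max_backlog=10` would end up with a backlog of 5, while a subscription that requests nothing would be limited to 1 regardless of the queue limits.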

Delivery Fairness

When a queue provides at-least-once delivery, AMPS provides three different algorithms for distributing messages among subscribers. Each algorithm has different performance and fairness guarantees. For at-most-once delivery, AMPS supports only the round-robin method of distributing messages.

Algorithm

Description

fast

This strategy optimizes for the lowest latency.

AMPS delivers the message to the first subscription found that does not have a full backlog. With this algorithm, AMPS tries to minimize the time spent determining which subscription receives the message without attempting to distribute messages fairly across subscriptions.

round-robin

This strategy optimizes for general fairness across subscriptions.

AMPS delivers the message to the next available subscription that does not have a full backlog. With this algorithm, AMPS delivers messages evenly among the subscribers that have space in their backlog.

proportional

This strategy optimizes for delivery to subscriptions with the most unused capacity.

AMPS delivers the message to the subscription that has the highest proportion of backlog capacity unused. AMPS determines this by taking the ratio of unacknowledged messages to the maximum backlog.

For example, if there are three active subscribers for the queue, with backlog settings and outstanding messages as follows:

  • Subscriber Inky: max_backlog=2, currently leased 1 message

  • Subscriber Blinky: max_backlog=4, currently leased 3 messages

  • Subscriber Clyde: max_backlog=10, currently leased 4 messages

In this case, with proportional delivery, a new message for the queue will be delivered to Clyde, since that subscriber has only filled 40% of the backlog, as compared with 50% for Inky and 75% for Blinky.

If more than one subscription has the same unused capacity, AMPS delivers the message to the first subscription found with that capacity.
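The fast and proportional selection rules can be modeled in a few lines. The sketch below is a simplified illustration under stated assumptions: the `Subscription` class and function names are hypothetical, and "first found" is modeled as list order, which may not match the order AMPS uses internally.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Subscription:
    name: str
    max_backlog: int
    leased: int  # messages delivered but not yet acknowledged

    @property
    def fill_ratio(self) -> float:
        """Ratio of unacknowledged messages to the maximum backlog."""
        return self.leased / self.max_backlog


def pick_fast(subs: List[Subscription]) -> Optional[Subscription]:
    """Return the first subscription found whose backlog is not full."""
    for sub in subs:
        if sub.leased < sub.max_backlog:
            return sub
    return None


def pick_proportional(subs: List[Subscription]) -> Optional[Subscription]:
    """Return the subscription with the lowest fill ratio; ties go to the first found."""
    candidates = [s for s in subs if s.leased < s.max_backlog]
    # min() returns the first minimal element, which gives the tie-break described above.
    return min(candidates, key=lambda s: s.fill_ratio, default=None)


subs = [
    Subscription("Inky", max_backlog=2, leased=1),
    Subscription("Blinky", max_backlog=4, leased=3),
    Subscription("Clyde", max_backlog=10, leased=4),
]
print(pick_proportional(subs).name)  # Clyde (0.40 versus 0.50 and 0.75)
print(pick_fast(subs).name)          # Inky (first with spare capacity)
```

Round-robin is omitted here because it needs a cursor that persists between deliveries; the two stateless rules are enough to show how the same set of subscriptions can yield different recipients.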

AMPS defaults to proportional delivery for at-least-once queues and defaults to round-robin (the only valid delivery model) for at-most-once queues.

Each instance of AMPS manages delivery strategy from among the messages that it currently owns and the subscriptions that are present on that instance.

Delivery strategies apply only to a single instance, and are not applied across instances.

Acknowledging Messages

Subscribers must acknowledge each message to indicate to AMPS that a message has been processed. The point at which a subscriber acknowledges a message depends on the exact processing that the subscriber performs and the processing guarantees for the application. In general, applications acknowledge messages at the point at which the processing has a result that is durable and which would require an explicit action (such as another message) to change.

Some common points at which to acknowledge a message are:

  • When processing is fully completed.

  • When work is performed that would require a compensating action (that is, when information is committed to a database or forwarded to a downstream system).

  • When work is submitted to a processor that is guaranteed to either succeed or explicitly indicate failure.

To acknowledge a message, the subscriber typically uses the acknowledge convenience methods in the AMPS client. These methods issue a sow_delete command with the bookmark of the message to acknowledge. AMPS allows subscribers to acknowledge multiple messages at once by providing a comma-delimited list of bookmarks in the sow_delete command. The AMPS clients provide facilities to batch acknowledgments for efficiency.
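To make the batching idea concrete, the sketch below collects bookmarks and emits one acknowledgment per batch with a comma-delimited bookmark list. It is a standalone model, not the AMPS client: the `AckBatcher` class, the dictionary keys, and the bookmark strings are all hypothetical, and the dictionary stands in for a command rather than reproducing the wire format.

```python
from typing import Dict, List


class AckBatcher:
    """Collect bookmarks and flush them as one comma-delimited acknowledgment."""

    def __init__(self, topic: str, batch_size: int) -> None:
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.topic = topic
        self.batch_size = batch_size
        self._pending: List[str] = []
        self.sent: List[Dict[str, str]] = []  # stands in for commands sent to the server

    def ack(self, bookmark: str) -> None:
        """Queue one bookmark and flush when the batch is full."""
        self._pending.append(bookmark)
        if len(self._pending) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        """Emit a single sow_delete-style command for all pending bookmarks."""
        if not self._pending:
            return
        self.sent.append({
            "command": "sow_delete",
            "topic": self.topic,
            "bookmark": ",".join(self._pending),
        })
        self._pending.clear()
```

A batcher along these lines would normally also flush on a timer, since an acknowledgment held back too long on an at-least-once queue could outlive the lease on the message.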

AMPS allows an application to acknowledge messages by providing a filter on a sow_delete command. In this case, the sow_delete acknowledges all messages that match the filter, regardless of whether the application that sends the command has a current lease on a given message or not. (The Leasing parameter on the queue specifies whether AMPS allows a client to successfully acknowledge messages that it does not currently have leased.)

For queues that use the at-least-once delivery model, there are two additional options available for acknowledging messages.

  • To return a message to the queue without processing it, the subscriber provides the cancel option on the acknowledgment. In this case, AMPS returns the message to the queue just as though the lease had expired. If the message is eligible for redelivery (that is, it has not exceeded the maximum time or maximum cancels configured for the queue), it is redelivered. Otherwise, the message is expired from the queue. The MaxCancels option allows you to configure how many times a message can be returned to the queue for redelivery before AMPS expires the message.

  • To immediately remove a message from the queue, the subscriber provides the expire option. In this case, AMPS does not return the message to the queue for redelivery, but instead immediately expires the message from the queue and triggers any configured amps-action-on-sow-expire-message actions that monitor the queue. The expire option can be especially helpful if your application can determine that the message cannot be successfully processed (for example, the message is unparseable or contains invalid data).

Options for acknowledging messages delivered from at-least-once queues:

Option

Result

<none>

Message is considered to be successfully processed and removed from the queue.

cancel

Message is returned to the queue.

The count of cancels for this message is incremented, and if the count is greater than the configured MaxCancels for the queue, the message is expired.

expire

Message is immediately expired from the queue.

For at-most-once queues, these options to acknowledgments have no effect, since the message is guaranteed not to be redelivered even if processing fails.
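The effect of each acknowledgment option on an at-least-once queue can be summarized as a small state function. This is a model of the table above, not AMPS code; the `LeasedMessage` class, the `apply_ack` function, and the result labels are illustrative names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LeasedMessage:
    bookmark: str
    cancels: int = 0  # times this message has been returned with the cancel option


def apply_ack(msg: LeasedMessage, option: Optional[str], max_cancels: int) -> str:
    """Return the resulting state: 'removed', 'requeued', or 'expired'."""
    if option is None:
        return "removed"  # processed successfully
    if option == "expire":
        return "expired"  # never redelivered
    if option == "cancel":
        msg.cancels += 1
        # Expire once the cancel count is greater than the configured MaxCancels.
        return "expired" if msg.cancels > max_cancels else "requeued"
    raise ValueError(f"unknown acknowledgment option: {option!r}")
```

With `max_cancels=1`, the first cancel returns the message to the queue and the second expires it.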

Persisting Metadata for Queues

A queue can, optionally, save the state of the queue as metadata in the journal directory. This can reduce the active memory footprint required for large queues, since the state of the queue can be paged out if necessary. This can also potentially improve recovery time if there are a large number of unacknowledged messages in the queue when AMPS starts. To persist metadata for a queue, set the FileBackedMetadata option to enabled for the queue.

When FileBackedMetadata is enabled, queue state that is not currently needed can be paged out to the file. This can substantially reduce the active memory needed for large queues that are infrequently used.

Notice that the state will only be paged out as necessary. While this option increases the ability of AMPS to manage queue state in low-memory situations, AMPS will still retain state in memory if memory is available to improve performance.

Optional Message Delivery Behaviors

AMPS queues provide the following optional message delivery behaviors:

  • A Priority can be specified for a queue, in which case AMPS delivers messages in priority order rather than the order in which the instance received the messages.

  • A BarrierExpression can be specified to provide a synchronization point. When a message matches the expression, all previous messages must be acknowledged before AMPS will deliver the message that matches the expression, or subsequent messages.

These options are described in more detail in the Advanced Queue Configuration section.
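The two behaviors above can be illustrated with a toy model of message selection. The sketch treats them separately, since how Priority and BarrierExpression interact is not described here. The `Queued` class and both function names are hypothetical, and the barrier function assumes plain arrival-order delivery.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Queued:
    seq: int                # arrival order in the instance
    priority: int = 0
    barrier: bool = False   # True if the message matches the barrier expression
    leased: bool = False    # delivered but not yet acknowledged


def next_by_priority(pending: List[Queued]) -> Optional[Queued]:
    """Highest priority value first; oldest message within that priority."""
    available = [m for m in pending if not m.leased]
    return min(available, key=lambda m: (-m.priority, m.seq), default=None)


def next_with_barrier(pending: List[Queued]) -> Optional[Queued]:
    """Arrival-order delivery that stops at an unsatisfied barrier.

    pending holds every message not yet acknowledged, oldest first.
    """
    for index, msg in enumerate(pending):
        if msg.barrier:
            if index > 0:
                return None  # older messages are still unacknowledged
            if not msg.leased:
                return msg   # everything older is acknowledged: deliver the barrier
            continue         # barrier already delivered; later messages may flow
        if not msg.leased:
            return msg
    return None
```

In the barrier model, a message that matches the expression is held while anything older is outstanding, and messages behind it are held until the barrier itself has been delivered.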

Message Flow for Queues

The message flow for AMPS queues is as follows. The message flow differs depending on whether the queue is configured for at-most-once delivery or at-least-once delivery.

When the queue is configured for at-most-once delivery:

  1. A publisher publishes a message to an underlying topic.

  2. The message becomes available in the queue.

  3. For a message that is not a barrier message, the message is published to a subscriber when:

    • There is a subscription that matches the message, and the subscriber is entitled to see the message

    • The message is the oldest message in the queue that matches the subscription (if no Priority is specified for the queue) or it has the highest priority value for messages in the queue and is the oldest message at that priority (if Priority is specified).

    • The subscription has remaining capacity in its backlog

    • The subscription is the next subscription to receive the message as determined by the delivery fairness for the queue

    AMPS removes the message from the queue when the message is published.

  4. For a message that is a barrier message, AMPS does not deliver the message until all previous messages have been acknowledged. AMPS does not deliver messages after the barrier message until the barrier message has been delivered.

  5. If no subscription has requested the message, and the message has been in the queue longer than the Expiration time, AMPS removes the message from the queue. With AMPS queues, message expiration is considered to be a normal way for the message to leave the queue, and is not considered an error.

  6. The subscriber processes the message, and acknowledges the message when processing is finished to indicate to AMPS that the subscriber has capacity for another message.

When the queue is configured for at-least-once delivery:

  1. A publisher publishes a message to an underlying topic.

  2. The message becomes available in the queue.

  3. For a message that is not a barrier message, the message is published to a subscriber when:

    • There is a subscription that matches the message

    • The message is the oldest message in the queue that matches the subscription (if no Priority is specified for the queue) or it has the highest priority value for messages in the queue and is the oldest message at that priority (if Priority is specified).

    • The subscription has remaining capacity in its backlog

    • The subscription is the next subscription to receive a message as determined by the delivery fairness for the queue

    AMPS calculates the lease time for the message and provides that time to the subscriber along with the message.

  4. For a message that is a barrier message (that is, the queue has a BarrierExpression specified and the message matches that expression), AMPS does not deliver the message until all previous messages have been acknowledged. AMPS will not deliver messages after the barrier message until the barrier message is delivered.

  5. If the message has been in the queue longer than the Expiration time, and there is no current lease on the message, AMPS removes the message from the queue.

  6. If a subscriber has received the message, but has not removed the message from the queue at the time the lease expires, AMPS returns the message to the queue if the message has been in the queue less than the Expiration time, and it has been delivered fewer than the MaxDeliveries limit. If the message has been in the queue longer than the Expiration time or has been delivered more than the number of times specified by MaxDeliveries, AMPS removes the message from the queue when the lease expires.

  7. The subscriber processes the message, and removes the message from the queue by acknowledging the message (which is translated by the AMPS client into the appropriate sow_delete command).
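Step 6 of the at-least-once flow is a decision made when a lease expires, and it can be written as a single function. The sketch below is a model of that step only. The function and parameter names are illustrative, and it assumes the "fewer than the MaxDeliveries limit" wording governs the case where the delivery count equals the limit.

```python
def on_lease_expired(age_seconds: float, expiration_seconds: float,
                     deliveries: int, max_deliveries: int) -> str:
    """Return 'requeue' or 'remove' for a message whose lease has just expired."""
    within_lifetime = age_seconds < expiration_seconds
    # Assumption: a message delivered exactly max_deliveries times is not redelivered.
    under_limit = deliveries < max_deliveries
    return "requeue" if within_lifetime and under_limit else "remove"
```

A message that is 10 seconds old with a 60 second Expiration and one delivery out of a limit of three would be returned to the queue; the same message at 90 seconds old, or on its third delivery, would be removed.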

Startup and Recovery for Queues

When the metadata for a queue is not persisted to a file, or if that file is not fully up to date with the transaction log, AMPS recovers the current state of a queue from the transaction log when the instance starts.

The point at which AMPS chooses to start recovery for a given queue is as follows:

  • If this is a new queue, or the configured RecoveryPoint for the queue has changed since the last time AMPS started, begin queue recovery at the configured recovery point. (This defaults to the beginning of the transaction log if no explicit RecoveryPoint is configured for the queue.)

  • Otherwise, start from the most recent point in the transaction log at which all previous messages have been acknowledged. This recovery point is stored in the queues.ack file in the journal directory for all queues in the instance. If that recovery point is older than the journals currently available, begin from the beginning of the transaction log.

To recover the state of the queue, AMPS proceeds through the journal file from the recovery point and restores the state of the queue based on the operations in the transaction log for that queue.

AMPS determines the oldest recovery point for all of the queues in the instance, and reads journals from that point forward to the end of the transaction log. AMPS rebuilds the state of each queue as the replay progresses past the recovery point for that queue.
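The single-pass recovery described above can be sketched as follows. This is a deliberately simplified model: the journal is represented as a list of tuples tagged with a queue name, whereas in AMPS messages are published to underlying topics and queues are views over them. The entry layout, operation labels, and function name are all hypothetical.

```python
from typing import Dict, List, Set, Tuple

# Each journal entry: (position, queue name, operation, message id)
Entry = Tuple[int, str, str, str]


def recover(journal: List[Entry], recovery_points: Dict[str, int]) -> Dict[str, Set[str]]:
    """Rebuild the set of unacknowledged message ids per queue from one journal pass."""
    state: Dict[str, Set[str]] = {name: set() for name in recovery_points}
    start = min(recovery_points.values(), default=0)  # oldest recovery point of any queue
    for position, queue, op, msg_id in journal:
        if position < start or queue not in state:
            continue
        if position < recovery_points[queue]:
            continue  # replay has not yet reached this queue's recovery point
        if op == "publish":
            state[queue].add(msg_id)
        elif op == "ack":
            state[queue].discard(msg_id)
    return state
```

Reading from the oldest recovery point means the journal is scanned once for all queues, and each queue ignores entries that precede its own recovery point.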

Copyright 2013-2024 60East Technologies, Inc.