Advanced Queue Configuration

This section describes more advanced configuration settings for AMPS message queues.

Using Multiple Underlying Topics

AMPS queues can contain messages from any number of underlying topics. This provides a flexible delivery model, and allows applications to populate multiple queues with a single publish to AMPS, which simplifies publisher code, reduces bandwidth, and ensures that the message is provided to all queues from the same point in the message stream.

To create a queue that includes messages from underlying topics, you provide a regular expression that matches the set of topic names that contain messages for the queue. You also provide a DefaultPublishTarget that specifies the topic name for AMPS to use when a message is published directly to the queue topic.

For example, you might configure a set of topics as follows:

<SOW>
  ...
  <Queue>
     <Name>ORDERS_ANALYTICS</Name>
     <MessageType>json</MessageType>
     <UnderlyingTopic>^ORDERS$|^ORDERS_ANALYTICS_DIRECT$</UnderlyingTopic>
     <DefaultPublishTarget>ORDERS_ANALYTICS_DIRECT</DefaultPublishTarget>
  </Queue>
  <Queue>
     <Name>ORDERS_RISK</Name>
     <MessageType>json</MessageType>
     <UnderlyingTopic>^ORDERS$|^ORDERS_RISK_DIRECT$</UnderlyingTopic>
     <DefaultPublishTarget>ORDERS_RISK_DIRECT</DefaultPublishTarget>
  </Queue>
  ...
</SOW>

In this case, when a message is published to the ORDERS topic, both the ORDERS_ANALYTICS and the ORDERS_RISK queues deliver the message. However, a publisher can also publish directly to each queue by publishing a message to the _DIRECT topic for that queue. Furthermore, any publish to the name of the queue will be routed to the appropriate _DIRECT topic.

The following table demonstrates how messages are provided to topics with this configuration:

Publish To
Results

ORDERS

Both ORDERS_ANALYTICS and ORDERS_RISK enqueue the message, since ORDERS matches the UnderlyingTopic of both queues.

ORDERS_ANALYTICS

The message is published to the DefaultPublishTarget of ORDERS_ANALYTICS, which is ORDERS_ANALYTICS_DIRECT.

The message is then enqueued to ORDERS_ANALYTICS, since ORDERS_ANALYTICS_DIRECT matches the UnderlyingTopic of ORDERS_ANALYTICS.

ORDERS_RISK

The message is published to the DefaultPublishTarget of ORDERS_RISK, which is ORDERS_RISK_DIRECT.

The message is then enqueued to ORDERS_RISK, since ORDERS_RISK_DIRECT matches the UnderlyingTopic ORDERS_RISK.

ORDERS_ANALYTICS_DIRECT

The message is published to ORDERS_ANALYTICS_DIRECT, and is then enqueued to ORDERS_ANALYTICS.

ORDERS_RISK_DIRECT

The message is published to ORDERS_RISK_DIRECT, and is then enqueued to ORDERS_RISK.

Priority Queues

AMPS includes the ability to specify that messages from a queue are delivered in order of priority rather than being delivered strictly in publication order. To enable prioritization on a Queue, add the Priority tag to the queue configuration and specify the field or expression to use to set the priority. When a queue definition specifies Priority, AMPS orders delivery based on a descending sort of the value calculated for the Priority rather than in descending order of age.

Priority expressions use the same grammar as other expressions in AMPS, as described in AMPS Expressions. The results of the Priority expression are interpreted as an unsigned long. That is, the result should be a positive integer that can be represented in 64 bits. Non-numeric values or NULL are treated as NaN, and have the lowest priority.

For example, to create a queue that delivers messages in order of the product of the price and quantity specified in the message, you could use the following queue definition:

<AMPSConfig>
    ...
    <!-- Other configuration, including
         recording  that OrderPriority
         and Orders are recorded in the
         transaction log.
      -->

    <SOW>
        ...
        <Queue>
            <Name>OrderPriority</Name>
            <MessageType>json</MessageType>
            <Semantics>at-least-once</Semantics>
            <UnderlyingTopic>Orders</UnderlyingTopic>
            <Priority>/price * /qty</Priority>
        </Queue>
    </SOW>

    ...
</AMPSConfig>

Synchronizing Work with Barrier Messages

AMPS queues provide the ability to synchronize work across a set of subscribers by sending the same message to all subscribers simultaneously, when the queue is fully processed up to the point of the message.

These messages are referred to as barrier messages, since they are roughly equivalent to the concept of a barrier in multithreaded programming.

Barrier messages simplify coordination between current subscribers to a queue. They allow you to easily coordinate operations (such as end of day processing), update reference data at the precise point in the message stream that the update should take effect, send a shutdown message when all available work is processed, and so on. Since barrier messages are integrated with queue delivery, there is no need to write extra code to correlate the barrier message with the correct point in the queue, to manage multiple subscriptions, or to write code that guarantees delivery to all current subscribers.

To specify which messages should be treated as barrier messages, the queue configuration includes a BarrierExpression element that contains a filter. All messages that match that filter are considered barrier expressions.

When a message matches the BarrierExpression, AMPS delivers the message in the following way:

  • AMPS does not deliver the message until all previous messages in the queue are acknowledged.

  • AMPS does not deliver messages that are after the barrier message until the barrier message is sent to subscribers.

  • When all previous messages in the queue are acknowledged, AMPS delivers the barrier message to all current subscribers to the queue.

  • AMPS immediately removes the message from the queue, without requiring acknowledgment from any subscriber.

A barrier message sent to a subscriber does not count toward that subscriber's backlog, since the barrier message is immediately acknowledged. When sending the barrier message, AMPS does not consider the current backlog of the subscriber (so, subscribers that request max_backlog=0 or which use a regular expression topic and have their backlog filled by messages from other topics will still receive the barrier message).

AMPS does, however, apply content filters and entitlement filters for each subscription when delivering the message. If the content filter or entitlement filter for a subscription does not match the barrier message, that subscription will not receive the message. The fact that this subscription does not receive the actual barrier message does not affect the behavior of the barrier itself. A subscription that does not match the message will still see message delivery pause until the barrier is released, but will not receive the actual barrier message.

Each AMPS instance manages the delivery of barrier messages for the subscribers connected to that instance, when all messages prior to the barrier message in the local queue have been acknowledged.

Limiting Currently Deliverable Messages

In some cases, it may be important to limit the maximum number of messages that AMPS considers to be part of the currently active, deliverable part of the queue. This can be advantageous in cases where memory is limited, or in cases where it is expected that a queue is only processed infrequently (for example, a reconciliation process that runs at the end of each day).

The TargetQueueDepth option on a queue allows you to set the target depth for the number of messages that AMPS will consider to be deliverable at a given time.

If the number of messages for the queue exceeds the TargetQueueDepth on an instance, messages beyond the specified depth are considered to be inactive (though still part of the queue). Those messages are not deliverable to subscribers on this instance. They do not appear in a sow query of the queue, and are not considered when a view over the queue is calculated. A sow_delete that uses a filter will only apply to the messages in the queue that are currently deliverable. As messages are acknowledged from the queue, AMPS will add messages to the active part of the queue until the active part of the queue once again reaches the TargetQueueDepth. For messages that are not currently deliverable, AMPS will not read those messages from the transaction log and will not maintain queue state for those messages. This also means that, if no messages within the TargetQueueDepth match a given subscription, no messages will be delivered to that subscription even if later messages in the transaction log match the subscription.

Although AMPS does not consider messages that are currently inactive to be deliverable, AMPS will preserve queue delivery guarantees in the event that a message that is not currently deliverable is acknowledged before it enters the queue and becomes deliverable. To do this, AMPS keeps a list of acknowledgments that have been received for messages that are not in the queue. When a message that would be added to the active part of the queue has already been acknowledged, AMPS does not add that message to the queue. Applications that use TargetQueueDepth to limit memory growth should avoid a pattern where messages are acknowledged before they enter the queue to reduce memory consumption for acknowledgments that cannot yet be processed.

Likewise, if a transfer request from another instance arrives for a message that is in the transaction log, but is beyond the limit set by the TargetQueueDepth, AMPS will process messages further into the queue until it reaches the message to be transferred. At that point, AMPS will transfer the message and stop processing further messages until either:

  • The current depth drops below the TargetQueueDepth, or

  • AMPS once again receives a request for a message that is in the transaction log, but beyond the current active messages in the queue.

A TargetQueueDepth cannot be set for a queue that specifies a Priority or a BarrierExpression.

Last updated

Copyright 2013-2024 60East Technologies, Inc.