On Message Affinity

AMPS includes a module to help with building a message affinitization strategy.

With message affinitization, each record in a SOW topic can be assigned to a single affinitized client. The amps-action-on-message-affinity module monitors the SOW topic and runs an action when a key is affinitized or de-affinitized.

A client connection indicates that it wants to participate in affinitization by subscribing to a topic to be used for the assignment metadata, referred to as the "control topic".

Notice that although this action runs when message affinitization is updated, the On step does not, by itself, specify what happens when affinitization is updated. It is up to the Action configuration to take appropriate steps when affinitization is updated.

This module keeps track of affinitizing messages to processors and runs an action when affinity is assigned or changed. However, it is up to the configuration of the action to manage alerting processors, and it is up to the processors themselves to respond to the alert and adjust their subscriptions accordingly.

This module requires the MessageType, ControlTopic, and DataTopic parameters, and accepts one optional parameter, DataFilter:

| Parameter | Description |
|---|---|
| MessageType (required) | The message type of the monitored topic and the control topic. |
| ControlTopic (required) | The name of the control topic. This topic is monitored for subscriptions. Any client that subscribes to the ControlTopic is considered to be eligible to have messages affinitized to that client. |
| DataTopic (required) | The name of the topic that contains the messages to be affinitized. Each distinct record in the topic (that is, each distinct SOW Key) will be affinitized. By default, all data within the topic will be affinitized. Optionally, you can restrict affinitization to only certain records (for example, orders in a certain state) by setting the DataFilter option. |
| DataFilter | When present, restricts affinitization to only those messages in the DataTopic that also match the DataFilter. |

This module adds the following variables to the AMPS context:

| Variable | Description |
|---|---|
| AMPS_DATA | The data of the message being affinitized. |
| AMPS_CLIENT_NAME | The name of the client the message is being affinitized to or removed from. |
| AMPS_AFFINITY_ACTION | The type of event. If the message is being affinitized to this client, the type will be assign. If the message is being de-affinitized, the type will be unassign. |
| AMPS_AFFINITY_REASON | The reason for the event. For example, if the message no longer matches the DataFilter or has been deleted, the reason would be oof. If the update is happening because the client that was assigned to the message is no longer active, the reason would be unsubscribe. |

Considerations and Limitations

The amps-action-on-message-affinity module has the following considerations for use:

  • Affinitization is tracked independently on each instance of AMPS. In a replicated mesh of AMPS instances, each instance independently monitors the DataTopic and ControlTopic. AMPS does not make any effort to coordinate affinitization across replicated instances.

  • The affinitization module relies on the SOW key (or grouping, for affinitization over views) to determine message identity. Therefore, affinitization can only be used for a Topic, RegexTopic, View, or ConflatedTopic. Since each message in a Queue, LocalQueue, or GroupLocalQueue is considered to be a distinct message, monitoring one of these topic types for affinitization would not produce useful results (however, if the underlying topic of a queue is maintained as a Topic in the SOW, using that Topic for affinitization could be useful).

Affinitization Configuration Example

The following example shows one way to use the amps-action-on-message-affinity module to assign specific symbols to a processor. In this example, a processor would subscribe to the symbol_processor_assignments topic to receive the symbols that have been affinitized to that processor. It would then maintain a subscription to the orders topic with a filter that limits the subscription to just the symbols that have been provided to the subscriber from the symbol_processor_assignments topic.

When a processor receives an event with a symbol and the event "assign", the processor adds the symbol in that event to the list of symbols in the filter for the subscription to the orders topic (typically, using the replace option to adjust the existing subscription in place). When a processor receives an event with a symbol and the event "unassign", the processor removes that symbol from the filter for the subscription to the orders topic.

<Action>
  <!-- Run whenever a record in the symbols view is affinitized or de-affinitized -->
  <On>
    <Module>amps-action-on-message-affinity</Module>
    <Options>
      <MessageType>json</MessageType>
      <DataTopic>symbols</DataTopic>
      <ControlTopic>symbol_processor_assignments</ControlTopic>
    </Options>
  </On>
  <!-- Extract the symbol from the affinitized record into the SYMBOL variable -->
  <Do>
    <Module>amps-action-do-extract-values</Module>
    <Options>
      <Data>{{AMPS_DATA}}</Data>
      <MessageType>json</MessageType>
      <Value>SYMBOL=/symbol</Value>
    </Options>
  </Do>
  <!-- Notify processors of the change by publishing to the control topic -->
  <Do>
    <Module>amps-action-do-publish-message</Module>
    <Options>
      <MessageType>json</MessageType>
      <Topic>symbol_processor_assignments</Topic>
      <Data>{"client_name":"{{AMPS_CLIENT_NAME}}",
             "symbol":"{{SYMBOL}}",
             "event":"{{AMPS_AFFINITY_ACTION}}",
             "reason":"{{AMPS_AFFINITY_REASON}}"}</Data>
    </Options>
  </Do>
  <!-- Continue to the delete step only for unassign events -->
  <If>
    <Module>amps-action-if-condition</Module>
    <Options>
      <Condition>"{{AMPS_AFFINITY_ACTION}}" == "unassign"</Condition>
    </Options>
  </If>
  <!-- Remove the assignment record for this symbol from the control topic -->
  <Do>
    <Module>amps-action-do-delete-sow</Module>
    <Options>
      <Topic>symbol_processor_assignments</Topic>
      <MessageType>json</MessageType>
      <Filter>/symbol = "{{SYMBOL}}"</Filter>
    </Options>
  </Do>
</Action>
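
On the processor side, the assign and unassign handling described above can be sketched in a few lines of Python. This is a minimal sketch, not part of AMPS or its client libraries: the SymbolAssignments class and its method names are hypothetical, the message fields (client_name, symbol, event) are taken from the publish step in the action above, and the filter is built from OR-joined equality tests on the assumption that symbols contain no single quotes. The call that actually replaces the orders subscription is left to the AMPS client in use and is not shown.

```python
import json


class SymbolAssignments:
    """Tracks the symbols assigned to one processor (hypothetical helper)."""

    def __init__(self, client_name):
        self.client_name = client_name
        self.symbols = set()

    def handle_event(self, raw_message):
        """Apply one message received from the control topic.

        Returns True when the symbol set changed, which is the signal to
        replace the orders subscription with a freshly built filter.
        """
        event = json.loads(raw_message)
        if event.get("client_name") != self.client_name:
            return False  # the event concerns a different processor
        symbol = event["symbol"]
        before = set(self.symbols)
        if event["event"] == "assign":
            self.symbols.add(symbol)
        elif event["event"] == "unassign":
            self.symbols.discard(symbol)
        return self.symbols != before

    def orders_filter(self):
        """Build a filter for the orders subscription, or None if empty."""
        if not self.symbols:
            return None  # nothing assigned; the caller decides what to do
        # Assumption: symbols contain no single quotes, so no escaping is done.
        return " OR ".join("/symbol = '%s'" % s for s in sorted(self.symbols))
```

A processor would call handle_event for each message it receives on symbol_processor_assignments and, whenever the call returns True, resubscribe to orders with the result of orders_filter (typically using the replace option mentioned above).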

The underlying topics can be defined as needed. For example, the following configuration automatically tracks the set of symbols in an orders topic to be affinitized using the action above:

<SOW>
  <Topic>
    <Name>orders</Name>
    <MessageType>json</MessageType>
    <Key>/id</Key>
    <FileName>${AMPS_DATA}/sow/%n.sow</FileName>
  </Topic>
  <View>
    <Name>symbols</Name>
    <MessageType>json</MessageType>
    <UnderlyingTopic>orders</UnderlyingTopic>
    <Projection><Field>/symbol</Field></Projection>
    <Grouping><Field>/symbol</Field></Grouping>
  </View>
  <Topic>
    <Name>symbol_processor_assignments</Name>
    <MessageType>json</MessageType>
    <Key>/symbol</Key>
    <Durability>transient</Durability>
  </Topic>
</SOW>

This action attempts to affinitize new messages so that the overall number of messages stays roughly balanced across the current processors. If a processor unsubscribes from the ControlTopic, the messages currently affinitized to that processor are distributed to the other processors. However, messages are not rebalanced among running processors: once a given SOW Key is affinitized to a processor, it remains affinitized to that processor until the processor unsubscribes or the record is deleted from the topic.
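
The balancing behavior described above can be illustrated with a small model. This is a sketch under stated assumptions, not the algorithm AMPS uses: the AffinityModel class is hypothetical, "roughly balanced" is modeled as assigning each new key to the processor that currently holds the fewest keys, and ties are broken by processor name purely to keep the example deterministic.

```python
class AffinityModel:
    """Illustrative model of sticky, roughly balanced key assignment."""

    def __init__(self):
        self.assigned = {}  # processor name -> set of keys it owns

    def add_processor(self, name):
        # A new processor starts empty; existing keys are not rebalanced.
        self.assigned.setdefault(name, set())

    def owner_of(self, key):
        for name, keys in self.assigned.items():
            if key in keys:
                return name
        return None

    def assign(self, key):
        """Return the owner of key, assigning it first if it is new."""
        current = self.owner_of(key)
        if current is not None:
            return current  # assignments are sticky
        if not self.assigned:
            return None  # no eligible processors
        # Assumption: pick the least-loaded processor, ties broken by name.
        target = min(self.assigned, key=lambda n: (len(self.assigned[n]), n))
        self.assigned[target].add(key)
        return target

    def remove_processor(self, name):
        """Redistribute the keys of a processor that has unsubscribed."""
        orphaned = self.assigned.pop(name, set())
        return {key: self.assign(key) for key in sorted(orphaned)}
```

In this model, a processor added late receives only new keys until another processor leaves, which mirrors the statement that messages are not rebalanced among running processors.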

Copyright 2013-2024 60East Technologies, Inc.