LogoLogo
AMPS C++ Client 5.3.4
AMPS C++ Client 5.3.4
  • Welcome to the AMPS C / C++ Client
    • Before You Start
    • Introduction
    • Obtaining and Installing the AMPS C / C++ Client
    • Your First AMPS Program
      • Client Identification
      • Connection Strings for AMPS
      • Connection Parameters for AMPS
      • Providing Credentials to AMPS
    • Subscriptions
      • Content Filtering
        • Changing the Filter on a Subscription
      • Understanding Message Objects
      • Synchronous Message Processing
      • Asynchronous Message Processing
        • Understanding Threading
      • Regular Expression Subscriptions
      • Ending Subscriptions
    • Error Handling
      • Exceptions
      • Exception Types
      • Exception Handling and Asynchronous Message Processing
      • Controlling Blocking with Command Timeout
      • Disconnect Handling
        • Using a Heartbeat to Detect Disconnection
        • Managing Disconnection
        • Replacing Disconnect Handling
      • Unexpected Messages
      • Unhandled Exceptions
      • Detecting Write Failures
      • Monitoring Connection State
    • State of the World
      • Performing SOW Queries
        • Samples of Querying a Topic in the SOW
      • SOW and Subscribe
        • Samples of SOW and Subscribe
      • Setting Batch Size
      • Managing SOW Contents
      • Client Side Conflation
    • Using Queues
      • Backlog and Smart Pipelining
      • Returning a Message to the Queue
      • Acknowledgement Batching
      • Manual Acknowledgement
      • Samples of Working With a Queue
    • Delta Publish and Subscribe
      • Delta Subscribe
      • Delta Publish
    • High Availability
    • AMPS Programming: Working with Commands
    • Utility Classes
    • Advanced Topics
    • Exceptions Reference
    • AMPS Server Documentation
    • API Documentation
Powered by GitBook

Get Help

  • FAQ
  • Legacy Documentation
  • Support / Contact Us

Get AMPS

  • Evaluate
  • Develop

60East Resources

  • Website
  • Privacy Policy

Copyright 2013-2024 60East Technologies, Inc.

On this page
  • Asynchronous Processing Example
  • Using an Instance Method as a Message Handler
Export as PDF
  1. Welcome to the AMPS C / C++ Client
  2. Subscriptions

Asynchronous Message Processing

PreviousSynchronous Message ProcessingNextUnderstanding Threading

Last updated 4 months ago

The AMPS C++ client also supports an interface that allows you to process messages asynchronously. In this case, you add a message handler to the function call. The client returns the command ID of the subscribe command once the server has acknowledged that the command has been processed. As messages arrive, the client calls your message handler directly on the background thread. This can be an advantage for some applications. For example, if your application is highly multithreaded and copies message data to a work queue processed by multiple threads, there is usually a performance benefit to enqueuing work directly from the background thread. See for a discussion of threading considerations, including considerations for message handlers.

The advantage of using asynchronous message processing is that it is extremely efficient -- your processing code runs directly on the thread that the AMPS client is using to read from the socket and has direct access to the buffer that the AMPS client uses. Further, when an HAClient is used (discussed later in this guide), the default disconnect handler for that client will automatically resume subscriptions that use asynchronous message processing. Last, but not least, since message processing runs directly on the receive thread, using asynchronous message processing will provide pushback on the socket in the event that messages are arriving faster than the application can process them (for example, in response to a sow query).

In return for these advantages, your processing code must be careful not to block the processing thread for an excessive amount of time, and must make a deep copy of any data that will be used outside of the processing code.

Asynchronous Processing Example

Here is a short example (error handling and connection details are omitted for brevity):

Client client(...);
client.connect(...);
/* Here we have created or received a Client that is properly connected to an AMPS server. */


client.logon();

/* Here we create a subscription with the following parameters:
 * command        : This is the AMPS Command object that contains the subscribe command.
 * MessageHandler : This is an AMPS MessageHandler object that refers to our message
 *                  handling function myHandlerFunction. This function is
 *                  called on a background thread each time a message arrives.
 *                  The second parameter, NULL, is passed as-is from the
 *                  client.subscribe() call to the message handler with every message,
 *                  allowing you to pass context about the subscription through to the
 *                  message handler.
 *
 * We create a command object for the subscribe command, specifying the topic
 * messages.
 */
string subscriptionId = client.executeAsync(Command("subscribe").setTopic("messages"),
                                            MessageHandler(myHandlerFunction, NULL));

...

/* The myHandlerFunction is a global function that is invoked by AMPS whenever a
 * matching message is received. The first parameter, message, is a reference to an
 * AMPS Message object that contains the data and headers of the received message.
 * The second parameter, userData, is set to whatever value was provided in the
 * MessageHandler constructor -- NULL in this example.
 */
void myHandlerFunction(const Message& message, void* userData)
{
    std::cout << message.getData() << std::endl;
}

The AMPS client resets and reuses the message provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it.

With newer compilers, you can use additional constructs to specify a callback function. Recent improvements in C++ have added lambda functions -- unnamed functions declared in-line that can refer to names in the lexical scope of their creator. If available on your system, both Standard C++ Library function objects and lambda functions may be used as callbacks.

Check functional.cpp in the samples directory for numerous examples.

Using an Instance Method as a Message Handler

One of the more common ways of providing a message handler is as an instance method on an object that maintains message state. It's simple to provide a handler with this capability, as shown below.

class StatefulHandler
{
private:
    std::string _handlerName;
public:
    /* Construct the handler and save state. */
    StatefulHandler(const std::string& name) : _handlerName(name) {}

    /* Message handler method. */
    void operator()(const AMPS::Message & message)
    {
      std::cout << _handlerName << " got " << message.getData() << std::endl;
    }
};

You can then provide an instance of the handler directly wherever a message handler is required, as shown below:

client.subscribe(StatefulHandler("An instance"), "topic");
Understanding Threading