Asynchronous Message Processing

The AMPS C++ client also supports an interface that allows you to process messages asynchronously. In this case, you add a message handler to the function call. The client returns the command ID of the subscribe command once the server has acknowledged that the command has been processed. As messages arrive, the client calls your message handler directly on the background thread. This can be an advantage for some applications. For example, if your application is highly multithreaded and copies message data to a work queue processed by multiple threads, there is usually a performance benefit to enqueuing work directly from the background thread. See Understanding Threading for a discussion of threading considerations, including considerations for message handlers.

The advantage of using asynchronous message processing is that it is extremely efficient: your processing code runs directly on the thread that the AMPS client is using to read from the socket and has direct access to the buffer that the AMPS client uses. Further, when an HAClient is used (discussed later in this guide), the default disconnect handler for that client will automatically resume subscriptions that use asynchronous message processing. Finally, since message processing runs directly on the receive thread, asynchronous message processing provides pushback on the socket when messages arrive faster than the application can process them (for example, in response to a SOW query).

In return for these advantages, your processing code must be careful not to block the processing thread for an excessive amount of time, and must make a deep copy of any data that will be used outside of the processing code.
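One way to satisfy both requirements is to have the handler do nothing except copy the payload and hand it to worker threads. The following is a minimal sketch of such a hand-off using only the Standard C++ Library. The WorkQueue class and its method names are hypothetical and are not part of the AMPS client; the sketch assumes the handler can obtain a pointer to the message data and its length.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Hypothetical helper (not part of the AMPS client): a minimal thread-safe
// queue that owns std::string copies of message payloads.
class WorkQueue
{
public:
    // Intended to be called from the message handler. Constructing the
    // std::string is the deep copy; the lock is held only long enough to
    // push the copy, so the receive thread is not blocked for long.
    void enqueue(const char* data, std::size_t length)
    {
        std::string copy(data, length);
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _items.push(std::move(copy));
        }
        _ready.notify_one();
    }

    // Intended to be called from worker threads. Blocks until an item
    // is available, then returns it.
    std::string dequeue()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _ready.wait(lock, [this] { return !_items.empty(); });
        std::string item = std::move(_items.front());
        _items.pop();
        return item;
    }

    // Number of items currently waiting to be processed.
    std::size_t size() const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _items.size();
    }

private:
    mutable std::mutex _mutex;
    std::condition_variable _ready;
    std::queue<std::string> _items;
};
```

In this design, the message handler would call enqueue() with the payload of each message and return immediately, while one or more worker threads loop on dequeue(). Note that an unbounded queue like this one gives up the pushback on the socket described above; an application that needs pushback could bound the queue instead.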

Asynchronous Processing Example

Here is a short example (error handling and connection details are omitted for brevity):

Client client(...);
client.connect(...);
/* Here we have created or received a Client that is properly connected to an AMPS server. */


client.logon();

/* Here we create a subscription with the following parameters:
 * command        : This is the AMPS Command object that contains the subscribe command.
 *                  In this example, it is a subscribe command for the topic "messages".
 * MessageHandler : This is an AMPS MessageHandler object that refers to our message
 *                  handling function, myHandlerFunction. This function is
 *                  called on a background thread each time a message arrives.
 *                  The second parameter of the MessageHandler constructor, NULL, is
 *                  passed as-is to the message handler with every message,
 *                  allowing you to pass context about the subscription through to the
 *                  message handler.
 */
std::string subscriptionId = client.executeAsync(Command("subscribe").setTopic("messages"),
                                                 MessageHandler(myHandlerFunction, NULL));

...

/* The myHandlerFunction is a global function that is invoked by AMPS whenever a
 * matching message is received. The first parameter, message, is a reference to an
 * AMPS Message object that contains the data and headers of the received message.
 * The second parameter, userData, is set to whatever value was provided in the
 * MessageHandler constructor -- NULL in this example.
 */
void myHandlerFunction(const Message& message, void* userData)
{
    std::cout << message.getData() << std::endl;
}
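
The userData parameter is most useful when it points to state that belongs to the subscription. The sketch below shows the casting pattern in isolation. StubMessage, SubscriptionStats, and countingHandler are hypothetical names introduced for illustration; StubMessage stands in for the AMPS Message class so that the sketch compiles without the client headers.

```cpp
#include <cstddef>
#include <string>

// Hypothetical stand-in for the AMPS Message class, so that this sketch
// compiles on its own. It is not the real class.
struct StubMessage
{
    std::string data;
    const std::string& getData() const { return data; }
};

// Hypothetical per-subscription context passed through userData.
struct SubscriptionStats
{
    std::size_t messageCount = 0;
    std::size_t byteCount = 0;
};

// Same shape as myHandlerFunction: the message plus the opaque pointer.
void countingHandler(const StubMessage& message, void* userData)
{
    // userData is whatever pointer was supplied when the handler was
    // registered; the handler must cast it back to the type that was
    // actually passed.
    SubscriptionStats* stats = static_cast<SubscriptionStats*>(userData);
    if (stats == nullptr)
    {
        return;  // no context was provided (NULL in the example above)
    }
    ++stats->messageCount;
    stats->byteCount += message.getData().size();
}
```

With the real client, the address of the context object would take the place of NULL in the MessageHandler constructor. The object must outlive the subscription, and because the handler runs on the background thread, any other thread that reads the context needs appropriate synchronization.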

The AMPS client resets and reuses the message provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it.
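
The difference between saving a reference and saving a copy can be illustrated without the client itself. The following sketch models the behavior described above using hypothetical stand-ins: ReusedBuffer plays the role of storage that is overwritten for each new message, and DataView plays the role of message data that points into that storage. Neither type is part of the AMPS client.

```cpp
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical stand-in for message data: a pointer and a length that
// refer to memory owned by someone else (here, the ReusedBuffer below).
struct DataView
{
    const char* data;
    std::size_t length;
};

// Hypothetical stand-in for storage that is overwritten for each new
// message, the way a reused message object would be.
class ReusedBuffer
{
public:
    DataView receive(const char* text)
    {
        std::size_t n = std::strlen(text);
        if (n > sizeof(_buffer))
        {
            n = sizeof(_buffer);  // truncate to fit the fixed buffer
        }
        std::memcpy(_buffer, text, n);
        return DataView{_buffer, n};
    }

private:
    char _buffer[64];
};

// Deep copy: the returned string owns its own storage and is unaffected
// by later writes to the buffer.
std::string copyOf(const DataView& view)
{
    return std::string(view.data, view.length);
}
```

If a caller receives "price=10", saves both the DataView and a copyOf() result, and then receives "price=99", the saved copy still holds "price=10" while the saved view now reads "price=99". A handler that keeps only a reference to the message is in the position of the saved view.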

With compilers that support C++11 or later, you can use additional constructs to specify a callback function. C++11 added lambda functions: unnamed functions declared in-line that can refer to names in the lexical scope of their creator. If available on your system, both Standard C++ Library function objects and lambda functions may be used as callbacks.

Check functional.cpp in the samples directory for numerous examples.
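
As a rough illustration of the idea, the sketch below shows a lambda that captures local state and is stored in a std::function. This is a self-contained model rather than client code: StubMessage, Handler, deliver, and makeCollector are hypothetical names, and deliver() merely imitates a client that calls the handler once per message with a reused message object.

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for the AMPS Message class.
struct StubMessage
{
    std::string data;
    const std::string& getData() const { return data; }
};

// Hypothetical callback type standing in for the client's handler type.
using Handler = std::function<void(const StubMessage&)>;

// Hypothetical stand-in for the client delivering messages to a handler.
void deliver(const Handler& handler, const std::vector<std::string>& payloads)
{
    StubMessage message;  // one object, reused for every delivery
    for (const std::string& payload : payloads)
    {
        message.data = payload;
        handler(message);
    }
}

// Returns a lambda that captures the destination vector by reference.
// The capture plays the role that the userData pointer plays for a
// plain function: it carries context into the handler.
Handler makeCollector(std::vector<std::string>& destination)
{
    return [&destination](const StubMessage& message)
    {
        // push_back stores a copy of the payload, so the stored value
        // stays valid after the message object is reused.
        destination.push_back(message.getData());
    };
}
```

Because the lambda captures by reference, the captured vector must remain alive for as long as the handler can be called. Capturing by value, or capturing a shared pointer, avoids that lifetime dependency at the cost of a copy.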

Using an Instance Method as a Message Handler

One of the more common ways of providing a message handler is as an instance method on an object that maintains message state. It's simple to provide a handler with this capability, as shown below.

class StatefulHandler
{
private:
    std::string _handlerName;
public:
    /* Construct the handler and save state. */
    StatefulHandler(const std::string& name) : _handlerName(name) {}

    /* Message handler method. */
    void operator()(const AMPS::Message & message)
    {
      std::cout << _handlerName << " got " << message.getData() << std::endl;
    }
};

You can then provide an instance of the handler directly wherever a message handler is required, as shown below:

client.subscribe(StatefulHandler("An instance"), "topic");
