Asynchronous Message Processing

The AMPS Java client also supports an interface designed for asynchronous message processing. With this interface, you pass a message handler along with the subscribe command. The client returns the command ID of the subscribe command once the server has acknowledged that the command has been processed. As messages arrive, the client calls your message handler directly on the background thread. This can be an advantage for some applications. For example, if your application is highly multithreaded and copies message data to a work queue processed by multiple threads, there may be a performance benefit to enqueuing work directly from the background thread. See Understanding Threading for a discussion of threading considerations, including those that apply to message handlers.

As with the simple interface, the AMPS client provides both convenience interfaces and interfaces that use a Command object. The following example shows how to use the asynchronous interface.

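import com.crankuptheamps.client.Client;
import com.crankuptheamps.client.Command;
import com.crankuptheamps.client.CommandId;
import com.crankuptheamps.client.exception.AMPSException;

class MyApp {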
    public static void main(String[] args) {

        /* We create a Client here, then call connect() and logon() to connect to
         * AMPS.
         */
        Client client = new Client("subscribe");

        try {
            client.connect("tcp://127.0.0.1:9007/amps/json");
            client.logon();

            /* Here, we create a new MessagePrinter object. The MessagePrinter
             * class implements MessageHandler, as described below. The subscription
             * uses this object to handle all messages returned from the subscription.
             */
            MessagePrinter mp = new MessagePrinter();
            Command command = new Command("subscribe").setTopic("messages");

            /* Here, we call Client.executeAsync(), which takes the command
             * and the message handler to invoke with messages received
             * in response to the command.
             */
            CommandId subscriptionId = client.executeAsync(command, mp);
        }
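        catch (AMPSException e) {
            /* Report the failure rather than silently discarding it. */
            System.err.println(e.getMessage());
        }
        finally {
            /* Closing the client disconnects from AMPS, so no further messages
             * are delivered to the handler. A real application would keep
             * running until it is done with the subscription.
             */
            client.close();
        }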
    }
}

A sample message handler implementation is shown below:

/* An implementation of MessageHandler provides an invoke() method that receives
 * a com.crankuptheamps.client.Message object. Notice that the same instance of
 * this class is called for all messages on a given subscription, and that the
 * instance is called asynchronously from the background thread created by the
 * client. Design your message handlers so they have access to any program state
 * that they need to do their work.
 */
import com.crankuptheamps.client.Message;
import com.crankuptheamps.client.MessageHandler;

public class MessagePrinter implements MessageHandler {
    public void invoke(Message m) {
        System.out.println(m.getData());
    }
}
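
The work-queue pattern mentioned at the start of this section can be sketched roughly as follows. This is a minimal illustration, not AMPS client code: so that it runs on its own, it uses a hypothetical PayloadHandler interface in place of MessageHandler, and the names QueueingHandlerSketch, EnqueueingHandler, and processAll are invented for the example. The handler does as little as possible on the calling thread (it only enqueues the payload), and a small pool of worker threads does the processing.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueingHandlerSketch {

    /** Hypothetical stand-in for a message handler; it receives only the payload text. */
    interface PayloadHandler {
        void invoke(String payload);
    }

    /** Hands each payload to the queue and returns immediately. */
    static final class EnqueueingHandler implements PayloadHandler {
        private final BlockingQueue<String> queue;

        EnqueueingHandler(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void invoke(String payload) {
            // The queue is unbounded, so add() does not block the calling thread.
            queue.add(payload);
        }
    }

    /** Feeds payloads through the handler; returns the total characters the workers processed. */
    static int processAll(List<String> payloads, int workers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger totalChars = new AtomicInteger();
        CountDownLatch remaining = new CountDownLatch(payloads.size());
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        String payload = queue.take(); // blocks until work arrives
                        totalChars.addAndGet(payload.length());
                        remaining.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() ends the worker
                }
            });
        }

        PayloadHandler handler = new EnqueueingHandler(queue);
        for (String payload : payloads) {
            handler.invoke(payload); // stands in for the client's background thread
        }

        try {
            remaining.await(5, TimeUnit.SECONDS); // wait for the workers to drain the queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return totalChars.get();
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("alpha", "beta", "gamma"), 2));
    }
}
```

In a real handler, the payload would be extracted from the message (for example, with getData()) inside invoke() before it is enqueued, so that the worker threads never touch the message object itself.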

When using asynchronous message processing, the AMPS client resets and reuses the message object that it passes to your MessageHandler between calls. This improves performance in the client, but it means that if your MessageHandler needs to preserve information contained in the message, it must copy that information rather than save a reference to the message object. Otherwise, the AMPS client cannot guarantee the state or contents of the object when your program later uses it.
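
To make the difference between copying and retaining concrete, here is a small self-contained sketch. It does not use the AMPS client: ReusableMessage and Handler are hypothetical stand-ins that only imitate a message object being reset and reused between handler calls, and deliver() imitates the client invoking the handler. The copying handler saves the payload string returned by getData(), while the retaining handler saves the message reference.

```java
import java.util.ArrayList;
import java.util.List;

public class CopyOnReceiveSketch {

    /** Hypothetical stand-in for a message object that is reset and reused between calls. */
    static final class ReusableMessage {
        private String data;

        void reset(String newData) {
            this.data = newData;
        }

        String getData() {
            return data;
        }
    }

    /** Hypothetical stand-in for the handler interface. */
    interface Handler {
        void invoke(ReusableMessage m);
    }

    /** Copies what it needs out of the message during the call. */
    static final class CopyingHandler implements Handler {
        final List<String> saved = new ArrayList<>();

        @Override
        public void invoke(ReusableMessage m) {
            saved.add(m.getData()); // keeps the payload, not the message object
        }
    }

    /** Keeps a reference to the message itself, which is later overwritten. */
    static final class RetainingHandler implements Handler {
        final List<ReusableMessage> saved = new ArrayList<>();

        @Override
        public void invoke(ReusableMessage m) {
            saved.add(m); // every entry ends up pointing at the same reused object
        }
    }

    /** Imitates delivery: one shared message object is reset for each payload. */
    static void deliver(Handler handler, String... payloads) {
        ReusableMessage shared = new ReusableMessage();
        for (String payload : payloads) {
            shared.reset(payload);
            handler.invoke(shared);
        }
    }

    public static void main(String[] args) {
        CopyingHandler copying = new CopyingHandler();
        RetainingHandler retaining = new RetainingHandler();
        deliver(copying, "first", "second", "third");
        deliver(retaining, "first", "second", "third");

        System.out.println(copying.saved);                    // all three payloads preserved
        System.out.println(retaining.saved.get(0).getData()); // "third", not "first"
    }
}
```

The retaining handler ends up with three references to one object whose contents reflect only the most recent reset, which is the situation the paragraph above warns about.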
