Subscriptions

Messages published to a topic on an AMPS server are available to other clients via a subscription. Before it can receive messages, a client must subscribe to one or more topics on the AMPS server so that the server begins sending messages to the client. The server continues sending messages until the client unsubscribes or disconnects. With content filtering, the AMPS server sends only those messages that match a client-supplied filter. In this chapter, you will learn how to subscribe, unsubscribe, and supply filters for messages using the AMPS Java client.

Subscribing

The AMPS client makes it simple to subscribe to a topic. You call Client.subscribe() with the topic to subscribe to and the parameters for the subscription. The client submits a subscription to AMPS and returns a MessageStream that you can iterate over to receive the messages from the subscription. Below is a short example (error handling and connection details are omitted for brevity):

class MyApp {
    public static void main(String[] args) {
        // We create a Client, then connect() and logon().
        Client client = new Client(...);

        try {
            client.connect(...);
            client.logon();

            /* Here, we subscribe to the topic "messages".
             * We do not provide a filter, so the
             * subscription receives all of the messages
             * published to the topic, regardless of content.
             *
             * We protect the MessageStream in a try with
             * resources block, so that the stream is closed
             * (and the subscription ends) when control
             * exits the block.
             */
            try (MessageStream ms = client.subscribe("messages"))
            {
                /* Here, we iterate over the messages returned by the
                 * MessageStream. When we no longer need to subscribe, we can
                 * break out of the loop. When the MessageStream is cleaned up,
                 * the client sends an unsubscribe command to AMPS and stops
                 * receiving messages.
                 */
                for (Message m : ms) {
                    /* Within the loop, we process the message. In this case, we
                     * simply print the contents of the message.
                     */
                    System.out.println(m.getData());
                }
            }
        }
        catch (AMPSException e) {
            System.err.println(e);
        }
        finally {
            client.close();
        }
    }
}

The AMPS client creates a background thread that receives messages and copies them into the MessageStream that you iterate over. This means that the client as a whole can continue to receive messages while your application is busy processing earlier ones.
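To make the threading model concrete, here is a minimal sketch that uses only the Java standard library. It is not the AMPS implementation: `QueueStream`, `deliver()`, `finish()`, and `consume()` are hypothetical names invented for this illustration, and the queue-based design is an assumption about how such a stream could work. A receiver thread places messages on a queue, and the application drains the queue through an ordinary for-each loop inside a try with resources block.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class StreamSketch {
    /** Hypothetical stand-in for a message stream; NOT the AMPS MessageStream class. */
    static final class QueueStream implements Iterable<String>, AutoCloseable {
        // Sentinel object, compared by identity, that marks the end of the stream.
        private static final String END = new String("END");
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private volatile boolean closed = false;

        // Called by the receiver thread for each incoming message.
        void deliver(String data) {
            if (!closed) queue.add(data);
        }

        // Called by the receiver thread when no more messages will arrive.
        void finish() {
            queue.add(END);
        }

        // Closing the stream drops later messages and wakes a blocked consumer.
        @Override
        public void close() {
            closed = true;
            queue.add(END);
        }

        @Override
        public Iterator<String> iterator() {
            return new Iterator<String>() {
                private String pending;

                @Override
                public boolean hasNext() {
                    if (pending != null) return true;
                    if (closed) return false;
                    try {
                        String taken = queue.take(); // blocks until a message arrives
                        if (taken == END) {
                            queue.add(END);          // keep later calls from blocking
                            return false;
                        }
                        pending = taken;
                        return true;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }

                @Override
                public String next() {
                    if (!hasNext()) throw new NoSuchElementException();
                    String result = pending;
                    pending = null;
                    return result;
                }
            };
        }
    }

    // Starts a receiver thread that delivers `count` messages, then consumes them.
    static List<String> consume(int count) {
        List<String> received = new ArrayList<>();
        try (QueueStream stream = new QueueStream()) {
            Thread receiver = new Thread(() -> {
                for (int i = 1; i <= count; i++) stream.deliver("message " + i);
                stream.finish();
            }, "receiver");
            receiver.setDaemon(true);
            receiver.start();

            // The receiver never waits for this loop; it only adds to the queue.
            for (String data : stream) {
                received.add(data);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consume(3));
    }
}
```

Because the receiver only appends to the queue, slow processing in the loop delays the application's own work but does not stop the receiver from accepting new messages. In this sketch the queue is unbounded, so a consumer that falls far behind trades latency for memory.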

The subscribe() method shown above is one of the convenience methods that the AMPS Java client provides for the most common forms of each command. The client also provides an interface that gives you precise control over the command. Using that interface, the example above becomes:

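import com.crankuptheamps.client.Client;
import com.crankuptheamps.client.Command;
import com.crankuptheamps.client.Message;
import com.crankuptheamps.client.MessageStream;
import com.crankuptheamps.client.exception.AMPSException;

class MyApp {
    public static void main(String[] args) {
        // We create a Client, then connect() and logon() to an AMPS server.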
        Client client = new Client("subscribe");

        try {
            client.connect("tcp://127.0.0.1:9007/amps");
            client.logon();

            // We create a Command object to subscribe to the "messages" topic.
            Command command = new Command("subscribe").setTopic("messages");

            /* Here we execute the command and subscribe to the topic "messages".
             * This works exactly the same way as the subscribe() call in the
             * first example. If, at any time, we no longer need to subscribe,
             * we can break out of the loop. We use a try with resources to
             * automatically clean up the MessageStream when we leave the try
             * block. When the MessageStream is cleaned up, the client sends an
             * unsubscribe command to AMPS and stops receiving messages.
             */
            try (MessageStream ms = client.execute(command)) {
                for (Message m : ms) {
                    /* Within the loop, we process the message. In this case, we
                     * simply print the contents of the message.
                     */
                    System.out.println(m.getData());
                }
            }
        }
        catch (AMPSException e) {
            // Report the failure instead of silently discarding it.
            System.err.println(e);
        }
        finally {
            client.close();
        }
    }
}

The Command interface allows you to precisely customize the commands you send to AMPS. For flexibility and ease of maintenance, 60East recommends using the Command interface (rather than a named method) for any command that will receive messages from AMPS. For publishing messages, there can be a slight performance advantage to using the named methods where possible.
