Asynchronous Message Processing

Asynchronous Message Processing Interface

The AMPS Python client also supports an interface for processing messages asynchronously. With this interface, you pass a message handler to the method call. The client returns the command ID of the subscribe command once the server has acknowledged that the command has been processed. As messages arrive, the client calls your message handler directly on the background thread. This can be an advantage for some applications. For example, if your application is highly multithreaded and copies message data to a work queue processed by multiple threads, there may be a performance benefit to enqueuing work directly from the background thread. See Understanding Threading for a discussion of threading considerations, including considerations for message handlers.

As with the simple, synchronous interface, the AMPS client provides both convenience methods and methods that use a Command object. The following example shows how to use the asynchronous message processing interface (error handling and connection details are omitted for brevity):

from AMPS import Client
from AMPS import Command

...

# Here, we create a Client object and connect to an AMPS server.
client = Client("exampleSubscriber")
client.connect("tcp://127.0.0.1:9007/amps/json")
client.logon()

def on_message_printer(message):
    print(message.get_data())

# Here, we create a subscription by passing two arguments to execute_async():
# the Command to run and the message handler to call for each message.
subscriptionid = client.execute_async(
    Command("subscribe").set_topic("messages"),
    on_message_printer
)
# on_message_printer is the function that acts as our message handler. Each time
# a message is received, the client invokes this function, which prints the
# value returned by the message's get_data() method. The message passed to the
# handler is of type AMPS.Message.
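
The work-queue pattern described earlier in this section can be sketched as follows. This is a minimal illustration rather than part of the AMPS client: FakeMessage is a hypothetical stand-in for AMPS.Message so that the sketch runs without a server, and the names work_queue, enqueue_handler, and worker are assumptions chosen for the example. The handler copies the message data and returns quickly, leaving the real work to the worker threads.

```python
import queue
import threading


class FakeMessage:
    """Hypothetical stand-in for AMPS.Message; only get_data() is modeled."""

    def __init__(self, data):
        self._data = data

    def get_data(self):
        return self._data


work_queue = queue.Queue()
results = []
results_lock = threading.Lock()


def enqueue_handler(message):
    # In a real application this runs on the client's background thread.
    # Copy the payload out of the message and hand it off immediately.
    work_queue.put(message.get_data())


def worker():
    # Each worker pulls payloads until it receives the None sentinel.
    while True:
        data = work_queue.get()
        if data is None:
            break
        with results_lock:
            results.append(data.upper())  # placeholder for real processing


workers = [threading.Thread(target=worker) for _ in range(3)]
for thread in workers:
    thread.start()

# Simulate the client delivering four messages to the handler.
for text in ("a", "b", "c", "d"):
    enqueue_handler(FakeMessage(text))

# Send one sentinel per worker, then wait for all workers to finish.
for _ in workers:
    work_queue.put(None)
for thread in workers:
    thread.join()

print(sorted(results))
```

With a real client, enqueue_handler would be supplied as the message handler, for example as the second argument to client.execute_async() in the example above.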

Using an Instance Method as a Message Handler

One of the more common ways of providing a message handler is as an instance method on an object that maintains message state. It's simple to provide a handler with this capability, as shown below:

# Define a class that saves state and
# handles messages.
class StatefulHandler:
    # Initialize self with state to save
    def __init__(self, name):
        self._name = name
        self._count = 0

    # Use state from this instance while handling
    # the message.
    def __call__(self, message):
        self._count += 1
        print("%s (count: %d) got %s" % (self._name, self._count, message.get_data()))

Because the class defines __call__, an instance of the class is itself callable. You can then provide an instance of the handler directly wherever a message handler is required, as shown below:

client.subscribe(StatefulHandler("An instance"), "topic")

When using asynchronous message processing, the AMPS client resets and reuses the message object passed to message handler functions between calls. This improves performance in the client, but it means that if your message handler needs to preserve information contained in the message, you must copy that information rather than save a reference to the message object. Otherwise, the AMPS client cannot guarantee the state or contents of the object when your program later uses it.
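
The difference between saving the message object and copying its contents can be illustrated with a small simulation. This is a sketch under stated assumptions, not the client's actual implementation: ReusedMessage is a hypothetical stand-in that mimics a single message object being reset between handler calls, and its reset() method exists only for this illustration.

```python
class ReusedMessage:
    """Hypothetical stand-in that mimics one message object reused across calls."""

    def __init__(self):
        self._data = None

    def reset(self, data):
        # Illustration only: overwrite the contents in place for the next call.
        self._data = data

    def get_data(self):
        return self._data


saved_messages = []  # unsafe: stores references to the reused object
saved_data = []      # safe: stores the payload copied out of each message


def unsafe_handler(message):
    saved_messages.append(message)


def copying_handler(message):
    saved_data.append(message.get_data())


# Simulate three deliveries that all reuse the same message object.
shared = ReusedMessage()
for payload in ('{"id": 1}', '{"id": 2}', '{"id": 3}'):
    shared.reset(payload)
    unsafe_handler(shared)
    copying_handler(shared)

# Every saved reference now reports only the last payload,
# while the copied data preserves all three payloads.
print([m.get_data() for m in saved_messages])
print(saved_data)
```

In this simulation the handler that stores the message object ends up with three references to the same object, so only the most recent contents are visible, while the handler that copies the data keeps each payload.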
