Only this pageAll pages
Powered by GitBook
1 of 47

AMPS Python Client 5.3.4

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Providing Credentials to AMPS

When a client logs on to AMPS, the client sends AMPS a username and password. The username is derived from the URI, using the standard syntax for providing a user name in a URI, for example, tcp://JohnDoe:@server:port/amps/messagetype to include the user name JohnDoe in the request.

For a given user name, the password is provided by an Authenticator. The AMPS client distribution includes a DefaultAuthenticator that simply returns the password, if any, provided in the URI. A logon() command that does not specify an Authenticator will use an instance of DefaultAuthenticator.

If your authentication system requires a different authentication token, you can implement an Authenticator that provides the appropriate token.

Providing Credentials in a Connection String

When using the DefaultAuthenticator, the AMPS clients support the standard format for including a username and password in a URI, as shown below:

tcp://user:password@host:port/protocol/message_type

When provided in this form, the default authenticator provides the username and password specified in the URI. If you have implemented another authenticator, that authenticator controls how passwords are provided to the AMPS server.

Client Identification

AMPS uses the name of the client as a session identifier and as part of the identifier for messages originating from that client.

For this reason, when a transaction log is enabled in the AMPS instance (that is, when the instance is recording a sequence of publishes and attempting to eliminate duplicate publishes), an AMPS instance will only allow one application with a given client name to connect to the instance.

When a transaction log is present, AMPS requires the client name for a publisher to be:

  • Unique within a set of replicated AMPS instances

  • Consistent from invocation to invocation if the publisher will be publishing the same logical stream of messages

If publishers do not meet this contract (for example, if the publisher changes its name and publishes the same messages, or if a different publisher uses the same session name), message loss or duplication can happen.

60East recommends always using consistent, unique client names. For example, the client name could be formed by combining the application name, an identifier for the host system, and the ID of the user running the application. A strategy like this provides a name that will be different for different users or on different systems, but consistent for instances of the application that should be treated as equivalent to the AMPS system.

Likewise, if a publisher is sending a completely independent stream of messages (for example, a microservice that sends a different, unrelated sequence of messages each time it connects to AMPS), there is no need for a publisher to retain the same name each time it starts. However, if a publisher is resuming a stream of messages (as in the case when using a file-backed publish store), that publisher must use the same client name, since the publisher is resuming the session.

Before You Start

Welcome to developing applications with AMPS, the Advanced Message Processing System from 60East Technologies!

These guides will help you learn how to develop applications using AMPS.

Before getting started with this guide, it is important to have a good understanding of the following topics:

  • Developing Applications in Python

    To be successful using this guide, and developing applications with AMPS, you will need to have a working knowledge of the language you are developing in.

  • AMPS Concepts

    This guide focuses on using the AMPS client libraries and how those libraries work with the AMPS server.

You will also need a system on which you can develop and run Python applications and a system where you can host the AMPS server.

Runtime

The features supported on your processor and operating system depend on the features supported by the underlying C++ client, as shown in the following table:

Feature
Linux x64 / aarch64
Windows x64
OSX x64 / aarch64

Incredible performance

X

X

X

Publish and subscribe

X

X

X

State of the World (SOW) queries

X

X

X

Topic and content filtering

X

X

X

Atomic SOW query and subscribe

X

X

X

Transaction log replay

X

X

X

Historical SOW query

X

X

X

Beautiful documentation

X

X

X

HA: automatic failover

X

X

X

HA: durable publish and subscribe

X

X

X

Setting Up a Development Instance

You will need an installed and running AMPS server to use the product as well. You can read the sample programs without a running server, but you will get the most out of this guide by running the programs against a working server.

Welcome to the AMPS Python Client

This guide provides information you need to get started with the AMPS Python client. It focuses specifically on the client and does not cover AMPS itself in detail.

This guide assumes that you have a development environment for Python and access to an AMPS server using the configuration provided with the Python samples (in the full source distribution of the client).

Obtaining and Installing the AMPS Python Client

Installing From PyPI

The AMPS Python client is published to the Python Package Index as amps-python-client. You can use pip to install the client directly from the repository using a command such as the following:

$ pip install amps-python-client

Obtaining the Client Source

The client source files are in the directory where you unpacked the files. By default, this is amps-python-client-<version>, where <version> is the current version of the python client (such as amps-python-client-4.3.1.0).

Installing the Prebuilt .whl on Linux

60East provides a prebuilt .whl file for x64 Linux distributions using Python 2.7 or 3.4 and later.

To install the .whl using the command line:

  1. You will need permission to update the Python distribution on the system you are installing on.

  2. Open a command prompt.

  3. Run the following command, substituting the appropriate path to the release wheel that you want to install:

$  pip install http://devnull.crankuptheamps.com/releases/amps/clients/amps_python_client-<version_number>-<platform and python version specifier>.whl

If this command reports a permission error, you do not have permission to update the Python distribution. Run the command as a different user, or use sudo to run the command as root.

Installing the Prebuilt .whl on Windows

60East provides a prebuilt .whl file for 64-bit Windows operating systems using Python 2.7 or 3.4 and later. Your Python distribution may include a more fully-featured package manager to assist with installing .whl files.

To install the .whl using the command line:

  1. You will need permission to update the Python distribution on the system you are installing on.

  2. Make sure that your Python distribution includes setuptools.

  3. Open a command prompt.

  4. Make sure that the directory that contains python is in your path.

  5. Run the following command, substituting the appropriate path to the release wheel that you want to install:

$ pip install \
http://devnull.crankuptheamps.com/releases/amps/clients/amps_python_client-<version_number>-<platform and python version specifier>.whl

If this command reports a permission error, you do not have permission to update the Python distribution. Run the command prompt from a different user, or start the command prompt with Run as Administrator.

Building the Client

The main AMPS Python client distribution includes the full source to the client. For most installations, you build the client with your Python distribution before using it. The process for building the client differs slightly depending on whether you are building the client for Linux or for Windows.

Building for Linux

Follow these steps to build the Linux version of the client:

  1. The Python client includes all necessary C++ client sources. If you are using a different version of the C++ client than the one included with the Python client, set the AMPS_CPP_DIR environment variable to the location of the AMPS C++ client source.

  2. Run python setup.py build from the AMPS Python client directory to build the client.

    This script uses the Python distutils to build the library, which makes it easy to build the library correctly and install the library into your Python distribution. To see the options available in your environment, run python setup.py --help.

    The module must be built with the same C++ compiler used to build python on your system. The setup.py script and distutils package will generally ensure this unless you have added a different compiler to your PATH. If you typically use a different C++ compiler, remove the path to that compiler before running setup.py.

  3. The script builds the module to a path in the build directory. The exact path depends on the version of Python you are building with.

  4. Add the build directory path to the PYTHONPATH environment variable. For example:

    $ export PYTHONPATH=/home/AMPSdev/amps-python-client/build/lib.linux-x86_64-2.7:$PYTHONPATH

  5. Test that the module loads correctly:

    $ python -c "import AMPS"

  6. Optionally, install the module to your local python installation. While this is not required, doing this makes the AMPS module available for all of the programs that use this installation of python:

    sudo python setup.py install

Building for Windows

Follow these steps to build the Windows version of the client:

  1. Use the Visual Studio Command Prompt shortcut to start a command prompt window for the type of module you want to build. For example, to build a 32-bit module, you use the command prompt for x86 builds.

  2. Add the Python directory (the location of the python.exe interpreter) to your PATH.

The platform of the python installation must match the target platform for the python module. If you want to build a 64-bit module, your PATH must include a 64-bit python installation. If you want to build a 32-bit module, your PATH must include a 32-bit python installation. The build process uses whatever python.exe is found first when searching the PATH. So, to build both 32-bit and 64-bit versions, you must build them separately, and change your PATH between builds.

  1. Run python setup.py build from the AMPS Python client directory to build the client.

    This script uses the Python distutils to build the library, which makes it easy to build the library correctly and install the library into your Python distribution. To see the options available in your environment, run python setup.py --help.

  2. The script builds the module to a path in the build directory. The exact path depends on the version of Python you are building with.

  3. Add the build directory path to the PYTHONPATH environment variable. For example:

    > set PYTHONPATH="C:\Users\AMPSdev\amps-python-client\build\lib.linux-x86_64-2.7;%PYTHONPATH%"

  4. Test that the module loads correctly:

    > python -c "import AMPS"

  5. Optionally, install the module to your local python installation.While this is not required, doing this makes the AMPS module available for all of the programs that use this installation of python:

    > python setup.py install

Connection Strings for AMPS

The AMPS clients use connection strings to determine the server, port, transport, and protocol to use to connect to AMPS. When the connection point in AMPS accepts multiple message types, the connection string also specifies the precise message type to use for this connection.

Connection strings have a number of elements:

As shown in the figure above, connection strings have the following elements:

  • Transport - Defines the network used to send and receive messages from AMPS. In this case, the transport is tcp. For connections to transports that use the Secure Sockets Layer (SSL), use tcps. For connections to AMPS over a Unix domain socket, use unix.

  • Host Address - Defines the destination on the network where the AMPS instance receives messages. The format of the address is dependent on the transport. For tcp and tcps, the address consists of a host name and port number. In this case, the host address is localhost:9007. For unix domain sockets, a value for hostname and port must be provided to form a valid URI, but the content of the hostname and port are ignored, and the file name provided in the path parameter is used instead (by convention, many connection strings use localhost:0 to indicate that this is a local connection that does not use TCP/IP).

  • Protocol - Sets the format in which AMPS receives commands from the client. Most code uses the default amps protocol, which sends header information in JSON format. AMPS supports the ability to develop custom protocols as extension modules, and AMPS also supports legacy protocols for backward compatibility.

  • MessageType - Specifies the message type that this connection uses. This component of the connection string is required if the protocol accepts multiple message types and the transport is configured to accept multiple message types. If the protocol does not accept multiple message types, this component of the connection string is optional, and defaults to the message type specified in the transport.

    Legacy protocols such as fix, nvfix and xml only accept a single message type, and therefore do not require or accept a message type in the connection string.

As an example, a connection string such as:

tcp://localhost:9007/amps/json

would work for programs connecting from the local host to a Transport configured as follows:

<AMPSConfig>
    ...

    <!-- This transport accepts any known message type for the instance: the
         client must specify the message type. -->
    <Transport>
        <Name>any-tcp</Name>
        <Type>tcp</Type>
        <InetAddr>9007</InetAddr>
        <Protocol>amps</Protocol>
    </Transport>

    ...
</AMPSConfig>

Your First AMPS Program

In this section, we will learn more about the structure and features of the AMPS Python client and build our first Python program using AMPS.

About the Client Library

The AMPS client library is packaged as a single binary file. The exact name of the file depends on the Python version and build environment. You can find the file under the build directory of your AMPS Python client install once you've completed the build process. If you have used a prepackaged .egg or .whl to install the Python client, the appropriate binary file will be installed in your Python environment.

Every Python application you build will need to be able to reference the library. If you choose to install the python client into your local Python installation, then Python has access to the client library in the installation, and you do not need to include the library with each specific script. Otherwise, you will need to package and include the library with your script and ensure that the library is in the path where python looks for shared libraries, or ensure that the system where your application will run has installed the appropriate prepackaged build.

Connecting to AMPS

Let's begin by writing a simple program that connects to an AMPS server and sends a single message to a topic. This code will be covered in detail just following the example.

In the example above, we show the entire program. Future examples will isolate one or more specific portions of the code.

Examining the Code

Let us revisit the code we listed above:

About Authentication

When a client logs on to AMPS, the client sends AMPS a username and password.

The username is derived from the URI, using the standard syntax for providing a user name in a URI, for example, tcp://JohnDoe:@server:port/amps/messagetype to include the user name JohnDoe in the request.

For a given user name, the password is provided by an Authenticator. The AMPS client distribution includes a DefaultAuthenticator that simply returns the password, if any, provided in the URI. A logon() command that does not specify an Authenticator will use an instance of DefaultAuthenticator.

If your authentication system requires a different authentication token, you can implement an Authenticator that provides the appropriate token.

Before working through this guide, we recommend reading the guide.

Detailed explanations of the AMPS server behavior are in the .

Instructions for starting an instance of AMPS are available in the guide.

The AMPS server runs on x64 Linux. The and contain information on how to run an AMPS server on a development system that does not run Linux.

For an overview of AMPS and instructions on setting up your development environment, see the guide.

The AMPS Python client is available as a download from the website. Download the client from the site, then extract it.

See the for more information on configuring transports.

Introduction to AMPS
AMPS Server Documentation
Introduction to AMPS
Introduction to AMPS
AMPS FAQ
Introduction to AMPS
60East Technologies
AMPS Configuration Guide
import AMPS
import sys

uri_ = "tcp://127.0.0.1:9007/amps/json"

client = AMPS.Client("examplePublisher")

try:
    client.connect(uri_)
    client.logon()
    client.publish("messages", '{ "hi" : "Hello, world!"}')

except AMPS.AMPSException as e:
    sys.stderr.write(str(e))

client.publish_flush()
client.close()
# These import the AMPS and sys packages. All programs written using the AMPS
# Python client will need to include the import AMPS statement at a minimum.
import AMPS
import sys

# The URI to use to connect to AMPS. The URI consists of the transport, the
# address, and the protocol to use for the AMPS connection. In this case, the
# transport is tcp, the address is 127.0.0.1:9007, and the protocol is amps. This
# connection will be used for JSON messages. Check with the person who manages the
# AMPS instance to get the connection string to use for your programs.
uri_ = "tcp://127.0.0.1:9007/amps/json"


# This line creates a new Client object. Client encapsulates a single connection
# to an AMPS server. Methods on Client allow for connecting, disconnecting,
# publishing, and subscribing to an AMPS server. The argument to the Client
# constructor, "examplePublisher", is a name chosen by the client to
# identify itself to the server. Errors relating to this connection will be logged
# with reference to this name, and AMPS uses this name to help detect duplicate
# messages. AMPS enforces uniqueness for client names when a transaction log is
# configured, and it is good practice to always use unique client names.
client = AMPS.Client("examplePublisher")

# Here, we open a try block that concludes with except AMPS.AMPSException as e.
# All exceptions in AMPS derive from AMPSException. If an operation throws another
# exception, AMPS will wrap that exception into the AMPSException you receive. For
# example, if the call to connect() fails because the provided address is not
# reachable, the AMPSException will contain an inner exception from the operating
# system, likely a SocketException.
try:

    # With this statement, we provide the URI to the client and declare the AMPS
    # connection.
    client.connect(uri_)

    # The AMPS logon() command connects to AMPS and creates a named connection.
    #
    # This version of logon() uses the DefaultAuthenticator. If we
    # had provided logon credentials in the URI, that Authenticator would pass those
    # credentials to AMPS. Without credentials, the client logs on to AMPS
    # anonymously. AMPS versions 5.0 and later require a logon() command in the
    # default configuration.
    #
    # If you need to provide credentials in a different way, implement an Authenticator
    # and pass that Authenticator here in the 'authenticator' parameter.
    client.logon()

    # Here, we publish a single message to AMPS on the messages topic, containing the
    # data Hello, world!. This data is placed into a JSON message and sent to the
    # server. Upon successful completion of this function, the AMPS client has
    # enqueued the message to be sent to the server, and subscribers to the messages
    # topic will receive this JSON message: { "hi" : "Hello, world!" }.
    client.publish("messages", '{ "hi" : "Hello, world!"}')

except AMPS.AMPSException as e:
    sys.stderr.write(str(e))


# Here, we call publish_flush() on the client. This command waits until all messages
# from the client have been sent. If messages may still be in the process of
# transmission when your application is ready to close the client, publish_flush()
# helps ensure that the messages have been sent. In this case, we allow
# publish_flush() to block indefinitely. A production application might specify a
# timeout, to avoid hanging in the event that the application loses connectivity
# before the messages have been sent.
client.publish_flush()

# We close the connection to AMPS. While that doesn't matter here, since the
# application exits immediately after calling close(), it's good practice to close
# connections when you are done using them.
client.close()

Content Filtering

One of the most powerful features of AMPS is content filtering. With content filtering, filters based on message content are applied at the server so that your application and the network are not utilized by messages that are not relevant to your application. For example, if your application is only displaying messages from a particular user, you can send a content filter to the server so that only messages from that particular user are sent to the client.

To apply a content filter to a subscription, simply pass it into the client.subscribe() call or use the set_filter method to add a filter to the command:

for message in client.subscribe(
    "letters",
    "/sender='mom'",
    timeout=5000):

   print("Mom says: %s" % message.get_data())

In this example, we have passed in a content filter /sender = 'mom'. This will result in the server only sending us messages from the messages topic that have the sender field equal to mom in the message.

For example, the AMPS server will send the following message, where /sender is mom:

{
    "sender" : "mom",
    "text" : "Happy Birthday!",
    "reminder" : "Call me Thursday!"
}

The AMPS server will not send a message with a different /sender value:

{
    "sender" : "henry dave",
    "text" : "Things do not change; we change."
}

Changing the Filter on a Subscription

AMPS allows you to update parameters, such as the content filter, on a subscription. When you replace a filter on the subscription, AMPS immediately begins sending only messages that match the updated filter. Notice that if the subscription was entered with a command that includes a SOW query, using the replace option can re-issue the SOW query (as described in the AMPS User Guide).

To update the filter on a subscription, you create a subscribe command. You set the SubscriptionId provided on the Command to the identifier of the existing subscription and include the replace option on the Command.

When you send the Command, AMPS atomically replaces the filter and sends messages that match the updated filter from that point forward.


sub_cmd = AMPS.Command("sow_and_subscribe") \
           .set_topic("orders-sow")      \
           .set_sub_id("A42")            \
           .set_filter("/details/items/description LIKE 'puppy'")


client.execute_async(sub_cmd, MyHandler)


# Elsewhere in the program...

replace_cmd = AMPS.Command("sow_and_subscribe") \
               .set_topic("orders-sow")      \
               .set_sub_id("A42")            \
               .set_filter("/details/items/description LIKE 'kitten'") \
               .set_options("replace")

client.execute_async(replace_cmd, MyHandler)

Subscriptions

Messages published to a topic on an AMPS server are available to other clients via a subscription. Before messages can be received, a client must subscribe to one or more topics on the AMPS server so that the server will begin sending messages to the client. The server will continue sending messages to the client until the client unsubscribes, or the client disconnects. With content filtering, the AMPS server will limit the messages sent to only those messages that match a client-supplied filter. In this chapter, you will learn how to subscribe, unsubscribe, and supply filters for messages using the AMPS Python client.

Subscribing

The AMPS client makes it simple to subscribe to a topic. You call client.subscribe() with the topic to subscribe to and the parameters for the subscription. The client submits the subscription to AMPS and returns a MessageStream that you can iterate over to receive the messages from the subscription. Below is a short example:

from AMPS import Client

# Here, we create a Client object and connect to an AMPS server.
client = Client("test")
client.connect("tcp://127.0.0.1:9007/amps/json")
client.logon()

# Here we subscribe to the topic messages. We do not provide a filter, so AMPS
# does not content-filter the topic. Although we don't use the object explicitly
# here, the subscribe method returns a MessageStream object that we iterate over.
# If, at any time, we no longer need to subscribe, we can break out of the loop.
# When there are no more active references to the MessageStream, the client sends
# an unsubscribe command to AMPS and stops receiving messages.
for message in client.subscribe("messages"):

    # Within the loop, we process the message. In this case, we simply print the
    # contents of the message.
    print(message.get_data())

AMPS creates a background thread that receives messages and copies them into the MessageStream that the for loop iterates over. This means that the client application as a whole can continue to receive messages while you are doing processing work.

The simple method described above is provided for convenience. The AMPS Python client provides convenience methods for the most common form of AMPS commands. The client also provides an interface that allows you to have precise control over the command. Using that interface, the example above becomes:

from AMPS import Client
from AMPS import Command

# Here, we create a Client object and connect to an AMPS server.
client = Client("test")
client.connect("tcp://127.0.0.1:9007/amps/json")
client.logon()

# Here, we create a Command object for the subscribe command, specifying the topic
# messages. We do not provide a filter, so AMPS does not content-filter the
# subscription. Although we don't use the object explicitly here, the execute
# method returns a MessageStream object that we iterate over. If, at any time, we
# no longer need to subscribe, we can break out of the loop. When we break out of
# the loop, there are no more references to the MessageStream and the AMPS client
# sends an unsubscribe message to AMPS.
for message in client.execute(Command("subscribe").set_topic("messages")):
    # Within the body of the loop, we can process the message as we need to. In this
    # case, we simply print the contents of the message.
    print(message.get_data())

The Command interface allows you to precisely customize the commands you send to AMPS. For flexibility and ease of maintenance, 60East recommends using the Command interface (rather than a named method) for any command that will receive messages from AMPS. For publishing messages, there can be a slight performance advantage to using the named commands where possible.

Understanding Threading

The first time a command causes an instance of the Client or HAClient to connect to AMPS (typically, the logon() command), the client creates a thread that runs in the background. This background thread is responsible for processing incoming messages from AMPS, which includes both messages that contain data and acknowledgments from the server.

When you call a command on the AMPS client, the command typically waits for an acknowledgment from the server and then returns. (The exception to this is publish. For performance, the publish command does not wait for an acknowledgment from the server before returning.)

In the simple case, using synchronous message processing, the client provides an internal handler function that populates the MessageStream. The client receive thread calls the internal handler function, which makes a deep copy of the incoming message and adds it to the MessageStream. The MessageStream is used on the calling thread, so operations on the MessageStream do not block the client receive thread.

When using asynchronous message processing, AMPS calls the handler function from the client receive thread. Message handlers provided for asynchronous message processing must be aware of the following considerations:

  • The client creates one client receive thread at a time, and the lifetime of the thread lasts for the lifetime of the connection to the AMPS server. A message handler that is only provided to a single client will only be called from a single thread at a time. If your message handler will be used by multiple clients, then multiple threads will call your message handler. In this case, you should take care to protect any state that will be shared between threads. Notice that if the client connection fails (or is closed), and the client reconnects, the client will create a different thread for the new connection.

  • For maximum performance, do as little work in the message handler as possible. For example, if you use the contents of the message to update an external database, a message handler that adds the relevant data to an update queue, that is processed by a different thread, will typically perform better than a message handler that does this update during the message handling.

  • While your message handler is running, the thread that calls your message handler is no longer receiving messages. This makes it easier to write a message handler because you know that no other messages are arriving from the same subscription. However, this also means that you cannot use the same client that called the message handler to send commands to AMPS. Acknowledgments from AMPS cannot be processed and your application will deadlock waiting for the acknowledgment. Instead, enqueue the command in a work queue to be processed by a separate thread or use a different client object to submit the commands.

  • The AMPS client resets and reuses the Message provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information (either by making a copy of the entire message or copying the required fields) rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it. Likewise, a message handler should not modify the Message -- this will result in modifying the message provided to other handlers (including handlers internal to the AMPS client).

Asynchronous Message Processing

Asynchronous Message Processing Interface

As with the simple, synchronous interface, the AMPS client provides both convenience methods and methods that use a Command object. The following example shows how to use the asynchronous message processing interface (error handling and connection details are omitted for brevity):

from AMPS import Client
from AMPS import Command

...

# Here, we create a Client object and connect to an AMPS server.
client = Client("exampleSubscriber")
client.connect("tcp://127.0.0.1:9007/amps/json")
client.logon()

def on_message_printer(message):
    print(message.get_data())

# Here, we create a subscription with the following parameters:
subscriptionid = client.execute_async(
    Command("subscribe").set_topic("messages"),
    on_message_printer
)
# on_message_printer is a function that acts as our message handler. When a
# message is received, this function is invoked, and in this case, the get_data()
# method from message is printed to the screen. Message is of type AMPS.Message.

Using an Instance Method as a Message Handler

One of the more common ways of providing a message handler is as an instance method on an object that maintains message state. It's simple to provide a handler with this capability, as shown below:

# Define a class that saves state and
# handles messages.
class StatefulHandler:
    # Initialize self with state to save
    def __init__(self, name):
        self._name = name
        self._count = 0

    # Use state from this instance while handling
    # the message.
    def __call__(self, message):
        self._count += 1
        print ("%s (count: %d) got %s" % (self._name, self._count, message.get_data()))

You can then provide an instance of the handler directly wherever a message handler is required, as shown below:

client.subscribe(StatefulHandler("An instance"), "topic")

When using asynchronous message processing, the AMPS client resets and reuses the message provided to MessageHandler functions between calls. This improves performance in the client, but means if your MessageHandler function needs to preserve information contained within the message you must copy the information rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it.

The AMPS Python client also supports an interface that allows you to process messages asynchronously. In this case, you add a message handler to the method call. The client object returns the command ID of the subscribe command once the server has acknowledged that the command has been processed. As messages arrive, the client calls your message handler directly on the background thread. This can be an advantage for some applications. For example, if your application is highly multithreaded and copies message data to a work queue processed by multiple threads, there may be a performance benefit to enqueuing work directly from the background thread. See for a discussion of threading considerations, including considerations for message handlers.

Understanding Threading

Exception Types

The following table details each of the exception types thrown by AMPS.

Exception
When
Notes

AlreadyConnectedException

Connecting

Thrown when connect() is called on a Client that is already connected.

AMPSException

Anytime

Base class for all AMPS exceptions.

AuthenticationException

Anytime

Indicates an authentication failure occurred on the server.

BadFilterException

Subscribing

This typically indicates a syntax error in a filter expression.

BadRegexTopicException

Subscribing

Indicates a malformed regular expression was found in the topic name.

CommandException

Anytime

Base class for all exceptions relating to commands sent to AMPS.

ConnectionException

Anytime

Base class for all exceptions relating to the state of the AMPS connection.

ConnectionRefusedException

Connecting

The connection was actively refused by the server. Validate that the server is running, that network connectivity is available, and the settings on the client match those on the server.

DisconnectedException

Anytime

No connection is available when AMPS needed to send data to the server or the user's disconnect handler threw an exception.

InvalidTopicException

SOW query

The topic is not configured for the requested operation. For example, a sow command was issued for a topic that is not in the SOW or a bookmark subscribe was issued for a topic that is not recorded in the transaction log.

InvalidTransportOptionsEx ception

Connecting

An invalid option or option value was specified in the URI.

InvalidUriException

Connecting

The URI string provided to connect() was formatted improperly.

MessageTypeException

Connecting

The class for a given transport's message type was not found in AMPS.

NameInUseException

Connecting

The client name (specified when instantiating Client) is already in use on the server.

RetryOperationException

Anytime

An error occurred that caused processing of the last command to be aborted. Try issuing the command again.

StreamException

Anytime

Indicates that data corruption has occurred on the connection between the client and server. This usually indicates an internal error inside of AMPS -- contact AMPS support.

SubscriptionAlreadyExists Exception

Subscribing

A subscription has been requested using the same command ID string as another subscription. Create a unique command ID string for every subscription.

TimedOutException

Anytime

A timeout occurred waiting for a response to a command.

TransportTypeException

Connecting

Thrown when a transport type is selected in the URI that is unknown to AMPS.

UnknownException

Anytime

Thrown when an internal error occurs. Contact AMPS support immediately.

UsageException

Changing the properties of an object

Thrown when the object is not in a valid state for setting the properties. For example, some properties of a Client (such as the name) cannot be changed while that client is connected to AMPS.

Exceptions

Generally speaking, when an error occurs that prohibits an operation from succeeding, AMPS will throw an exception. AMPS exceptions universally derive from AMPS.AMPSException, so by catching AMPSException, you will be sure to catch anything AMPS throws, for example:

def read_and_evaluate(client):
    # read a new payload from the user
    payload = input("Please enter a message")

    # write a new message to AMPS
    if payload:
        try:
            client.publish(
                "UserMessage",
                "{ \"message\" : \"%s\"  }" % payload
            )
        except AMPS.AMPSException as e:
            sys.stderr.write("An AMPS exception" + "occurred: %s" % str(e))

In this example, if an error occurs, the program writes the error to stderr and the publish() command fails. However, client is still usable for continued publishing and subscribing. When the error occurs, the exception is written to the console. As with most Python exceptions, str() will convert the exception into a string that includes a descriptive message.

AMPS exception types vary based on the nature of the error that occurs. In your program, if you would like to handle certain kinds of errors differently than others, you can handle the appropriate subclass of AMPSException to detect those specific errors and do something different.

def create_new_subscription(client):
    messageStream = None
    topicName = None

    while messageStream is None:

    # attempts to retrieve a topic name (or regular expression) from the user.
    topicName = input("Please enter a topic name")
    try:

        # If an error occurs when setting up the subscription, the program decides whether
        # or not to try again based on the subclass of AMPSException that is thrown. In
        # this case, if the exception is a BadRegexTopicError, the exception indicates
        # that the user provided a bad regular expression. We would like to give the user
        # a chance to correct, so we ask the user for a new topic name.
        messageStream = client.subscribe(
            topicName,
            None
        )

    # This line indicates that the program catches the BadRegexTopicError exception
    # and displays a specific error to the user indicating the topic name or
    # expression was invalid. By not returning from the function in this except block,
    # the while loop runs again and the user is asked for another topic name.
    except BadRegexTopicError as e:
        print(
            "Error: bad topic name or regular expression " +
            topicName +
            ".  The exception was " +
            str(e) +
            "."
        )
        # we'll ask the user for another topic

    # If an AMPS exception of a type other than BadRegexTopicError is thrown by AMPS,
    # it is caught here. In that case, the program emits a different error message to
    # the user.
    except AMPSException as e:
        print (
            "Error: error setting up subscription to topic" +
            topicName +
            ".  The exception was " +
            str(e) +
            "."
        )

        # At this point the code stops attempting to subscribe to the client by the return
        # None statement.
        return None
   return messageStream

Synchronous Message Processing

The MessageStream object makes copies of the incoming messages. When there is no message available, the MessageStream will block.

A MessageStream will only remain active while the client that produced it is connected. If the client disconnects, the MessageStream will continue to provide any messages that have not yet been consumed, then throw an exception.

Understanding Message Objects

So far, we have seen that subscribing to a topic involves working with objects of AMPS.Message type. A Message represents a single message to or from an AMPS server. Messages are received or sent for every client/server operation in AMPS.

Header Properties

There are two parts of each message in AMPS: a set of headers that provide metadata for the message, and the data that the message contains. Every AMPS message has one or more header fields defined. The precise headers present depend on the type and context of the message. There are many possible fields in any given message, but only a few are used for any given message. For each header field, the Message class contains a distinct property that allows for retrieval and setting of that field. For example, the Message.get_command_id() function corresponds to the commandId header field, the Message.get_batch_size() function corresponds to the BatchSize header field, and so on. For more information on these header fields, consult the AMPS User Guide and AMPS Command Reference.

To work with header fields, a Message contains get_xxx() / set_xxx() methods corresponding to the header fields, as well as a number of getXXX() / setXXX() methods for compatibility with previous implementations of the AMPS Python client. 60East does not recommend attempting to parse header fields from the raw data of the message.

get_data() Method

Access to the data section of a message is provided via the get_data() method. The data property will contain the unparsed data of the message. Your application code parses and works with the data.

The AMPS Python client contains a collection of helper classes for working with message types that are specific to AMPS (for example, FIX, NVFIX, and AMPS composite message types). For message types that are widely used, such as JSON or XML, you can use the standard Python facilities or the library you typically use in your environment.

Message Field Reference

As mentioned , one way for an application to receive messages is to have the AMPS Python client return a MessageStream object that can be used to iterate over the results of the command.

The advantages of using a MessageStream that it provides a simple processing model, that receiving messages from a MessageStream does not block the client receive thread (see ) and that a copy of the message is automatically made for the application.

In return for these advantages, a MessageStream has higher overhead than , it will not be resumed if the client disconnects, and, by default, it will use as much memory as necessary to hold messages coming from the AMPS server.

The contains a full description of which fields are available and which fields are returned in response to specific commands.

earlier
Understanding Threading
Asynchronous Message Processing
AMPS Command Reference

Controlling Blocking with Command Timeout

The named convenience methods and the Command class provide a timeout setting that specifies how long the command should wait to receive a processed acknowledgment from AMPS. This can be helpful in cases where it is important for the caller to limit the amount of time to block waiting for AMPS to acknowledge the command. If the AMPS client does not receive the processed acknowledgment within the specified time, the client sends an unsubscribe command to the server to cancel the command and throws an exception.

Acknowledgments from AMPS are processed by the client receive thread on the same socket as data from AMPS. This means that any other data previously returned (such as the results of a large query) must be consumed before the acknowledgment can be processed. An application that submits a set of SOW queries in rapid succession should set a timeout that takes into account the amount of time required to process the results of the previous query.

Disconnect Handling

Every distributed system will experience occasional disconnections between one or more nodes. The reliability of the overall system depends on an application's ability to efficiently detect and recover from these disconnections. Using the AMPS Python client's disconnect handling, you can build powerful applications that are resilient in the face of connection failures and spurious disconnects. For additional reliability, you can also use the high availability client (discussed in the following sections), which provides both disconnect handling and features to help ensure that messages are reliably delivered.

Exception Handling and Asynchronous Message Processing

When using asynchronous message processing, exceptions thrown from the message handler are silently absorbed by the AMPS Python client by default. The AMPS Python client allows you to register an exception listener to detect and respond to these exceptions. When an exception listener is registered, AMPS will call the exception listener with the exception.

See for details.

Unhandled Exceptions

Detecting Write Failures

The publish methods in the Python client deliver the message to be published to AMPS then return immediately, without waiting for AMPS to return an acknowledgment. Likewise, the sow_delete methods request deletion of SOW messages, and return before AMPS processes the message and performs the deletion. This approach provides high performance for operations that are unlikely to fail in production. However, this means that the methods return before AMPS has processed the command, without the ability to return an error in the event the command fails.

If your application needs to know whether publishes succeeded and are durably persisted, the following approach is recommended:

  • Set a PublishStore on the client. This will ensure that messages are retransmitted if the client becomes disconnected before the message is acknowledged and request persisted acknowledgments for messages.

  • Install a failed_write_handler. In the event that AMPS reports an error for a given message, that event will be reported to the failed_write_handler.

  • Call publish_flush() and verify that all messages are persisted before the application exits.

When no failed_write_handler is registered, acknowledgments that indicate errors in persisting data are treated as unexpected messages and routed to the last_chance_message_handler. In this case, AMPS provides only the acknowledgment message and does not provide the additional information from the client publish store.

The AMPS Python client provides a failed_write_handler that is called when the client receives an acknowledgment that indicates a failure to persist data within AMPS. As with the last_chance_message_handler described in the section, your application registers a handler for this function. When an acknowledgment returns that indicates a failed write, AMPS calls the registered handler method with information from the acknowledgment message, supplemented with information from the client publish store if one is available. Your client can log this information, present an error to the user or take whatever action is appropriate for the failure.

Unexpected Messages

Regular Expression Subscriptions

Regular Expression (Regex) subscriptions allow a regular expression to be supplied in the place of a topic name. When you supply a regular expression, it is as if a subscription is made to every topic that matches your expression, including topics that do not yet exist at the time of creating the subscription.

To use a regular expression, simply supply the regular expression in place of the topic name in the subscribe() call. For example:

def message_handler(msg):
    topic = msg.get_topic()

subscription_id = client.subscribe(
    message_handler,
    "orders.*"
)

In this example, messages on topics orders-north-america, orders-europe, and new-orders would match the regular expression. Messages published to any of those topics will be sent to our message_handler function. As in the example, you can use the get_topic() method to determine the actual topic of the message sent to the function.

Unhandled Exceptions

When using the asynchronous interface, exceptions can occur that are not thrown to the user. For example, when an exception occurs in the process of reading subscription data from the AMPS server, the exception occurs on a thread inside of the AMPS Python client. Consider the following example using the asynchronous interface:

class MyApp:
    def on_message_handler(self,message):
        print(message.get_data())

    def wait_to_be_poked(self,client):
        client.subscribe(
            self.on_message_handler,
            "pokes",
            "/Pokee LIKE '%s'" % getpass.getuser(),
            timeout=5000)
        input("Press enter to exit")

In this example, we set up a subscription to wait for messages on the pokes topic, whose Pokee tag begins with our user name. When messages arrive, we print a message out to the console, but otherwise our application waits for a key to be pressed.

Inside of the AMPS client, the client creates a new thread of execution that reads data from the server, and invokes message handlers and disconnect handlers when those events occur. When exceptions occur inside this thread, however, there is no caller for them to be thrown to and by default they are ignored.

In applications that use the asynchronous interface, and where it is important to deal with every issue that occurs in using AMPS, you can set an ExceptionHandler via Client.set_exception_listener() that receives these otherwise unhandled exceptions. Making the modifications shown in the example below, to our previous example, will allow those exceptions to be caught and handled. In this case we are simply printing those caught exceptions out to the console.

In some cases, the AMPS Python client may wrap exceptions of unknown type into an AMPSException. Your application should always include an except block for AMPSException.

If your application will attempt to recover from an exception thrown on the background processing thread, your application should set a flag and attempt recovery on a different thread than the thread that called the exception listener.

At the point that the AMPS client calls the exception listener, it has handled the exception. Your exception listener must not rethrow the exception (or wrap the exception and throw a different exception type).

class MyApp:
    def on_exception(self, e):
        print ("Exception occurred: %s" % str(e))

    def on_message_handler(self,message):
        print (message.get_data())

    def wait_to_be_poked(self, client):
        client.set_exception_listener(self.on_exception)

        # Use the advanced interface to be able to
        # accept input while processing messages.

        client.subscribe(
            self.on_message_handler,
            "pokes",
            "/Pokee LIKE '%s'" % getpass.getuser(),
            timeout=5000)
        input("Press enter to exit")

In this example we have added a call to client.set_exception_listener(), registering a simple function that writes the text of the exception out to the console. If exceptions are thrown in the message handler, those exceptions are written to the console.

AMPS records the stack trace and provides it to the exception handler, if the provided method includes a parameter for the stack trace. The sample below demonstrates one way to do this. (For sample purposes, the message handler always throws an exception.)

import AMPS
import time
import traceback

def handler(message):
    print (message)
    raise RuntimeError("in my handler")

def exception_listener(exception, tb):
    print ("EXCEPTION RECEIVED", exception)
    if tb is not None:
        traceback.print_tb(tb)

client = AMPS.Client("client")

client.set_exception_listener(exception_listener)

client.connect("tcp://localhost:9007/amps/json")

client.logon()
client.subscribe(handler,"topic")
client.publish("topic","data")
time.sleep(1)
client.close()

Setting Batch Size

The AMPS clients include a batch size parameter that specifies how many messages the AMPS server will return to the client in a single batch when returning the results of a SOW query. The 60East clients set a batch size of 10 by default. This batch size works well for common message sizes and network configurations.

Adjusting the batch size may produce better network utilization and produce better performance overall for the application. The larger the batch size, the more messages AMPS will send to the network layer at a time. This can result in fewer packets being sent, and therefore less overhead in the network layer. The effect on performance is generally most noticeable for small messages, where setting a larger batch size will allow several messages to fit into a single packet. For larger messages, a batch size may still improve performance, but the improvement is less noticeable.

In general, 60East recommends setting a batch size that is large enough to produce few partially-filled packets. Bear in mind that AMPS holds the messages in memory while batching them, and the client must also hold the messages in memory while receiving the messages. Using batch sizes that require large amounts of memory for these operations can reduce overall application performance, even if network utilization is good.

For smaller message sizes, 60East recommends using the default batch size, and experimenting with tuning the batch size if performance improvements are necessary. For relatively large messages (especially messages with sizes over 1MB), 60East recommends explicitly setting a batch size of 1 as an initial value, and increasing the batch size only if performance testing with a larger batch size shows improved network utilization or faster overall performance.

State of the World

AMPS State of the World (SOW) allows you to automatically keep and query the latest information about a topic on the AMPS server, without building a separate database. Using SOW lets you build impressively high-performance applications that provide rich experiences to users. The AMPS Python client lets you query SOW topics and subscribe to changes with ease.

Performing SOW Queries

To begin, we will look at a simple example of issuing a SOW query.

for message in client.sow("orders", "/symbol='ROL'"):
    if message.get_command() == AMPS.Message.Command.GroupBegin:
        print("--- Begin SOW Results ---")
    if message.get_command() == AMPS.Message.Command.SOW:
        print(message.get_data())
    if message.get_command() == AMPS.Message.Command.GroupEnd:
        print("--- End SOW Results ---")

In the example above, the Client.sow() convenience method is used to execute a SOW query on the orders topic, for all entries that have a symbol of 'ROL'.

As the query executes, messages containing the data of matching entries have a Command of value "sow" (provided as a constant value, AMPS.Message.Command.SOW), so as those arrive, we write them to the console.

As with subscribe, the sow command also provides an asynchronous mode, where you provide a message handler.

def on_message_handler(message):
    if message.get_command() == AMPS.Message.Command.GroupBegin:
        print("--- Begin SOW Results ---")
    if message.get_command() == AMPS.Message.Command.SOW:
        print(message.get_data())
    if message.get_command() == AMPS.Message.Command.GroupEnd:
        print("--- End SOW Results ---")

def execute_sow_query(client):
    return client.execute_async(             \
            AMPS.Command("sow")              \
                .set_topic("orders")         \
                .set_filter("/symbol='ROL'"),\
            on_message_handler)

execute_sow_query(client)

In the example above, the execute_sow_query() function invokes Client.execute_async() to initiate a SOW query on the orders topic, for all entries that have a symbol of 'ROL'.

As the query executes, the on_message_handler() function is invoked for each matching entry in the topic. Messages containing the data of matching entries have a Command of value sow, so as those arrive, we write them to the console.

Connection Parameters for AMPS

When specifying a URI for connection to an AMPS server, you may specify a number of transport-specific options in the parameters section of the URI connection parameters. Here is an example:

tcp://localhost:9007/amps/json?tcp_nodelay=true&tcp_sndbuf=100000

In this example, we have specified the AMPS instance on localhost, port 9007, connecting to a transport that uses the amps protocol and sending JSON messages. We have also set two parameters: tcp_nodelay, a Boolean (true/false) parameter, and tcp_sndbuf, an integer parameter. Multiple parameters may be combined to finely tune settings available on the transport. Normally, you'll want to stick with the defaults on your platform, but there may be some cases where experimentation and fine-tuning will yield higher or more efficient performance.

The AMPS client supports the value of tcp in the scheme component connection string for TCP/IP connections, and the value of tcps as the scheme for SSL encrypted connections.

For connections that use Unix domain sockets, the client supports the value of unix in the scheme, and requires an additional option, as described in the Unix Transports Parameters section below.

IPv6 Connections

Starting with version 5.3.3.0, the AMPS client supports creating connections over both IPv4 and IPv6 protocols if supported by the underlying Operating System.

By default, the AMPS client will prefer to resolve host names to IPv4 addresses, but this behavior can be adjusted by supplying the ip_protocol_prefer transport option, described in the table below.

TCP and SSL Transport Options

The following transport options are available for TCP connections:

Option
Description

bind

(IP address) Sets the interface to bind the outgoing socket to.

Starting with version 5.3.3.0, both IPv4 and IPv6 addresses are fully supported for use with this parameter.

tcp_rcvbuf

(integer) Sets the socket receive buffer size. This defaults to the system default size. (On Linux, you can find the system default size in /proc/sys/net/core/rmem_default.)

tcp_sndbuf

(integer) Sets the socket send buffer size. This defaults to the system default size. (On Linux, you can find the system default size in /proc/sys/net/core/wmem_default.)

tcp_nodelay

(boolean) Enables or disables the TCP_NODELAY setting on the socket. By default TCP_NODELAY is disabled.

tcp_linger

(integer) Enables and sets the SO_LINGER value for the socket By default, SO_LINGER is enabled with a value of 10, which specifies that the socket will linger for 10 seconds.

tcp_keepalive

(boolean) Enables or disables the SO_KEEPALIVE value for the socket. The default value for this option is true.

ip_protocol_prefer

(string) Influence the IP protocol to prefer during DNS resolution of the host. If a DNS entry of the preferred protocol can not be found, the other non-preferred protocol will then be tried.

If this parameter is not set, the default will be to prefer IPv4.

If an explicit IPv4 address or IPv6 IP address is provided as the host, the format of the IP address is used to determine the IP protocol used and this setting has no effect.

Supported Values:

ipv4: Prefer an IPv4 address when resolving the host

ipv6: Prefer an IPv6 address when resolving the host

This parameter is available starting with version 5.3.3.0.

Unix Transport Parameters

The unix transport type communicates over Unix domain sockets. This transport requires the following additional option:

Option
Description

path

The path to the Unix domain socket to connect to.

Unix domain sockets always connect to the local system. When the scheme specified is unix, the host address is ignored in the connection string. For example, the connection string:

unix://localhost:0/amps/json?path=/sockets/the-amps-socket

and the connection string:

unix://unix:unix/amps/json?path=/sockets/the-amps-socket

are equivalent.

The other components of the connection string, including the protocol, message type, username, and authentication token are processed just as they would be for TCP/IP sockets.

AMPS Additional Logon Options

The connection string can also be used to pass logon parameters to AMPS. AMPS supports the following additional logon option:

Option
Description

pretty

Provide formatted representations of binary messages rather than the original message contents.

Backlog and Smart Pipelining

AMPS queues are designed for high-volume applications that need minimal latency and overhead. One of the features that helps performance is the subscription backlog feature, which allows applications to receive multiple messages at a time. The subscription backlog sets the maximum number of unacknowledged messages that AMPS will provide to the subscription.

When the subscription backlog is larger than 1, AMPS delivers additional messages to a subscriber before the subscriber has acknowledged the first message received. This technique allows subscribers to process messages as fast as possible, without ever having to wait for messages to be delivered. The technique of providing a consistent flow of messages to the application is called smart pipelining.

Subscription Backlog

The AMPS server determines the backlog for each subscription. An application can set the maximum backlog that it is willing to accept with the max_backlog option. Depending on the configuration of the queue (or queues) specified in the subscription, AMPS may assign a smaller backlog to the subscription. If no max_backlog option is specified, AMPS uses a max_backlog of 1 for that subscription.

In general, applications that have a constant flow of messages perform better with a max_backlog setting higher than 1. The reason for this is that, with a backlog greater than 1, the application can always have a message waiting when the previous message is processed. Setting the optimum max_backlog is a matter of understanding the messaging pattern of your application and how quickly your application can process messages.

To request a max_backlog for a subscription, you explicitly set the option on the subscribe command, as shown below:

command = Command("subscribe") \
            .set_topic("my_queue") \
            .set_options("max_backlog=10")

Using Queues

To publish messages to a message queue, publishers simply publish to any topic that is collected by the queue. There is no difference between publishing to a queue and publishing to any other topic, and a publisher does not need to be aware that the topic will be collected into a queue.

Subscribers must be aware that they are subscribing to a queue, and acknowledge messages from the queue when the message is processed.

AMPS message queues provide a high-performance way of distributing messages across a set of workers. The AMPS User Guide describes AMPS in detail, including the features of AMPS referred to in this chapter. This chapter does not describe AMPS queues in detail, but instead explains how to use the AMPS Python client with message queues.

Queues

Delta Publish and Subscribe

Delta messaging in AMPS has two independent aspects:

  • Delta Subscribe - Allows subscribers to receive just the fields that are updated within a message.

  • Delta Publish - Allows publishers to update and add fields within a message by publishing only the updates into the SOW.

This chapter describes how to create delta publish and delta subscribe commands using the AMPS Python client. For a discussion of this capability, how it works, and how message types support this capability see the .

AMPS User Guide

Manual Acknowledgment

60East generally recommends that applications use an ack() method to acknowledge messages during normal processing. This approach functions correctly when used within a message handler, supports batching as explained elsewhere in this chapter, and is generally both easier to code and more efficient.

However, in some situations, you may need to manually acknowledge messages in the queue. This is most common when an application needs to operate on all messages with certain characteristics, rather than acknowledging individual messages. For example, an application that is doing updates to an order may want to cancel an order by both publishing a cancellation and immediately expiring all other messages in the queue for that order. With manual acknowledgment, that application can use a filter to remove all previous updates for that order, then publish the cancellation.

To manually acknowledge processed messages and remove the messages from the queue, applications use the sow_delete command. To remove specific messages from the queue, provide the bookmarks of those messages. To remove messages that match a given filter, provide the filter. Notice that AMPS only supports using a bookmark with sow_delete when removing messages from a queue, not when removing records from a SOW.

For example, given a Message object to acknowledge and a client, the code below acknowledges the message.


# Provided as a demonstration for cases
# where the message itself isn't available.
#
# Where the message is available, this method
# is typically less efficient than
# simply calling message.ack()

def acknowledgeSingle(client,message):
   acknowledge = Command("sow_delete")
   acknowledge.set_topic(message.get_topic()).set_bookmark(message.get_bookmark())
   client.execute_async(acknowledge,None)

In the example above, the program creates a sow_delete command, specifies the topic and the bookmark, and then sends the command to the server. Since the program does not need or expect a response from AMPS, this function provides None as the message handler.

While this method works, creating and sending an acknowledgment for each individual message can be inefficient if your application is processing a large volume of messages. Rather than acknowledging each message individually, your application can build a comma-delimited list of bookmarks from the processed messages and acknowledge all of the messages at the same time. In this case, it's important to be sure that the number of messages you wait for is less than the maximum backlog -- the number of messages your client can have unacknowledged at a given time. Notice that both automatic acknowledgment and the helper method on the Message object take the maximum backlog into account.

When constructing a command to acknowledge queue messages, AMPS allows an application to specify a filter rather than a set of bookmarks. AMPS interprets this as the client requesting acknowledgment of all messages that match the filter. (This may include messages that the client has not received, subject to the Leasing model for the queue.)

As a more typical example of manual acknowledgment, the code below expires all messages for a given id that have a status other than cancel. An application might do this to halt processing of an order that it is about to cancel.

def removePending(client, orderId):

  acknowledge = Command("sow_delete")
  acknowledge.set_topic(message.get_topic()) \
    .set_filter("/id = '%s' and /status != 'cancel'" % orderId).set_options("expire")
  client.execute_async(acknowledge,None)

In the example above, the program specifies a topic and a filter to use to find the messages that should be removed. In this case, the program also provides the expire option to indicate that the messages have been removed from the queue rather than successfully processed (of course, whether this is the correct behavior for a canceled order depends on the expected message flow for your application).

Notice that, as described in , this method of acknowledging a message should not be used from a message handler unless the sow_delete is sent from a different client than the client that called the message handler. Instead, 60East recommends using the ack() function from within a message handler.

Understanding Threading

Returning a Message to the Queue

A subscriber can also explicitly release a message back to the queue. AMPS returns the message to the queue and redelivers the message just as though the lease had expired. To do this, the subscriber sends a sow_delete command with the bookmark of the message to release and the cancel option.

When using automatic acknowledgments and the asynchronous API, AMPS will cancel a message if an exception is thrown from the message handler.

To return a message to the queue, you can build a sow_delete acknowledgment using the Command class, or pass an option to the ack() method on the message.

Option
Result

cancel

Returns the message to the queue.

expire

Immediately expires the message from the queue.

For example, to return a message to a queue, call ack() on the message and pass the cancel option.

message.ack("cancel")

Error Handling

In every distributed system, the robustness of your application depends on its ability to recover gracefully from unexpected events. The AMPS client provides the building blocks necessary to ensure your application can recover from the kinds of errors and special events that may occur when using AMPS.

Advanced Topics

Implementing Message Handlers in C or C++

The AMPS Python client provides a wrapper that works with the python ctypes module to allow you to create message handlers in C or C++ and expose them to Python. This can improve performance in the message handler. When you use this technique, messages are delivered directly from the C++ client to your message handler: there is no Python code involved in handling the messages.

To use this capability, you:

  1. Create a message handler with C linkage, and compile that message handler into a shared library.

  2. In your Python program, use the ctypes module to load the library.

  3. Construct an instance of CMessageHandler, a wrapper object that holds a pointer to the message handler function and the user data to be provided to the handler during each call.

  4. Pass the CMessageHandler to any method that expects a message handler.

The AMPS Python client registers the pointer and user data you provide as a C++ message handler. Once the handler is registered, no Python code is called when providing messages to the handler.

Implementing the Handler

To use this capability, you create a message handler that exposes a function with the following signature having C linkage:

extern "C"
void my_message_handler(
    AMPS::Message &message,
    void *userdata
);

Loading and Using the Handler

Once you've compiled the library, you use the ctypes module to load the library. You then create an instance of the message handler wrapper, and pass that wrapper to the AMPS client methods, as shown below:

import ctypes
import AMPS

...

# assumes that client is already created and connected

# load the shared object
dll = ctypes.CDLL("./libmymessagehandler.so")

# create a handler that points to the underlying C function
# and bind the user data to that handler.
handler = AMPS.CMessageHandler(dll.my_message_hander, "user data")

# handler can be used anywhere you would use a message handler
client.subscribe(handler, "myTopic")

client.set_last_chance_message_handler(handler)

# and so it goes

The AMPS.CMessageHandler type accepts a pointer to a message handler with the signature shown above and a Python object that can be marshalled into a native C type through the ctypes interface. Once marshalled, the object will be cast to a void * and provided in the userdata parameter of the message handler. Marshalling the userdata parameter follows the ctypes module conventions. If you need to explicitly control the way an object is marshalled, you can construct one of the ctypes objects and pass that new object into the method.

Using the C++ Client

While the AMPS Python client provides enough performance for a wide variety of applications, in some cases, using the underlying C++ implementation can provide extra performance. The AMPS Python client works with the ctypes module to allow you to pass the underlying C++ client to an arbitrary function, effectively allowing you to integrate C++ code directly into your Python program.

Consider using the C++ client directly when latency is at a premium or when your application works directly with C++. For example, you might you use the client directly when:

  • You are assembling messages from a C++ library without a Python binding

  • You need to customize client behavior that is implemented in C++ (for example, implementing a custom SubscriptionManager or BookmarkStore)

  • Your application needs to execute a set of commands with AMPS with minimal latency. For example, you might need to publish an array of values as individual messages with as little latency as possible. In this case, using the underlying C++ client directly can reduce latency.

To use the underlying C++ client, you:

  1. Create a function with C linkage, and compile that function into a shared library. One of the parameters of the function should be a reference to an AMPS::Client.

  2. In your Python program, use the ctypes module to load the library.

  3. Call the function on the library, passing the appropriate parameters for the C function.

Implementing the C++ Function

The only requirement on the C++ function is that it have C linkage and that one of the parameters is a reference to an AMPS::Client. By convention, 60East recommends that the first parameter is the AMPS::Client. However, this is not a requirement of the interface.

For example, a function that simply takes an AMPS::Client has the following signature:

extern "C"
void configure_client(AMPS::Client& client);

While a function that takes a client, a topic, and a pointer to data to be published might have the following signature:

extern "C"
void publish_data(
    AMPS::Client& client,
    const char * topic,
    const char * data
);

The ctypes module provides a standard AMPS::Client to these functions. Although the Client has been created by Python code, there is nothing Python-specific about the object within the C++ function. You can use the Client just as you would any other Client object.

You can also use the ctypes binding with AMPS::HAClient, as shown below:

extern "C"
void install_server_chooser(AMPS::HAClient& client);

Since the ctypes module passes the data through the C ABI, the module is not able to perform extensive type checking on C++ types. Your Python code must be careful to pass only objects of the appropriate type, or you may cause a segmentation violation. For example, if a method expecting an HAClient receives a Client and calls connectAndLogon (which is not a method provided by Client), your program will likely crash.

The ctypes module has a few important caveats.

The ctypes module does not provide strong type-safety guarantees for C++ classes. It is your responsibility to ensure that you call methods with objects of the appropriate type.

The ctypes module calls your function through an extern "C" interface. C++ exceptions cannot be propagated out of a function with C binding. You must catch all exceptions that may be thrown, or your application will likely crash.

Loading and Using the Function

Once you've compiled the library, you use the ctypes module to load the library. You can then call the function directly from Python, using the name of the C function and passing the appropriate arguments.

Let's look at a simple example. For this example, assume that you have compiled a module named module.so with the following function:

extern "C" void publish_message(AMPS::Client& client,
                                const char* topic,
                                const char* data)
{
    try
    {
        if (&client && topic && data) {
            client.publish(topic,data);
        }
    }
    catch (AMPS::Exception& e)
    {
        /* Handle error reporting and recovery logic */
    }
}

You can load the module and call the function as shown below:

import ctypes

module = ctypes.CDLL("module.so")
client = AMPS.Client("client")
client.connect("tcp://localhost:9007/amps/json")
client.logon()

module.publish_data(client, "my_topic", "some_data")

The ctypes module handles type conversions between Python and C types. In this case, the module passes the underlying Python client as the first argument of the C function. The two Python strings are passed as NULL-terminated char * arrays.

The ctypes module also handles more complicated signatures and correctly passes arrays. For example, you could implement a method that publishes an array of Python values as follows:

extern "C" void vector_publish(AMPS::Client& client,
                                const char* topic,
                                const char** data,
                                size_t vector_length)
{
    try
    {
        if (topic && &client) {
            for (;vector_length;--vector_length,++data) {
                client.publish(topic,*data);
            }
        }
    }
    catch (AMPS::Exception& e)
    {
        /* Handle reporting and recovery logic */
    }
}

You could then use this function from Python as follows:

import ctypes

module = ctypes.CDLL("module.so")
client = AMPS.Client("client")
client.connect("tcp://localhost:9007/amps/json")
client.logon()

TOPIC = "topic"
DATA  = [{"data":x, "string_data":"string_data"} for x in range(5)]


# initialize vector of data to publish by
# dumping the dictionaries to JSON strings

vector = [json.dumps(data) for data in DATA[1:]]

# Set up the parameters to be passed to a C
# function as explained in the ctype documentation
param = (ctypes.c_char_p * len(vector))()
param[:] = vector

# Call the function
module.vector_publish(client,TOPIC, param,len(param))

The sample above creates an array of dictionaries and creates an array of JSON objects from those dictionaries.

In this case, it is important for us to control how the array of JSON objects is passed to the C function. We need to pass an array of C-style strings, that is, const char**. To control how the array is marshalled, the sample creates an object that knows how to translate between a Python array and const char**, then assigns the array to that object (see the ctype documentation for full details). Once we have that object, we simply call the vector_publish function. None of the Python infrastructure is visible to the vector_publish function: that function is able to use the provided data as native C++ data.

Transport Filtering

The AMPS Python client offers the ability to filter incoming and outgoing messages in the format they are sent and received on the network. This allows you to inspect or modify outgoing messages before they are sent to the network, and incoming messages as they arrive from the network. This can be especially useful when using SSL connections, since this gives you a way to monitor outgoing network traffic before it is encrypted, and incoming network traffic after it is decrypted.

To create a transport filter, you create a callable that expects a string that contains the raw data, and a direction parameter indicating whether the string is output or not. For example, the following function simply prints the direction and data:

def printing_filter(data, direction):
    if direction:
        print("INCOMING ---> %s" % data)
    else:
        print("OUTGOING ---> %s" % data)

You then register the filter by calling set_transport_filter with the callable, as shown below.

# client is an AMPS client
client.set_transport_filter(printing_filter)

Notice that the transport filter function is called with the verbatim contents of data received from AMPS. This means that, for incoming data, the function may not be called precisely on message boundaries, and that the binary length encoding used by the client and server will be presented to the transport filter.

Working with Binary Data

A Message object contains two methods for retrieving the message payload:

  • get_data() returns the payload as a string

  • get_data_raw() returns the payload as bytes

If you are working with binary data that is not guaranteed to be valid UTF-8, use the get_data_raw method to avoid errors when attempting to encode the data to a string.

Using SSL

The AMPS Python client includes support for Secure Sockets Layer. To use this support in the Python client using the default OpenSSL implementation for the Python installation, you need only use tcps for the transport type in the connection string.

If your Python client does not have a default OpenSSL implementation, you must load an SSL implementation as described below. This is typically the case for Windows Python builds, and may be the case if your site uses a custom build of Python on Linux.

Loading a Different SSL Implementation

The Python client also allows you to load and use an OpenSSL implementation other than the default implementation for the Python installation. The AMPS Python client provides the method ssl_init, which takes the name of the library to load or a full path to the file that contains the library. For example, to load the SSL implementation at /opt/mycorp/trusted/vetted_ssl.so, you could use the following line of code:

AMPS.ssl_init("/opt/mycorp/trusted/vetted_ssl.so")

You must load the SSL library before making the connection.

Delta Publish

To delta publish, you use the delta_publish command as follows:

The message that you provide to AMPS must include the fields that the topic uses to generate the SOW key. Otherwise, AMPS will not be able to identify the message to update. For SOW topics that use a User-Generated SOW Key, use the Command form of delta_publish to set the SowKey, as shown below:

Notice that this signature is the same signature used by message handlers in the AMPS C++ client. You implement the function and compile it into a shared library or DLL, using the instructions provided with your Python implementation. For details on the C++ client, you can install the client itself, or consult the .

The full behavior of delta_publish is described in the section on .

C++ API documentation
# assumes that client is connected and logged on

msg = ... # obtain changed fields here

client.delta_publish("myTopic", msg)
# assumes that client is connected and logged on

msg = ... # obtain changed fields here
key = ... # obtain user-generated SOW key

cmd = AMPS.Command("delta_publish")
cmd.set_topic("delta_topic")
cmd.set_sow_key(key)
cmd.set_data(msg)

# Execute the delta publish. Use None for
# the message handler since any failure acks will
# be routed to the FailedWriteHandler.
client.execute_async(cmd,None)

Using a Heartbeat to Detect Disconnection

The AMPS client includes a heartbeat feature to help applications detect disconnection from the server within a predictable amount of time. Without using a heartbeat, an application must rely on the operating system to notify the application when a disconnect occurs. For applications that are simply receiving messages, it can be impossible to tell whether a socket is disconnected or whether there are simply no incoming messages for the client.

When you set a heartbeat, the AMPS client sends a heartbeat message to the AMPS server at a regular interval, and waits a specified amount of time for the response. If the operating system reports an error on send, or if there is no activity received from the server within the specified amount of time, the AMPS client considers the server to be disconnected. Likewise, the server will ensure that traffic is sent to the client at the specified interval, using heartbeat messages when no other traffic is being sent to the client. If, after sending a heartbeat message, no traffic from the client arrives within a period twice the specified interval, the server will consider the client to be disconnected or nonresponsive.

The AMPS client processes heartbeat messages on the client receive thread, which is the thread used for asynchronous message processing. If your application uses asynchronous message processing and occupies the thread for longer than the heartbeat interval, the client may fail to respond to heartbeat messages in a timely manner and may be disconnected by the server.

Ending Subscriptions

The AMPS server continues a subscription until the client explicitly ends the subscription (that is, unsubscribes) or until the connection to the client is closed.

With a MessageStream, AMPS automatically unsubscribes to the topic when there are no more references to the MessageStream. You can also call the close() method on the MessageStream object to remove the subscription.

With asynchronous message processing, when a subscription is successfully made, messages will begin flowing to the message handler function and the subscribe() or execute_async() call returns a string that serves as the identifier for this subscription. A Client can have any number of active subscriptions, and this subscription ID is how AMPS designates messages intended for this subscription. To unsubscribe, we simply call unsubscribe with the subscription identifier, as shown below:

client = Client("exampleClient")

# Register an asynchronous subscription
subId = client.execute_async(
    Command("subscribe").set_topic("messages"),
    on_message_printer
)

...

# when the program is done with the subscription, unsubscribe
client.unsubscribe(subId)

In this example we use the execute_async() method to create a subscription to the messages topic. When our application is done listening to this topic, it unsubscribes (on the last line) by passing in the subscription identifier returned by the subscribe command. After the subscription is removed, no more messages will flow into our on_message_printer function.

When an application calls unsubscribe(), the client sends an explicit unsubscribe command to AMPS. The AMPS server removes that subscription from the set of subscriptions for the client, and stops sending messages for that subscription. On the client side, the client unregisters the subscription so that the MessageStream or message handler for that subscription will no longer receive messages for that subscription.

Notice that calling unsubscribe does not destroy messages that the server has already sent to the client. If there are messages on the way to the client for this subscription, the AMPS client must consume those messages. If a last_chance_message_handler is registered, the handler may receive the messages. Otherwise, they will be discarded since no message handler matches the subscription ID on the message.

AMPS User Guide
Incremental Message Updates

AMPS Programming: Working with Commands

The AMPS clients provide named convenience methods for core AMPS functionality. These named methods work by creating messages and sending those messages to AMPS. All communication with AMPS occurs through messages.

You can use the Command object to customize the messages that AMPS sends. This is useful for more advanced scenarios where you need precise control over the message, or in cases where you need to use an earlier version of the client to communicate with a more recent version of AMPS, or in cases where a named method is not available.

Understanding AMPS Messages

AMPS messages are represented in the client as AMPS.Message objects. The Message object is generic and can represent any type of AMPS message, including both outgoing and incoming messages. This section includes a brief overview of elements common to AMPS command messages. Full details of commands to AMPS are provided in the AMPS Command Reference (linked at the bottom of this page).

All AMPS command messages contain the following elements:

  • Command - The command tells AMPS how to interpret the message. Without a command, AMPS will reject the message. Examples of commands include publish, subscribe, and sow.

  • CommandId - The command ID, together with the name of the client, uniquely identifies a command to AMPS. The command ID can be used later on to refer to the command or the results of the command. For example, the command ID for a subscribe message becomes the identifier for the subscription. The AMPS client provides a command ID when the command requires one and no command ID is set.

Most AMPS messages contain the following fields:

  • Topic - The topic that the command applies to, or a regular expression that identifies a set of topics that the command applies to. For most commands, the topic is required. Commands such as logon, start_timer, and stop_timer do not apply to a specific topic, and do not need this field.

  • Ack Type - The ack type tells AMPS how to acknowledge the message to the client. Each command has a default acknowledgment type that AMPS uses if no other type is provided.

  • Options - The options are a comma-separated list of options that affect how AMPS processes and responds to the message.

Beyond these fields, different commands include fields that are relevant to that particular command. For example, SOW queries, subscriptions, and some forms of SOW deletes accept the Filter field, which specifies the filter to apply to the subscription or query. As another example, publish commands accept the Expiration field, which sets the SOW expiration for the message.

For full details on the options available for each command and the acknowledgment messages returned by AMPS, see the AMPS Command Reference.

Creating and Populating the Command

To create a command, you simply construct a command object of the appropriate type:

command = AMPS.Command("sow")

Once created, you set the appropriate fields on the command. For example, the following code creates a SOW query, setting the command, topic, and filter for the query:

command = AMPS.Command("sow")                                   \
             .set_topic("messages-sow")                         \
             .set_filter("/id > 20")

When sent to AMPS using the execute() method, AMPS performs a SOW query from the topic messages-sow using a filter of /id > 20. The results of sending this message to AMPS are no different than using the form of the sow method that sets these fields.

Using Execute

Once you've created a command, use the execute method to send the command to AMPS. One form of the execute method returns a MessageStream that you can use from the calling thread to process responses from AMPS. The other form, execute_async method, sends the message to AMPS, waits for a processed acknowledgment, then returns. Messages are processed on the client background thread.

For example, the following snippet sends the command created above:

client.execute(command)

This returns a MessageStream identical to the MessageStream returned by the equivalent client.sow() method.

You can also provide a message handler to receive acknowledgments, statistics, or the results of subscriptions and SOW queries. The AMPS client maintains a background thread that receives and processes incoming messages. The call to execute_async returns on the main thread as soon as AMPS acknowledges the command as having been processed, and messages are received and processed on the background thread:

def handleMessages(m):
   print ("%s : %s" % (m.get_ack_type(), m.get_reason()))
   # other message handling here

client.execute_async(command, handleMessages)

While this message handler simply prints the ack type and reason for sample purposes, message handlers in production applications are typically designed with a specific purpose. For example, your message handler may fill a work queue, or check for success and throw an exception if the command failed.

Using Execute to Publish

Notice that the publish command typically does not return results other than acknowledgment messages. To send a publish command, use the execute_async() method, providing None for the message handler:

client.execute_async(publishCmd, None)

Since the code sets the message handler to None, this code does not receive acknowledgments. To detect publish failures, set the FailedWriteHandler for the client.

AMPS Command Cookbook

Managing Disconnection

The HAClient class, included with the AMPS Python client, contains a disconnect handler and other features for building highly-available applications. The HAClient includes features for managing a list of failover servers, resuming subscriptions, republishing in-flight messages, and other functionality that is commonly needed for high availability. 60East recommends using the HAClient for automatic reconnection wherever possible, as the HAClient disconnect handler has been carefully crafted to handle a wide variety of edge cases and potential failures.

If an application needs to reconnect or fail over, use an HAClient, and the AMPS client library will automatically handle failover and reconnection. You control which servers the client fails over to using an implementation of the ServerChooser interface, and you can control the timing of the failover using an implementation of the ReconnectDelayStrategy interface.

For most applications, the combination of the HAClient disconnect handler and a ConnectionStateListener gives you the ability to monitor disconnections and add custom behavior at the appropriate point in the reconnection process.

If you need to add custom behavior to the failover (such as logging, resetting an internal cache, refreshing credentials and so on), the ConnectionStateListener class allows your application to be notified and take action when disconnection is detected and at each stage of the reconnection process.

Utility Classes

The AMPS Python client includes a set of utilities and helper classes to make working with AMPS easier.

Composite Message Types

The client provides a pair of classes for creating and parsing composite message types:

  • CompositeMessageBuilder allows you to assemble the parts of a composite message and then serialize them in a format suitable for AMPS.

  • CompositeMessageParser extracts the individual parts of a composite message type.

Building Composite Messages

To build a composite message, create an instance of CompositeMessageBuilder, and populate the parts. The CompositeMessageBuilder copies the parts provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual parts of a message once they've been added to the builder.

The snippet below shows how to build a composite message that includes a JSON part, constructed as a string, and a binary part consisting of the bytes from an array.array that contains doubles.

json = '{"data":"sample"}'


data = array.array('d')
# populate data
...


# Create the payload for the composite message
builder = AMPS.CompositeMessageBuilder()

# Construct the composite
builder.append(json)
builder.append(data.tostring())

# Send the message
client.publish("messages", builder.get_data())

Parsing Composite Messages

To parse a composite message, create an instance of CompositeMessageParser, then use the parse() method to parse the message provided by the AMPS client. The CompositeMessageParser gives you access to each part of the message as a sequence of bytes.

For example, the following snippet parses and prints messages that contain a JSON part and a binary part that contains an array of doubles.

parts = parser.parse(message)

json = parser.get_part(0)
data = array.array('d')
data.fromstring(parser.get_part(1))

print("Received message with %d parts." % parts)
print(json)
datastring = ""
for d in data:
    datastring += "%f " % d
print(datastring)

Notice that the receiving application is written with explicit knowledge of the structure and content of the composite message type.

NVFIX Messages

The client provides a pair of classes for creating and parsing NVFIX messages:

  • NVFIXBuilder allows you to assemble an NVFIX message and then serialize it in a format suitable for AMPS.

  • NVFIXShredder extracts the individual fields of an NVFIX message type.

Building NVFIX Messages

To build an NVFIX message, create an instance of NVFIXBuilder, then add the fields of the message using append(). NVFIXBuilder copies the fields provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual fields of a message once they've been added to the builder.

The snippet below shows how to build an NVFIX message and publish it to the AMPS client.

# create the payload for the NVFIX Message
builder = AMPS.NVFIXBuilder()

# construct the NVFIX message
builder.append("sample","data")
builder.append("even", "more data")
...

# display the data
print(builder.get_string())

# publish the message
client.publish("messages-sow", builder.get_string())

Parsing NVFIX Messages

To parse an NVFIX message, create an instance of NVFIXShredder, then use the to_map() method to parse the message provided by the AMPS client. The NVFIXShredder gives you access to each field of the message in a map.

The snippet below shows how to parse and print an NVFIX message.

# create the shredder for the message and subscribe to the topic
shredder = AMPS.NVFIXShredder()
message = client.subscribe(topic="messages-sow", timeout=5000)

# shred the message to a map
message_map = shredder.to_map(message.next().get_data())

# display the values of the message
for key in message_map:
    print(key + " " + message_map[key])

FIX Messages

The client provides a pair of classes for creating and parsing FIX messages:

  • FIXBuilder allows you to assemble a FIX message and then serialize it in a format suitable for AMPS.

  • FIXShredder extracts the individual fields of a FIX message.

Building FIX Messages

To build a FIX message, create an instance of FIXBuilder, then add the fields of the message using append(). FIXBuilder copies the fields provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual fields of a message once they've been added to the builder.

The snippet below shows how to build a FIX message and publish it to the AMPS client.

# create the payload for the FIX Message
builder = AMPS.FIXBuilder()

# construct the FIX message
builder.append(0,"data")
builder.append(1, "more data")
...

# display the data
print(builder.get_string())

# publish the message
client.publish("messages-sow", builder.get_string())

Parsing FIX Messages

To parse a FIX message, create an instance of FIXShredder, then use the to_map() method to parse the message provided by the AMPS client. The FIXShredder gives you access to each field of the message in a map.

The snippet below shows how to parse and print a FIX message.

# create the shredder for the message and subscribe to the topic
shredder = AMPS.FIXShredder()
message = client.subscribe(topic="messages-sow", timeout=5000)

# shred the message to a map
message_map = shredder.to_map(message.next().get_data())

# display the values of the message
for key in message_map:
    print(str(key) + " " + message_map[key])

Acknowledging Messages

For each message delivered on a subscription, AMPS counts the message against the subscription backlog until the message is explicitly acknowledged. In addition, when a queue specifies at-least-once delivery, AMPS retains the message in the queue until the message expires or until the message has been explicitly acknowledged and removed from the queue. From the point of view of the AMPS server, acknowledgment is implemented as a sow_delete from the queue with the bookmarks of the messages to remove. The AMPS Python client provides several ways to make it easier for applications to create and send the appropriate sow_delete.

Automatic Acknowledgment

The AMPS client allows you to specify that messages should be automatically acknowledged. When this mode is on, AMPS acknowledges the message automatically in the following cases:

  • Asynchronous Message Processing Interface - The message handler returns without throwing an exception.

  • Synchronous Message Processing Interface - The application requests the next message from the MessageStream.

AMPS batches acknowledgments created with this method, as described in the following section.

To enable automatic acknowledgment, use the set_auto_ack() method.

Message Convenience Method

The AMPS Python client provides a convenience method, ack(), on delivered messages. When the application is finished with the message, the application simply calls ack() on the message. (This, in turn, provides the topic and bookmark to the ack() method of the client that received the message.)

For messages that originated from a queue with at-least-once semantics, this adds the bookmark from the message to the batch of messages to acknowledge. For other messages, this method has no effect.

Acknowledgment Batching

The AMPS Python client automatically batches acknowledgments when either of the convenience methods is used. Batching acknowledgments reduces the number of round-trips to AMPS, which reduces network traffic and improves overall performance. AMPS sends the batch of acknowledgments when the number of acknowledgments exceeds a specified size, or when the amount of time since the last batch was sent exceeds a specified timeout.

You can set the number of messages to batch and the maximum amount of time between batches, as shown below:

The AMPS Python client is aware of the subscription backlog for a subscription. When AMPS returns the acknowledgment for a subscription that contains queues, AMPS includes information on the subscription backlog for the subscription. If the requested batch size is larger than the subscription backlog, the AMPS Python client adjusts the requested batch size to match the subscription backlog.

60East recommends tuning the batch size to improve application performance. A value of 1/3 of the smallest max_backlog value is a good initial starting point for testing. 60East does not recommend setting the batch size larger than 1/2 of the max_backlog value without testing to ensure that the application does not run out of messages to process while the acknowledgment is being sent to AMPS.

The includes information on which fields and options to set on commands to get a specific result. The reference includes both reference information and a that provides a concise guide for commonly-used commands.

To extend the behavior of the AMPS client during reconnection, implement a ConnectionStateListener as described in the section on .

For more information regarding composite message types, refer to the chapter in the AMPS User Guide.

AMPS Command Reference
Command Cookbook
Monitoring Connection State
Message Types
client.set_auto_ack(True)  # enable AutoAck
message.ack() # Add this message to the next 
              # acknowledgment batch.
client.set_ack_batch_size(10)  # Send batch after 10 messages
client.set_ack_timeout(1000)   # ... or 1 second

Replacing Disconnect Handling

In some cases, an application does not want the AMPS client to reconnect, but instead wants to take a different action if disconnection occurs. For example, a stateless publisher that sends ephemeral data (such as telemetry or prices) may want to exit with an error if the connection is lost rather than risk falling behind and providing outdated messages. Often, in this case, a monitoring process will start another publisher if a publisher fails, and it is better for a message to be lost than to arrive late.

To cover cases where the application has unusual needs, the AMPS client library allows an application to provide custom disconnect handling.

Your application gets to specify exactly what happens when a disconnect occurs by supplying a function to client.set_disconnect_handler(), which is invoked whenever a disconnect occurs. This may be helpful for situations where a particular connection needs to do something completely different than reconnecting or failing over to another AMPS server.

Setting the disconnect handler completely replaces the disconnection and failover behavior for an HAClient and provides the only disconnection and failover behavior for a Client.

The handler runs on the thread that detects the disconnect. This may be the client receive thread (for example, if the disconnect is detected due to heartbeating) or an application thread (for example, if the disconnect is detected when sending a command to AMPS).

The example below shows the basics:

class MyApp:
    def __init__(self, _uri):
        self.uri = _uri
        self.client = None
        self.client = AMPS.Client(...)

        # set_disconnect_handler() method is called to supply a function for use when AMPS
        # detects a disconnect. At any time, this function may be called by AMPS to
        # indicate that the client has disconnected from the server, and to allow your
        # application to choose what to do about it. The application continues on to
        # connect and subscribe to the orders topic.
        self.client.set_disconnect_handler(self.exit_on_disconnection)
        self.client.connect(self.uri)
        self.client.logon()

    # display order data to the user
    def showMessage(self,m):
        pass

    # Our disconnect handler’s implementation begins here.
    #
    # In this example, we exit the application if the
    # connection fails.

    def exit_on_disconnection(self, client):
        print("Disconnection detected, exiting.")
        sys.exit(1)

Unexpected Messages

The AMPS Python client handles most incoming messages and takes appropriate action. Some messages are unexpected or occur only in very rare circumstances. The AMPS Python client provides a way for clients to process these messages. Rather than providing handlers for all of these unusual events, AMPS provides a single handler function for messages that can't be handled during normal processing.

Your application registers this handler by setting the last_chance_message_handler for the client. This handler is called when the client receives a message that can't be processed by any other handler. This is a rare event, and typically indicates an unexpected condition.

For example, if a client publishes a message that AMPS cannot parse, AMPS returns a failure acknowledgment. This is an unexpected event, so AMPS does not include an explicit handler for this event, and failure acknowledgments are received in the method registered as the last_chance_message_handler.

Your application is responsible for taking any corrective action needed. For example, if a message publication fails, your application can decide to republish the message, publish a compensating message, log the error, stop publication altogether, or any other action that is appropriate.

Monitoring Connection State

The AMPS client interface provides the ability to set one or more connection state listeners. A connection state listener is a callback that is invoked when the AMPS client detects a change to the connection state.

A connection state listener may be called from the client receive thread. An application should not submit commands to AMPS from a connection state listener, or the application risks creating a deadlock for commands that wait for acknowledgement from the server.

The AMPS client provides the following state values for a connection state listener:

State
Indicates

Connected

The client has established a connection to AMPS. If you are using a Client, this is delivered when connect() is successful.

If you are using an HAClient, this state indicates that the connect part of the connect and logon process has completed. An HAClient using the default disconnect handler will attempt to log on immediately after delivering this state.

Most applications that use Client will attempt to log on immediately after the call to connect() returns.

An application should not submit commands to AMPS from the connection state listener while the client is in this state unless the application knows that the state has been delivered from a Client and that the Client does not call logon().

LoggedOn

The client has successfully logged on to AMPS. If you are using a Client, this is delivered when logon() is successful.

If you are using an HAClient, this state indicates that the logon part of the connect and logon process has completed.

This state is delivered after the client is logged on, but before recovery of client state is complete. Recovery will continue after delivering this state: the application should not submit commands to AMPS from the connection state listener while the client is in this state if further recovery will take place.

HeartbeatInitiated

The client has successfully started heartbeat monitoring with AMPS. This state is delivered if the application has enabled heartbeating on the client.

This state is delivered before recovery of the client state is complete. Recovery may continue after this state is delivered. The application should not submit commands to AMPS from the connection state listener until the client is completely recovered.

PublishReplayed

Delivered when a client has completed replay of the publish store when recovering after connecting to AMPS.

This state is delivered when the client has a PublishStore configured.

If the client has a subscription manager set, (which is the default for an HAClient), the application should not submit commands from the connection state listener until the Resubscribed state is received.

Resubscribed

Delivered when a client has re-entered subscriptions when recovering after connecting to AMPS.

This state is delivered when the client has a subscription manager set (which is the default for an HAClient). This is the final recovery step. An application can submit commands to AMPS from the connection state listener after receiving this state.

Disconnected

The client is not connected. For an HAClient, this means that the client will attempt to reconnect to AMPS. For a Client, this means that the client will invoke the disconnect handler, if one is specified.

Shutdown

The client is shut down. For an HAClient, this means that the client will no longer attempt to reconnect to AMPS. This state is delivered when close() is called on the client or when a server chooser tells the HAClient to stop reconnecting to AMPS.

The enumeration provided for the connection state listener also includes a value of UNKNOWN, for use as a default or to represent additional states in a custom Client implementation. The 60East implementations of the client do not deliver this state.

The following table shows examples of the set of states that will be delivered during connection, in order, depending on what features of the client are set. Notice that, for an instance of the Client class, this table assumes that the application calls both connect() and logon(). For an HAClient, this table assumes that the HAClient is using the default DisconnectHandler for the HAClient.

Configuration
States

subscription manager

publish store

Connected

LoggedOn

PublishReplayed

Resubscribed

subscription manager

publish store

heartbeat set

Connected

LoggedOn

HeartbeatInitiated

PublishReplayed

Resubscribed

subscription manager

Connected

LoggedOn

Resubscribed

subscription manager

heartbeat set

Connected

LoggedOn

HeartbeatInitiated

Resubscribed

(default Client configuration)

Connected

LoggedOn

SOW and Subscribe

Imagine an application that displays real time information about the position and status of a fleet of delivery vans. When the application starts, it should display the current location of each of the vans along with their current status. As vans move around the city and post other status updates, the application should keep its display up to date. Vans upload information to the system by posting messages to the van_location topic, configured with a key of van_id on the AMPS server.

In this application, it is important to not only stay up-to-date on the latest information about each van, but to ensure all of the active vans are displayed as soon as the application starts. Combining a SOW with a subscription to the topic is exactly what is needed, and that is accomplished by the Client.sow_and_subscribe() method. As with the other methods for receiving messages, the AMPS Python client provides a basic, synchronous form of sow_and subscribe that provides you with a MessageStream to iterate over, and an asynchronous form that requires a message handler.

First, let's look at an example that uses the basic form of sow_and_subscribe:

def report_van_position(client):
    # sow_and_subscribe command to begin receiving information about all of the active
    # delivery vans in the system. All of the vans in the system now are returned as
    # Message objects whose get_command returns sow. New messages coming in are
    # returned as Message objects whose get_command returns publish.
    for message in client.execute(Command("sow_and_subscribe")  \
            .set_topic("van_location")                          \
            .set_filter("/status = 'ACTIVE'")                   \
            .set_batch_size(100)                                \
            .set_options("oof")):
            # Notice here that we specified the oof option to the command. Setting this
            # option causes AMPS to send Out-of-Focus (OOF) messages for the topic.
            # OOF messages are sent when an entry that was sent to us in the past no
            # longer matches our query. This happens when an entry is removed from the
            # SOW cache via a sow_delete operation, when the entry expires (as specified
            # by the expiration time on the message or by the configuration of that topic
            # on the AMPS server), or when the entry no longer matches the content filter
            # specified. In our case, if a van’s status changes to something other than
            # ACTIVE, it no longer matches the content filter, and becomes out of focus.
            # When this occurs, a Message is sent with Command set to oof. We use OOF
            # messages to remove vans from the display as they become inactive, expire,
            # or are deleted.
      if (message.get_command() == Message.Command.SOW or
          message.get_command() == Message.Command.Publish):

          # For each of these messages we call add_or_update_van(), that presumably adds
          # the van to our application’s display. As vans send updates to the AMPS server,
          # those are also received by the client because of the subscription placed by
          # sow_and_subscribe(). Our application does not need to distinguish between
          # updates and the original set of vans we found via the SOW query, so we use
          # add_or_update_van() to display the new position of vans as well.
          add_or_update_van(message)
      elif message.get_command() == Message.Command.OOF:
          remove_van(message)

Now we will look at an example that uses the asynchronous form of sow_and_subscribe:

def update_van_position(message):
    if (message.get_command() == Message.Command.SOW or
        message.get_command() == Message.Command.Publish):
        add_or_update_van(message)
    elif message.get_command() == Message.Command.OOF:
        remove_van(message)


def subscribe_to_van_location(client):
    client.execute_async(
           Command("sow_and_subscribe")          \
             .set_topic("van_location")          \
             .set_filter("/status = 'ACTIVE'")   \
             .set_batch_size(100)                \
             .set_options("oof"),                \
       update_van_position)

Notice that the two forms have the same result. However, one form performs processing on a background thread, and blocks the client from receiving messages while that processing happens. The other form processes messages on the main thread and allows the background thread to continue to receive messages while processing occurs. In both cases, the calls to add_or_update_van and remove_van receive the same data.

Managing SOW Contents

AMPS allows applications to manage the contents of the SOW by explicitly deleting messages that are no longer relevant. For example, if a particular delivery van is retired from service, the application can remove the record for the van by deleting the record for the van.

The client provides the following methods for deleting records from the SOW:

  • sow_delete - Accepts a topic and filter, and deletes all messages that match the filter from the topic specified.

  • sow_delete_by_keys - Accepts a set of SOW keys as a comma-delimited string and deletes messages for those keys, regardless of the contents of the messages. SOW keys are provided in the header of a SOW message, and are the internal identifier AMPS uses for that SOW message.

  • sow_delete_by_data - Accepts a topic and message, and deletes the SOW record that would be updated by that message.

The most efficient way to remove messages from the SOW is to use sow_delete_by_keys or sow_delete_by_data, since those options allow AMPS to exactly target the message or messages to be removed. Many applications use sow_delete, since this is the most flexible method for removing items from the SOW when the application does not have information on the exact messages to be removed.

In either case, AMPS sends an OOF message to all subscribers who have received updates for the messages removed, as described in the previous section.

The simple form of the sow_delete command returns a Message. This Message is an acknowledgment that contains information on the delete command. For example, the following snippet simply prints informational text with the number of messages deleted:

msg = client.sow_delete(
    "sow_topic",
    "/id IN (42, 64, 37)"
)

print("Got an %s message containing %s: deleted %s SOW entries" % \
      (msg.get_command(), msg.get_ack_type(), msg.get_matches()))

The sow_delete command also provides an asynchronous version that requires a message handler. This message handler is designed to receive sow_delete response messages from AMPS:

def delete_handler(m):
    print("Got an %s message containing %s: deleted %s SOW entries" % \
        (m.get_command(), m.get_ack_type(), m.get_matches()))

client.execute_async(
    Command("sow_delete")               \
    .set_topic("sow_topic")             \
    .set_filter("/id IN (42, 64, 37)")  \
    .add_ack_type("stats"),             \
    delete_handler
)

Acknowledging messages from a queue uses a form of the sow_delete command that is only supported for queues. Acknowledgment is discussed in the chapter in this guide.

Using Queues

Client Side Conflation

In many cases, applications that use SOW topics only need the current value of a message at the time the message is processed, rather than processing each change that led to the current value. On the server side, AMPS provides conflated topics to meet this need. Conflated topics are described in more detail in the AMPS User Guide, and require no special handling on the client side.

In some cases, though, it's important to conflate messages on the client side. This can be particularly useful for applications that do expensive processing on each message, applications that are more efficient when processing batches of messages, or for situations where you cannot provide an appropriate conflation interval for the server to use.

A MessageStream has the ability to conflate messages received for a subscription to a SOW topic, view, or conflated topic. When conflation is enabled, for each message received, the client checks to see whether it has already received an unprocessed message with the same SowKey. If so, the client replaces the unprocessed message with the new message. The application never receives the message that has been replaced.

To enable client-side conflation, you call conflate() on the MessageStream, and then use the MessageStream as usual:

# SOW query and subscribe
results = client.sow_and_subscribe("orders", "/symbol == 'ROL'")

# Turn on conflation
results.conflate()

# Process the results
for message in results:
    # process message here

When a MessageStream is used for a subscription that does not include SowKeys (such as a subscription to a topic that does not have a SOW), the MessageStream will allow you to turn on conflation, but no conflation will occur.

When using client-side conflation with delta subscriptions, bear in mind that client-side conflation replaces the whole message, and does not attempt to merge deltas. This means that updates can be lost when messages are replaced. For some applications (for example, a ticker application that simply sends delta updates that replace the current price), this causes no problems. For other applications (for example, when several processors may be updating different fields of a message simultaneously), using conflation with deltas could result in lost data, and server-side conflation is a safer alternative.

High Availability

The AMPS Python Client provides an easy way to create highly-available applications using AMPS, via the HAClient class. HAClient derives from Client and offers the same methods, but also adds protection against network, server, and client outages.

Using HAClient allows applications to automatically:

  • Recover from temporary disconnects between client and server.

  • Failover from one server to another when a server becomes unavailable.

Since the HAClient automatically manages failover and reconnection, 60East recommends using the HAClient for applications that need to:

  • Automatically reconnect and resume work in the case of disconnection.

  • Ensure no messages are lost or duplicated after a reconnect or failover.

  • Persist messages and bookmarks on disk for protection against client failure.

You can choose how your application uses HAClient features. For example, you might need automatic reconnection, but have no need to resume subscriptions or republish messages. The high availability behavior in HAClient is provided by implementations of defined interfaces. You can combine different implementations provided by 60East to meet your needs, and implement those interfaces to provide your own policies.

Some of these features require specific configuration settings on your AMPS instance(s). This chapter mentions these features and describes how to use them from the AMPS Python client. You can find full documentation for these settings and server features in the AMPS User Guide.

Overview of HAClient

HAClient derives from Client and offers the same methods for sending commands to AMPS and receiving messages from AMPS.

The HAClient differs from the Client in two ways:

  • The HAClient automatically installs a disconnect handler that reconnects to AMPS and resumes active (asynchronous) subscriptions. The disconnect handler optionally replays publish and sow_delete messages that have not been acknowledged by AMPS, using a PublishStore. The disconnect handler can optionally resume replays from the transaction log at a point that guarantees no messages are skipped and no duplicates are delivered to the application, using a BookmarkStore.

  • The HAClient includes the infrastructure needed for client failover, including a list of connection strings and their associated authentication mechanisms (provided by the ServerChooser), and options for controlling backoff behavior for reconnects (provided by the DelayStrategy). As a result, the HAClient provides a connect_and_logon() function for establishing a connection to AMPS, rather than treating these as independent steps that an application must manage itself.

If your application needs to automatically reconnect to AMPS, 60East recommends using the HAClient and the automatically provided disconnect handler rather than using a Client or replacing the HAClient default disconnect handler.

Reconnection with HAClient

The most important difference between Client and HAClient is that HAClient automatically provides a reconnect handler.

This description provides a high-level framework for understanding the components involved in failover with the HAClient. The components are described in more detail in the following sections.

The HAClient reconnect handler performs the following steps when reconnecting:

  1. Calls the ServerChooser to determine the next URI to connect to and the authenticator to use for that connection.

    If the connection fails, calls get_error on the ServerChooser to get a description of the failure, sends an exception to the exception listener and stops the reconnection process.

  2. Calls the DelayStrategy to determine how long to wait before attempting to reconnect and waits for that period of time.

  3. Connects to the AMPS server. If the connection fails, calls report_failure on the ServerChooser and begins the process again.

  4. Logs on to the AMPS server. If the connection fails, calls report_failure on the ServerChooser and begins the process again.

  5. Calls report_success on the ServerChooser.

  6. Receives the bookmark for the last message that the server has persisted. Discards any older messages from the PublishStore.

  7. Republishes any messages in the PublishStore that have not been persisted by the server.

  8. Re-establishes subscriptions using the SubscriptionManager for the client. For bookmark subscriptions, the reconnect handler uses the BookmarkStore for the client to determine the most recent bookmark, and re-subscribes with that bookmark. For subscriptions that do not use a bookmark, the SubscriptionManager simply re-enters the subscription, meaning that it is entered at the point at which the HAClient reconnects.

The ServerChooser, DelayStrategy, PublishStore, and BookmarkStore are all extension points for the HAClient. You can adapt the failover and recovery behavior by setting a different object for the behavior you want to customize on the HAClient or by providing your own implementation.

For example, the convenience methods in the previous section customize the behavior of the PublishStore and BookmarkStore by providing either memory-backed or file-backed stores.

The reconnection process runs on the thread that discovers the disconnection. This means that, in the event that an application thread discovers the disconnection as a result of a call to the Python AMPS client, that call may not return until a connection is re-established (or until the server chooser indicates failure, in which case the application will receive an exception).

The Python client includes a retry_on_disconnect setting that controls this retry behavior when the client is disconnected. When set to True (the default), any call to the Client that results in a command being sent to AMPS may block until a connection is re-established. When set to False, the HAClient will retry the connection a single time and throw an exception if the connection cannot be re-established.

Regardless of the retry_on_disconnect setting, a call to publish will result in the message being stored in the PublishStore for the client if one is set.

Choosing Store Durability

If your application needs reliable publish to AMPS, install a PublishStore in the HAClient. If your application needs to resume replays from the transaction log, install a BookmarkStore in the HAClient.

These stores provide the following capabilities:

  • A bookmark store tracks received messages and is used to resume subscriptions that replay from the transaction log.

  • A publish store tracks published messages and is used to ensure that messages are persisted in AMPS.

The AMPS client provides a memory-backed version of each store and a file-backed version of each store. The store interface is public, and an application can create and provide a custom store as necessary. An HAClient can use either a memory backed store or a file backed store for protection. Each method provides resilience to different failures, as described below:

  • Memory-backed stores provide recovery disconnection from AMPS by storing messages and bookmarks in your process' address space. This is the highest performance option for working with AMPS in a highly available manner. The trade-off with this method is there is no protection from a crash or failure of your client application. If your application is terminated prematurely or, if the application terminates at the same time as an AMPS instance failure or network outage, then messages may be lost or duplicated. The state of bookmark replays will be lost when the application shuts down. Messages in the publish store when the application shuts down will not be maintained through a restart, so the application will not be able to attempt any necessary redelivery when the application restarts.

    A memory-backed store should only be used by one instance of a client at a time.

  • File-backed stores provide recovery after client failure and disconnection from AMPS by storing messages and bookmarks on disk. To use this protection method, the create_file_backed method requests additional arguments for the two files that will be used for both bookmark storage and message storage. If these files exist and are non-empty (as they would be after a client application is restarted), the HAClient loads their contents and ensures synchronization with the AMPS server once connected. The performance of this option depends heavily on the speed of the device on which these files are placed. When the files do not exist (as they would the first time a client starts on a given system), the HAClient creates and initializes the files, and in this case the client does not have a point at which to resume the subscription or messages to republish.

    A store file should only be used by one instance of a client at a time.

    When using file-backed stores, 60East recommends periodically removing unneeded entries by calling the prune() method. The precise strategy that your application uses to call prune() depends on the nature of the application. Most applications call prune() when the application exits.

    There are two basic strategies that applications follow while the application runs:

    • Install a resize handler and call prune() after a specified number of resize operations or when the store reaches a specific size.

    • Call prune() after a specific number of messages are processed (for example, every 10,000 messages received or every 1,000 updates completed).

    Regardless of the strategy, it is best to call prune() when the application is idle, since the prune() call rewrites the log file.

The store interface is public and an application can create and provide a custom store as necessary. While clients provide convenience methods for creating file-backed and memory-backed HAClient objects with the appropriate stores, you can also create and set the stores in your application code. For the AMPS Python client, stores are implemented in C++. You can implement stores using C++, and use the technique described in Chapter 12 of this guide - Using the C++ Client, to set the store on the client.

Starting in 5.3.2.0, the underlying AMPS client contains a recovery point adapter interface to make it easier to add a custom persistence layer to a bookmark store. The distribution includes a recovery point adapter that can store bookmark recovery information in an AMPS SOW topic.

The HAClient provides convenience methods for creating clients and setting stores. You can also construct an HAClient and set the store implementations you choose.

In this example, we create several clients. The first client uses memory stores for both bookmarks and publishes. The second client uses files for both bookmarks and publishes. The third client uses a file for bookmarks. The third client does not set a store for publishes, which means that AMPS provides the default store (and no outgoing messages are stored). The final client does not specify any stores, so has no persistence for published messages or bookmark subscriptions, but can take advantage of the automatic failover and reconnection in the HAClient.

# Memory publish store, memory bookmark store
memoryClient = AMPS.HAClient("lessImportantMessages")

# File-backed publish store, file-backed bookmark store
diskClient = AMPS.HAClient("moreImportantMessages",
          "/mnt/fastDisk/moreImportantMessages.outgoing",
          "/mnt/fastDisk/moreImportantMessages.incoming")

# No-op publish store, file-backed bookmark store
subscriberClient = AMPS.HAClient("subscriber", no_store=True)
subscriberClient.set_bookmark_store(     \
            AMPS.MMapBookmarkStore("/mnt/fastdisk/bookmark.store"))

# No-op publish store, no-op bookmark store
# Failover behavior only.
streamReader = AMPS.HAClient("streamReader",no_store=True)

Using the SOW Recovery Point Adapter

The AMPS client also includes the ability to use a SOW topic to store bookmark state for a bookmark store. This can be a useful option in a situation where an application needs a persistent bookmark store, but does not have the ability to store a file on the filesystem, or where an application has a bookmark file, but wants to have the ability to resume the subscription if the file is lost or damaged, or if the application is started on a system that does not have access to the file.

To use the SOW topic recovery point adapter, you create a bookmark store of the type you would like to use for the Client, passing an adapter when you construct the store. You then set this bookmark store as the store for the Client to use. The constructor for the SOW recovery adapter allows you to customize the topic name and field names used to store the recovery point information in AMPS. As with the RecoveryPointAdapter interface in general, it is possible to customize the behavior of the SOW recovery point adapter by overriding the provided methods.

This section describes how to use the adapter with the default settings. Should you need to change the behavior of the class, you would adjust the guidance in this section accordingly. (For example, if you override methods to produce a message with a different set of keys or a different message format, you would update the topic definition accordingly).

AMPS Topic Configuration

To store recovery point state in AMPS, the AMPS instance that will store the recovery point state must define a SOW/Topic to hold the recovery point data.

By default, the adapter uses a topic named /ADMIN/bookmark_store of json message type, with the /clientName and /subId fields as keys, similar to the following definition:

<Topic>
   <Name>/ADMIN/bookmark_store</Name>
   <MessageType>json</MessageType>
   <Key>/clientName</Key>
   <Key>/subId</Key>
   <!-- Storage/persistence configuration here.
        In most cases, this topic should be
        persisted to a file, but that is not
        a requirement.  -->
</Topic>

You must include this definition, or an equivalent definition, in the configuration file for the AMPS instance that will host the recovery point.

If you define a topic with a different configuration (for example, different key names, a different topic name or a different message type), you must ensure that the adapter that you create uses the same parameters as those configured on the server.

Constructing a Client for the Adapter

The AMPS SOW Recovery Point Adapter requires a Client or HAClient connected to the instance that contains the SOW topic. The Adapter will use this client to recover bookmark state and store bookmarks in AMPS. Notice that this client must not be a client that the Adapter is keeping state for. This must be a completely separate client instance, otherwise the client may deadlock while updating the store.

The client must be connected and logged in to the instance that contains the SOW topic, using the message type defined for the topic.

Capacity Planning and Store Sizing

When an application uses a file-backed store, it is important to make sure that there is enough space available on the file system to be able to manage the store.

For logged bookmark stores, an application needs to keep a bookmark record for each message received, each message discarded, and the persisted acknowledgments delivered by the server approximately once a second. Each bookmark entry consumes roughly 70 bytes of storage plus the length of the subscription ID for the subscription receiving the message. The logged bookmark store retains entries until an application explicitly calls prune(). The capacity needed for a logged bookmark store will depend on the strategy that the application uses for pruning the file.

For a file-backed publish store, the application needs to be able to store published messages until the AMPS server that the publisher is connected to acknowledges those messages as persisted. The volume of messages that needs to be stored depends on the failover policy for the server -- that is, the maximum amount of time that the server will allow a downstream instance to fail to acknowledge a message before the server downgrades that connection to async acknowledgment. By default, AMPS does not downgrade connections: this policy must be set explicitly using the AMPS actions. As an example, if the server is configured to downgrade connections that are more than 120 seconds behind, then -- for disaster recovery -- the application must have the capacity to store 120 seconds of published messages at peak publishing load. However, unlike the logged bookmark store, a file-backed publish store removes messages from the store and reuses the space once AMPS has acknowledged the message.

Connections and the Server Chooser

Unlike Client, the HAClient attempts to keep itself connected to an AMPS instance at all times, by automatically reconnecting or failing over when it detects that the client is disconnected. When you are using the Client directly, your disconnect handler usually takes care of reconnection. HAClient, on the other hand, provides a disconnect handler that automatically reconnects to the current server or to the next available server.

To inform the HAClient of the addresses of the AMPS instances in your system, you pass a ServerChooser instance to the HAClient. ServerChooser acts as a smart enumerator over the servers available: HAClient calls ServerChooser methods to inquire about what server should be connected, and calls methods to indicate whether a given server succeeded or failed.

The AMPS Python client provides a simple implementation of ServerChooser, called DefaultServerChooser, that provides very simple logic for reconnecting. This server chooser is most suitable for basic testing, or in cases where an application should simply rotate through a list of servers. For most applications, you implement the ServerChooser interface yourself for more advanced logic, such as choosing a backup server based on your network topology, or limiting the number of times your application should try to reconnect to a given address.

To connect to AMPS, you provide a ServerChooser to HAClient and then call connect_and_logon() to create the first connection:

memoryClient = AMPS.HAClient("myClient")

# primary.amps.xyz.com is the primary AMPS instance, and
# secondary.amps.xyz.com is the secondary
chooser = AMPS.DefaultServerChooser()
chooser.add("tcp://primary.amps.xyz.com:12345/fix")
chooser.add("tcp://secondary.amps.xyz.com:12345/fix")
memoryClient.set_server_chooser(chooser)
memoryClient.connect_and_logon()
...
myClient.disconnect()

Similar to Client, HAClient remains connected to the server until disconnect() is called. Unlike Client, HAClient automatically attempts to reconnect to your server if it detects a disconnect and if that server cannot be connected, fails over to the next server provided by the ServerChooser. In this example, the call to connect_and_logon() attempts to connect and login to primary.amps.xyz.com, and returns if that is successful. If it cannot connect, it tries secondary.amps.xyz.com, and continues trying servers from the ServerChooser until a connection is established. Likewise, if it detects a disconnection while the client is in use, then HAClient attempts to reconnect to the server with which it was most recently connected; if that is not possible, then it moves on to the next server provided by the ServerChooser.

The default ServerChooser simply provides the next URL in the sequence. This strategy works for many applications. If you need a different strategy, you can implement your own logic for failover by creating a class derived from ServerChooser.

Setting a Reconnect Delay and Timeout

You can control the amount of time between reconnection attempts and set a total amount of time for the HAClient to attempt to reconnect.

The AMPS Python client includes a method for setting a delay strategy on a client, set_reconnect_delay_strategy. This method accepts an instance of any type that provides the methods get_connect_wait_duration and reset, as described in the API documentation.

While you can easily implement your own delay strategy, the client also provides two delay strategies:

  • FixedDelayStrategy provides the same delay each time the HAClient tries to reconnect.

  • ExponentialDelayStrategy provides an exponential backoff until a connection attempt succeeds.

To use either of these classes, you simply create an instance, set the appropriate parameters, and install that instance as the delay strategy for the HAClient. For example, the following code sets up a reconnect delay that starts at 200ms and increases the delay by 1.5 times after each failure. The strategy allows a maximum delay between connection attempts of 5 seconds, and will not retry longer than 60 seconds.

theClient = AMPS.HAClient("myClient")

theClient.set_reconnect_delay_strategy(      \
   AMPS.ExponentialDelayStrategy(            \
                   initial_delay=200,        \
                   backoff_exponent=1.5,     \
                   maximum_delay=5000,       \
                   maximum_retry_time=60000) \
                 )

Implementing a Server Chooser

As described above, you provide the HAClient with connection strings to one or more AMPS servers using a ServerChooser. The purpose of a ServerChooser is to provide information to the HAClient. A ServerChooser does not manage the reconnection process, and should not call methods on the HAClient.

A ServerChooser has two required responsibilities to the HAClient:

  • Tells the HAClient the connection string for the server to connect to. If there are no servers, or the ServerChooser wants the connection to fail, the ServerChooser returns an empty string.

    To provide this information, the ServerChooser implements the get_current_uri() method.

  • Provides an Authenticator for the current connection string. This is especially important for installations where different servers require different credentials or authentication tokens must be reset after each connection attempt.

    To provide the authenticator, the ServerChooser implements the get_current_authenticator() method.

The HAClient calls the get_current_uri() and get_current_authenticator() methods each time it needs to make a connection.

Each time a connection succeeds, the HAClient calls the report_success() method of the ServerChooser. Each time a connection fails, the HAClient calls the report_failure() method of the ServerChooser. The HAClient does not require the ServerChooser to take any particular action when it calls these methods. These methods are provided for the HAClient to do internal maintenance, logging, or record keeping. For example, an HAClient might keep a list of available URIs with a current failure count, and skip over URIs that have failed more than 5 consecutive times until all URIs in the list have failed more than 5 consecutive times.

When the ServerChooser returns an empty string from get_current_uri(), indicating that no servers are available for connection, the HAClient calls the get_error() method on the ServerChooser, if one is provided, and includes the string returned by get_error() in the generated exception.

Heartbeats and Failure Detection

Use of the HAClient allows your application to quickly recover from detected connection failures. By default, connection failure detection occurs when AMPS receives an operating system error on the connection. This system may result in unpredictable delays in detecting a connection failure on the client, particularly when failures in network routing hardware occur, and the client primarily acts as a subscriber.

The heartbeat feature of the AMPS client allows connection failure to be detected quickly. Heartbeats ensure that regular messages are sent between the AMPS client and server on a predictable schedule. The AMPS client and server both assume disconnection has occurred if these regular heartbeats cease, ensuring disconnection is detected in a timely manner. To use the heartbeat feature, call the set_heartbeat method on Client or HAClient:

memoryClient = AMPS.HAClient("importantStuff")
...
memoryClient.set_heartbeat(3)
memoryClient.connect_and_logon()
...

Method set_heartbeat takes one parameter: the heartbeat interval. The heartbeat interval specifies the periodicity of heartbeat messages sent by the server: the value 3 indicates messages are sent on a three-second interval. If the client receives no messages in a six-second window (two heartbeat intervals), the connection is assumed to be dead, and the HAClient attempts reconnection. An additional variant of set_heartbeat allows the idle period to be set to a value other than two heartbeat intervals. (The server, however, will always consider a connection to be closed after two heartbeat intervals without any traffic.)

Notice that, for HAClient, setHeartbeat must be called before the client is connected. For Client, setHeartbeat must be called after the client is connected.

Heartbeats are serviced on the receive thread created by the AMPS client. Your application must not block the receive thread for longer than the heartbeat interval or the application is subject to being disconnected.

Considerations for Publishers

Publishing with an HAClient is nearly identical to regular publishing; you simply call the publish() method with your message's topic and data. The AMPS client sends the message to AMPS, and then returns from the publish() call. For maximum performance, the client does not wait for the AMPS server to acknowledge that the message has been received.

When an HAClient sets a publish store, the publish store retains a copy of each outgoing message and requests that AMPS acknowledge that the message has been persisted. The AMPS server acknowledges messages back to the publisher. Acknowledgments can be delivered for multiple messages at periodic intervals (for topics recorded in the transaction log) or after each message (for topics that are not recorded in the transaction log). When an acknowledgment for a message is received, the HAClient removes that message from the bookmark store. When a connection to a server is made, the HAClient automatically determines which messages from the publish store (if any) the server has not processed, and replays those messages to the server once the connection is established.

For reliable publishers, the application must choose how best to handle application shutdown. For example, it is possible for the network to fail immediately after the publisher sends the message, while the message is still in transit. In this case, the publisher has sent the message, but the server has not processed it and acknowledged it. During normal operation, the HAClient will automatically connect and retry the message. On shutdown, however, the application must decide whether to wait for messages to be acknowledged, or whether to exit.

Publish store implementations provide an unpersisted_count() method that reports the number of messages that have not yet been acknowledged by the AMPS server. When the unpersisted_count() reaches 0, there are no unpersisted messages in the local publish store.

For the highest level of safety, an application can wait until the unpersisted_count() reaches 0, which indicates that all of the messages have been persisted to the instance that the application is connected to, and the synchronous replication destinations configured for that instance. When a synchronous replication destination goes offline, this approach will cause the publisher to wait to exit until the destination comes back online or until the destination is downgraded to asynchronous replication.

For applications that are shut down periodically for short periods of time (for example, applications that are only offline during a weekly maintenance window), another approach is to use the publish_flush() method to ensure that messages are delivered to AMPS, and then rely on the connection logic to replay messages as necessary when the application restarts.

For example, the following code flushes messages to AMPS, then warns if not all messages have been acknowledged:

client = AMPS.HAClient("ha-publisher",
         "/mnt/fastDisk/moreImportantMessages.outgoing",
         "/mnt/fastDisk/moreImportantMessages.incoming")
...
client.connect_and_logon()

# Publish messages
...


# We think we are done, but the server may not
# have received or acknowledged all messages yet.

# Wait until the server has received all messages.
# The program could also specify a timeout in this
# command to avoid blocking forever if the network
# is down or all servers are offline.

client.publish_flush()

# Print warning to the console if messages have
# been published but not yet acknowledged as
# persisted

if (client.get_unpersisted_count() > 0):
    print( "all messages have been published, " \
        + " but not all have been persisted" )

client.disconnect()

In this example, the client sends each message immediately when publish() is called. If AMPS becomes unavailable between the final publish() and the disconnect(), or one of the servers that the AMPS instance replicates to is offline, the client may not have received a persisted acknowledgment for all of the published messages. For example, if a message has not yet been persisted by all of the servers in the replication fabric that are connected with synchronous replication, AMPS will not have acknowledged the message.

Before shutting down the client, the code does two things:

  • First, the code flushes messages to the server to ensure that all messages have been delivered to AMPS.

  • Next, the code checks to see if all of the messages in the publish store have been acknowledged as persisted by AMPS. If the messages have not been acknowledged, they will remain in the publish store file and will be published to AMPS, if necessary, the next time the application connects. An application may choose to loop until get_unpersisted_count() returns 0, or (as we do in this case) simply warn that AMPS has not confirmed that the messages are fully persisted. The behavior you choose in your application should be consistent with the high-availability guarantees your application needs to provide.

AMPS uses the name of the HAClient to determine the origin of messages. For the AMPS server to correctly identify duplicate messages, each instance of an application that publishes messages must use a distinct name. That name must be consistent across different runs of the application.

If your application crashes or is terminated, some published messages may not have been persisted in the AMPS server. If you use the file-based store—in other words, if you provide file names for persistent storage when you create the HAClient—the HAClient will recover the messages, and once logged on, will correlate the message store to what the AMPS server has received, re-publishing any missing messages. This occurs automatically when HAClient connects, without any explicit consideration in your code, other than ensuring that the same file name is used to create the HAClient if recovery is desired.

AMPS provides persisted acknowledgment messages for topics that do not have a transaction log enabled. However, the level of durability provided for topics with no transaction log is minimal. Learn more about transaction logs in the AMPS User Guide.

Detecting Failover Ahead of Replication

AMPS replication provides two different acknowledgment modes for outgoing replication links from an instance:

  • For a link in sync acknowledgment mode, a message must be successfully acknowledged by the downstream instance of AMPS before this instance of AMPS will acknowledge the message.

  • For a link in async acknowledgment mode, this link is not considered for acknowledging the message. In this mode, the downstream side of the replication link may not have received or processed the message at the time that the publisher receives an acknowledgment.

As described in the AMPS User Guide, a publisher must not failover from one instance of AMPS to another instance when any link between those instances uses async acknowledgment unless replication is certain to have reached that instance. (For example, if replication is taking a maximum of 1.2 seconds between the instances and the publisher has been disconnected for 30 seconds, all messages from that publisher will have been replicated).

To help detect a situation where a publisher may be "jumping ahead" of messages that it has published, but which have not yet been replicated, the AMPS client allows an application to consider it to be an error to make a connection to a server that has not received messages previously published by the application.

To enable this behavior, set the setErrorOnPublishGap() method to set this property on the PublishStore in use for the client. When this property is set, the client will consider it to be an error to connect to a server that has not received messages previously published by the client, and consider the connection to have failed.

Notice that an application that uses this approach may need to handle situations where no server has received the message, particularly if the replication configuration uses automated replication downgrade.

Considerations for Subscribers

HAClient provides two important features for applications that subscribe to one or more topics: re-subscription, and a bookmark store to track the correct point at which to resume a bookmark subscription.

Resubscription with Asynchronous Message Processing

Any asynchronous subscription placed using an HAClient is automatically reinstated after a disconnect or a failover. These subscriptions are placed in an in-memory SubscriptionManager, which is created automatically when the HAClient is instantiated.

When a re-subscription occurs, the AMPS Python client re-executes the command as originally submitted, including the original topic, options, and so on. AMPS sends the subscriber any messages for the specified topic (or topic expression) that are published after the subscription is placed. For a sow_and_subscribe command, this means that the client re-issues the full command, including the SOW query as well as the subscription.

A sow command is a point-in-time query. It isn't added to the subscription manager, and isn't restarted if a disconnection happens in the middle of a query.

A sow_and_subscribe is a subscription, and is added to the subscription manager.

Resubscription with Synchronous Message Processing

The HAClient (starting with the AMPS Python client version 4.3.1.1) does not track synchronous message processing subscriptions in the SubscriptionManager. The reason for this is to preserve the iterator semantics. That is, once the MessageStream indicates that there are no more elements in the stream, it does not suddenly produce more elements.

To re-subscribe when the HAClient fails over, you can simply re-issue the subscription. For example, the snippet below re-issues the subscribe command when the message stream ends:

while still_need_to_process:
 # Exiting the for loop is the end of stream.
 # For a subscribe, this likely means that the
 # client has disconnected.
 try:
   for message in client.subscribe("messages"):
     # process messages here
     # check condition on still_need_to_process
     if still_need_to_process == False: break
 except AMPS.DisconnectedException as e:
     pass

Bookmark Stores

In cases where it is critical not to miss a single message, it is important to be able to resume a subscription at the exact point that a failure occurred. In this case, simply recreating a subscription isn't sufficient. Even though the subscription is recreated, the subscriber may have been disconnected at precisely the wrong time and will not see the message.

To ensure delivery of every message from a topic or set of topics, the AMPS HAClient includes a BookmarkStore that, combined with the bookmark subscription and transaction log functionality in the AMPS server, ensures that clients receive any messages that might have been missed. The client stores the bookmark associated with each message received, and tracks whether the application has processed that message; if a disconnect occurs, the client uses the BookmarkStore to determine the correct resubscription point, and sends that bookmark to AMPS when it re-subscribes. AMPS then replays messages from its transaction log from the point after the specified bookmark, thus ensuring the client is completely up-to-date.

HAClient helps you to take advantage of this bookmark mechanism through the BookmarkStore interface and bookmarkSubscribe() method on Client. When you create subscriptions with bookmarkSubscribe(), whenever a disconnection or failover occurs, your application automatically re-subscribes to the message after the last message it processed. HAClients created by createFileBacked() additionally store these bookmarks on disk, so that the application can restart with the appropriate message if the client application fails and restarts.

To take advantage of bookmark subscriptions, do the following:

  • Ensure the topic(s) to be subscribed to are included in a transaction log. See the AMPS User Guide for information on how to specify the contents of a transaction log.

  • Use bookmark_subscribe() instead of subscribe() when creating a subscription() and decide how the application will manage subscription identifiers (SubIds). If you are using a Command object, you can simply set the bookmark on that object.

  • Use the discard() method in message handlers to indicate when a message has been fully processed by the application, that is, when the application does not need to receive the message again if the application fails over.

The following example creates a bookmark subscription against a transaction-logged topic and fully processes each message as soon as it is delivered:

class MessagePrinter(object):
    def __init__(self, client):
        self._client = client

    def __call__(self, message):
       print (message.get_data())
       self._client.discard(message)

...

client = AMPS.HAClient(
    "aClient",
    "/logs/aClient.publishLog",
    "/logs/aClient.subscribeLog")

# Create ServerChooser, populate chooser, connect client
...

client.execute_async(                                    \
    AMPS.Command("subscribe")                            \
        .set_topic("myTopic")                            \
        .set_bookmark(AMPS.Client.Bookmarks.MOST_RECENT) \
        .set_sub_id("MySubID"),                          \
        MessagePrinter(client))

In this example, the client is a file-backed client, meaning that arriving bookmarks will be stored in a file (aClient.subscribeLog). Storing these bookmarks in a file allows the application to restart the subscription from the last message processed, in the event of either server or client failure.

For optimum performance, it is critical to discard every message once its processing is complete. If a message is never discarded, it remains in the bookmark store. During re-subscription, HAClient always restarts the bookmark subscription with the oldest undiscarded message, and then filters out any more recent messages that have been discarded. If an old message remains in the store, but is no longer important for the application’s functioning, then the client and the AMPS server will incur unnecessary network, disk and CPU activity.

The fourth parameter, sub_id, specifies an identifier to be used for this subscription. Passing None causes HAClient to generate one and return it, like most other Client functions. However, if you wish to resume a subscription from a previous point after the application has terminated and restarted, the application must pass the same subscription ID as during its previous run. Passing a different subscription ID bypasses any recovery mechanisms, creating an entirely new subscription. When you use an existing subscription ID, the HAClient locates the last-used bookmark for that subscription in the local store, and attempts to re-subscribe from that point.

Below are the different bookmark types that can be used to enable different recovery strategies for an application:

  • Client.Bookmarks.NOW specifies that the subscription should begin from the moment the server receives the subscription request. This results in the same messages being delivered as if you had invoked subscribe() instead, except that the messages will be accompanied by bookmarks. This is also the behavior that results if you supply an invalid bookmark.

  • Client.Bookmarks.EPOCH specifies that the subscription should begin from the beginning of the AMPS transaction log (that is, the first entry in the oldest journal file for the transaction log).

  • Client.Bookmarks.MOST_RECENT specifies that the subscription should begin from the last-used message in the associated BookmarkStore. Alternatively, if this subscription has not been seen before, it instructs the subscription to begin with EPOCH. This is the most common value for this parameter and is the value used in the preceding example. By using MOST_RECENT, the application automatically resumes from wherever the subscription left off, taking into account any messages that have already been processed and discarded.

When the HAClient re-subscribes after a disconnection and reconnection, it always uses MOST_RECENT, ensuring that the continued subscription always begins from the last message used before the disconnect, so that no messages are missed.

Conclusion

With only a few changes, most AMPS applications can take advantage of the HAClient and associated classes to become more highly-available and resilient. Using the PublishStore, publishers can ensure that every message published has actually been persisted by AMPS. Using BookmarkStore, subscribers can make sure that there are no gaps or duplicates in the messages received. HAClient makes both kinds of applications more resilient to network and server outages, as well as temporary issues. By utilizing the file based HAClient, clients can recover their state after an unexpected termination or crash. Though HAClient provides useful defaults for the Store, BookmarkStore, and ServerChooser, you can customize any or all of these to the specific needs of your application and architecture.

Delta Subscribe

To delta subscribe, you simply use the delta_subscribe command as follows:

# assumes that client is connected and logged on

cmd = AMPS.Command("delta_subscribe")
cmd.set_topic("delta_topic")
cmd.set_filter("/thingIWant = 'true'")

for m in client.execute(cmd):
    # Delta messages arrive here

As described in the section on , messages provided to a delta subscription will contain the fields used to generate the SOW key and any changed fields in the message. Your application is responsible for choosing how to handle the changed fields.

AMPS User Guide
Receiving Only Updated Fields