This guide contains information about the AMPS Python client.
It focuses on information that is specific to the AMPS Python client. The guide does not contain detailed information on AMPS itself.
To get started with AMPS, see the Introduction to AMPS guide. That guide contains an overview of AMPS and instructions for setting up a development environment.
This guide assumes that you have a development environment for Python and access to an AMPS server using the configuration provided with the Python samples (in the full source distribution of the client).
In this section, we will learn more about the structure and features of the AMPS Python client and build our first Python program using AMPS.
The AMPS client library is packaged as a single binary file. The exact name of the file depends on the Python version and build environment. You can find the file under the build directory of your AMPS Python client install once you've completed the build process. If you have used a prepackaged .egg or .whl to install the Python client, the appropriate binary file will be installed in your Python environment.
Every Python application you build needs to be able to reference the library. If you install the Python client into your local Python installation, Python has access to the client library in that installation, and you do not need to include the library with each script. Otherwise, you need to package the library with your script and ensure that it is on the Python module search path (for example, by adding its directory to PYTHONPATH), or ensure that the system where your application will run has the appropriate prepackaged build installed.
Let's begin by writing a simple program that connects to an AMPS server and sends a single message to a topic. This code will be covered in detail just following the example.
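A minimal sketch of such a program follows, assuming an AMPS instance that accepts JSON messages at tcp://127.0.0.1:9007/amps/json. The client name examplePublisher, the topic messages, and the message body are illustrative choices. The connect, logon, and publish steps take the client as a parameter so that the sequence can be exercised with a stand-in object when no server is available.

```python
import sys

# Illustrative values; adjust them to match your AMPS configuration.
URI = "tcp://127.0.0.1:9007/amps/json"
TOPIC = "messages"


def send_greeting(client, uri=URI, topic=TOPIC):
    """Connect, log on, publish one JSON message, and always close the client."""
    try:
        client.connect(uri)
        client.logon()  # uses DefaultAuthenticator when none is given
        client.publish(topic, '{"hello": "world"}')
    finally:
        client.close()


def main():
    # Imported here so this module can be loaded where the client is absent.
    import AMPS

    client = AMPS.Client("examplePublisher")
    try:
        send_greeting(client)
    except AMPS.AMPSException as e:
        print("AMPS error:", e, file=sys.stderr)
        return 1
    return 0
```

To run it against a live server, call main(); it returns 0 on success and 1 if the client reports an AMPSException.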
In the example above, we show the entire program. Future examples will isolate one or more specific portions of the code.
Let us revisit the code we listed above:
When a client logs on to AMPS, the client sends AMPS a username and password.
The username is derived from the URI, using the standard syntax for providing a user name in a URI, for example, tcp://JohnDoe:@server:port/amps/messagetype to include the user name JohnDoe in the request.
For a given user name, the password is provided by an Authenticator. The AMPS client distribution includes a DefaultAuthenticator that simply returns the password, if any, provided in the URI. A logon() command that does not specify an Authenticator will use an instance of DefaultAuthenticator.
If your authentication system requires a different authentication token, you can implement an Authenticator that provides the appropriate token.
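As a sketch of such an Authenticator, the class below returns a token from a caller-supplied source instead of the password in the URI. It assumes the duck-typed interface that DefaultAuthenticator implements, with authenticate(), retry(), and completed() methods; confirm the exact method names and arguments in the Python client API reference. The fetch_token callable is hypothetical.

```python
class TokenAuthenticator(object):
    """Supplies a token from a caller-provided source instead of the URI password.

    Assumes the client calls authenticate() at logon, retry() if the server
    asks for another attempt, and completed() when logon finishes.
    """

    def __init__(self, fetch_token):
        # fetch_token: hypothetical zero-argument callable returning a token string
        self._fetch_token = fetch_token
        self.last_result = None

    def authenticate(self, username, password):
        # Ignore any password from the URI and return a fresh token.
        return self._fetch_token()

    def retry(self, username, password):
        # Fetch a new token in case the previous one was rejected or expired.
        return self._fetch_token()

    def completed(self, username, password, reason):
        # Record the outcome; a real implementation might log or cache here.
        self.last_result = (username, reason)
```

An instance would then be passed to logon() in place of the default; see the API reference for the exact argument.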
The AMPS clients use connection strings to determine the server, port, transport, and protocol to use to connect to AMPS. When the connection point in AMPS accepts multiple message types, the connection string also specifies the precise message type to use for this connection.
Connection strings have a number of elements:
As shown in the figure above, connection strings have the following elements:
Transport - Defines the network used to send and receive messages from AMPS. In this case, the transport is tcp. For connections to transports that use the Secure Sockets Layer (SSL), use tcps. For connections to AMPS over a Unix domain socket, use unix.
Host Address - Defines the destination on the network where the AMPS instance receives messages. The format of the address depends on the transport. For tcp and tcps, the address consists of a host name and port number. In this case, the host address is localhost:9007. For unix domain sockets, a value for host name and port must be provided to form a valid URI, but their content is ignored, and the file name provided in the path parameter is used instead (by convention, many connection strings use localhost:0 to indicate that this is a local connection that does not use TCP/IP).
Protocol - Sets the format in which AMPS receives commands from the client. Most code uses the default amps protocol, which sends header information in JSON format. AMPS supports the ability to develop custom protocols as extension modules, and AMPS also supports legacy protocols for backward compatibility.
MessageType - Specifies the message type that this connection uses. This component of the connection string is required if the protocol accepts multiple message types and the transport is configured to accept multiple message types. If the protocol does not accept multiple message types, this component of the connection string is optional, and defaults to the message type specified in the transport.
Legacy protocols such as fix, nvfix, and xml only accept a single message type, and therefore do not require or accept a message type in the connection string.
As an example, a connection string such as:
would work for programs connecting from the local host to a Transport configured as follows:
See the AMPS Configuration Guide for more information on configuring transports.
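To see how these elements map onto ordinary URI syntax, the standard-library sketch below splits a connection string into them. It is not the AMPS client's own parser; it assumes the first path segment is the protocol and the optional second segment is the message type, as in the connection strings described above.

```python
from urllib.parse import urlsplit, parse_qs


def describe_connection_string(uri):
    """Split an AMPS-style connection string into its elements."""
    parts = urlsplit(uri)
    # The path has the form /<protocol>[/<message type>].
    segments = [s for s in parts.path.split("/") if s]
    return {
        "transport": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "protocol": segments[0] if segments else None,
        "message_type": segments[1] if len(segments) > 1 else None,
        "user": parts.username,
        # Query parameters (transport or logon options); last value wins.
        "options": {k: v[-1] for k, v in parse_qs(parts.query).items()},
    }
```

For example, describe_connection_string("tcp://localhost:9007/amps/json") reports the tcp transport, host localhost, port 9007, the amps protocol, and the json message type.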
The AMPS Python client is published to the Python Package Index as amps-python-client. You can use pip to install the client directly from the repository using a command such as the following:
$ pip install amps-python-client
The AMPS Python client is available as a download from the 60East Technologies website, https://www.crankuptheamps.com/develop/. Download the client from the site, then extract it.
The client source files are in the directory where you unpacked the files. By default, this is amps-python-client-<version>, where <version> is the current version of the Python client (for example, amps-python-client-4.3.1.0).
60East provides a prebuilt .whl file for x64 Linux distributions using Python 2.7 or 3.4 and later.
To install the .whl using the command line:
You will need permission to update the Python distribution on the system you are installing on.
Open a command prompt.
Run the following command, substituting the appropriate path to the release wheel that you want to install:
If this command reports a permission error, you do not have permission to update the Python distribution. Run the command as a different user, or use sudo to run the command as root.
60East provides a prebuilt .whl file for 64-bit Windows operating systems using Python 2.7 or 3.4 and later. Your Python distribution may include a more fully-featured package manager to assist with installing .whl files.
To install the .whl using the command line:
You will need permission to update the Python distribution on the system you are installing on.
Make sure that your Python distribution includes setuptools.
Open a command prompt.
Make sure that the directory that contains python is in your path.
Run the following command, substituting the appropriate path to the release wheel that you want to install:
If this command reports a permission error, you do not have permission to update the Python distribution. Run the command prompt as a different user, or start the command prompt with Run as Administrator.
The main AMPS Python client distribution includes the full source to the client. For most installations, you build the client with your Python distribution before using it. The process for building the client differs slightly depending on whether you are building the client for Linux or for Windows.
Follow these steps to build the Linux version of the client:
The Python client includes all necessary C++ client sources. If you are using a different version of the C++ client than the one included with the Python client, set the AMPS_CPP_DIR
environment variable to the location of the AMPS C++ client source.
Run python setup.py build from the AMPS Python client directory to build the client.
This script uses the Python distutils to build the library, which makes it easy to build the library correctly and install it into your Python distribution. To see the options available in your environment, run python setup.py --help.
The module must be built with the same C++ compiler used to build Python on your system. The setup.py script and distutils package will generally ensure this unless you have added a different compiler to your PATH. If you typically use a different C++ compiler, remove the path to that compiler from your PATH before running setup.py.
The script builds the module to a path in the build directory. The exact path depends on the version of Python you are building with.
Add the build directory path to the PYTHONPATH environment variable. For example:
$ export PYTHONPATH=/home/AMPSdev/amps-python-client/build/lib.linux-x86_64-2.7:$PYTHONPATH
Test that the module loads correctly:
$ python -c "import AMPS"
Optionally, install the module to your local Python installation. While this is not required, doing so makes the AMPS module available to all programs that use this Python installation:
sudo python setup.py install
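Because the build directory name varies with the platform and Python version, a script can locate it instead of hard-coding the path. The sketch below assumes the compiled module is named AMPS followed by one of the current interpreter's extension-module suffixes (for example, a .so file on Linux or a .pyd file on Windows). It prepends the matching directory to sys.path, which has the same effect for the running process as adding it to PYTHONPATH. It requires Python 3.

```python
import glob
import os
import sys
from importlib.machinery import EXTENSION_SUFFIXES


def find_build_lib_dir(client_dir):
    """Return the build/lib* directory holding an AMPS module for this interpreter."""
    # e.g. AMPS.cpython-311-x86_64-linux-gnu.so, AMPS.so, AMPS.pyd
    candidates = {"AMPS" + suffix for suffix in EXTENSION_SUFFIXES}
    pattern = os.path.join(client_dir, "build", "lib*", "*")
    for path in sorted(glob.glob(pattern)):
        if os.path.basename(path) in candidates:
            return os.path.dirname(path)
    return None


def add_build_dir_to_path(client_dir):
    """Prepend the build directory to sys.path, like setting PYTHONPATH."""
    lib_dir = find_build_lib_dir(client_dir)
    if lib_dir is None:
        raise FileNotFoundError("no AMPS module for this Python under %s" % client_dir)
    if lib_dir not in sys.path:
        sys.path.insert(0, lib_dir)
    return lib_dir
```

Matching on the exact file name means a module built for a different Python version is skipped rather than picked up by accident.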
Follow these steps to build the Windows version of the client:
Use the Visual Studio Command Prompt shortcut to start a command prompt window for the type of module you want to build. For example, to build a 32-bit module, you use the command prompt for x86 builds.
Add the Python directory (the location of the python.exe interpreter) to your PATH.
The platform of the Python installation must match the target platform for the Python module. If you want to build a 64-bit module, your PATH must include a 64-bit Python installation. If you want to build a 32-bit module, your PATH must include a 32-bit Python installation. The build process uses whichever python.exe is found first when searching the PATH, so to build both 32-bit and 64-bit versions, you must build them separately and change your PATH between builds.
Run python setup.py build from the AMPS Python client directory to build the client.
This script uses the Python distutils to build the library, which makes it easy to build the library correctly and install it into your Python distribution. To see the options available in your environment, run python setup.py --help.
The script builds the module to a path in the build directory. The exact path depends on the version of Python you are building with.
Add the build directory path to the PYTHONPATH environment variable. For example:
> set PYTHONPATH=C:\Users\AMPSdev\amps-python-client\build\lib.win-amd64-2.7;%PYTHONPATH%
Test that the module loads correctly:
> python -c "import AMPS"
Optionally, install the module to your local Python installation. While this is not required, doing so makes the AMPS module available to all programs that use this Python installation:
> python setup.py install
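Beyond python -c "import AMPS", it can help to know which copy of the module Python will load, especially when several builds are on PYTHONPATH. This small sketch uses importlib.util.find_spec (Python 3.4 and later) to report the location without importing the module:

```python
import importlib.util


def module_location(name="AMPS"):
    """Return the file a top-level module would be loaded from, or None."""
    spec = importlib.util.find_spec(name)
    return spec.origin if spec is not None else None


location = module_location()
if location is None:
    print("AMPS module not found; check PYTHONPATH or install the client")
else:
    print("AMPS module found at", location)
```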
When using the DefaultAuthenticator, the AMPS clients support the standard format for including a username and password in a URI, as shown below:
When provided in this form, the default authenticator provides the username and password specified in the URI. If you have implemented another authenticator, that authenticator controls how passwords are provided to the AMPS server.
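When a password is embedded in the URI, characters such as : or @ would change how the URI is split into user name, password, and host. The sketch below builds a tcp connection string with credentials and rejects those characters rather than percent-encoding them, since it does not assume how the client decodes escaped characters. The function name and default path are illustrative.

```python
from urllib.parse import urlsplit

# Characters that would alter how the URI is split into its components.
_RESERVED = set(":@/?#")


def uri_with_credentials(user, password, host, port, path="/amps/json"):
    """Build tcp://user:password@host:port/path for use with DefaultAuthenticator."""
    for label, value in (("user name", user), ("password", password)):
        bad = _RESERVED.intersection(value)
        if bad:
            raise ValueError("%s contains reserved characters: %s"
                             % (label, "".join(sorted(bad))))
    return "tcp://%s:%s@%s:%d%s" % (user, password, host, port, path)
```

Parsing the result with urlsplit() recovers the same user name and password, which is the form the default authenticator reads.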
One of the most powerful features of AMPS is content filtering. With content filtering, filters based on message content are applied at the server so that your application and the network are not utilized by messages that are not relevant to your application. For example, if your application is only displaying messages from a particular user, you can send a content filter to the server so that only messages from that particular user are sent to the client.
To apply a content filter to a subscription, simply pass it into the client.subscribe() call or use the set_filter method to add a filter to the command:
In this example, we have passed in a content filter /sender = 'mom'. As a result, the server sends only those messages from the messages topic whose sender field is equal to mom.
For example, the AMPS server will send the following message, where /sender is mom:
The AMPS server will not send a message with a different /sender value:
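A minimal sketch of both forms follows. It assumes client is an AMPS.Client that is already connected and logged on, and that the filter is the second positional argument of subscribe(); the limit parameter is an illustrative addition so the loop can end.

```python
def print_messages_from_mom(client, topic="messages", limit=None):
    """Subscribe with a content filter and print each matching message body."""
    received = 0
    # The server evaluates the filter; only matching messages reach the stream.
    for message in client.subscribe(topic, "/sender = 'mom'"):
        print(message.get_data())
        received += 1
        if limit is not None and received >= limit:
            break
    return received


def mom_filter_command():
    """Build the equivalent subscribe Command using set_filter."""
    import AMPS  # deferred so this sketch loads without the client installed

    return (AMPS.Command("subscribe")
            .set_topic("messages")
            .set_filter("/sender = 'mom'"))
```

The command form is passed to client.execute(), which returns a MessageStream that can be iterated in the same way.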
When specifying a URI for connection to an AMPS server, you may specify a number of transport-specific options in the parameters section of the URI connection parameters. Here is an example:
In this example, we have specified the AMPS instance on localhost, port 9007, connecting to a transport that uses the amps protocol and sending JSON messages. We have also set two parameters: tcp_nodelay, a Boolean (true/false) parameter, and tcp_sndbuf, an integer parameter. Multiple parameters may be combined to finely tune settings available on the transport. Normally, you'll want to stick with the defaults on your platform, but there may be some cases where experimentation and fine-tuning will yield higher or more efficient performance.
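When options come from configuration, they can be appended programmatically. The sketch below writes Python booleans as lowercase true/false, matching the Boolean values described above, and passes other values through str(); the buffer size of 10000 in the usage example is illustrative.

```python
from urllib.parse import urlencode


def with_transport_options(base_uri, **options):
    """Append transport options to a connection string as query parameters."""
    encoded = {
        name: (str(value).lower() if isinstance(value, bool) else value)
        for name, value in options.items()
    }
    if not encoded:
        return base_uri
    separator = "&" if "?" in base_uri else "?"
    return base_uri + separator + urlencode(encoded)
```

For example, with_transport_options("tcp://localhost:9007/amps/json", tcp_nodelay=True, tcp_sndbuf=10000) produces tcp://localhost:9007/amps/json?tcp_nodelay=true&tcp_sndbuf=10000.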
The AMPS client supports the value tcp in the scheme component of the connection string for TCP/IP connections, and the value tcps as the scheme for SSL-encrypted connections.
For connections that use Unix domain sockets, the client supports the value unix in the scheme, and requires an additional option, as described in the Unix Transports Parameters section below.
Starting with version 5.3.3.0, the AMPS client supports creating connections over both IPv4 and IPv6 protocols if supported by the underlying Operating System.
By default, the AMPS client will prefer to resolve host names to IPv4 addresses, but this behavior can be adjusted by supplying the ip_protocol_prefer transport option, described in the table below.
The following transport options are available for TCP connections:
Option | Description
bind | (IP address) Sets the interface to bind the outgoing socket to. Starting with version 5.3.3.0, both IPv4 and IPv6 addresses are fully supported for use with this parameter.
tcp_rcvbuf | (integer) Sets the socket receive buffer size. This defaults to the system default size. (On Linux, you can find the system default size in /proc/sys/net/core/rmem_default.)
tcp_sndbuf | (integer) Sets the socket send buffer size. This defaults to the system default size. (On Linux, you can find the system default size in /proc/sys/net/core/wmem_default.)
tcp_nodelay | (boolean) Enables or disables the TCP_NODELAY setting on the socket. By default, TCP_NODELAY is disabled.
tcp_linger | (integer) Enables and sets the SO_LINGER value for the socket. By default, SO_LINGER is enabled with a value of 10, which specifies that the socket will linger for 10 seconds.
tcp_keepalive | (boolean) Enables or disables the SO_KEEPALIVE value for the socket. The default value for this option is true.
ip_protocol_prefer | (string) Influences which IP protocol is preferred during DNS resolution of the host. If no DNS entry for the preferred protocol can be found, the other protocol is tried. If this parameter is not set, IPv4 is preferred. If an explicit IPv4 or IPv6 address is provided as the host, the format of the address determines the IP protocol used, and this setting has no effect. Supported values: ipv4 (prefer an IPv4 address when resolving the host) and ipv6 (prefer an IPv6 address when resolving the host). This parameter is available starting with version 5.3.3.0.
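To make the ip_protocol_prefer rule concrete, the sketch below reproduces it with socket.getaddrinfo: it returns an address of the preferred family when one exists and otherwise falls back to the other family. It illustrates the documented behavior and is not the client's own resolver.

```python
import socket


def resolve_preferring(host, port, prefer="ipv4"):
    """Return (family, sockaddr) for host, trying the preferred family first."""
    if prefer not in ("ipv4", "ipv6"):
        raise ValueError("prefer must be 'ipv4' or 'ipv6'")
    preferred = socket.AF_INET if prefer == "ipv4" else socket.AF_INET6
    results = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
    # Stable sort: preferred-family addresses first, resolver order otherwise.
    results.sort(key=lambda info: info[0] != preferred)
    family, _, _, _, sockaddr = results[0]
    return family, sockaddr
```

Passing a literal address such as 127.0.0.1 with prefer="ipv6" still yields an IPv4 address, because the literal determines the protocol, as the table notes.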
The unix transport type communicates over Unix domain sockets. This transport requires the following additional option:
Option | Description
path | The path to the Unix domain socket to connect to.
Unix domain sockets always connect to the local system. When the scheme specified is unix, the host address is ignored in the connection string. For example, the connection string:
and the connection string:
are equivalent.
The other components of the connection string, including the protocol, message type, username, and authentication token are processed just as they would be for TCP/IP sockets.
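The equivalence can be illustrated by reducing a unix connection string to the parts that matter: the user name, the protocol and message type path, and the path option. The host and port are discarded, as described above. The socket path /tmp/amps.sock is hypothetical.

```python
from urllib.parse import urlsplit, parse_qs


def unix_target(uri):
    """Return (user, protocol path, socket path) for a unix:// connection string."""
    parts = urlsplit(uri)
    if parts.scheme != "unix":
        raise ValueError("not a unix connection string: %s" % uri)
    options = parse_qs(parts.query)
    if "path" not in options:
        raise ValueError("unix connection strings require a path option")
    # Host and port are deliberately ignored.
    return parts.username, parts.path, options["path"][-1]
```

Two strings that differ only in host and port therefore reduce to the same target.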
The connection string can also be used to pass logon parameters to AMPS. AMPS supports the following additional logon option:
Option | Description
pretty | Provide formatted representations of binary messages rather than the original message contents.
As mentioned earlier, one way for an application to receive messages is to have the AMPS Python client return a MessageStream object that can be used to iterate over the results of the command.
The MessageStream object makes copies of the incoming messages. When no message is available, the MessageStream blocks.
A MessageStream only remains active while the client that produced it is connected. If the client disconnects, the MessageStream continues to provide any messages that have not yet been consumed, then raises an exception.
The advantages of using a MessageStream are that it provides a simple processing model, that receiving messages from a MessageStream does not block the client receive thread (see Understanding Threading), and that a copy of the message is automatically made for the application.
In return for these advantages, a MessageStream has higher overhead than asynchronous message processing, it will not be resumed if the client disconnects, and, by default, it will use as much memory as necessary to hold messages coming from the AMPS server.
The AMPS Python client also supports an interface that allows you to process messages asynchronously. In this case, you add a message handler to the method call. The client object returns the command ID of the subscribe command once the server has acknowledged that the command has been processed. As messages arrive, the client calls your message handler directly on the background thread. This can be an advantage for some applications. For example, if your application is highly multithreaded and copies message data to a work queue processed by multiple threads, there may be a performance benefit to enqueuing work directly from the background thread. See Understanding Threading for a discussion of threading considerations, including considerations for message handlers.
As with the simple, synchronous interface, the AMPS client provides both convenience methods and methods that use a Command object. The following example shows how to use the asynchronous message processing interface (error handling and connection details are omitted for brevity):
One of the more common ways of providing a message handler is as an instance method on an object that maintains message state. It's simple to provide a handler with this capability, as shown below:
You can then provide an instance of the handler directly wherever a message handler is required, as shown below:
When using asynchronous message processing, the AMPS client resets and reuses the message provided to MessageHandler functions between calls. This improves performance in the client, but means that if your MessageHandler function needs to preserve information contained within the message, you must copy the information rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state or contents of the object when your program goes to use it.
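As a sketch of a stateful handler that respects this rule, the class below implements __call__ so that an instance can be passed wherever a handler is expected, for example as the handler argument of client.execute_async(). It keeps the value returned by get_data() rather than the Message object; the class name and the fields it keeps are illustrative.

```python
class CollectingHandler(object):
    """Message handler that keeps state between calls.

    Stores copies of the fields it needs (here, only the message body)
    rather than the Message object, which the client resets and reuses.
    """

    def __init__(self):
        self.count = 0
        self.bodies = []

    def __call__(self, message):
        self.count += 1
        # Keep the returned body, not the message: the Message object may
        # hold different contents by the time this list is read.
        self.bodies.append(message.get_data())
```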
The first time a command causes an instance of the Client or HAClient to connect to AMPS (typically, the logon() command), the client creates a thread that runs in the background. This background thread is responsible for processing incoming messages from AMPS, which includes both messages that contain data and acknowledgments from the server.
When you call a command on the AMPS client, the command typically waits for an acknowledgment from the server and then returns. (The exception to this is publish. For performance, the publish command does not wait for an acknowledgment from the server before returning.)
In the simple case, using synchronous message processing, the client provides an internal handler function that populates the MessageStream. The client receive thread calls the internal handler function, which makes a deep copy of the incoming message and adds it to the MessageStream. The MessageStream is used on the calling thread, so operations on the MessageStream do not block the client receive thread.
When using asynchronous message processing, AMPS calls the handler function from the client receive thread. Message handlers provided for asynchronous message processing must be aware of the following considerations:
The client creates one client receive thread at a time, and the lifetime of the thread lasts for the lifetime of the connection to the AMPS server. A message handler that is only provided to a single client will only be called from a single thread at a time. If your message handler will be used by multiple clients, then multiple threads will call your message handler. In this case, you should take care to protect any state that will be shared between threads. Notice that if the client connection fails (or is closed), and the client reconnects, the client will create a different thread for the new connection.
For maximum performance, do as little work in the message handler as possible. For example, if you use the contents of the message to update an external database, a message handler that adds the relevant data to an update queue processed by a different thread will typically perform better than a message handler that performs the update itself.
While your message handler is running, the thread that calls your message handler is no longer receiving messages. This makes it easier to write a message handler because you know that no other messages are arriving from the same subscription. However, this also means that you cannot use the same client that called the message handler to send commands to AMPS. Acknowledgments from AMPS cannot be processed and your application will deadlock waiting for the acknowledgment. Instead, enqueue the command in a work queue to be processed by a separate thread or use a different client object to submit the commands.
The AMPS client resets and reuses the Message provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information (either by making a copy of the entire message or copying the required fields) rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it. Likewise, a message handler should not modify the Message: doing so modifies the message provided to other handlers (including handlers internal to the AMPS client).
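The considerations above suggest a handler that copies what it needs, enqueues it, and returns at once, leaving the slow work (or any commands sent through a different client) to another thread. A minimal sketch using the standard queue and threading modules follows; the process callable stands in for your own work and is hypothetical.

```python
import queue
import threading


class QueueingHandler(object):
    """Message handler that only copies the body and enqueues it.

    A worker thread does the slow work, so the client receive thread is
    never blocked and never needs to wait for acknowledgments itself.
    """

    def __init__(self, process):
        self._queue = queue.Queue()
        self._process = process  # callable that receives a copied body
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def __call__(self, message):
        # Runs on the client receive thread: copy, enqueue, return.
        self._queue.put(message.get_data())

    def _run(self):
        while True:
            body = self._queue.get()
            if body is None:  # sentinel posted by stop()
                break
            self._process(body)

    def stop(self):
        """Process everything already queued, then end the worker thread."""
        self._queue.put(None)
        self._worker.join()
```

Because the queue is first-in, first-out, stop() lets the worker finish every body enqueued before it was called.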
So far, we have seen that subscribing to a topic involves working with objects of type AMPS.Message. A Message represents a single message to or from an AMPS server. Messages are received or sent for every client/server operation in AMPS.
There are two parts of each message in AMPS: a set of headers that provide metadata for the message, and the data that the message contains. Every AMPS message has one or more header fields defined. The precise headers present depend on the type and context of the message. There are many possible fields in any given message, but only a few are used for any given message. For each header field, the Message class contains distinct methods for retrieving and setting that field. For example, the Message.get_command_id() method corresponds to the CommandId header field, the Message.get_batch_size() method corresponds to the BatchSize header field, and so on. For more information on these header fields, consult the AMPS User Guide and AMPS Command Reference.
To work with header fields, a Message contains get_xxx() / set_xxx() methods corresponding to the header fields. 60East does not recommend attempting to parse header fields from the raw data of the message.
In AMPS, fields sometimes need to be set to a unique identifier value. For example, when creating a new subscription, or sending a manually constructed message, you'll need to assign a new unique identifier to fields such as CommandId and SubscriptionId. For this purpose, Message provides new_xxx() methods for each such field that generate a new unique identifier and set the field to that value.
Access to the data section of a message is provided via the get_data() method. The data contains the unparsed data in the message. The get_data() method returns the data as a Python string, which is suitable for message formats that can be represented as Unicode text, such as JSON, XML, FIX, or NVFIX. For binary data, the AMPS Python client provides a get_data_raw() method to allow you to work with the underlying bytes in the message. See the Working with Messages and Byte Buffers section for details.
The AMPS Python client contains a collection of helper classes for working with message types that are specific to AMPS (for example, FIX, NVFIX, and AMPS composite message types). For message types that are widely used, such as JSON or XML, you can use whichever library you typically use in your environment.
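For JSON, for instance, the standard json module is enough. The sketch below parses the body returned by get_data() and reads a top-level field; the field name sender matches the content-filter example and is otherwise illustrative. json.loads accepts either str or bytes (Python 3.6 and later), so the sketch does not depend on which one the client returns.

```python
import json


def sender_of(message):
    """Return the top-level "sender" field of a JSON message, or None."""
    document = json.loads(message.get_data())
    return document.get("sender")
```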
The AMPS Command Reference contains a full description of which fields are available and which fields are returned in response to specific commands.
AMPS uses the name of the client as a session identifier and as part of the identifier for messages originating from that client.
For this reason, when a transaction log is enabled in the AMPS instance (that is, when the instance is recording a sequence of publishes and attempting to eliminate duplicate publishes), an AMPS instance will only allow one application with a given client name to connect to the instance.
When a transaction log is present, AMPS requires the client name for a publisher to be:
Unique within a set of replicated AMPS instances
Consistent from invocation to invocation if the publisher will be publishing the same logical stream of messages
If publishers do not meet this contract (for example, if the publisher changes its name and publishes the same messages, or if a different publisher uses the same session name), message loss or duplication can happen.
60East recommends always using consistent, unique client names. For example, the client name could be formed by combining the application name, an identifier for the host system, and the ID of the user running the application. A strategy like this provides a name that will be different for different users or on different systems, but consistent for instances of the application that should be treated as equivalent to the AMPS system.
Conversely, if a publisher is sending a completely independent stream of messages (for example, a microservice that sends a different, unrelated sequence of messages each time it connects to AMPS), there is no need for the publisher to retain the same name each time it starts. However, if a publisher is resuming a stream of messages (as in the case when using a file-backed publish store), that publisher must use the same client name, since the publisher is resuming the session.
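The naming strategy recommended above can be sketched as a small helper that combines the application name, the host name, and the user ID. The hyphen separator and argument order are arbitrary choices; host and user default to socket.gethostname() and getpass.getuser() but can be supplied explicitly.

```python
import getpass
import socket


def make_client_name(app_name, host=None, user=None):
    """Combine application, host, and user into a stable client name.

    The same application run by the same user on the same host gets the
    same name every time; other users or hosts get different names.
    """
    host = host or socket.gethostname()
    user = user or getpass.getuser()
    return "%s-%s-%s" % (app_name, host, user)
```

For example, make_client_name("pricer", host="host1", user="alice") returns "pricer-host1-alice".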
The AMPS server continues a subscription until the client explicitly ends the subscription (that is, unsubscribes) or until the connection to the client is closed.
With a MessageStream, the AMPS client automatically unsubscribes from the topic when there are no more references to the MessageStream. You can also call the close() method on the MessageStream object to remove the subscription.
With asynchronous message processing, when a subscription is successfully made, messages begin flowing to the message handler function, and the subscribe() or execute_async() call returns a string that serves as the identifier for this subscription. A Client can have any number of active subscriptions, and this subscription ID is how AMPS designates messages intended for this subscription. To unsubscribe, we simply call unsubscribe with the subscription identifier, as shown below:
In this example, we use the execute_async() method to create a subscription to the messages topic. When our application is done listening to this topic, it unsubscribes (on the last line) by passing in the subscription identifier returned by the subscribe command. After the subscription is removed, no more messages will flow into our on_message_printer function.
When an application calls unsubscribe(), the client sends an explicit unsubscribe command to AMPS. The AMPS server removes that subscription from the set of subscriptions for the client, and stops sending messages for that subscription. On the client side, the client unregisters the subscription so that the MessageStream or message handler for that subscription will no longer receive messages for that subscription.
Notice that calling unsubscribe does not destroy messages that the server has already sent to the client. If there are messages on the way to the client for this subscription, the AMPS client must consume those messages. If a last_chance_message_handler is registered, the handler may receive the messages. Otherwise, they are discarded, since no message handler matches the subscription ID on the message.
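A minimal sketch of the subscribe-then-unsubscribe pattern follows. It assumes client is a connected, logged-on AMPS.Client and command is a subscribe Command such as AMPS.Command("subscribe").set_topic("messages"); the fixed listening time is an illustrative stand-in for however long your application needs the subscription.

```python
import time


def listen_then_unsubscribe(client, command, handler, seconds):
    """Run an asynchronous subscription for a while, then remove it."""
    sub_id = client.execute_async(command, handler)
    try:
        # Messages are delivered to handler on the client receive thread.
        time.sleep(seconds)
    finally:
        # Stops delivery for this subscription; messages already in flight
        # may still reach a last-chance handler or be discarded.
        client.unsubscribe(sub_id)
    return sub_id
```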
Messages published to a topic on an AMPS server are available to other clients via a subscription. Before messages can be received, a client must subscribe to one or more topics on the AMPS server so that the server will begin sending messages to the client. The server will continue sending messages to the client until the client unsubscribes, or the client disconnects. With content filtering, the AMPS server will limit the messages sent to only those messages that match a client-supplied filter. In this chapter, you will learn how to subscribe, unsubscribe, and supply filters for messages using the AMPS Python client.
The AMPS client makes it simple to subscribe to a topic. You call client.subscribe() with the topic to subscribe to and the parameters for the subscription. The client submits the subscription to AMPS and returns a MessageStream that you can iterate over to receive the messages from the subscription. Below is a short example:
The AMPS client creates a background thread that receives messages and copies them into the MessageStream that the for loop iterates over. This means that the client application as a whole can continue to receive messages while you are doing processing work.
The simple method described above is provided for convenience. The AMPS Python client provides convenience methods for the most common form of AMPS commands. The client also provides an interface that allows you to have precise control over the command. Using that interface, the example above becomes:
The Command interface allows you to precisely customize the commands you send to AMPS. For flexibility and ease of maintenance, 60East recommends using the Command interface (rather than a named method) for any command that will receive messages from AMPS. For publishing messages, there can be a slight performance advantage to using the named commands where possible.
Generally speaking, when an error occurs that prevents an operation from succeeding, AMPS throws an exception. All AMPS exceptions derive from AMPS.AMPSException, so by catching AMPSException you can be sure to catch anything AMPS throws. For example:
In this example, if an error occurs, the program writes the error to stderr and the publish() command fails. However, client is still usable for continued publishing and subscribing. As with most Python exceptions, str() converts the exception into a string that includes a descriptive message.
AMPS exception types vary based on the nature of the error that occurs. In your program, if you would like to handle certain kinds of errors differently than others, you can handle the appropriate subclass of AMPSException to detect those specific errors and do something different.
The following table details each of the exception types thrown by AMPS.
Exception | When | Notes
AlreadyConnectedException | Connecting | Thrown when connect() is called on a Client that is already connected.
AMPSException | Anytime | Base class for all AMPS exceptions.
AuthenticationException | Anytime | Indicates an authentication failure occurred on the server.
BadFilterException | Subscribing | This typically indicates a syntax error in a filter expression.
BadRegexTopicException | Subscribing | Indicates a malformed regular expression was found in the topic name.
CommandException | Anytime | Base class for all exceptions relating to commands sent to AMPS.
ConnectionException | Anytime | Base class for all exceptions relating to the state of the AMPS connection.
ConnectionRefusedException | Connecting | The connection was actively refused by the server. Validate that the server is running, that network connectivity is available, and that the settings on the client match those on the server.
DisconnectedException | Anytime | No connection is available when AMPS needed to send data to the server, or the user's disconnect handler threw an exception.
InvalidTopicException | SOW query | The topic is not configured for the requested operation. For example, a sow command was issued for a topic that is not in the SOW, or a bookmark subscribe was issued for a topic that is not recorded in the transaction log.
InvalidTransportOptionsException | Connecting | An invalid option or option value was specified in the URI.
InvalidUriException | Connecting | The URI string provided to connect() was formatted improperly.
MessageTypeException | Connecting | The class for a given transport's message type was not found in AMPS.
NameInUseException | Connecting | The client name (specified when instantiating Client) is already in use on the server.
RetryOperationException | Anytime | An error occurred that caused processing of the last command to be aborted. Try issuing the command again.
StreamException | Anytime | Indicates that data corruption has occurred on the connection between the client and server. This usually indicates an internal error inside of AMPS; contact AMPS support.
SubscriptionAlreadyExistsException | Subscribing | A subscription has been requested using the same command ID string as another subscription. Create a unique command ID string for every subscription.
TimedOutException | Anytime | A timeout occurred waiting for a response to a command.
TransportTypeException | Connecting | Thrown when a transport type that is unknown to AMPS is selected in the URI.
UnknownException | Anytime | Thrown when an internal error occurs. Contact AMPS support immediately.
UsageException | Changing the properties of an object | Thrown when the object is not in a valid state for setting the properties. For example, some properties of a Client (such as the name) cannot be changed while that client is connected to AMPS.
The named convenience methods and the Command class provide a timeout setting that specifies how long the command should wait to receive a processed acknowledgment from AMPS. This can be helpful when it is important for the caller to limit the amount of time it blocks waiting for AMPS to acknowledge the command. If the AMPS client does not receive the processed acknowledgment within the specified time, the client sends an unsubscribe command to the server to cancel the command and throws an exception.
Acknowledgments from AMPS arrive on the same socket as other data from AMPS and are processed by the client receive thread. This means that any data previously returned (such as the results of a large query) must be consumed before the acknowledgment can be processed. An application that submits a set of SOW queries in rapid succession should set a timeout that takes into account the amount of time required to process the results of the previous query.
The AMPS client includes a heartbeat feature to help applications detect disconnection from the server within a predictable amount of time. Without using a heartbeat, an application must rely on the operating system to notify the application when a disconnect occurs. For applications that are simply receiving messages, it can be impossible to tell whether a socket is disconnected or whether there are simply no incoming messages for the client.
When you set a heartbeat, the AMPS client sends a heartbeat message to the AMPS server at a regular interval, and waits a specified amount of time for the response. If the operating system reports an error on send, or if there is no activity received from the server within the specified amount of time, the AMPS client considers the server to be disconnected. Likewise, the server will ensure that traffic is sent to the client at the specified interval, using heartbeat messages when no other traffic is being sent to the client. If, after sending a heartbeat message, no traffic from the client arrives within a period twice the specified interval, the server will consider the client to be disconnected or nonresponsive.
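The two liveness rules above (the client's silence timeout and the server's "twice the interval" rule) can be expressed as a few timestamp comparisons. This is a stdlib-only sketch with an injected clock so the arithmetic can be checked deterministically. HeartbeatMonitor and server_considers_client_alive are hypothetical names: with the real client you only choose the interval and timeout, and the client handles this bookkeeping internally.

```python
class HeartbeatMonitor:
    """Hypothetical model of client-side heartbeat liveness (not the AMPS API)."""

    def __init__(self, interval, timeout, now):
        self.interval = interval      # seconds between heartbeats sent
        self.timeout = timeout        # seconds of silence tolerated from the server
        self.last_activity = now      # time of the last traffic received
        self.last_sent = now          # time of the last heartbeat sent

    def on_traffic(self, now):
        # Any received traffic (heartbeat or data) counts as activity.
        self.last_activity = now

    def heartbeat_due(self, now):
        return now - self.last_sent >= self.interval

    def mark_sent(self, now):
        self.last_sent = now

    def peer_is_alive(self, now):
        return now - self.last_activity <= self.timeout


def server_considers_client_alive(last_client_traffic, heartbeat_sent_at, interval, now):
    # Server rule from the text: after a heartbeat is sent, client traffic
    # must arrive within twice the heartbeat interval.
    if last_client_traffic >= heartbeat_sent_at:
        return True
    return now - heartbeat_sent_at <= 2 * interval


monitor = HeartbeatMonitor(interval=5, timeout=10, now=0)
monitor.on_traffic(now=4)
print(monitor.peer_is_alive(now=12))   # True: 8 seconds of silence
print(monitor.peer_is_alive(now=15))   # False: 11 seconds of silence
```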
The AMPS client processes heartbeat messages on the client receive thread, which is the thread used for asynchronous message processing. If your application uses asynchronous message processing and occupies the thread for longer than the heartbeat interval, the client may fail to respond to heartbeat messages in a timely manner and may be disconnected by the server.
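One common way to keep the receive thread free is to have the message handler only enqueue work and return, while a separate worker thread does the slow processing. The sketch below uses only the standard library. on_message is a hypothetical stand-in for a handler you would register with the client, and the integer messages are placeholders.

```python
import queue
import threading
import time

work_queue = queue.Queue()
results = []


def on_message(message):
    # Runs on the receive thread: do as little as possible and return quickly,
    # so the thread stays available for heartbeats and new messages.
    work_queue.put(message)


def worker():
    # Runs on an application thread: the slow processing happens here.
    while True:
        message = work_queue.get()
        if message is None:          # shutdown signal
            break
        time.sleep(0.01)             # simulated expensive work
        results.append(message * 2)


worker_thread = threading.Thread(target=worker)
worker_thread.start()

for value in range(5):
    on_message(value)                # each call only enqueues and returns

work_queue.put(None)
worker_thread.join()
print(results)   # [0, 2, 4, 6, 8]
```

The cost of this design is an extra hop through a queue. In exchange, the duration of processing no longer affects how quickly the receive thread can answer heartbeats.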
In every distributed system, the robustness of your application depends on its ability to recover gracefully from unexpected events. The AMPS client provides the building blocks necessary to ensure your application can recover from the kinds of errors and special events that may occur when using AMPS.
When using asynchronous message processing, exceptions thrown from the message handler are silently absorbed by the AMPS Python client by default. The AMPS Python client allows you to register an exception listener to detect and respond to these exceptions. When an exception listener is registered, AMPS will call the exception listener with the exception.
See the section on exception listeners for details.
Welcome to developing applications with AMPS, the Advanced Message Processing System from 60East Technologies!
These guides will help you learn how to develop applications using AMPS.
Before getting started with this guide, it is important to have a good understanding of the following topics:
Developing Applications in Python: To be successful using this guide and developing applications with AMPS, you need a working knowledge of the language you are developing in.
AMPS Concepts: This guide focuses on using the AMPS client libraries and how those libraries work with the AMPS server.
Before working through this guide, we recommend reading the Introduction to AMPS guide.
Detailed explanations of the AMPS server behavior are in the AMPS Server Documentation.
You will also need a system on which you can develop and run Python applications and a system where you can host the AMPS server.
The features supported on your processor and operating system depend on the features supported by the underlying C++ client, as shown in the following table:
Feature | Linux x64 / aarch64 | Windows x64 | OSX x64 / aarch64
Incredible performance | X | X | X
Publish and subscribe | X | X | X
State of the World (SOW) queries | X | X | X
Topic and content filtering | X | X | X
Atomic SOW query and subscribe | X | X | X
Transaction log replay | X | X | X
Historical SOW query | X | X | X
Beautiful documentation | X | X | X
HA: automatic failover | X | X | X
HA: durable publish and subscribe | X | X | X
You will need an installed and running AMPS server to use the product as well. You can read the sample programs without a running server, but you will get the most out of this guide by running the programs against a working server.
Instructions for starting an instance of AMPS are available in the Introduction to AMPS guide.
The AMPS server runs on x64 Linux. The Introduction to AMPS and AMPS FAQ contain information on how to run an AMPS server on a development system that does not run Linux.
The HAClient class, included with the AMPS Python client, contains a disconnect handler and other features for building highly available applications. The HAClient includes features for managing a list of failover servers, resuming subscriptions, republishing in-flight messages, and other functionality that is commonly needed for high availability. 60East recommends using the HAClient for automatic reconnection wherever possible, as the HAClient disconnect handler has been carefully crafted to handle a wide variety of edge cases and potential failures.
If an application needs to reconnect or fail over, use an HAClient, and the AMPS client library will automatically handle failover and reconnection. You control which servers the client fails over to using an implementation of the ServerChooser interface, and you can control the timing of the failover using an implementation of the ReconnectDelayStrategy interface.
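To illustrate the two decisions these interfaces make (which server to try next, and how long to wait before trying), here is a stdlib-only sketch of a round-robin choice over failover URIs and an exponential backoff with a cap. The class names, method names, and URIs are hypothetical. They do not reproduce the signatures of the AMPS ServerChooser or ReconnectDelayStrategy interfaces; consult the client reference for those.

```python
import itertools


class RoundRobinChooserSketch:
    """Hypothetical: cycles through failover URIs in order."""

    def __init__(self, uris):
        self._cycle = itertools.cycle(uris)

    def next_uri(self):
        return next(self._cycle)


class ExponentialDelaySketch:
    """Hypothetical: multiplies the wait after each failed attempt, up to a cap."""

    def __init__(self, initial_ms=200, maximum_ms=20000, multiplier=2.0):
        self.initial_ms = initial_ms
        self.maximum_ms = maximum_ms
        self.multiplier = multiplier
        self.attempts = 0

    def next_delay_ms(self):
        delay = self.initial_ms * (self.multiplier ** self.attempts)
        self.attempts += 1
        return min(delay, self.maximum_ms)

    def reset(self):
        # Call after a successful connection so the next outage starts fresh.
        self.attempts = 0


# Hypothetical server URIs, used only for illustration.
chooser = RoundRobinChooserSketch(["tcp://primary:9007/amps/json", "tcp://backup:9007/amps/json"])
delays = ExponentialDelaySketch(initial_ms=100, maximum_ms=1000)
plan = [(chooser.next_uri(), delays.next_delay_ms()) for _ in range(5)]
print(plan)
```

Capping the delay keeps a long outage from pushing the retry interval out indefinitely. Resetting after a successful connection means that a later, unrelated disconnect starts again with short retries.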
For most applications, the combination of the HAClient disconnect handler and a ConnectionStateListener gives you the ability to monitor disconnections and add custom behavior at the appropriate point in the reconnection process.
If you need to add custom behavior to the failover (such as logging, resetting an internal cache, or refreshing credentials), the ConnectionStateListener class allows your application to be notified and take action when disconnection is detected and at each stage of the reconnection process.
To extend the behavior of the AMPS client during reconnection, implement a ConnectionStateListener as described in the section on connection state listeners.
The publish methods in the Python client deliver the message to AMPS and then return immediately, without waiting for AMPS to return an acknowledgment. Likewise, the sow_delete methods request deletion of SOW messages and return before AMPS processes the message and performs the deletion. This approach provides high performance for operations that are unlikely to fail in production. However, it also means that the methods return before AMPS has processed the command, so they cannot return an error if the command fails.
The AMPS Python client provides a failed_write_handler that is called when the client receives an acknowledgment that indicates a failure to persist data within AMPS. As with the last_chance_message_handler described in the Unexpected Messages section, your application registers a handler for this function. When an acknowledgment returns that indicates a failed write, AMPS calls the registered handler method with information from the acknowledgment message, supplemented with information from the client publish store if one is available. Your client can log this information, present an error to the user, or take whatever action is appropriate for the failure.
If your application needs to know whether publishes succeeded and are durably persisted, the following approach is recommended:
Set a PublishStore on the client. This ensures that messages are retransmitted if the client becomes disconnected before the message is acknowledged, and requests persisted acknowledgments for messages.
Install a failed_write_handler. In the event that AMPS reports an error for a given message, that event will be reported to the failed_write_handler.
Call publish_flush() and verify that all messages are persisted before the application exits.
When no failed_write_handler is registered, acknowledgments that indicate errors in persisting data are treated as unexpected messages and routed to the last_chance_message_handler. In this case, AMPS provides only the acknowledgment message and does not provide the additional information from the client publish store.
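The following stdlib-only model shows the bookkeeping behind the recommended approach. A store keeps each publish until it is acknowledged. A failure acknowledgment is reported to a handler together with the stored copy of the message. A flush check confirms that nothing is outstanding. All names here (PublishStoreSketch, on_persisted_ack, the "success"/"failure" status strings, the "parse error" reason) are hypothetical. The real PublishStore, failed_write_handler, and publish_flush() are provided by the client and also handle retransmission, which this sketch only indicates by listing unpersisted sequence numbers.

```python
class PublishStoreSketch:
    """Hypothetical model of a publish store (not the AMPS PublishStore API)."""

    def __init__(self, failed_write_handler):
        self._pending = {}                 # sequence number -> (topic, data)
        self._next_sequence = 1
        self._failed_write_handler = failed_write_handler

    def publish(self, topic, data):
        sequence = self._next_sequence
        self._next_sequence += 1
        self._pending[sequence] = (topic, data)   # kept until acknowledged
        return sequence

    def on_persisted_ack(self, sequence, status, reason=""):
        topic, data = self._pending.pop(sequence)
        if status != "success":
            # The stored copy lets the handler report exactly what failed.
            self._failed_write_handler(sequence, topic, data, reason)

    def unpersisted(self):
        # Messages that would be retransmitted after a reconnect.
        return sorted(self._pending)

    def flushed(self):
        return not self._pending


failures = []
store = PublishStoreSketch(lambda seq, topic, data, reason: failures.append((seq, topic, reason)))
first = store.publish("orders", '{"id":1}')
second = store.publish("orders", '{"id":2')       # malformed payload
store.on_persisted_ack(first, "success")
print(store.unpersisted())                          # [2]
store.on_persisted_ack(second, "failure", "parse error")
print(failures, store.flushed())                    # [(2, 'orders', 'parse error')] True
```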
Every distributed system will experience occasional disconnections between one or more nodes. The reliability of the overall system depends on an application's ability to efficiently detect and recover from these disconnections. Using the AMPS Python client's disconnect handling, you can build powerful applications that are resilient in the face of connection failures and spurious disconnects. For additional reliability, you can also use the high availability client (discussed in the following sections), which provides both disconnect handling and features to help ensure that messages are reliably delivered.
AMPS allows you to update parameters, such as the content filter, on a subscription. When you replace a filter on the subscription, AMPS immediately begins sending only messages that match the updated filter. Notice that if the subscription was entered with a command that includes a SOW query, using the replace option can re-issue the SOW query (as described in the AMPS User Guide).
To update the filter on a subscription, you create a subscribe command. You set the SubscriptionId on the Command to the identifier of the existing subscription, and include the replace option on the Command.
When you send the Command, AMPS atomically replaces the filter and sends messages that match the updated filter from that point forward.
The AMPS client interface provides the ability to set one or more connection state listeners. A connection state listener is a callback that is invoked when the AMPS client detects a change to the connection state.
A connection state listener may be called from the client receive thread. An application should not submit commands to AMPS from a connection state listener, or the application risks creating a deadlock for commands that wait for acknowledgment from the server.
The AMPS client provides the following state values for a connection state listener:
Connected: The client has established a connection to AMPS. If you are using a Client, this is delivered when connect() is successful. If you are using an HAClient, this state indicates that the connect part of the connect and logon process has completed. An HAClient using the default disconnect handler will attempt to log on immediately after delivering this state. Most applications that use Client will attempt to log on immediately after the call to connect() returns. An application should not submit commands to AMPS from the connection state listener while the client is in this state unless the application knows that the state has been delivered from a Client and that the Client does not call logon().
LoggedOn: The client has successfully logged on to AMPS. If you are using a Client, this is delivered when logon() is successful. If you are using an HAClient, this state indicates that the logon part of the connect and logon process has completed. This state is delivered after the client is logged on, but before recovery of client state is complete. Recovery will continue after delivering this state: the application should not submit commands to AMPS from the connection state listener while the client is in this state if further recovery will take place.
HeartbeatInitiated: The client has successfully started heartbeat monitoring with AMPS. This state is delivered if the application has enabled heartbeating on the client. This state is delivered before recovery of the client state is complete. Recovery may continue after this state is delivered. The application should not submit commands to AMPS from the connection state listener until the client is completely recovered.
PublishReplayed: Delivered when a client has completed replay of the publish store when recovering after connecting to AMPS. This state is delivered when the client has a PublishStore configured. If the client has a subscription manager set (which is the default for an HAClient), the application should not submit commands from the connection state listener until the Resubscribed state is received.
Resubscribed: Delivered when a client has re-entered subscriptions when recovering after connecting to AMPS. This state is delivered when the client has a subscription manager set (which is the default for an HAClient). This is the final recovery step. An application can submit commands to AMPS from the connection state listener after receiving this state.
Disconnected: The client is not connected. For an HAClient, this means that the client will attempt to reconnect to AMPS. For a Client, this means that the client will invoke the disconnect handler, if one is specified.
Shutdown: The client is shut down. For an HAClient, this means that the client will no longer attempt to reconnect to AMPS. This state is delivered when close() is called on the client or when a server chooser tells the HAClient to stop reconnecting to AMPS.
The enumeration provided for the connection state listener also includes a value of UNKNOWN, for use as a default or to represent additional states in a custom Client implementation. The 60East implementations of the client do not deliver this state.
The following table shows examples of the set of states that will be delivered during connection, in order, depending on which features of the client are set. Notice that, for an instance of the Client class, this table assumes that the application calls both connect() and logon(). For an HAClient, this table assumes that the HAClient is using the default DisconnectHandler for the HAClient.
Features set | States delivered (in order)
subscription manager, publish store | Connected, LoggedOn, PublishReplayed, Resubscribed
subscription manager, publish store, heartbeat set | Connected, LoggedOn, HeartbeatInitiated, PublishReplayed, Resubscribed
subscription manager | Connected, LoggedOn, Resubscribed
subscription manager, heartbeat set | Connected, LoggedOn, HeartbeatInitiated, Resubscribed
(default Client configuration) | Connected, LoggedOn
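The state table can be restated as a small function. This is also a convenient way to decide when a listener may submit commands, because the last state in the sequence marks the end of recovery. This is a stdlib-only sketch: the string state names mirror the list above, but expected_states and safe_to_submit are hypothetical helpers, not part of the client. The function also extrapolates to feature combinations the table does not list, which is an assumption.

```python
def expected_states(subscription_manager=False, publish_store=False, heartbeat=False):
    """Order of connection states delivered during connection, per the table."""
    states = ["Connected", "LoggedOn"]   # assumes connect() and logon() are both called
    if heartbeat:
        states.append("HeartbeatInitiated")
    if publish_store:
        states.append("PublishReplayed")
    if subscription_manager:
        states.append("Resubscribed")    # final recovery step when present
    return states


def safe_to_submit(state, subscription_manager=False, publish_store=False, heartbeat=False):
    """True once the final recovery state for this configuration has arrived."""
    return state == expected_states(subscription_manager, publish_store, heartbeat)[-1]


print(expected_states(subscription_manager=True, heartbeat=True))
# ['Connected', 'LoggedOn', 'HeartbeatInitiated', 'Resubscribed']
```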
When using the asynchronous interface, exceptions can occur that are not thrown to the user. For example, when an exception occurs in the process of reading subscription data from the AMPS server, the exception occurs on a thread inside of the AMPS Python client. Consider the following example using the asynchronous interface:
In this example, we set up a subscription to wait for messages on the pokes topic, whose Pokee tag begins with our user name. When messages arrive, we print a message out to the console, but otherwise our application waits for a key to be pressed.
Inside of the AMPS client, the client creates a new thread of execution that reads data from the server, and invokes message handlers and disconnect handlers when those events occur. When exceptions occur inside this thread, however, there is no caller for them to be thrown to and by default they are ignored.
In applications that use the asynchronous interface, and where it is important to deal with every issue that occurs in using AMPS, you can set an ExceptionHandler via Client.set_exception_listener() that receives these otherwise unhandled exceptions. Making the modifications shown in the example below to our previous example will allow those exceptions to be caught and handled. In this case we are simply printing those caught exceptions out to the console.
In some cases, the AMPS Python client may wrap exceptions of unknown type into an AMPSException. Your application should always include an except block for AMPSException.
If your application will attempt to recover from an exception thrown on the background processing thread, your application should set a flag and attempt recovery on a different thread than the thread that called the exception listener.
At the point that the AMPS client calls the exception listener, it has handled the exception. Your exception listener must not rethrow the exception (or wrap the exception and throw a different exception type).
In this example we have added a call to client.set_exception_listener(), registering a simple function that writes the text of the exception out to the console. If exceptions are thrown in the message handler, those exceptions are written to the console.
AMPS records the stack trace and provides it to the exception handler, if the provided method includes a parameter for the stack trace. The sample below demonstrates one way to do this. (For sample purposes, the message handler always throws an exception.)
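To make this control flow concrete, here is a stdlib-only model of a receive loop. The loop invokes a message handler and forwards any exception to a registered listener. It passes the formatted stack trace only when the listener accepts a second parameter, and it ignores the exception when no listener is set. The dispatch function and its parameter-count check are hypothetical illustrations of the behavior described above, not the client's internals. The listener follows the earlier guidance: it records the problem and sets a flag instead of rethrowing, and the recovery decision is made on a different thread.

```python
import inspect
import threading
import traceback

needs_recovery = threading.Event()
logged = []


def dispatch(messages, handler, exception_listener=None):
    """Hypothetical receive loop: handler exceptions never escape this loop."""
    for message in messages:
        try:
            handler(message)
        except Exception as exc:
            if exception_listener is None:
                continue                    # default: the exception is ignored
            wants_trace = len(inspect.signature(exception_listener).parameters) >= 2
            if wants_trace:
                exception_listener(exc, traceback.format_exc())
            else:
                exception_listener(exc)


def message_handler(message):
    raise ValueError("cannot process " + message)   # always fails, for illustration


def exception_listener(exc, stack_trace):
    # Record and flag only: do not rethrow, and do not recover on this thread.
    logged.append((str(exc), "ValueError" in stack_trace))
    needs_recovery.set()


receive_thread = threading.Thread(target=dispatch, args=(["m1", "m2"], message_handler, exception_listener))
receive_thread.start()
receive_thread.join()

if needs_recovery.is_set():       # recovery decision made on the main thread
    print(logged)   # [('cannot process m1', True), ('cannot process m2', True)]
```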
Regular Expression (Regex) subscriptions allow a regular expression to be supplied in the place of a topic name. When you supply a regular expression, it is as if a subscription is made to every topic that matches your expression, including topics that do not yet exist at the time of creating the subscription.
To use a regular expression, simply supply the regular expression in place of the topic name in the subscribe() call. For example:
In this example, messages on topics orders-north-america, orders-europe, and new-orders would match the regular expression. Messages published to any of those topics will be sent to our message_handler function. As in the example, you can use the get_topic() method to determine the actual topic of the message sent to the function.
Imagine an application that displays real-time information about the position and status of a fleet of delivery vans. When the application starts, it should display the current location of each of the vans along with their current status. As vans move around the city and post other status updates, the application should keep its display up to date. Vans upload information to the system by posting messages to the van_location topic, configured with a key of van_id on the AMPS server.
In this application, it is important not only to stay up-to-date on the latest information about each van, but also to ensure all of the active vans are displayed as soon as the application starts. Combining a SOW query with a subscription to the topic is exactly what is needed, and that is accomplished by the Client.sow_and_subscribe() method. As with the other methods for receiving messages, the AMPS Python client provides a basic, synchronous form of sow_and_subscribe that provides you with a MessageStream to iterate over, and an asynchronous form that requires a message handler.
First, let's look at an example that uses the basic form of sow_and_subscribe:
Now we will look at an example that uses the asynchronous form of sow_and_subscribe:
Notice that the two forms have the same result. However, the asynchronous form performs processing on the background thread and blocks the client from receiving messages while that processing happens. The basic form processes messages on the main thread and allows the background thread to continue to receive messages while processing occurs. In both cases, the calls to add_or_update_van and remove_van receive the same data.
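Whichever form you choose, the application logic reduces to applying each message to a local view keyed by van_id. The sketch below is stdlib-only. Messages are modeled as hypothetical dictionaries with a command field, which uses the values this guide describes ("sow" for query results, "publish" for live updates, "oof" for records that leave the result set), and a JSON payload. add_or_update_van and remove_van are simple hypothetical versions of the functions named above.

```python
import json

vans = {}   # van_id -> latest status record


def add_or_update_van(record):
    vans[record["van_id"]] = record


def remove_van(record):
    vans.pop(record["van_id"], None)


def on_van_message(message):
    """Route one message from sow_and_subscribe to the display model."""
    record = json.loads(message["data"])
    if message["command"] in ("sow", "publish"):
        add_or_update_van(record)      # initial state or a live update
    elif message["command"] == "oof":
        remove_van(record)             # record left the result set or was deleted
    # Other commands (for example, acknowledgments) are ignored in this sketch.


incoming = [
    {"command": "sow", "data": '{"van_id": 1, "status": "idle"}'},
    {"command": "sow", "data": '{"van_id": 2, "status": "en route"}'},
    {"command": "publish", "data": '{"van_id": 1, "status": "loading"}'},
    {"command": "oof", "data": '{"van_id": 2, "status": "retired"}'},
]
for message in incoming:
    on_van_message(message)
print(vans)   # {1: {'van_id': 1, 'status': 'loading'}}
```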
AMPS allows applications to manage the contents of the SOW by explicitly deleting messages that are no longer relevant. For example, if a particular delivery van is retired from service, the application can remove the record for that van from the SOW.
The client provides the following methods for deleting records from the SOW:
sow_delete: Accepts a topic and filter, and deletes all messages that match the filter from the specified topic.
sow_delete_by_keys: Accepts a set of SOW keys as a comma-delimited string and deletes messages for those keys, regardless of the contents of the messages. SOW keys are provided in the header of a SOW message and are the internal identifier AMPS uses for that SOW message.
sow_delete_by_data: Accepts a topic and message, and deletes the SOW record that would be updated by that message.
The most efficient way to remove messages from the SOW is to use sow_delete_by_keys or sow_delete_by_data, since those options allow AMPS to exactly target the message or messages to be removed. Many applications use sow_delete, since it is the most flexible method for removing items from the SOW when the application does not have information on the exact messages to be removed.
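To compare the three targeting styles, here is an in-memory model of a SOW keyed by SOW key. The functions and the key_for_van mapping are hypothetical stand-ins written for this illustration. The real methods run on the server and take AMPS filter expressions, not Python predicates. The sketch only shows the difference in what each style needs to know: a filter examines every record, while keys or data identify the record directly.

```python
sow_records = {
    "k1": {"van_id": 1, "status": "active"},
    "k2": {"van_id": 2, "status": "retired"},
    "k3": {"van_id": 3, "status": "retired"},
}
key_for_van = {1: "k1", 2: "k2", 3: "k3"}   # hypothetical: message key -> SOW key


def delete_by_filter(predicate):
    # Like sow_delete: every record is examined against the filter.
    doomed = [key for key, record in sow_records.items() if predicate(record)]
    for key in doomed:
        del sow_records[key]
    return len(doomed)


def delete_by_keys(keys_csv):
    # Like sow_delete_by_keys: keys arrive as one comma-delimited string.
    deleted = 0
    for key in keys_csv.split(","):
        if sow_records.pop(key.strip(), None) is not None:
            deleted += 1
    return deleted


def delete_by_data(message):
    # Like sow_delete_by_data: the message's key identifies the record it would update.
    key = key_for_van[message["van_id"]]
    return 1 if sow_records.pop(key, None) is not None else 0


by_keys = delete_by_keys("k3")
by_data = delete_by_data({"van_id": 1, "status": "anything"})
by_filter = delete_by_filter(lambda record: record["status"] == "retired")
print(by_keys, by_data, by_filter, sow_records)   # 1 1 1 {}
```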
In every case, AMPS sends an OOF message to all subscribers who have received updates for the messages removed, as described in the previous section.
The simple form of the sow_delete command returns a Message. This Message is an acknowledgment that contains information on the delete command. For example, the following snippet simply prints informational text with the number of messages deleted:
The sow_delete command also provides an asynchronous version that requires a message handler. This message handler is designed to receive sow_delete response messages from AMPS:
Acknowledging messages from a queue uses a form of the sow_delete command that is only supported for queues. Acknowledgment is discussed in the Using Queues chapter in this guide.
In many cases, applications that use SOW topics only need the current value of a message at the time the message is processed, rather than processing each change that led to the current value. On the server side, AMPS provides conflated topics to meet this need. Conflated topics are described in more detail in the AMPS User Guide, and require no special handling on the client side.
In some cases, though, it's important to conflate messages on the client side. This can be particularly useful for applications that do expensive processing on each message, applications that are more efficient when processing batches of messages, or for situations where you cannot provide an appropriate conflation interval for the server to use.
A MessageStream has the ability to conflate messages received for a subscription to a SOW topic, view, or conflated topic. When conflation is enabled, for each message received, the client checks whether it has already received an unprocessed message with the same SowKey. If so, the client replaces the unprocessed message with the new message. The application never receives the message that has been replaced.
To enable client-side conflation, you call conflate() on the MessageStream, and then use the MessageStream as usual:
When a MessageStream is used for a subscription that does not include SowKeys (such as a subscription to a topic that does not have a SOW), the MessageStream will allow you to turn on conflation, but no conflation will occur.
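The conflation rule can be modeled in a few lines of standard-library Python. Pending messages are held in arrival order. A new message with the same SowKey replaces the unprocessed one in place. Messages without a SowKey pass through unconflated. ConflatingBuffer is hypothetical: with the client you simply call conflate() on the MessageStream. Keeping the replaced message's original position is an assumption of this sketch.

```python
import itertools
from collections import OrderedDict


class ConflatingBuffer:
    """Hypothetical model of client-side conflation by SowKey."""

    def __init__(self):
        self._pending = OrderedDict()
        self._unkeyed = itertools.count()

    def add(self, sow_key, data):
        if sow_key is None:
            # No SowKey: nothing to conflate against, so every message is kept.
            self._pending[("unkeyed", next(self._unkeyed))] = data
        else:
            # Same SowKey: the unprocessed message is replaced in place.
            self._pending[("keyed", sow_key)] = data

    def drain(self):
        """Return pending messages in order; replaced messages never appear."""
        items = list(self._pending.values())
        self._pending.clear()
        return items


buffer = ConflatingBuffer()
buffer.add("van-1", "van 1 at A")
buffer.add("van-2", "van 2 at B")
buffer.add("van-1", "van 1 at C")    # replaces "van 1 at A"
buffer.add(None, "no key 1")
buffer.add(None, "no key 2")
drained = buffer.drain()
print(drained)   # ['van 1 at C', 'van 2 at B', 'no key 1', 'no key 2']
```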
When using client-side conflation with delta subscriptions, bear in mind that client-side conflation replaces the whole message, and does not attempt to merge deltas. This means that updates can be lost when messages are replaced. For some applications (for example, a ticker application that simply sends delta updates that replace the current price), this causes no problems. For other applications (for example, when several processors may be updating different fields of a message simultaneously), using conflation with deltas could result in lost data, and server-side conflation is a safer alternative.
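The data-loss risk is easy to see with two deltas that update different fields of the same record. In this stdlib sketch (field names and values are hypothetical), applying every delta in order keeps both changes. Whole-message replacement, which is what client-side conflation does, keeps only the last delta.

```python
current = {"id": 7, "price": 10.0, "qty": 100}

delta_a = {"id": 7, "price": 10.5}     # one processor updates price
delta_b = {"id": 7, "qty": 150}        # another processor updates qty

# Applying every delta in order preserves both changes.
merged = dict(current)
for delta in (delta_a, delta_b):
    merged.update(delta)

# Client-side conflation keeps only the newest message for the SowKey,
# so delta_a is replaced by delta_b before the application sees it.
conflated = dict(current)
conflated.update(delta_b)

print(merged)      # {'id': 7, 'price': 10.5, 'qty': 150}
print(conflated)   # {'id': 7, 'price': 10.0, 'qty': 150}
```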
AMPS message queues provide a high-performance way of distributing messages across a set of workers. The AMPS User Guide describes AMPS queues in detail, including the features of AMPS referred to in this chapter. This chapter does not describe AMPS queues in detail, but instead explains how to use the AMPS Python client with message queues.
To publish messages to a message queue, publishers simply publish to any topic that is collected by the queue. There is no difference between publishing to a queue and publishing to any other topic, and a publisher does not need to be aware that the topic will be collected into a queue.
Subscribers must be aware that they are subscribing to a queue, and acknowledge messages from the queue when the message is processed.
AMPS queues are designed for high-volume applications that need minimal latency and overhead. One of the features that helps performance is the subscription backlog feature, which allows applications to receive multiple messages at a time. The subscription backlog sets the maximum number of unacknowledged messages that AMPS will provide to the subscription.
When the subscription backlog is larger than 1, AMPS delivers additional messages to a subscriber before the subscriber has acknowledged the first message received. This technique allows subscribers to process messages as fast as possible, without ever having to wait for messages to be delivered. The technique of providing a consistent flow of messages to the application is called smart pipelining.
The AMPS server determines the backlog for each subscription. An application can set the maximum backlog that it is willing to accept with the max_backlog option. Depending on the configuration of the queue (or queues) specified in the subscription, AMPS may assign a smaller backlog to the subscription. If no max_backlog option is specified, AMPS uses a max_backlog of 1 for that subscription.
In general, applications that have a constant flow of messages perform better with a max_backlog setting higher than 1. The reason for this is that, with a backlog greater than 1, the application can always have a message waiting when the previous message is processed. Setting the optimum max_backlog is a matter of understanding the messaging pattern of your application and how quickly your application can process messages.
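The following stdlib-only simulation illustrates why a backlog above 1 helps when messages flow steadily. It tracks when each queue message is sent, arrives, is processed, and is acknowledged, and it counts how often the consumer finishes a message with nothing waiting. The assumptions are deliberately simple: a fixed one-way latency, a fixed processing time, and an effective backlog of min(requested, queue limit) with a default of 1. The min() rule is only a stand-in for the server's actual assignment logic, and effective_backlog and simulate are hypothetical names.

```python
def effective_backlog(max_backlog=None, queue_limit=None):
    """Hypothetical: requested backlog (default 1), possibly reduced by the queue."""
    requested = 1 if max_backlog is None else max_backlog
    return requested if queue_limit is None else min(requested, queue_limit)


def simulate(backlog, count, latency, work):
    """Return (total_time, idle_waits) for processing `count` queue messages."""
    finish = []
    ack_arrival = []
    idle_waits = 0
    for k in range(count):
        # A new message is sent only when fewer than `backlog` are unacknowledged.
        sent = 0 if k < backlog else ack_arrival[k - backlog]
        arrived = sent + latency
        previous_finish = finish[k - 1] if k else 0
        if k and arrived > previous_finish:
            idle_waits += 1          # consumer finished and had nothing to do
        start = max(arrived, previous_finish)
        finish.append(start + work)
        ack_arrival.append(finish[k] + latency)
    return finish[-1], idle_waits


outcomes = {}
for requested in (None, 2, 5):
    backlog = effective_backlog(requested)
    outcomes[backlog] = simulate(backlog, count=6, latency=2, work=1)
print(outcomes)   # {1: (28, 5), 2: (14, 2), 5: (8, 0)}
```

Under these assumptions, a backlog of 1 leaves the consumer idle for a full round trip between messages. A larger backlog hides that latency, which is the effect smart pipelining relies on.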
To request a max_backlog for a subscription, you explicitly set the option on the subscribe command, as shown below:
The AMPS Python client handles most incoming messages and takes appropriate action. Some messages are unexpected or occur only in very rare circumstances. The AMPS Python client provides a way for clients to process these messages. Rather than providing handlers for all of these unusual events, AMPS provides a single handler function for messages that can't be handled during normal processing.
Your application registers this handler by setting the last_chance_message_handler for the client. This handler is called when the client receives a message that can't be processed by any other handler. This is a rare event, and typically indicates an unexpected condition.
For example, if a client publishes a message that AMPS cannot parse, AMPS returns a failure acknowledgment. This is an unexpected event, so AMPS does not include an explicit handler for this event, and failure acknowledgments are received in the method registered as the last_chance_message_handler.
Your application is responsible for taking any corrective action needed. For example, if a message publication fails, your application can decide to republish the message, publish a compensating message, log the error, stop publication altogether, or any other action that is appropriate.
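The routing rule can be modeled in a few lines. A message goes to the handler registered for its subscription ID, and anything that no handler claims falls through to the last-chance handler. This is a stdlib-only sketch: the dictionary-based messages, their field names, and the Router class are hypothetical. The real client registers the last-chance handler through its own API.

```python
class Router:
    """Hypothetical model of message routing with a last-chance fallback."""

    def __init__(self, last_chance_handler):
        self._handlers = {}
        self._last_chance = last_chance_handler

    def register(self, route_id, handler):
        self._handlers[route_id] = handler

    def route(self, message):
        handler = self._handlers.get(message.get("sub_id"))
        if handler is not None:
            handler(message)
        else:
            # Nothing claims the message: hand it to the last-chance handler.
            self._last_chance(message)


handled, unexpected = [], []
router = Router(unexpected.append)
router.register("sub-1", handled.append)

router.route({"sub_id": "sub-1", "data": "quote"})
router.route({"command": "ack", "status": "failure", "reason": "parse error"})

print(len(handled), unexpected[0]["status"])   # 1 failure
```

In a real application, the last-chance handler is where you would inspect the failure and choose a corrective action such as logging, republishing, or stopping publication.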
AMPS State of the World (SOW) allows you to automatically keep and query the latest information about a topic on the AMPS server, without building a separate database. Using SOW lets you build impressively high-performance applications that provide rich experiences to users. The AMPS Python client lets you query SOW topics and subscribe to changes with ease.
To begin, we will look at a simple example of issuing a SOW query.
In the example above, the Client.sow() convenience method is used to execute a SOW query on the orders topic, for all entries that have a symbol of 'ROL'.
As the query executes, messages containing the data of matching entries have a Command of value "sow" (provided as a constant value, AMPS.Message.Command.SOW), so as those arrive, we write them to the console.
As with subscribe, the sow command also provides an asynchronous mode, where you provide a message handler.
In the example above, the execute_sow_query() function invokes Client.execute_async() to initiate a SOW query on the orders topic, for all entries that have a symbol of 'ROL'.
As the query executes, the on_message_handler() function is invoked for each matching entry in the topic. Messages containing the data of matching entries have a Command of value sow, so as those arrive, we write them to the console.
In some cases, an application does not want the AMPS client to reconnect, but instead wants to take a different action if disconnection occurs. For example, a stateless publisher that sends ephemeral data (such as telemetry or prices) may want to exit with an error if the connection is lost rather than risk falling behind and providing outdated messages. Often, in this case, a monitoring process will start another publisher if a publisher fails, and it is better for a message to be lost than to arrive late.
To cover cases where the application has unusual needs, the AMPS client library allows an application to provide custom disconnect handling.
Your application gets to specify exactly what happens when a disconnect occurs by supplying a function to client.set_disconnect_handler(), which is invoked whenever a disconnect occurs. This may be helpful for situations where a particular connection needs to do something completely different from reconnecting or failing over to another AMPS server.
Setting the disconnect handler completely replaces the disconnection and failover behavior for an HAClient and provides the only disconnection and failover behavior for a Client.
The handler runs on the thread that detects the disconnect. This may be the client receive thread (for example, if the disconnect is detected due to heartbeating) or an application thread (for example, if the disconnect is detected when sending a command to AMPS).
The example below shows the basics:
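The following is a minimal sketch for the stateless-publisher case described above: the handler exits instead of reconnecting. It assumes the client calls the handler with the client as its only argument.

The handler may run on the client receive thread, where sys.exit() would end only that thread. For that reason, the sketch exits with os._exit() by default. The exit function is a parameter so it can be replaced during testing.

```python
import os
import sys


def make_exit_on_disconnect(exit_process=os._exit):
    """Build a disconnect handler that ends the process instead of reconnecting."""
    def on_disconnect(client):
        sys.stderr.write("Lost connection to AMPS; exiting so a monitor "
                         "can restart this publisher.\n")
        sys.stderr.flush()  # os._exit() does not flush buffered output
        exit_process(1)
    return on_disconnect

# Registration, using the method named in the text:
# client.set_disconnect_handler(make_exit_on_disconnect())
```

os._exit() skips normal interpreter cleanup. That is acceptable here, because the goal is to stop quickly and let a monitoring process start a replacement.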
Delta messaging in AMPS has two independent aspects:
Delta Subscribe - Allows subscribers to receive just the fields that are updated within a message.
Delta Publish - Allows publishers to update and add fields within a message by publishing only the updates into the SOW.
This chapter describes how to create delta publish and delta subscribe commands using the AMPS Python client. For a discussion of this capability, how it works, and how message types support it, see the AMPS User Guide sections on Incremental Message Updates for delta_publish and Receiving Only Updated Fields for delta_subscribe.
60East generally recommends that applications use the ack() method to acknowledge messages during normal processing. This approach functions correctly when used within a message handler, supports batching as explained elsewhere in this chapter, and is generally both easier to code and more efficient.
However, in some situations, you may need to manually acknowledge messages in the queue. This is most common when an application needs to operate on all messages with certain characteristics, rather than acknowledging individual messages. For example, an application that is doing updates to an order may want to cancel an order by both publishing a cancellation and immediately expiring all other messages in the queue for that order. With manual acknowledgment, that application can use a filter to remove all previous updates for that order, then publish the cancellation.
To manually acknowledge processed messages and remove the messages from the queue, applications use the sow_delete command. To remove specific messages from the queue, provide the bookmarks of those messages. To remove messages that match a given filter, provide the filter. Notice that AMPS only supports using a bookmark with sow_delete when removing messages from a queue, not when removing records from a SOW.
For example, given a Message object to acknowledge and a client, the code below acknowledges the message.
In the example above, the program creates a sow_delete command, specifies the topic and the bookmark, and then sends the command to the server. Since the program does not need or expect a response from AMPS, this function provides None as the message handler.
While this method works, creating and sending an acknowledgment for each individual message can be inefficient if your application is processing a large volume of messages. Rather than acknowledging each message individually, your application can build a comma-delimited list of bookmarks from the processed messages and acknowledge all of the messages at the same time. In this case, make sure that the number of messages you wait for is less than the maximum backlog (the number of messages your client can have unacknowledged at a given time). Notice that both automatic acknowledgment and the helper method on the Message object take the maximum backlog into account.
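The batching approach can be sketched in plain Python. BookmarkBatcher is a hypothetical helper, not part of the AMPS client. It works as follows:

- It collects bookmarks per topic.
- When a topic's batch is full, it calls a caller-supplied send(topic, bookmarks) function with a comma-delimited bookmark list. That function would build and send a sow_delete command with those bookmarks.
- Its constructor enforces the rule above that the batch size stays below the maximum backlog.

```python
class BookmarkBatcher:
    """Collect bookmarks per topic and acknowledge them in one sow_delete."""

    def __init__(self, send, batch_size, max_backlog):
        # A batch at or above max_backlog could stall the subscription:
        # AMPS stops delivering before the batch ever fills.
        if not 0 < batch_size < max_backlog:
            raise ValueError("batch_size must be positive and below max_backlog")
        self._send = send
        self._batch_size = batch_size
        self._pending = {}  # topic -> list of bookmarks

    def add(self, topic, bookmark):
        bookmarks = self._pending.setdefault(topic, [])
        bookmarks.append(bookmark)
        if len(bookmarks) >= self._batch_size:
            self.flush(topic)

    def flush(self, topic=None):
        """Send pending bookmarks for one topic, or for all topics."""
        topics = [topic] if topic is not None else list(self._pending)
        for t in topics:
            bookmarks = self._pending.pop(t, [])
            if bookmarks:
                self._send(t, ",".join(bookmarks))  # comma-delimited list
```

Calling flush() with no arguments before shutdown sends any partial batches.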
When constructing a command to acknowledge queue messages, AMPS allows an application to specify a filter rather than a set of bookmarks. AMPS interprets this as the client requesting acknowledgment of all messages that match the filter. (This may include messages that the client has not received, subject to the Leasing model for the queue.)
As a more typical example of manual acknowledgment, the code below expires all messages for a given id that have a status other than cancel. An application might do this to halt processing of an order that it is about to cancel.
In the example above, the program specifies a topic and a filter to use to find the messages that should be removed. In this case, the program also provides the expire option to indicate that the messages have been removed from the queue rather than successfully processed (of course, whether this is the correct behavior for a canceled order depends on the expected message flow for your application).
Notice that, as described in Understanding Threading, this method of acknowledging a message should not be used from a message handler unless the sow_delete is sent from a different client than the client that called the message handler. Instead, 60East recommends using the ack() method from within a message handler.
For each message delivered on a subscription, AMPS counts the message against the subscription backlog until the message is explicitly acknowledged. In addition, when a queue specifies at-least-once delivery, AMPS retains the message in the queue until the message expires or until the message has been explicitly acknowledged and removed from the queue. From the point of view of the AMPS server, acknowledgment is implemented as a sow_delete from the queue with the bookmarks of the messages to remove. The AMPS Python client provides several ways to make it easier for applications to create and send the appropriate sow_delete.
The AMPS client allows you to specify that messages should be automatically acknowledged. When this mode is on, AMPS acknowledges the message automatically in the following cases:
Asynchronous Message Processing Interface - The message handler returns without throwing an exception.
Synchronous Message Processing Interface - The application requests the next message from the MessageStream.
The AMPS client batches acknowledgments created with this method, as described in the following section.
To enable automatic acknowledgment, use the set_auto_ack() method.
The AMPS Python client provides a convenience method, ack(), on delivered messages. When the application is finished with the message, the application simply calls ack() on the message. (This, in turn, provides the topic and bookmark to the ack() method of the client that received the message.)
For messages that originated from a queue with at-least-once semantics, this adds the bookmark from the message to the batch of messages to acknowledge. For other messages, this method has no effect.
The AMPS Python client automatically batches acknowledgments when either of the convenience methods is used. Batching acknowledgments reduces the number of round-trips to AMPS, which reduces network traffic and improves overall performance. The client sends the batch of acknowledgments when the number of pending acknowledgments exceeds a specified size, or when the amount of time since the last batch was sent exceeds a specified timeout.
You can set the number of messages to batch and the maximum amount of time between batches, as shown below:
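The following is a sketch. It assumes the Python client exposes set_auto_ack(), set_ack_batch_size() and set_ack_timeout() on the client, with the timeout given in milliseconds. Only set_auto_ack() is named in this guide, so check the other names and the timeout unit against the client's API reference.

```python
def configure_queue_acks(client, batch_size, timeout_ms):
    """Turn on automatic acknowledgment and tune how acks are batched."""
    # Acknowledge each message when its handler returns without raising.
    client.set_auto_ack(True)
    # Send a batch once this many acknowledgments are pending...
    client.set_ack_batch_size(batch_size)
    # ...or once this much time has passed since the last batch.
    client.set_ack_timeout(timeout_ms)
```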
The AMPS Python client is aware of the subscription backlog for a subscription. When AMPS returns the acknowledgment for a subscription that contains queues, AMPS includes information on the subscription backlog for the subscription. If the requested batch size is larger than the subscription backlog, the AMPS Python client adjusts the requested batch size to match the subscription backlog.
60East recommends tuning the batch size to improve application performance. A value of 1/3 of the smallest max_backlog value is a good starting point for testing. 60East does not recommend setting the batch size larger than 1/2 of the max_backlog value without testing to ensure that the application does not run out of messages to process while the acknowledgment is being sent to AMPS.
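The rule of thumb above can be written as a small helper. initial_ack_batch_size() is a hypothetical function. It takes the max_backlog values of the subscriptions the client uses and returns one third of the smallest value, never less than 1. The result is only a starting point for performance testing.

```python
def initial_ack_batch_size(max_backlogs):
    """Return 1/3 of the smallest max_backlog (at least 1) as a starting point."""
    smallest = min(max_backlogs)
    if smallest < 1:
        raise ValueError("max_backlog values must be at least 1")
    # Integer division keeps the result at or below 1/3 of the backlog,
    # so it stays within the 1/2 ceiling for any backlog of 2 or more.
    return max(1, smallest // 3)
```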
The AMPS clients include a batch size parameter that specifies how many messages the AMPS server will return to the client in a single batch when returning the results of a SOW query. The 60East clients set a batch size of 10 by default. This batch size works well for common message sizes and network configurations.
Adjusting the batch size may produce better network utilization and produce better performance overall for the application. The larger the batch size, the more messages AMPS will send to the network layer at a time. This can result in fewer packets being sent, and therefore less overhead in the network layer. The effect on performance is generally most noticeable for small messages, where setting a larger batch size will allow several messages to fit into a single packet. For larger messages, a larger batch size may still improve performance, but the improvement is less noticeable.
In general, 60East recommends setting a batch size that is large enough to produce few partially-filled packets. Bear in mind that AMPS holds the messages in memory while batching them, and the client must also hold the messages in memory while receiving the messages. Using batch sizes that require large amounts of memory for these operations can reduce overall application performance, even if network utilization is good.
For smaller message sizes, 60East recommends using the default batch size, and experimenting with tuning the batch size if performance improvements are necessary. For relatively large messages (especially messages with sizes over 1MB), 60East recommends explicitly setting a batch size of 1 as an initial value, and increasing the batch size only if performance testing with a larger batch size shows improved network utilization or faster overall performance.
To delta publish, you use the delta_publish command as follows:
The message that you provide to AMPS must include the fields that the topic uses to generate the SOW key. Otherwise, AMPS will not be able to identify the message to update. For SOW topics that use a User-Generated SOW Key, use the Command form of delta_publish to set the SowKey, as shown below:
The full behavior of delta_publish is described in the AMPS User Guide section on Incremental Message Updates.
The AMPS Python client includes a set of utilities and helper classes to make working with AMPS easier.
The client provides a pair of classes for creating and parsing composite message types:
CompositeMessageBuilder allows you to assemble the parts of a composite message and then serialize them in a format suitable for AMPS.
CompositeMessageParser extracts the individual parts of a composite message type.
For more information regarding composite message types, refer to the Message Types chapter in the AMPS User Guide.
To build a composite message, create an instance of CompositeMessageBuilder, and populate the parts. The CompositeMessageBuilder copies the parts provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual parts of a message once they've been added to the builder.
The snippet below shows how to build a composite message that includes a JSON part, constructed as a string, and a binary part consisting of the bytes from an array.array that contains doubles.
To parse a composite message, create an instance of CompositeMessageParser, then use the parse() method to parse the message provided by the AMPS client. The CompositeMessageParser gives you access to each part of the message as a sequence of bytes.
For example, the following snippet parses and prints messages that contain a JSON part and a binary part that contains an array of doubles.
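The part payloads themselves can be produced and decoded with the Python standard library. This sketch assumes two things:

- The parser returns each part as bytes, as described above.
- Sender and receiver share the native byte order and double format that array.array("d") uses.

Adding the parts to the builder and extracting them with the parser is left to the AMPS classes.

```python
import array
import json


def encode_parts(record, values):
    """Return (json_part, binary_part) ready to add to a composite builder."""
    json_part = json.dumps(record)
    # array("d") stores C doubles in the machine's native byte order.
    binary_part = array.array("d", values).tobytes()
    return json_part, binary_part


def decode_parts(json_bytes, binary_bytes):
    """Turn two raw parts (bytes) back into a dict and a list of floats."""
    record = json.loads(json_bytes.decode("utf-8"))
    doubles = array.array("d")
    doubles.frombytes(binary_bytes)  # assumes the sender's byte order
    return record, doubles.tolist()
```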
Notice that the receiving application is written with explicit knowledge of the structure and content of the composite message type.
The client provides a pair of classes for creating and parsing NVFIX messages:
NVFIXBuilder allows you to assemble an NVFIX message and then serialize it in a format suitable for AMPS.
NVFIXShredder extracts the individual fields of an NVFIX message type.
To build an NVFIX message, create an instance of NVFIXBuilder, then add the fields of the message using append(). NVFIXBuilder copies the fields provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual fields of a message once they've been added to the builder.
The snippet below shows how to build an NVFIX message and publish it using the AMPS client.
To parse an NVFIX message, create an instance of NVFIXShredder, then use the to_map() method to parse the message provided by the AMPS client. The NVFIXShredder gives you access to each field of the message in a map.
The snippet below shows how to parse and print an NVFIX message.
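To show what NVFIXBuilder and NVFIXShredder handle, the following sketch builds and splits an NVFIX payload by hand. It assumes the default NVFIX layout, in which each name=value field is terminated by the SOH character (\x01). It illustrates the format only and does not replace the AMPS classes, which may be configured with other separators.

```python
SOH = "\x01"  # assumed default NVFIX field separator


def build_nvfix(fields):
    """Serialize (name, value) pairs, in order, as name=value<SOH> fields."""
    return "".join("%s=%s%s" % (name, value, SOH) for name, value in fields)


def nvfix_to_map(payload):
    """Split an NVFIX payload into a dict, as NVFIXShredder.to_map() does."""
    result = {}
    for field in payload.split(SOH):
        if field:  # skip the empty string after the final separator
            name, _, value = field.partition("=")
            result[name] = value
    return result
```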
The client provides a pair of classes for creating and parsing FIX messages:
FIXBuilder allows you to assemble a FIX message and then serialize it in a format suitable for AMPS.
FIXShredder extracts the individual fields of a FIX message.
To build a FIX message, create an instance of FIXBuilder, then add the fields of the message using append(). FIXBuilder copies the fields provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual fields of a message once they've been added to the builder.
The snippet below shows how to build a FIX message and publish it using the AMPS client.
To parse a FIX message, create an instance of FIXShredder, then use the to_map() method to parse the message provided by the AMPS client. The FIXShredder gives you access to each field of the message in a map.
The snippet below shows how to parse and print a FIX message.
A subscriber can also explicitly release a message back to the queue. AMPS returns the message to the queue and redelivers the message just as though the lease had expired. To do this, the subscriber sends a sow_delete command with the bookmark of the message to release and the cancel option.
When using automatic acknowledgments and the asynchronous API, AMPS will cancel a message if an exception is thrown from the message handler.
To return a message to the queue, you can build a sow_delete acknowledgment using the Command class, or pass an option to the ack() method on the message.
cancel - Returns the message to the queue.
expire - Immediately expires the message from the queue.
For example, to return a message to a queue, call ack() on the message and pass the cancel option.
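The following sketches a handler for an application that does not use automatic acknowledgment. The handler acknowledges each message it processes and returns to the queue any message it cannot process. It assumes that Message.ack() accepts the option as a string argument. process is a hypothetical application function.

```python
import logging


def make_queue_handler(process):
    """Wrap process(data) so each queue message is either acked or returned."""
    def on_message(message):
        try:
            process(message.get_data())
        except Exception:
            logging.exception("Processing failed; returning message to the queue")
            message.ack("cancel")  # redelivered as though the lease expired
        else:
            message.ack()  # processed: add the bookmark to the ack batch
    return on_message
```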
To delta subscribe, you simply use the delta_subscribe command as follows:
As described in the AMPS User Guide section on Receiving Only Updated Fields, messages provided to a delta subscription will contain the fields used to generate the SOW key and any changed fields in the message. Your application is responsible for choosing how to handle the changed fields.
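One common choice is to merge each delta into a locally cached copy of the full record. DeltaCache is a hypothetical helper that assumes JSON messages whose SOW key comes from a single field. It merges with a shallow dict.update(), so a changed nested object replaces the cached nested object as a whole.

```python
import json


class DeltaCache:
    """Keep full records current from delta_subscribe message data."""

    def __init__(self, key_field):
        self.key_field = key_field
        self.records = {}  # key value -> merged record

    def apply(self, data):
        """Merge one delta (key fields plus changed fields) into the cache."""
        delta = json.loads(data)
        record = self.records.setdefault(delta[self.key_field], {})
        record.update(delta)
        return record
```

In a message handler, cache.apply(message.get_data()) would keep the cache current.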
The AMPS clients provide named convenience methods for core AMPS functionality. These named methods work by creating messages and sending those messages to AMPS. All communication with AMPS occurs through messages.
You can use the Command object to customize the messages that the client sends to AMPS. This is useful for more advanced scenarios where you need precise control over the message, in cases where you need to use an earlier version of the client to communicate with a more recent version of AMPS, or in cases where a named method is not available.
AMPS messages are represented in the client as AMPS.Message objects. The Message object is generic and can represent any type of AMPS message, including both outgoing and incoming messages. This section includes a brief overview of elements common to AMPS command messages. Full details of commands to AMPS are provided in the AMPS Command Reference Guide.
All AMPS command messages contain the following elements:
Command - The command tells AMPS how to interpret the message. Without a command, AMPS will reject the message. Examples of commands include publish, subscribe, and sow.
CommandId - The command ID, together with the name of the client, uniquely identifies a command to AMPS. The command ID can be used later on to refer to the command or the results of the command. For example, the command ID for a subscribe message becomes the identifier for the subscription. The AMPS client provides a command ID when the command requires one and no command ID is set.
Most AMPS messages contain the following fields:
Topic - The topic that the command applies to, or a regular expression that identifies a set of topics that the command applies to. For most commands, the topic is required. Commands such as logon, start_timer, and stop_timer do not apply to a specific topic, and do not need this field.
Ack Type - The ack type tells AMPS how to acknowledge the message to the client. Each command has a default acknowledgment type that AMPS uses if no other type is provided.
Options - The options field is a comma-separated list of options that affect how AMPS processes and responds to the message.
Beyond these fields, different commands include fields that are relevant to that particular command. For example, SOW queries, subscriptions, and some forms of SOW deletes accept the Filter field, which specifies the filter to apply to the subscription or query. As another example, publish commands accept the Expiration field, which sets the SOW expiration for the message.
For full details on the options available for each command and the acknowledgment messages returned by AMPS, see the AMPS Command Reference Guide.
To create a command, you simply construct a command object of the appropriate type:
Once created, you set the appropriate fields on the command. For example, the following code creates a SOW query, setting the command, topic, and filter for the query:
When this command is sent using the execute() method, AMPS performs a SOW query on the topic messages-sow using a filter of /id > 20. The results of sending this message to AMPS are the same as calling the form of the sow() method that sets these fields.
Once you've created a command, use the execute() method to send the command to AMPS. The execute() method returns a MessageStream that you can use from the calling thread to process responses from AMPS. The other form, execute_async(), sends the message to AMPS, waits for a processed acknowledgment, then returns. Messages are then processed on the client background thread.
For example, the following snippet sends the command created above:
This returns a MessageStream identical to the MessageStream returned by the equivalent client.sow() method.
You can also provide a message handler to receive acknowledgments, statistics, or the results of subscriptions and SOW queries. The AMPS client maintains a background thread that receives and processes incoming messages. The call to execute_async returns on the main thread as soon as AMPS acknowledges the command as having been processed, and messages are received and processed on the background thread:
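The following is a minimal sketch. It assumes Message provides get_ack_type() and get_reason() getters. send_async() is a hypothetical wrapper that passes its handler to execute_async() and returns whatever execute_async() returns.

```python
def print_ack(message):
    """Sample handler: print the ack type and reason of each message."""
    print(message.get_ack_type(), message.get_reason())


def send_async(client, command):
    """Send command; print_ack then runs on the client's background thread."""
    # Returns once AMPS acknowledges the command as processed.
    return client.execute_async(command, print_ack)
```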
While this message handler simply prints the ack type and reason for sample purposes, message handlers in production applications are typically designed with a specific purpose. For example, your message handler may fill a work queue, or check for success and throw an exception if the command failed.
Notice that the publish command typically does not return results other than acknowledgment messages. To send a publish command, use the execute_async() method, providing None for the message handler:
Since the code sets the message handler to None, this code does not receive acknowledgments. To detect publish failures, set the FailedWriteHandler for the client.
The AMPS Command Reference includes information on which fields and options to set on commands to get a specific result. The reference includes both reference information and a Command Cookbook that provides a concise guide for commonly-used commands.
The AMPS Python client provides an easy way to create highly available applications using AMPS, via the HAClient class. HAClient derives from Client and offers the same methods, but also adds protection against network, server, and client outages.
Using HAClient allows applications to automatically:
Recover from temporary disconnects between client and server.
Failover from one server to another when a server becomes unavailable.
Since the HAClient automatically manages failover and reconnection, 60East recommends using the HAClient for applications that need to:
Automatically reconnect and resume work in the case of disconnection.
Ensure no messages are lost or duplicated after a reconnect or failover.
Persist messages and bookmarks on disk for protection against client failure.
You can choose how your application uses HAClient features. For example, you might need automatic reconnection, but have no need to resume subscriptions or republish messages. The high availability behavior in HAClient is provided by implementations of defined interfaces. You can combine different implementations provided by 60East to meet your needs, and implement those interfaces to provide your own policies.
Some of these features require specific configuration settings on your AMPS instance(s). This chapter mentions these features and describes how to use them from the AMPS Python client. You can find full documentation for these settings and server features in the AMPS User Guide.
HAClient derives from Client and offers the same methods for sending commands to AMPS and receiving messages from AMPS.
The HAClient differs from the Client in two ways:
The HAClient automatically installs a disconnect handler that reconnects to AMPS and resumes active (asynchronous) subscriptions. The disconnect handler optionally replays publish and sow_delete messages that have not been acknowledged by AMPS, using a PublishStore. The disconnect handler can optionally resume replays from the transaction log at a point that guarantees no messages are skipped and no duplicates are delivered to the application, using a BookmarkStore.
The HAClient includes the infrastructure needed for client failover, including a list of connection strings and their associated authentication mechanisms (provided by the ServerChooser), and options for controlling backoff behavior for reconnects (provided by the DelayStrategy). As a result, the HAClient provides a connect_and_logon() method for establishing a connection to AMPS, rather than treating these as independent steps that an application must manage itself.
If your application needs to automatically reconnect to AMPS, 60East recommends using the HAClient and the automatically provided disconnect handler rather than using a Client or replacing the HAClient default disconnect handler.
The most important difference between Client and HAClient is that HAClient automatically provides a reconnect handler.
This description provides a high-level framework for understanding the components involved in failover with the HAClient. The components are described in more detail in the following sections.
The HAClient reconnect handler performs the following steps when reconnecting:
Calls the ServerChooser to determine the next URI to connect to and the authenticator to use for that connection.
If the ServerChooser does not provide a URI to connect to, calls get_error on the ServerChooser to get a description of the failure, sends an exception to the exception listener and stops the reconnection process.
Calls the DelayStrategy to determine how long to wait before attempting to reconnect and waits for that period of time.
Connects to the AMPS server. If the connection fails, calls report_failure on the ServerChooser and begins the process again.
Logs on to the AMPS server. If logon fails, calls report_failure on the ServerChooser and begins the process again.
Calls report_success on the ServerChooser.
Receives the bookmark for the last message that the server has persisted. Discards any older messages from the PublishStore.
Republishes any messages in the PublishStore that have not been persisted by the server.
Re-establishes subscriptions using the SubscriptionManager for the client. For bookmark subscriptions, the reconnect handler uses the BookmarkStore for the client to determine the most recent bookmark, and re-subscribes with that bookmark. For subscriptions that do not use a bookmark, the SubscriptionManager simply re-enters the subscription, meaning that it is entered at the point at which the HAClient reconnects.
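The following plain-Python model makes the connection part of these steps (the first six) concrete. RoundRobinChooser, exponential_delay() and reconnect() are hypothetical:

- They mirror the roles and method names used above (get_error, report_failure, report_success), with simplified signatures.
- They assume a chooser signals that it has no server to offer by returning an empty URI.
- They take connect and logon functions as parameters.
- They raise an exception where the HAClient would notify its exception listener.

This is not the HAClient implementation, and it leaves out republishing and resubscribing.

```python
import time


class RoundRobinChooser:
    """Hypothetical chooser: cycles through URIs, gives up after max_failures."""

    def __init__(self, uris, max_failures):
        self.uris = list(uris)
        self.index = 0
        self.failures = 0
        self.max_failures = max_failures
        self.last_error = ""

    def get_current_uri(self):
        # An empty URI means there is no server left to try.
        if self.failures >= self.max_failures:
            return ""
        return self.uris[self.index]

    def report_failure(self, error):
        self.failures += 1
        self.last_error = str(error)
        self.index = (self.index + 1) % len(self.uris)

    def report_success(self):
        self.failures = 0

    def get_error(self):
        return "gave up after %d failures; last error: %s" % (
            self.failures, self.last_error)


def exponential_delay(attempt, initial=0.2, maximum=5.0):
    """Seconds to wait before attempt number `attempt` (0 = first try)."""
    if attempt == 0:
        return 0.0
    return min(maximum, initial * 2 ** (attempt - 1))


def reconnect(chooser, connect, logon, sleep=time.sleep):
    """Choose a server, back off, connect, log on; retry until done."""
    attempt = 0
    while True:
        uri = chooser.get_current_uri()
        if not uri:
            raise ConnectionError(chooser.get_error())
        sleep(exponential_delay(attempt))
        attempt += 1
        try:
            connect(uri)
            logon()
        except Exception as error:
            chooser.report_failure(error)
            continue
        chooser.report_success()
        return uri
```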
The ServerChooser, DelayStrategy, PublishStore, and BookmarkStore are all extension points for the HAClient. You can adapt the failover and recovery behavior by setting a different object for the behavior you want to customize on the HAClient or by providing your own implementation.
For example, the convenience methods in the previous section customize the behavior of the PublishStore and BookmarkStore by providing either memory-backed or file-backed stores.
The reconnection process runs on the thread that discovers the disconnection. This means that, in the event that an application thread discovers the disconnection as a result of a call to the Python AMPS client, that call may not return until a connection is re-established (or until the server chooser indicates failure, in which case the application will receive an exception).
The Python client includes a retry_on_disconnect setting that controls this retry behavior when the client is disconnected. When set to True (the default), any call to the client that results in a command being sent to AMPS may block until a connection is re-established. When set to False, the HAClient will retry the connection a single time and throw an exception if the connection cannot be re-established.
Regardless of the retry_on_disconnect setting, a call to publish will result in the message being stored in the PublishStore for the client if one is set.
If your application needs reliable publish to AMPS, install a PublishStore in the HAClient. If your application needs to resume replays from the transaction log, install a BookmarkStore in the HAClient.
These stores provide the following capabilities:
A bookmark store tracks received messages and is used to resume subscriptions that replay from the transaction log.
A publish store tracks published messages and is used to ensure that messages are persisted in AMPS.
The AMPS client provides a memory-backed version of each store and a file-backed version of each store. The store interface is public, and an application can create and provide a custom store as necessary. An HAClient can use either a memory-backed store or a file-backed store for protection. Each method provides resilience to different failures, as described below:
Memory-backed stores provide recovery after disconnection from AMPS by storing messages and bookmarks in your process' address space. This is the highest performance option for working with AMPS in a highly available manner. The trade-off with this method is that there is no protection from a crash or failure of your client application. If your application is terminated prematurely, or if the application terminates at the same time as an AMPS instance failure or network outage, then messages may be lost or duplicated. The state of bookmark replays will be lost when the application shuts down. Messages in the publish store when the application shuts down will not be maintained through a restart, so the application will not be able to attempt any necessary redelivery when the application restarts.
A memory-backed store should only be used by one instance of a client at a time.
File-backed stores provide recovery after client failure and disconnection from AMPS by storing messages and bookmarks on disk. To use this protection method, the create_file_backed method takes additional arguments for the two files that will be used for bookmark storage and message storage. If these files exist and are non-empty (as they would be after a client application is restarted), the HAClient loads their contents and ensures synchronization with the AMPS server once connected. The performance of this option depends heavily on the speed of the device on which these files are placed. When the files do not exist (as they would the first time a client starts on a given system), the HAClient creates and initializes the files, and in this case the client does not have a point at which to resume the subscription or messages to republish.
A store file should only be used by one instance of a client at a time.
When using file-backed stores, 60East recommends periodically removing unneeded entries by calling the prune() method. The precise strategy that your application uses to call prune() depends on the nature of the application. Most applications call prune() when the application exits.
There are two basic strategies that applications follow while the application runs:
Install a resize handler and call prune() after a specified number of resize operations or when the store reaches a specific size.
Call prune() after a specific number of messages are processed (for example, every 10,000 messages received or every 1,000 updates completed).
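The following sketches the second strategy. PruneEveryN is a hypothetical helper that counts processed messages and calls prune() on the store it wraps after every n messages. It assumes only that the store object has a prune() method, as described above.

```python
class PruneEveryN:
    """Call store.prune() after every n processed messages."""

    def __init__(self, store, every_n):
        if every_n < 1:
            raise ValueError("every_n must be at least 1")
        self.store = store
        self.every_n = every_n
        self.count = 0

    def message_processed(self):
        """Record one processed message; prune when the count reaches n."""
        self.count += 1
        if self.count >= self.every_n:
            self.count = 0
            # prune() rewrites the store file, so keep n large enough
            # that this happens rarely.
            self.store.prune()
```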
Regardless of the strategy, it is best to call prune() when the application is idle, since the prune() call rewrites the log file.
The store interface is public and an application can create and provide a custom store as necessary. While clients provide convenience methods for creating file-backed and memory-backed HAClient objects with the appropriate stores, you can also create and set the stores in your application code. For the AMPS Python client, stores are implemented in C++. You can implement stores using C++, and use the technique described in Chapter 12 of this guide - Using the C++ Client, to set the store on the client.
Starting in 5.3.2.0, the underlying AMPS client contains a recovery point adapter interface to make it easier to add a custom persistence layer to a bookmark store. The distribution includes a recovery point adapter that can store bookmark recovery information in an AMPS SOW topic.
The HAClient provides convenience methods for creating clients and setting stores. You can also construct an HAClient and set the store implementations you choose.
In this example, we create several clients. The first client uses memory stores for both bookmarks and publishes. The second client uses files for both bookmarks and publishes. The third client uses a file for bookmarks but does not set a store for publishes, so it uses the default, which stores no outgoing messages. The final client does not specify any stores, so it has no persistence for published messages or bookmark subscriptions, but can take advantage of the automatic failover and reconnection in the HAClient.
The AMPS client also includes the ability to use a SOW topic to store bookmark state for a bookmark store. This can be a useful option in a situation where an application needs a persistent bookmark store, but does not have the ability to store a file on the filesystem, or where an application has a bookmark file, but wants to have the ability to resume the subscription if the file is lost or damaged, or if the application is started on a system that does not have access to the file.
To use the SOW topic recovery point adapter, you create a bookmark store of the type you would like to use for the Client, passing an adapter when you construct the store. You then set this bookmark store as the store for the Client to use. The constructor for the SOW recovery adapter allows you to customize the topic name and field names used to store the recovery point information in AMPS. As with the RecoveryPointAdapter interface in general, it is possible to customize the behavior of the SOW recovery point adapter by overriding the provided methods.
This section describes how to use the adapter with the default settings. Should you need to change the behavior of the class, you would adjust the guidance in this section accordingly. (For example, if you override methods to produce a message with a different set of keys or a different message format, you would update the topic definition accordingly).
To store recovery point state in AMPS, the AMPS instance that will store the recovery point state must define a SOW/Topic to hold the recovery point data.
By default, the adapter uses a topic named /ADMIN/bookmark_store of the json message type, keyed on the /clientName and /subId fields, similar to the following definition:
You must include this definition, or an equivalent definition, in the configuration file for the AMPS instance that will host the recovery point.
If you define a topic with a different configuration (for example, different key names, a different topic name or a different message type), you must ensure that the adapter that you create uses the same parameters as those configured on the server.
The AMPS SOW Recovery Point Adapter requires a Client or HAClient connected to the instance that contains the SOW topic. The adapter uses this client to recover bookmark state and to store bookmarks in AMPS. Notice that this client must not be a client that the adapter is keeping state for. It must be a completely separate client instance; otherwise, the client may deadlock while updating the store.
The client must be connected and logged in to the instance that contains the SOW topic, using the message type defined for the topic.
When an application uses a file-backed store, it is important to make sure that there is enough space available on the file system to be able to manage the store.
For logged bookmark stores, an application needs to keep a bookmark record for each message received, each message discarded, and the persisted acknowledgments delivered by the server approximately once a second. Each bookmark entry consumes roughly 70 bytes of storage plus the length of the subscription ID for the subscription receiving the message. The logged bookmark store retains entries until an application explicitly calls prune(). The capacity needed for a logged bookmark store will depend on the strategy that the application uses for pruning the file.
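As a rough worked example of this sizing guidance, the sketch below estimates how much a logged bookmark store grows between prunes. The 70-byte figure comes from the text. Counting the persisted acknowledgment as one extra record per second for the subscription, and the example rate and subscription ID, are assumptions.

```python
def logged_bookmark_growth(msgs_per_sec, sub_id, seconds, entry_overhead=70):
    """Rough bytes added to a logged bookmark store over `seconds`.

    Assumes one record per message received, one per message discarded,
    and one persisted-acknowledgment record per second, each costing about
    `entry_overhead` bytes plus the length of the subscription ID.
    """
    records_per_sec = 2 * msgs_per_sec + 1
    bytes_per_record = entry_overhead + len(sub_id)
    return records_per_sec * bytes_per_record * seconds


# 1,000 messages/second on subscription "sub-1" for one hour, with no pruning.
growth = logged_bookmark_growth(1_000, "sub-1", 3_600)
print(f"{growth:,} bytes (~{growth / 2**20:.0f} MiB)")
```

An hour of unpruned traffic at this modest rate already reaches hundreds of megabytes, which is why the pruning strategy drives the capacity you need.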
For a file-backed publish store, the application needs to be able to store published messages until the AMPS server that the publisher is connected to acknowledges those messages as persisted. The volume of messages that needs to be stored depends on the failover policy for the server: that is, the maximum amount of time that the server allows a downstream instance to go without acknowledging a message before the server downgrades that connection to async acknowledgment. By default, AMPS does not downgrade connections; this policy must be set explicitly using the AMPS actions. As an example, if the server is configured to downgrade connections that are more than 120 seconds behind, then, for disaster recovery, the application must have the capacity to store 120 seconds of published messages at peak publishing load. However, unlike the logged bookmark store, a file-backed publish store removes messages from the store and reuses the space once AMPS has acknowledged the message.
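The publish-store estimate follows the same pattern: peak rate times message size times the downgrade window. This sketch ignores any per-record overhead the store adds, and the 5,000 messages per second and 512-byte message size are illustrative values, not measurements.

```python
def publish_store_capacity(peak_msgs_per_sec, avg_message_bytes, downgrade_after_sec):
    """Rough bytes a file-backed publish store must hold while waiting
    for acknowledgments, ignoring any per-record overhead the store adds."""
    return peak_msgs_per_sec * avg_message_bytes * downgrade_after_sec


# 120-second downgrade policy, 5,000 messages/second of ~512-byte messages.
needed = publish_store_capacity(5_000, 512, 120)
print(f"{needed:,} bytes (~{needed / 2**20:.0f} MiB)")
```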
Unlike Client, the HAClient attempts to keep itself connected to an AMPS instance at all times, by automatically reconnecting or failing over when it detects that the client is disconnected. When you are using the Client directly, your disconnect handler usually takes care of reconnection. HAClient, on the other hand, provides a disconnect handler that automatically reconnects to the current server or to the next available server.
To inform the HAClient of the addresses of the AMPS instances in your system, you pass a ServerChooser instance to the HAClient. ServerChooser acts as a smart enumerator over the available servers: HAClient calls ServerChooser methods to ask which server to connect to, and calls other methods to report whether a connection to a given server succeeded or failed.
The AMPS Python client provides a basic implementation of ServerChooser, called DefaultServerChooser, with very simple reconnection logic. This server chooser is most suitable for basic testing, or for cases where an application should simply rotate through a list of servers. For most applications, you implement the ServerChooser interface yourself to provide more advanced logic, such as choosing a backup server based on your network topology, or limiting the number of times your application tries to reconnect to a given address.
To connect to AMPS, you provide a ServerChooser to HAClient and then call connect_and_logon() to create the first connection:
Similar to Client, HAClient remains connected to the server until disconnect() is called. Unlike Client, HAClient automatically attempts to reconnect to your server if it detects a disconnect and, if it cannot reconnect to that server, fails over to the next server provided by the ServerChooser. In this example, the call to connect_and_logon() attempts to connect and log in to primary.amps.xyz.com, and returns if that is successful. If it cannot connect, it tries secondary.amps.xyz.com, and continues trying servers from the ServerChooser until a connection is established. Likewise, if it detects a disconnection while the client is in use, HAClient attempts to reconnect to the server to which it was most recently connected; if that is not possible, it moves on to the next server provided by the ServerChooser.
The default ServerChooser simply provides the next URL in the sequence. This strategy works for many applications. If you need a different strategy, you can implement your own logic for failover by creating a class derived from ServerChooser.
You can control the amount of time between reconnection attempts and set a total amount of time for the HAClient to attempt to reconnect.
The AMPS Python client includes a method for setting a delay strategy on a client, set_reconnect_delay_strategy. This method accepts an instance of any type that provides the methods get_connect_wait_duration and reset, as described in the API documentation.
While you can easily implement your own delay strategy, the client also provides two delay strategies:
FixedDelayStrategy provides the same delay each time the HAClient tries to reconnect.
ExponentialDelayStrategy provides an exponential backoff until a connection attempt succeeds.
To use either of these classes, you simply create an instance, set the appropriate parameters, and install that instance as the delay strategy for the HAClient. For example, the following code sets up a reconnect delay that starts at 200 ms and multiplies the delay by 1.5 after each failure. The strategy allows a maximum delay of 5 seconds between connection attempts, and does not retry for longer than 60 seconds.
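The built-in ExponentialDelayStrategy covers this case. To show what the delay-strategy interface involves, here is a minimal hand-written strategy with the same parameters (200 ms start, 1.5 multiplier, 5-second cap, 60-second budget). The class names are hypothetical. The uri argument, the millisecond return unit, and raising an exception to stop retrying are assumptions modeled on the interface description, so check them against the API documentation. The injectable clock exists only so the behavior can be tested deterministically.

```python
import time


class RetryTimeExceeded(Exception):
    """Hypothetical exception signalling that the retry budget is spent."""


class BackoffDelayStrategy:
    """Exponential backoff: 200 ms, then x1.5 per failure, capped at 5 s,
    giving up once 60 s have passed since the first attempt."""

    def __init__(self, initial_delay_ms=200, backoff=1.5,
                 maximum_delay_ms=5_000, maximum_retry_time_ms=60_000,
                 clock=time.monotonic):
        self.initial_delay_ms = initial_delay_ms
        self.backoff = backoff
        self.maximum_delay_ms = maximum_delay_ms
        self.maximum_retry_time_ms = maximum_retry_time_ms
        self._clock = clock
        self.reset()

    def reset(self):
        # Start over; intended to be called after a successful connection.
        self._next_delay_ms = self.initial_delay_ms
        self._started = None

    def get_connect_wait_duration(self, uri):
        # Return how long (in milliseconds) to wait before trying `uri`.
        now = self._clock()
        if self._started is None:
            self._started = now
        elapsed_ms = (now - self._started) * 1000
        if elapsed_ms >= self.maximum_retry_time_ms:
            raise RetryTimeExceeded(
                f"stopped retrying {uri} after {elapsed_ms:.0f} ms")
        delay_ms = self._next_delay_ms
        # Grow the next delay, but never beyond the cap.
        self._next_delay_ms = min(self._next_delay_ms * self.backoff,
                                  self.maximum_delay_ms)
        return int(delay_ms)
```

To try it, install an instance with client.set_reconnect_delay_strategy(BackoffDelayStrategy()) before connecting. Capping the stored delay, rather than only the returned value, keeps the delay from growing without bound during a long outage.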
As described above, you provide the HAClient with connection strings to one or more AMPS servers using a ServerChooser. The purpose of a ServerChooser is to provide information to the HAClient. A ServerChooser does not manage the reconnection process, and should not call methods on the HAClient.
A ServerChooser has two required responsibilities to the HAClient:
Tells the HAClient the connection string for the server to connect to. If there are no servers, or the ServerChooser wants the connection to fail, the ServerChooser returns an empty string.
To provide this information, the ServerChooser implements the get_current_uri() method.
Provides an Authenticator for the current connection string. This is especially important for installations where different servers require different credentials or authentication tokens must be reset after each connection attempt.
To provide the authenticator, the ServerChooser implements the get_current_authenticator() method.
The HAClient calls the get_current_uri() and get_current_authenticator() methods each time it needs to make a connection.
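The responsibilities listed above can be illustrated with a minimal ServerChooser-style class that rotates through a list of URIs and keeps an authenticator per URI. The method names come from the text. The add() helper, the argument lists for report_success() and report_failure(), and returning None when no authenticator is registered are assumptions. In a real application the class would derive from the client's ServerChooser class; it is left standalone here so the sketch runs without the AMPS library.

```python
class RotatingServerChooser:
    """Minimal ServerChooser-style sketch: rotates through a list of URIs
    and returns the authenticator registered for the current URI."""

    def __init__(self):
        self._servers = []   # list of (uri, authenticator) pairs
        self._index = 0

    def add(self, uri, authenticator=None):
        self._servers.append((uri, authenticator))

    def get_current_uri(self):
        # An empty string tells the HAClient there is nothing to connect to.
        if not self._servers:
            return ""
        return self._servers[self._index][0]

    def get_current_authenticator(self):
        if not self._servers:
            return None
        return self._servers[self._index][1]

    def report_success(self, info):
        # Stay on the server that worked.
        pass

    def report_failure(self, exception, info):
        # Move on to the next server in the list.
        if self._servers:
            self._index = (self._index + 1) % len(self._servers)
```

Because the chooser only answers questions and records outcomes, it never calls back into the HAClient, consistent with the division of responsibilities described above.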
Each time a connection succeeds, the HAClient calls the report_success() method of the ServerChooser. Each time a connection fails, the HAClient calls the report_failure() method of the ServerChooser. The HAClient does not require the ServerChooser to take any particular action when it calls these methods. These methods are provided so that the ServerChooser can do internal maintenance, logging, or record keeping. For example, a ServerChooser might keep a list of available URIs with a current failure count, and skip over URIs that have failed more than 5 consecutive times until all URIs in the list have failed more than 5 consecutive times.
When the ServerChooser returns an empty string from get_current_uri(), indicating that no servers are available for connection, the HAClient calls the get_error() method on the ServerChooser, if the ServerChooser provides one, and includes the string returned by get_error() in the generated exception.
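The failure-count example from the text, combined with get_error(), can be sketched as follows. This is one possible design, not the library's behavior: once every URI has exceeded the limit, the sketch returns an empty string so that the HAClient gives up and reports the get_error() text. Resetting the counts and starting over would be an equally valid choice. The class name and the report_success()/report_failure() argument lists are assumptions, and a real implementation would derive from ServerChooser.

```python
class FailureCountingChooser:
    """Hypothetical ServerChooser-style sketch that skips URIs with more than
    `max_failures` consecutive failures."""

    def __init__(self, uris, max_failures=5):
        self._uris = list(uris)
        self._failures = dict.fromkeys(self._uris, 0)  # consecutive failures
        self._max = max_failures
        self._index = 0
        self._last_error = ""

    def get_current_uri(self):
        # Starting at the current position, return the first URI still under the limit.
        for step in range(len(self._uris)):
            i = (self._index + step) % len(self._uris)
            if self._failures[self._uris[i]] <= self._max:
                self._index = i
                return self._uris[i]
        return ""  # every URI is over the limit (or the list is empty)

    def get_current_authenticator(self):
        return None  # assumption: default authentication is acceptable

    def report_success(self, info):
        if self._uris:
            self._failures[self._uris[self._index]] = 0  # streak is broken

    def report_failure(self, exception, info):
        if self._uris:
            uri = self._uris[self._index]
            self._failures[uri] += 1
            self._last_error = f"{uri}: {exception}"
            self._index = (self._index + 1) % len(self._uris)

    def get_error(self):
        # Included by the HAClient in the exception it raises.
        return (f"all servers failed more than {self._max} consecutive times; "
                f"last error: {self._last_error}")
```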
Use of the HAClient allows your application to quickly recover from detected connection failures. By default, connection failure detection occurs when AMPS receives an operating system error on the connection. This approach can result in unpredictable delays in detecting a connection failure on the client, particularly when failures occur in network routing hardware and the client primarily acts as a subscriber.
The heartbeat feature of the AMPS client allows connection failure to be detected quickly. Heartbeats ensure that regular messages are sent between the AMPS client and server on a predictable schedule. The AMPS client and server both assume disconnection has occurred if these regular heartbeats cease, ensuring disconnection is detected in a timely manner. To use the heartbeat feature, call the set_heartbeat method on Client or HAClient:
The set_heartbeat method takes one parameter: the heartbeat interval. The heartbeat interval specifies the periodicity of heartbeat messages sent by the server: the value 3 indicates messages are sent on a three-second interval. If the client receives no messages in a six-second window (two heartbeat intervals), the connection is assumed to be dead, and the HAClient attempts reconnection. An additional variant of set_heartbeat allows the idle period to be set to a value other than two heartbeat intervals. (The server, however, always considers a connection to be closed after two heartbeat intervals without any traffic.)
Notice that, for HAClient, set_heartbeat must be called before the client is connected. For Client, set_heartbeat must be called after the client is connected.
Heartbeats are serviced on the receive thread created by the AMPS client. Your application must not block the receive thread for longer than the heartbeat interval or the application is subject to being disconnected.
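One common way to honor this rule is to hand each message off to a worker thread and return from the handler immediately. The sketch below uses only the standard library. WorkerHandoff is a hypothetical name, and the sketch assumes the message object remains valid after the handler returns. If the client reuses message objects between callbacks, copy the fields you need (for example, the data) before enqueueing.

```python
import logging
import queue
import threading

_STOP = object()  # sentinel that tells the worker to exit


class WorkerHandoff:
    """Keeps the receive thread free: the message handler only enqueues,
    and a single worker thread does the slow processing in arrival order."""

    def __init__(self, process):
        self._process = process        # slow, application-specific work
        self._queue = queue.Queue()    # unbounded, so put() never blocks
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def on_message(self, message):
        # Runs on the client's receive thread: enqueue and return at once.
        self._queue.put(message)

    def _run(self):
        while True:
            message = self._queue.get()
            if message is _STOP:
                break
            try:
                self._process(message)
            except Exception:
                # Keep the worker alive if one message fails.
                logging.exception("message processing failed")

    def close(self):
        # Let already-queued messages drain, then stop the worker.
        self._queue.put(_STOP)
        self._worker.join()
```

Pass handoff.on_message wherever the client expects a message handler. An unbounded queue trades memory for a responsive receive thread; a bounded queue would make put() block, which is exactly what this pattern is meant to avoid.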
Publishing with an HAClient is nearly identical to regular publishing; you simply call the publish() method with your message's topic and data. The AMPS client sends the message to AMPS, and then returns from the publish() call. For maximum performance, the client does not wait for the AMPS server to acknowledge that the message has been received.
When an HAClient sets a publish store, the publish store retains a copy of each outgoing message and requests that AMPS acknowledge that the message has been persisted. The AMPS server acknowledges messages back to the publisher. Acknowledgments can be delivered for multiple messages at periodic intervals (for topics recorded in the transaction log) or after each message (for topics that are not recorded in the transaction log). When an acknowledgment for a message is received, the HAClient removes that message from the publish store. When a connection to a server is made, the HAClient automatically determines which messages from the publish store (if any) the server has not processed, and replays those messages to the server once the connection is established.
For reliable publishers, the application must choose how best to handle application shutdown. For example, it is possible for the network to fail immediately after the publisher sends the message, while the message is still in transit. In this case, the publisher has sent the message, but the server has not processed and acknowledged it. During normal operation, the HAClient automatically reconnects and resends the message. On shutdown, however, the application must decide whether to wait for messages to be acknowledged, or whether to exit.
Publish store implementations provide an unpersisted_count() method that reports the number of messages that have not yet been acknowledged by the AMPS server. When the unpersisted_count() reaches 0, there are no unpersisted messages in the local publish store.
For the highest level of safety, an application can wait until the unpersisted_count() reaches 0, which indicates that all of the messages have been persisted to the instance that the application is connected to, and to the synchronous replication destinations configured for that instance. When a synchronous replication destination goes offline, this approach causes the publisher to wait before exiting until the destination comes back online or until the destination is downgraded to asynchronous replication.
For applications that are shut down periodically for short periods of time (for example, applications that are only offline during a weekly maintenance window), another approach is to use the publish_flush() method to ensure that messages are delivered to AMPS, and then rely on the connection logic to replay messages as necessary when the application restarts.
For example, the following code flushes messages to AMPS, then warns if not all messages have been acknowledged:
In this example, the client sends each message immediately when publish() is called. If AMPS becomes unavailable between the final publish() and the disconnect(), or one of the servers that the AMPS instance replicates to is offline, the client may not have received a persisted acknowledgment for all of the published messages. For example, if a message has not yet been persisted by all of the servers in the replication fabric that are connected with synchronous replication, AMPS will not have acknowledged the message.
Before shutting down the client, the code does two things:
First, the code flushes messages to the server to ensure that all messages have been delivered to AMPS.
Next, the code checks to see if all of the messages in the publish store have been acknowledged as persisted by AMPS. If the messages have not been acknowledged, they will remain in the publish store file and will be published to AMPS, if necessary, the next time the application connects. An application may choose to loop until unpersisted_count() returns 0, or (as we do in this case) simply warn that AMPS has not confirmed that the messages are fully persisted. The behavior you choose in your application should be consistent with the high-availability guarantees your application needs to provide.
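A shutdown helper that combines both choices (flush, wait for a bounded time, then warn) might look like the sketch below. It assumes the client exposes publish_flush() and the publish store exposes unpersisted_count(), as described above. How you obtain the store from the client, the helper name, and the five-second default are assumptions. Passing wait_seconds=0 gives the "warn immediately" behavior; a large value approximates waiting until the count reaches 0.

```python
import time
import warnings


def flush_and_check(client, publish_store, wait_seconds=5.0, poll_seconds=0.1):
    """Flush outgoing messages, then wait up to `wait_seconds` for the publish
    store to report no unpersisted messages. Returns the remaining count."""
    client.publish_flush()  # make sure everything has been sent to AMPS
    deadline = time.monotonic() + wait_seconds
    remaining = publish_store.unpersisted_count()
    while remaining > 0 and time.monotonic() < deadline:
        time.sleep(poll_seconds)
        remaining = publish_store.unpersisted_count()
    if remaining > 0:
        warnings.warn(
            f"{remaining} message(s) not yet acknowledged as persisted; "
            "a file-backed store will replay them on the next connection")
    return remaining
```

Call it just before disconnect(). Polling with a deadline keeps shutdown bounded even when a synchronous replication destination is offline.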
AMPS uses the name of the HAClient to determine the origin of messages. For the AMPS server to correctly identify duplicate messages, each instance of an application that publishes messages must use a distinct name. That name must be consistent across different runs of the application.
If your application crashes or is terminated, some published messages may not have been persisted in the AMPS server. If you use the file-based store (in other words, if you provide file names for persistent storage when you create the HAClient), the HAClient will recover the messages and, once logged on, will correlate the message store with what the AMPS server has received, republishing any missing messages. This occurs automatically when HAClient connects, without any explicit consideration in your code, other than ensuring that the same file name is used to create the HAClient if recovery is desired.
AMPS provides persisted acknowledgment messages for topics that do not have a transaction log enabled. However, the level of durability provided for topics with no transaction log is minimal. Learn more about transaction logs in the AMPS User Guide.
AMPS replication provides two different acknowledgment modes for outgoing replication links from an instance:
For a link in sync acknowledgment mode, a message must be successfully acknowledged by the downstream instance of AMPS before this instance of AMPS will acknowledge the message.
For a link in async acknowledgment mode, this link is not considered for acknowledging the message. In this mode, the downstream side of the replication link may not have received or processed the message at the time that the publisher receives an acknowledgment.
As described in the AMPS User Guide, a publisher must not fail over from one instance of AMPS to another instance when any link between those instances uses async acknowledgment unless replication is certain to have reached that instance. (For example, if replication is taking a maximum of 1.2 seconds between the instances and the publisher has been disconnected for 30 seconds, all messages from that publisher will have been replicated.)
To help detect a situation where a publisher may be "jumping ahead" of messages that it has published, but which have not yet been replicated, the AMPS client allows an application to consider it to be an error to make a connection to a server that has not received messages previously published by the application.
To enable this behavior, call the setErrorOnPublishGap() method to set this property on the PublishStore in use for the client. When this property is set, the client considers it an error to connect to a server that has not received messages previously published by the client, and treats the connection as failed.
Notice that an application that uses this approach may need to handle situations where no server has received the message, particularly if the replication configuration uses automated replication downgrade.
HAClient provides two important features for applications that subscribe to one or more topics: re-subscription, and a bookmark store to track the correct point at which to resume a bookmark subscription.
Any asynchronous subscription placed using an HAClient is automatically reinstated after a disconnect or a failover. These subscriptions are placed in an in-memory SubscriptionManager, which is created automatically when the HAClient is instantiated.
When a re-subscription occurs, the AMPS Python client re-executes the command as originally submitted, including the original topic, options, and so on. AMPS sends the subscriber any messages for the specified topic (or topic expression) that are published after the subscription is placed. For a sow_and_subscribe command, this means that the client re-issues the full command, including the SOW query as well as the subscription.
The HAClient (starting with the AMPS Python client version 4.3.1.1) does not track synchronous message processing subscriptions in the SubscriptionManager. The reason for this is to preserve the iterator semantics: once the MessageStream indicates that there are no more elements in the stream, it does not suddenly produce more elements.
To re-subscribe when the HAClient fails over, you can simply re-issue the subscription. For example, the snippet below re-issues the subscribe command when the message stream ends:
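The re-issue pattern can be sketched as a loop that subscribes again whenever the stream ends. The subscribe argument is any zero-argument callable that returns an iterable of messages, such as lambda: client.subscribe("orders") (the topic is hypothetical). Passing callables keeps the sketch independent of the exact client API.

```python
def consume_with_resubscribe(subscribe, handle, should_stop):
    """Re-issue a synchronous subscription each time its message stream ends.

    subscribe:   zero-argument callable returning an iterable of messages
    handle:      called for each message
    should_stop: zero-argument callable; checked each time the stream ends
    """
    while True:
        for message in subscribe():
            handle(message)
        # The stream ended (for example, after a failover): stop or subscribe again.
        if should_stop():
            return
```

A plain subscription only delivers messages published after it is placed, so anything published while the client was failing over is not replayed; the bookmark subscriptions described next close that gap.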
In cases where it is critical not to miss a single message, it is important to be able to resume a subscription at the exact point that a failure occurred. In this case, simply recreating a subscription isn't sufficient. Even though the subscription is recreated, the subscriber may have been disconnected at precisely the wrong time and will not see the message.
To ensure delivery of every message from a topic or set of topics, the AMPS HAClient includes a BookmarkStore that, combined with the bookmark subscription and transaction log functionality in the AMPS server, ensures that clients receive any messages that might have been missed. The client stores the bookmark associated with each message received, and tracks whether the application has processed that message; if a disconnect occurs, the client uses the BookmarkStore to determine the correct resubscription point, and sends that bookmark to AMPS when it re-subscribes. AMPS then replays messages from its transaction log from the point after the specified bookmark, thus ensuring the client is completely up-to-date.
HAClient helps you to take advantage of this bookmark mechanism through the BookmarkStore interface and the bookmark_subscribe() method on Client. When you create subscriptions with bookmark_subscribe(), whenever a disconnection or failover occurs, your application automatically re-subscribes to the message after the last message it processed. HAClients created by createFileBacked() additionally store these bookmarks on disk, so that the application can restart with the appropriate message if the client application fails and restarts.
To take advantage of bookmark subscriptions, do the following:
Ensure the topic(s) to be subscribed to are included in a transaction log. See the AMPS User Guide for information on how to specify the contents of a transaction log.
Use bookmark_subscribe() instead of subscribe() when creating a subscription, and decide how the application will manage subscription identifiers (SubIds). If you are using a Command object, you can simply set the bookmark on that object.
Use the discard() method in message handlers to indicate when a message has been fully processed by the application, that is, when the application does not need to receive the message again if the application fails over.
The following example creates a bookmark subscription against a transaction-logged topic and fully processes each message as soon as it is delivered:
In this example, the client is a file-backed client, meaning that arriving bookmarks will be stored in a file (aClient.subscribeLog). Storing these bookmarks in a file allows the application to restart the subscription from the last message processed, in the event of either server or client failure.
For optimum performance, it is critical to discard every message once its processing is complete. If a message is never discarded, it remains in the bookmark store. During re-subscription, HAClient always restarts the bookmark subscription with the oldest undiscarded message, and then filters out any more recent messages that have been discarded. If an old message remains in the store, but is no longer important for the application’s functioning, then the client and the AMPS server will incur unnecessary network, disk and CPU activity.
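The rule above (discard each message once its processing is complete, and not before) can be captured in a small wrapper. This is a sketch under assumptions: make_discarding_handler is a hypothetical name, and discard stands for whatever call your client exposes for marking a message as processed, so check the exact object and method in the API documentation.

```python
def make_discarding_handler(process, discard):
    """Wrap `process` so each message is discarded only after it has been
    fully processed."""

    def on_message(message):
        process(message)   # if this raises, the message is NOT discarded
        discard(message)   # fully processed: no need to redeliver it

    return on_message
```

The resulting on_message would be passed as the first argument to bookmark_subscribe(), with Client.Bookmarks.MOST_RECENT as the bookmark and a stable subscription ID as the fourth (sub_id) argument. Note the trade-off: a message whose processing fails stays in the bookmark store and will be redelivered after a failover, but, as described above, it also forces the re-subscription to restart from that point.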
The fourth parameter, sub_id, specifies an identifier to be used for this subscription. Passing None causes HAClient to generate one and return it, like most other Client functions. However, if you wish to resume a subscription from a previous point after the application has terminated and restarted, the application must pass the same subscription ID as during its previous run. Passing a different subscription ID bypasses any recovery mechanisms, creating an entirely new subscription. When you use an existing subscription ID, the HAClient locates the last-used bookmark for that subscription in the local store, and attempts to re-subscribe from that point.
Below are the different bookmark types that can be used to enable different recovery strategies for an application:
Client.Bookmarks.NOW specifies that the subscription should begin from the moment the server receives the subscription request. This results in the same messages being delivered as if you had invoked subscribe() instead, except that the messages will be accompanied by bookmarks. This is also the behavior that results if you supply an invalid bookmark.
Client.Bookmarks.EPOCH specifies that the subscription should begin from the beginning of the AMPS transaction log (that is, the first entry in the oldest journal file for the transaction log).
Client.Bookmarks.MOST_RECENT specifies that the subscription should begin from the last-used message in the associated BookmarkStore. If this subscription has not been seen before, it instructs the subscription to begin with EPOCH. This is the most common value for this parameter and is the value used in the preceding example. By using MOST_RECENT, the application automatically resumes from wherever the subscription left off, taking into account any messages that have already been processed and discarded.
When the HAClient re-subscribes after a disconnection and reconnection, it always uses MOST_RECENT, ensuring that the continued subscription always begins from the last message used before the disconnect, so that no messages are missed.
With only a few changes, most AMPS applications can take advantage of the HAClient and associated classes to become more highly available and resilient. Using the PublishStore, publishers can ensure that every message published has actually been persisted by AMPS. Using BookmarkStore, subscribers can make sure that there are no gaps or duplicates in the messages received. HAClient makes both kinds of applications more resilient to network and server outages, as well as temporary issues. By using the file-based HAClient, clients can recover their state after an unexpected termination or crash. Though HAClient provides useful defaults for the Store, BookmarkStore, and ServerChooser, you can customize any or all of these to the specific needs of your application and architecture.
The AMPS Python client provides a wrapper that works with the Python ctypes module to allow you to create message handlers in C or C++ and expose them to Python. This can improve performance in the message handler. When you use this technique, messages are delivered directly from the C++ client to your message handler: there is no Python code involved in handling the messages.
To use this capability, you:
Create a message handler with C linkage, and compile that message handler into a shared library.
In your Python program, use the ctypes module to load the library.
Construct an instance of CMessageHandler, a wrapper object that holds a pointer to the message handler function and the user data to be provided to the handler during each call.
Pass the CMessageHandler to any method that expects a message handler.
The AMPS Python client registers the pointer and user data you provide as a C++ message handler. Once the handler is registered, no Python code is called when providing messages to the handler.
To use this capability, you create a message handler that exposes a function with the following signature having C linkage:
Notice that this signature is the same signature used by message handlers in the AMPS C++ client. You implement the function and compile it into a shared library or DLL, using the instructions provided with your Python implementation. For details on the C++ client, you can install the client itself, or consult the C++ API documentation.
Once you've compiled the library, you use the ctypes module to load the library. You then create an instance of the message handler wrapper, and pass that wrapper to the AMPS client methods, as shown below:
The AMPS.CMessageHandler type accepts a pointer to a message handler with the signature shown above and a Python object that can be marshalled into a native C type through the ctypes interface. Once marshalled, the object will be cast to a void * and provided in the userdata parameter of the message handler. Marshalling the userdata parameter follows the ctypes module conventions. If you need to explicitly control the way an object is marshalled, you can construct one of the ctypes objects and pass that new object into the method.
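For example, to have a C handler update a counter owned by the Python program, you can build the user data object yourself instead of relying on the default conversion. The sketch below uses only the standard ctypes module and does not load any AMPS library. The C statement in the comment is hypothetical handler code, and passing userdata as the second argument to AMPS.CMessageHandler follows the description above rather than a verified signature.

```python
import ctypes

# User data the C handler will update: an explicit C long, rather than a
# Python int that ctypes would pass by value.
counter = ctypes.c_long(0)

# What the handler's `void *userdata` parameter would receive: the address
# of `counter`. Keep `counter` alive for as long as the handler is registered.
userdata = ctypes.cast(ctypes.pointer(counter), ctypes.c_void_p)

# Inside a hypothetical C handler this would be `(*(long *)userdata)++;`.
# The same round trip from Python shows that both sides share one object.
ctypes.cast(userdata, ctypes.POINTER(ctypes.c_long)).contents.value += 1
print(counter.value)
```

Because the handler only sees a raw address, the Python object that owns the memory (here, counter) must outlive every call to the handler.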
While the AMPS Python client provides enough performance for a wide variety of applications, in some cases, using the underlying C++ implementation can provide extra performance. The AMPS Python client works with the ctypes module to allow you to pass the underlying C++ client to an arbitrary function, effectively allowing you to integrate C++ code directly into your Python program.
Consider using the C++ client directly when latency is at a premium or when your application works directly with C++. For example, you might use the client directly when:
You are assembling messages from a C++ library without a Python binding
You need to customize client behavior that is implemented in C++ (for example, implementing a custom SubscriptionManager or BookmarkStore)
Your application needs to execute a set of commands with AMPS with minimal latency. For example, you might need to publish an array of values as individual messages with as little latency as possible. In this case, using the underlying C++ client directly can reduce latency.
To use the underlying C++ client, you:
Create a function with C linkage, and compile that function into a shared library. One of the parameters of the function should be a reference to an AMPS::Client.
In your Python program, use the ctypes module to load the library.
Call the function on the library, passing the appropriate parameters for the C function.
The only requirement on the C++ function is that it have C linkage and that one of the parameters is a reference to an AMPS::Client. By convention, 60East recommends that the first parameter is the AMPS::Client. However, this is not a requirement of the interface.
For example, a function that simply takes an AMPS::Client has the following signature:
A function that takes a client, a topic, and a pointer to data to be published might have the following signature:
The ctypes module provides a standard AMPS::Client to these functions. Although the Client has been created by Python code, there is nothing Python-specific about the object within the C++ function. You can use the Client just as you would any other Client object.
You can also use the ctypes binding with AMPS::HAClient, as shown below:
Since the ctypes module passes the data through the C ABI, the module is not able to perform extensive type checking on C++ types. Your Python code must be careful to pass only objects of the appropriate type, or you may cause a segmentation violation. For example, if a method expecting an HAClient receives a Client and calls connectAndLogon (which is not a method provided by Client), your program will likely crash.
The ctypes module has a few important caveats.
The ctypes module does not provide strong type-safety guarantees for C++ classes. It is your responsibility to ensure that you call methods with objects of the appropriate type.
The ctypes module calls your function through an extern "C" interface. C++ exceptions cannot be propagated out of a function with C linkage. You must catch all exceptions that may be thrown, or your application will likely crash.
Once you've compiled the library, use the ctypes module to load it. You can then call the function directly from Python by its C name, passing the appropriate arguments.
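The following minimal sketch loads a library and looks up a function by name. `load_function` is a hypothetical helper. On POSIX systems, passing `None` to `ctypes.CDLL` searches the symbols already loaded into the process, so here the C library's `abs()` stands in for a function in your own `module.so`.

```python
import ctypes


def load_function(library_path, name):
    """Load a shared library and look up one of its C functions by name.

    library_path would be something like "./module.so"; passing None (POSIX
    only) searches symbols already loaded into the process instead.
    """
    library = ctypes.CDLL(library_path)
    try:
        return getattr(library, name)
    except AttributeError:
        raise LookupError(
            f"{name!r} is not exported by {library_path!r}"
        ) from None


# The C library's abs() stands in for a function in your own module.
c_abs = load_function(None, "abs")
```

Function lookup happens when the attribute is accessed, so a misspelled C name fails at that point rather than during the call.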
Let's look at a simple example. For this example, assume that you have compiled a module named module.so with the following function:
You can load the module and call the function as shown below:
The ctypes module handles type conversions between Python and C types. In this case, the module passes the C++ AMPS::Client that underlies the Python client as the first argument of the C function. The two Python strings are passed as NUL-terminated char * arrays.
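Under Python 3, ctypes converts `bytes` (not `str`) to `char *`, so text must be encoded before it is passed. The sketch below declares `argtypes` so ctypes can check each argument. The C library's `strcmp()` (POSIX only) stands in for a function in your module that takes two C strings, and the topic value is hypothetical.

```python
import ctypes

# POSIX only: reuse the C library's strcmp() as a stand-in for a function in
# your module that takes two C strings.
libc = ctypes.CDLL(None)
strcmp = libc.strcmp
strcmp.argtypes = [ctypes.c_char_p, ctypes.c_char_p]
strcmp.restype = ctypes.c_int

# bytes objects are passed as pointers to NUL-terminated char buffers.
topic = "orders"
result = strcmp(topic.encode("utf-8"), b"orders")

# Passing a str directly is rejected once argtypes declares c_char_p.
try:
    strcmp(topic, b"orders")
    str_rejected = False
except ctypes.ArgumentError:
    str_rejected = True
```

Declaring `argtypes` turns a mismatched argument into a Python exception instead of an arbitrary value reaching the C function.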
The ctypes module also handles more complicated signatures and correctly passes arrays. For example, you could implement a method that publishes an array of Python values as follows:
You could then use this function from Python as follows:
The sample above creates an array of dictionaries and converts it into an array of JSON objects.
In this case, it is important for us to control how the array of JSON objects is passed to the C function. We need to pass an array of C-style strings, that is, const char**. To control how the array is marshalled, the sample creates an object that knows how to translate between a Python array and const char**, then assigns the array to that object (see the ctypes documentation for full details). Once we have that object, we simply call the vector_publish function. None of the Python infrastructure is visible to the vector_publish function: that function is able to use the provided data as native C++ data.
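The marshalling step can be sketched as follows, assuming hypothetical records. The sketch leaves out the client and topic arguments, whose exact form depends on your C signature. It shows the `ctypes.c_char_p * n` array type that represents `const char**`.

```python
import ctypes
import json

# Hypothetical messages; the guide's sample builds a similar list of dictionaries.
records = [{"id": 1, "symbol": "ABC"}, {"id": 2, "symbol": "XYZ"}]

# c_char_p holds bytes, so each JSON document is encoded after serialization.
payloads = [json.dumps(record).encode("utf-8") for record in records]

# c_char_p * n is a fixed-size array type; when an instance is passed to a
# C function, it is passed as a pointer to its first element (const char**).
StringArray = ctypes.c_char_p * len(payloads)
c_payloads = StringArray(*payloads)

# The same memory viewed explicitly as char**, as the C function would see it.
as_pointer = ctypes.cast(c_payloads, ctypes.POINTER(ctypes.c_char_p))

# A call would then look like lib.vector_publish(..., c_payloads, len(payloads)),
# with the remaining (hypothetical) arguments matching your own C signature.
```

Keeping `payloads` and `c_payloads` referenced until the C call returns ensures that the buffers the pointers refer to remain valid.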
The AMPS Python client offers the ability to filter incoming and outgoing messages in the format they are sent and received on the network. This allows you to inspect or modify outgoing messages before they are sent to the network, and incoming messages as they arrive from the network. This can be especially useful when using SSL connections, since this gives you a way to monitor outgoing network traffic before it is encrypted, and incoming network traffic after it is decrypted.
To create a transport filter, create a callable that accepts a string containing the raw data and a direction parameter that indicates whether the data is outgoing. For example, the following function simply prints the direction and data:
You then register the filter by calling set_transport_filter with the callable, as shown below.
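A minimal filter of this kind is sketched here. It assumes the two-argument form described above: the raw data, then a flag that is true for outgoing data. `format_traffic` and `print_traffic` are hypothetical names. The formatting is split out so it can be checked without capturing standard output.

```python
def format_traffic(data, outgoing):
    """Build one log line for a chunk of raw AMPS traffic."""
    direction = "outgoing" if outgoing else "incoming"
    return f"{direction}: {data!r}"


def print_traffic(data, outgoing):
    """Transport filter sketch: print the direction and the raw data."""
    print(format_traffic(data, outgoing))
```

With an `AMPS.Client` instance named `client`, the filter would be registered with `client.set_transport_filter(print_traffic)`.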
Notice that the transport filter function is called with the verbatim contents of data received from AMPS. This means that, for incoming data, the function may not be called precisely on message boundaries, and that the binary length encoding used by the client and server will be presented to the transport filter.
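For this reason, a filter that collects traffic for later inspection should concatenate chunks rather than treat each call as one message. The class below sketches that approach. `TrafficRecorder` is a hypothetical name. The sketch assumes the data arrives as `bytes` and encodes any `str` chunk as UTF-8.

```python
class TrafficRecorder:
    """Transport filter sketch that concatenates raw traffic per direction.

    Because a single call may hold part of a message or several messages,
    chunks are appended to one buffer per direction rather than treated as
    complete messages.
    """

    def __init__(self):
        self.outgoing = bytearray()
        self.incoming = bytearray()

    def __call__(self, data, outgoing):
        # Assumption: data normally arrives as bytes; str chunks are encoded.
        if isinstance(data, str):
            data = data.encode("utf-8")
        buffer = self.outgoing if outgoing else self.incoming
        buffer.extend(data)
```

An instance is callable, so it can be passed to `set_transport_filter` directly. The buffers grow without limit, so this approach suits short diagnostic sessions rather than long-running production use.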
A Message object provides two methods for retrieving the message payload:
get_data() returns the payload as a string
get_data_raw() returns the payload as bytes
If you are working with binary data that is not guaranteed to be valid UTF-8, use the get_data_raw method to avoid errors when attempting to decode the data into a string.
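The following sketch handles such payloads by reading the bytes with `get_data_raw()` and decoding them only when they are valid text. `payload_text` and `StandInMessage` are hypothetical. The stand-in exposes only the one method so that the example runs without a server.

```python
def payload_text(message, encoding="utf-8"):
    """Return the payload decoded as text, or None if it is not valid text.

    get_data_raw() returns bytes, so decoding happens here, where invalid
    input can be handled explicitly.
    """
    raw = message.get_data_raw()
    try:
        return raw.decode(encoding)
    except UnicodeDecodeError:
        return None


class StandInMessage:
    """Hypothetical stand-in exposing only get_data_raw(), for illustration."""

    def __init__(self, raw):
        self._raw = raw

    def get_data_raw(self):
        return self._raw
```

A caller can then fall back to processing the raw bytes whenever `payload_text` returns `None`.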
The AMPS Python client includes support for Secure Sockets Layer (SSL). To use this support with the default OpenSSL implementation for your Python installation, you need only use tcps as the transport type in the connection string.
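The sketch below builds such a connection string, assuming the usual `transport://host:port/amps/message-type` form. The helper name, host, and ports are hypothetical. The port must belong to a transport that the server has configured for SSL, because changing only the scheme while pointing at a plain TCP port would not be expected to work.

```python
def amps_uri(host, port, message_type="json", secure=True):
    """Build an AMPS connection string; tcps selects an SSL connection.

    The port must be one where the server has configured a matching
    transport (a tcps transport when secure is True).
    """
    transport = "tcps" if secure else "tcp"
    return f"{transport}://{host}:{port}/amps/{message_type}"
```

For example, `amps_uri("localhost", 9443)` produces `tcps://localhost:9443/amps/json`, which would then be passed to the client's `connect` call.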
If your Python client does not have a default OpenSSL implementation, you must load an SSL implementation as described below. This is typically the case for Windows Python builds, and may be the case if your site uses a custom build of Python on Linux.
The Python client also allows you to load and use an OpenSSL implementation other than the default implementation for the Python installation. The AMPS Python client provides the method ssl_init, which takes the name of the library to load or a full path to the file that contains the library. For example, to load the SSL implementation at /opt/mycorp/trusted/vetted_ssl.so, you could use the following line of code:
You must load the SSL library before making the connection.
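This ordering can be captured in a helper such as the hypothetical `connect_with_ssl_library` below. The helper assumes that `ssl_init` is available at module level (as `AMPS.ssl_init`) and that the module is passed in as `amps`, which keeps the order of calls visible and testable.

```python
def connect_with_ssl_library(amps, ssl_library, client_name, uri):
    """Load an OpenSSL implementation, then connect and log on.

    `amps` is the imported AMPS module (an assumption of this sketch);
    ssl_init must run before the connection is made.
    """
    amps.ssl_init(ssl_library)  # load the SSL library first
    client = amps.Client(client_name)
    client.connect(uri)  # a tcps:// URI is assumed here
    client.logon()
    return client
```

For example, `connect_with_ssl_library(AMPS, "/opt/mycorp/trusted/vetted_ssl.so", "my-app", "tcps://localhost:9443/amps/json")`, where the client name and URI are placeholders.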