Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
AMPS uses the name of the client as a session identifier and as part of the identifier for messages originating from that client.
For this reason, when a transaction log is enabled in the AMPS instance (that is, when the instance is recording a sequence of publishes and attempting to eliminate duplicate publishes), an AMPS instance will only allow one application with a given client name to connect to the instance.
When a transaction log is present, AMPS requires the client name for a publisher to be:
Unique within a set of replicated AMPS instances
Consistent from invocation to invocation if the publisher will be publishing the same logical stream of messages
If publishers do not meet this contract (for example, if the publisher changes its name and publishes the same messages, or if a different publisher uses the same session name), message loss or duplication can happen.
60East recommends always using consistent, unique client names. For example, the client name could be formed by combining the application name, an identifier for the host system, and the ID of the user running the application. A strategy like this provides a name that will be different for different users or on different systems, but consistent for instances of the application that should be treated as equivalent to the AMPS system.
Likewise, if a publisher is sending a completely independent stream of messages (for example, a microservice that sends a different, unrelated sequence of messages each time it connects to AMPS), there is no need for a publisher to retain the same name each time it starts. However, if a publisher is resuming a stream of messages (as in the case when using a file-backed publish store), that publisher must use the same client name, since the publisher is resuming the session.
This guide contains information about the AMPS C/C++ client.
It focuses on information that is specific to the AMPS C/C++ client. The guide does not contain detailed information on AMPS itself.
To get started with AMPS, see the Introduction to AMPS guide. That guide contains an overview of AMPS and instructions for setting up a development environment.
This guide assumes that you have a development environment for C/C++ and access to an AMPS server using the configuration provided with the C/C++ samples (in the full source distribution of the client).
Welcome to developing applications with AMPS, the Advanced Message Processing System from 60East Technologies!
These guides will help you learn how to develop applications using AMPS.
Before getting started with this guide, it is important to have a good understanding of the following topics:
Developing Applications in C or C++
To be successful using this guide, and developing applications with AMPS, you will need to have a working knowledge of the language you are developing in.
AMPS Concepts
This guide focuses on using the AMPS client libraries and how those libraries work with the AMPS server.
Before working through this guide, we recommend reading the guide.
Detailed explanations of the AMPS server behavior are in the .
You will also need a system on which you can compile and run code, and a system where you can host the AMPS server.
You will need an installed and running AMPS server to use the product as well. You can write and compile programs that use AMPS without a running server, but you will get the most out of this guide by running the programs against a working server.
Instructions for starting an instance of AMPS are available in the guide.
The AMPS server runs on x64 Linux. The and contain information on how to run an AMPS server on a development system that does not run Linux.
The C/C++ client package includes both the AMPS C++ client and a basic C client that uses only C language features.
This version of the AMPS C++ client supports the following operating systems and features:
Incredible performance
X
X
X
Publish and subscribe
X
X
X
State of the World (SOW) queries
X
X
X
Topic and content filtering
X
X
X
Atomic SOW query and subscribe
X
X
X
Transaction log replay
X
X
X
Historical SOW query
X
X
X
Beautiful documentation
X
X
X
HA: automatic failover
X
X
X
HA: durable publish and subscribe
X
X
X
This version of the AMPS C++ client has been tested with the following compilers and versions. Other compilers or versions may work, but have not been tested by 60East:
Linux: gcc 4.8 or later
Windows: Visual Studio versions under current mainstream support
OSX: clang
In this chapter, we will learn more about the structure and features of the AMPS C/C++ library, and build our first C/C++ program using AMPS.
Let’s begin by writing a simple program that connects to an AMPS server and sends a single message to a topic:
In the example above, we show the entire program.
Future examples will isolate one or more specific portions of the code. The next section describes how to build and run the application and explains the code in further detail.
To build the program that you've created:
Create a new .cpp
file and use your c compiler to build it, making sure the amps-c++-client/include
directory is in your compiler’s include
path
Link to the libamps.a
or amps.lib
static libraries.
Additionally, link to any operating system libraries required by AMPS; a full list may be found by examining the Makefile and project files in the samples
directory.
If the message is published successfully, there is no output to the console. We will demonstrate how to create a subscriber to receive messages in Subscriptions section.
Let us revisit the code we listed earlier:
When a client logs on to AMPS, the client sends AMPS a username and password.
The username is derived from the URI, using the standard syntax for providing a user name in a URI, for example, tcp://JohnDoe:@server:port/amps/messagetype
to include the user name JohnDoe
in the request.
For a given user name, the password is provided by an Authenticator
. The AMPS client distribution includes a DefaultAuthenticator
that simply returns the password, if any, provided in the URI. A logon()
command that does not specify an Authenticator
will use an instance of DefaultAuthenticator
.
If your authentication system requires a different authentication token, you can implement an Authenticator
that provides the appropriate token.
When a client logs on to AMPS, the client sends AMPS a username and password. The username is derived from the URI, using the standard syntax for providing a user name in a URI, for example, tcp://JohnDoe:@server:port/amps/messagetype
to include the user name JohnDoe
in the request.
For a given user name, the password is provided by an Authenticator
. The AMPS client distribution includes a DefaultAuthenticator
that simply returns the password, if any, provided in the URI. A logon()
command that does not specify an Authenticator
will use an instance of DefaultAuthenticator
.
If your authentication system requires a different authentication token, you can implement an Authenticator
that provides the appropriate token.
When using the DefaultAuthenticator
, the AMPS clients support the standard format for including a username and password in a URI, as shown below:
When provided in this form, the default authenticator provides the username and password specified in the URI. If you have implemented another authenticator, that authenticator controls how passwords are provided to the AMPS server.
AMPS allows you to update parameters, such as the content filter, on a subscription. When you replace a filter on the subscription, AMPS immediately begins sending only messages that match the updated filter. Notice that if the subscription was entered with a command that includes a SOW query, using the replace
option can re-issue the SOW query (as described in the AMPS User Guide).
To update the filter on a subscription, you create a subscribe
command. You set the SubscriptionId
provided on the Command
to the identifier of the existing subscription and include the replace
option on the Command
.
When you send the Command
, AMPS atomically replaces the filter and sends messages that match the updated filter from that point forward.
Messages published to a topic on an AMPS server are available to other clients via a subscription. Before messages can be received, a client must subscribe to one or more topics on the AMPS server so that the server will begin sending messages to the client. The server will continue sending messages to the client until the client unsubscribes, or the client disconnects. With content filtering, the AMPS server will limit the messages sent only to those messages that match a client-supplied filter. In this chapter, you will learn how to subscribe, unsubscribe, and supply filters for messages using the AMPS C/C++ client.
Subscribing to an AMPS topic takes place by calling Client.subscribe()
. Here is a short example showing the simplest way to subscribe to a topic (error handling and connection details are omitted for brevity):
AMPS creates a background thread that receives messages and copies them into a MessageStream
that you iterate over. This means that the client application as a whole can continue to receive messages while you are doing processing work.
The simple method described above is provided for convenience. The AMPS C++ client provides convenience methods for the most common form of the AMPS commands. The client also provides an interface that allows you to have precise control over the command. Using that interface, the example above becomes:
The Command
interface allows you to precisely customize the commands you send to AMPS. For flexibility and ease of maintenance, 60East recommends using the Command
interface (rather than a named method) for any command that will receive messages from AMPS. For publishing messages, there can be a slight performance advantage to using the named commands where possible.
The first time a command causes an instance of the Client
or HAClient
to connect to AMPS (typically, the logon()
command), the client creates a thread that runs in the background. This background thread is responsible for processing incoming messages from AMPS, which includes both messages that contain data and acknowledgments from the server.
When you call a command on the AMPS client, the command typically waits for an acknowledgment from the server and then returns. (The exception to this is publish
. For performance, the publish
command does not wait for an acknowledgment from the server before returning.)
In the simple case, using synchronous message processing, the client provides an internal handler function that populates the MessageStream
. The client receive thread calls the internal handler function, which makes a deep copy of the incoming message and adds it to the MessageStream
. The MessageStream
is used on the calling thread, so operations on the MessageStream
do not block the client receive thread.
When using asynchronous message processing, AMPS calls the handler function from the client receive thread. Message handlers provided for asynchronous message processing must be aware of the following considerations:
The client creates one client receive thread at a time, and the lifetime of the thread lasts for the lifetime of the connection to the AMPS server. A message handler that is only provided to a single client will only be called from a single thread at a time. If your message handler will be used by multiple clients, then multiple threads will call your message handler. In this case, you should take care to protect any state that will be shared between threads. Notice that if the client connection fails (or is closed), and the client reconnects, the client will create a different thread for the new connection.
For maximum performance, do as little work in the message handler as possible. For example, if you use the contents of the message to update an external database, a message handler that adds the relevant data to an update queue, that is processed by a different thread, will typically perform better than a message handler that does this update during the message handling.
While your message handler is running, the thread that calls your message handler is no longer receiving messages. This makes it easier to write a message handler because you know that no other messages are arriving from the same subscription. However, this also means that you cannot use the same client that called the message handler to send commands to AMPS. Acknowledgments from AMPS cannot be processed and your application will deadlock waiting for the acknowledgment. Instead, enqueue the command in a work queue to be processed by a separate thread or use a different client object to submit the commands.
The AMPS client resets and reuses the Message
provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information (either by making a copy of the entire message or copying the required fields) rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it. Likewise, a message handler should not modify the Message
-- this will result in modifying the message provided to other handlers (including handlers internal to the AMPS client).
The AMPS C++ client also supports an interface that allows you to process messages asynchronously. In this case, you add a message handler to the function call. The client returns the command ID of the subscribe command once the server has acknowledged that the command has been processed. As messages arrive, the client calls your message handler directly on the background thread. This can be an advantage for some applications. For example, if your application is highly multithreaded and copies message data to a work queue processed by multiple threads, there is usually a performance benefit to enqueuing work directly from the background thread. See Understanding Threading for a discussion of threading considerations, including considerations for message handlers.
The advantage of using asynchronous message processing is that it is extremely efficient -- your processing code runs directly on the thread that the AMPS client is using to read from the socket and has direct access to the buffer that the AMPS client uses. Further, when an HAClient
is used (discussed later in this guide), the default disconnect handler for that client will automatically resume subscriptions that use asynchronous message processing. Last, but not least, since message processing runs directly on the receive thread, using asynchronous message processing will provide pushback on the socket in the event that messages are arriving faster than the application can process them (for example, in response to a sow
query).
In return for these advantages, your processing code must be careful not to block the processing thread for an excessive amount of time, and must make a deep copy of any data that will be used outside of the processing code.
Here is a short example (error handling and connection details are omitted for brevity):
The AMPS client resets and reuses the message provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it.
With newer compilers, you can use additional constructs to specify a callback function. Recent improvements in C++ have added lambda functions -- unnamed functions declared in-line that can refer to names in the lexical scope of their creator. If available on your system, both Standard C++ Library function objects and lambda functions may be used as callbacks.
Check functional.cpp
in the samples directory for numerous examples.
One of the more common ways of providing a message handler is as an instance method on an object that maintains message state. It's simple to provide a handler with this capability, as shown below.
You can then provide an instance of the handler directly wherever a message handler is required, as shown below:
Before using the client, you will need to download and install it on your development computer. The client is packaged into a single file, amps-c++-client-<version>.tar.gz
, where <version> is replaced by the version of the client, such as amps-c++-client-5.3.0.zip
. In the following examples, the version number is omitted from the filename.
Once expanded, the amps-c++-client
directory will be created, containing sources, samples and makefiles for the C++ client. You’re welcome to locate this directory anywhere that seems convenient; but for the remainder of this book, we’ll simply refer to this directory as the amps-c++-client
directory.
The client is organized into a number of directories that you’ll be using through this book. Understanding this organization now will save you time in the future. The top level directories are:
Sources and makefile for the AMPS C++ client library.
Location of include
files for C and C++ programs. When building your own program, you’ll add the include
directory to your include
path.
Getting started with a new C/C++ library can be challenging. For your reference, we provide a number of small samples, along with a makefile.
After unpacking the amps-c++-client
directory, you must build the client library for your platform.
To build on Linux, change to the amps-c++-client
directory and, from a command prompt, type:
to make a static library, or
to make a shared object.
On Windows, from a Visual Studio Command Prompt, change to the amps-c++-client
directory and type:
Upon successful completion, the AMPS libraries and samples are built in the lib
and samples
directories, respectively.
To build on OS X, change to the amps-c++-client
directory and, from a command prompt, type:
to make a static library, or
to make a shared object.
Before writing programs in AMPS, make sure connectivity to your AMPS development instance is working from your AMPS development environment.
Launch a terminal window and change the directory to the AMPS directory in your AMPS installation and use spark
to test connectivity to your server.
For example:
If spark
returns an error, verify that your AMPS server is running and that there is no firewall blocking access (including local firewalls between a host instance and virtual machine or container).
Without connectivity to AMPS, you will be unable to make best use of this guide.
So far, we have seen that subscribing to a topic involves working with objects of AMPS::Message
type. A Message
represents a single message to or from an AMPS server. Messages are received or sent for every client/server operation in AMPS.
There are two parts of each message in AMPS: a set of headers that provide metadata for the message, and the data that the message contains. Every AMPS message has one or more header fields defined. The precise headers present depend on the type and context of the message. There are many possible fields in any given message, but only a few are used for any given message. For each header field, the Message
class contains a distinct property that allows for retrieval and setting of that field. For example, the Message.get_command_id()
function corresponds to the commandId
header field, the Message.get_batch_size()
function corresponds to the BatchSize
header field, and so on. For more information on these header fields, consult the AMPS User Guide and AMPS Command Reference.
The Message
object contains several different accessors for header fields.
For retrieving the value of fields on a message:
The getXxx()
functions return a Field
. The Field
contains pointers to the underlying buffer in the Message
. Data is not copied until either deepCopy()
is called or the Field
is converted to another format (such as constructing a std::string
from the Field
). The value of the Field
is only valid for the lifetime of the message unless it is copied using deepCopy()
.
The getRawXxx()
functions take a pointer and a length. The pointer is assigned to the first byte of the value in the underlying buffer in the Message
. The length is set to the length of the value.
To assign values to a field in a message, the Message
object provides several different options. The differences between these options are a matter of managing the memory for the value.
The setXxx()
functions copy the value provided into the specified header.
The assignXxx()
functions set the value of the specified header, avoiding a copy if possible. When this function is used, the Message
may refer directly to the data passed in. That data should not change or be deallocated while the Message
is in use. (Notice, though, that a copy of the message produced using deepCopy
will copy the data and can be used safely even if the original data is changed or deallocated.)
60East does not recommend attempting to parse header fields from the raw data of the message, nor does 60East recommend attempting to manipulate the fields of the message without using the accessor methods.
In AMPS, fields sometimes need to be set to a unique identifier value. For example, when creating a new subscription, or sending a manually constructed message, you’ll need to assign a new unique identifier to multiple fields such as CommandId
and SubscriptionId
. For this purpose, Message provides newXxx()
methods for each field that generates a new unique identifier and sets the field to that new value.
Access to the data section of a message is provided via the getData()
method. The data
contains the unparsed data in the message, returned as a series of bytes (a string
or const char *
). Your application code parses and works with the data.
The AMPS C++ client contains a collection of helper classes for working with message types that are specific to AMPS (for example, FIX, NVFIX, and AMPS composite message types). For message types that are widely used, such as JSON or XML, you can use whichever library you typically use in your environment.
The AMPS Command Reference contains a full description of which fields are available and which fields are returned in response to specific commands.
One of the most powerful features of AMPS is content filtering. With content filtering, filters based on message content are applied at the server, so that your application and the network are not utilized by messages that are uninteresting for your application. For example, if your application is only displaying messages from a particular user, you can send a content filter to the server so that only messages from that particular user are sent to the client. The Filtering Subscriptions by Content under the AMPS User Guide provides full details on content filtering. The AMPS Expressions section of the AMPS User Guide describes the basic syntax of AMPS expressions, including content filters, and the AMPS Functions section describes functions that are available for use in filters.
To apply a content filter to a subscription, simply pass it into the client.subscribe()
call:
In this case, the application will receive messages where the sender
field equals mom
.
In this example, we have passed in a content filter /sender = 'mom'
. This will result in the server only sending us messages, from the messages
topic, that have the sender field equal to mom
in the message.
For example, the AMPS server will send the following message, where /sender
is mom
:
The AMPS server will not send a message with a different /sender
value:
When specifying a URI for connection to an AMPS server, you may specify a number of transport-specific options in the parameters section of the URI connection parameters. Here is an example:
In this example, we have specified the AMPS instance on localhost
, port 9007
, connecting to a transport that uses the amps
protocol and sending JSON messages. We have also set two parameters: tcp_nodelay
, a Boolean (true/false) parameter, and tcp_sndbuf
, an integer parameter. Multiple parameters may be combined to finely tune settings available on the transport. Normally, you'll want to stick with the defaults on your platform, but there may be some cases where experimentation and fine-tuning will yield higher or more efficient performance.
The AMPS client supports the value of tcp
in the scheme component connection string for TCP/IP connections, and the value of tcps
as the scheme for SSL encrypted connections.
For connections that use Unix domain sockets, the client supports the value of unix
in the scheme, and requires an additional option, as described in the Unix Transports Parameters section below.
Starting with version 5.3.3.0, the AMPS client supports creating connections over both IPv4 and IPv6 protocols if supported by the underlying Operating System.
By default, the AMPS client will prefer to resolve host names to IPv4 addresses, but this behavior can be adjusted by supplying the ip_protocol_prefer
transport option, described in the table below.
The following transport options are available for TCP connections:
bind
(IP address) Sets the interface to bind the outgoing socket to.
Starting with version 5.3.3.0, both IPv4 and IPv6 addresses are fully supported for use with this parameter.
tcp_rcvbuf
(integer) Sets the socket receive buffer size. This defaults to the system default size. (On Linux, you can find the system default size in /proc/sys/net/core/rmem_default
.)
tcp_sndbuf
(integer) Sets the socket send buffer size. This defaults to the system default size. (On Linux, you can find the system default size in /proc/sys/net/core/wmem_default
.)
tcp_nodelay
(boolean) Enables or disables the TCP_NODELAY
setting on the socket. By default TCP_NODELAY
is disabled.
tcp_linger
(integer) Enables and sets the SO_LINGER
value for the socket By default, SO_LINGER
is enabled with a value of 10
, which specifies that the socket will linger for 10 seconds.
tcp_keepalive
(boolean) Enables or disables the SO_KEEPALIVE
value for the socket. The default value for this option is true.
ip_protocol_prefer
(string) Influence the IP protocol to prefer during DNS resolution of the host. If a DNS entry of the preferred protocol can not be found, the other non-preferred protocol will then be tried.
If this parameter is not set, the default will be to prefer IPv4.
If an explicit IPv4 address or IPv6 IP address is provided as the host, the format of the IP address is used to determine the IP protocol used and this setting has no effect.
Supported Values:
ipv4
: Prefer an IPv4 address when resolving the host
ipv6
: Prefer an IPv6 address when resolving the host
This parameter is available starting with version 5.3.3.0.
The unix
transport type communicates over Unix domain sockets. This transport requires the following additional option:
path
The path to the Unix domain socket to connect to.
Unix domain sockets always connect to the local system. When the scheme specified is unix
, the host address is ignored in the connection string. For example, the connection string:
and the connection string:
are equivalent.
The other components of the connection string, including the protocol, message type, username, and authentication token are processed just as they would be for TCP/IP sockets.
The connection string can also be used to pass logon parameters to AMPS. AMPS supports the following additional logon option:
pretty
Provide formatted representations of binary messages rather than the original message contents.
The AMPS server continues a subscription until the client explicitly ends the subscription (that is, unsubscribes) or until the connection to the client is closed.
With the synchronous interface, AMPS automatically unsubscribes to the topic when the destructor for the MessageStream
runs. You can also explicitly call the close()
method on the MessageStream
object to remove the subscription.
In the asynchronous interface, when a subscription is successfully made, messages will begin flowing to the message handler, and the subscribe()
or executeAsync()
call will return a string for the subscription id that serves as the identifier for this subscription. A Client
can have any number of active subscriptions, and this subscription id is how AMPS designates messages intended for this particular subscription. To unsubscribe, we simply call unsubscribe
with the subscription identifier:
In this example, as in the previous section, we use the client.executeAsync()
method to create a subscription to the messages
topic. When our application is done listening to this topic, it unsubscribes by passing in the subId
returned by subscribe()
. After the subscription is removed, no more messages will flow into our myHandlerFunction()
.
When an application calls unsubscribe()
, the client sends an explicit unsubscribe
command to AMPS. The AMPS server removes that subscription from the set of subscriptions for the client, and stops sending messages for that subscription. On the client side, the client unregisters the subscription so that the MessageStream
or MessageHandler
for that subscription will no longer receive messages for that subscription.
Notice that calling unsubscribe
does not destroy messages that the server has already sent to the client. If there are messages on the way to the client for this subscription, the AMPS client must consume those messages. If a LastChanceMessageHandler
is registered, the handler will receive the messages. Otherwise, they will be discarded since no message handler matches the subscription ID on the message.
Generally speaking, when an error occurs that prohibits an operation from succeeding, AMPS will throw an exception. AMPS exceptions universally derive from AMPS::AMPSException, so by catching AMPSException, you will be sure to catch anything AMPS throws. For example:
In this example, if an error occurs, the program writes the error to stderr and the publish() command fails. However, client is still usable for continued publishing and subscribing. When the error occurs, the exception is written to the console, converting the exception to a string via the toString() method.
AMPS exception types vary based on the nature of the error that occurs. In your program, if you would like to handle certain kinds of errors differently than others, you can catch the appropriate subclass of AMPSException to detect those specific errors and do something different.
The AMPS clients use connection strings to determine the server, port, transport, and protocol to use to connect to AMPS. When the connection point in AMPS accepts multiple message types, the connection string also specifies the precise message type to use for this connection.
Connection strings have a number of elements:
As shown in the figure above, connection strings have the following elements:
Transport - Defines the network used to send and receive messages from AMPS. In this case, the transport is tcp
. For connections to transports that use the Secure Sockets Layer (SSL), use tcps
. For connections to AMPS over a Unix domain socket, use unix
.
Host Address - Defines the destination on the network where the AMPS instance receives messages. The format of the address is dependent on the transport. For tcp
and tcps
, the address consists of a host name and port number. In this case, the host address is localhost:9007
. For unix
domain sockets, a value for hostname and port must be provided to form a valid URI, but the content of the hostname and port are ignored, and the file name provided in the path parameter is used instead (by convention, many connection strings use localhost:0
to indicate that this is a local connection that does not use TCP/IP).
Protocol - Sets the format in which AMPS receives commands from the client. Most code uses the default amps
protocol, which sends header information in JSON format. AMPS supports the ability to develop custom protocols as extension modules, and AMPS also supports legacy protocols for backward compatibility.
MessageType - Specifies the message type that this connection uses. This component of the connection string is required if the protocol accepts multiple message types and the transport is configured to accept multiple message types. If the protocol does not accept multiple message types, this component of the connection string is optional, and defaults to the message type specified in the transport.
Legacy protocols such as fix
, nvfix
and xml
only accept a single message type, and therefore do not require or accept a message type in the connection string.
As an example, a connection string such as:
would work for programs connecting from the local host to a Transport
configured as follows:
See the AMPS Configuration Guide for more information on configuring transports.
As mentioned , one way for an application to receive messages is to have the AMPS C++ client return a MessageStream
object that can be used to iterate over the results of the command.
The MessageStream
object makes copies of the incoming messages. When there is no message available, the MessageStream
will block.
A MessageStream
will only remain active while the client that produced it is connected. If the client disconnects, the MessageStream
will continue to provide any messages that have not yet been consumed, then throw an exception.
The advantages of using a MessageStream
that it provides a simple processing model, that receiving messages from a MessageStream
does not block the client receive thread (see ) and that a copy of the message is automatically made for the application.
In return for these advantages, a MessageStream
has higher overhead than , it will not be resumed if the client disconnects, and, by default, it will use as much memory as necessary to hold messages coming from the AMPS server.
The named convenience methods and the Command
class provide a timeout
setting that specifies how long the command should wait to receive a processed
acknowledgment from AMPS. This can be helpful in cases where it is important for the caller to limit the amount of time to block waiting for AMPS to acknowledge the command. If the AMPS client does not receive the processed acknowledgment within the specified time, the client sends an unsubscribe
command to the server to cancel the command and throws an exception.
Acknowledgments from AMPS are processed by the client receive thread on the same socket as data from AMPS. This means that any other data previously returned (such as the results of a large query) must be consumed before the acknowledgment can be processed. An application that submits a set of SOW queries in rapid succession should set a timeout that takes into account the amount of time required to process the results of the previous query.
Each method in AMPS documents the kinds of exceptions that it can throw. The following table details each of the exception types thrown by AMPS. They are all declared in the AMPS
namespace, and all of these types publicly inherit from class std::runtime_error
.
AlreadyConnectedException
Connecting
Thrown when connect()
is called on a Client
that is already connected.
AMPSException
Anytime
Base class for all AMPS exceptions.
AuthenticationException
Connecting
Indicates an authentication failure occurred on the server.
BadFilterException
Subscribe or Query
This typically indicates a syntax error in a filter expression.
BadRegexTopicException
Subscribe or Query
Indicates a malformed regular expression was found in the topic name.
BadSowKeyException
Subscribing, Publishing, Deleting
Raised when a command uses an invalid SOW key.
CommandException
Anytime
Base class for all exceptions relating to commands sent to AMPS.
ConnectionException
Anytime
Base class for all exceptions relating to the state of the AMPS connection.
ConnectionRefusedException
Connecting
The connection was actively refused by the server. Validate that the server is running, that network connectivity is available, and the settings on the client match those on the server.
DisconnectedException
Anytime
No connection is available when AMPS needed to send data to the server or the user's disconnect handler threw an exception.
DuplicateLogonException
Connecting
A client tried to logon after already logging on.
InvalidBookmarkException
Subscribe or Query
Command specifies an invalid bookmark.
InvalidOptionsException
Subscribe or Query
Command specifies invalid options.
InvalidOrderByException
Query
Command specifies an invalid orderby
clause.
InvalidSubIdException
Subscribe or Query
Command specifies an invalid subid
.
InvalidTopicException
Subscribe or Query
The topic is not configured for the requested operation. For example, a sow
command was issued for a topic that is not in the SOW or a bookmark subscribe was issued for a topic that is not recorded in the transaction log.
InvalidURIException
Connecting
The URI string provided to connect()
was formatted improperly.
LogonRequiredException
Anytime
A client attempted to execute a command before calling logon.
MessageStreamFullException
Internal client use
Indicates that a MessageStream
has reached its max depth and can't enqueue another message.
MissingFieldsException
Subscribe or Query
Thrown when a command is missing required fields.
NameInUseException
Connecting
The client name (specified when instantiating Client
) is already in use on the server.
NotEntitledException
Connecting, Subscribe or Query
An authenticated client attempted to access a resource to which the user has not been granted proper entitlements.
PublishException
Publishing
A client attempted to publish an invalid message or some other error occurs with the message. Requested operation.
PublishStoreGapException
Connecting
The client attempted to logon to a server that appears to be missing messages from this client that are no longer in the publish store.
ReconnectMaximumExceededException
Connecting
The maximum allowed time for attempting to connect to the server has been exceeded.
RetryOperationException
Anytime
An error occurred that caused processing of the last command to be aborted. Try issuing the command again.
StoreException
Publishing, Subscribing
Thrown when a publish store or bookmark store experiences some internal error (such as lack of resources or permissions), or is in an improper state for the requested operation.
SubidInUseException
Subscribing
Indicates a subscription has been placed with the same subscription ID.
SubscriptionAlreadyExistsException
Subscribing
A subscription has been requested using the same CommandId
as another subscription. Create a unique CommandId
for every subscription.
TimedOutException
Anytime
A timeout occurred waiting for a response to a command.
TransportTypeException
Connecting
Thrown when a transport type was selected in the URI that is unknown to AMPS.
UnknownException
Anytime
Thrown when an internal error occurs. Contact AMPS support immediately.
UsageException
Changing the properties of an object.
Thrown when the object is not in a valid state for setting the properties. For example, some properties of a Client
(such as the BookmarkStore
used) cannot be changed while that client is connected to AMPS.
Regular Expression (Regex) subscriptions allow a regular expression to be supplied in the place of a topic name. When you supply a regular expression, it is as if a subscription is made to every topic that matches your expression, including topics that do not yet exist at the time of creating the subscription.
To use a regular expression, simply supply the regular expression in place of the topic name in the subscribe
command. For example:
In this example, messages on topics orders-north-america
, orders-europe
, and new-orders
would match the regular expression. Messages published to any of those topics will be sent to our message_handler
function. As in the example, you can use the getTopic()
function to determine the actual topic of the message sent to the function.
In every distributed system, the robustness of your application depends on its ability to recover gracefully from unexpected events. The AMPS client provides the building blocks necessary to ensure your application can recover from the kinds of errors and special events that may occur when using AMPS.
Every distributed system will experience occasional disconnections between one or more nodes. The reliability of the overall system depends on an application’s ability to efficiently detect and recover from these disconnections. Using the AMPS C/C++ client’s disconnect handling, you can build powerful applications that are resilient in the face of connection failures and spurious disconnects.
The AMPS client includes a heartbeat feature to help applications detect disconnection from the server within a predictable amount of time. Without using a heartbeat, an application must rely on the operating system to notify the application when a disconnect occurs. For applications that are simply receiving messages, it can be impossible to tell whether a socket is disconnected or whether there are simply no incoming messages for the client.
When you set a heartbeat, the AMPS client sends a heartbeat message to the AMPS server at a regular interval, and waits a specified amount of time for the response. If the operating system reports an error on send, or if there is no activity received from the server within the specified amount of time, the AMPS client considers the server to be disconnected. Likewise, the server will ensure that traffic is sent to the client at the specified interval, using heartbeat messages when no other traffic is being sent to the client. If, after sending a heartbeat message, no traffic from the client arrives within a period twice the specified interval, the server will consider the client to be disconnected or nonresponsive.
The AMPS client processes heartbeat messages on the client receive thread, which is the thread used for asynchronous message processing. If your application uses asynchronous message processing and occupies the thread for longer than the heartbeat interval, the client may fail to respond to heartbeat messages in a timely manner and may be disconnected by the server.
The HAClient
class, included with the AMPS C++ client, contains a disconnect handler and other features for building highly-available applications. The HAClient
includes features for managing a list of failover servers, resuming subscriptions, republishing in-flight messages, and other functionality that is commonly needed for high availability. 60East recommends using the HAClient
for automatic reconnection wherever possible, as the HAClient disconnect handler has been carefully crafted to handle a wide variety of edge cases and potential failures.
If an application needs to reconnect or fail over, use an HAClient
, and the AMPS client library will automatically handle failover and reconnection. You control which servers the client fails over to using an implementation of the ServerChooser
interface, and control the timing of the failover using an implementation of the ReconnectDelayStrategy
interface.
For most applications, the combination of the HAClient
disconnect handler and a ConnectionStateListener
gives you the ability to monitor disconnections and add custom behavior at the appropriate point in the reconnection process.
If you need to add custom behavior to the failover (such as logging, resetting an internal cache, refreshing credentials and so on), the ConnectionStateListener
class allows your application to be notified and take action when disconnection is detected and at each stage of the reconnection process.
To extend the behavior of the AMPS client during reconnection, implement a ConnectionStateListener
.
The AMPS C++ client handles most incoming messages and takes appropriate action. Some messages are unexpected or occur only in very rare circumstances. The AMPS C++ client provides a way for clients to process these messages. Rather than providing handlers for all of these unusual events, AMPS provides a single handler function for messages that can't be handled during normal processing.
Your application registers this handler by setting the UnhandledMessageHandler
for the client. This handler is called when the client receives a message that can't be processed by any other handler. This is a rare event, and typically indicates an unexpected condition.
For example, if a client publishes a message that AMPS cannot parse, AMPS returns a failure acknowledgment. This is an unexpected event, so AMPS does not include an explicit handler for this event, and failure acknowledgments are received in the method registered as the UnhandledMessageHandler
.
Your application is responsible for taking any corrective action needed. For example, if a message publication fails, your application can decide to republish the message, publish a compensating message, log the error, stop publication altogether, or any other action that is appropriate.
When using asynchronous message processing, exceptions thrown from the message handler are silently absorbed by the AMPS C++ client by default. The AMPS C++ client allows you to register an exception listener to detect and respond to these exceptions. When an exception listener is registered, AMPS will call the exception listener with the exception. See the section on for details.
To begin, we will look at a simple example of issuing a SOW query.
In the listing above, the program invokes ampsClient.sow()
to initiate a SOW query on the orders
topic, for all entries that have a symbol of ’ROL’. The SOW query is requested with a batch size of 100, meaning that AMPS will attempt to send 100 messages at a time as results are returned.
As the query executes, each matching entry in the topic at the time of the query is returned. Messages containing the data of matching entries have a Command
of value sow
, so as those arrive, we write them to the console. AMPS sends a "group_begin" message before the first SOW result, and a "group_end" message after the last SOW result.
When the SOW query is complete, the MessageStream
completes iteration and the loop completes. There's no need to explicitly break out of the loop.
As with subscribe, the sow function also provides an asynchronous version. In this case, you provide a message handler that will be called on a background thread:
In the listing above, the ExecuteSOWQuery()
function invokes client.sow()
to initiate a SOW query on the orders topic, for all entries that have a symbol of ROL
. The SOW query is requested with a batch size of 100, meaning that AMPS will attempt to send 100 messages at a time as results are returned.
As the query executes, the HandleSOW()
method is invoked for each matching entry in the topic. Messages containing the data of matching entries have a Command
of sow
, so as those arrive, we write them to the console.
The publish
methods in the C++ client deliver the message to be published to AMPS and then return immediately, without waiting for AMPS to return an acknowledgment. Likewise, the sowDelete
methods request deletion of SOW messages, and return before AMPS processes the message and performs the deletion. This approach provides high performance for operations that are unlikely to fail in production. However, this means that the methods return before AMPS has processed the command, without the ability to return an error in the event that the command fails.
The AMPS C++ client provides a FailedWriteHandler
that is called when the client receives an acknowledgment that indicates a failure to persist data within AMPS. To use this functionality, you implement the FailedWriteHandler
interface, construct an instance of your new class, and register that instance with the setFailedWriteHandler()
function on the client. When an acknowledgment returns that indicates a failed write, AMPS calls the registered handler method with information from the acknowledgment message, supplemented with information from the client publish store if one is available. Your client can log this information, present an error to the user, or take whatever action is appropriate for the failure.
If your application needs to know whether publishes succeeded and are durably persisted, the following approach is recommended:
Set a PublishStore
on the client. This will ensure that messages are retransmitted if the client becomes disconnected before the message is acknowledged and request persisted
acknowledgments for messages.
Install a FailedWriteHandler
. In the event that AMPS reports an error for a given message, that event will be reported to the FailedWriteHandler
.
Call publishFlush()
and verify that all messages are persisted before the application exits.
When no FailedWriteHandler
is registered, acknowledgments that indicate errors in persisting data are treated as unexpected messages and routed to the LastChanceMessageHandler
. In this case, AMPS provides only the acknowledgment message and does not provide the additional information from the client publish store.
In some cases, an application does not want the AMPS client to reconnect, but instead wants to take a different action if disconnection occurs. For example, a stateless publisher that sends ephemeral data (such as telemetry or prices) may want to exit with an error if the connection is lost rather than risk falling behind and providing outdated messages. Often, in this case, a monitoring process will start another publisher if a publisher fails, and it is better for a message to be lost than to arrive late.
To cover cases where the application has unusual needs, the AMPS client library allows an application to provide custom disconnect handling.
Your application gets to specify exactly what happens when a disconnect occurs by supplying a function to client.setDisconnectHandler()
, which is invoked whenever a disconnect occurs. This may be helpful for situations where a particular connection needs to do something completely different than reconnecting or failing over to another AMPS server.
Setting the disconnect handler completely replaces the disconnection and failover behavior for an HAClient
and provides the only disconnection and failover behavior for a Client
.
The handler runs on the thread that detects the disconnect. This may be the client receive thread (for example, if the disconnect is detected due to heartbeating) or an application thread (for example, if the disconnect is detected when sending a command to AMPS).
The example below` shows a disconnect handler that will exit the application when a disconnect is detected:
The AMPS client interface provides the ability to set one or more connection state listeners. A connection state listener is a callback that is invoked when the AMPS client detects a change to the connection state.
A connection state listener may be called from the client receive thread. An application should not submit commands to AMPS from a connection state listener, or the application risks creating a deadlock for commands that wait for acknowledgement from the server.
The AMPS client provides the following state values for a connection state listener:
Connected
The client has established a connection to AMPS. If you are using a Client
, this is delivered when connect()
is successful.
If you are using an HAClient
, this state indicates that the connect
part of the connect and logon process has completed. An HAClient
using the default disconnect handler will attempt to log on immediately after delivering this state.
Most applications that use Client
will attempt to log on immediately after the call to connect()
returns.
An application should not submit commands to AMPS from the connection state listener while the client is in this state unless the application knows that the state has been delivered from a Client
and that the Client
does not call logon()
.
LoggedOn
The client has successfully logged on to AMPS. If you are using a Client
, this is delivered when logon()
is successful.
If you are using an HAClient
, this state indicates that the logon
part of the connect and logon process has completed.
This state is delivered after the client is logged on, but before recovery of client state is complete. Recovery will continue after delivering this state: the application should not submit commands to AMPS from the connection state listener while the client is in this state if further recovery will take place.
HeartbeatInitiated
The client has successfully started heartbeat monitoring with AMPS. This state is delivered if the application has enabled heartbeating on the client.
This state is delivered before recovery of the client state is complete. Recovery may continue after this state is delivered. The application should not submit commands to AMPS from the connection state listener until the client is completely recovered.
PublishReplayed
Delivered when a client has completed replay of the publish store when recovering after connecting to AMPS.
This state is delivered when the client has a PublishStore
configured.
If the client has a subscription manager set, (which is the default for an HAClient
), the application should not submit commands from the connection state listener until the Resubscribed
state is received.
Resubscribed
Delivered when a client has re-entered subscriptions when recovering after connecting to AMPS.
This state is delivered when the client has a subscription manager set (which is the default for an HAClient
). This is the final recovery step. An application can submit commands to AMPS from the connection state listener after receiving this state.
Disconnected
The client is not connected. For an HAClient
, this means that the client will attempt to reconnect to AMPS. For a Client
, this means that the client will invoke the disconnect handler, if one is specified.
Shutdown
The client is shut down. For an HAClient
, this means that the client will no longer attempt to reconnect to AMPS. This state is delivered when close()
is called on the client or when a server chooser tells the HAClient
to stop reconnecting to AMPS.
The enumeration provided for the connection state listener also includes a value of UNKNOWN
, for use as a default or to represent additional states in a custom Client
implementation. The 60East implementations of the client do not deliver this state.
The following table shows examples of the set of states that will be delivered during connection, in order, depending on what features of the client are set. Notice that, for an instance of the Client
class, this table assumes that the application calls both connect()
and logon()
. For an HAClient
, this table assumes that the HAClient
is using the default DisconnectHandler
for the HAClient
.
subscription manager
publish store
Connected
LoggedOn
PublishReplayed
Resubscribed
subscription manager
publish store
heartbeat set
Connected
LoggedOn
HeartbeatInitiated
PublishReplayed
Resubscribed
subscription manager
Connected
LoggedOn
Resubscribed
subscription manager
heartbeat set
Connected
LoggedOn
HeartbeatInitiated
Resubscribed
(default Client
configuration)
Connected
LoggedOn
In the AMPS C++ client, exceptions can occur that are not thrown to the main thread of the application. For example, when an exception is thrown from a message handler running on a background thread, AMPS does not automatically propagate that exception to the main thread.
Instead, AMPS provides the exception to an unhandled exception handler if one is specified on the client. The unhandled exception handler receives a reference to the exception object, and takes whatever action is necessary. Typically, this involves logging the exception or setting an error flag that the main thread can act on. Notice that AMPS C++ client only catches exceptions that derive from std::exception
. If your message handler contains code that can throw exceptions that do not derive from std::exception
, 60East recommends catching these exceptions and throwing an equivalent exception that derives from std::exception
.
If your application will attempt to recover from an exception thrown on the background processing thread, your application should set a flag and attempt recovery on a different thread than the thread that called the exception listener.
At the point that the AMPS client calls the exception listener, it has handled the exception. Your exception listener must not rethrow the exception (or wrap the exception and throw a different exception type).
For example, the unhandled exception handler below takes a std::ostream
, and logs information from each exception to that std::ostream
.
The C++ client includes the following samples that demonstrate how to query a topic in the SOW and enter a subscription to receive updates to the topic.
amps_publish_sow.cpp
Publishing messages to a SOW topic.
amps_sow_and_subscribe.cpp
Querying messages from a SOW topic and entering a subscription to receive updates.
The C++ client includes the following samples that demonstrate how to query a topic in the SOW.
amps_publish_sow.cpp
Publishing messages to a SOW topic.
amps_query_sow.cpp
Querying messages from a SOW topic.
The AMPS State of the World (SOW) allows you to automatically keep and query the latest information about a topic on the AMPS server, without building a separate database. Using SOW lets you build impressively high-performance applications that provide rich experiences to users. The AMPS C++ client lets you query SOW topics and subscribe to changes with ease. AMPS SOW topics can be used as a current value cache to provide the most recently published value for each record, as a key/value object store, as the source for an aggregate or conflated topic, or all of the above uses. For more information on State of the World topics, see the AMPS User Guide
In many cases, applications that use SOW topics only need the current value of a message at the time the message is processed, rather than processing each change that lead to the current value. On the server side, AMPS provides conflated topics to meet this need. See Conflated Topics under the AMPS User Guide for more detail. It require no special handling on the client side.
In some cases, though, it's important to conflate messages on the client side. This can be particularly useful for applications that do expensive processing on each message, applications that are more efficient when processing batches of messages, or for situations where you cannot provide an appropriate conflation interval for the server to use.
A MessageStream
has the ability to conflate messages received for a subscription to a SOW topic, view, or conflated topic. When conflation is enabled, for each message received, the client checks to see whether it has already received an unprocessed message with the same SowKey
. If so, the client replaces the unprocessed message with the new message. The application never receives the message that has been replaced.
To enable client-side conflation, you call conflate()
on the MessageStream
, and then use the MessageStream
as usual:
Notice that if the MessageStream
is used for a subscription that does not include SowKeys
(such as a subscription to a topic that does not have a SOW), no conflation will occur.
When using client-side conflation with delta subscriptions, bear in mind that client-side conflation replaces the whole message, and does not attempt to merge deltas. This means that updates can be lost when messages are replaced. For some applications (for example, a ticker application that simply sends delta updates that replace the current price), this causes no problems. For other applications (for example, when several processors may be updating different fields of a message simultaneously), using conflation with deltas could result in lost data, and server-side conflation is a safer alternative.
AMPS allows applications to manage the contents of the SOW by explicitly deleting messages that are no longer relevant. For example, if a particular delivery van is retired from service, the application can remove the record for the van by deleting the record for the van.
The client provides the following methods for deleting records from the SOW.
sowDelete
- Accepts a filter, and deletes all messages that match the filter.
sowDeleteByKeys
- Accepts a set of SOW keys as a comma-delimited string and deletes messages for those keys, regardless of the contents of the messages. A SOW key is provided in the header of a SOW message, and is the internal identifier AMPS uses for that SOW message.
sowDeleteByData
- Accepts a message, and deletes the record that would be updated by that message.
The most efficient way to remove messages from the SOW is to use sowDeleteByKeys
or sowDeleteByData
, since those options allow AMPS to exactly target the message or messages to be removed. Many applications use sowDelete
, since this is the most flexible method for removing items from the SOW when the application does not have information on the exact messages to be removed.
Regardless of the command used, AMPS sends an OOF message to all subscribers who have received updates for the messages removed, as described in the previous section.
The simple form of the sowDelete
command returns a MessageStream
that receives the response. The response is an acknowledgment message that contains information on the delete command. For example, the following snippet simply prints informational text with the number of messages deleted:
You can also use client.execute
to send a SOW delete command. As with the other SOW methods, the client provides an asynchronous versions of the SOW delete commands that require a message handler to be invoked:
Acknowledging messages from a queue uses a form of the sow_delete
command that is only supported for queues. Acknowledgment is discussed in the Using Queues chapter in this guide.
Imagine an application that displays real-time information about the position and status of a fleet of delivery vans. When the application starts, it should display the current location of each of the vans along with their current status. As vans move around the city and post other status updates, the application should keep its display up to date. Vans upload information to the system by posting message to a van location
topic, configured with a key of van_id
on the AMPS server.
In this application, it is important to not only stay up-to-date on the latest information about each van, but to ensure all of the active vans are displayed as soon as the application starts. Combining a SOW with a subscription to the topic is exactly what is needed, and that is accomplished by the Client.sowAndSubscribe()
method. Now we will look at an example:
Now we will look at an example that uses the asynchronous form of sowAndSubscribe
:
In the listing above, the trackVanPositions
function invokes sowAndSubscribe
to begin tracking vans, and returns the subscription ID. The application can later use this to unsubscribe.
The two forms have the same result. However, one form performs processing on a background thread, and blocks the client from receiving messages while that processing happens, while the other form processes messages on the calling thread and allows the background thread to continue to receive messages while processing occurs. In both cases, the application receives and processes the same messages.
The AMPS clients include a batch size parameter that specifies how many messages the AMPS server will return to the client in a single batch when returning the results of a SOW query. The 60East clients set a batch size of 10 by default. This batch size works well for common message sizes and network configurations.
Adjusting the batch size may produce better network utilization and produce better performance overall for the application. The larger the batch size, the more messages AMPS will send to the network layer at a time. This can result in fewer packets being sent, and therefore less overhead in the network layer. The effect on performance is generally most noticeable for small messages, where setting a larger batch size will allow several messages to fit into a single packet. For larger messages, a batch size may still improve performance, but the improvement is less noticeable.
In general, 60East recommends setting a batch size that is large enough to produce few partially-filled packets. Bear in mind that AMPS holds the messages in memory while batching them, and the client must also hold the messages in memory while receiving the messages. Using batch sizes that require large amounts of memory for these operations can reduce overall application performance, even if network utilization is good.
For smaller message sizes, 60East recommends using the default batch size, and experimenting with tuning the batch size if performance improvements are necessary. For relatively large messages (especially messages with sizes over 1MB), 60East recommends explicitly setting a batch size of 1 as an initial value, and increasing the batch size only if performance testing with a larger batch size shows improved network utilization or faster overall performance.
AMPS message queues provide a high-performance way of distributing messages across a set of workers. The AMPS User Guide describes AMPS queues in detail, including the features of AMPS referred to in this chapter. This chapter does not describe message queues in detail, but instead explains how to use the AMPS C++ client with message queues.
To publish messages to a message queue, publishers simply publish to any topic that is collected by the queue. There is no difference between publishing to a queue and publishing to any other topic, and a publisher does not need to be aware that the topic will be collected into a queue.
Subscribers must be aware that they are subscribing to a queue, and acknowledge messages from the queue when the message is processed.
AMPS queues are designed for high-volume applications that need minimal latency and overhead. One of the features that helps performance is the subscription backlog feature, which allows applications to receive multiple messages at a time. The subscription backlog sets the maximum number of unacknowledged messages that AMPS will provide to the subscription.
When the subscription backlog is larger than 1
, AMPS delivers additional messages to a subscriber before the subscriber has acknowledged the first message received. This technique allows subscribers to process messages as fast as possible, without ever having to wait for messages to be delivered. The technique of providing a consistent flow of messages to the application is called smart pipelining.
The AMPS server determines the backlog for each subscription. An application can set the maximum backlog that it is willing to accept with the max_backlog
option. Depending on the configuration of the queue (or queues) specified in the subscription, AMPS may assign a smaller backlog to the subscription. If no max_backlog
option is specified, AMPS uses a max_backlog
of 1
for that subscription.
In general, applications that have a constant flow of messages perform better with a max_backlog
setting higher than 1
. The reason for this is that, with a backlog greater than 1
, the application can always have a message waiting when the previous message is processed. Setting the optimum max_backlog
is a matter of understanding the messaging pattern of your application and how quickly your application can process messages.
To request a max_backlog
for a subscription, you explicitly set the option on the subscribe command, as shown below:
For each message delivered on a subscription, AMPS counts the message against the subscription backlog until the message is explicitly acknowledged. In addition, when a queue specifies at-least-once
delivery, AMPS retains the message in the queue until the message expires or until the message has been explicitly acknowledged and removed from the queue. From the point of view of the AMPS server, this is implemented as a sow_delete
from the queue with the bookmarks of the messages to remove. The AMPS C++ client provides several ways to make it easier for applications to create and send the appropriate sow_delete
.
The AMPS client allows you to specify that messages should be automatically acknowledged. When this mode is on, AMPS acknowledges the message automatically in the following cases:
Asynchronous message processing interface - The message handler returns without throwing an exception.
Synchronous message processing interface - The application requests the next message from the MessageStream
.
AMPS batches acknowledgments created with this method, as described in the following section.
To enable automatic acknowledgment, use the setAutoAck()
method.
The AMPS C++ client provides a convenience method, ack()
, on delivered messages. When the application is finished with the message, the application simply calls ack()
on the message. (This, in turn, provides the topic and bookmark to the ack()
function of the client that received the message.)
For messages that originated from a queue with at-least-once
semantics, this adds the bookmark from the message to the batch of messages to acknowledge. For other messages, this method has no effect.
A subscriber can also explicitly release a message back to the queue. AMPS returns the message to the queue, and redelivers the message just as though the lease had expired. To do this, the subscriber sends a sow_delete
command with the bookmark of the message to release and the cancel
option.
When using automatic acknowledgments and the asynchronous API, AMPS will cancel a message if an exception is thrown from the message handler.
To return a message to the queue, you can build a sow_delete
acknowledgment using the Command
class, or pass an option to the ack()
method on the message.
cancel
Returns the message to the queue.
expire
Immediately expire the message from the queue.
For example, to return a message to a queue, call ack()
on the message and pass the cancel
option.
The AMPS C++ client automatically batches acknowledgments when either of the convenience methods is used. Batching acknowledgments reduces the number of round-trips to AMPS, reducing network traffic and improving overall performance. AMPS sends the batch of acknowledgments when the number of acknowledgments exceeds a specified size, or when the amount of time since the last batch was sent exceeds a specified timeout.
You can set the number of messages to batch and the maximum amount of time between batches:
The AMPS C++ client is aware of the subscription backlog for a subscription. When AMPS returns the acknowledgment for a subscription that contains queues, AMPS includes information on the subscription backlog for the subscription. If the batch size is larger than the subscription backlog, the AMPS C++ client adjusts the requested batch size to match the subscription backlog.
60East recommends tuning the batch size to improve application performance. A value of 1/3 of the smallest max_backlog
value is a good initial starting point for testing. 60East does not recommend setting the batch size larger than 1/2 of the max_backlog
value without testing the setting to ensure that the application does not run out of messages to process.
60East generally recommends that applications use an ack()
method to acknowledge messages during normal processing. This approach works properly from within a message handler, provides batching support as described elsewhere in this chapter, and is generally both easier to code and more efficient.
However, in some situations, you may need to manually acknowledge messages in the queue. This is most common when an application needs to operate on all messages with certain characteristics, rather than acknowledging individual messages. For example, an application that is doing updates to an order may want cancel an order by both publishing a cancellation and immediately expiring all other messages in the queue for that order. With manual acknowledgment, that application can use a filter to remove all previous updates for that order, then publish the cancellation.
To manually acknowledge processed messages and remove the messages from the queue, applications use the sow_delete
command. To remove specific messages from the queue, provide the bookmarks of those messages. To remove messages that match a given filter, provide the filter. Notice that AMPS only supports using a bookmark with sow_delete
when removing messages from a queue, not when removing records from a SOW.
For example, given a Message
object to acknowledge and a client, the code below acknowledges the message.
In the above listing the program creates a sow_delete
command, specifies the topic and the bookmark, and then sends the command to the server.
While this method works, creating and sending an acknowledgment for each individual message can be inefficient if your application is processing a large volume of messages. Rather than acknowledging each message individually, your application can build a comma-delimited list of bookmarks from the processed messages and acknowledge all of the messages at the same time. In this case, it's important to be sure that the number of messages you wait for is less than the maximum backlog -- the number of messages your client can have unacknowledged at a given time. Notice that both automatic acknowledgment and the helper method on the Message
object take the maximum backlog into account.
When constructing a command to acknowledge queue messages, AMPS allows an application to specify a filter rather than a set of bookmarks. AMPS interprets this as the client requesting acknowledgment of all messages that match the filter. (This may include messages that the client has not received, subject to the Leasing
model for the queue.)
As a more typical example of manual acknowledgment, the code below expires all messages for a given id
that have a status other than cancel
. An application might do this to halt processing of an order that it is about to cancel:
In the above listing the program specifies a topic and a filter to use to find the messages that should be removed. In this case, the program also provides the expire
option to indicate that the messages have been removed from the queue rather than successfully processed (of course, whether this is the correct behavior for a canceled order depends on the expected message flow for your application).
Notice that, as described in the section on multithreading, this method of acknowledging a message should not be used from a message handler unless the sow_delete
is sent from a different client than the client that called the message handler. Instead, 60East recommends using the ack()
function from within a message handler.
The C++ client includes the following samples that demonstrate how to query a topic in the SOW.
amps_publish_queue.cpp
Publishing messages to a queue topic.
amps_consume_queue.cpp
Consuming messages from a queue topic.
Delta messaging in AMPS has two independent aspects:
Delta subscribe allows subscribers to receive just the fields that are updated within a message.
Delta publish allows publishers to update and add fields within a message by publishing only the updates into the SOW.
This chapter describes how to create delta publish and delta subscribe commands using the AMPS C++ client. For a discussion of this capability, how it works, and how message types support this capability see the AMPS User Guide.
To delta subscribe, you simply use the delta_subscribe
command as follows:
As described in the section on , messages provided to a delta subscription will contain the fields used to generate the SOW key and any changed fields in the message. Your application is responsible for choosing how to handle the changed fields.
To delta publish, you use the delta_publish
command as follows:
The message that you provide to AMPS must include the fields that the topic uses to generate the SOW key. Otherwise, AMPS will not be able to identify the message to update. For SOW topics that use a User-Generated SOW Key, use the Command
form of delta_publish
to set the SowKey
.
The AMPS User Guide section on making Incremental Message Updates describes how the AMPS server processes the delta_publish
command.
The AMPS clients provide named convenience methods for core AMPS functionality. These named methods work by creating messages and sending those messages to AMPS. All communication with AMPS occurs through messages.
You can use the Command
object to customize the messages that AMPS sends. This is useful for more advanced scenarios where you need precise control over AMPS, in cases where you need to use an earlier version of the client to communicate with a more recent version of AMPS, or in cases where a named method is not available.
AMPS messages are represented in the client as AMPS.Message
objects. The Message
object is generic, and can represent any type of AMPS message, including both outgoing and incoming messages. This section includes a brief overview of elements common to AMPS command messages. Full details of commands to AMPS are provided in the AMPS Command Reference Guide.
All AMPS command messages contain the following elements:
Command - The command tells AMPS how to interpret the message. Without a command, AMPS will reject the message. Examples of commands include publish
, subscribe
, and sow
.
CommandId - The command ID, together with the name of the client, uniquely identifies a command to AMPS. The command ID can be used later on to refer to the command or the results of the command. For example, the command ID for a subscribe
message becomes the identifier for the subscription. The AMPS client provides a command ID when the command requires one and no command ID is set.
Most AMPS messages contain the following fields:
Topic - The topic that the command applies to, or a regular expression that identifies a set of topics that the command applies to. For most commands, the topic is required. Commands such as logon
, start_timer
, and stop_timer
do not apply to a specific topic, and do not need this field.
Ack Type - The ack type tells AMPS how to acknowledge the message to the client. Each command has a default acknowledgment type that AMPS uses if no other type is provided.
Options - The options
are a comma-separated list of options that affect how AMPS processes and responds to the message.
Beyond these fields, different commands include fields that are relevant to that particular command. For example, SOW queries, subscriptions, and some forms of SOW deletes accept the Filter field, which specifies the filter to apply to the subscription or query. As another example, publish commands accept the Expiration field, which sets the SOW expiration for the message.
For full details on the options available for each command and the acknowledgment messages returned by AMPS, see the AMPS Command Reference Guide.
To create a command, you simply construct a command object of the appropriate type:
Once created, you set the appropriate fields on the command. For example, the following code creates a SOW query, setting the command, topic and filter for the query:
When sent to AMPS using the execute()
method, AMPS performs a SOW query from the topic messages-sow
using a filter of /id > 20
. The results of sending this message to AMPS are no different than using the form of the sow
method that sets these fields.
Once you've created a command, use the execute
method to send the command to AMPS. The execute
method returns a MessageStream
that provides response messages. The executeAsync
method sends the command to AMPS, waits for a processed
acknowledgment, then returns. Messages are processed on the client background thread.
For example, the following snippet sends the command created above:
You can also provide a message handler to receive acknowledgments, statistics, or the results of subscriptions and SOW queries. The AMPS client maintains a background thread that receives and processes incoming messages. The call to executeAsync
returns on the main thread as soon as AMPS acknowledges the command as having been processed, and messages are received and processed on the background thread:
While this message handler simply prints the ack type and reason for sample purposes, message handlers in production applications are typically designed with a specific purpose. For example, your message handler may fill a work queue, or check for success and throw an exception if a command failed.
Notice that the publish
command does not typically provide return results other than acknowledgment messages, so there is little need for a message handler with a publish
command. To send a publish
command, use the executeAsync()
method with a default-constructed message handler. With a default-constructed message handler, AMPS does not enter the message handler in the internal routing table, which improves efficiency for commands that do not expect a response:
A default-constructed message handler has an empty implementation and does not receive acknowledgments. To detect write failures, set the FailedWriteHandler
on the client.
The AMPS Command Reference includes information on which fields and options to set on commands to get a specific result. The reference includes both reference information and a Command Cookbook that provides a concise guide for commonly-used commands.
The AMPS C++ client includes a set of utilities and helper classes to make working with AMPS easier.
The client provides a pair of classes for creating and parsing composite message types:
CompositeMessageBuilder
allows you to assemble the parts of a composite message and then serialize them in a format suitable for AMPS.
CompositeMessageParser
extracts the individual parts of a composite message type.
For more information regarding composite message types, refer to the chapter in the .
To build a composite message, create an instance of CompositeMessageBuilder
, and populate the parts. The CompositeMessageBuilder
copies the parts provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual parts of a message once they've been added to the builder.
The snippet below shows how to build a composite message that includes a JSON part, constructed as a string, and a binary part consisting of the bytes from a standard vector
.
To parse a composite message, create an instance of CompositeMessageParser
, then use the parse()
method to parse the message provided by the AMPS client. The CompositeMessageParser
gives you access to each part of the message as a sequence of bytes.
For example, the following snippet parses and prints messages that contain a JSON part and a binary part that contains an array of doubles.
Notice that the receiving application is written with explicit knowledge of the structure and content of the composite message type.
The C++ client distribution contains the following samples to demonstrate CompositeMessageBuilder
and CompositeMessageParser
.
The client provides a pair of classes for creating and parsing NVFIX message types:
NVFIXBuilder
allows you to assemble an NVFIX message and then serialize it in a format suitable for AMPS.
NVFIXShredder
extracts the individual fields of an NVFIX message type.
To build an NVFIX message, create an instance of NVFIXBuilder
, then add the fields of the message using append()
. NVFIXBuilder
copies the fields provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual fields of a message once they've been added to the builder.
The snippet below shows how to build an NVFIX message and publish it to the AMPS client.
To parse an NVFIX message, create an instance of NVFIXShredder
, then use the toMap()
method to parse the message provided by the AMPS client. The NVFIXShredder
gives you access to each field of the message in a map.
The snippet below shows how to parse and print an NVFIX message.
The C++ client distribution contains the following samples to demonstrate NVFIXBuilder
and NVFIXShredder
.
The client provides a pair of classes for creating and parsing FIX messages:
FIXBuilder
allows you to assemble a FIX message and then serialize them in a format suitable for AMPS.
FIXShredder
extracts the individual fields of a FIX message.
To build a FIX message, create an instance of FIXBuilder
, then add the fields of the message using append()
. FIXBuilder
copies the fields provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual fields of a message once they've been added to the builder.
The snippet below shows how to build a FIX message and publish it to the AMPS client.
To parse a FIX message, create an instance of FIXShredder
, then use the toMap()
method to parse the message provided by the AMPS client. The FIXShredder
gives you access to each field of the message in a map.
The snippet below shows how to parse and print a FIX message.
The C++ client distribution contains the following samples to demonstrate FIXBuilder
and FIXShredder
.
amps_publish_composite.cpp
Creating and publishing a composite message using CompositeMessageBuilder
amps_subscribe_composite.cpp
Receiving and parsing a composite message using CompositeMessageParser
amps_nvfix_builder_publisher.cpp
Creating and publishing a message using NVFIXBuilder
amps_nvfix_builder_subscriber.cpp
Receiving a message and parsing it using NVFIXShredder
amps_fix_builder_publisher.cpp
Creating and publishing a message using FIXBuilder
amps_fix_builder_subscriber.cpp
Receiving a message and parsing it using FIXShredder
The AMPS C/C++ client offers the ability to filter incoming and outgoing messages in the format they are sent and received on the network. This allows you to inspect or modify outgoing messages before they are sent to the network, and incoming messages as they arrive from the network. This can be especially useful when using SSL connections, since this gives you a way to monitor outgoing network traffic before it is encrypted, and incoming network traffic after it is decrypted.
To create a transport filter, you create a function with the following signature:
You then register the filter by calling setTransportFilterFunction
with a pointer to the function and a pointer to the data to be provided in the userdata
parameter of the callback.
For example, the following filter function simply prints the data provided to the standard output:
Registering the function is a matter of calling setTransportFilterFunction
with this function and any callback data, as shown below:
The snippet above installs the filter function for the client.
Notice that the transport filter function is called with the verbatim contents of data received from AMPS. This means that, for incoming data, the function may not be called precisely on message boundaries, and that the binary length encoding used by the client and server will be presented to the transport filter.
The AMPS C++ client includes support for Secure Sockets Layer (SSL) connections to AMPS. The client automatically attempts to make an SSL connection when the transport in the connection string is set to tcps
, as described in the Connection Strings section in Chapter 3 of this guide.
To use the tcps
transport, your application must have an SSL library loaded before making the tcps
connection. Notice that the AMPS C++ client uses the OpenSSL implementation that you provide. The AMPS client distribution doesn't include OpenSSL, and doesn't provide facilities for certificate generation, certificate signing, key management, and so forth. Those facilities are provided by the OpenSSL implementation you choose.
To make an SSL connection, the AMPS client must have an SSL library loaded before making the SSL connection.
There are two common ways to load the library:
At link time, specify the OpenSSL shared object file (Linux) or DLL (Windows) to the linker. With this approach, the operating system will load the SSL library for your application automatically when the application starts up. You then use call amps_ssl_init
with NULL
as the library name to initialize the library.
Use the amps_ssl_init
function to load the SSL library. This function accepts either the library name, or a full path including the library name. When called with the library name, the AMPS C++ client will search appropriate system paths for shared libraries (for example, the LD_LIBRARY_PATH
on Linux) and load the first object found that matches the provided name. When called with a full path, the AMPS C++ client will load exactly the object specified. The AMPS client will then initialize the library.
Once the SSL library is loaded and initialized, you can connect using tcps
as a transport type. The fact that the connection uses a secure socket is only important when making the connection, and does not affect the operation of the client once the connection has been made.
The AMPS C++ Client provides an easy way to create highly-available applications using AMPS, via the HAClient
class. HAClient
derives from Client
and offers the same methods, but also adds protection against network, server, and client outages.
Using HAClient
allows applications to automatically:
Recover from temporary disconnects between client and server.
Failover from one server to another when a server becomes unavailable.
Since the HAClient
automatically manages failover and reconnection, 60East recommends using the HAClient
for applications that need to:
Automatically reconnect and resume work in the case of disconnection.
Ensure no messages are lost or duplicated after a reconnect or failover.
Persist messages and the current state of a bookmark subscription on disk for protection against client failure.
You can choose how your application uses HAClient
features. For example, you might need automatic reconnection, but have no need to resume subscriptions or republish messages. The high availability behavior in HAClient
is provided by implementations of defined interfaces. You can combine different implementations provided by 60East to meet your needs, and implement those interfaces to provide your own policies.
Some of these features require specific configuration settings on your AMPS instance(s). This chapter mentions these features and describes how to use them from the AMPS C++ client. You can find full documentation for these settings and server features in the AMPS User Guide.
HAClient
derives from Client
and offers the same methods for sending commands to AMPS and receiving messages from AMPS.
The HAClient
differs from the Client
in two ways:
The HAClient
automatically installs a disconnect handler that reconnects to AMPS and resumes active (asynchronous) subscriptions. The disconnect handler optionally replays publish
and sow_delete
messages that have not been acknowledged by AMPS, using a PublishStore
. The disconnect handler can optionally resume replays from the transaction log at a point that guarantees no messages are skipped and no duplicates are delivered to the application, using a BookmarkStore
.
The HAClient
includes the infrastructure needed for client failover, including a list of connection strings and their associated authentication mechanisms (provided by the ServerChooser
), and options for controlling backoff behavior for reconnects (provided by the DelayStrategy
). As a result, the HAClient
provides a connectAndLogon()
function for establishing a connection to AMPS, rather than treating these as independent steps that an application must manage itself.
If your application needs to automatically reconnect to AMPS, 60East recommends using the HAClient
and the automatically provided disconnect handler rather than using a Client
or replacing the HAClient
default disconnect handler.
The most important difference between Client
and HAClient
is that HAClient
automatically provides a reconnect handler.
This description provides a high-level framework for understanding the components involved in failover with the HAClient
. The components are described in more detail in the following sections.
The HAClient
reconnect handler performs the following steps when reconnecting:
Calls the ServerChooser
to determine the next URI to connect to and the authenticator to use for that connection.
If the connection fails, calls get_error
on the ServerChooser
to get a description of the failure, sends an exception to the exception listener, and stops the reconnection process.
Calls the DelayStrategy
to determine how long to wait before attempting to reconnect, and waits for that period of time.
Connects to the AMPS server. If the connection fails, calls reportFailure
on the ServerChooser
and begins the process again.
Logs on to the AMPS server. If the connection fails, calls reportFailure
on the ServerChooser
and begins the process again.
Calls reportSuccess
on the ServerChooser
.
Receives the bookmark for the last message that the server has persisted. Discards any older messages from the PublishStore
.
Republishes any messages in the PublishStore
that have not been persisted by the server.
Re-establishes subscriptions using the SubscriptionManager
for the client. For bookmark subscriptions, the reconnect handler uses the BookmarkStore
for the client to determine the most recent bookmark, and re-subscribes with that bookmark. For subscriptions that do not use a bookmark, the SubscriptionManager
simply re-enters the subscription, meaning that it is entered at the point at which the HAClient
reconnects.
The ServerChooser
, DelayStrategy
, PublishStore
, SubscriptionManager
, and BookmarkStore
are all extension points for the HAClient
. You can adapt the failover and recovery behavior by setting a different object for the behavior you want to customize on the HAClient
or by providing your own implementation.
For example, the convenience methods in the previous section customize the behavior of the PublishStore
and BookmarkStore
by providing either memory-backed or file-backed stores.
If your application needs reliable publish to AMPS, install a PublishStore
in the HAClient
. If your application needs to resume replays from the transaction log, install a BookmarkStore
in the HAClient
.
These stores provide the following capabilities:
A bookmark store tracks received messages, and is used to resume subscriptions that replay from the transaction log.
A publish store tracks published messages, and is used to ensure that messages are persisted in AMPS.
The AMPS C++ client provides a memory-backed version of each store and a file-backed version of each store. An HAClient
can use either a memory backed store or a file backed store for protection. Each method provides resilience to different failures, as described below:
Memory-backed stores provide recovery after disconnection from AMPS by storing messages and bookmarks in your process' address space. This is the highest performance option for working with AMPS in a highly available manner. The trade-off with this method is there is no protection from a crash or failure of your client application. If your application is terminated prematurely or, if the application terminates at the same time as an AMPS instance failure or network outage, then messages may be lost or duplicated. The state of bookmark replays will be lost when the application shuts down. Messages in the publish store when the application shuts down will not be maintained through a restart, so the application will not be able to attempt any necessary redelivery when the application restarts.
A memory-backed store should only be used by one instance of a client at a time.
File-backed stores provide recovery after client failure or shutdown and disconnection from AMPS by storing messages and bookmarks on disk. To use this protection method, the createFileBacked
convenience method requests additional arguments for the two files that will be used for both bookmark storage and message storage. If these files exist and are non-empty (as they would be after a client application is restarted), the HAClient
loads their contents and ensures synchronization with the AMPS server once connected. The performance of this option depends heavily on the speed of the device on which these files are placed. When the files do not exist (as they would the first time a client starts on a given system), the HAClient
creates and initializes the files. In this case the client does not have a point at which to resume the subscription or messages to republish.
A store file should only be used by one instance of a client at a time.
When using file backed bookmark stores, 60East recommends periodically removing unneeded entries by calling the prune()
method. The precise strategy that your application uses to call prune()
depends on the nature of the application. Most applications call prune()
when the application exits.
There are two basic strategies that applications follow while the application runs:
Install a resize handler and call prune()
after a specified number of resize operations, or when the store reaches a specific size.
Call prune()
after a specific number of messages are processed (for example, every 10,000 messages received or every 1,000 updates completed).
Regardless of the strategy, it is best to call prune()
when the application is otherwise idle, since the prune()
call rewrites the log file.
The store interface is public, and an application can create and provide a custom store as necessary. While clients provide convenience methods for creating file-backed and memory-backed HAClient
objects with the appropriate stores, you can also create and set the stores in your application code. The AMPS C++ client also includes default stores, which implement the appropriate interface, but do not actually persist messages.
Starting in 5.3.2.0, the AMPS client contains a recovery point adapter interface to make it easy to add a custom persistence layer to a bookmark store. The distribution includes a recovery point adapter that can store bookmark recovery information in an AMPS SOW topic.
The HAClient
provides convenience methods for creating clients and setting stores. You can also construct an HAClient
and set whichever store implementations you choose.
In this example, we create several clients. The first client uses memory stores for both bookmarks and publishes. The second client uses files for both bookmarks and publishes. The third client uses a file for bookmarks. The third client does not set a store for publishes, which means that AMPS provides the default store (and no outgoing messages are stored). The final client does not specify any stores, so has no persistence for published messages or bookmark subscriptions, but can take advantage of the automatic failover and reconnection in the HAClient
.
While this chapter presents the built-in file and memory-based stores, the AMPS C/C++ Client provides open interfaces that allow development of custom persistent message stores. To fully control recovery behavior, you can implement the Store
and BookmarkStore
interfaces in your code, and then pass instances of those to setPublishStore()
or setBookmarkStore()
methods in your Client
. You can also implement the RecoveryPointAdapter
interface to easily add a custom storage mechanism to one of the 60East-provided bookmark store implementations.
Instructions on developing a custom store are beyond the scope of this document; please refer to the AMPS Client HA Whitepaper for more information.
The AMPS client also includes the ability to use a SOW topic to store bookmark state for a bookmark store. This can be a useful option in a situation where an application needs a persistent bookmark store, but does not have the ability to store a file on the filesystem, or where an application has a bookmark file, but wants to have the ability to resume the subscription if the file is lost or damaged, or if the application is started on a system that does not have access to the file.
To use the SOW topic recovery point adapter, you create a bookmark store of the type you would like to use for the Client
, passing an adapter when you construct the store. You then set this bookmark store as the store for the Client
to use. The constructor for the SOW recovery adapter allows you to customize the topic name and field names used to store the recovery point information in AMPS. As with the RecoveryPointAdapter
interface in general, it is possible to customize the behavior of the SOW recovery point adapter by overriding the provided methods.
This section describes how to use the adapter with the default settings. Should you need to change the behavior of the class, you would adjust the guidance in this section accordingly. (For example, if you override methods to produce a message with a different set of keys or a different message format, you would update the topic definition accordingly).
To store recovery point state in AMPS, the AMPS instance that will store the recovery point state must define a SOW/Topic
to hold the recovery point data.
By default, the adapter uses a topic named /ADMIN/bookmark_store
of json
message type, with the /clientName
and /subId
fields as keys, similar to the following definition:
You must include this definition, or an equivalent definition, in the configuration file for the AMPS instance that will host the recovery point.
If you define a topic with a different configuration (for example, different key names, a different topic name or a different message type), you must ensure that the adapter that you create uses the same parameters as those configured on the server.
The AMPS SOW Recovery Point Adapter requires a Client
or HAClient
connected to the instance that contains the SOW topic. The Adapter will use this client to recover bookmark state and store bookmarks in AMPS. Notice that this client must not be a client that the Adapter is keeping state for. This must be a completely separate client instance, otherwise the client may deadlock while updating the store.
The client must be connected and logged in to the instance that contains the SOW topic, using the message type defined for the topic.
When an application uses a file-backed store, it is important to make sure that there is enough space available on the file system to be able to manage the store.
For logged bookmark stores, an application needs to keep a bookmark record for each message received, each message discarded, and the persisted acknowledgments delivered by the server approximately once a second. Each bookmark entry consumes roughly 70 bytes of storage plus the length of the subscription ID for the subscription receiving the message. The logged bookmark store retains entries until an application explicitly calls prune()
. The capacity needed for a logged bookmark store will depend on the strategy that the application uses for pruning the file.
For a file-backed publish store, the application needs to be able to store published messages until the AMPS server that the publisher is connected to acknowledges those messages as persisted. The volume of messages that needs to be stored depends on the failover policy for the server -- that is, the maximum amount of time that the server will allow a downstream instance to fail to acknowledge a message before the server downgrades that connection to async
acknowledgment. By default, AMPS does not downgrade connections: this policy must be set explicitly using the AMPS actions. As an example, if the server is configured to downgrade connections that are more than 120 seconds behind, then -- for disaster recovery -- the application must have the capacity to store 120 seconds of published messages at peak publishing load. However, unlike the logged bookmark store, a file-backed publish store removes messages from the store and reuses the space once AMPS has acknowledged the message.
Unlike Client
, the HAClient
attempts to keep itself connected to an AMPS instance at all times, by automatically reconnecting or failing over when it detects that the client is disconnected. When you are using the Client
directly, your disconnect handler usually takes care of reconnection. HAClient
, on the other hand, provides a disconnect handler that automatically reconnects to the current server or to the next available server.
To inform the HAClient
of the addresses of the AMPS instances in your system, you pass a ServerChooser
instance to the HAClient
. ServerChooser
acts as a smart enumerator over the servers available: HAClient
calls ServerChooser
methods to inquire about what server should be connected, and calls methods to indicate whether a given server succeeded or failed.
The AMPS C/C++ Client provides a simple implementation of ServerChooser
, called DefaultServerChooser
, that provides very simple logic for reconnecting. This server chooser is most suitable for basic testing, or in cases where an application should simply rotate through a list of servers. For most applications, you implement the ServerChooser
interface yourself for more advanced logic, such as choosing a backup server based on your network topology, or limiting the number of times your application should try to reconnect to a given address.
In either case, you must provide a ServerChooser
to HAClient
and then call connectAndLogon()
to create the first connection.
Similar to Client
, HAClient
remains connected to the server until disconnect()
is called. Unlike Client
, HAClient
provides a built-in disconnect handler that automatically attempts to reconnect to your server if it detects a disconnect, and, if that server cannot be connected, fails over to the next server provided by the ServerChooser
. In this example, the call to connectAndLogon()
attempts to connect and log in to primary.amps.xyz.com
, and returns if that is successful. If it cannot connect, it tries secondary.amps.xyz.com
, and continues trying servers from the ServerChooser
until a connection is established. Likewise, if it detects a disconnection while the client is in use, HAClient
attempts to reconnect to the server it was most recently connected with, and, if that is not possible, it moves on to the next server provided by the ServerChooser
.
You can control the amount of time between reconnection attempts and set a total amount of time for the HAClient
to attempt to reconnect.
The AMPS C++ Client includes an interface for managing this behavior called the ReconnectDelayStrategy
.
Two implementations of this interface are provided with the client:
FixedDelayStrategy
provides the same delay each time the HAClient
tries to reconnect.
ExponentialDelayStrategy
provides an exponential backoff until a connection attempt succeeds.
To use either of these classes, you simply create an instance with appropriate parameters, and install that instance as the delay strategy for the HAClient
. For example, the following code sets up a reconnect delay that starts at 200ms and increases the delay by 1.5 times after each failure. The strategy allows a maximum delay between connection attempts of 5 seconds, and will not retry longer than 60 seconds.
As described above, you provide the HAClient
with connection strings to one or more AMPS servers using a ServerChooser
. The purpose of a ServerChooser
is to provide information to the HAClient
. A ServerChooser
does not manage the reconnection process, and should not call methods on the HAClient
.
A ServerChooser
has two required responsibilities to the HAClient
:
Tells the HAClient
the connection string for the server to connect to. If there are no servers, or the ServerChooser
wants the connection to fail, the ServerChooser
returns an empty string.
To provide this information, the ServerChooser
implements the getCurrentURI()
method.
Provides an Authenticator
for the current connection string. This is especially important for installations where different servers require different credentials or authentication tokens must be reset after each connection attempt.
To provide the authenticator, the ServerChooser
implements the getCurrentAuthenticator()
method.
The HAClient
calls the getCurrentURI()
and getCurrentAuthenticator()
methods each time it needs to make a connection.
Each time a connection succeeds, the HAClient
calls the reportSuccess()
method of the ServerChooser
. Each time a connection fails, the HAClient
calls the reportFailure()
method of the ServerChooser
. The HAClient
does not require the ServerChooser
to take any particular action when it calls these methods. These methods are provided for the HAClient
to do internal maintenance, logging, or record keeping. For example, an HAClient
might keep a list of available URIs with a current failure count, and skip over URIs that have failed more than 5 consecutive times until all URIs in the list have failed more than 5 consecutive times.
When the ServerChooser
returns an empty string from getCurrentURI()
, indicating that no servers are available for connection, the HAClient
calls getError()
method on the ServerChooser
and includes the string returned by getError()
in the generated exception.
Use of the HAClient
allows your application to quickly recover from detected connection failures. By default, connection failure detection occurs when AMPS receives an operating system error on the connection. This system may result in unpredictable delays in detecting a connection failure on the client, particularly when failures in network routing hardware occur, and the client primarily acts as a subscriber.
The heartbeat feature of the AMPS client allows connection failure to be detected quickly. Heartbeats ensure that regular messages are sent between the AMPS client and server on a predictable schedule. The AMPS client and server both assume disconnection has occurred if there is no other activity and these regular heartbeats cease, ensuring disconnection is detected in a timely manner.
To use the heartbeat feature, call the setHeartbeat
method on Client
or HAClient
:
Method setHeartbeat
takes one parameter: the heartbeat interval. The heartbeat interval specifies the periodicity of heartbeat messages sent by the server: the value 3
indicates messages are sent on a three-second interval. If the client receives no messages in a six second window (two heartbeat intervals), the connection is assumed to be dead, and the HAClient
attempts reconnection. An additional variant of setHeartbeat
allows the idle period to be set to a value other than two heartbeat intervals. (The server, however, will always consider a connection to be closed after two heartbeat intervals without any traffic.)
Notice that, for HAClient
, setHeartbeat
must be called before the client is connected. For Client
, setHeartbeat
must be called after the client is connected.
Heartbeats are serviced on the receive thread created by the AMPS client. Your application must not block the receive thread for longer than the heartbeat interval, or the application is subject to being disconnected.
Publishing with an HAClient
is nearly identical to regular publishing; you simply call the publish()
method with your message’s topic and data. The AMPS client sends the message to AMPS, and then returns from the publish()
call. For maximum performance, the client does not wait for the AMPS server to acknowledge that the message has been received.
When an HAClient
uses a publish store (other than the DefaultPublishStore
), the publish store retains a copy of each outgoing message and requests that AMPS acknowledge that the message has been persisted. The AMPS server acknowledges messages back to the publisher. Acknowledgments can be delivered for multiple messages at periodic intervals (for topics recorded in the transaction log) or after each message (for topics that are not recorded in the transaction log). When an acknowledgment for a message is received, the HAClient
removes that message from the bookmark store. When a connection to a server is made, the HAClient
automatically determines which messages from the publish store (if any) the server has not processed, and replays those messages to the server once the connection is established.
For reliable publishers, the application must choose how best to handle application shutdown. For example, it is possible for the network to fail immediately after the publisher sends the message, while the message is still in transit. In this case, the publisher has sent the message, but the server has not processed it and acknowledged it. During normal operation, the HAClient
will automatically connect and retry the message. On shutdown, however, the application must decide whether to wait for messages to be acknowledged, or whether to exit.
Publish store implementations provide an unpersistedCount()
method that reports the number of messages that have not yet been acknowledged by the AMPS server. When the unpersistedCount()
reaches 0
, there are no unpersisted messages in the local publish store.
For the highest level of safety, an application can wait until the unpersistedCount()
reaches 0
, which indicates that all of the messages have been persisted to the instance that the application is connected to, and the synchronous replication destinations configured for that instance. When a synchronous replication destination goes offline, this approach will cause the publisher to wait to exit until the destination comes back online or until the destination is downgraded to asynchronous replication.
For applications that are shut down periodically for short periods of time (for example, applications that are only offline during a weekly maintenance window), another approach is to use the publishFlush()
method to ensure that messages are delivered to AMPS, and then rely on the connection logic to replay messages as necessary when the application restarts.
For example, the following code flushes messages to AMPS, then warns if not all messages have been acknowledged:
In this example, the client sends each message immediately when publish()
is called. If AMPS becomes unavailable between the final publish()
and the disconnect()
, or one of the servers that the AMPS instance replicates to is offline, the client may not have received a persisted acknowledgment for all of the published messages. For example, if a message has not yet been persisted by all of the servers in the replication fabric that are connected with synchronous replication, AMPS will not have acknowledged the message.
Before shutting down the client, the code does two things:
First, the code flushes messages to the server to ensure that all messages have been delivered to AMPS.
Next, the code checks to see if all of the messages in the publish store have been acknowledged as persisted by AMPS. If the messages have not been acknowledged, they will remain in the publish store file and will be published to AMPS, if necessary, the next time the application connects. An application may choose to loop until unpersistedCount()
returns 0
, or (as we do in this case) simply warn that AMPS has not confirmed that the messages are fully persisted. The behavior you choose in your application should be consistent with the high-availability guarantees your application needs to provide.
AMPS uses the name of the HAClient
to determine the origin of messages. For the AMPS server to correctly identify duplicate messages, each instance of an application that publishes messages must use a distinct name. That name must be consistent across different runs of the application.
If your application crashes or is terminated, some published messages may not have been persisted in the AMPS server. If you use the file-based store (in other words, the store created by using HAClient.createFileBacked()
), then the HAClient
will recover the messages, and once logged on, correlate the message store to what the AMPS server has received, re-publishing any missing messages. This occurs automatically when HAClient
connects, without any explicit consideration in your code, other than ensuring that the same file name is passed to createFileBacked()
if recovery is desired.
AMPS provides persisted acknowledgment messages for topics that do not have a transaction log enabled; however, the level of durability provided for topics with no transaction log is minimal. Learn more about transaction logs in the AMPS User Guide.
AMPS replication provides two different acknowledgment modes for outgoing replication links from an instance:
For a link in sync
acknowledgment mode, a message must be successfully acknowledged by the downstream instance of AMPS before this instance of AMPS will acknowledge the message.
For a link in async
acknowledgment mode, this link is not considered for acknowledging the message. In this mode, the downstream side of the replication link may not have received or processed the message at the time that the publisher receives an acknowledgment.
As described in the AMPS User Guide, a publisher must not failover from one instance of AMPS to another instance when any link between those instances uses async
acknowledgment unless replication is certain to have reached that instance. (For example, if replication is taking a maximum of 1.2 seconds between the instances and the publisher has been disconnected for 30 seconds, all messages from that publisher will have been replicated).
To help detect a situation where a publisher may be "jumping ahead" of messages that it has published, but which have not yet been replicated, the AMPS client allows an application to consider it to be an error to make a connection to a server that has not received messages previously published by the application.
To enable this behavior, set the setErrorOnPublishGap()
method to set this property on the PublishStore
in use for the client. When this property is set, the client will consider it to be an error to connect to a server that has not received messages previously published by the client, and consider the connection to have failed.
Notice that an application that uses this approach may need to handle situations where no server has received the message, particularly if the replication configuration uses automated replication downgrade.
HAClient
provides two important features for applications that subscribe to one or more topics: re-subscription, and a bookmark store to track the correct point at which to resume a bookmark subscription.
Any asynchronous subscription placed using an HAClient
is automatically reinstated after a disconnect or a failover. These subscriptions are placed in an in-memory SubscriptionManager
, which is created automatically when the HAClient
is instantiated. Most applications will use this built-in subscription manager, but for applications that create a varying number of subscriptions, you may wish to implement SubscriptionManager
to store subscriptions in a more durable place. Note that these subscriptions contain no message data, but rather simply contain the parameters of the subscription itself (for instance, the command, topic, message handler, options, and filter).
When a re-subscription occurs, the AMPS C++ Client re-executes the command as originally submitted, including the original topic, options, and so on. AMPS sends the subscriber any messages for the specified topic (or topic expression) that are published after the subscription is placed. For a sow_and_subscribe
command, this means that the client re-issues the full command, including the SOW query as well as the subscription.
The HAClient
(starting with the AMPS C++ Client version 4.3.1.1) does not track synchronous message processing subscriptions in the SubscriptionManager
. The reason for this is to preserve conventional iterator behavior. That is, once the MessageStream
indicates that there are no more elements to iterate (for example, because the connection has closed), the MessageStream
will not suddenly produce more elements.
To re-subscribe when the HAClient
fails over, you can simply re-issue the subscription. For example, the snippet below re-issues a subscribe
command when the message stream ends:
In cases where it is critical not to miss a single message, it is important to be able to resume a subscription at the exact point that a failure occurred. In this case, simply recreating a subscription isn't sufficient. Even though the subscription is recreated, the subscriber may have been disconnected at precisely the wrong time, and will not see the message.
To ensure delivery of every message from a topic or set of topics, the AMPS HAClient
includes a BookmarkStore
that, combined with the bookmark subscription and transaction log functionality in the AMPS server, ensures that clients receive any messages that might have been missed. The client stores the bookmark associated with each message received, and tracks whether the application has processed that message; if a disconnect occurs, the client uses the BookmarkStore
to determine the correct resubscription point, and sends that bookmark to AMPS when it re-subscribes. AMPS then replays messages from its transaction log from the point after the specified bookmark, thus ensuring the client is completely up-to-date.
HAClient
helps you to take advantage of this bookmark mechanism through the BookmarkStore
interface and bookmarkSubscribe()
method on Client
. When you create subscriptions with bookmarkSubscribe()
, whenever a disconnection or failover occurs, your application automatically re-subscribes to the message after the last message it processed. HAClients
created by createFileBacked()
additionally store these bookmarks on disk, so that the application can restart with the appropriate message if the client application fails and restarts.
To take advantage of bookmark subscriptions, do the following:
Ensure the topic(s) to be subscribed to are included in a transaction log. See the AMPS User Guide for information on how to specify the contents of a transaction log.
Use bookmarkSubscribe()
instead of subscribe()
when creating a subscription, and decide how the application will manage subscription identifiers (SubIds). If you are using a Command
object, you can simply provide a bookmark on that object.
Use the BookmarkStore.discard()
method in message handlers to indicate when a message has been fully processed by the application, that is, when the application does not need to receive the message again if the application fails over.
The following example creates a bookmark subscription against a transaction-logged topic, and fully processes each message as soon as it is delivered:
In this example, the client is a file-backed client, meaning that arriving bookmarks will be stored in a file (theClient.subscribeLog
). Storing these bookmarks in a file allows the application to restart the subscription from the last message processed, in the event of either server or client failure.
For optimum performance, it is critical to discard every message once its processing is complete. If a message is never discarded, it remains in the bookmark store. During re-subscription, HAClient
always restarts the bookmark subscription with the oldest undiscarded message, and then filters out any more recent messages that have been discarded. If an old message remains in the store, but is no longer important for the application’s functioning, the client and the AMPS server will incur unnecessary network, disk, and CPU activity.
In the example above, all parameters after the bookmark are optional. However, all options before — and including the bookmark — are required when creating a bookmarkSubscribe()
.
The last parameter, subId
, specifies an identifier to be used for this subscription. Passing NULL
causes HAClient
to generate one and return it, like most other Client
functions. However, if you wish to resume a subscription from a previous point after the application has terminated and restarted, the application must pass the same subscription ID as during its previous run. Passing a different subscription ID bypasses any recovery mechanisms, creating an entirely new subscription. When you use an existing subscription ID, the HAClient
locates the last-used bookmark for that subscription in the local store, and attempts to re-subscribe from that point.
The subId
is also required to be unique when used within a single client, but can be the same in different clients. Internally, AMPS tracks subscriptions in each client, thus each identifier for each subscription within a client must be unique. The same subId
can be reused across unique clients simultaneously without causing problems.
Below are the different bookmark types that can be used to enable different recovery strategies for an application:
Client::BOOKMARK_NOW()
specifies that the subscription should begin from the moment the server receives the subscription request. This results in the same messages being delivered as if you had invoked subscribe()
instead, except that the messages will be accompanied by bookmarks. This is also the behavior that results if you supply an invalid bookmark.
Client::BOOKMARK_EPOCH()
specifies that the subscription should begin from the beginning of the AMPS transaction log (that is, the first entry in the oldest journal file for the transaction log).
Client::BOOKMARK_RECENT()
specifies that the subscription should begin from the last-used message in the associated BookmarkStore
, or, if this subscription has not been seen before, to begin with EPOCH
. This is the most common value for this parameter, and is the value used in the preceding example. By using BOOKMARK_RECENT
, the application automatically resumes from wherever the subscription left off, taking into account any messages that have already been processed and discarded.
When the HAClient
re-subscribes after a disconnection and reconnection, it always uses BOOKMARK_RECENT
, ensuring that the continued subscription always begins from the last message discarded before the disconnect, so that no messages are missed.
With only a few changes, most AMPS applications can take advantage of the HAClient
and associated classes to become more highly-available and resilient. Using the PublishStore
, publishers can ensure that every message published has actually been persisted by AMPS. Using BookmarkStore
, subscribers can make sure that there are no gaps or duplicates in the messages received. HAClient
makes both kinds of applications more resilient to network and server outages, as well as temporary issues. By utilizing the file-based HAClient
, clients can recover their state after an unexpected termination or crash. Though HAClient
provides useful defaults for the Store
, BookmarkStore
, SubscriptionManager
, and ServerChooser
, you can customize any or all of these to the specific needs of your application and architecture.