Only this pageAll pages
Powered by GitBook
1 of 52

AMPS C++ Client 5.3.4

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Loading...

Client Identification

AMPS uses the name of the client as a session identifier and as part of the identifier for messages originating from that client.

For this reason, when a transaction log is enabled in the AMPS instance (that is, when the instance is recording a sequence of publishes and attempting to eliminate duplicate publishes), an AMPS instance will only allow one application with a given client name to connect to the instance.

When a transaction log is present, AMPS requires the client name for a publisher to be:

  • Unique within a set of replicated AMPS instances

  • Consistent from invocation to invocation if the publisher will be publishing the same logical stream of messages

If publishers do not meet this contract (for example, if the publisher changes its name and publishes the same messages, or if a different publisher uses the same session name), message loss or duplication can happen.

60East recommends always using consistent, unique client names. For example, the client name could be formed by combining the application name, an identifier for the host system, and the ID of the user running the application. A strategy like this provides a name that will be different for different users or on different systems, but consistent for instances of the application that should be treated as equivalent to the AMPS system.

Likewise, if a publisher is sending a completely independent stream of messages (for example, a microservice that sends a different, unrelated sequence of messages each time it connects to AMPS), there is no need for a publisher to retain the same name each time it starts. However, if a publisher is resuming a stream of messages (as in the case when using a file-backed publish store), that publisher must use the same client name, since the publisher is resuming the session.

Welcome to the AMPS C / C++ Client

This guide provides information you need to get started with the AMPS C/C++ client. It focuses specifically on the client and does not cover AMPS itself in detail.

This guide assumes that you have a development environment for C/C++ and access to an AMPS server using the configuration provided with the C/C++ samples (in the full source distribution of the client).

For an overview of AMPS and instructions on setting up your development environment, see the guide.

Introduction to AMPS

Connection Parameters for AMPS

When specifying a URI for connection to an AMPS server, you may specify a number of transport-specific options in the parameters section of the URI connection parameters. Here is an example:

tcp://localhost:9007/amps/json?tcp_nodelay=true&tcp_sndbuf=100000

In this example, we have specified the AMPS instance on localhost, port 9007, connecting to a transport that uses the amps protocol and sending JSON messages. We have also set two parameters: tcp_nodelay, a Boolean (true/false) parameter, and tcp_sndbuf, an integer parameter. Multiple parameters may be combined to finely tune settings available on the transport. Normally, you'll want to stick with the defaults on your platform, but there may be some cases where experimentation and fine-tuning will yield higher or more efficient performance.

The AMPS client supports the value of tcp in the scheme component connection string for TCP/IP connections, and the value of tcps as the scheme for SSL encrypted connections.

For connections that use Unix domain sockets, the client supports the value of unix in the scheme, and requires an additional option, as described in the Unix Transports Parameters section below.

IPv6 Connections

Starting with version 5.3.3.0, the AMPS client supports creating connections over both IPv4 and IPv6 protocols if supported by the underlying Operating System.

By default, the AMPS client will prefer to resolve host names to IPv4 addresses, but this behavior can be adjusted by supplying the ip_protocol_prefer transport option, described in the table below.

TCP and SSL Transport Options

The following transport options are available for TCP connections:

Option
Description

bind

(IP address) Sets the interface to bind the outgoing socket to.

Starting with version 5.3.3.0, both IPv4 and IPv6 addresses are fully supported for use with this parameter.

tcp_rcvbuf

(integer) Sets the socket receive buffer size. This defaults to the system default size. (On Linux, you can find the system default size in /proc/sys/net/core/rmem_default.)

tcp_sndbuf

(integer) Sets the socket send buffer size. This defaults to the system default size. (On Linux, you can find the system default size in /proc/sys/net/core/wmem_default.)

tcp_nodelay

(boolean) Enables or disables the TCP_NODELAY setting on the socket. By default TCP_NODELAY is disabled.

tcp_linger

(integer) Enables and sets the SO_LINGER value for the socket By default, SO_LINGER is enabled with a value of 10, which specifies that the socket will linger for 10 seconds.

tcp_keepalive

(boolean) Enables or disables the SO_KEEPALIVE value for the socket. The default value for this option is true.

ip_protocol_prefer

(string) Influence the IP protocol to prefer during DNS resolution of the host. If a DNS entry of the preferred protocol can not be found, the other non-preferred protocol will then be tried.

If this parameter is not set, the default will be to prefer IPv4.

If an explicit IPv4 address or IPv6 IP address is provided as the host, the format of the IP address is used to determine the IP protocol used and this setting has no effect.

Supported Values:

ipv4: Prefer an IPv4 address when resolving the host

ipv6: Prefer an IPv6 address when resolving the host

This parameter is available starting with version 5.3.3.0.

Unix Transport Parameters

The unix transport type communicates over Unix domain sockets. This transport requires the following additional option:

Option
Description

path

The path to the Unix domain socket to connect to.

Unix domain sockets always connect to the local system. When the scheme specified is unix, the host address is ignored in the connection string. For example, the connection string:

unix://localhost:0/amps/json?path=/sockets/the-amps-socket

and the connection string:

unix://unix:unix/amps/json?path=/sockets/the-amps-socket

are equivalent.

The other components of the connection string, including the protocol, message type, username, and authentication token are processed just as they would be for TCP/IP sockets.

AMPS Additional Logon Options

The connection string can also be used to pass logon parameters to AMPS. AMPS supports the following additional logon option:

Option
Description

pretty

Provide formatted representations of binary messages rather than the original message contents.

Introduction

About the C/C++ Client

The C/C++ client package includes both the AMPS C++ client and a basic C client that uses only C language features.

C & C++ Support Matrix

This version of the AMPS C++ client supports the following operating systems and features:

Feature
Linux x64 / aarch64
Windows x64
OSX x64 /aarch64

Incredible performance

X

X

X

Publish and subscribe

X

X

X

State of the World (SOW) queries

X

X

X

Topic and content filtering

X

X

X

Atomic SOW query and subscribe

X

X

X

Transaction log replay

X

X

X

Historical SOW query

X

X

X

Beautiful documentation

X

X

X

HA: automatic failover

X

X

X

HA: durable publish and subscribe

X

X

X

This version of the AMPS C++ client has been tested with the following compilers and versions. Other compilers or versions may work, but have not been tested by 60East:

  • Linux: gcc 4.8 or later

  • Windows: Visual Studio versions under current mainstream support

  • OSX: clang

Subscriptions

Messages published to a topic on an AMPS server are available to other clients via a subscription. Before messages can be received, a client must subscribe to one or more topics on the AMPS server so that the server will begin sending messages to the client. The server will continue sending messages to the client until the client unsubscribes, or the client disconnects. With content filtering, the AMPS server will limit the messages sent only to those messages that match a client-supplied filter. In this chapter, you will learn how to subscribe, unsubscribe, and supply filters for messages using the AMPS C/C++ client.

Subscribing

Subscribing to an AMPS topic takes place by calling Client.subscribe(). Here is a short example showing the simplest way to subscribe to a topic (error handling and connection details are omitted for brevity):

Client client(...);
client.connect(...);

/* Here we have created or received a Client that is properly connected
to an AMPS server. */
client.logon();


/* Here we subscribe to the topic messages. We do not provide a filter, so AMPS
 * does not content-filter the subscription. Although we don't use the object
 * explicitly here, the subscribe function returns a MessageStream object that we
 * iterate over. If, at any time, we no longer need to subscribe, we can break out
 * of the loop. When we break out of the loop, the MessageStream goes out of scope,
 * the MessageStream destructor runs, and the AMPS client sends an unsubscribe
 * command to AMPS.
 */
for (auto message : client.subscribe("messages"))
{
    /* Within the body of the loop, we can process the message as we need to. In this
     * case, we simply print the contents of the message.
     */
    std :: cout << "Received message: " << message.getData () << std :: endl ;
}

AMPS creates a background thread that receives messages and copies them into a MessageStream that you iterate over. This means that the client application as a whole can continue to receive messages while you are doing processing work.

The simple method described above is provided for convenience. The AMPS C++ client provides convenience methods for the most common form of the AMPS commands. The client also provides an interface that allows you to have precise control over the command. Using that interface, the example above becomes:

Client client(...);
client.connect(...);

/* Here we have created or received a Client that is properly connected to an AMPS server. */
client.logon();


/* Here we subscribe to the topic messages. We do not provide a filter, so AMPS
 * does not content-filter the subscription. Although we don't use the object
 * explicitly here, the execute function returns a MessageStream object that we
 * iterate over. If, at any time, we no longer need to subscribe, we can break out
 * of the loop. When we break out of the loop, the MessageStream goes out of scope,
 * the MessageStream destructor runs, and the AMPS client sends an unsubscribe
 * command to AMPS.
 *
 * Here we create a command object for the subscribe command, specifying the topic
 * messages.
 */
for (auto message : ampsClient.execute(Command("subscribe").setTopic("messages")))
{
    std :: cout << "Received message: "<< message.getData () << std :: endl;
}

The Command interface allows you to precisely customize the commands you send to AMPS. For flexibility and ease of maintenance, 60East recommends using the Command interface (rather than a named method) for any command that will receive messages from AMPS. For publishing messages, there can be a slight performance advantage to using the named commands where possible.

Connection Strings for AMPS

The AMPS clients use connection strings to determine the server, port, transport, and protocol to use to connect to AMPS. When the connection point in AMPS accepts multiple message types, the connection string also specifies the precise message type to use for this connection.

Connection strings have a number of elements:

As shown in the figure above, connection strings have the following elements:

  • Transport - Defines the network used to send and receive messages from AMPS. In this case, the transport is tcp. For connections to transports that use the Secure Sockets Layer (SSL), use tcps. For connections to AMPS over a Unix domain socket, use unix.

  • Host Address - Defines the destination on the network where the AMPS instance receives messages. The format of the address is dependent on the transport. For tcp and tcps, the address consists of a host name and port number. In this case, the host address is localhost:9007. For unix domain sockets, a value for hostname and port must be provided to form a valid URI, but the content of the hostname and port are ignored, and the file name provided in the path parameter is used instead (by convention, many connection strings use localhost:0 to indicate that this is a local connection that does not use TCP/IP).

  • Protocol - Sets the format in which AMPS receives commands from the client. Most code uses the default amps protocol, which sends header information in JSON format. AMPS supports the ability to develop custom protocols as extension modules, and AMPS also supports legacy protocols for backward compatibility.

  • MessageType - Specifies the message type that this connection uses. This component of the connection string is required if the protocol accepts multiple message types and the transport is configured to accept multiple message types. If the protocol does not accept multiple message types, this component of the connection string is optional, and defaults to the message type specified in the transport.

    Legacy protocols such as fix, nvfix and xml only accept a single message type, and therefore do not require or accept a message type in the connection string.

As an example, a connection string such as:

tcp://localhost:9007/amps/json

would work for programs connecting from the local host to a Transport configured as follows:

<AMPSConfig>
    ...

    <!-- This transport accepts any known message type for the instance: the
         client must specify the message type. -->
    <Transport>
        <Name>any-tcp</Name>
        <Type>tcp</Type>
        <InetAddr>9007</InetAddr>
        <Protocol>amps</Protocol>
    </Transport>

    ...
</AMPSConfig>

Content Filtering

To apply a content filter to a subscription, simply pass it into the client.subscribe() call:

In this case, the application will receive messages where the sender field equals mom.

In this example, we have passed in a content filter /sender = 'mom'. This will result in the server only sending us messages, from the messages topic, that have the sender field equal to mom in the message.

For example, the AMPS server will send the following message, where /sender is mom:

The AMPS server will not send a message with a different /sender value:

Before You Start

Welcome to developing applications with AMPS, the Advanced Message Processing System from 60East Technologies!

These guides will help you learn how to develop applications using AMPS.

Before getting started with this guide, it is important to have a good understanding of the following topics:

  • Developing Applications in C or C++

    To be successful using this guide, and developing applications with AMPS, you will need to have a working knowledge of the language you are developing in.

  • AMPS Concepts

    This guide focuses on using the AMPS client libraries and how those libraries work with the AMPS server.

You will also need a system on which you can compile and run code, and a system where you can host the AMPS server.

Setting Up a Development Instance

You will need an installed and running AMPS server to use the product as well. You can write and compile programs that use AMPS without a running server, but you will get the most out of this guide by running the programs against a working server.

Your First AMPS Program

In this chapter, we will learn more about the structure and features of the AMPS C/C++ library, and build our first C/C++ program using AMPS.

Connecting to AMPS

Let’s begin by writing a simple program that connects to an AMPS server and sends a single message to a topic:

In the example above, we show the entire program.

Future examples will isolate one or more specific portions of the code. The next section describes how to build and run the application and explains the code in further detail.

Build and run

To build the program that you've created:

  • Create a new .cpp file and use your c compiler to build it, making sure the amps-c++-client/include directory is in your compiler’s include path

  • Link to the libamps.a or amps.lib static libraries.

  • Additionally, link to any operating system libraries required by AMPS; a full list may be found by examining the Makefile and project files in the samples directory.

Examining the code:

Let us revisit the code we listed earlier:

About Authentication

When a client logs on to AMPS, the client sends AMPS a username and password.

The username is derived from the URI, using the standard syntax for providing a user name in a URI, for example, tcp://JohnDoe:@server:port/amps/messagetype to include the user name JohnDoe in the request.

For a given user name, the password is provided by an Authenticator. The AMPS client distribution includes a DefaultAuthenticator that simply returns the password, if any, provided in the URI. A logon() command that does not specify an Authenticator will use an instance of DefaultAuthenticator.

If your authentication system requires a different authentication token, you can implement an Authenticator that provides the appropriate token.

See the for more information on configuring transports.

One of the most powerful features of AMPS is content filtering. With content filtering, filters based on message content are applied at the server, so that your application and the network are not utilized by messages that are uninteresting for your application. For example, if your application is only displaying messages from a particular user, you can send a content filter to the server so that only messages from that particular user are sent to the client. The under the provides full details on content filtering. The section of the AMPS User Guide describes the basic syntax of AMPS expressions, including content filters, and the section describes functions that are available for use in filters.

Before working through this guide, we recommend reading the guide.

Detailed explanations of the AMPS server behavior are in the .

Instructions for starting an instance of AMPS are available in the guide.

The AMPS server runs on x64 Linux. The and contain information on how to run an AMPS server on a development system that does not run Linux.

If the message is published successfully, there is no output to the console. We will demonstrate how to create a subscriber to receive messages in section.

AMPS Configuration Guide
for (auto message : ampsClient.subscribe("messages", 0, "/sender = 'mom'"))
{
    // process messages from mom
}
{
    "sender" : "mom",
    "text" : "Happy Birthday!",
    "reminder" : "Call me Thursday!"
}
{
    "sender" : "henry dave",
    "text" : "Things do not change; we change."
}
#include <amps/ampsplusplus.hpp>
#include <iostream>

int main(void)
{
    const char* uri = "tcp://127.0.0.1:9007/amps/json";

    // Construct a client with the name "examplePublisher".

    AMPS::Client ampsClient("examplePublisher");

    try
    {
        // connect to the server and log on
        ampsClient.connect(uri);
        ampsClient.logon();

        // publish a JSON message
        ampsClient.publish("messages",
                           R"({ "message" : "Hello, World!" ,)"
                           R"("client" : 1 })");

    }
    catch (const AMPS::AMPSException& e)
    {
        std::cerr << e.what() << std::endl;
        exit(1);
    }
    return 0;
}
/* These are the include files required for an AMPS C++ Client. The
 * first is <ampsplusplus.hpp>. This header includes everything needed to
 * compile C++ programs for AMPS. The next include is the Standard C++ Library
 * <iostream>, necessary due to use of std::cerr and std::endl.
 */
#include <amps/ampsplusplus.hpp>
#include <iostream>

int main()
{
    /* The URI to use to connect to AMPS. The URI consists of the transport,
     * the address, and the protocol to use for the AMPS connection. In this case,
     * the transport is tcp, the address is 127.0.0.1:9007, and the protocol is
     * amps. In this case, AMPS is configured to allow any message type on that
     * transport, so we specify json in the URI to let AMPS know which message
     * type this connection will use. Even though a transport that uses the
     * amps protocol can accept multiple message types, each connection must specify
     * the exact message type that connection will use. Check with the person who
     * manages the AMPS instance to get the connection string to use for your programs.
     */
    const char* uri = "tcp://127.0.0.1:9007/amps/json";


    /* This is where we first interact with AMPS by instantiating an AMPS::Client
     * object. Client is the class used to connect to and interact with an AMPS
     * server. We pass the string "exampleClient" as the clientName. This name
     * will be used to uniquely identify this client to the server. Errors relating
     * to this connection will be logged with reference to this name, and AMPS uses
     * this name to help detect duplicate messages. AMPS enforces uniqueness for
     * client names when a transaction log is configured, and it is good practice
     * to always use unique client names.
     */
    AMPS::Client ampsClient("exampleClient");

    /* Here we open a try block. AMPS C++ classes throw exceptions to indicate
     * errors. For the remainder of our interactions with AMPS, if an
     * error occurs, the exception thrown by AMPS will be caught and handled
     * in the exception handler below.
     */
    try
    {
        /* At this point, we establish a valid AMPS network connection and
         * can begin to use it to publish and subscribe to messages. In this
         * example, we use the URI specified earlier in the file. If any errors
         * occur while attempting to connect to AMPS, the connect() method will
         * throw an exception.
         */
        ampsClient.connect(uri);

        /* The AMPS logon() command connects to AMPS and creates a named
         * connection. This version of the logon() command uses a DefaultAuthenticator,
         * which uses the credentials provided in the URI. Without credentials, the
         * client logs on to AMPS anonymously. AMPS versions 5.0 and later
         * require a logon() command in the default configuration.
         *
         * If you need to use a different authentication scheme, implement an
         * Authenticator and pass that Authenticator to this command.
         */
        ampsClient.logon();

        /* publish a JSON message
         * Here, a single message is published to AMPS on the messages topic,
         * containing the data Hello world. This data is placed into an XML
         * message and sent to the server. Upon successful completion of this
         * function, the AMPS client has sent the message to the server, and
         * subscribers to the messages topic will receive this Hello world message.
         */
        ampsClient.publish("messages",
                           R"({ "message" : "Hello, World!" ,)"
                           R"( "client" : 1 })");
    }
    /* Error handling begins with the catch block. All exceptions thrown by
     * AMPS derive from AMPSException classes. More specific exceptions may
     * be caught to handle certain conditions, but catching
     * AMPSException& allows us to handle all AMPS errors in one
     * place. In this example, we print out the error to the console and exit
     * the program.
     */
    catch (const AMPS::AMPSException& e)
    {
        std::cerr << e.what() << std::endl;
        exit(1);
    }

    /* At this point we return from main() and our ampsClient object falls
     * out of scope.  When this happens AMPS automatically disconnects from the
     * server and frees all of the client resources associated with the
     * connection.  In the AMPS C++ client, objects are reference-counted,
     * meaning that you can safely copy a client, for example, and destroy copies
     * of client without worrying about premature closure of the server connection
     * or memory leaks.
     */
    return 0;
}

Providing Credentials to AMPS

When a client logs on to AMPS, the client sends AMPS a username and password. The username is derived from the URI, using the standard syntax for providing a user name in a URI, for example, tcp://JohnDoe:@server:port/amps/messagetype to include the user name JohnDoe in the request.

For a given user name, the password is provided by an Authenticator. The AMPS client distribution includes a DefaultAuthenticator that simply returns the password, if any, provided in the URI. A logon() command that does not specify an Authenticator will use an instance of DefaultAuthenticator.

If your authentication system requires a different authentication token, you can implement an Authenticator that provides the appropriate token.

Providing Credentials in a Connection String

When using the DefaultAuthenticator, the AMPS clients support the standard format for including a username and password in a URI, as shown below:

tcp://user:password@host:port/protocol/message_type

When provided in this form, the default authenticator provides the username and password specified in the URI. If you have implemented another authenticator, that authenticator controls how passwords are provided to the AMPS server.

Changing the Filter on a Subscription

AMPS allows you to update parameters, such as the content filter, on a subscription. When you replace a filter on the subscription, AMPS immediately begins sending only messages that match the updated filter. Notice that if the subscription was entered with a command that includes a SOW query, using the replace option can re-issue the SOW query (as described in the AMPS User Guide).

To update the filter on a subscription, you create a subscribe command. You set the SubscriptionId provided on the Command to the identifier of the existing subscription and include the replace option on the Command.

When you send the Command, AMPS atomically replaces the filter and sends messages that match the updated filter from that point forward.

Filtering Subscriptions by Content
AMPS User Guide
AMPS Expressions
AMPS Functions
Introduction to AMPS
AMPS Server Documentation
Introduction to AMPS
Introduction to AMPS
AMPS FAQ
Subscriptions

Understanding Threading

The first time a command causes an instance of the Client or HAClient to connect to AMPS (typically, the logon() command), the client creates a thread that runs in the background. This background thread is responsible for processing incoming messages from AMPS, which includes both messages that contain data and acknowledgments from the server.

When you call a command on the AMPS client, the command typically waits for an acknowledgment from the server and then returns. (The exception to this is publish. For performance, the publish command does not wait for an acknowledgment from the server before returning.)

In the simple case, using synchronous message processing, the client provides an internal handler function that populates the MessageStream. The client receive thread calls the internal handler function, which makes a deep copy of the incoming message and adds it to the MessageStream. The MessageStream is used on the calling thread, so operations on the MessageStream do not block the client receive thread.

When using asynchronous message processing, AMPS calls the handler function from the client receive thread. Message handlers provided for asynchronous message processing must be aware of the following considerations:

  • The client creates one client receive thread at a time, and the lifetime of the thread lasts for the lifetime of the connection to the AMPS server. A message handler that is only provided to a single client will only be called from a single thread at a time. If your message handler will be used by multiple clients, then multiple threads will call your message handler. In this case, you should take care to protect any state that will be shared between threads. Notice that if the client connection fails (or is closed), and the client reconnects, the client will create a different thread for the new connection.

  • For maximum performance, do as little work in the message handler as possible. For example, if you use the contents of the message to update an external database, a message handler that adds the relevant data to an update queue, that is processed by a different thread, will typically perform better than a message handler that does this update during the message handling.

  • While your message handler is running, the thread that calls your message handler is no longer receiving messages. This makes it easier to write a message handler because you know that no other messages are arriving from the same subscription. However, this also means that you cannot use the same client that called the message handler to send commands to AMPS. Acknowledgments from AMPS cannot be processed and your application will deadlock waiting for the acknowledgment. Instead, enqueue the command in a work queue to be processed by a separate thread or use a different client object to submit the commands.

  • The AMPS client resets and reuses the Message provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information (either by making a copy of the entire message or copying the required fields) rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it. Likewise, a message handler should not modify the Message -- this will result in modifying the message provided to other handlers (including handlers internal to the AMPS client).

Error Handling

In every distributed system, the robustness of your application depends on its ability to recover gracefully from unexpected events. The AMPS client provides the building blocks necessary to ensure your application can recover from the kinds of errors and special events that may occur when using AMPS.

Synchronous Message Processing

The MessageStream object makes copies of the incoming messages. When there is no message available, the MessageStream will block.

A MessageStream will only remain active while the client that produced it is connected. If the client disconnects, the MessageStream will continue to provide any messages that have not yet been consumed, then throw an exception.

Asynchronous Message Processing

The advantage of using asynchronous message processing is that it is extremely efficient -- your processing code runs directly on the thread that the AMPS client is using to read from the socket and has direct access to the buffer that the AMPS client uses. Further, when an HAClient is used (discussed later in this guide), the default disconnect handler for that client will automatically resume subscriptions that use asynchronous message processing. Last, but not least, since message processing runs directly on the receive thread, using asynchronous message processing will provide pushback on the socket in the event that messages are arriving faster than the application can process them (for example, in response to a sow query).

In return for these advantages, your processing code must be careful not to block the processing thread for an excessive amount of time, and must make a deep copy of any data that will be used outside of the processing code.

Asynchronous Processing Example

Here is a short example (error handling and connection details are omitted for brevity):

The AMPS client resets and reuses the message provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it.

With newer compilers, you can use additional constructs to specify a callback function. Recent improvements in C++ have added lambda functions -- unnamed functions declared in-line that can refer to names in the lexical scope of their creator. If available on your system, both Standard C++ Library function objects and lambda functions may be used as callbacks.

Check functional.cpp in the samples directory for numerous examples.

Using an Instance Method as a Message Handler

One of the more common ways of providing a message handler is as an instance method on an object that maintains message state. It's simple to provide a handler with this capability, as shown below.

You can then provide an instance of the handler directly wherever a message handler is required, as shown below:

Regular Expression Subscriptions

Regular Expression (Regex) subscriptions allow a regular expression to be supplied in the place of a topic name. When you supply a regular expression, it is as if a subscription is made to every topic that matches your expression, including topics that do not yet exist at the time of creating the subscription.

To use a regular expression, simply supply the regular expression in place of the topic name in the subscribe command. For example:

In this example, messages on topics orders-north-america, orders-europe, and new-orders would match the regular expression. Messages published to any of those topics will be sent to our message_handler function. As in the example, you can use the getTopic() function to determine the actual topic of the message sent to the function.

As mentioned , one way for an application to receive messages is to have the AMPS C++ client return a MessageStream object that can be used to iterate over the results of the command.

The advantages of using a MessageStream that it provides a simple processing model, that receiving messages from a MessageStream does not block the client receive thread (see ) and that a copy of the message is automatically made for the application.

In return for these advantages, a MessageStream has higher overhead than , it will not be resumed if the client disconnects, and, by default, it will use as much memory as necessary to hold messages coming from the AMPS server.

The AMPS C++ client also supports an interface that allows you to process messages asynchronously. In this case, you add a message handler to the function call. The client returns the command ID of the subscribe command once the server has acknowledged that the command has been processed. As messages arrive, the client calls your message handler directly on the background thread. This can be an advantage for some applications. For example, if your application is highly multithreaded and copies message data to a work queue processed by multiple threads, there is usually a performance benefit to enqueuing work directly from the background thread. See for a discussion of threading considerations, including considerations for message handlers.

Client client(...);
client.connect(...);
/* Here we have created or received a Client that is properly connected to an AMPS server. */


client.logon();

/* Here we create a subscription with the following parameters:
 * command        : This is the AMPS Command object that contains the subscribe command.
 * MessageHandler : This is an AMPS MessageHandler object that refers to our message
 *                  handling function myHandlerFunction. This function is
 *                  called on a background thread each time a message arrives.
 *                  The second parameter, NULL, is passed as-is from the
 *                  client.subscribe() call to the message handler with every message,
 *                  allowing you to pass context about the subscription through to the
 *                  message handler.
 *
 * We create a command object for the subscribe command, specifying the topic
 * messages.
 */
string subscriptionId = client.executeAsync(Command("subscribe").setTopic("messages"),
                                            MessageHandler(myHandlerFunction, NULL));

...

/* The myHandlerFunction is a global function that is invoked by AMPS whenever a
 * matching message is received. The first parameter, message, is a reference to an
 * AMPS Message object that contains the data and headers of the received message.
 * The second parameter, userData, is set to whatever value was provided in the
 * MessageHandler constructor -- NULL in this example.
 */
void myHandlerFunction(const Message& message, void* userData)
{
    std::cout << message.getData() << std::endl;
}
class StatefulHandler
{
private:
    std::string _handlerName;
public:
    /* Construct the handler and save state. */
    StatefulHandler(const std::string& name) : _handlerName(name) {}

    /* Message handler method. */
    void operator()(const AMPS::Message & message)
    {
      std::cout << _handlerName << " got " << message.getData() << std::endl;
    }
};
client.subscribe(StatefulHandler("An instance"), "topic");

std::string subscriptionId = client.executeAsync(
              Command("subscribe").setTopic("orders.*"),
              MessageHandler(myHandlerFunction, NULL));

...

/* The myHandlerFunction is a global function that is invoked by the AMPS client
 * whenever a matching message is received. The first parameter, message, is 
 * a reference to an AMPS Message object that contains the data and headers
 * of the received message. The second parameter, userData, is set to whatever
 * value was provided in the MessageHandler constructor -- NULL in this example.
 */
void myHandlerFunction(const Message& message, void* userData)
{
    std::cout << message.getTopic() << ": " << message.getData() << std::endl;
}

Ending Subscriptions

The AMPS server continues a subscription until the client explicitly ends the subscription (that is, unsubscribes) or until the connection to the client is closed.

With the synchronous interface, AMPS automatically unsubscribes to the topic when the destructor for the MessageStream runs. You can also explicitly call the close() method on the MessageStream object to remove the subscription.

In the asynchronous interface, when a subscription is successfully made, messages will begin flowing to the message handler, and the subscribe() or executeAsync() call will return a string for the subscription id that serves as the identifier for this subscription. A Client can have any number of active subscriptions, and this subscription id is how AMPS designates messages intended for this particular subscription. To unsubscribe, we simply call unsubscribe with the subscription identifier:

Client client = ...;

// Register asynchronous subscription
std::string subId = client.executeAsync(
                         Command("subscribe").setTopic("messages"),
                         MessageHandler(myHandlerFunction, NULL));

... other work here ...

client.unsubscribe(subId);

In this example, as in the previous section, we use the client.executeAsync() method to create a subscription to the messages topic. When our application is done listening to this topic, it unsubscribes by passing in the subId returned by subscribe(). After the subscription is removed, no more messages will flow into our myHandlerFunction().

When an application calls unsubscribe(), the client sends an explicit unsubscribe command to AMPS. The AMPS server removes that subscription from the set of subscriptions for the client, and stops sending messages for that subscription. On the client side, the client unregisters the subscription so that the MessageStream or MessageHandler for that subscription will no longer receive messages for that subscription.

Notice that calling unsubscribe does not destroy messages that the server has already sent to the client. If there are messages on the way to the client for this subscription, the AMPS client must consume those messages. If a LastChanceMessageHandler is registered, the handler will receive the messages. Otherwise, they will be discarded since no message handler matches the subscription ID on the message.

earlier
Understanding Threading
Asynchronous Message Processing
Understanding Threading

Using a Heartbeat to Detect Disconnection

The AMPS client includes a heartbeat feature to help applications detect disconnection from the server within a predictable amount of time. Without using a heartbeat, an application must rely on the operating system to notify the application when a disconnect occurs. For applications that are simply receiving messages, it can be impossible to tell whether a socket is disconnected or whether there are simply no incoming messages for the client.

When you set a heartbeat, the AMPS client sends a heartbeat message to the AMPS server at a regular interval, and waits a specified amount of time for the response. If the operating system reports an error on send, or if there is no activity received from the server within the specified amount of time, the AMPS client considers the server to be disconnected. Likewise, the server will ensure that traffic is sent to the client at the specified interval, using heartbeat messages when no other traffic is being sent to the client. If, after sending a heartbeat message, no traffic from the client arrives within a period twice the specified interval, the server will consider the client to be disconnected or nonresponsive.

The AMPS client processes heartbeat messages on the client receive thread, which is the thread used for asynchronous message processing. If your application uses asynchronous message processing and occupies the thread for longer than the heartbeat interval, the client may fail to respond to heartbeat messages in a timely manner and may be disconnected by the server.

Disconnect Handling

Every distributed system will experience occasional disconnections between one or more nodes. The reliability of the overall system depends on an application’s ability to efficiently detect and recover from these disconnections. Using the AMPS C/C++ client’s disconnect handling, you can build powerful applications that are resilient in the face of connection failures and spurious disconnects.

Exception Types

Each method in AMPS documents the kinds of exceptions that it can throw. The following table details each of the exception types thrown by AMPS. They are all declared in the AMPS namespace, and all of these types publicly inherit from class std::runtime_error.

Exception
When
Notes

AlreadyConnectedException

Connecting

Thrown when connect() is called on a Client that is already connected.

AMPSException

Anytime

Base class for all AMPS exceptions.

AuthenticationException

Connecting

Indicates an authentication failure occurred on the server.

BadFilterException

Subscribe or Query

This typically indicates a syntax error in a filter expression.

BadRegexTopicException

Subscribe or Query

Indicates a malformed regular expression was found in the topic name.

BadSowKeyException

Subscribing, Publishing, Deleting

Raised when a command uses an invalid SOW key.

CommandException

Anytime

Base class for all exceptions relating to commands sent to AMPS.

ConnectionException

Anytime

Base class for all exceptions relating to the state of the AMPS connection.

ConnectionRefusedException

Connecting

The connection was actively refused by the server. Validate that the server is running, that network connectivity is available, and the settings on the client match those on the server.

DisconnectedException

Anytime

No connection is available when AMPS needed to send data to the server or the user's disconnect handler threw an exception.

DuplicateLogonException

Connecting

A client tried to logon after already logging on.

InvalidBookmarkException

Subscribe or Query

Command specifies an invalid bookmark.

InvalidOptionsException

Subscribe or Query

Command specifies invalid options.

InvalidOrderByException

Query

Command specifies an invalid orderby clause.

InvalidSubIdException

Subscribe or Query

Command specifies an invalid subid.

InvalidTopicException

Subscribe or Query

The topic is not configured for the requested operation. For example, a sow command was issued for a topic that is not in the SOW or a bookmark subscribe was issued for a topic that is not recorded in the transaction log.

InvalidURIException

Connecting

The URI string provided to connect() was formatted improperly.

LogonRequiredException

Anytime

A client attempted to execute a command before calling logon.

MessageStreamFullException

Internal client use

Indicates that a MessageStream has reached its max depth and can't enqueue another message.

MissingFieldsException

Subscribe or Query

Thrown when a command is missing required fields.

NameInUseException

Connecting

The client name (specified when instantiating Client) is already in use on the server.

NotEntitledException

Connecting, Subscribe or Query

An authenticated client attempted to access a resource to which the user has not been granted proper entitlements.

PublishException

Publishing

A client attempted to publish an invalid message or some other error occurs with the message. Requested operation.

PublishStoreGapException

Connecting

The client attempted to logon to a server that appears to be missing messages from this client that are no longer in the publish store.

ReconnectMaximumExceededException

Connecting

The maximum allowed time for attempting to connect to the server has been exceeded.

RetryOperationException

Anytime

An error occurred that caused processing of the last command to be aborted. Try issuing the command again.

StoreException

Publishing, Subscribing

Thrown when a publish store or bookmark store experiences some internal error (such as lack of resources or permissions), or is in an improper state for the requested operation.

SubidInUseException

Subscribing

Indicates a subscription has been placed with the same subscription ID.

SubscriptionAlreadyExistsException

Subscribing

A subscription has been requested using the same CommandId as another subscription. Create a unique CommandId for every subscription.

TimedOutException

Anytime

A timeout occurred waiting for a response to a command.

TransportTypeException

Connecting

Thrown when a transport type was selected in the URI that is unknown to AMPS.

UnknownException

Anytime

Thrown when an internal error occurs. Contact AMPS support immediately.

UsageException

Changing the properties of an object.

Thrown when the object is not in a valid state for setting the properties. For example, some properties of a Client (such as the BookmarkStore used) cannot be changed while that client is connected to AMPS.

Exceptions

Generally speaking, when an error occurs that prohibits an operation from succeeding, AMPS will throw an exception. AMPS exceptions universally derive from AMPS::AMPSException, so by catching AMPSException, you will be sure to catch anything AMPS throws. For example:

...
void ReadAndEvaluate(Client& client)
{
    /* read a new payload from the user */
    string payload;
    getline(cin, payload);

    /* write a new message to AMPS */
    if (!payload.empty()) {
        try
        {
            client.publish("UserMessage",
            string("{ \"message\" : \"data\" }"));
        }
        catch (const AMPSException& exception)
        {
            cerr << "An AMPS exception occurred: "<< exception.toString() << endl;
        }
    }
}

In this example, if an error occurs, the program writes the error to stderr and the publish() command fails. However, client is still usable for continued publishing and subscribing. When the error occurs, the exception is written to the console, converting the exception to a string via the toString() method.

AMPS exception types vary based on the nature of the error that occurs. In your program, if you would like to handle certain kinds of errors differently than others, you can catch the appropriate subclass of AMPSException to detect those specific errors and do something different.

string CreateNewSubscription(Client& client)
{
    string id;
    string topicName;

    while (id.empty()) {
        topicName = AskUserForTopicName();

        try
        {
            /* If an error occurs when setting up the subscription, whether or not to
             * try again depends on the subclass of AMPSException that is thrown. If a
             * BadRegexTopicException is thrown, it means that a bad regular expression
             * was supplied during subscription. In this case, we would like to give the
             * user a chance to correct the issue.
             */
            id = client.subscribe(bind(HandleMessage,
            placeholders::_1),
            topicName, 5000);
        }
        catch(const BadRegexTopicException& ex)
        {
            /* This line indicates that the program catches the BadRegexTopicException
             * and displays a specific error to the user indicating the topic name or
             * expression was invalid. By not returning from the function in this catch
             * block, the while loop runs again and the user is asked for another topic
             * name.
             */
            DisplayError("Error: bad topic name or regular " +
                         "expression '" + topicName +"'. " +
                         "The error was: " + ex.toString());
        }

        /* If an AMPS exception of a type other than BadRegexTopicException is thrown by
         * AMPS, it is caught here. In that case, the program emits a different error
         * message to the user.
         */
        catch(const AMPSException& ex)
        {
            DisplayError("Error: error setting up subscription " +
                         "to topic " + topicName +
                         ". The error was: " + ex.toString());

        /* At this point the code stops attempting to subscribe to the client by the return
         * NULL statement.
         */
            return NULL; // give up
        }
    }
    return id;
}

Exception Handling and Asynchronous Message Processing

When using asynchronous message processing, exceptions thrown from the message handler are silently absorbed by the AMPS C++ client by default. The AMPS C++ client allows you to register an exception listener to detect and respond to these exceptions. When an exception listener is registered, AMPS will call the exception listener with the exception. See the section on for details.

Unhandled Exceptions

Managing Disconnection

The HAClient class, included with the AMPS C++ client, contains a disconnect handler and other features for building highly-available applications. The HAClient includes features for managing a list of failover servers, resuming subscriptions, republishing in-flight messages, and other functionality that is commonly needed for high availability. 60East recommends using the HAClient for automatic reconnection wherever possible, as the HAClient disconnect handler has been carefully crafted to handle a wide variety of edge cases and potential failures.

If an application needs to reconnect or fail over, use an HAClient, and the AMPS client library will automatically handle failover and reconnection. You control which servers the client fails over to using an implementation of the ServerChooser interface, and control the timing of the failover using an implementation of the ReconnectDelayStrategy interface.

For most applications, the combination of the HAClient disconnect handler and a ConnectionStateListener gives you the ability to monitor disconnections and add custom behavior at the appropriate point in the reconnection process.

If you need to add custom behavior to the failover (such as logging, resetting an internal cache, refreshing credentials and so on), the ConnectionStateListener class allows your application to be notified and take action when disconnection is detected and at each stage of the reconnection process.

To extend the behavior of the AMPS client during reconnection, implement a ConnectionStateListener.

Replacing Disconnect Handling

In some cases, an application does not want the AMPS client to reconnect, but instead wants to take a different action if disconnection occurs. For example, a stateless publisher that sends ephemeral data (such as telemetry or prices) may want to exit with an error if the connection is lost rather than risk falling behind and providing outdated messages. Often, in this case, a monitoring process will start another publisher if a publisher fails, and it is better for a message to be lost than to arrive late.

To cover cases where the application has unusual needs, the AMPS client library allows an application to provide custom disconnect handling.

Your application gets to specify exactly what happens when a disconnect occurs by supplying a function to client.setDisconnectHandler(), which is invoked whenever a disconnect occurs. This may be helpful for situations where a particular connection needs to do something completely different than reconnecting or failing over to another AMPS server.

Setting the disconnect handler completely replaces the disconnection and failover behavior for an HAClient and provides the only disconnection and failover behavior for a Client.

The handler runs on the thread that detects the disconnect. This may be the client receive thread (for example, if the disconnect is detected due to heartbeating) or an application thread (for example, if the disconnect is detected when sending a command to AMPS).

The example below` shows a disconnect handler that will exit the application when a disconnect is detected:

class MyApp
{
    string _uri;
    Client _client;

public:
    MyApp(const string& uri) : _uri(uri), _client("myapp")
    {
        _uri = uri;

        /* setDisconnectHandler() method is called to supply a function for use when AMPS
         * detects a disconnect. At any time, this function may be called by AMPS to
         * indicate that the client has disconnected from the server, and to allow your
         * application to choose what to do about it.
         *
         * An application that intends to reconnect would use an HAClient and
         * the provided disconnect handler. In this case, however, we want
         * the application to exit if the connection is ever lost.
         *
         * There are various options for providing the function: in this case,
         * because we want to call a member function on an instance of this
         * class, we use std::bind.
         */

        auto handle = bind(&MyApp::FailOnDisconnect, this, placeholders::_1);
        _client.setDisconnectHandler(AMPS::DisconnectHandler(handle));
        _client.connect(uri);
        _client.logon();
        _client.executeAsync(Command("subscribe")
                                .setTopic("orders"),
                              bind(&MyApp::ShowMessage,
                                    this,
                                    placeholders::_1));
    }
    void ShowMessage(const Message& m)
    {
        /* display order data to the user */
        ...
    }
    /* Our disconnect handler’s implementation begins here.
     *
     * If we wanted the application to reconnect, resubmit the subscription, and so
     * on, we would use the HAClient (and the provided disconnect handler).
     *
     * In this case, we want to exit with an error if the connection ever fails,
     * so we replace the disconnect handler with a function that does
     * exactly that.
     */
    void FailOnDisconnect(Client& client)
    {
        /* simple: exit if the client is ever disconnected */

        ::exit(EXIT_FAILURE);
    }
};

State of the World

The AMPS State of the World (SOW) allows you to automatically keep and query the latest information about a topic on the AMPS server, without building a separate database. Using SOW lets you build impressively high-performance applications that provide rich experiences to users. The AMPS C++ client lets you query SOW topics and subscribe to changes with ease. AMPS SOW topics can be used as a current value cache to provide the most recently published value for each record, as a key/value object store, as the source for an aggregate or conflated topic, or all of the above uses. For more information on State of the World topics, see the

AMPS User Guide

Detecting Write Failures

The publish methods in the C++ client deliver the message to be published to AMPS and then return immediately, without waiting for AMPS to return an acknowledgment. Likewise, the sowDelete methods request deletion of SOW messages, and return before AMPS processes the message and performs the deletion. This approach provides high performance for operations that are unlikely to fail in production. However, this means that the methods return before AMPS has processed the command, without the ability to return an error in the event that the command fails.

The AMPS C++ client provides a FailedWriteHandler that is called when the client receives an acknowledgment that indicates a failure to persist data within AMPS. To use this functionality, you implement the FailedWriteHandler interface, construct an instance of your new class, and register that instance with the setFailedWriteHandler() function on the client. When an acknowledgment returns that indicates a failed write, AMPS calls the registered handler method with information from the acknowledgment message, supplemented with information from the client publish store if one is available. Your client can log this information, present an error to the user, or take whatever action is appropriate for the failure.

If your application needs to know whether publishes succeeded and are durably persisted, the following approach is recommended:

  • Set a PublishStore on the client. This will ensure that messages are retransmitted if the client becomes disconnected before the message is acknowledged and request persisted acknowledgments for messages.

  • Install a FailedWriteHandler. In the event that AMPS reports an error for a given message, that event will be reported to the FailedWriteHandler.

  • Call publishFlush() and verify that all messages are persisted before the application exits.

When no FailedWriteHandler is registered, acknowledgments that indicate errors in persisting data are treated as unexpected messages and routed to the LastChanceMessageHandler. In this case, AMPS provides only the acknowledgment message and does not provide the additional information from the client publish store.

Obtaining and Installing the AMPS C / C++ Client

Obtaining the Client

The client is packaged into a single file,amps-c++-client-<version>.tar.gz, where is replaced by the version of the client (such as amps-c++-client-5.3.0.zip). In the following examples, the version number is omitted from the filename.

Once expanded, the amps-c++-client directory will be created, containing sources, samples and makefiles for the C++ client. You’re welcome to locate this directory anywhere that seems convenient; but for the remainder of this book, we’ll simply refer to this directory as theamps-c++-client directory.

Explore the Client

The client is organized into a number of directories that you’ll be using through this book. Understanding this organization now will save you time in the future. The top level directories are:

src

Sources and makefile for the AMPS C++ client library.

include

Location of include files for C and C++ programs. When building your own program, you’ll add the include directory to your include path.

samples

Getting started with a new C/C++ library can be challenging. For your reference, we provide a number of small samples, along with a makefile.

Build the Client

After unpacking the amps-c++-client directory, you must build the client library for your platform.

To build on Linux, change to the amps-c++-client directory and, from a command prompt, type:

make

to make a static library, or

SHARED=1 make

to make a shared object.

On Windows, from a Visual Studio Command Prompt, change to the amps-c++-client directory and type:

msbuild

Upon successful completion, the AMPS libraries and samples are built in the lib and samples directories, respectively.

To build on OS X, change to the amps-c++-client directory and, from a command prompt, type:

cmake

to make a static library, or

SHARED=1 cmake

to make a shared object.

Test Connectivity to AMPS

Before writing programs in AMPS, make sure connectivity to your AMPS development instance is working from your AMPS development environment.

Launch a terminal window and change the directory to the AMPS directory in your AMPS installation and use spark to test connectivity to your server.

For example:

./bin/spark ping -type fix -server localhost:9004

If spark returns an error, verify that your AMPS server is running and that there is no firewall blocking access (including local firewalls between a host instance and virtual machine or container).

Without connectivity to AMPS, you will be unable to make best use of this guide.

The AMPS C/C++ client is available as a download from the website. Download the client from the site, then install it on your development computer.

60East Technologies

Unexpected Messages

The AMPS C++ client handles most incoming messages and takes appropriate action. Some messages are unexpected or occur only in very rare circumstances. The AMPS C++ client provides a way for clients to process these messages. Rather than providing handlers for all of these unusual events, AMPS provides a single handler function for messages that can't be handled during normal processing.

Your application registers this handler by setting the UnhandledMessageHandler for the client. This handler is called when the client receives a message that can't be processed by any other handler. This is a rare event, and typically indicates an unexpected condition.

For example, if a client publishes a message that AMPS cannot parse, AMPS returns a failure acknowledgment. This is an unexpected event, so AMPS does not include an explicit handler for this event, and failure acknowledgments are received in the method registered as the UnhandledMessageHandler.

Your application is responsible for taking any corrective action needed. For example, if a message publication fails, your application can decide to republish the message, publish a compensating message, log the error, stop publication altogether, or any other action that is appropriate.

Unhandled Exceptions

In the AMPS C++ client, exceptions can occur that are not thrown to the main thread of the application. For example, when an exception is thrown from a message handler running on a background thread, AMPS does not automatically propagate that exception to the main thread.

Instead, AMPS provides the exception to an unhandled exception handler if one is specified on the client. The unhandled exception handler receives a reference to the exception object, and takes whatever action is necessary. Typically, this involves logging the exception or setting an error flag that the main thread can act on. Notice that AMPS C++ client only catches exceptions that derive from std::exception. If your message handler contains code that can throw exceptions that do not derive from std::exception, 60East recommends catching these exceptions and throwing an equivalent exception that derives from std::exception.

If your application will attempt to recover from an exception thrown on the background processing thread, your application should set a flag and attempt recovery on a different thread than the thread that called the exception listener.

At the point that the AMPS client calls the exception listener, it has handled the exception. Your exception listener must not rethrow the exception (or wrap the exception and throw a different exception type).

For example, the unhandled exception handler below takes a std::ostream, and logs information from each exception to that std::ostream.

class ExceptionLogger : public AMPS::ExceptionListener
{
private:
    std::ostream& os_;

public:

    ExceptionLogger() : os_(std::cout) {}
    ExceptionLogger(std::ostream& os) : os_(os) {}

    virtual void exceptionThrown(const std::exception& e) const
    {
       os_ << e.what() << std::endl;
    }
}

Performing SOW Queries

To begin, we will look at a simple example of issuing a SOW query.

for (auto message : ampsClient.sow("orders" ,"/symbol == 'ROL'")) {
    if (message.getCommand() == "group_begin" ) {
        std::cout << "Receiving messages from the SOW." << std::endl ;
    }
    else if (message.getCommand() == "group_end") {
        std::cout << "Done receiving messages from SOW." << std::endl;
    }
    else {
        std::cout << "Received message: " << message.getData () << std::endl;
    }
}

In the listing above, the program invokes ampsClient.sow() to initiate a SOW query on the orders topic, for all entries that have a symbol of ’ROL’. The SOW query is requested with a batch size of 100, meaning that AMPS will attempt to send 100 messages at a time as results are returned.

As the query executes, each matching entry in the topic at the time of the query is returned. Messages containing the data of matching entries have a Command of value sow, so as those arrive, we write them to the console. AMPS sends a "group_begin" message before the first SOW result, and a "group_end" message after the last SOW result.

When the SOW query is complete, the MessageStream completes iteration and the loop completes. There's no need to explicitly break out of the loop.

As with subscribe, the sow function also provides an asynchronous version. In this case, you provide a message handler that will be called on a background thread:

void HandleSOW(const Message& message)
{
    if (message.getCommand() == "sow") {
        cout << message.getData() << endl;
    }
}
void ExecuteSOWQuery(Client client)
{
    Command command("sow");
    command.setTopic("orders")
            .setFilter("/symbol='ROL'")
            .setBatchSize(100);

    client.executeAsync(command, bind(HandleSOW, placeholders::_1));
}

In the listing above, the ExecuteSOWQuery() function invokes client.sow() to initiate a SOW query on the orders topic, for all entries that have a symbol of ROL. The SOW query is requested with a batch size of 100, meaning that AMPS will attempt to send 100 messages at a time as results are returned.

As the query executes, the HandleSOW() method is invoked for each matching entry in the topic. Messages containing the data of matching entries have a Command of sow, so as those arrive, we write them to the console.

Controlling Blocking with Command Timeout

The named convenience methods and the Command class provide a timeout setting that specifies how long the command should wait to receive a processed acknowledgment from AMPS. This can be helpful in cases where it is important for the caller to limit the amount of time to block waiting for AMPS to acknowledge the command. If the AMPS client does not receive the processed acknowledgment within the specified time, the client sends an unsubscribe command to the server to cancel the command and throws an exception.

Acknowledgments from AMPS are processed by the client receive thread on the same socket as data from AMPS. This means that any other data previously returned (such as the results of a large query) must be consumed before the acknowledgment can be processed. An application that submits a set of SOW queries in rapid succession should set a timeout that takes into account the amount of time required to process the results of the previous query.

Samples of Querying a Topic in the SOW

The C++ client includes the following samples that demonstrate how to query a topic in the SOW.

Sample Name
Demonstrates

amps_publish_sow.cpp

Publishing messages to a SOW topic.

amps_query_sow.cpp

Querying messages from a SOW topic.

Monitoring Connection State

The AMPS client interface provides the ability to set one or more connection state listeners. A connection state listener is a callback that is invoked when the AMPS client detects a change to the connection state.

A connection state listener may be called from the client receive thread. An application should not submit commands to AMPS from a connection state listener, or the application risks creating a deadlock for commands that wait for acknowledgement from the server.

The AMPS client provides the following state values for a connection state listener:

State
Indicates

Connected

The client has established a connection to AMPS. If you are using a Client, this is delivered when connect() is successful.

If you are using an HAClient, this state indicates that the connect part of the connect and logon process has completed. An HAClient using the default disconnect handler will attempt to log on immediately after delivering this state.

Most applications that use Client will attempt to log on immediately after the call to connect() returns.

An application should not submit commands to AMPS from the connection state listener while the client is in this state unless the application knows that the state has been delivered from a Client and that the Client does not call logon().

LoggedOn

The client has successfully logged on to AMPS. If you are using a Client, this is delivered when logon() is successful.

If you are using an HAClient, this state indicates that the logon part of the connect and logon process has completed.

This state is delivered after the client is logged on, but before recovery of client state is complete. Recovery will continue after delivering this state: the application should not submit commands to AMPS from the connection state listener while the client is in this state if further recovery will take place.

HeartbeatInitiated

The client has successfully started heartbeat monitoring with AMPS. This state is delivered if the application has enabled heartbeating on the client.

This state is delivered before recovery of the client state is complete. Recovery may continue after this state is delivered. The application should not submit commands to AMPS from the connection state listener until the client is completely recovered.

PublishReplayed

Delivered when a client has completed replay of the publish store when recovering after connecting to AMPS.

This state is delivered when the client has a PublishStore configured.

If the client has a subscription manager set, (which is the default for an HAClient), the application should not submit commands from the connection state listener until the Resubscribed state is received.

Resubscribed

Delivered when a client has re-entered subscriptions when recovering after connecting to AMPS.

This state is delivered when the client has a subscription manager set (which is the default for an HAClient). This is the final recovery step. An application can submit commands to AMPS from the connection state listener after receiving this state.

Disconnected

The client is not connected. For an HAClient, this means that the client will attempt to reconnect to AMPS. For a Client, this means that the client will invoke the disconnect handler, if one is specified.

Shutdown

The client is shut down. For an HAClient, this means that the client will no longer attempt to reconnect to AMPS. This state is delivered when close() is called on the client or when a server chooser tells the HAClient to stop reconnecting to AMPS.

The enumeration provided for the connection state listener also includes a value of UNKNOWN, for use as a default or to represent additional states in a custom Client implementation. The 60East implementations of the client do not deliver this state.

The following table shows examples of the set of states that will be delivered during connection, in order, depending on what features of the client are set. Notice that, for an instance of the Client class, this table assumes that the application calls both connect() and logon(). For an HAClient, this table assumes that the HAClient is using the default DisconnectHandler for the HAClient.

Configuration
States

subscription manager

publish store

Connected

LoggedOn

PublishReplayed

Resubscribed

subscription manager

publish store

heartbeat set

Connected

LoggedOn

HeartbeatInitiated

PublishReplayed

Resubscribed

subscription manager

Connected

LoggedOn

Resubscribed

subscription manager

heartbeat set

Connected

LoggedOn

HeartbeatInitiated

Resubscribed

(default Client configuration)

Connected

LoggedOn

SOW and Subscribe

Imagine an application that displays real-time information about the position and status of a fleet of delivery vans. When the application starts, it should display the current location of each of the vans along with their current status. As vans move around the city and post other status updates, the application should keep its display up to date. Vans upload information to the system by posting message to a van location topic, configured with a key of van_id on the AMPS server.

In this application, it is important to not only stay up-to-date on the latest information about each van, but to ensure all of the active vans are displayed as soon as the application starts. Combining a SOW with a subscription to the topic is exactly what is needed, and that is accomplished by the Client.sowAndSubscribe() method. Now we will look at an example:

/* processSOWMessage
 *
 * Processes a message during SOW query. Returns
 * true if the SOW query is complete (group_end command),
 * false otherwise.
 */
bool processSOWMessage(const AMPS::Message& message)
{
    if (message.getCommand() == "group_begin") {
        std::cout << "Receiving messages from the SOW." << std::endl;
    }
    else if (message.getCommand() == "group_end") {
        std::cout << "Done receiving messages from SOW." << std::endl;
        return true;
    }
    else {
        std::cout << "SOW message: " << message.getData() << std::endl;
        addVan(message);
    }
    return false;
}


/* processSubscriptionMessage
 *
 * Process messages received on a subscription, after the SOW
 * query is complete.
 */
void processSubscribeMessage(const AMPS::Message& message)
{
   if (message.getCommand() == "oof") {
        std::cout << "OOF : " << message.getReason()
                  << " message to remove : "
                  <<  message.getData() << std::endl;
        removeVan(message);
   }
   else {
       std::cout << "New or updated message: " << message.getData() << std::endl;
       addOrUpdateVan(message);
   }
}

...

void doSowAndSubscribe(AMPS::Client& ampsClient)
{
    bool sowDone = false;

    std::cerr << "about to subscribe..." << std::endl;


    /* We issue a sowAndSubscribe() to begin receiving information about all of the
     * open orders in the system for the symbol ROL. These orders are now are returned
     * as Messages whose Command returns SOW.
     *
     * Notice here that we specified true for the oofEnabled parameter.
     * Setting this parameter to true causes us to receive Out-of-Focus("OOF")
     * messages for the topic. OOF messages are sent when an entry that was sent
     * to us in the past no longer matches our query. This happens when an entry
     * is removed from the SOW cache via a sowDelete() operation,
     * when the entry expires (as specified by the expiration time on the message
     * or by the configuration of that topic on the AMPS server),
     * or when the entry no longer matches the content filter
     * specified. In our case, when an order is processed or canceled
     * (or if the symbol changes), a Message is sent with Command set to OOF.
     * The content of that message is the message sent previously.
     * We use OOF messages to remove orders from our display as they are
     * completed or canceled.
     */
    for (auto message : ampsClient.execute(Command("sow_and_subscribe")
                                           .setTopic("van_location")
                                           .setFilter("/status = 'ACTIVE'")
                                           .setBatchSize(100)
                                           .setOptions("oof"))) {
        if (sowDone == false)
        {
           sowDone = processSOWMessage(message);
        }
        else
        {
           processSubscribeMessage(message);
        }
    }
}

Now we will look at an example that uses the asynchronous form of sowAndSubscribe:

// handleMessage
//
// Handles messages for both SOW query and subscription.

void processSOWMessage(const AMPS::Message& message)
{

    if (message.getCommand() == "group_begin")
    {
        std::cout << "Receiving messages from the SOW." << std::endl;
        return;
    }
    else if (message.getCommand() == "group_end")
    {
        std::cout << "Done receiving messages from SOW." << std::endl;
        return true;
    }
    else  if (message.getCommand() == "oof")
    {
        std::cout << "OOF : " << message.getReason()
                  << " message to remove : "
                  <<  message.getData() << std::endl;
        removeVan(message);
    }
    else
    {
        std::cout << "New or updated message: " << message.getData() << std::endl;
        addOrUpdateVan(message);
    }
}

...

std::string trackVanPositions(AMPS::Client& ampsClient)
{


    std::cerr << "about to subscribe..." << std::endl;

    return ampsClient.executeAsync(
          Command("sow_and_subscribe")
           .setTopic("van_location")
           .setFilter("/status = 'ACTIVE'")
           .setBatchSize(100)
           .setOptions("oof"),
          bind(processSOWMessage(placeholders::_1));
}

In the listing above, the trackVanPositions function invokes sowAndSubscribe to begin tracking vans, and returns the subscription ID. The application can later use this to unsubscribe.

The two forms have the same result. However, one form performs processing on a background thread, and blocks the client from receiving messages while that processing happens, while the other form processes messages on the calling thread and allows the background thread to continue to receive messages while processing occurs. In both cases, the application receives and processes the same messages.

Samples of SOW and Subscribe

The C++ client includes the following samples that demonstrate how to query a topic in the SOW and enter a subscription to receive updates to the topic.

Sample Name
Demonstrates

amps_publish_sow.cpp

Publishing messages to a SOW topic.

amps_sow_and_subscribe.cpp

Querying messages from a SOW topic and entering a subscription to receive updates.

Client Side Conflation

In some cases, though, it's important to conflate messages on the client side. This can be particularly useful for applications that do expensive processing on each message, applications that are more efficient when processing batches of messages, or for situations where you cannot provide an appropriate conflation interval for the server to use.

A MessageStream has the ability to conflate messages received for a subscription to a SOW topic, view, or conflated topic. When conflation is enabled, for each message received, the client checks to see whether it has already received an unprocessed message with the same SowKey. If so, the client replaces the unprocessed message with the new message. The application never receives the message that has been replaced.

To enable client-side conflation, you call conflate() on the MessageStream, and then use the MessageStream as usual:

/* Query and subscribe */
MessageStream results = ampsClient.sowAndSubscribe("orders", "/symbol == 'ROL'");

/* Turn on conflation */
results.conflate();

/* Process the results */
for (auto message : results)
{
   // Process message here
}

Notice that if the MessageStream is used for a subscription that does not include SowKeys (such as a subscription to a topic that does not have a SOW), no conflation will occur.

When using client-side conflation with delta subscriptions, bear in mind that client-side conflation replaces the whole message, and does not attempt to merge deltas. This means that updates can be lost when messages are replaced. For some applications (for example, a ticker application that simply sends delta updates that replace the current price), this causes no problems. For other applications (for example, when several processors may be updating different fields of a message simultaneously), using conflation with deltas could result in lost data, and server-side conflation is a safer alternative.

Understanding Message Objects

So far, we have seen that subscribing to a topic involves working with objects of AMPS::Message type. A Message represents a single message to or from an AMPS server. Messages are received or sent for every client/server operation in AMPS.

Header Properties

The Message object contains several different accessors for header fields.

For retrieving the value of fields on a message:

  • The getXxx() functions return a Field. The Field contains pointers to the underlying buffer in the Message. Data is not copied until either deepCopy() is called or the Field is converted to another format (such as constructing a std::string from the Field). The value of the Field is only valid for the lifetime of the message unless it is copied using deepCopy().

  • The getRawXxx() functions take a pointer and a length. The pointer is assigned to the first byte of the value in the underlying buffer in the Message. The length is set to the length of the value.

To assign values to a field in a message, the Message object provides several different options. The differences between these options are a matter of managing the memory for the value.

  • The setXxx() functions copy the value provided into the specified header.

  • The assignXxx() functions set the value of the specified header, avoiding a copy if possible. When this function is used, the Message may refer directly to the data passed in. That data should not change or be deallocated while the Message is in use. (Notice, though, that a copy of the message produced using deepCopy will copy the data and can be used safely even if the original data is changed or deallocated.)

60East does not recommend attempting to parse header fields from the raw data of the message, nor does 60East recommend attempting to manipulate the fields of the message without using the accessor methods.

In AMPS, fields sometimes need to be set to a unique identifier value. For example, when creating a new subscription, or sending a manually constructed message, you’ll need to assign a new unique identifier to multiple fields such as CommandId and SubscriptionId. For this purpose, Message provides newXxx() methods for each field that generates a new unique identifier and sets the field to that new value.

getData() Method

Access to the data section of a message is provided via the getData() method. The data contains the unparsed data in the message, returned as a series of bytes (a string or const char *). Your application code parses and works with the data.

The AMPS C++ client contains a collection of helper classes for working with message types that are specific to AMPS (for example, FIX, NVFIX, and AMPS composite message types). For message types that are widely used, such as JSON or XML, you can use whichever library you typically use in your environment.

Message Field Reference

In many cases, applications that use SOW topics only need the current value of a message at the time the message is processed, rather than processing each change that lead to the current value. On the server side, AMPS provides conflated topics to meet this need. See under the AMPS User Guide for more detail. It require no special handling on the client side.

There are two parts of each message in AMPS: a set of headers that provide metadata for the message, and the data that the message contains. Every AMPS message has one or more header fields defined. The precise headers present depend on the type and context of the message. There are many possible fields in any given message, but only a few are used for any given message. For each header field, the Message class contains a distinct property that allows for retrieval and setting of that field. For example, the Message.get_command_id() function corresponds to the commandId header field, the Message.get_batch_size() function corresponds to the BatchSize header field, and so on. For more information on these header fields, consult the and .

The contains a full description of which fields are available and which fields are returned in response to specific commands.

Conflated Topics
AMPS User Guide
AMPS Command Reference
AMPS Command Reference

Managing SOW Contents

AMPS allows applications to manage the contents of the SOW by explicitly deleting messages that are no longer relevant. For example, if a particular delivery van is retired from service, the application can remove the record for the van by deleting the record for the van.

The client provides the following methods for deleting records from the SOW.

  • sowDelete - Accepts a filter, and deletes all messages that match the filter.

  • sowDeleteByKeys - Accepts a set of SOW keys as a comma-delimited string and deletes messages for those keys, regardless of the contents of the messages. A SOW key is provided in the header of a SOW message, and is the internal identifier AMPS uses for that SOW message.

  • sowDeleteByData - Accepts a message, and deletes the record that would be updated by that message.

The most efficient way to remove messages from the SOW is to use sowDeleteByKeys or sowDeleteByData, since those options allow AMPS to exactly target the message or messages to be removed. Many applications use sowDelete, since this is the most flexible method for removing items from the SOW when the application does not have information on the exact messages to be removed.

Regardless of the command used, AMPS sends an OOF message to all subscribers who have received updates for the messages removed, as described in the previous section.

The simple form of the sowDelete command returns a MessageStream that receives the response. The response is an acknowledgment message that contains information on the delete command. For example, the following snippet simply prints informational text with the number of messages deleted:

for (auto msg : client.sowDelete("sow_topic", "/id in (42, 64, 37)"))
{
    std::cout << "Got a " << msg.getCommand()
              << " message containing " << msg.getAckType()
              << ": deleted " << msg.getMatches() << " entries."
              << std::endl;
}

You can also use client.execute to send a SOW delete command. As with the other SOW methods, the client provides an asynchronous versions of the SOW delete commands that require a message handler to be invoked:

void HandleSOWDelete(const Message& message)
{
  std::cout << "Got a " << msg.getCommand()
              << " message containing " << msg.getAckType()
              << ": deleted " << msg.getMatches() << " entries."
              << std::endl;
}

....

client.execute_async(Command("sow_delete")
                     .setTopic("sow_topic")
                     .setFilter("/id in (42, 64, 37)"),
                     bind(HandleSOWDelete, placeholders::_1));

Acknowledging messages from a queue uses a form of the sow_delete command that is only supported for queues. Acknowledgment is discussed in the chapter in this guide.

Using Queues

Using Queues

To publish messages to a message queue, publishers simply publish to any topic that is collected by the queue. There is no difference between publishing to a queue and publishing to any other topic, and a publisher does not need to be aware that the topic will be collected into a queue.

Subscribers must be aware that they are subscribing to a queue, and acknowledge messages from the queue when the message is processed.

AMPS message queues provide a high-performance way of distributing messages across a set of workers. The AMPS User Guide describes AMPS in detail, including the features of AMPS referred to in this chapter. This chapter does not describe message queues in detail, but instead explains how to use the AMPS C++ client with message queues.

Queues

Acknowledgement Batching

The AMPS C++ client automatically batches acknowledgments when either of the convenience methods is used. Batching acknowledgments reduces the number of round-trips to AMPS, reducing network traffic and improving overall performance. AMPS sends the batch of acknowledgments when the number of acknowledgments exceeds a specified size, or when the amount of time since the last batch was sent exceeds a specified timeout.

You can set the number of messages to batch and the maximum amount of time between batches:

client.setAckBatchSize(10); // Send batch after 10 messages
client.setAckTimeout(1000); // ... or 1 second

The AMPS C++ client is aware of the subscription backlog for a subscription. When AMPS returns the acknowledgment for a subscription that contains queues, AMPS includes information on the subscription backlog for the subscription. If the batch size is larger than the subscription backlog, the AMPS C++ client adjusts the requested batch size to match the subscription backlog.

60East recommends tuning the batch size to improve application performance. A value of 1/3 of the smallest max_backlog value is a good initial starting point for testing. 60East does not recommend setting the batch size larger than 1/2 of the max_backlog value without testing the setting to ensure that the application does not run out of messages to process.

Backlog and Smart Pipelining

AMPS queues are designed for high-volume applications that need minimal latency and overhead. One of the features that helps performance is the subscription backlog feature, which allows applications to receive multiple messages at a time. The subscription backlog sets the maximum number of unacknowledged messages that AMPS will provide to the subscription.

When the subscription backlog is larger than 1, AMPS delivers additional messages to a subscriber before the subscriber has acknowledged the first message received. This technique allows subscribers to process messages as fast as possible, without ever having to wait for messages to be delivered. The technique of providing a consistent flow of messages to the application is called smart pipelining.

Subscription Backlog

The AMPS server determines the backlog for each subscription. An application can set the maximum backlog that it is willing to accept with the max_backlog option. Depending on the configuration of the queue (or queues) specified in the subscription, AMPS may assign a smaller backlog to the subscription. If no max_backlog option is specified, AMPS uses a max_backlog of 1 for that subscription.

In general, applications that have a constant flow of messages perform better with a max_backlog setting higher than 1. The reason for this is that, with a backlog greater than 1, the application can always have a message waiting when the previous message is processed. Setting the optimum max_backlog is a matter of understanding the messaging pattern of your application and how quickly your application can process messages.

To request a max_backlog for a subscription, you explicitly set the option on the subscribe command, as shown below:

Command cmd("subscribe");
cmd.setTopic("my_queue")
   .setOptions("max_backlog=10");

Acknowledging Messages

For each message delivered on a subscription, AMPS counts the message against the subscription backlog until the message is explicitly acknowledged. In addition, when a queue specifies at-least-once delivery, AMPS retains the message in the queue until the message expires or until the message has been explicitly acknowledged and removed from the queue. From the point of view of the AMPS server, this is implemented as a sow_delete from the queue with the bookmarks of the messages to remove. The AMPS C++ client provides several ways to make it easier for applications to create and send the appropriate sow_delete.

Automatic Acknowledgment

The AMPS client allows you to specify that messages should be automatically acknowledged. When this mode is on, AMPS acknowledges the message automatically in the following cases:

  • Asynchronous message processing interface - The message handler returns without throwing an exception.

  • Synchronous message processing interface - The application requests the next message from the MessageStream.

AMPS batches acknowledgments created with this method, as described in the following section.

To enable automatic acknowledgment, use the setAutoAck() method.

client.setAutoAck(true);  // enable AutoAck

Message Convenience Method

The AMPS C++ client provides a convenience method, ack(), on delivered messages. When the application is finished with the message, the application simply calls ack() on the message. (This, in turn, provides the topic and bookmark to the ack() function of the client that received the message.)

For messages that originated from a queue with at-least-once semantics, this adds the bookmark from the message to the batch of messages to acknowledge. For other messages, this method has no effect.

message.ack(); // Add this message to the next
               // acknowledgment batch.

Delta Subscribe

To delta subscribe, you simply use the delta_subscribe command as follows:

As described in the section on , messages provided to a delta subscription will contain the fields used to generate the SOW key and any changed fields in the message. Your application is responsible for choosing how to handle the changed fields.

// assumes that client is connected and logged on

Command cmd("delta_subscribe");
cmd.setTopic("delta_topic");
cmd.setFilter("/thingIWant = 'true'");

for (auto m : client.execute(cmd))
{
    // Delta messages arrive here
}
AMPS User Guide
Receiving Only Updated Fields

Manual Acknowledgement

60East generally recommends that applications use an ack() method to acknowledge messages during normal processing. This approach works properly from within a message handler, provides batching support as described elsewhere in this chapter, and is generally both easier to code and more efficient.

However, in some situations, you may need to manually acknowledge messages in the queue. This is most common when an application needs to operate on all messages with certain characteristics, rather than acknowledging individual messages. For example, an application that is doing updates to an order may want cancel an order by both publishing a cancellation and immediately expiring all other messages in the queue for that order. With manual acknowledgment, that application can use a filter to remove all previous updates for that order, then publish the cancellation.

To manually acknowledge processed messages and remove the messages from the queue, applications use the sow_delete command. To remove specific messages from the queue, provide the bookmarks of those messages. To remove messages that match a given filter, provide the filter. Notice that AMPS only supports using a bookmark with sow_delete when removing messages from a queue, not when removing records from a SOW.

For example, given a Message object to acknowledge and a client, the code below acknowledges the message.

void acknowledgeSingle(const Client& client, const Message& message)
{
    Command acknowledge("sow_delete");
    acknowledge.setTopic(message.getTopic())
               .setBookmark(message.getBookmark());
    client.executeAsync(acknowledge, MessageHandler());
}

In the above listing the program creates a sow_delete command, specifies the topic and the bookmark, and then sends the command to the server.

While this method works, creating and sending an acknowledgment for each individual message can be inefficient if your application is processing a large volume of messages. Rather than acknowledging each message individually, your application can build a comma-delimited list of bookmarks from the processed messages and acknowledge all of the messages at the same time. In this case, it's important to be sure that the number of messages you wait for is less than the maximum backlog -- the number of messages your client can have unacknowledged at a given time. Notice that both automatic acknowledgment and the helper method on the Message object take the maximum backlog into account.

When constructing a command to acknowledge queue messages, AMPS allows an application to specify a filter rather than a set of bookmarks. AMPS interprets this as the client requesting acknowledgment of all messages that match the filter. (This may include messages that the client has not received, subject to the Leasing model for the queue.)

As a more typical example of manual acknowledgment, the code below expires all messages for a given id that have a status other than cancel. An application might do this to halt processing of an order that it is about to cancel:

void removePending(const Client& client,  const std::string& orderId)
{
    Command acknowledge("sow_delete");
    acknowledge.setTopic(message.getTopic())
               .setFilter("/id = '" + orderId +"' and /status != 'cancel'")
               .setOptions("expire");
    client.executeAsync(acknowledge, MessageHandler());
}

In the above listing the program specifies a topic and a filter to use to find the messages that should be removed. In this case, the program also provides the expire option to indicate that the messages have been removed from the queue rather than successfully processed (of course, whether this is the correct behavior for a canceled order depends on the expected message flow for your application).

Notice that, as described in the section on multithreading, this method of acknowledging a message should not be used from a message handler unless the sow_delete is sent from a different client than the client that called the message handler. Instead, 60East recommends using the ack() function from within a message handler.

Setting Batch Size

The AMPS clients include a batch size parameter that specifies how many messages the AMPS server will return to the client in a single batch when returning the results of a SOW query. The 60East clients set a batch size of 10 by default. This batch size works well for common message sizes and network configurations.

Adjusting the batch size may produce better network utilization and produce better performance overall for the application. The larger the batch size, the more messages AMPS will send to the network layer at a time. This can result in fewer packets being sent, and therefore less overhead in the network layer. The effect on performance is generally most noticeable for small messages, where setting a larger batch size will allow several messages to fit into a single packet. For larger messages, a batch size may still improve performance, but the improvement is less noticeable.

In general, 60East recommends setting a batch size that is large enough to produce few partially-filled packets. Bear in mind that AMPS holds the messages in memory while batching them, and the client must also hold the messages in memory while receiving the messages. Using batch sizes that require large amounts of memory for these operations can reduce overall application performance, even if network utilization is good.

For smaller message sizes, 60East recommends using the default batch size, and experimenting with tuning the batch size if performance improvements are necessary. For relatively large messages (especially messages with sizes over 1MB), 60East recommends explicitly setting a batch size of 1 as an initial value, and increasing the batch size only if performance testing with a larger batch size shows improved network utilization or faster overall performance.

Returning a Message to the Queue

A subscriber can also explicitly release a message back to the queue. AMPS returns the message to the queue, and redelivers the message just as though the lease had expired. To do this, the subscriber sends a sow_delete command with the bookmark of the message to release and the cancel option.

When using automatic acknowledgments and the asynchronous API, AMPS will cancel a message if an exception is thrown from the message handler.

To return a message to the queue, you can build a sow_delete acknowledgment using the Command class, or pass an option to the ack() method on the message.

Option
Result

cancel

Returns the message to the queue.

expire

Immediately expire the message from the queue.

For example, to return a message to a queue, call ack() on the message and pass the cancel option.

message.ack("cancel");

Delta Publish and Subscribe

Delta messaging in AMPS has two independent aspects:

  • Delta Subscribe - Allows subscribers to receive just the fields that are updated within a message.

  • Delta Publish - Allows publishers to update and add fields within a message by publishing only the updates into the SOW.

This chapter describes how to create delta publish and delta subscribe commands using the AMPS C++ client. For a discussion of this capability, how it works, and how message types support this capability see the .

AMPS User Guide

AMPS Programming: Working with Commands

The AMPS clients provide named convenience methods for core AMPS functionality. These named methods work by creating messages and sending those messages to AMPS. All communication with AMPS occurs through messages.

You can use the Command object to customize the messages that AMPS sends. This is useful for more advanced scenarios where you need precise control over AMPS, in cases where you need to use an earlier version of the client to communicate with a more recent version of AMPS, or in cases where a named method is not available.

Understanding AMPS Messages

AMPS messages are represented in the client as AMPS.Message objects. The Message object is generic, and can represent any type of AMPS message, including both outgoing and incoming messages. This section includes a brief overview of elements common to AMPS command messages. Full details of commands to AMPS are provided in the AMPS Command Reference (linked at the bottom of this page).

All AMPS command messages contain the following elements:

  • Command - The command tells AMPS how to interpret the message. Without a command, AMPS will reject the message. Examples of commands include publish, subscribe, and sow.

  • CommandId - The command ID, together with the name of the client, uniquely identifies a command to AMPS. The command ID can be used later on to refer to the command or the results of the command. For example, the command ID for a subscribe message becomes the identifier for the subscription. The AMPS client provides a command ID when the command requires one and no command ID is set.

Most AMPS messages contain the following fields:

  • Topic - The topic that the command applies to, or a regular expression that identifies a set of topics that the command applies to. For most commands, the topic is required. Commands such as logon, start_timer, and stop_timer do not apply to a specific topic, and do not need this field.

  • Ack Type - The ack type tells AMPS how to acknowledge the message to the client. Each command has a default acknowledgment type that AMPS uses if no other type is provided.

  • Options - The options are a comma-separated list of options that affect how AMPS processes and responds to the message.

Beyond these fields, different commands include fields that are relevant to that particular command. For example, SOW queries, subscriptions, and some forms of SOW deletes accept the Filter field, which specifies the filter to apply to the subscription or query. As another example, publish commands accept the Expiration field, which sets the SOW expiration for the message.

For full details on the options available for each command and the acknowledgment messages returned by AMPS, see the AMPS Command Reference.

Creating and Populating the Command

To create a command, you simply construct a command object of the appropriate type:

AMPS::Command command("sow");

Once created, you set the appropriate fields on the command. For example, the following code creates a SOW query, setting the command, topic and filter for the query:

AMPS::Command command("sow")
       .setTopic("messages-sow")
       .setFilter("/id > 20");

When sent to AMPS using the execute() method, AMPS performs a SOW query from the topic messages-sow using a filter of /id > 20. The results of sending this message to AMPS are no different than using the form of the sow method that sets these fields.

Using Execute

Once you've created a command, use the execute method to send the command to AMPS. The execute method returns a MessageStream that provides response messages. The executeAsync method sends the command to AMPS, waits for a processed acknowledgment, then returns. Messages are processed on the client background thread.

For example, the following snippet sends the command created above:

client.execute(command);

You can also provide a message handler to receive acknowledgments, statistics, or the results of subscriptions and SOW queries. The AMPS client maintains a background thread that receives and processes incoming messages. The call to executeAsync returns on the main thread as soon as AMPS acknowledges the command as having been processed, and messages are received and processed on the background thread:

void handleMessages(const AMPS::Message& m, void* user_data)
{
    /* print acknowledgment type and reason for sample purposes.*/
    std::cout << m.getAckType() << " : " << m.getReason() << std::endl;
}

...

client.executeAsync(command, AMPS::MessageHandler(handleMessages, NULL));

...

While this message handler simply prints the ack type and reason for sample purposes, message handlers in production applications are typically designed with a specific purpose. For example, your message handler may fill a work queue, or check for success and throw an exception if a command failed.

Using Execute To Publish

Notice that the publish command does not typically provide return results other than acknowledgment messages, so there is little need for a message handler with a publish command. To send a publish command, use the executeAsync() method with a default-constructed message handler. With a default-constructed message handler, AMPS does not enter the message handler in the internal routing table, which improves efficiency for commands that do not expect a response:

client.executeAsync(publishCmd, AMPS::MessageHandler());

A default-constructed message handler has an empty implementation and does not receive acknowledgments. To detect write failures, set the FailedWriteHandler on the client.

AMPS Command Cookbook

The includes information on which fields and options to set on commands to get a specific result. The reference includes both reference information and a that provides a concise guide for commonly-used commands.

AMPS Command Reference
Command Cookbook

Samples of Working With a Queue

The C++ client includes the following samples that demonstrate how to query a topic in the SOW.

Sample Name
Demonstrates

amps_publish_queue.cpp

Publishing messages to a queue topic.

amps_consume_queue.cpp

Consuming messages from a queue topic.

Delta Publish

To delta publish, you use the delta_publish command as follows:

/* assumes that client is connected and logged on */

String msg = ... ; // obtain changed fields here

client.deltaPublish("myTopic", msg);

The message that you provide to AMPS must include the fields that the topic uses to generate the SOW key. Otherwise, AMPS will not be able to identify the message to update. For SOW topics that use a User-Generated SOW Key, use the Command form of delta_publish to set the SowKey.

/* assumes that client is connected and logged on */

String msg = ... ; // obtain changed fields here
String key = ... ; // obtain user-generated SOW key

Command cmd("delta_publish");
cmd.setTopic("delta_topic");
cmd.setSowKey(key);
cmd.setData(msg);

/* Execute the delta publish. Use an empty
 * a message handler since any failure acks will
 * be routed to the FailedWriteHandler
 */
client.executeAsync(cmd,MessageHandler());

The section on making describes how the AMPS server processes the delta_publish command.

AMPS User Guide
Incremental Message Updates

Utility Classes

The AMPS C++ client includes a set of utilities and helper classes to make working with AMPS easier.

Composite Message Types

The client provides a pair of classes for creating and parsing composite message types:

  • CompositeMessageBuilder allows you to assemble the parts of a composite message and then serialize them in a format suitable for AMPS.

  • CompositeMessageParser extracts the individual parts of a composite message type.

Building Composite Messages

To build a composite message, create an instance of CompositeMessageBuilder, and populate the parts. The CompositeMessageBuilder copies the parts provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual parts of a message once they've been added to the builder.

The snippet below shows how to build a composite message that includes a JSON part, constructed as a string, and a binary part consisting of the bytes from a standard vector.

std::string json_part("{\"data\":\"sample\"}");

std::vector<double> data;
/* Populate data */
...

/* Create the payload for the composite message. */
AMPS::CompositeMessageBuilder builder;

builder.append(json_part.str());
builder.append(reinterpret_cast<const char*>(data.data()),
                   data.size() * sizeof(double));

/* Send the message */
std::string topic("messages");
ampsClient.publish(topic.c_str(), topic.length(), builder.data(), builder.length());

Parsing Composite Messages

To parse a composite message, create an instance of CompositeMessageParser, then use the parse() method to parse the message provided by the AMPS client. The CompositeMessageParser gives you access to each part of the message as a sequence of bytes.

For example, the following snippet parses and prints messages that contain a JSON part and a binary part that contains an array of doubles.

for (auto message : ampsClient.subscribe("messages")) {

    parser.parse(message);

    /* First part is JSON */
    std::string json_part = std::string(parser.getPart(0));

    /* Second part is the raw bytes for a vector<double> */
    AMPS::Field binary = parser.getPart(1);

    std::vector<double> vec;
    double *array_start = (double*)binary.data();
    double *array_end = array_start + (binary.len() / sizeof(double));

    vec.insert(vec.end(), array_start, array_end);

    /* Print the contents of the message */
    std::cout << "Received message with " << parser.size() << " parts"
              << std::endl
              << "\t" << json_part
              << std::endl;

    for (auto d : vec)
        std::cout << d << " ";

    std::cout << std::endl;
}

Notice that the receiving application is written with explicit knowledge of the structure and content of the composite message type.

Composite Message Builder and Composite Message Parser Samples

The C++ client distribution contains the following samples to demonstrate CompositeMessageBuilder and CompositeMessageParser.

Sample Name
Demonstrates

amps_publish_composite.cpp

Creating and publishing a composite message using CompositeMessageBuilder

amps_subscribe_composite.cpp

Receiving and parsing a composite message using CompositeMessageParser

NVFIX Messages

The client provides a pair of classes for creating and parsing NVFIX message types:

  • NVFIXBuilder allows you to assemble an NVFIX message and then serialize it in a format suitable for AMPS.

  • NVFIXShredder extracts the individual fields of an NVFIX message type.

Building NVFIX Messages

To build an NVFIX message, create an instance of NVFIXBuilder, then add the fields of the message using append(). NVFIXBuilder copies the fields provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual fields of a message once they've been added to the builder.

The snippet below shows how to build an NVFIX message and publish it to the AMPS client.

/* Construct a client with the name "NVFIXPublisher". */
AMPS::Client ampsClient("NVFIXPublisher");

/* Construct a simple NVFIX message. */
AMPS::NVFIXBuilder builder;

/* Add data to the builder */
builder.append("Test", "data");
builder.append("More", "stuff");

/* Display the data */
std::cout << builder.getString() << std::endl;

try
{
    /* Connect to the server and log on */
    ampsClient.connect(uri);
    ampsClient.logon();

    /* Publish message to the topic messages */
    std::string topic("messages");
    ampsClient.publish(topic, builder.getString());
}
catch (const AMPS::AMPSException& e)
{
    std::cerr << e.what() << std::endl;
    exit(1);
}

Parsing NVFIX Messages

To parse an NVFIX message, create an instance of NVFIXShredder, then use the toMap() method to parse the message provided by the AMPS client. The NVFIXShredder gives you access to each field of the message in a map.

The snippet below shows how to parse and print an NVFIX message.

/* Create a client with the name "NVFIXSubscriber" */
AMPS::Client ampsClient("NVFIXSubscriber");

try
{
    /* Connect to the server and log on */
    ampsClient.connect(uri);
    ampsClient.logon();

    /* Subscribe to the messages topic.
     *
     * This overload of the subscribe method returns a MessageStream
     * that can be iterated over. When the MessageStream destructor
     * runs, the destructor unsubscribes.
     */

    /* Set up the shredder */
    AMPS::NVFIXShredder shredder;

    for (auto message : ampsClient.subscribe("messages")) {
        /* Shred the data to a map */
        auto subscription = shredder.toMap(message.getData());

        /* Display the data */
        for (auto iterator = subscription.begin(); iterator != subscription.end(); ++iterator) {
            std::cout << iterator->first << " " << iterator->second << std::endl;
        }
    }
}
catch (const AMPS::AMPSException& e)
{
    std::cerr << e.what() << std::endl;
    exit(1);
}

NVFIX Builder and Shredder Samples

The C++ client distribution contains the following samples to demonstrate NVFIXBuilder and NVFIXShredder.

Sample Name
Demonstrates

amps_nvfix_builder_publisher.cpp

Creating and publishing a message using NVFIXBuilder

amps_nvfix_builder_subscriber.cpp

Receiving a message and parsing it using NVFIXShredder

FIX Messages

The client provides a pair of classes for creating and parsing FIX messages:

  • FIXBuilder allows you to assemble a FIX message and then serialize them in a format suitable for AMPS.

  • FIXShredder extracts the individual fields of a FIX message.

Building FIX Messages

To build a FIX message, create an instance of FIXBuilder, then add the fields of the message using append(). FIXBuilder copies the fields provided, in order, to the underlying message. The builder simply writes to an internal buffer with the appropriate formatting, and does not allow you to update or change the individual fields of a message once they've been added to the builder.

The snippet below shows how to build a FIX message and publish it to the AMPS client.

/* Construct a client with the name "FIXPublisher". */
AMPS::Client ampsClient("FIXPublisher");

/* Construct a simple FIX message. */
AMPS::FIXBuilder builder;

/* Add data to the builder */
builder.append(0, "123");

/* Display the data */
std::cout << builder.getString() << std::endl;

try
{
    /* Connect to the server and log on */
    ampsClient.connect(uri);
    ampsClient.logon();

    /* Publish message to the messages topic */
    std::string topic("messages");
    ampsClient.publish(topic, builder.getString());

}
catch (const AMPS::AMPSException& e)
{
    std::cerr << e.what() << std::endl;
    exit(1);
}

Parsing FIX Messages

To parse a FIX message, create an instance of FIXShredder, then use the toMap() method to parse the message provided by the AMPS client. The FIXShredder gives you access to each field of the message in a map.

The snippet below shows how to parse and print a FIX message.

/* Create a client with the name "FIXSubscriber" */
AMPS::Client ampsClient("FIXSubscriber");

try
{
    /* Connect to the server and log on */
    ampsClient.connect(uri);
    ampsClient.logon();

    /* Subscribe to the messages topic
     *
     * This overload of the subscribe method returns a MessageStream
     * that can be iterated over.  When the MessageStream destructor
     * runs, the destructor unsubscribes.
     */

    /* Set up the shredder */
    AMPS::FIXShredder shredder;

    for (auto message : ampsClient.subscribe("messages"))
    {
        // Shred the data to a map
        auto subscription = shredder.toMap(message.getData());

        // Display the data
        for (auto iterator = subscription.begin(); iterator != subscription.end(); ++iterator)
        {
            std::cout << iterator->first << " " << iterator->second << std::endl;
        }
    }
}
catch (const AMPS::AMPSException& e)
{
    std::cerr << e.what() << std::endl;
    exit(1);
}

FIX Builder and Shredder Samples

The C++ client distribution contains the following samples to demonstrate FIXBuilder and FIXShredder.

Sample Name
Demonstrates

amps_fix_builder_publisher.cpp

Creating and publishing a message using FIXBuilder

amps_fix_builder_subscriber.cpp

Receiving a message and parsing it using FIXShredder

For more information regarding composite message types, refer to the chapter in the .

Message Types
AMPS User Guide

High Availability

The AMPS C++ Client provides an easy way to create highly-available applications using AMPS, via the HAClient class. HAClient derives from Client and offers the same methods, but also adds protection against network, server, and client outages.

Using HAClient allows applications to automatically:

  • Recover from temporary disconnects between client and server.

  • Failover from one server to another when a server becomes unavailable.

Since the HAClient automatically manages failover and reconnection, 60East recommends using the HAClient for applications that need to:

  • Automatically reconnect and resume work in the case of disconnection.

  • Ensure no messages are lost or duplicated after a reconnect or failover.

  • Persist messages and the current state of a bookmark subscription on disk for protection against client failure.

You can choose how your application uses HAClient features. For example, you might need automatic reconnection, but have no need to resume subscriptions or republish messages. The high availability behavior in HAClient is provided by implementations of defined interfaces. You can combine different implementations provided by 60East to meet your needs, and implement those interfaces to provide your own policies.

Overview of HAClient

HAClient derives from Client and offers the same methods for sending commands to AMPS and receiving messages from AMPS.

The HAClient differs from the Client in two ways:

  • The HAClient automatically installs a disconnect handler that reconnects to AMPS and resumes active (asynchronous) subscriptions. The disconnect handler optionally replays publish and sow_delete messages that have not been acknowledged by AMPS, using a PublishStore. The disconnect handler can optionally resume replays from the transaction log at a point that guarantees no messages are skipped and no duplicates are delivered to the application, using a BookmarkStore.

  • The HAClient includes the infrastructure needed for client failover, including a list of connection strings and their associated authentication mechanisms (provided by the ServerChooser), and options for controlling backoff behavior for reconnects (provided by the DelayStrategy). As a result, the HAClient provides a connectAndLogon() function for establishing a connection to AMPS, rather than treating these as independent steps that an application must manage itself.

If your application needs to automatically reconnect to AMPS, 60East recommends using the HAClient and the automatically provided disconnect handler rather than using a Client or replacing the HAClient default disconnect handler.

Reconnection with HAClient

The most important difference between Client and HAClient is that HAClient automatically provides a reconnect handler.

This description provides a high-level framework for understanding the components involved in failover with the HAClient. The components are described in more detail in the following sections.

The HAClient reconnect handler performs the following steps when reconnecting:

  1. Calls the ServerChooser to determine the next URI to connect to and the authenticator to use for that connection.

    If the connection fails, calls get_error on the ServerChooser to get a description of the failure, sends an exception to the exception listener, and stops the reconnection process.

  2. Calls the DelayStrategy to determine how long to wait before attempting to reconnect, and waits for that period of time.

  3. Connects to the AMPS server. If the connection fails, calls reportFailure on the ServerChooser and begins the process again.

  4. Logs on to the AMPS server. If the connection fails, calls reportFailure on the ServerChooser and begins the process again.

  5. Calls reportSuccess on the ServerChooser.

  6. Receives the bookmark for the last message that the server has persisted. Discards any older messages from the PublishStore.

  7. Republishes any messages in the PublishStore that have not been persisted by the server.

  8. Re-establishes subscriptions using the SubscriptionManager for the client. For bookmark subscriptions, the reconnect handler uses the BookmarkStore for the client to determine the most recent bookmark, and re-subscribes with that bookmark. For subscriptions that do not use a bookmark, the SubscriptionManager simply re-enters the subscription, meaning that it is entered at the point at which the HAClient reconnects.

The ServerChooser, DelayStrategy, PublishStore, SubscriptionManager, and BookmarkStore are all extension points for the HAClient. You can adapt the failover and recovery behavior by setting a different object for the behavior you want to customize on the HAClient or by providing your own implementation.

For example, the convenience methods in the previous section customize the behavior of the PublishStore and BookmarkStore by providing either memory-backed or file-backed stores.

Choosing Store Durability

If your application needs reliable publish to AMPS, install a PublishStore in the HAClient. If your application needs to resume replays from the transaction log, install a BookmarkStore in the HAClient.

These stores provide the following capabilities:

  • A bookmark store tracks received messages, and is used to resume subscriptions that replay from the transaction log.

  • A publish store tracks published messages, and is used to ensure that messages are persisted in AMPS.

The AMPS C++ client provides a memory-backed version of each store and a file-backed version of each store. An HAClient can use either a memory backed store or a file backed store for protection. Each method provides resilience to different failures, as described below:

  • Memory-backed stores provide recovery after disconnection from AMPS by storing messages and bookmarks in your process' address space. This is the highest performance option for working with AMPS in a highly available manner. The trade-off with this method is there is no protection from a crash or failure of your client application. If your application is terminated prematurely or, if the application terminates at the same time as an AMPS instance failure or network outage, then messages may be lost or duplicated. The state of bookmark replays will be lost when the application shuts down. Messages in the publish store when the application shuts down will not be maintained through a restart, so the application will not be able to attempt any necessary redelivery when the application restarts.

    A memory-backed store should only be used by one instance of a client at a time.

  • File-backed stores provide recovery after client failure or shutdown and disconnection from AMPS by storing messages and bookmarks on disk. To use this protection method, the createFileBacked convenience method requests additional arguments for the two files that will be used for both bookmark storage and message storage. If these files exist and are non-empty (as they would be after a client application is restarted), the HAClient loads their contents and ensures synchronization with the AMPS server once connected. The performance of this option depends heavily on the speed of the device on which these files are placed. When the files do not exist (as they would the first time a client starts on a given system), the HAClient creates and initializes the files. In this case the client does not have a point at which to resume the subscription or messages to republish.

    A store file should only be used by one instance of a client at a time.

    When using file backed bookmark stores, 60East recommends periodically removing unneeded entries by calling the prune() method. The precise strategy that your application uses to call prune() depends on the nature of the application. Most applications call prune() when the application exits.

    There are two basic strategies that applications follow while the application runs:

    • Install a resize handler and call prune() after a specified number of resize operations, or when the store reaches a specific size.

    • Call prune() after a specific number of messages are processed (for example, every 10,000 messages received or every 1,000 updates completed).

    Regardless of the strategy, it is best to call prune() when the application is otherwise idle, since the prune() call rewrites the log file.

The store interface is public, and an application can create and provide a custom store as necessary. While clients provide convenience methods for creating file-backed and memory-backed HAClient objects with the appropriate stores, you can also create and set the stores in your application code. The AMPS C++ client also includes default stores, which implement the appropriate interface, but do not actually persist messages.

Starting in 5.3.2.0, the AMPS client contains a recovery point adapter interface to make it easy to add a custom persistence layer to a bookmark store. The distribution includes a recovery point adapter that can store bookmark recovery information in an AMPS SOW topic.

The HAClient provides convenience methods for creating clients and setting stores. You can also construct an HAClient and set whichever store implementations you choose.

In this example, we create several clients. The first client uses memory stores for both bookmarks and publishes. The second client uses files for both bookmarks and publishes. The third client uses a file for bookmarks. The third client does not set a store for publishes, which means that AMPS provides the default store (and no outgoing messages are stored). The final client does not specify any stores, so has no persistence for published messages or bookmark subscriptions, but can take advantage of the automatic failover and reconnection in the HAClient.

/* Memory publish store, memory bookmark store */
HAClient memoryClient = HAClient::createMemoryBacked("lessImportantMessages");

/* File-backed publish store, file-backed bookmark store */
HAClient diskClient = HAClient::createFileBacked("moreImportantMessages",
                                                 "/mnt/fastDisk/moreImportantMessages.outgoing",
                                                 "/mnt/fastDisk/moreImportantMessages.incoming");

/* Default publish store, file-backed bookmark store */
HAClient subscriberClient("subscriber");
subscriberClient.setBookmarkStore(
    new LoggedBookmarkStore("my_app.bookmark"));

/* Default publish store, default bookmark store
   Failover behavior and resubscription only */
HAClient streamReader("streamReader");

While this chapter presents the built-in file and memory-based stores, the AMPS C/C++ Client provides open interfaces that allow development of custom persistent message stores. To fully control recovery behavior, you can implement the Store and BookmarkStore interfaces in your code, and then pass instances of those to setPublishStore() or setBookmarkStore() methods in your Client. You can also implement the RecoveryPointAdapter interface to easily add a custom storage mechanism to one of the 60East-provided bookmark store implementations.

Instructions on developing a custom store are beyond the scope of this document; please refer to the AMPS Client HA Whitepaper for more information.

Using the SOW Recovery Point Adapter

The AMPS client also includes the ability to use a SOW topic to store bookmark state for a bookmark store. This can be a useful option in a situation where an application needs a persistent bookmark store, but does not have the ability to store a file on the filesystem, or where an application has a bookmark file, but wants to have the ability to resume the subscription if the file is lost or damaged, or if the application is started on a system that does not have access to the file.

To use the SOW topic recovery point adapter, you create a bookmark store of the type you would like to use for the Client, passing an adapter when you construct the store. You then set this bookmark store as the store for the Client to use. The constructor for the SOW recovery adapter allows you to customize the topic name and field names used to store the recovery point information in AMPS. As with the RecoveryPointAdapter interface in general, it is possible to customize the behavior of the SOW recovery point adapter by overriding the provided methods.

This section describes how to use the adapter with the default settings. Should you need to change the behavior of the class, you would adjust the guidance in this section accordingly. (For example, if you override methods to produce a message with a different set of keys or a different message format, you would update the topic definition accordingly).

AMPS Topic Configuration

To store recovery point state in AMPS, the AMPS instance that will store the recovery point state must define a SOW/Topic to hold the recovery point data.

By default, the adapter uses a topic named /ADMIN/bookmark_store of json message type, with the /clientName and /subId fields as keys, similar to the following definition:

<Topic>
   <Name>/ADMIN/bookmark_store</Name>
   <MessageType>json</MessageType>
   <Key>/clientName</Key>
   <Key>/subId</Key>
   <!-- Storage/persistence configuration here.
        In most cases, this topic should be
        persisted to a file, but that is not
        a requirement. -->
</Topic>

You must include this definition, or an equivalent definition, in the configuration file for the AMPS instance that will host the recovery point.

If you define a topic with a different configuration (for example, different key names, a different topic name or a different message type), you must ensure that the adapter that you create uses the same parameters as those configured on the server.

Constructing a Client for the Adapter

The AMPS SOW Recovery Point Adapter requires a Client or HAClient connected to the instance that contains the SOW topic. The Adapter will use this client to recover bookmark state and store bookmarks in AMPS. Notice that this client must not be a client that the Adapter is keeping state for. This must be a completely separate client instance, otherwise the client may deadlock while updating the store.

The client must be connected and logged in to the instance that contains the SOW topic, using the message type defined for the topic.

Capacity Planning and Store Sizing

When an application uses a file-backed store, it is important to make sure that there is enough space available on the file system to be able to manage the store.

For logged bookmark stores, an application needs to keep a bookmark record for each message received, each message discarded, and the persisted acknowledgments delivered by the server approximately once a second. Each bookmark entry consumes roughly 70 bytes of storage plus the length of the subscription ID for the subscription receiving the message. The logged bookmark store retains entries until an application explicitly calls prune(). The capacity needed for a logged bookmark store will depend on the strategy that the application uses for pruning the file.

For a file-backed publish store, the application needs to be able to store published messages until the AMPS server that the publisher is connected to acknowledges those messages as persisted. The volume of messages that needs to be stored depends on the failover policy for the server -- that is, the maximum amount of time that the server will allow a downstream instance to fail to acknowledge a message before the server downgrades that connection to async acknowledgment. By default, AMPS does not downgrade connections: this policy must be set explicitly using the AMPS actions. As an example, if the server is configured to downgrade connections that are more than 120 seconds behind, then -- for disaster recovery -- the application must have the capacity to store 120 seconds of published messages at peak publishing load. However, unlike the logged bookmark store, a file-backed publish store removes messages from the store and reuses the space once AMPS has acknowledged the message.

Connections and the Server Chooser

Unlike Client, the HAClient attempts to keep itself connected to an AMPS instance at all times, by automatically reconnecting or failing over when it detects that the client is disconnected. When you are using the Client directly, your disconnect handler usually takes care of reconnection. HAClient, on the other hand, provides a disconnect handler that automatically reconnects to the current server or to the next available server.

To inform the HAClient of the addresses of the AMPS instances in your system, you pass a ServerChooser instance to the HAClient. ServerChooser acts as a smart enumerator over the servers available: HAClient calls ServerChooser methods to inquire about what server should be connected, and calls methods to indicate whether a given server succeeded or failed.

The AMPS C/C++ Client provides a simple implementation of ServerChooser, called DefaultServerChooser, that provides very simple logic for reconnecting. This server chooser is most suitable for basic testing, or in cases where an application should simply rotate through a list of servers. For most applications, you implement the ServerChooser interface yourself for more advanced logic, such as choosing a backup server based on your network topology, or limiting the number of times your application should try to reconnect to a given address.

In either case, you must provide a ServerChooser to HAClient and then call connectAndLogon() to create the first connection.

HAClient myClient = HAClient::createMemoryBacked(
    "myClient");

/* primary.amps.xyz.com is the primary AMPS instance, and
 * secondary.amps.xyz.com is the secondary
 */
ServerChooser chooser(new DefaultServerChooser());
chooser.add("tcp://primary.amps.xyz.com:12345/fix");
chooser.add("tcp://secondary.amps.xyz.com:12345/fix");
myClient.setServerChooser(chooser);
myClient.connectAndLogon();
...
myClient.disconnect();

Similar to Client, HAClient remains connected to the server until disconnect() is called. Unlike Client, HAClient provides a built-in disconnect handler that automatically attempts to reconnect to your server if it detects a disconnect, and, if that server cannot be connected, fails over to the next server provided by the ServerChooser. In this example, the call to connectAndLogon() attempts to connect and log in to primary.amps.xyz.com, and returns if that is successful. If it cannot connect, it tries secondary.amps.xyz.com, and continues trying servers from the ServerChooser until a connection is established. Likewise, if it detects a disconnection while the client is in use, HAClient attempts to reconnect to the server it was most recently connected with, and, if that is not possible, it moves on to the next server provided by the ServerChooser.

Setting a Reconnect Delay and Timeout

You can control the amount of time between reconnection attempts and set a total amount of time for the HAClient to attempt to reconnect.

The AMPS C++ Client includes an interface for managing this behavior called the ReconnectDelayStrategy.

Two implementations of this interface are provided with the client:

  • FixedDelayStrategy provides the same delay each time the HAClient tries to reconnect.

  • ExponentialDelayStrategy provides an exponential backoff until a connection attempt succeeds.

To use either of these classes, you simply create an instance with appropriate parameters, and install that instance as the delay strategy for the HAClient. For example, the following code sets up a reconnect delay that starts at 200ms and increases the delay by 1.5 times after each failure. The strategy allows a maximum delay between connection attempts of 5 seconds, and will not retry longer than 60 seconds.

HAClient theClient = HAClient::createMemoryBacked("demo");

theClient.setReconnectDelayStrategy(new ExponentialDelayStrategy(200, 5000, 1.5, 6000,0));

Implementing a Server Chooser

As described above, you provide the HAClient with connection strings to one or more AMPS servers using a ServerChooser. The purpose of a ServerChooser is to provide information to the HAClient. A ServerChooser does not manage the reconnection process, and should not call methods on the HAClient.

A ServerChooser has two required responsibilities to the HAClient:

  • Tells the HAClient the connection string for the server to connect to. If there are no servers, or the ServerChooser wants the connection to fail, the ServerChooser returns an empty string.

    To provide this information, the ServerChooser implements the getCurrentURI() method.

  • Provides an Authenticator for the current connection string. This is especially important for installations where different servers require different credentials or authentication tokens must be reset after each connection attempt.

    To provide the authenticator, the ServerChooser implements the getCurrentAuthenticator() method.

The HAClient calls the getCurrentURI() and getCurrentAuthenticator() methods each time it needs to make a connection.

Each time a connection succeeds, the HAClient calls the reportSuccess() method of the ServerChooser. Each time a connection fails, the HAClient calls the reportFailure() method of the ServerChooser. The HAClient does not require the ServerChooser to take any particular action when it calls these methods. These methods are provided for the HAClient to do internal maintenance, logging, or record keeping. For example, an HAClient might keep a list of available URIs with a current failure count, and skip over URIs that have failed more than 5 consecutive times until all URIs in the list have failed more than 5 consecutive times.

When the ServerChooser returns an empty string from getCurrentURI(), indicating that no servers are available for connection, the HAClient calls getError() method on the ServerChooser and includes the string returned by getError() in the generated exception.

Heartbeats and Failure Detection

Use of the HAClient allows your application to quickly recover from detected connection failures. By default, connection failure detection occurs when AMPS receives an operating system error on the connection. This system may result in unpredictable delays in detecting a connection failure on the client, particularly when failures in network routing hardware occur, and the client primarily acts as a subscriber.

The heartbeat feature of the AMPS client allows connection failure to be detected quickly. Heartbeats ensure that regular messages are sent between the AMPS client and server on a predictable schedule. The AMPS client and server both assume disconnection has occurred if there is no other activity and these regular heartbeats cease, ensuring disconnection is detected in a timely manner.

To use the heartbeat feature, call the setHeartbeat method on Client or HAClient:

HAClient client = HAClient::createMemoryBacked(
    "importantStuff");
...
client.setHeartbeat(3);
client.connectAndLogon();
...

Method setHeartbeat takes one parameter: the heartbeat interval. The heartbeat interval specifies the periodicity of heartbeat messages sent by the server: the value 3 indicates messages are sent on a three-second interval. If the client receives no messages in a six second window (two heartbeat intervals), the connection is assumed to be dead, and the HAClient attempts reconnection. An additional variant of setHeartbeat allows the idle period to be set to a value other than two heartbeat intervals. (The server, however, will always consider a connection to be closed after two heartbeat intervals without any traffic.)

Notice that, for HAClient, setHeartbeat must be called before the client is connected. For Client, setHeartbeat must be called after the client is connected.

Heartbeats are serviced on the receive thread created by the AMPS client. Your application must not block the receive thread for longer than the heartbeat interval, or the application is subject to being disconnected.

Considerations for Publishers

Publishing with an HAClient is nearly identical to regular publishing; you simply call the publish() method with your message’s topic and data. The AMPS client sends the message to AMPS, and then returns from the publish() call. For maximum performance, the client does not wait for the AMPS server to acknowledge that the message has been received.

When an HAClient uses a publish store (other than the DefaultPublishStore), the publish store retains a copy of each outgoing message and requests that AMPS acknowledge that the message has been persisted. The AMPS server acknowledges messages back to the publisher. Acknowledgments can be delivered for multiple messages at periodic intervals (for topics recorded in the transaction log) or after each message (for topics that are not recorded in the transaction log). When an acknowledgment for a message is received, the HAClient removes that message from the bookmark store. When a connection to a server is made, the HAClient automatically determines which messages from the publish store (if any) the server has not processed, and replays those messages to the server once the connection is established.

For reliable publishers, the application must choose how best to handle application shutdown. For example, it is possible for the network to fail immediately after the publisher sends the message, while the message is still in transit. In this case, the publisher has sent the message, but the server has not processed it and acknowledged it. During normal operation, the HAClient will automatically connect and retry the message. On shutdown, however, the application must decide whether to wait for messages to be acknowledged, or whether to exit.

Publish store implementations provide an unpersistedCount() method that reports the number of messages that have not yet been acknowledged by the AMPS server. When the unpersistedCount() reaches 0, there are no unpersisted messages in the local publish store.

For the highest level of safety, an application can wait until the unpersistedCount() reaches 0, which indicates that all of the messages have been persisted to the instance that the application is connected to, and the synchronous replication destinations configured for that instance. When a synchronous replication destination goes offline, this approach will cause the publisher to wait to exit until the destination comes back online or until the destination is downgraded to asynchronous replication.

For applications that are shut down periodically for short periods of time (for example, applications that are only offline during a weekly maintenance window), another approach is to use the publishFlush() method to ensure that messages are delivered to AMPS, and then rely on the connection logic to replay messages as necessary when the application restarts.

For example, the following code flushes messages to AMPS, then warns if not all messages have been acknowledged:

HAClient pub = HAClient.createMemoryBacked("importantStuff");
...
pub.connectAndLogon();
std::string topic = "loggedTopic";
std:string data = ...;

for (size_t i = 0; i < MESSAGE_COUNT; i++)
{
    pub.publish(topic, data);
}

/* We think we are done, but the server may not
 * have received or acknowledged the messages yet.

 * Wait until the server has received all messages.
 * The program could also specify a timeout in this
 * command to avoid blocking forever if the
 * network is down or all servers are offline.
 */
pub.publishFlush();

/* Print warning to the console if messages have
 * been published but not yet acknowledged as persisted.
 */
if (pub.getPublishStore().unpersistedCount() > 0)
{
    printf("all messages have been published,"
           " but not all have been persisted.");
}

pub.disconnect();

In this example, the client sends each message immediately when publish() is called. If AMPS becomes unavailable between the final publish() and the disconnect(), or one of the servers that the AMPS instance replicates to is offline, the client may not have received a persisted acknowledgment for all of the published messages. For example, if a message has not yet been persisted by all of the servers in the replication fabric that are connected with synchronous replication, AMPS will not have acknowledged the message.

Before shutting down the client, the code does two things:

  • First, the code flushes messages to the server to ensure that all messages have been delivered to AMPS.

  • Next, the code checks to see if all of the messages in the publish store have been acknowledged as persisted by AMPS. If the messages have not been acknowledged, they will remain in the publish store file and will be published to AMPS, if necessary, the next time the application connects. An application may choose to loop until unpersistedCount() returns 0, or (as we do in this case) simply warn that AMPS has not confirmed that the messages are fully persisted. The behavior you choose in your application should be consistent with the high-availability guarantees your application needs to provide.

AMPS uses the name of the HAClient to determine the origin of messages. For the AMPS server to correctly identify duplicate messages, each instance of an application that publishes messages must use a distinct name. That name must be consistent across different runs of the application.

If your application crashes or is terminated, some published messages may not have been persisted in the AMPS server. If you use the file-based store (in other words, the store created by using HAClient.createFileBacked()), then the HAClient will recover the messages, and once logged on, correlate the message store to what the AMPS server has received, re-publishing any missing messages. This occurs automatically when HAClient connects, without any explicit consideration in your code, other than ensuring that the same file name is passed to createFileBacked() if recovery is desired.

AMPS provides persisted acknowledgment messages for topics that do not have a transaction log enabled; however, the level of durability provided for topics with no transaction log is minimal. Learn more about transaction logs in the AMPS User Guide.

Detecting Failover Ahead of Replication

AMPS replication provides two different acknowledgment modes for outgoing replication links from an instance:

  • For a link in sync acknowledgment mode, a message must be successfully acknowledged by the downstream instance of AMPS before this instance of AMPS will acknowledge the message.

  • For a link in async acknowledgment mode, this link is not considered for acknowledging the message. In this mode, the downstream side of the replication link may not have received or processed the message at the time that the publisher receives an acknowledgment.

As described in the AMPS User Guide, a publisher must not failover from one instance of AMPS to another instance when any link between those instances uses async acknowledgment unless replication is certain to have reached that instance. (For example, if replication is taking a maximum of 1.2 seconds between the instances and the publisher has been disconnected for 30 seconds, all messages from that publisher will have been replicated).

To help detect a situation where a publisher may be "jumping ahead" of messages that it has published, but which have not yet been replicated, the AMPS client allows an application to consider it to be an error to make a connection to a server that has not received messages previously published by the application.

To enable this behavior, set the setErrorOnPublishGap() method to set this property on the PublishStore in use for the client. When this property is set, the client will consider it to be an error to connect to a server that has not received messages previously published by the client, and consider the connection to have failed.

Notice that an application that uses this approach may need to handle situations where no server has received the message, particularly if the replication configuration uses automated replication downgrade.

Considerations for Subscribers

HAClient provides two important features for applications that subscribe to one or more topics: re-subscription, and a bookmark store to track the correct point at which to resume a bookmark subscription.

Resubscription with Asynchronous Message Processing

Any asynchronous subscription placed using an HAClient is automatically reinstated after a disconnect or a failover. These subscriptions are placed in an in-memory SubscriptionManager, which is created automatically when the HAClient is instantiated. Most applications will use this built-in subscription manager, but for applications that create a varying number of subscriptions, you may wish to implement SubscriptionManager to store subscriptions in a more durable place. Note that these subscriptions contain no message data, but rather simply contain the parameters of the subscription itself (for instance, the command, topic, message handler, options, and filter).

When a re-subscription occurs, the AMPS C++ Client re-executes the command as originally submitted, including the original topic, options, and so on. AMPS sends the subscriber any messages for the specified topic (or topic expression) that are published after the subscription is placed. For a sow_and_subscribe command, this means that the client re-issues the full command, including the SOW query as well as the subscription.

A sow command is a point-in-time query. It isn't added to the subscription manager, and isn't restarted if a disconnection happens in the middle of a query.

A sow_and_subscribe is a subscription, and is added to the subscription manager.

Resubscription with Synchronous Message Processing

The HAClient (starting with the AMPS C++ Client version 4.3.1.1) does not track synchronous message processing subscriptions in the SubscriptionManager. The reason for this is to preserve conventional iterator behavior. That is, once the MessageStream indicates that there are no more elements to iterate (for example, because the connection has closed), the MessageStream will not suddenly produce more elements.

To re-subscribe when the HAClient fails over, you can simply re-issue the subscription. For example, the snippet below re-issues a subscribe command when the message stream ends:

bool still_need_to_process = true;

while (still_need_to_process)
{
    try
    {
      for ( auto message : client.subscribe("messages"))
      {
           /* process messages here */

           /* check condition on still_need_to_process */
           if (!still_need_to_process) break;
       }
       /* end of stream: for a subscribe this means
        * that the connection is likely closed, or
        * the program broke out of the loop
        */
     }
     catch(...) /* for production, you would catch specific errors */
     {
        /* log error as appropriate */
     }
}

Bookmark Stores

In cases where it is critical not to miss a single message, it is important to be able to resume a subscription at the exact point that a failure occurred. In this case, simply recreating a subscription isn't sufficient. Even though the subscription is recreated, the subscriber may have been disconnected at precisely the wrong time, and will not see the message.

To ensure delivery of every message from a topic or set of topics, the AMPS HAClient includes a BookmarkStore that, combined with the bookmark subscription and transaction log functionality in the AMPS server, ensures that clients receive any messages that might have been missed. The client stores the bookmark associated with each message received, and tracks whether the application has processed that message; if a disconnect occurs, the client uses the BookmarkStore to determine the correct resubscription point, and sends that bookmark to AMPS when it re-subscribes. AMPS then replays messages from its transaction log from the point after the specified bookmark, thus ensuring the client is completely up-to-date.

HAClient helps you to take advantage of this bookmark mechanism through the BookmarkStore interface and bookmarkSubscribe() method on Client. When you create subscriptions with bookmarkSubscribe(), whenever a disconnection or failover occurs, your application automatically re-subscribes to the message after the last message it processed. HAClients created by createFileBacked() additionally store these bookmarks on disk, so that the application can restart with the appropriate message if the client application fails and restarts.

To take advantage of bookmark subscriptions, do the following:

  • Ensure the topic(s) to be subscribed to are included in a transaction log. See the AMPS User Guide for information on how to specify the contents of a transaction log.

  • Use bookmarkSubscribe() instead of subscribe() when creating a subscription, and decide how the application will manage subscription identifiers (SubIds). If you are using a Command object, you can simply provide a bookmark on that object.

  • Use the BookmarkStore.discard() method in message handlers to indicate when a message has been fully processed by the application, that is, when the application does not need to receive the message again if the application fails over.

The following example creates a bookmark subscription against a transaction-logged topic, and fully processes each message as soon as it is delivered:

HAClient client = HAClient::createFileBacked("theClient",
                                             "/logs/theClient.publishLog",
                                             "/logs/theClient.subscribeLog");
namespace MyMessageHandler
{
    public void invoke(const Message& message, void* data)
    {
        ...
        client.getBookmarkStore().discard(message);
        ...
    }
}

std::string commandID = client.executeAsync(Command("subscribe")
                              .setTopic("myTopic")
                              .setSubscriptionId("MySubId")
                              .setBookmark(AMPS::Client::BOOKMARK_RECENT()),
                              AMPS::MessageHandler(MyMessageHandler::invoke,(void*)(&client)));

In this example, the client is a file-backed client, meaning that arriving bookmarks will be stored in a file (theClient.subscribeLog). Storing these bookmarks in a file allows the application to restart the subscription from the last message processed, in the event of either server or client failure.

For optimum performance, it is critical to discard every message once its processing is complete. If a message is never discarded, it remains in the bookmark store. During re-subscription, HAClient always restarts the bookmark subscription with the oldest undiscarded message, and then filters out any more recent messages that have been discarded. If an old message remains in the store, but is no longer important for the application’s functioning, the client and the AMPS server will incur unnecessary network, disk, and CPU activity.

In the example above, all parameters after the bookmark are optional. However, all options before — and including the bookmark — are required when creating a bookmarkSubscribe().

The last parameter, subId, specifies an identifier to be used for this subscription. Passing NULL causes HAClient to generate one and return it, like most other Client functions. However, if you wish to resume a subscription from a previous point after the application has terminated and restarted, the application must pass the same subscription ID as during its previous run. Passing a different subscription ID bypasses any recovery mechanisms, creating an entirely new subscription. When you use an existing subscription ID, the HAClient locates the last-used bookmark for that subscription in the local store, and attempts to re-subscribe from that point.

The subId is also required to be unique when used within a single client, but can be the same in different clients. Internally, AMPS tracks subscriptions in each client, thus each identifier for each subscription within a client must be unique. The same subId can be reused across unique clients simultaneously without causing problems.

Below are the different bookmark types that can be used to enable different recovery strategies for an application:

  • Client::BOOKMARK_NOW() specifies that the subscription should begin from the moment the server receives the subscription request. This results in the same messages being delivered as if you had invoked subscribe() instead, except that the messages will be accompanied by bookmarks. This is also the behavior that results if you supply an invalid bookmark.

  • Client::BOOKMARK_EPOCH() specifies that the subscription should begin from the beginning of the AMPS transaction log (that is, the first entry in the oldest journal file for the transaction log).

  • Client::BOOKMARK_RECENT() specifies that the subscription should begin from the last-used message in the associated BookmarkStore, or, if this subscription has not been seen before, to begin with EPOCH. This is the most common value for this parameter, and is the value used in the preceding example. By using BOOKMARK_RECENT, the application automatically resumes from wherever the subscription left off, taking into account any messages that have already been processed and discarded.

When the HAClient re-subscribes after a disconnection and reconnection, it always uses BOOKMARK_RECENT, ensuring that the continued subscription always begins from the last message discarded before the disconnect, so that no messages are missed.

Conclusion

With only a few changes, most AMPS applications can take advantage of the HAClient and associated classes to become more highly-available and resilient. Using the PublishStore, publishers can ensure that every message published has actually been persisted by AMPS. Using BookmarkStore, subscribers can make sure that there are no gaps or duplicates in the messages received. HAClient makes both kinds of applications more resilient to network and server outages, as well as temporary issues. By utilizing the file-based HAClient, clients can recover their state after an unexpected termination or crash. Though HAClient provides useful defaults for the Store, BookmarkStore, SubscriptionManager, and ServerChooser, you can customize any or all of these to the specific needs of your application and architecture.

Advanced Topics

Transport Filtering

The AMPS C/C++ client offers the ability to filter incoming and outgoing messages in the format they are sent and received on the network. This allows you to inspect or modify outgoing messages before they are sent to the network, and incoming messages as they arrive from the network. This can be especially useful when using SSL connections, since this gives you a way to monitor outgoing network traffic before it is encrypted, and incoming network traffic after it is decrypted.

To create a transport filter, you create a function with the following signature:

You then register the filter by calling setTransportFilterFunction with a pointer to the function and a pointer to the data to be provided in the userdata parameter of the callback.

For example, the following filter function simply prints the data provided to the standard output:

Registering the function is a matter of calling setTransportFilterFunction with this function and any callback data, as shown below:

The snippet above installs the filter function for the client.

Notice that the transport filter function is called with the verbatim contents of data received from AMPS. This means that, for incoming data, the function may not be called precisely on message boundaries, and that the binary length encoding used by the client and server will be presented to the transport filter.

Using SSL

The AMPS C++ client includes support for Secure Sockets Layer (SSL) connections to AMPS. The client automatically attempts to make an SSL connection when the transport in the connection string is set to tcps, as described in the Connection Strings section in Chapter 3 of this guide.

To use the tcps transport, your application must have an SSL library loaded before making the tcps connection. Notice that the AMPS C++ client uses the OpenSSL implementation that you provide. The AMPS client distribution doesn't include OpenSSL, and doesn't provide facilities for certificate generation, certificate signing, key management, and so forth. Those facilities are provided by the OpenSSL implementation you choose.

Loading and Initializing the SSL Library

To make an SSL connection, the AMPS client must have an SSL library loaded before making the SSL connection.

There are two common ways to load the library:

  1. At link time, specify the OpenSSL shared object file (Linux) or DLL (Windows) to the linker. With this approach, the operating system will load the SSL library for your application automatically when the application starts up. You then use call amps_ssl_init with NULL as the library name to initialize the library.

  2. Use the amps_ssl_init function to load the SSL library. This function accepts either the library name, or a full path including the library name. When called with the library name, the AMPS C++ client will search appropriate system paths for shared libraries (for example, the LD_LIBRARY_PATH on Linux) and load the first object found that matches the provided name. When called with a full path, the AMPS C++ client will load exactly the object specified. The AMPS client will then initialize the library.

Once the SSL library is loaded and initialized, you can connect using tcps as a transport type. The fact that the connection uses a secure socket is only important when making the connection, and does not affect the operation of the client once the connection has been made.

Some of these features require specific configuration settings on your AMPS instance(s). This chapter mentions these features and describes how to use them from the AMPS C++ client. You can find full documentation for these settings and server features in the .

AMPS User Guide
void amps_tcp_filter_function(const unsigned char* data,size_t len,short direction, void* userdata);
void amps_tcp_trace_filter_function(const unsigned char* data,
                                    size_t len,
                                    short direction,
                                    void* userdata)
{
    /* Output the direction marker */
    if (direction == 0) {
        std::cout << "OUTGOING ---> ";
    }
    else {
        std::cout << "INCOMING ---> ";
    }

    /* Output the data */
    std::cout << std::string(data, len) << std::endl;
}
/* client is an existing Client object */
client.setTransportFilterFunction(
                              &amps_tcp_trace_filter_function,
                              (void*)NULL);