Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
One of the most powerful features of AMPS is content filtering. With content filtering, filters based on message content are applied at the server, so that your application and the network are not utilized by messages that are uninteresting for your application. For example, if your application is only displaying messages from a particular user, you can send a content filter to the server so that only messages from that particular user are sent to the client. The Filtering Subscriptions by Content under the AMPS User Guide provides full details on content filtering. The AMPS Expressions section of the AMPS User Guide describes the basic syntax of AMPS expressions, including content filters, and the AMPS Functions section describes functions that are available for use in filters.
To apply a content filter to a subscription, simply pass it into the client.subscribe()
call:
In this case, the application will receive messages where the sender
field equals mom
.
In this example, we have passed in a content filter /sender = 'mom'
. This will result in the server only sending us messages, from the messages
topic, that have the sender field equal to mom
in the message.
For example, the AMPS server will send the following message, where /sender
is mom
:
The AMPS server will not send a message with a different /sender
value:
AMPS allows you to update parameters, such as the content filter, on a subscription. When you replace a filter on the subscription, AMPS immediately begins sending only messages that match the updated filter. Notice that if the subscription was entered with a command that includes a SOW query, using the replace
option can re-issue the SOW query (as described in the AMPS User Guide).
To update the filter on a subscription, you create a subscribe
command. You set the SubscriptionId
provided on the Command
to the identifier of the existing subscription and include the replace
option on the Command
.
When you send the Command
, AMPS atomically replaces the filter and sends messages that match the updated filter from that point forward.
As mentioned , one way for an application to receive messages is to have the AMPS C++ client return a MessageStream
object that can be used to iterate over the results of the command.
The MessageStream
object makes copies of the incoming messages. When there is no message available, the MessageStream
will block.
A MessageStream
will only remain active while the client that produced it is connected. If the client disconnects, the MessageStream
will continue to provide any messages that have not yet been consumed, then throw an exception.
The advantages of using a MessageStream
that it provides a simple processing model, that receiving messages from a MessageStream
does not block the client receive thread (see ) and that a copy of the message is automatically made for the application.
In return for these advantages, a MessageStream
has higher overhead than , it will not be resumed if the client disconnects, and, by default, it will use as much memory as necessary to hold messages coming from the AMPS server.
So far, we have seen that subscribing to a topic involves working with objects of AMPS::Message
type. A Message
represents a single message to or from an AMPS server. Messages are received or sent for every client/server operation in AMPS.
There are two parts of each message in AMPS: a set of headers that provide metadata for the message, and the data that the message contains. Every AMPS message has one or more header fields defined. The precise headers present depend on the type and context of the message. There are many possible fields in any given message, but only a few are used for any given message. For each header field, the Message
class contains a distinct property that allows for retrieval and setting of that field. For example, the Message.get_command_id()
function corresponds to the commandId
header field, the Message.get_batch_size()
function corresponds to the BatchSize
header field, and so on. For more information on these header fields, consult the AMPS User Guide and AMPS Command Reference.
The Message
object contains several different accessors for header fields.
For retrieving the value of fields on a message:
The getXxx()
functions return a Field
. The Field
contains pointers to the underlying buffer in the Message
. Data is not copied until either deepCopy()
is called or the Field
is converted to another format (such as constructing a std::string
from the Field
). The value of the Field
is only valid for the lifetime of the message unless it is copied using deepCopy()
.
The getRawXxx()
functions take a pointer and a length. The pointer is assigned to the first byte of the value in the underlying buffer in the Message
. The length is set to the length of the value.
To assign values to a field in a message, the Message
object provides several different options. The differences between these options are a matter of managing the memory for the value.
The setXxx()
functions copy the value provided into the specified header.
The assignXxx()
functions set the value of the specified header, avoiding a copy if possible. When this function is used, the Message
may refer directly to the data passed in. That data should not change or be deallocated while the Message
is in use. (Notice, though, that a copy of the message produced using deepCopy
will copy the data and can be used safely even if the original data is changed or deallocated.)
60East does not recommend attempting to parse header fields from the raw data of the message, nor does 60East recommend attempting to manipulate the fields of the message without using the accessor methods.
In AMPS, fields sometimes need to be set to a unique identifier value. For example, when creating a new subscription, or sending a manually constructed message, you’ll need to assign a new unique identifier to multiple fields such as CommandId
and SubscriptionId
. For this purpose, Message provides newXxx()
methods for each field that generates a new unique identifier and sets the field to that new value.
Access to the data section of a message is provided via the getData()
method. The data
contains the unparsed data in the message, returned as a series of bytes (a string
or const char *
). Your application code parses and works with the data.
The AMPS C++ client contains a collection of helper classes for working with message types that are specific to AMPS (for example, FIX, NVFIX, and AMPS composite message types). For message types that are widely used, such as JSON or XML, you can use whichever library you typically use in your environment.
The AMPS Command Reference contains a full description of which fields are available and which fields are returned in response to specific commands.
Messages published to a topic on an AMPS server are available to other clients via a subscription. Before messages can be received, a client must subscribe to one or more topics on the AMPS server so that the server will begin sending messages to the client. The server will continue sending messages to the client until the client unsubscribes, or the client disconnects. With content filtering, the AMPS server will limit the messages sent only to those messages that match a client-supplied filter. In this chapter, you will learn how to subscribe, unsubscribe, and supply filters for messages using the AMPS C/C++ client.
Subscribing to an AMPS topic takes place by calling Client.subscribe()
. Here is a short example showing the simplest way to subscribe to a topic (error handling and connection details are omitted for brevity):
AMPS creates a background thread that receives messages and copies them into a MessageStream
that you iterate over. This means that the client application as a whole can continue to receive messages while you are doing processing work.
The simple method described above is provided for convenience. The AMPS C++ client provides convenience methods for the most common form of the AMPS commands. The client also provides an interface that allows you to have precise control over the command. Using that interface, the example above becomes:
The Command
interface allows you to precisely customize the commands you send to AMPS. For flexibility and ease of maintenance, 60East recommends using the Command
interface (rather than a named method) for any command that will receive messages from AMPS. For publishing messages, there can be a slight performance advantage to using the named commands where possible.
The AMPS C++ client also supports an interface that allows you to process messages asynchronously. In this case, you add a message handler to the function call. The client returns the command ID of the subscribe command once the server has acknowledged that the command has been processed. As messages arrive, the client calls your message handler directly on the background thread. This can be an advantage for some applications. For example, if your application is highly multithreaded and copies message data to a work queue processed by multiple threads, there is usually a performance benefit to enqueuing work directly from the background thread. See Understanding Threading for a discussion of threading considerations, including considerations for message handlers.
The advantage of using asynchronous message processing is that it is extremely efficient -- your processing code runs directly on the thread that the AMPS client is using to read from the socket and has direct access to the buffer that the AMPS client uses. Further, when an HAClient
is used (discussed later in this guide), the default disconnect handler for that client will automatically resume subscriptions that use asynchronous message processing. Last, but not least, since message processing runs directly on the receive thread, using asynchronous message processing will provide pushback on the socket in the event that messages are arriving faster than the application can process them (for example, in response to a sow
query).
In return for these advantages, your processing code must be careful not to block the processing thread for an excessive amount of time, and must make a deep copy of any data that will be used outside of the processing code.
Here is a short example (error handling and connection details are omitted for brevity):
The AMPS client resets and reuses the message provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it.
With newer compilers, you can use additional constructs to specify a callback function. Recent improvements in C++ have added lambda functions -- unnamed functions declared in-line that can refer to names in the lexical scope of their creator. If available on your system, both Standard C++ Library function objects and lambda functions may be used as callbacks.
Check functional.cpp
in the samples directory for numerous examples.
One of the more common ways of providing a message handler is as an instance method on an object that maintains message state. It's simple to provide a handler with this capability, as shown below.
You can then provide an instance of the handler directly wherever a message handler is required, as shown below:
Regular Expression (Regex) subscriptions allow a regular expression to be supplied in the place of a topic name. When you supply a regular expression, it is as if a subscription is made to every topic that matches your expression, including topics that do not yet exist at the time of creating the subscription.
To use a regular expression, simply supply the regular expression in place of the topic name in the subscribe
command. For example:
In this example, messages on topics orders-north-america
, orders-europe
, and new-orders
would match the regular expression. Messages published to any of those topics will be sent to our message_handler
function. As in the example, you can use the getTopic()
function to determine the actual topic of the message sent to the function.
The first time a command causes an instance of the Client
or HAClient
to connect to AMPS (typically, the logon()
command), the client creates a thread that runs in the background. This background thread is responsible for processing incoming messages from AMPS, which includes both messages that contain data and acknowledgments from the server.
When you call a command on the AMPS client, the command typically waits for an acknowledgment from the server and then returns. (The exception to this is publish
. For performance, the publish
command does not wait for an acknowledgment from the server before returning.)
In the simple case, using synchronous message processing, the client provides an internal handler function that populates the MessageStream
. The client receive thread calls the internal handler function, which makes a deep copy of the incoming message and adds it to the MessageStream
. The MessageStream
is used on the calling thread, so operations on the MessageStream
do not block the client receive thread.
When using asynchronous message processing, AMPS calls the handler function from the client receive thread. Message handlers provided for asynchronous message processing must be aware of the following considerations:
The client creates one client receive thread at a time, and the lifetime of the thread lasts for the lifetime of the connection to the AMPS server. A message handler that is only provided to a single client will only be called from a single thread at a time. If your message handler will be used by multiple clients, then multiple threads will call your message handler. In this case, you should take care to protect any state that will be shared between threads. Notice that if the client connection fails (or is closed), and the client reconnects, the client will create a different thread for the new connection.
For maximum performance, do as little work in the message handler as possible. For example, if you use the contents of the message to update an external database, a message handler that adds the relevant data to an update queue, that is processed by a different thread, will typically perform better than a message handler that does this update during the message handling.
While your message handler is running, the thread that calls your message handler is no longer receiving messages. This makes it easier to write a message handler because you know that no other messages are arriving from the same subscription. However, this also means that you cannot use the same client that called the message handler to send commands to AMPS. Acknowledgments from AMPS cannot be processed and your application will deadlock waiting for the acknowledgment. Instead, enqueue the command in a work queue to be processed by a separate thread or use a different client object to submit the commands.
The AMPS client resets and reuses the Message
provided to this function between calls. This improves performance in the client, but means that if your handler function needs to preserve information contained within the message, you must copy the information (either by making a copy of the entire message or copying the required fields) rather than just saving the message object. Otherwise, the AMPS client cannot guarantee the state of the object or the contents of the object when your program goes to use it. Likewise, a message handler should not modify the Message
-- this will result in modifying the message provided to other handlers (including handlers internal to the AMPS client).
The AMPS server continues a subscription until the client explicitly ends the subscription (that is, unsubscribes) or until the connection to the client is closed.
With the synchronous interface, AMPS automatically unsubscribes to the topic when the destructor for the MessageStream
runs. You can also explicitly call the close()
method on the MessageStream
object to remove the subscription.
In the asynchronous interface, when a subscription is successfully made, messages will begin flowing to the message handler, and the subscribe()
or executeAsync()
call will return a string for the subscription id that serves as the identifier for this subscription. A Client
can have any number of active subscriptions, and this subscription id is how AMPS designates messages intended for this particular subscription. To unsubscribe, we simply call unsubscribe
with the subscription identifier:
In this example, as in the previous section, we use the client.executeAsync()
method to create a subscription to the messages
topic. When our application is done listening to this topic, it unsubscribes by passing in the subId
returned by subscribe()
. After the subscription is removed, no more messages will flow into our myHandlerFunction()
.
When an application calls unsubscribe()
, the client sends an explicit unsubscribe
command to AMPS. The AMPS server removes that subscription from the set of subscriptions for the client, and stops sending messages for that subscription. On the client side, the client unregisters the subscription so that the MessageStream
or MessageHandler
for that subscription will no longer receive messages for that subscription.
Notice that calling unsubscribe
does not destroy messages that the server has already sent to the client. If there are messages on the way to the client for this subscription, the AMPS client must consume those messages. If a LastChanceMessageHandler
is registered, the handler will receive the messages. Otherwise, they will be discarded since no message handler matches the subscription ID on the message.