Imagine an application that displays real-time information about the position and status of a fleet of delivery vans. When the application starts, it should display the current location of each of the vans along with their current status. As vans move around the city and post other status updates, the application should keep its display up to date. Vans upload information to the system by posting message to a van location topic, configured with a key of van_id on the AMPS server.
In this application, it is important to not only stay up-to-date on the latest information about each van, but to ensure all of the active vans are displayed as soon as the application starts. Combining a SOW with a subscription to the topic is exactly what is needed, and that is accomplished by the Client.sowAndSubscribe() method. Now we will look at an example:
/* processSOWMessage
*
* Processes a message during SOW query. Returns
* true if the SOW query is complete (group_end command),
* false otherwise.
*/
bool processSOWMessage(const AMPS::Message& message)
{
if (message.getCommand() == "group_begin") {
std::cout << "Receiving messages from the SOW." << std::endl;
}
else if (message.getCommand() == "group_end") {
std::cout << "Done receiving messages from SOW." << std::endl;
return true;
}
else {
std::cout << "SOW message: " << message.getData() << std::endl;
addVan(message);
}
return false;
}
/* processSubscriptionMessage
*
* Process messages received on a subscription, after the SOW
* query is complete.
*/
void processSubscribeMessage(const AMPS::Message& message)
{
if (message.getCommand() == "oof") {
std::cout << "OOF : " << message.getReason()
<< " message to remove : "
<< message.getData() << std::endl;
removeVan(message);
}
else {
std::cout << "New or updated message: " << message.getData() << std::endl;
addOrUpdateVan(message);
}
}
...
void doSowAndSubscribe(AMPS::Client& ampsClient)
{
bool sowDone = false;
std::cerr << "about to subscribe..." << std::endl;
/* We issue a sowAndSubscribe() to begin receiving information about all of the
* open orders in the system for the symbol ROL. These orders are now are returned
* as Messages whose Command returns SOW.
*
* Notice here that we specified true for the oofEnabled parameter.
* Setting this parameter to true causes us to receive Out-of-Focus("OOF")
* messages for the topic. OOF messages are sent when an entry that was sent
* to us in the past no longer matches our query. This happens when an entry
* is removed from the SOW cache via a sowDelete() operation,
* when the entry expires (as specified by the expiration time on the message
* or by the configuration of that topic on the AMPS server),
* or when the entry no longer matches the content filter
* specified. In our case, when an order is processed or canceled
* (or if the symbol changes), a Message is sent with Command set to OOF.
* The content of that message is the message sent previously.
* We use OOF messages to remove orders from our display as they are
* completed or canceled.
*/
for (auto message : ampsClient.execute(Command("sow_and_subscribe")
.setTopic("van_location")
.setFilter("/status = 'ACTIVE'")
.setBatchSize(100)
.setOptions("oof"))) {
if (sowDone == false)
{
sowDone = processSOWMessage(message);
}
else
{
processSubscribeMessage(message);
}
}
}
Now we will look at an example that uses the asynchronous form of sowAndSubscribe:
// handleMessage
//
// Handles messages for both SOW query and subscription.
void processSOWMessage(const AMPS::Message& message)
{
if (message.getCommand() == "group_begin")
{
std::cout << "Receiving messages from the SOW." << std::endl;
return;
}
else if (message.getCommand() == "group_end")
{
std::cout << "Done receiving messages from SOW." << std::endl;
return true;
}
else if (message.getCommand() == "oof")
{
std::cout << "OOF : " << message.getReason()
<< " message to remove : "
<< message.getData() << std::endl;
removeVan(message);
}
else
{
std::cout << "New or updated message: " << message.getData() << std::endl;
addOrUpdateVan(message);
}
}
...
std::string trackVanPositions(AMPS::Client& ampsClient)
{
std::cerr << "about to subscribe..." << std::endl;
return ampsClient.executeAsync(
Command("sow_and_subscribe")
.setTopic("van_location")
.setFilter("/status = 'ACTIVE'")
.setBatchSize(100)
.setOptions("oof"),
bind(processSOWMessage(placeholders::_1));
}
In the listing above, the trackVanPositions function invokes sowAndSubscribe to begin tracking vans, and returns the subscription ID. The application can later use this to unsubscribe.
The two forms have the same result. However, one form performs processing on a background thread, and blocks the client from receiving messages while that processing happens, while the other form processes messages on the calling thread and allows the background thread to continue to receive messages while processing occurs. In both cases, the application receives and processes the same messages.