AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.2
AMPS::MessageStream Class Reference

An iterable object representing the results of an AMPS subscription and/or query. More...

#include <ampsplusplus.hpp>

Classes

class  iterator
 Represents an iterator over messages in an AMPS topic. More...
 

Public Member Functions

bool isValid () const
 Returns true if self is a valid stream that may be iterated.
 
iterator begin (void)
 Returns an iterator representing the beginning of the topic or subscription. More...
 
iterator end (void)
 Returns an iterator representing the end of the topic or subscription. More...
 
MessageStream timeout (unsigned timeout_)
 Sets the maximum time to wait for the next message in milliseconds; if no message is available within this timeout, return an invalid Message (that is, a message where isValid() returns false). More...
 
MessageStream conflate (void)
 Sets self to conflation mode, where a new update for a matching sow key will replace the previous one in the underlying queue. More...
 
MessageStream maxDepth (unsigned maxDepth_)
 Sets the maximum number of messages that can be held in the underlying queue. More...
 
unsigned getMaxDepth (void) const
 Gets the maximum number of messages that can be held in the underlying queue. More...
 
unsigned getDepth (void) const
 Gets the current number of messages held in the underlying queue. More...
 

Detailed Description

An iterable object representing the results of an AMPS subscription and/or query.

Objects of MesageStream type are returned by the MessageStream-returning overloads of methods on class AMPS::Client, such as AMPS::Client::sow() and AMPS::Client::subscribe() . Use this object via the begin() and end() iterators returned, for example:

MessageStream myTopic = client.sow("orders");
for(auto i = myTopic.begin(), e = myTopic.end(); i!=e; ++i)
{
Message m = *i;
... // (work with the contents of m)
}

For MessageStream objects returned by calls to methods other than sow(), note that the "end" is never reached, unless the server is disconnected. In this case, you may choose to stop processing messages by simply exiting the loop. For example, assuming that the isDone flag will be set when the application has processed the last message in a job:

MessageStream myTopic = client.subscribe("aTopic");
for(auto i = myTopic.begin();; ++i)
{
Message m = *i;
... // do work with *i
if(imDone) break;
}

By default, a MessageStream for a subscription will block until a message is available or the connection is closed. You can also configure a MessageStream to periodically produce an invalid (empty) message by setting the timeout on the message stream. This can be useful for indicating that the process is still working (for example, pinging a thread monitor) or reporting an error if a subscription pauses for an unexpectedly long period of time.

For example, the following code configures the message stream to produce an invalid message if no data arrives for this subscription within 1 second (1000 milliseconds). Even though there is no work for the application to do, this gives the application the opportunity to ping a thread monitor. Further, the application keeps track of how long it has been since the last data message arrived. If data flow pauses for more than 4 minutes (240 seconds) continuously, the application reports an error.

{
MessageStream ms = client.subscribe("aTopic");
ms.timeout(1000);
int timeout_counter = 0;
for (auto m : ms)
{
if (m.isValid() == false)
{
if(timeout_counter++ > 240) { /* report critical error */}
/* otherwise ping thread monitor, then */
continue;
}
timeout_counter = 0;
... process message here, pinging thread monitor as appropriate ...
}
} // MessageStream is unsubscribed and destroyed here

Member Function Documentation

iterator AMPS::MessageStream::begin ( void  )
inline

Returns an iterator representing the beginning of the topic or subscription.

Returns
An iterator representing the beginning of this topic.
MessageStream AMPS::MessageStream::conflate ( void  )
inline

Sets self to conflation mode, where a new update for a matching sow key will replace the previous one in the underlying queue.

Returns
This MessageStream
iterator AMPS::MessageStream::end ( void  )
inline

Returns an iterator representing the end of the topic or subscription.

Returns
An iterator representing the ending of this topic or subscription.
unsigned AMPS::MessageStream::getDepth ( void  ) const
inline

Gets the current number of messages held in the underlying queue.

Returns
The current depth of unprocessed messages.
unsigned AMPS::MessageStream::getMaxDepth ( void  ) const
inline

Gets the maximum number of messages that can be held in the underlying queue.

Returns
The maximum depth allowed.
MessageStream AMPS::MessageStream::maxDepth ( unsigned  maxDepth_)
inline

Sets the maximum number of messages that can be held in the underlying queue.

When that depth is reached, the receive thread will keep trying to store the next message before receiving any other new messages.

Parameters
maxDepth_The maximum number of messages, 0 means no maximum.
Returns
This MessageStream
MessageStream AMPS::MessageStream::timeout ( unsigned  timeout_)
inline

Sets the maximum time to wait for the next message in milliseconds; if no message is available within this timeout, return an invalid Message (that is, a message where isValid() returns false).

Parameters
timeout_The maximum milliseconds to wait for messages, 0 means no maximum.
Returns
This MessageStream

The documentation for this class was generated from the following file: