25 #ifndef _AMPSPLUSPLUS_H_ 26 #define _AMPSPLUSPLUS_H_ 45 #include <sys/atomic.h> 47 #include "MessageRouter.hpp" 49 #include "ampscrc.hpp" 51 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM 52 #define AMPS_TESTING_SLOW_MESSAGE_STREAM 79 #define AMPS_INITIAL_LOG_SIZE 40960UL 80 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10 81 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960 82 #define AMPS_RING_POSITIONS 3 83 #define AMPS_RING_BYTES_SUBID 128 84 #define AMPS_RING_BYTES_BOOKMARK 64 85 #define AMPS_RING_ENTRIES 16384 86 #define AMPS_RING_ENTRY_SIZE ( AMPS_RING_BYTES_SUBID + ( AMPS_RING_POSITIONS * AMPS_RING_BYTES_BOOKMARK ) ) 87 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0 98 #define AMPS_BOOKMARK_RECENT "recent" 105 #define AMPS_BOOKMARK_EPOCH "0" 109 #define AMPS_BOOKMARK_NOW "0|1|" 113 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL 114 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000 115 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200 116 #define AMPS_PERSISTED_BOOKMARK_MIN_VERSION 3080000 117 #define AMPS_MULTI_BOOKMARK_MIN_VERSION 4000000 118 #define AMPS_FLUSH_MIN_VERSION 4000000 119 #define AMPS_MIN_SOW_KEY_PUBLISH_VERSION 4030100 120 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000 121 #define AMPS_DEFAULT_TOP_N -1 122 #define AMPS_DEFAULT_BATCH_SIZE 10 123 #define AMPS_NUMBER_BUFFER_LEN 20 124 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000 126 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64) 131 static __declspec ( thread )
AMPS::Message* publishStoreMessage = 0;
139 typedef std::map<std::string, std::string> ConnectionInfo;
141 class PerThreadMessageTracker {
142 std::vector<AMPS::Message*> _messages;
144 PerThreadMessageTracker() {}
145 ~PerThreadMessageTracker()
147 for (
size_t i=0; i<_messages.size(); ++i)
154 _messages.push_back(message);
158 static AMPS::Mutex _lock;
159 AMPS::Lock<Mutex> l(_lock);
160 _addMessageToCleanupList(message);
164 static PerThreadMessageTracker tracker;
165 tracker.addMessage(message);
170 inline std::string asString(Type x_)
172 std::ostringstream os;
178 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
180 size_t pos = AMPS_NUMBER_BUFFER_LEN;
181 for(
int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
185 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
194 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
196 size_t pos = AMPS_NUMBER_BUFFER_LEN;
197 for(
int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
201 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
215 static const char* duplicate() {
return "duplicate";}
216 static const char* badFilter() {
return "bad filter";}
217 static const char* badRegexTopic() {
return "bad regex topic";}
218 static const char* subscriptionAlreadyExists() {
return "subscription already exists";}
219 static const char* nameInUse() {
return "name in use";}
220 static const char* authFailure() {
return "auth failure";}
221 static const char* notEntitled() {
return "not entitled";}
222 static const char* authDisabled() {
return "authentication disabled";}
223 static const char* subidInUse() {
return "subid in use";}
224 static const char* noTopic() {
return "no topic";}
239 virtual void exceptionThrown(
const std::exception&)
const {;}
245 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \ 250 catch (std::exception& ex_)\ 254 _exceptionListener->exceptionThrown(ex_);\ 279 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x) \ 280 while(me->_connected)\ 287 catch(MessageStreamFullException&)\ 289 me->checkAndSendHeartbeat(false);\ 293 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x) \ 294 while(me->_connected)\ 301 catch(MessageStreamFullException& ex_)\ 303 me->checkAndSendHeartbeat(false);\ 308 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 311 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x) \ 313 catch (std::exception& ex_)\ 317 me->_exceptionListener->exceptionThrown(ex_);\ 341 #define AMPS_UNHANDLED_EXCEPTION(ex) \ 344 _exceptionListener->exceptionThrown(ex);\ 349 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \ 352 me->_exceptionListener->exceptionThrown(ex);\ 391 static const unsigned Subscribe = 1;
392 static const unsigned SOW = 2;
393 static const unsigned NeedsSequenceNumber = 4;
394 static const unsigned ProcessedAck = 8;
395 static const unsigned StatsAck = 16;
396 void init(Message::Command::Type command_)
405 void init(
const std::string& command_)
417 if (command != Message::Command::Publish && command != Message::Command::DeltaPublish)
420 if (command == Message::Command::Subscribe ||
421 command == Message::Command::SOWAndSubscribe ||
422 command == Message::Command::DeltaSubscribe ||
423 command == Message::Command::SOWAndDeltaSubscribe)
428 if (command == Message::Command::SOWDelete)
430 _flags |= NeedsSequenceNumber;
435 if (command == Message::Command::SOW)
440 setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
443 else if ((command == Message::Command::SOWAndSubscribe ||
444 command == Message::Command::SOWAndDeltaSubscribe)
447 setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
450 _flags |= ProcessedAck;
454 _flags |= NeedsSequenceNumber;
573 if (v_ ==
"processed") _flags |= ProcessedAck;
574 else if (v_ ==
"stats") _flags |= StatsAck;
578 Message& getMessage(
void) {
return _message; }
579 unsigned getTimeout(
void)
const {
return _timeout; }
580 unsigned getBatchSize(
void)
const {
return _batchSize; }
581 bool isSubscribe(
void)
const 583 return _flags & Subscribe;
585 bool isSow(
void)
const {
return (_flags & SOW) != 0; }
586 bool hasProcessedAck(
void)
const {
return (_flags & ProcessedAck) != 0; }
587 bool hasStatsAck(
void)
const {
return (_flags & StatsAck) != 0; }
588 bool needsSequenceNumber(
void)
const {
return (_flags & NeedsSequenceNumber) != 0; }
593 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
610 virtual std::string authenticate(
const std::string& userName_,
const std::string& password_) = 0;
618 virtual std::string retry(
const std::string& userName_,
const std::string& password_) = 0;
625 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
637 std::string
authenticate(
const std::string& ,
const std::string& password_)
644 std::string
retry(
const std::string& ,
const std::string& )
646 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
649 void completed(
const std::string& ,
const std::string& ,
const std::string& ) {;}
683 : _resizeHandler(NULL)
684 , _resizeHandlerData(NULL)
685 , _maxSubIdLength(AMPS_MAX_SUBID_LEN)
696 virtual size_t log(
Message& message_) = 0;
705 size_t bookmarkSeqNo_) = 0;
712 virtual void discard(
const Message& message_) = 0;
730 virtual bool isDiscarded(
Message& message_) = 0;
737 virtual void purge() = 0;
750 virtual size_t getOldestBookmarkSeq(
const Message::Field& subId_) = 0;
760 _resizeHandler = handler_;
761 _resizeHandlerData = userData_;
790 virtual void setServerVersion(
size_t version_) = 0;
796 virtual void setServerVersion(
const VersionInfo& version_) = 0;
798 bool callResizeHandler(
const Message::Field& subId_,
size_t newSize_);
800 inline void prune(std::string tmpFileName_ = std::string())
802 _prune(tmpFileName_);
805 virtual void _prune(std::string) {
return; }
813 return _maxSubIdLength;
822 _maxSubIdLength = maxSubIdLength_;
827 void* _resizeHandlerData;
828 size_t _maxSubIdLength;
835 RefHandle<BookmarkStoreImpl> _body;
864 return _body.isValid();
876 return _body.get().log(message_);
889 _body.get().discard(subId_, bookmarkSeqNo_);
900 _body.get().discard(message_);
912 return _body.get().getMostRecent(subId_);
927 return _body.get().isDiscarded(message_);
950 _body.get().purge(subId_);
962 _body.get().setResizeHandler(handler_, userData_);
972 return _body.get().getOldestBookmarkSeq(
Message::Field(subId_.c_str(),
974 return AMPS_UNSET_INDEX;
984 return _body.get().getOldestBookmarkSeq(subId_);
985 return AMPS_UNSET_INDEX;
995 _body.get().persisted(subId_, bookmark_);
1004 if (_body.isValid())
1005 _body.get().persisted(subId_, bookmark_);
1014 if (_body.isValid())
1015 _body.get().noPersistedAcks(subId_);
1024 if (_body.isValid())
1025 _body.get().setServerVersion(version_);
1034 if (_body.isValid())
1035 _body.get().setServerVersion(version_);
1043 void prune(std::string tmpFileName_ =
"")
1045 if (_body.isValid())
1046 _body.get().prune(tmpFileName_);
1054 if (_body.isValid())
1055 return &_body.get();
1066 if (_body.isValid())
1067 return _body.get().getMaxSubIdLength();
1078 if (_body.isValid())
1079 _body.get().setMaxSubIdLength(maxSubIdLength_);
1084 inline bool BookmarkStoreImpl::callResizeHandler(
const Message::Field& subId_,
1088 return _resizeHandler(
BookmarkStore(
this), subId_, newSize_, _resizeHandlerData);
1100 size_t newSize_,
void* data_)
1102 size_t* maxSizep = (
size_t*)data_;
1103 if (newSize_ > *maxSizep)
1106 store_.
discard(subId_, discardSeq);
1122 virtual void execute(
Message& message_) = 0;
1137 typedef bool (*PublishStoreResizeHandler)(
Store store_,
1146 StoreImpl() : _resizeHandler(NULL), _resizeHandlerData(NULL) {;}
1152 virtual amps_uint64_t store(
const Message& message_) = 0;
1158 virtual void discardUpTo(amps_uint64_t index_) = 0;
1173 virtual bool replaySingle(
StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1179 virtual size_t unpersistedCount()
const = 0;
1191 virtual void flush(
long timeout_) = 0;
1204 virtual amps_uint64_t getLowestUnpersisted()
const = 0;
1209 virtual amps_uint64_t getLastPersisted() = 0;
1223 _resizeHandler = handler_;
1224 _resizeHandlerData = userData_;
1229 return _resizeHandler;
1232 bool callResizeHandler(
size_t newSize_);
1236 void* _resizeHandlerData;
1243 RefHandle<StoreImpl> _body;
1258 return _body.get().store(message_);
1267 _body.get().discardUpTo(index_);
1276 _body.get().replay(replayer_);
1288 return _body.get().replaySingle(replayer_, index_);
1297 return _body.get().unpersistedCount();
1305 return _body.isValid();
1318 return _body.get().flush(timeout_);
1326 return _body.get().getLowestUnpersisted();
1334 return _body.get().getLastPersisted();
1349 _body.get().setResizeHandler(handler_, userData_);
1354 return _body.get().getResizeHandler();
1362 if (_body.isValid())
1363 return &_body.get();
1384 virtual void failedWrite(
const Message& message_,
1385 const char* reason_,
size_t reasonLength_) = 0;
1389 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1392 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1405 long* timeoutp = (
long*)data_;
1407 if (count == 0)
return false;
1410 store_.
flush(*timeoutp);
1413 catch (
const TimedOutException&)
1415 catch (
const TimedOutException& e)
1438 unsigned requestedAckTypes_) = 0;
1445 virtual void clear() = 0;
1449 virtual void resubscribe(Client& client_) = 0;
1460 typedef enum { Disconnected = 0,
1464 PublishReplayed = 8,
1465 HeartbeatInitiated = 16,
1479 virtual void connectionStateChanged(
State newState_) = 0;
1484 class MessageStreamImpl;
1487 typedef void(*DeferredExecutionFunc)(
void*);
1489 class ClientImpl :
public RefBody
1491 friend class Client;
1494 DisconnectHandler _disconnectHandler;
1495 enum GlobalCommandTypeHandlers :
size_t 1505 DuplicateMessage = 8,
1508 std::vector<MessageHandler> _globalCommandTypeHandlers;
1509 MessageHandler _lastChanceMessageHandler, _duplicateMessageHandler;
1510 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1512 MessageRouter::RouteCache _routeCache;
1513 mutable Mutex _lock;
1514 std::string _name, _lastUri, _logonCorrelationData;
1516 Store _publishStore;
1517 bool _isRetryOnDisconnect;
1518 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1519 volatile amps_uint64_t _lastSentHaSequenceNumber;
1520 ATOMIC_TYPE_8 _badTimeToHAPublish;
1521 ATOMIC_TYPE_8 _badTimeToHASubscribe;
1522 VersionInfo _serverVersion;
1523 Timer _heartbeatTimer;
1524 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1527 int _queueAckTimeout;
1528 bool _isAutoAckEnabled;
1529 unsigned _ackBatchSize;
1530 unsigned _queuedAckCount;
1531 unsigned _defaultMaxDepth;
1532 struct QueueBookmarks
1534 QueueBookmarks(
const std::string topic_)
1541 amps_uint64_t _oldestTime;
1542 unsigned _bookmarkCount;
1544 typedef amps_uint64_t topic_hash;
1545 typedef std::map<topic_hash,QueueBookmarks> TopicHashMap;
1546 TopicHashMap _topicHashMap;
1550 ClientImpl* _client;
1555 ClientStoreReplayer()
1556 : _client(NULL) , _version(0), _res(
AMPS_E_OK)
1559 ClientStoreReplayer(ClientImpl* client_)
1560 : _client(client_) , _version(0), _res(
AMPS_E_OK)
1563 void setClient(ClientImpl* client_) { _client = client_; }
1565 void execute(
Message& message_)
1567 if (!_client)
throw CommandException(
"Can't replay without a client.");
1570 if (index > _client->_lastSentHaSequenceNumber)
1571 _client->_lastSentHaSequenceNumber = index;
1578 (!_client->_badTimeToHAPublish ||
1579 message_.getOptions().len() < 6))
1582 message_.getMessage(),
1586 throw DisconnectedException(
"AMPS Server disconnected during replay");
1592 ClientStoreReplayer _replayer;
1596 ClientImpl* _parent;
1597 const char* _reason;
1598 size_t _reasonLength;
1599 size_t _replayCount;
1601 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1604 _reasonLength(reasonLength_),
1607 void execute(
Message& message_)
1609 if (_parent->_failedWriteHandler)
1612 _parent->_failedWriteHandler->failedWrite(message_,
1613 _reason, _reasonLength);
1616 size_t replayCount(
void)
const {
return _replayCount; }
1619 struct AckResponseImpl :
public RefBody
1621 std::string username, password, reason, status, bookmark, options;
1622 amps_uint64_t sequenceNo;
1623 VersionInfo serverVersion;
1624 volatile bool responded, abandoned;
1625 unsigned connectionVersion;
1628 sequenceNo((amps_uint64_t)0),
1632 connectionVersion(0)
1639 RefHandle<AckResponseImpl> _body;
1641 AckResponse() : _body(NULL) {;}
1642 static AckResponse create()
1645 r._body =
new AckResponseImpl();
1649 const std::string& username()
1651 return _body.get().username;
1653 void setUsername(
const char* data_,
size_t len_)
1655 if (data_) _body.get().username.assign(data_, len_);
1656 else _body.get().username.clear();
1658 const std::string& password()
1660 return _body.get().password;
1662 void setPassword(
const char* data_,
size_t len_)
1664 if (data_) _body.get().password.assign(data_, len_);
1665 else _body.get().password.clear();
1667 const std::string& reason()
1669 return _body.get().reason;
1671 void setReason(
const char* data_,
size_t len_)
1673 if (data_) _body.get().reason.assign(data_, len_);
1674 else _body.get().reason.clear();
1676 const std::string& status()
1678 return _body.get().status;
1680 void setStatus(
const char* data_,
size_t len_)
1682 if (data_) _body.get().status.assign(data_, len_);
1683 else _body.get().status.clear();
1685 const std::string& bookmark()
1687 return _body.get().bookmark;
1689 void setBookmark(
const char* data_,
size_t len_)
1691 if (data_) _body.get().bookmark.assign(data_, len_);
1692 else _body.get().bookmark.clear();
1694 amps_uint64_t sequenceNo()
const 1696 return _body.get().sequenceNo;
1698 void setSequenceNo(
const char* data_,
size_t len_)
1700 amps_uint64_t result = (amps_uint64_t)0;
1703 for(
size_t i=0; i<len_; ++i)
1705 result *= (amps_uint64_t)10;
1706 result += (amps_uint64_t)(data_[i] -
'0');
1709 _body.get().sequenceNo = result;
1711 VersionInfo serverVersion()
const 1713 return _body.get().serverVersion;
1715 void setServerVersion(
const char* data_,
size_t len_)
1718 _body.get().serverVersion.setVersion(std::string(data_, len_));
1722 return _body.get().responded;
1724 void setResponded(
bool responded_)
1726 _body.get().responded = responded_;
1730 return _body.get().abandoned;
1732 void setAbandoned(
bool abandoned_)
1734 if (_body.isValid())
1735 _body.get().abandoned = abandoned_;
1738 void setConnectionVersion(
unsigned connectionVersion)
1740 _body.get().connectionVersion = connectionVersion;
1743 unsigned getConnectionVersion()
1745 return _body.get().connectionVersion;
1747 void setOptions(
const char* data_,
size_t len_)
1749 if (data_) _body.get().options.assign(data_,len_);
1750 else _body.get().options.clear();
1753 const std::string& options()
1755 return _body.get().options;
1758 AckResponse& operator=(
const AckResponse& rhs)
1766 typedef std::map<std::string, AckResponse> AckMap;
1768 DefaultExceptionListener _defaultExceptionListener;
1771 struct DeferredExecutionRequest
1773 DeferredExecutionRequest(DeferredExecutionFunc func_,
1776 _userData(userData_)
1779 DeferredExecutionFunc _func;
1783 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1785 std::string _username;
1786 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1787 ConnectionStateListeners _connectionStateListeners;
1788 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1789 DeferredExecutionList _deferredExecutionList;
1790 unsigned _heartbeatInterval;
1791 unsigned _readTimeout;
1799 if (!_connected && newState_ > ConnectionStateListener::Connected)
1803 for(ConnectionStateListeners::iterator it= _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1805 AMPS_CALL_EXCEPTION_WRAPPER(
1806 (*it)->connectionStateChanged(newState_));
1809 unsigned processedAck(
Message& message);
1810 unsigned persistedAck(
Message& meesage);
1811 void lastChance(
Message& message);
1812 void checkAndSendHeartbeat(
bool force=
false);
1813 virtual ConnectionInfo getConnectionInfo()
const;
1815 ClientImplMessageHandler(
amps_handle message,
void* userData);
1817 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1819 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
1821 void unsubscribeInternal(
const std::string&
id)
1824 if (_subscriptionManager)
1827 subId.assign(
id.data(),
id.length());
1829 Unlock<Mutex> unlock(_lock);
1830 _subscriptionManager->unsubscribe(subId);
1838 _sendWithoutRetry(_message);
1841 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1842 bool isHASubscribe_)
1844 return syncAckProcessing(timeout_, message_,
1845 (amps_uint64_t)0, isHASubscribe_);
1848 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1849 amps_uint64_t haSeq = (amps_uint64_t)0,
1850 bool isHASubscribe_ =
false)
1853 AckResponse ack = AckResponse::create();
1855 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
1856 if (ack.getConnectionVersion() == 0)
1859 throw DisconnectedException(
"Connection closed while waiting for response.");
1861 bool timedOut =
false;
1862 AMPS_START_TIMER(timeout_)
1863 while(!timedOut && !ack.responded() && !ack.abandoned())
1867 timedOut = !_lock.wait(timeout_);
1869 if (timedOut) { AMPS_RESET_TIMER(timedOut, timeout_); }
1875 amps_invoke_waiting_function();
1878 if (ack.responded())
1880 if (ack.status() !=
"failure")
1884 amps_uint64_t ackSequence = ack.sequenceNo();
1885 if (_lastSentHaSequenceNumber < ackSequence)
1887 _lastSentHaSequenceNumber = ackSequence;
1897 _serverVersion = ack.serverVersion();
1898 if (_bookmarkStore.isValid())
1903 const std::string& options = ack.options();
1904 size_t index = options.find_first_of(
"max_backlog=");
1905 if(index != std::string::npos)
1908 const char* c = options.c_str()+index+12;
1909 while(*c && *c!=
',')
1911 data = (data*10) + (
unsigned)(*c++-48);
1913 if(_ackBatchSize > data) _ackBatchSize = data;
1918 const size_t NotEntitled = 12;
1919 std::string ackReason = ack.reason();
1920 if (ackReason.length() == 0)
return ack;
1921 if (ackReason.length() == NotEntitled &&
1922 ackReason[0] ==
'n' &&
1927 message_.throwFor(_client, ackReason);
1931 if (!ack.abandoned())
1933 throw TimedOutException(
"timed out waiting for operation.");
1937 throw DisconnectedException(
"Connection closed while waiting for response.");
1945 if (!_client)
return;
1949 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1950 _pEmptyMessageStream.reset(NULL);
1952 processDeferredExecutions();
1958 ClientImpl(
const std::string& clientName)
1959 : _client(NULL), _name(clientName)
1960 , _isRetryOnDisconnect(
true)
1961 , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1962 , _badTimeToHASubscribe(0), _serverVersion()
1963 , _queueAckTimeout(0)
1964 , _isAutoAckEnabled(
false)
1966 , _queuedAckCount(0)
1967 , _defaultMaxDepth(0)
1969 , _heartbeatInterval(0)
1972 _replayer.setClient(
this);
1977 _exceptionListener = &_defaultExceptionListener;
1978 for (
size_t i=0; i<GlobalCommandTypeHandlers::COUNT; ++i)
1985 virtual ~ClientImpl()
1991 const std::string& getName()
const 1996 void setName(
const std::string& name)
2001 _client, name.c_str());
2004 AMPSException::throwFor(_client, result);
2009 const std::string& getLogonCorrelationData()
const 2011 return _logonCorrelationData;
2014 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
2016 _logonCorrelationData = logonCorrelationData_;
2019 size_t getServerVersion()
const 2021 return _serverVersion.getOldStyleVersion();
2024 VersionInfo getServerVersionInfo()
const 2026 return _serverVersion;
2029 const std::string& getURI()
const 2031 Lock<Mutex> l(_lock);
2035 virtual void connect(
const std::string& uri)
2037 Lock<Mutex> l(_lock);
2040 _client, uri.c_str());
2043 AMPSException::throwFor(_client, result);
2049 _beatMessage.setOptions(
"beat");
2050 _readMessage.setClientImpl(
this);
2051 if(_queueAckTimeout)
2056 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2059 void setDisconnected()
2062 Lock<Mutex> l(_lock);
2064 broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
2067 _heartbeatTimer.setTimeout(0.0);
2072 virtual void disconnect()
2075 Lock<Mutex> l(_lock);
2080 AMPS_CALL_EXCEPTION_WRAPPER(_sendWithoutRetry(_message));
2082 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2085 Lock<Mutex> l(_lock);
2086 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2089 void clearAcks(
unsigned failedVersion)
2092 Lock<Mutex> l(_lock);
2095 std::list<std::string> worklist;
2096 for(AckMap::iterator i = _acks.begin(); i != _acks.end(); i++)
2098 if (i->second.getConnectionVersion() <= failedVersion)
2100 i->second.setAbandoned(
true);
2101 worklist.push_back(i->first);
2105 for(std::list<std::string>::iterator j = worklist.begin(); j != worklist.end(); j++)
2114 int send(
const Message& message)
2116 Lock<Mutex> l(_lock);
2117 return _send(message);
2120 void sendWithoutRetry(
const Message& message_)
2122 Lock<Mutex> l(_lock);
2123 _sendWithoutRetry(message_);
2126 void _sendWithoutRetry(
const Message& message_)
2131 AMPSException::throwFor(_client,result);
2135 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2136 bool isHASubscribe_ =
false)
2143 Message localMessage = message;
2144 unsigned version = 0;
2148 if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
2152 if(!_isRetryOnDisconnect)
2156 Unlock<Mutex> l(_lock);
2167 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2168 (isHASubscribe_ && _badTimeToHASubscribe != 0))
2170 return (
int)version;
2174 if (haSeq > _lastSentHaSequenceNumber)
2176 while (haSeq > _lastSentHaSequenceNumber + 1)
2182 _lastSentHaSequenceNumber+1))
2188 version = _replayer._version;
2191 catch(
const DisconnectedException&)
2193 catch(
const DisconnectedException& e)
2196 result = _replayer._res;
2201 localMessage.getMessage(),
2203 ++_lastSentHaSequenceNumber;
2207 localMessage.getMessage(),
2211 if (!isHASubscribe_ && !haSeq &&
2212 localMessage.getMessage() == message.getMessage())
2216 if(_isRetryOnDisconnect)
2218 Unlock<Mutex> u(_lock);
2223 if ((isHASubscribe_ || haSeq) &&
2226 return (
int)version;
2233 AMPSException::throwFor(_client, result);
2238 amps_invoke_waiting_function();
2241 if (result !=
AMPS_E_OK) AMPSException::throwFor(_client, result);
2242 return (
int)version;
2245 void addMessageHandler(
const Field& commandId_,
2247 unsigned requestedAcks_,
bool isSubscribe_)
2249 Lock<Mutex> lock(_lock);
2250 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2254 bool removeMessageHandler(
const Field& commandId_)
2256 Lock<Mutex> lock(_lock);
2257 return _routes.removeRoute(commandId_);
2263 bool isSubscribe =
false;
2265 unsigned systemAddedAcks = Message::AckType::None;
2268 case Message::Command::Subscribe:
2269 case Message::Command::DeltaSubscribe:
2272 systemAddedAcks |= Message::AckType::Persisted;
2275 case Message::Command::SOWAndSubscribe:
2276 case Message::Command::SOWAndDeltaSubscribe:
2288 case Message::Command::SOW:
2298 systemAddedAcks |= Message::AckType::Processed;
2300 if (!isSubscribe) systemAddedAcks |= Message::AckType::Completed;
2303 Lock<Mutex> l(_lock);
2304 _routes.addRoute(message_.
getQueryID(), messageHandler_, requestedAcks,
2305 systemAddedAcks, isSubscribe);
2308 messageHandler_.isValid() &&
2312 requestedAcks, systemAddedAcks,
true);
2318 syncAckProcessing(timeout_, message_, 0,
false);
2325 _routes.removeRoute(
id);
2332 case Message::Command::Unsubscribe:
2333 case Message::Command::Heartbeat:
2334 case Message::Command::Logon:
2335 case Message::Command::StartTimer:
2336 case Message::Command::StopTimer:
2337 case Message::Command::DeltaPublish:
2338 case Message::Command::Publish:
2339 case Message::Command::SOWDelete:
2341 Lock<Mutex> l(_lock);
2350 if (messageHandler_.isValid())
2352 _routes.addRoute(
id, messageHandler_, requestedAcks,
2353 Message::AckType::None,
false);
2360 case Message::Command::GroupBegin:
2361 case Message::Command::GroupEnd:
2362 case Message::Command::OOF:
2363 case Message::Command::Ack:
2364 case Message::Command::Unknown:
2366 throw CommandException(
"Command type " + message_.
getCommand() +
" can not be sent directly to AMPS");
2372 void setDisconnectHandler(DisconnectHandler disconnectHandler)
2374 Lock<Mutex> l(_lock);
2375 _disconnectHandler = disconnectHandler;
2378 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2380 switch (command_[0])
2382 #if 0 // Not currently implemented to avoid an extra branch in delivery 2384 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2387 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2391 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2393 #if 0 // Not currently implemented to avoid an extra branch in delivery 2395 if (command_[6] ==
'b')
2397 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2399 else if (command_[6] ==
'e')
2401 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2405 std::ostringstream os;
2406 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2407 throw CommandException(os.str());
2411 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2415 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2419 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2423 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2426 std::ostringstream os;
2427 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2428 throw CommandException(os.str());
2433 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2437 #if 0 // Not currently implemented to avoid an extra branch in delivery 2438 case Message::Command::Publish:
2439 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2441 case Message::Command::SOW:
2442 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2445 case Message::Command::Heartbeat:
2446 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2448 #if 0 // Not currently implemented to avoid an extra branch in delivery 2449 case Message::Command::GroupBegin:
2450 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2452 case Message::Command::GroupEnd:
2453 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2455 case Message::Command::OOF:
2456 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2459 case Message::Command::Ack:
2460 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2464 unsigned command = command_;
2465 while (command > 0) { ++bits; command >>= 1; }
2467 AMPS_snprintf(errBuf,
sizeof(errBuf),
2468 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2469 CommandConstants<0>::Lengths[bits],
2470 CommandConstants<0>::Values[bits]);
2471 throw CommandException(errBuf);
2476 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2478 _globalCommandTypeHandlers[handlerType_] = handler_;
2483 Lock<Mutex> l(_lock);
2484 _failedWriteHandler.reset(handler_);
2487 void setPublishStore(
const Store& publishStore_)
2489 Lock<Mutex> l(_lock);
2490 if (_connected)
throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2491 _publishStore = publishStore_;
2496 Lock<Mutex> l(_lock);
2497 if (_connected)
throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2498 _bookmarkStore = bookmarkStore_;
2503 Lock<Mutex> l(_lock);
2504 _subscriptionManager.reset(subscriptionManager_);
2509 Lock<Mutex> l(_lock);
2510 return _subscriptionManager.get();
2513 DisconnectHandler getDisconnectHandler()
2515 Lock<Mutex> l(_lock);
2516 return _disconnectHandler;
2521 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2526 Lock<Mutex> l(_lock);
2527 return _failedWriteHandler.get();
2530 Store getPublishStore()
2532 Lock<Mutex> l(_lock);
2533 return _publishStore;
2538 Lock<Mutex> l(_lock);
2539 return _bookmarkStore;
2542 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2546 Lock<Mutex> l(_lock);
2548 _publishMessage.assignData(data_, dataLen_);
2549 _send(_publishMessage);
2554 if (!publishStoreMessage)
2556 publishStoreMessage =
new Message();
2557 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2559 publishStoreMessage->reset();
2560 publishStoreMessage->setCommandEnum(Message::Command::Publish);
2561 return _publish(topic_, topicLen_, data_, dataLen_);
2565 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2566 size_t dataLen_,
unsigned long expiration_)
2570 Lock<Mutex> l(_lock);
2572 _publishMessage.assignData(data_, dataLen_);
2573 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2574 size_t pos = convertToCharArray(exprBuf, expiration_);
2576 _send(_publishMessage);
2582 if (!publishStoreMessage)
2584 publishStoreMessage =
new Message();
2585 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2587 publishStoreMessage->reset();
2588 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2589 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2590 publishStoreMessage->setCommandEnum(Message::Command::Publish)
2591 .assignExpiration(exprBuf+exprPos,
2592 AMPS_NUMBER_BUFFER_LEN-exprPos);
2593 return _publish(topic_, topicLen_, data_, dataLen_);
2597 void publishFlush(
long timeout_)
2599 static const char* processed =
"processed";
2600 static const size_t processedLen = strlen(processed);
2603 if (_serverVersion.getOldStyleVersion() >= AMPS_FLUSH_MIN_VERSION)
2605 Lock<Mutex> l(_lock);
2609 static const char* flush =
"flush";
2610 static const size_t flushLen = strlen(flush);
2614 syncAckProcessing(timeout_, _message);
2616 catch(
const AMPSException& ex)
2618 AMPS_UNHANDLED_EXCEPTION(ex);
2624 if (timeout_ > 0) { AMPS_USLEEP(timeout_ * 1000); }
2625 else { AMPS_USLEEP(1000 * 1000); }
2633 _publishStore.
flush(timeout_);
2635 catch (
const AMPSException& ex)
2637 AMPS_UNHANDLED_EXCEPTION(ex);
2643 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2644 const char* data_,
size_t dataLength_)
2648 Lock<Mutex> l(_lock);
2650 _deltaMessage.assignData(data_, dataLength_);
2651 _send(_deltaMessage);
2656 if (!publishStoreMessage)
2658 publishStoreMessage =
new Message();
2659 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2661 publishStoreMessage->reset();
2662 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2663 return _publish(topic_, topicLength_, data_, dataLength_);
2667 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2668 const char* data_,
size_t dataLength_,
2669 unsigned long expiration_)
2673 Lock<Mutex> l(_lock);
2675 _deltaMessage.assignData(data_, dataLength_);
2676 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2677 size_t pos = convertToCharArray(exprBuf, expiration_);
2679 _send(_deltaMessage);
2685 if (!publishStoreMessage)
2687 publishStoreMessage =
new Message();
2688 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2690 publishStoreMessage->reset();
2691 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2692 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2693 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2694 .assignExpiration(exprBuf+exprPos,
2695 AMPS_NUMBER_BUFFER_LEN-exprPos);
2696 return _publish(topic_, topicLength_, data_, dataLength_);
2700 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
2701 const char* data_,
size_t dataLength_)
2703 publishStoreMessage->assignTopic(topic_, topicLength_)
2704 .setAckTypeEnum(Message::AckType::Persisted)
2705 .assignData(data_, dataLength_);
2706 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
2707 char buf[AMPS_NUMBER_BUFFER_LEN];
2708 size_t pos = convertToCharArray(buf, haSequenceNumber);
2709 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2711 Lock<Mutex> l(_lock);
2712 _send(*publishStoreMessage, haSequenceNumber);
2714 return haSequenceNumber;
2717 virtual std::string logon(
long timeout_,
Authenticator& authenticator_,
2718 const char* options_ = NULL)
2720 Lock<Mutex> l(_lock);
2721 AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2727 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE 2729 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
2732 if(uri.user().size()) _message.
setUserId(uri.user());
2733 if(uri.password().size()) _message.
setPassword(uri.password());
2734 if(uri.protocol() ==
"amps" && uri.messageType().size())
2738 if(uri.isTrue(
"pretty"))
2740 _message.setOptions(
"pretty");
2744 if (!_logonCorrelationData.empty())
2750 _message.setOptions(options_);
2758 AckResponse ack = syncAckProcessing(timeout_, _message);
2759 if (ack.status() ==
"retry")
2761 _message.
setPassword(authenticator_.
retry(ack.username(), ack.password()));
2762 _username = ack.username();
2767 authenticator_.
completed(ack.username(), ack.password(), ack.reason());
2771 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2776 catch(
const AMPSException& ex)
2778 AMPS_UNHANDLED_EXCEPTION(ex);
2790 _publishStore.
replay(_replayer);
2791 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
2793 catch(
const StoreException& ex)
2795 std::ostringstream os;
2796 os <<
"A local store exception occurred while logging on." 2798 throw ConnectionException(os.str());
2800 catch(
const AMPSException& ex)
2802 AMPS_UNHANDLED_EXCEPTION(ex);
2805 catch(
const std::exception& ex)
2807 AMPS_UNHANDLED_EXCEPTION(ex);
2815 return newCommandId;
2819 const std::string& topic_,
2821 const std::string& filter_,
2822 const std::string& bookmark_,
2823 const std::string& options_,
2824 const std::string& subId_,
2825 bool isHASubscribe_ =
true)
2827 isHASubscribe_ &= (bool)_subscriptionManager;
2828 Lock<Mutex> l(_lock);
2832 std::string subId(subId_);
2835 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos)
2836 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
2845 unsigned ackTypes = Message::AckType::Processed;
2847 if (!bookmark_.empty() && _bookmarkStore.isValid())
2849 ackTypes |= Message::AckType::Persisted;
2853 if (filter_.length()) _message.
setFilter(filter_);
2854 if (bookmark_.length())
2864 if (_bookmarkStore.isValid())
2869 _bookmarkStore.
log(_message);
2870 _bookmarkStore.
discard(_message);
2876 if (options_.length()) _message.setOptions(options_);
2882 Unlock<Mutex> u(_lock);
2883 _subscriptionManager->subscribe(messageHandler_, message,
2884 Message::AckType::None);
2885 if (_badTimeToHASubscribe)
return subId;
2890 Message::AckType::None, ackTypes,
true);
2893 if (!options_.empty()) message.setOptions(options_);
2896 syncAckProcessing(timeout_, message, isHASubscribe_);
2898 catch (
const DisconnectedException&)
2900 if (!isHASubscribe_)
2902 _routes.removeRoute(subIdField);
2907 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2911 catch (
const TimedOutException&)
2913 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2921 Unlock<Mutex> unlock(_lock);
2922 _subscriptionManager->unsubscribe(subIdField);
2924 _routes.removeRoute(subIdField);
2931 const std::string& topic_,
2933 const std::string& filter_,
2934 const std::string& bookmark_,
2935 const std::string& options_,
2936 const std::string& subId_ =
"",
2937 bool isHASubscribe_ =
true)
2939 isHASubscribe_ &= (bool)_subscriptionManager;
2940 Lock<Mutex> l(_lock);
2944 std::string subId(subId_);
2954 unsigned ackTypes = Message::AckType::Processed;
2956 if (!bookmark_.empty() && _bookmarkStore.isValid())
2958 ackTypes |= Message::AckType::Persisted;
2961 if (filter_.length()) _message.
setFilter(filter_);
2962 if (bookmark_.length())
2972 if (_bookmarkStore.isValid())
2977 _bookmarkStore.
log(_message);
2978 _bookmarkStore.
discard(_message);
2984 if (options_.length()) _message.setOptions(options_);
2989 Unlock<Mutex> u(_lock);
2990 _subscriptionManager->subscribe(messageHandler_, message,
2991 Message::AckType::None);
2992 if (_badTimeToHASubscribe)
return subId;
2997 Message::AckType::None, ackTypes,
true);
3000 if (!options_.empty()) message.setOptions(options_);
3003 syncAckProcessing(timeout_, message, isHASubscribe_);
3005 catch (
const DisconnectedException&)
3007 if (!isHASubscribe_)
3009 _routes.removeRoute(subIdField);
3013 catch (
const TimedOutException&)
3015 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3023 Unlock<Mutex> unlock(_lock);
3024 _subscriptionManager->unsubscribe(subIdField);
3026 _routes.removeRoute(subIdField);
3032 void unsubscribe(
const std::string&
id)
3034 Lock<Mutex> l(_lock);
3035 unsubscribeInternal(
id);
3038 void unsubscribe(
void)
3040 if (_subscriptionManager)
3042 _subscriptionManager->clear();
3045 _routes.unsubscribeAll();
3046 Lock<Mutex> l(_lock);
3051 _sendWithoutRetry(_message);
3056 const std::string& topic_,
3057 const std::string& filter_ =
"",
3058 const std::string& orderBy_ =
"",
3059 const std::string& bookmark_ =
"",
3060 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3061 int topN_ = AMPS_DEFAULT_TOP_N,
3062 const std::string& options_ =
"",
3063 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3065 Lock<Mutex> l(_lock);
3072 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3075 if (filter_.length()) _message.
setFilter(filter_);
3076 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
3077 if (bookmark_.length()) _message.
setBookmark(bookmark_);
3080 if (options_.length()) _message.setOptions(options_);
3082 _routes.addRoute(_message.
getQueryID(), messageHandler_,
3083 Message::AckType::None, ackTypes,
false);
3087 syncAckProcessing(timeout_, _message);
3091 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3099 const std::string& topic_,
3101 const std::string& filter_ =
"",
3102 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3103 int topN_ = AMPS_DEFAULT_TOP_N)
3106 return sow(messageHandler_,
3118 const std::string& topic_,
3119 const std::string& filter_ =
"",
3120 const std::string& orderBy_ =
"",
3121 const std::string& bookmark_ =
"",
3122 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3123 int topN_ = AMPS_DEFAULT_TOP_N,
3124 const std::string& options_ =
"",
3125 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3126 bool isHASubscribe_ =
true)
3128 isHASubscribe_ &= (bool)_subscriptionManager;
3129 Lock<Mutex> l(_lock);
3136 std::string subId = cid;
3138 if (filter_.length()) _message.
setFilter(filter_);
3139 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
3140 if (bookmark_.length()) _message.
setBookmark(bookmark_);
3143 if (options_.length()) _message.setOptions(options_);
3149 Unlock<Mutex> u(_lock);
3150 _subscriptionManager->subscribe(messageHandler_, message,
3151 Message::AckType::None);
3152 if (_badTimeToHASubscribe)
return subId;
3154 if (!_routes.hasRoute(cid))
3156 _routes.addRoute(cid, messageHandler_,
3157 Message::AckType::None, Message::AckType::Processed,
true);
3160 if (!options_.empty()) message.setOptions(options_);
3163 syncAckProcessing(timeout_, message, isHASubscribe_);
3165 catch (
const DisconnectedException&)
3167 if (!isHASubscribe_)
3169 _routes.removeRoute(subId);
3173 catch (
const TimedOutException&)
3175 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3183 Unlock<Mutex> unlock(_lock);
3184 _subscriptionManager->unsubscribe(cid);
3186 _routes.removeRoute(subId);
3193 const std::string& topic_,
3195 const std::string& filter_ =
"",
3196 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3197 bool oofEnabled_ =
false,
3198 int topN_ = AMPS_DEFAULT_TOP_N,
3199 bool isHASubscribe_ =
true)
3202 return sowAndSubscribe(messageHandler_,
3209 (oofEnabled_ ?
"oof" :
""),
3215 const std::string& topic_,
3216 const std::string& filter_ =
"",
3217 const std::string& orderBy_ =
"",
3218 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3219 int topN_ = AMPS_DEFAULT_TOP_N,
3220 const std::string& options_ =
"",
3221 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3222 bool isHASubscribe_ =
true)
3224 isHASubscribe_ &= (bool)_subscriptionManager;
3225 Lock<Mutex> l(_lock);
3233 if (filter_.length()) _message.
setFilter(filter_);
3234 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
3237 if (options_.length()) _message.setOptions(options_);
3242 Unlock<Mutex> u(_lock);
3243 _subscriptionManager->subscribe(messageHandler_, message,
3244 Message::AckType::None);
3245 if (_badTimeToHASubscribe)
return subId;
3249 _routes.addRoute(message.
getQueryID(), messageHandler_,
3250 Message::AckType::None, Message::AckType::Processed,
true);
3253 if (!options_.empty()) message.setOptions(options_);
3256 syncAckProcessing(timeout_, message, isHASubscribe_);
3258 catch (
const DisconnectedException&)
3260 if (!isHASubscribe_)
3262 _routes.removeRoute(subId);
3266 catch (
const TimedOutException&)
3268 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3276 Unlock<Mutex> unlock(_lock);
3277 _subscriptionManager->unsubscribe(
Field(subId));
3279 _routes.removeRoute(subId);
3286 const std::string& topic_,
3288 const std::string& filter_ =
"",
3289 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3290 bool oofEnabled_ =
false,
3291 bool sendEmpties_ =
false,
3292 int topN_ = AMPS_DEFAULT_TOP_N,
3293 bool isHASubscribe_ =
true)
3297 if (oofEnabled_) options.
setOOF();
3299 return sowAndDeltaSubscribe(messageHandler_,
3311 const std::string& topic_,
3312 const std::string& filter_,
3318 unsigned ackType = Message::AckType::Processed |
3319 Message::AckType::Stats |
3320 Message::AckType::Persisted;
3321 if (!publishStoreMessage)
3323 publishStoreMessage =
new Message();
3324 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3326 publishStoreMessage->reset();
3327 if (commandId_.
empty())
3329 publishStoreMessage->newCommandId();
3330 commandId_ = publishStoreMessage->getCommandId();
3334 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3336 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3337 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3338 .assignQueryID(commandId_.
data(), commandId_.
len())
3339 .setAckTypeEnum(ackType)
3340 .assignTopic(topic_.c_str(), topic_.length())
3341 .assignFilter(filter_.c_str(), filter_.length());
3342 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3343 char buf[AMPS_NUMBER_BUFFER_LEN];
3344 size_t pos = convertToCharArray(buf, haSequenceNumber);
3345 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3349 Lock<Mutex> l(_lock);
3350 _routes.addRoute(commandId_, messageHandler_,
3351 Message::AckType::Stats,
3352 Message::AckType::Processed|Message::AckType::Persisted,
3354 syncAckProcessing(timeout_, *publishStoreMessage,
3357 catch (
const DisconnectedException&)
3363 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3367 return (std::string)commandId_;
3371 Lock<Mutex> l(_lock);
3373 if (commandId_.
empty())
3384 .assignQueryID(commandId_.
data(), commandId_.
len())
3385 .setAckTypeEnum(Message::AckType::Processed |
3386 Message::AckType::Stats)
3388 .assignFilter(filter_.c_str(), filter_.length());
3389 _routes.addRoute(commandId_, messageHandler_,
3390 Message::AckType::Stats,
3391 Message::AckType::Processed,
3395 syncAckProcessing(timeout_, _message);
3399 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3402 return (std::string)commandId_;
3407 const std::string& topic_,
3408 const std::string& data_,
3414 unsigned ackType = Message::AckType::Processed |
3415 Message::AckType::Stats |
3416 Message::AckType::Persisted;
3417 if (!publishStoreMessage)
3419 publishStoreMessage =
new Message();
3420 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3422 publishStoreMessage->reset();
3423 if (commandId_.
empty())
3425 publishStoreMessage->newCommandId();
3426 commandId_ = publishStoreMessage->getCommandId();
3430 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3432 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3433 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3434 .assignQueryID(commandId_.
data(), commandId_.
len())
3435 .setAckTypeEnum(ackType)
3436 .assignTopic(topic_.c_str(), topic_.length())
3437 .assignData(data_.c_str(), data_.length());
3438 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3439 char buf[AMPS_NUMBER_BUFFER_LEN];
3440 size_t pos = convertToCharArray(buf, haSequenceNumber);
3441 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3445 Lock<Mutex> l(_lock);
3446 _routes.addRoute(commandId_, messageHandler_,
3447 Message::AckType::Stats,
3448 Message::AckType::Processed|Message::AckType::Persisted,
3450 syncAckProcessing(timeout_, *publishStoreMessage,
3453 catch (
const DisconnectedException&)
3459 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3463 return (std::string)commandId_;
3467 Lock<Mutex> l(_lock);
3469 if (commandId_.
empty())
3480 .assignQueryID(commandId_.
data(), commandId_.
len())
3481 .setAckTypeEnum(Message::AckType::Processed |
3482 Message::AckType::Stats)
3484 .assignData(data_.c_str(), data_.length());
3485 _routes.addRoute(commandId_, messageHandler_,
3486 Message::AckType::Stats,
3487 Message::AckType::Processed,
3491 syncAckProcessing(timeout_, _message);
3495 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3498 return (std::string)commandId_;
3503 const std::string& topic_,
3504 const std::string& keys_,
3510 unsigned ackType = Message::AckType::Processed |
3511 Message::AckType::Stats |
3512 Message::AckType::Persisted;
3513 if (!publishStoreMessage)
3515 publishStoreMessage =
new Message();
3516 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3518 publishStoreMessage->reset();
3519 if (commandId_.
empty())
3521 publishStoreMessage->newCommandId();
3522 commandId_ = publishStoreMessage->getCommandId();
3526 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3528 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3529 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3530 .assignQueryID(commandId_.
data(), commandId_.
len())
3531 .setAckTypeEnum(ackType)
3532 .assignTopic(topic_.c_str(), topic_.length())
3533 .assignSowKeys(keys_.c_str(), keys_.length());
3534 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3535 char buf[AMPS_NUMBER_BUFFER_LEN];
3536 size_t pos = convertToCharArray(buf, haSequenceNumber);
3537 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3541 Lock<Mutex> l(_lock);
3542 _routes.addRoute(commandId_, messageHandler_,
3543 Message::AckType::Stats,
3544 Message::AckType::Processed|Message::AckType::Persisted,
3546 syncAckProcessing(timeout_, *publishStoreMessage,
3549 catch (
const DisconnectedException&)
3555 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3559 return (std::string)commandId_;
3563 Lock<Mutex> l(_lock);
3565 if (commandId_.
empty())
3576 .assignQueryID(commandId_.
data(), commandId_.
len())
3577 .setAckTypeEnum(Message::AckType::Processed |
3578 Message::AckType::Stats)
3580 .assignSowKeys(keys_.c_str(), keys_.length());
3581 _routes.addRoute(commandId_, messageHandler_,
3582 Message::AckType::Stats,
3583 Message::AckType::Processed,
3587 syncAckProcessing(timeout_, _message);
3591 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3594 return (std::string)commandId_;
3598 void startTimer(
void)
3600 Lock<Mutex> l(_lock);
3609 return executeAsync(
Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
3619 _exceptionListener = &listener_;
3624 return *_exceptionListener;
3627 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
3629 if (readTimeout_ && readTimeout_ < heartbeatInterval_)
3631 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
3633 Lock<Mutex> l(_lock);
3634 if(_heartbeatInterval != heartbeatInterval_ ||
3635 _readTimeout != readTimeout_)
3637 _heartbeatInterval = heartbeatInterval_;
3638 _readTimeout = readTimeout_;
3643 void _sendHeartbeat(
void)
3645 if (_connected && _heartbeatInterval != 0)
3647 std::ostringstream options;
3648 options <<
"start," << _heartbeatInterval;
3651 .setOptions(options.str());
3653 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3654 _heartbeatTimer.start();
3657 _sendWithoutRetry(startMessage);
3658 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
3660 catch(ConnectionException &ex_)
3664 AMPS_UNHANDLED_EXCEPTION(ex_);
3668 if(_readTimeout && _connected)
3674 AMPSException::throwFor(_client, result);
3680 Lock<Mutex> lock(_lock);
3681 _connectionStateListeners.insert(listener_);
3686 Lock<Mutex> lock(_lock);
3687 _connectionStateListeners.erase(listener_);
3692 unsigned systemAddedAcks_,
bool isSubscribe_)
3694 Message message = command_.getMessage();
3699 bool added = qid.
len() || subid.
len() || cid_.
len();
3702 if (!_routes.hasRoute(qid))
3704 _routes.addRoute(qid, handler_, requestedAcks_,
3705 systemAddedAcks_, isSubscribe_);
3708 if (subid.
len() > 0)
3710 if (!_routes.hasRoute(subid))
3712 _routes.addRoute(subid, handler_, requestedAcks_,
3713 systemAddedAcks_, isSubscribe_);
3718 if (!_routes.hasRoute(cid_))
3720 _routes.addRoute(cid_, handler_, requestedAcks_,
3721 systemAddedAcks_, isSubscribe_);
3724 else if (commandType == Message::Command::Publish ||
3725 commandType == Message::Command::DeltaPublish)
3728 _routes.addRoute(cid_, handler_, requestedAcks_,
3729 systemAddedAcks_, isSubscribe_);
3734 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
3739 bool isHASubscribe_ =
true)
3741 isHASubscribe_ &= (bool)_subscriptionManager;
3742 Message& message = command_.getMessage();
3743 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
3744 Message::AckType::Processed : Message::AckType::None;
3746 bool isPublishStore = _publishStore.
isValid() && command_.needsSequenceNumber();
3748 if (commandType == Message::Command::SOW ||
3749 commandType == Message::Command::StopTimer)
3750 systemAddedAcks |= Message::AckType::Completed;
3754 if (command_.isSubscribe())
3757 if (_bookmarkStore.isValid())
3759 systemAddedAcks |= Message::AckType::Persisted;
3767 _bookmarkStore.
log(message);
3768 _bookmarkStore.
discard(message);
3781 systemAddedAcks |= Message::AckType::Persisted;
3783 bool isSubscribe = command_.isSubscribe();
3784 if (handler_.isValid() && !isSubscribe)
3786 _registerHandler(command_, cid, handler_,
3787 requestedAcks, systemAddedAcks, isSubscribe);
3789 bool useSyncSend = cid.
len() > 0 && command_.hasProcessedAck();
3790 if (_publishStore.
isValid() && command_.needsSequenceNumber())
3792 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
3795 Unlock<Mutex> u(_lock);
3796 haSequenceNumber = _publishStore.
store(message);
3803 syncAckProcessing((
long)command_.getTimeout(), message,
3806 catch (
const DisconnectedException&)
3812 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3816 else _send(message, haSequenceNumber);
3820 if(command_.isSubscribe())
3825 Unlock<Mutex> u(_lock);
3826 _subscriptionManager->subscribe(handler_,
3829 if (_badTimeToHASubscribe)
3832 return std::string(subId.
data(), subId.
len());
3835 if (handler_.isValid())
3837 _registerHandler(command_, cid, handler_,
3838 requestedAcks, systemAddedAcks, isSubscribe);
3845 syncAckProcessing((
long)command_.getTimeout(), message,
3848 catch (
const DisconnectedException&)
3850 if (!isHASubscribe_)
3856 catch (
const TimedOutException&)
3858 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
3859 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3867 Unlock<Mutex> unlock(_lock);
3868 _subscriptionManager->unsubscribe(subId);
3872 _routes.removeRoute(cid);
3873 _routes.removeRoute(subId);
3877 else _send(message);
3878 if (subId.
len() > 0)
3881 return std::string(subId.
data(), subId.
len());
3891 syncAckProcessing((
long)(command_.getTimeout()), message);
3893 catch (
const DisconnectedException&)
3900 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3905 else _send(message);
3915 bool isHASubscribe_ =
true)
3917 Lock<Mutex> lock(_lock);
3918 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
3922 void setAutoAck(
bool isAutoAckEnabled_)
3924 _isAutoAckEnabled = isAutoAckEnabled_;
3926 bool getAutoAck(
void)
const 3928 return _isAutoAckEnabled;
3930 void setAckBatchSize(
const unsigned batchSize_)
3932 _ackBatchSize = batchSize_;
3933 if (!_queueAckTimeout)
3935 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
3939 unsigned getAckBatchSize(
void)
const 3941 return _ackBatchSize;
3943 int getAckTimeout(
void)
const 3945 return _queueAckTimeout;
3947 void setAckTimeout(
const int ackTimeout_)
3950 _queueAckTimeout = ackTimeout_;
3952 size_t _ack(QueueBookmarks& queueBookmarks_)
3954 if(queueBookmarks_._bookmarkCount)
3956 if (!publishStoreMessage)
3958 publishStoreMessage =
new Message();
3959 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3961 publishStoreMessage->reset();
3962 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3963 .setTopic(queueBookmarks_._topic)
3964 .setBookmark(queueBookmarks_._data)
3965 .setCommandId(
"AMPS-queue-ack");
3966 amps_uint64_t haSequenceNumber = 0;
3969 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3970 publishStoreMessage->setAckType(
"persisted")
3971 .setSequence(haSequenceNumber);
3972 queueBookmarks_._data.erase();
3973 queueBookmarks_._bookmarkCount = 0;
3975 _send(*publishStoreMessage, haSequenceNumber);
3978 queueBookmarks_._data.erase();
3979 queueBookmarks_._bookmarkCount = 0;
3985 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
3987 if (_isAutoAckEnabled)
return;
3988 _ack(topic_, bookmark_, options_);
3990 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
3992 if (bookmark_.
len() == 0)
return;
3993 Lock<Mutex> lock(_lock);
3994 if(_ackBatchSize < 2 || options_ != NULL)
3996 if (!publishStoreMessage)
3998 publishStoreMessage =
new Message();
3999 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4001 publishStoreMessage->reset();
4002 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4003 .setCommandId(
"AMPS-queue-ack")
4004 .setTopic(topic_).setBookmark(bookmark_);
4005 if (options_) publishStoreMessage->setOptions(options_);
4006 amps_uint64_t haSequenceNumber = 0;
4009 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
4010 publishStoreMessage->setAckType(
"persisted")
4011 .setSequence(haSequenceNumber);
4013 _send(*publishStoreMessage, haSequenceNumber);
4017 topic_hash hash = CRC<0>::crcNoSSE(topic_.
data(),topic_.
len());
4018 TopicHashMap::iterator it = _topicHashMap.find(hash);
4019 if(it == _topicHashMap.end())
4022 it = _topicHashMap.insert(TopicHashMap::value_type(hash,QueueBookmarks(topic_))).first;
4024 QueueBookmarks &queueBookmarks = it->second;
4025 if(queueBookmarks._data.length())
4027 queueBookmarks._data.append(
",");
4031 queueBookmarks._oldestTime = amps_now();
4033 queueBookmarks._data.append(bookmark_);
4034 if(++queueBookmarks._bookmarkCount >= _ackBatchSize) _ack(queueBookmarks);
4036 void flushAcks(
void)
4038 size_t sendCount = 0;
4041 Lock<Mutex> lock(_lock);
4042 typedef TopicHashMap::iterator iterator;
4043 for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
4045 QueueBookmarks& queueBookmarks = it->second;
4046 sendCount += _ack(queueBookmarks);
4049 if(sendCount) publishFlush(0);
4052 void checkQueueAcks(
void)
4054 if(!_topicHashMap.size())
return;
4055 Lock<Mutex> lock(_lock);
4058 amps_uint64_t threshold = amps_now() - (amps_uint64_t)_queueAckTimeout;
4059 typedef TopicHashMap::iterator iterator;
4060 for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
4062 QueueBookmarks& queueBookmarks = it->second;
4063 if(queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold) _ack(queueBookmarks);
4066 catch(std::exception& ex)
4068 AMPS_UNHANDLED_EXCEPTION(ex);
4072 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
4074 Lock<Mutex> lock(_lock);
4075 _deferredExecutionList.push_back(
4076 DeferredExecutionRequest(func_,userData_));
4079 inline void processDeferredExecutions(
void)
4081 if(_deferredExecutionList.size())
4083 Lock<Mutex> lock(_lock);
4084 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4085 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4086 for(; it != end; ++it)
4088 it->_func(it->_userData);
4090 _deferredExecutionList.clear();
4091 _routes.invalidateCache();
4092 _routeCache.invalidateCache();
4096 bool getRetryOnDisconnect(
void)
const 4098 return _isRetryOnDisconnect;
4101 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4103 _isRetryOnDisconnect = isRetryOnDisconnect_;
4106 void setDefaultMaxDepth(
unsigned maxDepth_)
4108 _defaultMaxDepth = maxDepth_;
4111 unsigned getDefaultMaxDepth(
void)
const 4113 return _defaultMaxDepth;
4122 AMPSException::throwFor(_client, result);
4156 RefHandle<MessageStreamImpl> _body;
4166 inline void advance(
void);
4178 bool operator==(
const iterator& rhs)
4180 return _pStream == rhs._pStream;
4182 bool operator!=(
const iterator& rhs)
4184 return _pStream != rhs._pStream;
4186 void operator++(
void) { advance(); }
4187 Message operator*(
void) {
return _current; }
4188 Message* operator->(
void) {
return &_current; }
4197 if(!_body.isValid())
4199 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
4212 unsigned getMaxDepth(
void)
const;
4213 unsigned getDepth(
void)
const;
4217 inline void setSOWOnly(
const std::string& commandId_);
4218 inline void setSubscription(
const std::string& commandId_);
4219 inline void setStatsOnly(
const std::string& commandId_);
4220 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
4225 friend class Client;
4236 BorrowRefHandle<ClientImpl> _body;
4238 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
4239 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
4240 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
4251 : _body(new ClientImpl(clientName), true)
4254 Client(ClientImpl* existingClient)
4255 : _body(existingClient,
true)
4258 Client(ClientImpl* existingClient,
bool isRef)
4259 : _body(existingClient, isRef)
4262 Client(
const Client& rhs) : _body(rhs._body) {;}
4263 virtual ~Client(
void) {;}
4265 Client& operator=(
const Client& rhs)
4273 return _body.isValid();
4290 _body.get().setName(name);
4297 return _body.get().getName();
4308 _body.get().setLogonCorrelationData(logonCorrelationData_);
4315 return _body.get().getLogonCorrelationData();
4328 return _body.get().getServerVersion();
4339 return _body.get().getServerVersionInfo();
4353 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4368 return AMPS::convertVersionToNumber(data_, len_);
4375 return _body.get().getURI();
4399 _body.get().connect(uri);
4406 _body.get().disconnect();
4424 _body.get().send(message);
4437 unsigned requestedAcks_,
bool isSubscribe_)
4439 _body.get().addMessageHandler(commandId_, messageHandler_,
4440 requestedAcks_, isSubscribe_);
4448 return _body.get().removeMessageHandler(commandId_);
4476 return _body.get().send(messageHandler, message, timeout);
4490 _body.get().setDisconnectHandler(disconnectHandler);
4498 return _body.get().getDisconnectHandler();
4507 return _body.get().getConnectionInfo();
4520 _body.get().setBookmarkStore(bookmarkStore_);
4528 return _body.
get().getBookmarkStore();
4536 return _body.get().getSubscriptionManager();
4548 _body.get().setSubscriptionManager(subscriptionManager_);
4572 _body.get().setPublishStore(publishStore_);
4580 return _body.
get().getPublishStore();
4588 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
4589 duplicateMessageHandler_);
4603 return _body.get().getDuplicateMessageHandler();
4617 _body.get().setFailedWriteHandler(handler_);
4625 return _body.get().getFailedWriteHandler();
4646 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
4648 return _body.get().publish(topic_.c_str(), topic_.length(),
4649 data_.c_str(), data_.length());
4671 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
4672 const char* data_,
size_t dataLength_)
4674 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
4695 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
4696 unsigned long expiration_)
4698 return _body.get().publish(topic_.c_str(), topic_.length(),
4699 data_.c_str(), data_.length(), expiration_);
4722 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
4723 const char* data_,
size_t dataLength_,
4724 unsigned long expiration_)
4726 return _body.get().publish(topic_, topicLength_,
4727 data_, dataLength_, expiration_);
4765 _body.get().publishFlush(timeout_);
4782 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
4784 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4785 data_.c_str(), data_.length());
4804 const char* data_,
size_t dataLength_)
4806 return _body.get().deltaPublish(topic_, topicLength_,
4807 data_, dataLength_);
4824 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
4825 unsigned long expiration_)
4827 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4828 data_.c_str(), data_.length(),
4849 const char* data_,
size_t dataLength_,
4850 unsigned long expiration_)
4852 return _body.get().deltaPublish(topic_, topicLength_,
4853 data_, dataLength_, expiration_);
4871 const char* options_ = NULL)
4873 return _body.get().logon(timeout_, authenticator_, options_);
4886 std::string
logon(
const char* options_,
int timeout_ = 0)
4902 std::string
logon(
const std::string& options_,
int timeout_ = 0)
4928 const std::string& topic_,
4930 const std::string& filter_=
"",
4931 const std::string& options_ =
"",
4932 const std::string& subId_ =
"")
4934 return _body.get().subscribe(messageHandler_, topic_, timeout_,
4935 filter_,
"", options_, subId_);
4954 long timeout_=0,
const std::string& filter_=
"",
4955 const std::string& options_ =
"",
4956 const std::string& subId_ =
"")
4959 if (_body.get().getDefaultMaxDepth())
4960 result.maxDepth(_body.get().getDefaultMaxDepth());
4961 result.setSubscription(_body.get().subscribe(
4963 topic_, timeout_, filter_,
"",
4964 options_, subId_,
false));
4984 long timeout_ = 0,
const std::string& filter_ =
"",
4985 const std::string& options_ =
"",
4986 const std::string& subId_ =
"")
4989 if (_body.get().getDefaultMaxDepth())
4990 result.maxDepth(_body.get().getDefaultMaxDepth());
4991 result.setSubscription(_body.get().subscribe(
4993 topic_, timeout_, filter_,
"",
4994 options_, subId_,
false));
5011 const std::string& topic_,
5013 const std::string& filter_=
"",
5014 const std::string& options_ =
"",
5015 const std::string& subId_ =
"")
5017 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5018 filter_,
"", options_, subId_);
5029 long timeout_,
const std::string& filter_=
"",
5030 const std::string& options_ =
"",
5031 const std::string& subId_ =
"")
5034 if (_body.get().getDefaultMaxDepth())
5035 result.maxDepth(_body.get().getDefaultMaxDepth());
5036 result.setSubscription(_body.get().deltaSubscribe(
5038 topic_, timeout_, filter_,
"",
5039 options_, subId_,
false));
5045 long timeout_,
const std::string& filter_ =
"",
5046 const std::string& options_ =
"",
5047 const std::string& subId_ =
"")
5050 if (_body.get().getDefaultMaxDepth())
5051 result.maxDepth(_body.get().getDefaultMaxDepth());
5052 result.setSubscription(_body.get().deltaSubscribe(
5054 topic_, timeout_, filter_,
"",
5055 options_, subId_,
false));
5085 const std::string& topic_,
5087 const std::string& bookmark_,
5088 const std::string& filter_=
"",
5089 const std::string& options_ =
"",
5090 const std::string& subId_ =
"")
5092 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5093 filter_, bookmark_, options_, subId_);
5114 const std::string& bookmark_,
5115 const std::string& filter_=
"",
5116 const std::string& options_ =
"",
5117 const std::string& subId_ =
"")
5120 if (_body.get().getDefaultMaxDepth())
5121 result.maxDepth(_body.get().getDefaultMaxDepth());
5122 result.setSubscription(_body.get().subscribe(
5124 topic_, timeout_, filter_,
5125 bookmark_, options_,
5133 const std::string& bookmark_,
5134 const std::string& filter_ =
"",
5135 const std::string& options_ =
"",
5136 const std::string& subId_ =
"")
5139 if (_body.get().getDefaultMaxDepth())
5140 result.maxDepth(_body.get().getDefaultMaxDepth());
5141 result.setSubscription(_body.get().subscribe(
5143 topic_, timeout_, filter_,
5144 bookmark_, options_,
5159 return _body.get().unsubscribe(commandId);
5171 return _body.get().unsubscribe();
5205 const std::string& topic_,
5206 const std::string& filter_ =
"",
5207 const std::string& orderBy_ =
"",
5208 const std::string& bookmark_ =
"",
5209 int batchSize_ = DEFAULT_BATCH_SIZE,
5210 int topN_ = DEFAULT_TOP_N,
5211 const std::string& options_ =
"",
5212 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5214 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
5215 bookmark_, batchSize_, topN_, options_,
5243 const std::string& filter_ =
"",
5244 const std::string& orderBy_ =
"",
5245 const std::string& bookmark_ =
"",
5246 int batchSize_ = DEFAULT_BATCH_SIZE,
5247 int topN_ = DEFAULT_TOP_N,
5248 const std::string& options_ =
"",
5249 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5252 if (_body.get().getDefaultMaxDepth())
5253 result.maxDepth(_body.get().getDefaultMaxDepth());
5254 result.setSOWOnly(sow(result.operator
MessageHandler(),topic_,filter_,orderBy_,bookmark_,batchSize_,topN_,options_,timeout_));
5260 const std::string& filter_ =
"",
5261 const std::string& orderBy_ =
"",
5262 const std::string& bookmark_ =
"",
5263 int batchSize_ = DEFAULT_BATCH_SIZE,
5264 int topN_ = DEFAULT_TOP_N,
5265 const std::string& options_ =
"",
5266 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5269 if (_body.get().getDefaultMaxDepth())
5270 result.maxDepth(_body.get().getDefaultMaxDepth());
5271 result.setSOWOnly(sow(result.operator
MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5297 const std::string& topic_,
5299 const std::string& filter_ =
"",
5300 int batchSize_ = DEFAULT_BATCH_SIZE,
5301 int topN_ = DEFAULT_TOP_N)
5303 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
5329 const std::string& topic_,
5331 const std::string& filter_ =
"",
5332 int batchSize_ = DEFAULT_BATCH_SIZE,
5333 bool oofEnabled_ =
false,
5334 int topN_ = DEFAULT_TOP_N)
5336 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5337 filter_, batchSize_, oofEnabled_,
5362 const std::string& filter_ =
"",
5363 int batchSize_ = DEFAULT_BATCH_SIZE,
5364 bool oofEnabled_ =
false,
5365 int topN_ = DEFAULT_TOP_N)
5368 if (_body.get().getDefaultMaxDepth())
5369 result.maxDepth(_body.get().getDefaultMaxDepth());
5370 result.setSubscription(_body.get().sowAndSubscribe(
5372 topic_, timeout_, filter_,
5373 batchSize_, oofEnabled_,
5398 const std::string& filter_ =
"",
5399 int batchSize_ = DEFAULT_BATCH_SIZE,
5400 bool oofEnabled_ =
false,
5401 int topN_ = DEFAULT_TOP_N)
5404 if (_body.get().getDefaultMaxDepth())
5405 result.maxDepth(_body.get().getDefaultMaxDepth());
5406 result.setSubscription(_body.get().sowAndSubscribe(
5408 topic_, timeout_, filter_,
5409 batchSize_, oofEnabled_,
5443 const std::string& topic_,
5444 const std::string& filter_ =
"",
5445 const std::string& orderBy_ =
"",
5446 const std::string& bookmark_ =
"",
5447 int batchSize_ = DEFAULT_BATCH_SIZE,
5448 int topN_ = DEFAULT_TOP_N,
5449 const std::string& options_ =
"",
5450 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5452 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
5453 orderBy_, bookmark_, batchSize_,
5454 topN_, options_, timeout_);
5482 const std::string& filter_ =
"",
5483 const std::string& orderBy_ =
"",
5484 const std::string& bookmark_ =
"",
5485 int batchSize_ = DEFAULT_BATCH_SIZE,
5486 int topN_ = DEFAULT_TOP_N,
5487 const std::string& options_ =
"",
5488 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5491 if (_body.get().getDefaultMaxDepth())
5492 result.maxDepth(_body.get().getDefaultMaxDepth());
5493 result.setSubscription(_body.get().sowAndSubscribe(
5495 topic_, filter_, orderBy_,
5496 bookmark_, batchSize_, topN_,
5497 options_, timeout_,
false));
5503 const std::string& filter_ =
"",
5504 const std::string& orderBy_ =
"",
5505 const std::string& bookmark_ =
"",
5506 int batchSize_ = DEFAULT_BATCH_SIZE,
5507 int topN_ = DEFAULT_TOP_N,
5508 const std::string& options_ =
"",
5509 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5512 if (_body.get().getDefaultMaxDepth())
5513 result.maxDepth(_body.get().getDefaultMaxDepth());
5514 result.setSubscription(_body.get().sowAndSubscribe(
5516 topic_, filter_, orderBy_,
5517 bookmark_, batchSize_, topN_,
5518 options_, timeout_,
false));
5547 const std::string& topic_,
5548 const std::string& filter_ =
"",
5549 const std::string& orderBy_ =
"",
5550 int batchSize_ = DEFAULT_BATCH_SIZE,
5551 int topN_ = DEFAULT_TOP_N,
5552 const std::string& options_ =
"",
5553 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5555 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5556 filter_, orderBy_, batchSize_,
5557 topN_, options_, timeout_);
5580 const std::string& filter_ =
"",
5581 const std::string& orderBy_ =
"",
5582 int batchSize_ = DEFAULT_BATCH_SIZE,
5583 int topN_ = DEFAULT_TOP_N,
5584 const std::string& options_ =
"",
5585 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5588 if (_body.get().getDefaultMaxDepth())
5589 result.maxDepth(_body.get().getDefaultMaxDepth());
5590 result.setSubscription(sowAndDeltaSubscribe(result.operator
MessageHandler(), topic_, filter_, orderBy_, batchSize_, topN_, options_, timeout_));
5591 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5593 topic_, filter_, orderBy_,
5594 batchSize_, topN_, options_,
5601 const std::string& filter_ =
"",
5602 const std::string& orderBy_ =
"",
5603 int batchSize_ = DEFAULT_BATCH_SIZE,
5604 int topN_ = DEFAULT_TOP_N,
5605 const std::string& options_ =
"",
5606 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5609 if (_body.get().getDefaultMaxDepth())
5610 result.maxDepth(_body.get().getDefaultMaxDepth());
5611 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5613 topic_, filter_, orderBy_,
5614 batchSize_, topN_, options_,
5644 const std::string& topic_,
5646 const std::string& filter_ =
"",
5647 int batchSize_ = DEFAULT_BATCH_SIZE,
5648 bool oofEnabled_ =
false,
5649 bool sendEmpties_ =
false,
5650 int topN_ = DEFAULT_TOP_N)
5652 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5653 timeout_, filter_, batchSize_,
5654 oofEnabled_, sendEmpties_,
5681 const std::string& filter_ =
"",
5682 int batchSize_ = DEFAULT_BATCH_SIZE,
5683 bool oofEnabled_ =
false,
5684 bool sendEmpties_ =
false,
5685 int topN_ = DEFAULT_TOP_N)
5688 if (_body.get().getDefaultMaxDepth())
5689 result.maxDepth(_body.get().getDefaultMaxDepth());
5690 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5692 topic_, timeout_, filter_,
5693 batchSize_, oofEnabled_,
5694 sendEmpties_, topN_,
false));
5720 const std::string& filter_ =
"",
5721 int batchSize_ = DEFAULT_BATCH_SIZE,
5722 bool oofEnabled_ =
false,
5723 bool sendEmpties_ =
false,
5724 int topN_ = DEFAULT_TOP_N)
5727 if (_body.get().getDefaultMaxDepth())
5728 result.maxDepth(_body.get().getDefaultMaxDepth());
5729 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5731 topic_, timeout_, filter_,
5732 batchSize_, oofEnabled_,
5733 sendEmpties_, topN_,
false));
5756 const std::string& topic,
5757 const std::string& filter,
5760 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
5788 stream.setStatsOnly(cid);
5789 _body.get().sowDelete(stream.operator
MessageHandler(),topic,filter,timeout,cid);
5790 return *(stream.
begin());
5792 catch (
const DisconnectedException&)
5794 removeMessageHandler(cid);
5805 _body.get().startTimer();
5816 return _body.get().stopTimer(messageHandler);
5841 const std::string& topic_,
5842 const std::string& keys_,
5845 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
5877 stream.setStatsOnly(cid);
5878 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(),topic_,keys_,timeout_,cid);
5879 return *(stream.
begin());
5881 catch (
const DisconnectedException&)
5883 removeMessageHandler(cid);
5903 const std::string& topic_,
const std::string& data_,
5906 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
5932 stream.setStatsOnly(cid);
5933 _body.get().sowDeleteByData(stream.operator
MessageHandler(),topic_,data_,timeout_,cid);
5934 return *(stream.
begin());
5936 catch (
const DisconnectedException&)
5938 removeMessageHandler(cid);
5948 return _body.get().getHandle();
5960 _body.get().setExceptionListener(listener_);
5967 return _body.get().getExceptionListener();
5985 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6002 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6008 setLastChanceMessageHandler(messageHandler);
6015 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6038 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6059 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6078 _body.get().addConnectionStateListener(listener);
6086 _body.get().removeConnectionStateListener(listener);
6112 return _body.get().executeAsync(command_, handler_);
6150 id = _body.get().executeAsync(command_, handler_,
false);
6152 catch (
const DisconnectedException&)
6154 removeMessageHandler(command_.getMessage().
getCommandId());
6155 if (command_.isSubscribe())
6159 if (command_.isSow())
6161 removeMessageHandler(command_.getMessage().
getQueryID());
6190 _body.get().ack(topic_,bookmark_,options_);
6208 void ack(
const std::string& topic_,
const std::string& bookmark_,
6209 const char* options_ = NULL)
6211 _body.get().ack(
Field(topic_.data(),topic_.length()),
Field(bookmark_.data(),bookmark_.length()),options_);
6219 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
6221 _body.get()._ack(topic_,bookmark_,options_);
6232 _body.get().flushAcks();
6241 return _body.get().getAutoAck();
6251 _body.get().setAutoAck(isAutoAckEnabled_);
6259 return _body.get().getAckBatchSize();
6269 _body.get().setAckBatchSize(ackBatchSize_);
6278 return _body.get().getAckTimeout();
6288 _body.get().setAckTimeout(ackTimeout_);
6302 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
6311 return _body.get().getRetryOnDisconnect();
6320 _body.get().setDefaultMaxDepth(maxDepth_);
6329 return _body.get().getDefaultMaxDepth();
6341 return _body.get().setTransportFilterFunction(filter_, userData_);
6349 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
6351 _body.get().deferredExecution(func_,userData_);
6361 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
6367 unsigned deliveries = 0;
6379 const char* data = NULL;
6381 const char* status = NULL;
6382 size_t statusLen = 0;
6384 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
6387 &status, &statusLen);
6388 if (len == NotEntitled || len == Duplicate ||
6389 (statusLen == Failure && status[0] ==
'f'))
6391 if (_failedWriteHandler)
6393 amps_uint64_t sequence =
6396 if (_publishStore.isValid())
6398 FailedWriteStoreReplayer replayer(
this, data, len);
6399 _publishStore.replaySingle(replayer, sequence);
6404 _failedWriteHandler->failedWrite(emptyMessage, data, len);
6409 if (_publishStore.isValid())
6418 _publishStore.discardUpTo(seq);
6422 if (!deliveries && _bookmarkStore.isValid())
6428 const char* bookmarkData = NULL;
6429 size_t bookmarkLen = 0;
6431 &bookmarkData, &bookmarkLen);
6432 if (bookmarkLen > 0)
6436 _bookmarkStore.persisted(subId,
Message::Field(bookmarkData, bookmarkLen));
6441 catch (std::exception& ex)
6443 AMPS_UNHANDLED_EXCEPTION(ex);
6449 ClientImpl::processedAck(
Message &message)
6451 unsigned deliveries = 0;
6453 const char* data = NULL;
6457 Lock<Mutex> l(_lock);
6460 AckMap::iterator i = _acks.find(std::string(data, len));
6461 if (i != _acks.end())
6472 ack.setStatus(data, len);
6475 ack.setReason(data, len);
6478 ack.setUsername(data, len);
6481 ack.setPassword(data, len);
6484 ack.setSequenceNo(data, len);
6487 ack.setServerVersion(data, len);
6489 ack.setOptions(data,len);
6490 ack.setResponded(
true);
6497 ClientImpl::checkAndSendHeartbeat(
bool force)
6499 if (force || _heartbeatTimer.check())
6501 _heartbeatTimer.start();
6504 sendWithoutRetry(_beatMessage);
6506 catch (
const AMPSException&)
6513 inline ConnectionInfo ClientImpl::getConnectionInfo()
const 6515 ConnectionInfo info;
6516 std::ostringstream writer;
6518 info[
"client.uri"] = _lastUri;
6519 info[
"client.name"] = _name;
6520 info[
"client.username"] = _username;
6521 if(_publishStore.isValid())
6523 writer << _publishStore.unpersistedCount();
6524 info[
"publishStore.unpersistedCount"] = writer.str();
6533 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
6535 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
6536 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
6537 ClientImpl* me = (ClientImpl*) userData_;
6540 if(me->_queueAckTimeout) me->checkQueueAcks();
6543 me->processDeferredExecutions();
6545 me->_readMessage.replace(messageHandle_);
6546 Message& message = me->_readMessage;
6548 if (commandType & SOWMask)
6550 #if 0 // Not currently implemented, to avoid an extra branch in delivery 6554 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6555 me->_globalCommandTypeHandlers[1+(commandType/8192)].invoke(message));
6557 AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->_routes.deliverData(message,
6560 else if (commandType & PublishMask)
6562 #if 0 // Not currently implemented, to avoid an extra branch in delivery 6563 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6564 me->_globalCommandTypeHandlers[(commandType==Message::Command::Publish ?
6565 GlobalCommandTypeHandlers::Publish :
6566 GlobalCommandTypeHandlers::OOF)].invoke(message));
6568 const char* subIds = NULL;
6569 size_t subIdsLen = 0;
6572 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
6573 for(
size_t i=0; i<subIdCount; ++i)
6575 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
6577 if (handler.isValid())
6580 subIds + lookupResult.idOffset, lookupResult.idLength);
6583 bool isAutoAck = me->_isAutoAckEnabled;
6585 if (!isMessageQueue && !bookmark.
empty() &&
6586 me->_bookmarkStore.isValid())
6588 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
6591 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
6593 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message);
6598 me->_bookmarkStore.log(me->_readMessage);
6599 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6600 handler.invoke(message));
6605 if(isMessageQueue && isAutoAck)
6609 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
6610 if (!message.getIgnoreAutoAck())
6612 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6616 catch(std::exception& ex)
6618 if (!message.getIgnoreAutoAck())
6620 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6623 AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6628 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6629 handler.invoke(message));
6633 else me->lastChance(message);
6636 else if (commandType == Message::Command::Ack)
6638 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6639 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
6641 unsigned deliveries = 0U;
6644 case Message::AckType::Persisted:
6645 deliveries += me->persistedAck(message);
6647 case Message::AckType::Processed:
6648 deliveries += me->processedAck(message);
6651 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
6652 if (deliveries == 0)
6654 me->lastChance(message);
6657 else if (commandType == Message::Command::Heartbeat)
6659 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6660 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
6661 if(me->_heartbeatTimer.getTimeout() != 0.0)
6663 me->checkAndSendHeartbeat(
true);
6667 me->lastChance(message);
6673 unsigned deliveries = 0U;
6676 while(me->_connected)
6680 deliveries = me->_routes.deliverData(message, message.
getCommandId());
6684 catch(MessageStreamFullException&)
6686 catch(MessageStreamFullException& ex_)
6689 me->checkAndSendHeartbeat(
false);
6693 catch (std::exception& ex_)
6697 me->_exceptionListener->exceptionThrown(ex_);
6704 if (deliveries == 0)
6705 me->lastChance(message);
6707 me->checkAndSendHeartbeat();
6712 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
6714 ClientImpl* me = (ClientImpl*) userData;
6717 me->clearAcks(failedConnectionVersion);
6721 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
6723 ClientImpl* me = (ClientImpl*) userData;
6724 Lock<Mutex> l(me->_lock);
6725 Client wrapper(me,
false);
6727 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
6730 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
6733 AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
6734 me->_connected =
false;
6738 Unlock<Mutex> unlock(me->_lock);
6739 me->_disconnectHandler.invoke(wrapper);
6742 catch(
const std::exception& ex)
6744 AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6747 if (!me->_connected)
6749 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
6750 AMPS_UNHANDLED_EXCEPTION_2(me,DisconnectedException(
"Reconnect failed."));
6756 if (me->_subscriptionManager)
6761 Unlock<Mutex> unlock(me->_lock);
6762 me->_subscriptionManager->resubscribe(wrapper);
6764 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
6768 catch(
const AMPSException& subEx)
6770 AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6772 catch(
const std::exception& subEx)
6774 AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6797 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
6798 : _data(data_), _len(len_),_pos(pos_), _fieldSep(fieldSep_)
6800 while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6803 typedef void* difference_type;
6804 typedef std::forward_iterator_tag iterator_category;
6805 typedef std::pair<Message::Field, Message::Field> value_type;
6806 typedef value_type* pointer;
6807 typedef value_type& reference;
6808 bool operator==(
const iterator& rhs)
const 6810 return _pos == rhs._pos;
6812 bool operator!=(
const iterator& rhs)
const 6814 return _pos != rhs._pos;
6816 iterator& operator++()
6819 while(_pos != _len && _data[_pos] != _fieldSep) ++_pos;
6821 while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6825 value_type operator*()
const 6828 size_t i = _pos, keyLength =0, valueStart = 0, valueLength = 0;
6829 for(; i < _len && _data[i] !=
'='; ++i) ++keyLength;
6831 result.first.assign(_data+_pos, keyLength);
6833 if (i < _len && _data[i] ==
'=')
6837 for(; i < _len && _data[i] != _fieldSep; ++i)
6842 result.second.assign(_data+valueStart, valueLength);
6848 class reverse_iterator
6855 typedef std::pair<Message::Field, Message::Field> value_type;
6856 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
6857 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
6862 while(_pos >=_data && *_pos == _fieldSep) --_pos;
6863 while(_pos > _data && *_pos != _fieldSep) --_pos;
6867 if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
6868 if (_pos < _data) _pos = 0;
6871 bool operator==(
const reverse_iterator& rhs)
const 6873 return _pos == rhs._pos;
6875 bool operator!=(
const reverse_iterator& rhs)
const 6877 return _pos != rhs._pos;
6879 reverse_iterator& operator++()
6890 while(_pos >=_data && *_pos == _fieldSep) --_pos;
6892 while(_pos >_data && *_pos != _fieldSep) --_pos;
6893 if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
6894 if (_pos < _data) _pos = 0;
6898 value_type operator*()
const 6901 size_t keyLength = 0, valueStart = 0, valueLength = 0;
6902 size_t i = (size_t)(_pos - _data);
6903 for(; i < _len && _data[i] !=
'='; ++i) ++keyLength;
6904 result.first.assign(_pos, keyLength);
6905 if (i<_len && _data[i] ==
'=')
6909 for(; i<_len && _data[i] != _fieldSep; ++i)
6914 result.second.assign(_data+valueStart, valueLength);
6919 : _data(data.
data()), _len(data.
len()),
6920 _fieldSep(fieldSeparator)
6924 FIX(
const char* data,
size_t len,
char fieldSeparator=1)
6925 : _data(data), _len(len), _fieldSep(fieldSeparator)
6929 iterator begin()
const 6931 return iterator(_data, _len, 0, _fieldSep);
6933 iterator end()
const 6935 return iterator(_data, _len, _len, _fieldSep);
6939 reverse_iterator rbegin()
const 6941 return reverse_iterator(_data, _len, _data+(_len-1), _fieldSep);
6944 reverse_iterator rend()
const 6946 return reverse_iterator(_data, _len, 0, _fieldSep);
6967 std::stringstream _data;
6984 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
6987 _data.write(value+offset, (std::streamsize)length);
6995 void append(
const T& tag,
const std::string& value)
6997 _data << tag <<
'=' << value << _fs;
7006 operator std::string()
const 7014 _data.str(std::string());
7051 typedef std::map<Message::Field, Message::Field>
map_type;
7062 for(FIX::iterator a = fix.begin(); a!= fix.end(); ++a)
7074 std::deque<Message> _q;
7075 std::string _commandId;
7079 unsigned _requestedAcks;
7082 volatile enum { Unset=0x0, Running=0x10, Subscribe=0x11, SOWOnly=0x12, AcksOnly=0x13, Conflate=0x14, Closed=0x1, Disconnected=0x2 } _state;
7083 typedef std::map<std::string, Message*> SOWKeyMap;
7084 SOWKeyMap _sowKeyMap;
7086 MessageStreamImpl(
const Client& client_)
7089 _maxDepth((
unsigned)~0),
7093 if (_client.isValid())
7099 MessageStreamImpl(ClientImpl* client_)
7102 _maxDepth((
unsigned)~0),
7106 if (_client.isValid())
7112 ~MessageStreamImpl()
7116 virtual void destroy()
7122 catch(std::exception &e)
7126 if (_client.isValid())
7132 if (_client.isValid())
7135 _client.deferredExecution(MessageStreamImpl::destroyer,
this);
7136 _client = Client((ClientImpl*)NULL);
7144 static void destroyer(
void* vpMessageStreamImpl_)
7146 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
7149 void setSubscription(
const std::string& commandId_)
7151 Lock<Mutex> lock(_lock);
7153 if (Disconnected == _state)
return;
7154 assert(Unset==_state);
7156 _commandId = commandId_;
7159 void setSOWOnly(
const std::string& commandId_)
7161 Lock<Mutex> lock(_lock);
7163 if (Disconnected == _state)
return;
7164 assert(Unset==_state);
7166 _commandId = commandId_;
7169 void setStatsOnly(
const std::string& commandId_)
7171 Lock<Mutex> lock(_lock);
7173 if (Disconnected == _state)
return;
7174 assert(Unset==_state);
7176 _requestedAcks = Message::AckType::Stats;
7177 _commandId = commandId_;
7180 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
7182 Lock<Mutex> lock(_lock);
7184 if (Disconnected == _state)
return;
7185 assert(Unset==_state);
7187 _requestedAcks = acks_;
7188 _commandId = commandId_;
7193 Lock<Mutex> lock(_lock);
7194 if(state_ == AMPS::ConnectionStateListener::Disconnected)
7196 _state = Disconnected;
7201 void timeout(
unsigned timeout_)
7203 _timeout = timeout_;
7207 if(_state == Subscribe) _state = Conflate;
7209 void maxDepth(
unsigned maxDepth_)
7211 if(maxDepth_) _maxDepth = maxDepth_;
7212 else _maxDepth = (unsigned)~0;
7214 unsigned getMaxDepth(
void)
const 7218 unsigned getDepth(
void)
const 7220 return (
unsigned)(_q.size());
7225 Lock<Mutex> lock(_lock);
7226 if (!_previousTopic.
empty() && !_previousBookmark.
empty())
7230 if (_client.isValid())
7232 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
7236 catch (AMPSException&)
7238 catch (AMPSException& e)
7241 current_.invalidate();
7242 _previousTopic.
clear();
7243 _previousBookmark.
clear();
7246 _previousTopic.
clear();
7247 _previousBookmark.
clear();
7249 unsigned minWaitTime = _timeout ? _timeout : 1000;
7251 while(_q.empty() && _state & Running)
7254 _lock.wait((
long)minWaitTime);
7255 amps_invoke_waiting_function();
7258 waited += minWaitTime;
7259 if(waited >= _timeout)
break;
7264 current_ = _q.front();
7265 if(_q.size() == _maxDepth) _lock.signalAll();
7267 if(_state == Conflate)
7269 std::string sowKey = current_.
getSowKey();
7270 if(sowKey.length()) _sowKeyMap.erase(sowKey);
7272 else if(_state == AcksOnly)
7276 if((_state == AcksOnly && _requestedAcks == 0) ||
7277 (_state == SOWOnly && current_.
getCommand()==
"group_end"))
7281 else if (current_.
getCommandEnum() == Message::Command::Publish &&
7291 if(_state == Disconnected)
7293 throw DisconnectedException(
"Connection closed.");
7295 current_.invalidate();
7296 if(_state == Closed)
7300 return _timeout != 0;
7304 if((_state==SOWOnly || _state==Subscribe) && _client.isValid())
7308 if(_state==SOWOnly || _state==Subscribe || _state==Unset)
7313 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
7315 Lock<Mutex> lock(this_->_lock);
7316 if(this_->_state != Conflate)
7318 AMPS_TESTING_SLOW_MESSAGE_STREAM
7319 if(this_->_q.size() >= this_->_maxDepth)
7324 this_->_lock.signalAll();
7325 throw MessageStreamFullException(
"Stream is currently full.");
7327 this_->_q.push_back(message_.
deepCopy());
7329 this_->_client.isValid() && this_->_client.getAutoAck() &&
7333 message_.setIgnoreAutoAck();
7338 std::string sowKey = message_.
getSowKey();
7341 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
7342 if(it != this_->_sowKeyMap.end())
7344 *(it->second) = message_.
deepCopy();
7348 if(this_->_q.size() >= this_->_maxDepth)
7354 this_->_lock.signalAll();
7355 throw MessageStreamFullException(
"Stream is currently full.");
7357 this_->_q.push_back(message_.
deepCopy());
7358 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
7363 while(this_->_q.size() >= this_->_maxDepth)
7365 this_->_lock.wait(1);
7367 this_->_q.push_back(message_.
deepCopy());
7370 this_->_lock.signalAll();
7373 inline MessageStream::MessageStream(
void)
7376 inline MessageStream::MessageStream(
const Client& client_)
7377 :_body(
new MessageStreamImpl(client_))
7380 inline void MessageStream::iterator::advance(
void)
7382 _pStream = _pStream->_body->next(_current) ? _pStream:NULL;
7386 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
7391 if(handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
7393 result._body = (MessageStreamImpl*)(handler_._userData);
7398 inline void MessageStream::setSOWOnly(
const std::string& commandId_)
7400 _body->setSOWOnly(commandId_);
7402 inline void MessageStream::setSubscription(
const std::string& commandId_)
7404 _body->setSubscription(commandId_);
7406 inline void MessageStream::setStatsOnly(
const std::string& commandId_)
7408 _body->setStatsOnly(commandId_);
7410 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
7413 _body->setAcksOnly(commandId_, acks_);
7415 inline MessageStream MessageStream::timeout(
unsigned timeout_)
7417 _body->timeout(timeout_);
7425 inline MessageStream MessageStream::maxDepth(
unsigned maxDepth_)
7427 _body->maxDepth(maxDepth_);
7430 inline unsigned MessageStream::getMaxDepth(
void)
const 7432 return _body->getMaxDepth();
7434 inline unsigned MessageStream::getDepth(
void)
const 7436 return _body->getDepth();
7439 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
7441 return *(_pEmptyMessageStream.get());
7449 Message& message = command_.getMessage();
7452 bool useExistingHandler = !subId.
empty() && ((!message.getOptions().empty() && message.getOptions().contains(
"replace",7)) || message.
getCommandEnum() == Message::Command::SOW);
7453 if(useExistingHandler)
7459 if (_body.get()._routes.getRoute(subId, existingHandler))
7462 _body.get().executeAsync(command_, existingHandler,
false);
7463 return MessageStream::fromExistingHandler(existingHandler);
7468 Message::Command::Type command = command_.getMessage().
getCommandEnum();
7469 if ((command == Message::Command::Publish ||
7470 command == Message::Command::DeltaPublish) &&
7471 (ackTypes == Message::AckType::Persisted ||
7472 ackTypes == Message::AckType::None))
7475 if (!_body.get()._pEmptyMessageStream)
7477 _body.get()._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
7478 _body.get()._pEmptyMessageStream.get()->_body->close();
7480 return _body.get().getEmptyMessageStream();
7483 if (_body.get().getDefaultMaxDepth())
7484 stream.maxDepth(_body.get().getDefaultMaxDepth());
7486 std::string commandID = _body.get().executeAsync(command_, handler,
false);
7487 if (command_.hasStatsAck())
7489 stream.setStatsOnly(commandID);
7491 else if (command_.isSow())
7493 stream.setSOWOnly(commandID);
7495 else if (command_.isSubscribe())
7497 stream.setSubscription(commandID);
7502 if (command == Message::Command::Publish ||
7503 command == Message::Command::DeltaPublish ||
7504 command == Message::Command::SOWDelete)
7506 stream.setAcksOnly(commandID,
7507 ackTypes & (
unsigned)~Message::AckType::Persisted);
7511 stream.setAcksOnly(commandID, ackTypes);
7518 inline void Message::ack(
const char* options_)
const 7520 ClientImpl* pClient = _body.get().clientImpl();
7522 if(pClient && bookmark.
len() &&
7523 !pClient->getAutoAck())
7525 pClient->ack(getTopic(),bookmark,options_);
Command & setCorrelationId(const std::string &v_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:530
void prune(std::string tmpFileName_="")
Used to trim the size of a store's storage.
Definition: ampsplusplus.hpp:1043
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:212
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1173
void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: ampsplusplus.hpp:947
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:4250
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:958
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:404
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:4446
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:978
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:6965
void startTimer()
Sends a message to AMPS requesting that AMPS start a server-side timer.
Definition: ampsplusplus.hpp:5803
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5360
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1143
void setMaxSubIdLength(size_t maxSubIdLength_)
Sets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: ampsplusplus.hpp:1076
Command & addAckType(const std::string &v_)
Definition: ampsplusplus.hpp:570
std::string subscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4927
Abstract base class for storing received bookmarks for HA clients.
Definition: ampsplusplus.hpp:679
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1062
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1057
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:461
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:885
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:6057
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: ampsplusplus.hpp:1022
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a defualt max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:6318
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4351
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:5157
void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a Message.
Definition: ampsplusplus.hpp:897
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1060
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1115
virtual void setResizeHandler(BookmarkStoreResizeHandler handler_, void *userData_)
Set a handler on the bookmark store that will get called whenever a resize of the store is required d...
Definition: ampsplusplus.hpp:758
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: ampsplusplus.hpp:992
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:6327
void ack(Message &message_, const char *options_=NULL)
ACK a message queue message by supplying the message directly.
Definition: ampsplusplus.hpp:6198
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5112
Command & setFilter(const std::string &v_)
Definition: ampsplusplus.hpp:515
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: ampsplusplus.hpp:886
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:4570
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4824
bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: ampsplusplus.hpp:924
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4366
DisconnectHandler getDisconnectHandler(void)
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:4496
void AMPSDLL amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:979
amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:615
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4586
bool(* BookmarkStoreResizeHandler)(BookmarkStore store_, const Message::Field &subId_, size_t size_, void *userData_)
Function type for BookmarkStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:672
void purge()
Called to purge the contents of this store.
Definition: ampsplusplus.hpp:936
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:430
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6084
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:604
int getAckTimeout(void) const
Returns the current value of the ack timeout setting.
Definition: ampsplusplus.hpp:6276
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4337
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5867
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1041
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1332
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:7002
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:6338
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4722
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4195
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1055
std::string sowAndDeltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5546
bool ThrowawayBookmarkResizeHandler(BookmarkStore store_, const Message::Field &subId_, size_t newSize_, void *data_)
A BookmarkStoreResizeHandler that discards the oldest bookmark assuming that it was used but not disc...
Definition: ampsplusplus.hpp:1098
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4422
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1072
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1195
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6300
BookmarkStore(BookmarkStoreImpl *impl_)
Creates a BookmarkStore based on the given implementation.
Definition: ampsplusplus.hpp:843
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:958
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4397
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:196
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1426
Success.
Definition: amps.h:116
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:945
std::string deltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5010
amps_handle AMPSDLL amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: ampsplusplus.hpp:1032
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:637
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4803
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:7047
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:106
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:445
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:4435
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4191
amps_result
Return values from amps_xxx functions.
Definition: amps.h:111
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:4623
void setResizeHandler(BookmarkStoreResizeHandler handler_, void *userData_)
Set a handler on the bookmark store that will get called whenever a resize of the store is required d...
Definition: ampsplusplus.hpp:959
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:991
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:955
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:7444
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:528
amps_result AMPSDLL amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:978
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:4578
std::string sowAndSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5328
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4326
Command & setExpiration(unsigned v_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:568
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1460
Command & setData(const char *v_, size_t length_)
Sets the data for this command.
Definition: ampsplusplus.hpp:540
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server.
Definition: ampsplusplus.hpp:4869
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:1265
std::string sowAndDeltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5643
std::string send(MessageHandler messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4474
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5502
The main class for interacting with AMPS.
Definition: ampsplusplus.hpp:4233
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6036
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:956
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:6076
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:476
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1063
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4671
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:958
void publishFlush(long timeout_=0)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:4763
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:978
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:4534
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:468
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:980
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:956
std::string sowDelete(MessageHandler messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5755
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:5965
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:654
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1456
void setImplementation(BookmarkStoreImpl *impl_)
Sets the BookmarkStore to use the given implementation.
Definition: ampsplusplus.hpp:857
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1051
Command & setTopic(const std::string &v_)
Definition: ampsplusplus.hpp:513
amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
Command & setBatchSize(unsigned v_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:557
std::string sowAndSubscribe(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5442
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: ampsplusplus.hpp:101
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1052
void setDisconnectHandler(DisconnectHandler disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:4488
Command & setOrderBy(const std::string &v_)
Definition: ampsplusplus.hpp:517
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:93
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1286
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:649
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5028
Interface for BookmarkStoreImpl classes.
Definition: ampsplusplus.hpp:833
std::string logon(const char *options_, int timeout_=0)
Logon to the server.
Definition: ampsplusplus.hpp:4886
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Command & setTimeout(unsigned v_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:550
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:957
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1072
void AMPSDLL amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: ampsplusplus.hpp:105
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:631
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:4306
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:955
std::string bookmarkSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5084
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:6975
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
ACK a message queue message by supplying a topic and bookmark string.
Definition: ampsplusplus.hpp:6208
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:644
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1051
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:1137
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5396
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1220
void persisted(const Message::Field &subId_, size_t bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: ampsplusplus.hpp:1002
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:6984
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:955
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: ampsplusplus.hpp:109
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1346
amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4526
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1303
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the ack batch size setting.
Definition: ampsplusplus.hpp:6267
void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1076
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1059
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1374
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5778
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1061
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:4518
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:121
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1052
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
Command & setCommandId(const std::string &v_)
Definition: ampsplusplus.hpp:511
void setExceptionListener(const ExceptionListener &listener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:5958
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:484
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:4546
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Deprecated. Use setLastChanceMessageHandler instead.
Definition: ampsplusplus.hpp:6006
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:6995
unsigned getAckBatchSize(void) const
Returns the value of the ack batch size setting.
Definition: ampsplusplus.hpp:6257
size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark sequence in the store.
Definition: ampsplusplus.hpp:981
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4782
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5481
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4162
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: ampsplusplus.hpp:873
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5718
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1295
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:991
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6110
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5679
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server.
Definition: ampsplusplus.hpp:4902
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:5169
void setAutoAck(bool isAutoAckEnabled_)
Sets the auto-ack setting on this client.
Definition: ampsplusplus.hpp:6249
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5044
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1062
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1256
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1360
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4601
amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
amps_result AMPSDLL amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:7039
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:991
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:4505
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
ACK a message queue message by supplying a topic and bookmark.
Definition: ampsplusplus.hpp:6188
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:7051
amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1241
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:7058
void noPersistedAcks(const Message::Field &subId_)
Internally used to indicate when a subscription is placed without requesting persisted acks...
Definition: ampsplusplus.hpp:1012
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:235
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5579
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6000
Command & setSubId(const std::string &v_)
Definition: ampsplusplus.hpp:519
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4983
An iterable object representing the contents of an AMPS topic.
Definition: ampsplusplus.hpp:4154
std::string sow(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:5204
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:956
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1324
void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
std::string sow(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5296
Command & setTopN(unsigned v_)
Definition: ampsplusplus.hpp:552
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1077
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: ampsplusplus.hpp:811
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:4404
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5242
Command & setData(const std::string &v_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:536
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1057
The operation has not succeeded, but ought to be retried.
Definition: amps.h:140
const std::string & getLogonCorrelationData() const
Returns the currently set logoon correlation data for the client.
Definition: ampsplusplus.hpp:4313
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:5983
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4373
Command & setOptions(const std::string &v_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:533
void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1274
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1064
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:167
std::string sowDeleteByData(MessageHandler messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5902
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1021
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1052
Command & setBookmark(const std::string &v_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:523
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:5946
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:7012
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:4615
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5600
void setAckTimeout(const int ackTimeout_)
Sets the message queue ACK timeout value.
Definition: ampsplusplus.hpp:6286
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:509
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1076
bool getAutoAck(void) const
Returns the value of the auto-ack setting.
Definition: ampsplusplus.hpp:6239
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4646
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: ampsplusplus.hpp:1064
Definition: ampsplusplus.hpp:136
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:4288
std::string stopTimer(MessageHandler messageHandler)
Sends a message to AMPS requesting that AMPS stop the previously started timer.
Definition: ampsplusplus.hpp:5814
size_t getOldestBookmarkSeq(const std::string &subId_)
Called to find the oldest bookmark in the store.
Definition: ampsplusplus.hpp:969
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:600
std::string sowDeleteByKeys(MessageHandler messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5840
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1316
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1064
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1402
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1199
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5259
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:4695
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6013
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: ampsplusplus.hpp:909
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4848
The client and server are disconnected.
Definition: amps.h:144
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5922
void setMaxSubIdLength(size_t maxSubIdLength_)
Sets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: ampsplusplus.hpp:820
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:4295
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:385
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1058
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4953
static const size_t BOOKMARK_NONE
An indicator of no bookmark value.
Definition: Message.hpp:408
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5131
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:496
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6144
amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4206
BookmarkStore()
Creates a BookmarkStore that does nothing.
Definition: ampsplusplus.hpp:839
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6309
void flushAcks(void)
Sends any queued message queue ACK messages to the server immediately.
Definition: ampsplusplus.hpp:6230