25 #ifndef _AMPSPLUSPLUS_H_ 26 #define _AMPSPLUSPLUS_H_ 46 #include <sys/atomic.h> 48 #include "BookmarkStore.hpp" 49 #include "MessageRouter.hpp" 51 #include "ampscrc.hpp" 53 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM 54 #define AMPS_TESTING_SLOW_MESSAGE_STREAM 82 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10 83 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960 84 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0 85 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000 86 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200 87 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000 88 #define AMPS_DEFAULT_TOP_N -1 89 #define AMPS_DEFAULT_BATCH_SIZE 10 90 #define AMPS_NUMBER_BUFFER_LEN 20 91 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000 93 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64) 98 static __declspec ( thread )
AMPS::Message* publishStoreMessage = 0;
106 typedef std::map<std::string, std::string> ConnectionInfo;
108 class PerThreadMessageTracker
110 std::vector<AMPS::Message*> _messages;
112 PerThreadMessageTracker() {}
113 ~PerThreadMessageTracker()
115 for (
size_t i = 0; i < _messages.size(); ++i)
122 _messages.push_back(message);
126 static AMPS::Mutex _lock;
127 AMPS::Lock<Mutex> l(_lock);
128 _addMessageToCleanupList(message);
132 static PerThreadMessageTracker tracker;
133 tracker.addMessage(message);
138 inline std::string asString(Type x_)
140 std::ostringstream os;
146 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
148 size_t pos = AMPS_NUMBER_BUFFER_LEN;
149 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
153 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
162 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
164 size_t pos = AMPS_NUMBER_BUFFER_LEN;
165 for (
int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
169 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
183 static const char* duplicate()
187 static const char* badFilter()
/// Returns the reason text "bad regex topic".
static const char* badRegexTopic()
{
  return "bad regex topic";
}
/// Returns the reason text "subscription already exists".
static const char* subscriptionAlreadyExists()
{
  return "subscription already exists";
}
/// Returns the reason text "name in use".
static const char* nameInUse()
{
  return "name in use";
}
/// Returns the reason text "auth failure".
static const char* authFailure()
{
  return "auth failure";
}
/// Returns the reason text "not entitled".
static const char* notEntitled()
{
  return "not entitled";
}
/// Returns the reason text "authentication disabled".
static const char* authDisabled()
{
  return "authentication disabled";
}
/// Returns the reason text "subid in use".
static const char* subidInUse()
{
  return "subid in use";
}
219 static const char* noTopic()
237 virtual void exceptionThrown(
const std::exception&)
const {;}
243 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \ 248 catch (std::exception& ex_)\ 252 _exceptionListener->exceptionThrown(ex_);\ 277 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 280 while(me->_connected)\ 287 catch(MessageStreamFullException&)\ 289 me->checkAndSendHeartbeat(false);\ 293 catch (std::exception& ex_)\ 297 me->_exceptionListener->exceptionThrown(ex_);\ 321 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 322 while(me->_connected)\ 329 catch(MessageStreamFullException&)\ 331 me->checkAndSendHeartbeat(false);\ 335 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 338 while(me->_connected)\ 345 catch(MessageStreamFullException& ex_)\ 347 me->checkAndSendHeartbeat(false);\ 351 catch (std::exception& ex_)\ 355 me->_exceptionListener->exceptionThrown(ex_);\ 379 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 380 while(me->_connected)\ 387 catch(MessageStreamFullException& ex_)\ 389 me->checkAndSendHeartbeat(false);\ 394 #define AMPS_UNHANDLED_EXCEPTION(ex) \ 397 _exceptionListener->exceptionThrown(ex);\ 402 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \ 405 me->_exceptionListener->exceptionThrown(ex);\ 444 static const unsigned Subscribe = 1;
445 static const unsigned SOW = 2;
446 static const unsigned NeedsSequenceNumber = 4;
447 static const unsigned ProcessedAck = 8;
448 static const unsigned StatsAck = 16;
449 void init(Message::Command::Type command_)
458 void init(
const std::string& command_)
470 if (!(command & Message::Command::NoDataCommands))
473 if (command == Message::Command::Subscribe ||
474 command == Message::Command::SOWAndSubscribe ||
475 command == Message::Command::DeltaSubscribe ||
476 command == Message::Command::SOWAndDeltaSubscribe)
481 if (command == Message::Command::SOW
482 || command == Message::Command::SOWAndSubscribe
483 || command == Message::Command::SOWAndDeltaSubscribe)
488 setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
490 if (command == Message::Command::SOW)
495 _flags |= ProcessedAck;
497 else if (command == Message::Command::SOWDelete)
500 _flags |= ProcessedAck;
501 _flags |= NeedsSequenceNumber;
503 else if (command == Message::Command::Publish
504 || command == Message::Command::DeltaPublish)
506 _flags |= NeedsSequenceNumber;
508 else if (command == Message::Command::StopTimer)
655 std::ostringstream os;
660 amps_uint64_t getSequence()
const 676 _message.
setData(data_, dataLen_);
706 _batchSize = batchSize_;
728 if (ackType_ ==
"processed")
730 _flags |= ProcessedAck;
732 else if (ackType_ ==
"stats")
742 if (ackType_.find(
"processed") != std::string::npos)
744 _flags |= ProcessedAck;
748 _flags &= ~ProcessedAck;
750 if (ackType_.find(
"stats") != std::string::npos)
764 if (ackType_ & Message::AckType::Processed)
766 _flags |= ProcessedAck;
770 _flags &= ~ProcessedAck;
772 if (ackType_ & Message::AckType::Stats)
797 unsigned getTimeout(
void)
const 801 unsigned getBatchSize(
void)
const 805 bool isSubscribe(
void)
const 807 return _flags & Subscribe;
809 bool isSow(
void)
const 811 return (_flags & SOW) != 0;
813 bool hasProcessedAck(
void)
const 815 return (_flags & ProcessedAck) != 0;
817 bool hasStatsAck(
void)
const 819 return (_flags & StatsAck) != 0;
821 bool needsSequenceNumber(
void)
const 823 return (_flags & NeedsSequenceNumber) != 0;
829 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
846 virtual std::string authenticate(
const std::string& userName_,
const std::string& password_) = 0;
854 virtual std::string retry(
const std::string& userName_,
const std::string& password_) = 0;
861 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
873 std::string
authenticate(
const std::string& ,
const std::string& password_)
880 std::string
retry(
const std::string& ,
const std::string& )
882 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
/// No-op completion hook: all three arguments are ignored.
void completed(const std::string&, const std::string&, const std::string&) {;}
906 virtual void execute(
Message& message_) = 0;
921 typedef bool (*PublishStoreResizeHandler)(
Store store_,
931 : _resizeHandler(NULL)
932 , _resizeHandlerData(NULL)
939 virtual amps_uint64_t store(
const Message& message_) = 0;
945 virtual void discardUpTo(amps_uint64_t index_) = 0;
960 virtual bool replaySingle(
StoreReplayer& replayer_, amps_uint64_t index_) = 0;
966 virtual size_t unpersistedCount()
const = 0;
978 virtual void flush(
long timeout_) = 0;
984 return AMPS_UNSET_INDEX;
991 return AMPS_UNSET_SEQUENCE;
997 virtual amps_uint64_t getLowestUnpersisted()
const = 0;
1002 virtual amps_uint64_t getLastPersisted() = 0;
1016 _resizeHandler = handler_;
1017 _resizeHandlerData = userData_;
1022 return _resizeHandler;
1025 bool callResizeHandler(
size_t newSize_);
1029 void* _resizeHandlerData;
1036 RefHandle<StoreImpl> _body;
1040 Store(
const Store& rhs) : _body(rhs._body) {;}
1052 return _body.get().store(message_);
1061 _body.get().discardUpTo(index_);
1070 _body.get().replay(replayer_);
1082 return _body.get().replaySingle(replayer_, index_);
1091 return _body.get().unpersistedCount();
1099 return _body.isValid();
1112 return _body.get().flush(timeout_);
1120 return _body.get().getLowestUnpersisted();
1128 return _body.get().getLastPersisted();
1143 _body.get().setResizeHandler(handler_, userData_);
1148 return _body.get().getResizeHandler();
1156 if (_body.isValid())
1158 return &_body.get();
1182 virtual void failedWrite(
const Message& message_,
1183 const char* reason_,
size_t reasonLength_) = 0;
1187 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1191 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1205 long* timeoutp = (
long*)data_;
1213 store_.
flush(*timeoutp);
1216 catch (
const TimedOutException&)
1218 catch (
const TimedOutException& e)
1241 unsigned requestedAckTypes_) = 0;
1248 virtual void clear() = 0;
1252 virtual void resubscribe(Client& client_) = 0;
1264 typedef enum { Disconnected = 0,
1268 PublishReplayed = 8,
1269 HeartbeatInitiated = 16,
1283 virtual void connectionStateChanged(
State newState_) = 0;
1288 class MessageStreamImpl;
1291 typedef void(*DeferredExecutionFunc)(
void*);
1293 class ClientImpl :
public RefBody
1299 AMPS_SOCKET _socket;
1305 socklen_t _valueLen;
1309 : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(
sizeof(
int))
1311 _valuePtr = (
char*)&_noDelay;
1313 if (_socket != AMPS_INVALID_SOCKET)
1315 getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1319 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1323 _socket = AMPS_INVALID_SOCKET;
1330 if (_socket != AMPS_INVALID_SOCKET)
1333 setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1338 friend class Client;
1341 DisconnectHandler _disconnectHandler;
1342 enum GlobalCommandTypeHandlers :
size_t 1352 DuplicateMessage = 8,
1355 std::vector<MessageHandler> _globalCommandTypeHandlers;
1356 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1358 MessageRouter::RouteCache _routeCache;
1359 mutable Mutex _lock;
1360 std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1361 amps_uint64_t _nameHashValue;
1363 Store _publishStore;
1364 bool _isRetryOnDisconnect;
1365 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1366 volatile amps_uint64_t _lastSentHaSequenceNumber;
1367 ATOMIC_TYPE_8 _badTimeToHAPublish;
1368 ATOMIC_TYPE_8 _badTimeToHASubscribe;
1369 VersionInfo _serverVersion;
1370 Timer _heartbeatTimer;
1371 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1374 int _queueAckTimeout;
1375 bool _isAutoAckEnabled;
1376 unsigned _ackBatchSize;
1377 unsigned _queuedAckCount;
1378 unsigned _defaultMaxDepth;
1379 struct QueueBookmarks
1381 QueueBookmarks(
const std::string& topic_)
1388 amps_uint64_t _oldestTime;
1389 unsigned _bookmarkCount;
1391 typedef amps_uint64_t topic_hash;
1392 typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1393 TopicHashMap _topicHashMap;
1397 ClientImpl* _client;
1402 ClientStoreReplayer()
1403 : _client(NULL), _version(0), _res(
AMPS_E_OK)
1406 ClientStoreReplayer(ClientImpl* client_)
1407 : _client(client_), _version(0), _res(
AMPS_E_OK)
1410 void setClient(ClientImpl* client_)
1415 void execute(
Message& message_)
1419 throw CommandException(
"Can't replay without a client.");
1423 if (index > _client->_lastSentHaSequenceNumber)
1425 _client->_lastSentHaSequenceNumber = index;
1433 (!_client->_badTimeToHAPublish ||
1437 message_.getMessage(),
1441 throw DisconnectedException(
"AMPS Server disconnected during replay");
1447 ClientStoreReplayer _replayer;
1451 ClientImpl* _parent;
1452 const char* _reason;
1453 size_t _reasonLength;
1454 size_t _replayCount;
1456 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1459 _reasonLength(reasonLength_),
1462 void execute(
Message& message_)
1464 if (_parent->_failedWriteHandler)
1467 _parent->_failedWriteHandler->failedWrite(message_,
1468 _reason, _reasonLength);
1471 size_t replayCount(
void)
const 1473 return _replayCount;
1477 struct AckResponseImpl :
public RefBody
1479 std::string username, password, reason, status, bookmark, options;
1480 amps_uint64_t sequenceNo;
1481 amps_uint64_t nameHashValue;
1482 VersionInfo serverVersion;
1483 volatile bool responded, abandoned;
1484 unsigned connectionVersion;
1487 sequenceNo((amps_uint64_t)0),
1491 connectionVersion(0)
1498 RefHandle<AckResponseImpl> _body;
1500 AckResponse() : _body(NULL) {;}
1501 AckResponse(
const AckResponse& rhs) : _body(rhs._body) {;}
1502 static AckResponse create()
1505 r._body =
new AckResponseImpl();
1509 const std::string& username()
1511 return _body.get().username;
1513 void setUsername(
const char* data_,
size_t len_)
1517 _body.get().username.assign(data_, len_);
1521 _body.get().username.clear();
1524 const std::string& password()
1526 return _body.get().password;
1528 void setPassword(
const char* data_,
size_t len_)
1532 _body.get().password.assign(data_, len_);
1536 _body.get().password.clear();
1539 const std::string& reason()
1541 return _body.get().reason;
1543 void setReason(
const char* data_,
size_t len_)
1547 _body.get().reason.assign(data_, len_);
1551 _body.get().reason.clear();
1554 const std::string& status()
1556 return _body.get().status;
1558 void setStatus(
const char* data_,
size_t len_)
1562 _body.get().status.assign(data_, len_);
1566 _body.get().status.clear();
1569 const std::string& bookmark()
1571 return _body.get().bookmark;
1573 void setBookmark(
const Field& bookmark_)
1575 if (!bookmark_.
empty())
1577 _body.get().bookmark.assign(bookmark_.
data(), bookmark_.
len());
1578 Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1579 _body.get().sequenceNo);
1583 _body.get().bookmark.clear();
1584 _body.get().sequenceNo = (amps_uint64_t)0;
1585 _body.get().nameHashValue = (amps_uint64_t)0;
1588 amps_uint64_t sequenceNo()
const 1590 return _body.get().sequenceNo;
1592 amps_uint64_t nameHashValue()
const 1594 return _body.get().nameHashValue;
1596 void setSequenceNo(
const char* data_,
size_t len_)
1598 amps_uint64_t result = (amps_uint64_t)0;
1601 for (
size_t i = 0; i < len_; ++i)
1603 result *= (amps_uint64_t)10;
1604 result += (amps_uint64_t)(data_[i] -
'0');
1607 _body.get().sequenceNo = result;
1609 VersionInfo serverVersion()
const 1611 return _body.get().serverVersion;
1613 void setServerVersion(
const char* data_,
size_t len_)
1617 _body.get().serverVersion.setVersion(std::string(data_, len_));
1622 return _body.get().responded;
1624 void setResponded(
bool responded_)
1626 _body.get().responded = responded_;
1630 return _body.get().abandoned;
1632 void setAbandoned(
bool abandoned_)
1634 if (_body.isValid())
1636 _body.get().abandoned = abandoned_;
1640 void setConnectionVersion(
unsigned connectionVersion)
1642 _body.get().connectionVersion = connectionVersion;
1645 unsigned getConnectionVersion()
1647 return _body.get().connectionVersion;
1649 void setOptions(
const char* data_,
size_t len_)
1653 _body.get().options.assign(data_, len_);
1657 _body.get().options.clear();
1661 const std::string& options()
1663 return _body.get().options;
1666 AckResponse& operator=(
const AckResponse& rhs)
1674 typedef std::map<std::string, AckResponse> AckMap;
1677 DefaultExceptionListener _defaultExceptionListener;
1680 struct DeferredExecutionRequest
1682 DeferredExecutionRequest(DeferredExecutionFunc func_,
1685 _userData(userData_)
1688 DeferredExecutionFunc _func;
1692 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1693 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1695 std::string _username;
1696 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1697 ConnectionStateListeners _connectionStateListeners;
1698 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1699 Mutex _deferredExecutionLock;
1700 DeferredExecutionList _deferredExecutionList;
1701 unsigned _heartbeatInterval;
1702 unsigned _readTimeout;
1710 if (!_connected && newState_ > ConnectionStateListener::Connected)
1714 for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1716 AMPS_CALL_EXCEPTION_WRAPPER(
1717 (*it)->connectionStateChanged(newState_));
1720 unsigned processedAck(
Message& message);
1721 unsigned persistedAck(
Message& meesage);
1722 void lastChance(
Message& message);
1723 void checkAndSendHeartbeat(
bool force =
false);
1724 virtual ConnectionInfo getConnectionInfo()
const;
1726 ClientImplMessageHandler(
amps_handle message,
void* userData);
1728 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1730 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
1732 void unsubscribeInternal(
const std::string&
id)
1740 subId.assign(
id.data(),
id.length());
1741 _routes.removeRoute(subId);
1743 if (_subscriptionManager)
1746 Unlock<Mutex> unlock(_lock);
1747 _subscriptionManager->unsubscribe(subId);
1753 _sendWithoutRetry(_message);
1754 deferredExecution(&s_noOpFn, NULL);
1757 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1758 bool isHASubscribe_)
1760 return syncAckProcessing(timeout_, message_,
1761 (amps_uint64_t)0, isHASubscribe_);
1764 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1765 amps_uint64_t haSeq = (amps_uint64_t)0,
1766 bool isHASubscribe_ =
false)
1769 AckResponse ack = AckResponse::create();
1772 Lock<Mutex> guard(_ackMapLock);
1775 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
1776 if (ack.getConnectionVersion() == 0)
1779 throw DisconnectedException(
"Connection closed while waiting for response.");
1781 bool timedOut =
false;
1782 AMPS_START_TIMER(timeout_)
1783 while (!timedOut && !ack.responded() && !ack.abandoned() && _connected)
1787 timedOut = !_lock.wait(timeout_);
1791 AMPS_RESET_TIMER(timedOut, timeout_);
1798 Unlock<Mutex> unlck(_lock);
1799 amps_invoke_waiting_function();
1802 if (ack.responded())
1804 if (ack.status() !=
"failure")
1808 amps_uint64_t ackSequence = ack.sequenceNo();
1809 if (_lastSentHaSequenceNumber < ackSequence)
1811 _lastSentHaSequenceNumber = ackSequence;
1824 _nameHash = ack.bookmark().substr(0, ack.bookmark().find(
'|'));
1825 _nameHashValue = ack.nameHashValue();
1826 _serverVersion = ack.serverVersion();
1827 if (_bookmarkStore.isValid())
1834 const std::string& options = ack.options();
1835 size_t index = options.find_first_of(
"max_backlog=");
1836 if (index != std::string::npos)
1839 const char* c = options.c_str() + index + 12;
1840 while (*c && *c !=
',')
1842 data = (data * 10) + (
unsigned)(*c++ -48);
1844 if (_ackBatchSize > data)
1846 _ackBatchSize = data;
1852 const size_t NotEntitled = 12;
1853 std::string ackReason = ack.reason();
1854 if (ackReason.length() == 0)
1858 if (ackReason.length() == NotEntitled &&
1859 ackReason[0] ==
'n' &&
1864 message_.throwFor(_client, ackReason);
1868 if (!ack.abandoned())
1870 throw TimedOutException(
"timed out waiting for operation.");
1874 throw DisconnectedException(
"Connection closed while waiting for response.");
1888 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1889 _pEmptyMessageStream.reset(NULL);
1896 ClientImpl(
const std::string& clientName)
1897 : _client(NULL), _name(clientName)
1898 , _isRetryOnDisconnect(
true)
1899 , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1900 , _badTimeToHASubscribe(0), _serverVersion()
1901 , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
1902 , _isAutoAckEnabled(
false)
1904 , _queuedAckCount(0)
1905 , _defaultMaxDepth(0)
1907 , _heartbeatInterval(0)
1910 _replayer.setClient(
this);
1913 (amps_handler)ClientImpl::ClientImplMessageHandler,
1916 (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
1919 (amps_handler)ClientImpl::ClientImplDisconnectHandler,
1921 _exceptionListener = &_defaultExceptionListener;
1922 for (
size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
1924 #ifdef AMPS_USE_EMPLACE 1932 virtual ~ClientImpl()
1937 const std::string& getName()
const 1942 const std::string& getNameHash()
const 1947 const amps_uint64_t getNameHashValue()
const 1949 return _nameHashValue;
1952 void setName(
const std::string& name)
1959 AMPSException::throwFor(_client, result);
1964 const std::string& getLogonCorrelationData()
const 1966 return _logonCorrelationData;
1969 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
1971 _logonCorrelationData = logonCorrelationData_;
1974 size_t getServerVersion()
const 1976 return _serverVersion.getOldStyleVersion();
1979 VersionInfo getServerVersionInfo()
const 1981 return _serverVersion;
1984 const std::string& getURI()
const 1989 virtual void connect(
const std::string& uri)
1991 Lock<Mutex> l(_lock);
1995 virtual void _connect(
const std::string& uri)
2001 AMPSException::throwFor(_client, result);
2008 _readMessage.setClientImpl(
this);
2009 if (_queueAckTimeout)
2014 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2017 void setDisconnected()
2020 Lock<Mutex> l(_lock);
2023 AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2026 _heartbeatTimer.setTimeout(0.0);
2033 virtual void disconnect()
2035 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2037 AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2038 Lock<Mutex> l(_lock);
2039 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2042 void clearAcks(
unsigned failedVersion)
2045 Lock<Mutex> guard(_ackMapLock);
2048 std::vector<std::string> worklist;
2049 for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2051 if (i->second.getConnectionVersion() <= failedVersion)
2053 i->second.setAbandoned(
true);
2054 worklist.push_back(i->first);
2058 for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2067 int send(
const Message& message)
2069 Lock<Mutex> l(_lock);
2070 return _send(message);
2073 void sendWithoutRetry(
const Message& message_)
2075 Lock<Mutex> l(_lock);
2076 _sendWithoutRetry(message_);
2079 void _sendWithoutRetry(
const Message& message_)
2084 AMPSException::throwFor(_client, result);
2088 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2089 bool isHASubscribe_ =
false)
2096 Message localMessage = message;
2097 unsigned version = 0;
2101 if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
2105 if (!_isRetryOnDisconnect)
2109 if (!_lock.wait(1000))
2111 amps_invoke_waiting_function();
2116 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2117 (isHASubscribe_ && _badTimeToHASubscribe != 0))
2119 return (
int)version;
2123 if (haSeq > _lastSentHaSequenceNumber)
2125 while (haSeq > _lastSentHaSequenceNumber + 1)
2131 _lastSentHaSequenceNumber + 1))
2137 version = _replayer._version;
2140 catch (
const DisconnectedException&)
2142 catch (
const DisconnectedException& e)
2145 result = _replayer._res;
2150 localMessage.getMessage(),
2152 ++_lastSentHaSequenceNumber;
2156 localMessage.getMessage(),
2160 if (!isHASubscribe_ && !haSeq &&
2161 localMessage.getMessage() == message.getMessage())
2165 if (_isRetryOnDisconnect)
2167 Unlock<Mutex> u(_lock);
2172 if ((isHASubscribe_ || haSeq) &&
2175 return (
int)version;
2182 AMPSException::throwFor(_client, result);
2188 amps_invoke_waiting_function();
2194 AMPSException::throwFor(_client, result);
2196 return (
int)version;
2199 void addMessageHandler(
const Field& commandId_,
2201 unsigned requestedAcks_,
bool isSubscribe_)
2203 Lock<Mutex> lock(_lock);
2204 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2208 bool removeMessageHandler(
const Field& commandId_)
2210 Lock<Mutex> lock(_lock);
2211 return _routes.removeRoute(commandId_);
2219 bool isSubscribe =
false;
2220 bool isSubscribeOnly =
false;
2221 bool replace =
false;
2223 unsigned systemAddedAcks = Message::AckType::None;
2227 case Message::Command::Subscribe:
2228 case Message::Command::DeltaSubscribe:
2229 replace = message_.
getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2230 isSubscribeOnly =
true;
2232 case Message::Command::SOWAndSubscribe:
2233 case Message::Command::SOWAndDeltaSubscribe:
2240 while (!replace &&
id != subId && _routes.hasRoute(
id))
2253 systemAddedAcks |= Message::AckType::Persisted;
2256 case Message::Command::SOW:
2263 while (!replace &&
id != subId && _routes.hasRoute(
id))
2274 if (!isSubscribeOnly)
2283 while (!replace && qid != subId && qid !=
id 2284 && _routes.hasRoute(qid))
2290 systemAddedAcks |= Message::AckType::Processed;
2292 if (!isSubscribeOnly)
2294 systemAddedAcks |= Message::AckType::Completed;
2298 int routesAdded = 0;
2299 Lock<Mutex> l(_lock);
2300 if (!subId.
empty() && messageHandler_.isValid())
2302 if (!_routes.hasRoute(subId))
2308 _routes.addRoute(subId, messageHandler_, requestedAcks,
2309 systemAddedAcks, isSubscribe);
2311 if (!isSubscribeOnly && !qid.
empty()
2312 && messageHandler_.isValid() && qid != subId)
2314 if (routesAdded == 0)
2316 _routes.addRoute(qid, messageHandler_,
2317 requestedAcks, systemAddedAcks,
false);
2323 Unlock<Mutex> u(_lock);
2324 data = amps_invoke_copy_route_function(
2325 messageHandler_.userData());
2329 _routes.addRoute(qid, messageHandler_, requestedAcks,
2330 systemAddedAcks,
false);
2334 _routes.addRoute(qid,
2337 requestedAcks, systemAddedAcks,
false);
2342 if (!
id.empty() && messageHandler_.isValid()
2343 && requestedAcks & ~
Message::AckType::Persisted
2344 &&
id != subId &&
id != qid)
2346 if (routesAdded == 0)
2348 _routes.addRoute(
id, messageHandler_, requestedAcks,
2349 systemAddedAcks,
false);
2355 Unlock<Mutex> u(_lock);
2356 data = amps_invoke_copy_route_function(
2357 messageHandler_.userData());
2361 _routes.addRoute(
id, messageHandler_, requestedAcks,
2362 systemAddedAcks,
false);
2366 _routes.addRoute(
id,
2370 systemAddedAcks,
false);
2379 syncAckProcessing(timeout_, message_, 0,
false);
2386 _routes.removeRoute(
id);
2393 case Message::Command::Unsubscribe:
2394 case Message::Command::Heartbeat:
2395 case Message::Command::Logon:
2396 case Message::Command::StartTimer:
2397 case Message::Command::StopTimer:
2398 case Message::Command::SOWDelete:
2400 Lock<Mutex> l(_lock);
2409 if (messageHandler_.isValid())
2411 _routes.addRoute(
id, messageHandler_, requestedAcks,
2412 Message::AckType::None,
false);
2418 case Message::Command::DeltaPublish:
2419 case Message::Command::Publish:
2422 Lock<Mutex> l(_lock);
2425 if (ackType != Message::AckType::None
2433 if (messageHandler_.isValid())
2435 _routes.addRoute(
id, messageHandler_, requestedAcks,
2436 Message::AckType::None,
false);
2442 syncAckProcessing(timeout_, message_, 0,
false);
2451 case Message::Command::GroupBegin:
2452 case Message::Command::GroupEnd:
2453 case Message::Command::OOF:
2454 case Message::Command::Ack:
2455 case Message::Command::Unknown:
2457 throw CommandException(
"Command type " + message_.
getCommand() +
" can not be sent directly to AMPS");
2463 void setDisconnectHandler(
const DisconnectHandler& disconnectHandler)
2465 Lock<Mutex> l(_lock);
2466 _disconnectHandler = disconnectHandler;
2469 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2471 switch (command_[0])
2473 #if 0 // Not currently implemented to avoid an extra branch in delivery 2475 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2478 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2482 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2484 #if 0 // Not currently implemented to avoid an extra branch in delivery 2486 if (command_[6] ==
'b')
2488 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2490 else if (command_[6] ==
'e')
2492 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2496 std::ostringstream os;
2497 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2498 throw CommandException(os.str());
2502 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2506 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2510 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2514 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2517 std::ostringstream os;
2518 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2519 throw CommandException(os.str());
2524 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2528 #if 0 // Not currently implemented to avoid an extra branch in delivery 2529 case Message::Command::Publish:
2530 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2532 case Message::Command::SOW:
2533 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2536 case Message::Command::Heartbeat:
2537 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2539 #if 0 // Not currently implemented to avoid an extra branch in delivery 2540 case Message::Command::GroupBegin:
2541 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2543 case Message::Command::GroupEnd:
2544 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2546 case Message::Command::OOF:
2547 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2550 case Message::Command::Ack:
2551 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2555 unsigned command = command_;
2562 AMPS_snprintf(errBuf,
sizeof(errBuf),
2563 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2564 CommandConstants<0>::Lengths[bits],
2565 CommandConstants<0>::Values[bits]);
2566 throw CommandException(errBuf);
2571 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2573 _globalCommandTypeHandlers[handlerType_] = handler_;
2578 Lock<Mutex> l(_lock);
2579 _failedWriteHandler.reset(handler_);
2582 void setPublishStore(
const Store& publishStore_)
2584 Lock<Mutex> l(_lock);
2587 throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2589 _publishStore = publishStore_;
2594 Lock<Mutex> l(_lock);
2597 throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2599 _bookmarkStore = bookmarkStore_;
2604 Lock<Mutex> l(_lock);
2605 _subscriptionManager.reset(subscriptionManager_);
2613 DisconnectHandler getDisconnectHandler()
const 2615 return _disconnectHandler;
2620 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2628 Store getPublishStore()
const 2630 return _publishStore;
2635 return _bookmarkStore;
2638 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2642 Lock<Mutex> l(_lock);
2644 _publishMessage.assignData(data_, dataLen_);
2645 _send(_publishMessage);
2650 if (!publishStoreMessage)
2652 publishStoreMessage =
new Message();
2653 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2655 publishStoreMessage->reset();
2656 publishStoreMessage->setCommandEnum(Message::Command::Publish);
2657 return _publish(topic_, topicLen_, data_, dataLen_);
2661 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2662 size_t dataLen_,
unsigned long expiration_)
2666 Lock<Mutex> l(_lock);
2668 _publishMessage.assignData(data_, dataLen_);
2669 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2670 size_t pos = convertToCharArray(exprBuf, expiration_);
2671 _publishMessage.
assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2672 _send(_publishMessage);
2678 if (!publishStoreMessage)
2680 publishStoreMessage =
new Message();
2681 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2683 publishStoreMessage->reset();
2684 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2685 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2686 publishStoreMessage->setCommandEnum(Message::Command::Publish)
2687 .assignExpiration(exprBuf + exprPos,
2688 AMPS_NUMBER_BUFFER_LEN - exprPos);
2689 return _publish(topic_, topicLen_, data_, dataLen_);
2696 ClientImpl* _pClient;
2698 volatile bool _acked;
2699 volatile bool _disconnected;
2701 FlushAckHandler(ClientImpl* pClient_)
2702 : _pClient(pClient_), _cmdId(), _acked(
false), _disconnected(
false)
2704 pClient_->addConnectionStateListener(
this);
2708 _pClient->removeConnectionStateListener(
this);
2709 _pClient->removeMessageHandler(_cmdId);
2712 void setCommandId(
const Field& cmdId_)
2720 void connectionStateChanged(
State state_)
2722 if (state_ <= Shutdown)
2724 _disconnected =
true;
2733 return _acked || _disconnected;
2737 void publishFlush(
long timeout_,
unsigned ackType_)
2739 static const char* processed =
"processed";
2740 static const size_t processedLen = strlen(processed);
2741 static const char* persisted =
"persisted";
2742 static const size_t persistedLen = strlen(persisted);
2743 static const char* flush =
"flush";
2744 static const size_t flushLen = strlen(flush);
2745 static VersionInfo minPersisted(
"5.3.3.0");
2746 static VersionInfo minFlush(
"4");
2747 if (ackType_ != Message::AckType::Processed
2748 && ackType_ != Message::AckType::Persisted)
2750 throw CommandException(
"Flush can only be used with processed or persisted acks.");
2752 FlushAckHandler flushHandler(
this);
2753 if (_serverVersion >= minFlush)
2755 Lock<Mutex> l(_lock);
2758 throw DisconnectedException(
"Not connected trying to flush");
2763 if (_serverVersion < minPersisted
2764 || ackType_ == Message::AckType::Processed)
2774 std::bind(&FlushAckHandler::invoke,
2775 std::ref(flushHandler),
2776 std::placeholders::_1),
2778 NoDelay noDelay(_client);
2779 if (_send(_message) == -1)
2781 throw DisconnectedException(
"Disconnected trying to flush");
2788 _publishStore.
flush(timeout_);
2790 catch (
const AMPSException& ex)
2792 AMPS_UNHANDLED_EXCEPTION(ex);
2796 else if (_serverVersion < minFlush)
2800 AMPS_USLEEP(timeout_ * 1000);
2804 AMPS_USLEEP(1000 * 1000);
2810 Timer timer((
double)timeout_);
2812 while (!timer.check() && !flushHandler.done())
2815 amps_invoke_waiting_function();
2820 while (!flushHandler.done())
2823 amps_invoke_waiting_function();
2827 if (!flushHandler.done())
2829 throw TimedOutException(
"Timed out waiting for flush");
2832 if (!flushHandler.acked() && !_publishStore.
isValid())
2834 throw DisconnectedException(
"Disconnected waiting for flush");
2838 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2839 const char* data_,
size_t dataLength_)
2843 Lock<Mutex> l(_lock);
2845 _deltaMessage.assignData(data_, dataLength_);
2846 _send(_deltaMessage);
2851 if (!publishStoreMessage)
2853 publishStoreMessage =
new Message();
2854 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2856 publishStoreMessage->reset();
2857 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2858 return _publish(topic_, topicLength_, data_, dataLength_);
2862 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2863 const char* data_,
size_t dataLength_,
2864 unsigned long expiration_)
2868 Lock<Mutex> l(_lock);
2870 _deltaMessage.assignData(data_, dataLength_);
2871 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2872 size_t pos = convertToCharArray(exprBuf, expiration_);
2873 _deltaMessage.
assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2874 _send(_deltaMessage);
2880 if (!publishStoreMessage)
2882 publishStoreMessage =
new Message();
2883 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2885 publishStoreMessage->reset();
2886 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2887 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2888 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2889 .assignExpiration(exprBuf + exprPos,
2890 AMPS_NUMBER_BUFFER_LEN - exprPos);
2891 return _publish(topic_, topicLength_, data_, dataLength_);
// Completes and sends the message held in publishStoreMessage (callers set
// the command type before calling). Returns the sequence number produced by
// _publishStore.store().
// NOTE(review): brace-only lines are missing from this listing; the Lock below
// is presumably confined to its own scope around _send() -- confirm.
2895 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
2896 const char* data_,
size_t dataLength_)
// Fill in topic, a Persisted ack request, and the payload.
2898 publishStoreMessage->assignTopic(topic_, topicLength_)
2899 .setAckTypeEnum(Message::AckType::Persisted)
2900 .assignData(data_, dataLength_);
// Store the message first; the store hands back the sequence number.
2901 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
// Render the sequence number as text into buf; the digits occupy
// buf[pos .. AMPS_NUMBER_BUFFER_LEN), which is the range passed on below.
2902 char buf[AMPS_NUMBER_BUFFER_LEN];
2903 size_t pos = convertToCharArray(buf, haSequenceNumber);
2904 publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
// _lock is taken only for the send; the store call above runs without it.
2906 Lock<Mutex> l(_lock);
2907 _send(*publishStoreMessage, haSequenceNumber);
2909 return haSequenceNumber;
2912 virtual std::string logon(
long timeout_,
Authenticator& authenticator_,
2913 const char* options_ = NULL)
2915 Lock<Mutex> l(_lock);
2916 return _logon(timeout_, authenticator_, options_);
2919 virtual std::string _logon(
long timeout_,
Authenticator& authenticator_,
2920 const char* options_ = NULL)
2922 AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2928 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE 2930 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
2933 if (uri.user().size())
2937 if (uri.password().size())
2941 if (uri.protocol() ==
"amps" && uri.messageType().size())
2945 if (uri.isTrue(
"pretty"))
2951 if (!_logonCorrelationData.empty())
2962 NoDelay noDelay(_client);
2966 AckResponse ack = syncAckProcessing(timeout_, _message);
2967 if (ack.status() ==
"retry")
2969 _message.
setPassword(authenticator_.
retry(ack.username(), ack.password()));
2970 _username = ack.username();
2975 authenticator_.
completed(ack.username(), ack.password(), ack.reason());
2979 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2984 catch (
const AMPSException& ex)
2987 AMPS_UNHANDLED_EXCEPTION(ex);
3000 _publishStore.
replay(_replayer);
3001 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3003 catch (
const StoreException& ex)
3006 std::ostringstream os;
3007 os <<
"A local store exception occurred while logging on." 3009 throw ConnectionException(os.str());
3011 catch (
const AMPSException& ex)
3014 AMPS_UNHANDLED_EXCEPTION(ex);
3017 catch (
const std::exception& ex)
3020 AMPS_UNHANDLED_EXCEPTION(ex);
3030 return newCommandId;
3034 const std::string& topic_,
3036 const std::string& filter_,
3037 const std::string& bookmark_,
3038 const std::string& options_,
3039 const std::string& subId_,
3040 bool isHASubscribe_ =
true)
3042 isHASubscribe_ &= (bool)_subscriptionManager;
3043 Lock<Mutex> l(_lock);
3047 std::string subId(subId_);
3050 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3052 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
3062 unsigned ackTypes = Message::AckType::Processed;
3064 if (!bookmark_.empty() && _bookmarkStore.isValid())
3066 ackTypes |= Message::AckType::Persisted;
3070 if (filter_.length())
3074 if (bookmark_.length())
3084 if (_bookmarkStore.isValid())
3089 _bookmarkStore.
log(_message);
3090 _bookmarkStore.
discard(_message);
3096 if (options_.length())
3105 Unlock<Mutex> u(_lock);
3106 _subscriptionManager->subscribe(messageHandler_, message,
3107 Message::AckType::None);
3108 if (_badTimeToHASubscribe)
3116 Message::AckType::None, ackTypes,
true);
3119 if (!options_.empty())
3125 syncAckProcessing(timeout_, message, isHASubscribe_);
3127 catch (
const DisconnectedException&)
3129 if (!isHASubscribe_)
3131 _routes.removeRoute(subIdField);
3136 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3140 catch (
const TimedOutException&)
3142 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3150 Unlock<Mutex> unlock(_lock);
3151 _subscriptionManager->unsubscribe(subIdField);
3153 _routes.removeRoute(subIdField);
3159 std::string deltaSubscribe(
const MessageHandler& messageHandler_,
3160 const std::string& topic_,
3162 const std::string& filter_,
3163 const std::string& bookmark_,
3164 const std::string& options_,
3165 const std::string& subId_ =
"",
3166 bool isHASubscribe_ =
true)
3168 isHASubscribe_ &= (bool)_subscriptionManager;
3169 Lock<Mutex> l(_lock);
3173 std::string subId(subId_);
3183 unsigned ackTypes = Message::AckType::Processed;
3185 if (!bookmark_.empty() && _bookmarkStore.isValid())
3187 ackTypes |= Message::AckType::Persisted;
3190 if (filter_.length())
3194 if (bookmark_.length())
3204 if (_bookmarkStore.isValid())
3209 _bookmarkStore.
log(_message);
3210 _bookmarkStore.
discard(_message);
3216 if (options_.length())
3224 Unlock<Mutex> u(_lock);
3225 _subscriptionManager->subscribe(messageHandler_, message,
3226 Message::AckType::None);
3227 if (_badTimeToHASubscribe)
3235 Message::AckType::None, ackTypes,
true);
3238 if (!options_.empty())
3244 syncAckProcessing(timeout_, message, isHASubscribe_);
3246 catch (
const DisconnectedException&)
3248 if (!isHASubscribe_)
3250 _routes.removeRoute(subIdField);
3254 catch (
const TimedOutException&)
3256 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3264 Unlock<Mutex> unlock(_lock);
3265 _subscriptionManager->unsubscribe(subIdField);
3267 _routes.removeRoute(subIdField);
3273 void unsubscribe(
const std::string&
id)
3275 Lock<Mutex> l(_lock);
3276 unsubscribeInternal(
id);
3279 void unsubscribe(
void)
3281 if (_subscriptionManager)
3283 _subscriptionManager->clear();
3286 _routes.unsubscribeAll();
3287 Lock<Mutex> l(_lock);
3292 _sendWithoutRetry(_message);
3294 deferredExecution(&s_noOpFn, NULL);
3298 const std::string& topic_,
3299 const std::string& filter_ =
"",
3300 const std::string& orderBy_ =
"",
3301 const std::string& bookmark_ =
"",
3302 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3303 int topN_ = AMPS_DEFAULT_TOP_N,
3304 const std::string& options_ =
"",
3305 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3307 Lock<Mutex> l(_lock);
3314 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3317 if (filter_.length())
3321 if (orderBy_.length())
3325 if (bookmark_.length())
3330 if (topN_ != AMPS_DEFAULT_TOP_N)
3334 if (options_.length())
3339 _routes.addRoute(_message.
getQueryID(), messageHandler_,
3340 Message::AckType::None, ackTypes,
false);
3344 syncAckProcessing(timeout_, _message);
3348 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3356 const std::string& topic_,
3358 const std::string& filter_ =
"",
3359 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3360 int topN_ = AMPS_DEFAULT_TOP_N)
3363 return sow(messageHandler_,
3374 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3375 const std::string& topic_,
3376 const std::string& filter_ =
"",
3377 const std::string& orderBy_ =
"",
3378 const std::string& bookmark_ =
"",
3379 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3380 int topN_ = AMPS_DEFAULT_TOP_N,
3381 const std::string& options_ =
"",
3382 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3383 bool isHASubscribe_ =
true)
3385 isHASubscribe_ &= (bool)_subscriptionManager;
3386 unsigned ackTypes = Message::AckType::Processed;
3387 Lock<Mutex> l(_lock);
3392 std::string subId = cid;
3394 if (filter_.length())
3398 if (orderBy_.length())
3402 if (bookmark_.length())
3406 if (_bookmarkStore.isValid())
3408 ackTypes |= Message::AckType::Persisted;
3416 _bookmarkStore.
log(_message);
3417 if (!BookmarkRange::isRange(bookmark))
3419 _bookmarkStore.
discard(_message);
3431 if (topN_ != AMPS_DEFAULT_TOP_N)
3435 if (options_.length())
3444 Unlock<Mutex> u(_lock);
3445 _subscriptionManager->subscribe(messageHandler_, message,
3446 Message::AckType::None);
3447 if (_badTimeToHASubscribe)
3452 _routes.addRoute(cid, messageHandler_,
3453 Message::AckType::None, ackTypes,
true);
3455 if (!options_.empty())
3461 syncAckProcessing(timeout_, message, isHASubscribe_);
3463 catch (
const DisconnectedException&)
3465 if (!isHASubscribe_)
3467 _routes.removeRoute(subId);
3471 catch (
const TimedOutException&)
3473 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3481 Unlock<Mutex> unlock(_lock);
3482 _subscriptionManager->unsubscribe(cid);
3484 _routes.removeRoute(subId);
3490 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3491 const std::string& topic_,
3493 const std::string& filter_ =
"",
3494 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3495 bool oofEnabled_ =
false,
3496 int topN_ = AMPS_DEFAULT_TOP_N,
3497 bool isHASubscribe_ =
true)
3500 return sowAndSubscribe(messageHandler_,
3507 (oofEnabled_ ?
"oof" :
""),
3512 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3513 const std::string& topic_,
3514 const std::string& filter_ =
"",
3515 const std::string& orderBy_ =
"",
3516 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3517 int topN_ = AMPS_DEFAULT_TOP_N,
3518 const std::string& options_ =
"",
3519 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3520 bool isHASubscribe_ =
true)
3522 isHASubscribe_ &= (bool)_subscriptionManager;
3523 Lock<Mutex> l(_lock);
3531 if (filter_.length())
3535 if (orderBy_.length())
3540 if (topN_ != AMPS_DEFAULT_TOP_N)
3544 if (options_.length())
3552 Unlock<Mutex> u(_lock);
3553 _subscriptionManager->subscribe(messageHandler_, message,
3554 Message::AckType::None);
3555 if (_badTimeToHASubscribe)
3560 _routes.addRoute(message.
getQueryID(), messageHandler_,
3561 Message::AckType::None, Message::AckType::Processed,
true);
3563 if (!options_.empty())
3569 syncAckProcessing(timeout_, message, isHASubscribe_);
3571 catch (
const DisconnectedException&)
3573 if (!isHASubscribe_)
3575 _routes.removeRoute(subId);
3579 catch (
const TimedOutException&)
3581 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3589 Unlock<Mutex> unlock(_lock);
3590 _subscriptionManager->unsubscribe(
Field(subId));
3592 _routes.removeRoute(subId);
3598 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3599 const std::string& topic_,
3601 const std::string& filter_ =
"",
3602 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3603 bool oofEnabled_ =
false,
3604 bool sendEmpties_ =
false,
3605 int topN_ = AMPS_DEFAULT_TOP_N,
3606 bool isHASubscribe_ =
true)
3614 if (sendEmpties_ ==
false)
3618 return sowAndDeltaSubscribe(messageHandler_,
3630 const std::string& topic_,
3631 const std::string& filter_,
3637 unsigned ackType = Message::AckType::Processed |
3638 Message::AckType::Stats |
3639 Message::AckType::Persisted;
3640 if (!publishStoreMessage)
3642 publishStoreMessage =
new Message();
3643 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3645 publishStoreMessage->reset();
3646 if (commandId_.
empty())
3648 publishStoreMessage->newCommandId();
3649 commandId_ = publishStoreMessage->getCommandId();
3653 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3655 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3656 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3657 .assignQueryID(commandId_.
data(), commandId_.
len())
3658 .setAckTypeEnum(ackType)
3659 .assignTopic(topic_.c_str(), topic_.length())
3660 .assignFilter(filter_.c_str(), filter_.length());
3661 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3662 char buf[AMPS_NUMBER_BUFFER_LEN];
3663 size_t pos = convertToCharArray(buf, haSequenceNumber);
3664 publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3668 Lock<Mutex> l(_lock);
3669 _routes.addRoute(commandId_, messageHandler_,
3670 Message::AckType::Stats,
3671 Message::AckType::Processed | Message::AckType::Persisted,
3673 syncAckProcessing(timeout_, *publishStoreMessage,
3676 catch (
const DisconnectedException&)
3683 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3687 return (std::string)commandId_;
3691 Lock<Mutex> l(_lock);
3693 if (commandId_.
empty())
3704 .assignQueryID(commandId_.
data(), commandId_.
len())
3705 .setAckTypeEnum(Message::AckType::Processed |
3706 Message::AckType::Stats)
3708 .assignFilter(filter_.c_str(), filter_.length());
3709 _routes.addRoute(commandId_, messageHandler_,
3710 Message::AckType::Stats,
3711 Message::AckType::Processed,
3715 syncAckProcessing(timeout_, _message);
3719 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3722 return (std::string)commandId_;
3726 std::string sowDeleteByData(
const MessageHandler& messageHandler_,
3727 const std::string& topic_,
3728 const std::string& data_,
3734 unsigned ackType = Message::AckType::Processed |
3735 Message::AckType::Stats |
3736 Message::AckType::Persisted;
3737 if (!publishStoreMessage)
3739 publishStoreMessage =
new Message();
3740 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3742 publishStoreMessage->reset();
3743 if (commandId_.
empty())
3745 publishStoreMessage->newCommandId();
3746 commandId_ = publishStoreMessage->getCommandId();
3750 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3752 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3753 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3754 .assignQueryID(commandId_.
data(), commandId_.
len())
3755 .setAckTypeEnum(ackType)
3756 .assignTopic(topic_.c_str(), topic_.length())
3757 .assignData(data_.c_str(), data_.length());
3758 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3759 char buf[AMPS_NUMBER_BUFFER_LEN];
3760 size_t pos = convertToCharArray(buf, haSequenceNumber);
3761 publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3765 Lock<Mutex> l(_lock);
3766 _routes.addRoute(commandId_, messageHandler_,
3767 Message::AckType::Stats,
3768 Message::AckType::Processed | Message::AckType::Persisted,
3770 syncAckProcessing(timeout_, *publishStoreMessage,
3773 catch (
const DisconnectedException&)
3780 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3784 return (std::string)commandId_;
3788 Lock<Mutex> l(_lock);
3790 if (commandId_.
empty())
3801 .assignQueryID(commandId_.
data(), commandId_.
len())
3802 .setAckTypeEnum(Message::AckType::Processed |
3803 Message::AckType::Stats)
3805 .assignData(data_.c_str(), data_.length());
3806 _routes.addRoute(commandId_, messageHandler_,
3807 Message::AckType::Stats,
3808 Message::AckType::Processed,
3812 syncAckProcessing(timeout_, _message);
3816 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3819 return (std::string)commandId_;
3823 std::string sowDeleteByKeys(
const MessageHandler& messageHandler_,
3824 const std::string& topic_,
3825 const std::string& keys_,
3831 unsigned ackType = Message::AckType::Processed |
3832 Message::AckType::Stats |
3833 Message::AckType::Persisted;
3834 if (!publishStoreMessage)
3836 publishStoreMessage =
new Message();
3837 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3839 publishStoreMessage->reset();
3840 if (commandId_.
empty())
3842 publishStoreMessage->newCommandId();
3843 commandId_ = publishStoreMessage->getCommandId();
3847 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3849 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3850 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3851 .assignQueryID(commandId_.
data(), commandId_.
len())
3852 .setAckTypeEnum(ackType)
3853 .assignTopic(topic_.c_str(), topic_.length())
3854 .assignSowKeys(keys_.c_str(), keys_.length());
3855 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3856 char buf[AMPS_NUMBER_BUFFER_LEN];
3857 size_t pos = convertToCharArray(buf, haSequenceNumber);
3858 publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3862 Lock<Mutex> l(_lock);
3863 _routes.addRoute(commandId_, messageHandler_,
3864 Message::AckType::Stats,
3865 Message::AckType::Processed | Message::AckType::Persisted,
3867 syncAckProcessing(timeout_, *publishStoreMessage,
3870 catch (
const DisconnectedException&)
3877 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3881 return (std::string)commandId_;
3885 Lock<Mutex> l(_lock);
3887 if (commandId_.
empty())
3898 .assignQueryID(commandId_.
data(), commandId_.
len())
3899 .setAckTypeEnum(Message::AckType::Processed |
3900 Message::AckType::Stats)
3902 .assignSowKeys(keys_.c_str(), keys_.length());
3903 _routes.addRoute(commandId_, messageHandler_,
3904 Message::AckType::Stats,
3905 Message::AckType::Processed,
3909 syncAckProcessing(timeout_, _message);
3913 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3916 return (std::string)commandId_;
3920 void startTimer(
void)
3922 if (_serverVersion >=
"5.3.2.0")
3924 throw CommandException(
"The start_timer command is deprecated.");
3926 Lock<Mutex> l(_lock);
3935 if (_serverVersion >=
"5.3.2.0")
3937 throw CommandException(
"The stop_timer command is deprecated.");
3939 return executeAsync(
Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
3954 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
3956 _pExceptionListener = pListener_;
3957 _exceptionListener = _pExceptionListener.get();
3962 _exceptionListener = &listener_;
3967 return *_exceptionListener;
// Records new heartbeat interval and read timeout values.
// Throws UsageException when readTimeout_ is smaller than heartbeatInterval_.
// NOTE(review): this listing drops lines after the last assignment below;
// whatever follows inside the `if` (e.g. re-sending the heartbeat request) is
// not visible here -- confirm against the complete header.
3970 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
// Validate before taking the lock so a bad argument never touches state.
3972 if (readTimeout_ < heartbeatInterval_)
3974 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
3976 Lock<Mutex> l(_lock);
// Only update when at least one of the two values actually changes.
3977 if (_heartbeatInterval != heartbeatInterval_ ||
3978 _readTimeout != readTimeout_)
3980 _heartbeatInterval = heartbeatInterval_;
3981 _readTimeout = readTimeout_;
3986 void _sendHeartbeat(
void)
3988 if (_connected && _heartbeatInterval != 0)
3990 std::ostringstream options;
3991 options <<
"start," << _heartbeatInterval;
3996 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3997 _heartbeatTimer.start();
4000 _sendWithoutRetry(startMessage);
4001 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4003 catch (ConnectionException& ex_)
4007 AMPS_UNHANDLED_EXCEPTION(ex_);
4011 if (_readTimeout && _connected)
4017 AMPSException::throwFor(_client, result);
4023 Lock<Mutex> lock(_lock);
4024 _connectionStateListeners.insert(listener_);
4029 Lock<Mutex> lock(_lock);
4030 _connectionStateListeners.erase(listener_);
4033 void clearConnectionStateListeners()
4035 Lock<Mutex> lock(_lock);
4036 _connectionStateListeners.clear();
4041 unsigned systemAddedAcks_,
bool isSubscribe_)
4043 Message message = command_.getMessage();
4048 bool added = qid.
len() || subid.
len() || cid_.
len();
4050 if (subid.
len() > 0)
4054 addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4055 systemAddedAcks_, isSubscribe_);
4057 if (qid.
len() > 0 && qid != subid)
4059 while (_routes.hasRoute(qid))
4068 if (addedCount == 0)
4070 _routes.addRoute(qid, handler_, requestedAcks_,
4071 systemAddedAcks_, isSubscribe_);
4077 Unlock<Mutex> u(_lock);
4078 data = amps_invoke_copy_route_function(handler_.userData());
4082 _routes.addRoute(qid, handler_, requestedAcks_,
4083 systemAddedAcks_,
false);
4087 _routes.addRoute(qid,
4091 systemAddedAcks_,
false);
4096 if (cid_.
len() > 0 && cid_ != qid && cid_ != subid
4097 && requestedAcks_ & ~
Message::AckType::Persisted)
4099 while (_routes.hasRoute(cid_))
4103 if (addedCount == 0)
4105 _routes.addRoute(cid_, handler_, requestedAcks_,
4106 systemAddedAcks_,
false);
4112 Unlock<Mutex> u(_lock);
4113 data = amps_invoke_copy_route_function(handler_.userData());
4117 _routes.addRoute(cid_, handler_, requestedAcks_,
4118 systemAddedAcks_,
false);
4122 _routes.addRoute(cid_,
4126 systemAddedAcks_,
false);
4130 else if (commandType == Message::Command::Publish ||
4131 commandType == Message::Command::DeltaPublish)
4134 _routes.addRoute(cid_, handler_, requestedAcks_,
4135 systemAddedAcks_,
false);
4140 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
4145 bool isHASubscribe_ =
true)
4147 isHASubscribe_ &= (bool)_subscriptionManager;
4148 Message& message = command_.getMessage();
4149 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4150 Message::AckType::Processed : Message::AckType::None;
4152 bool isPublishStore = _publishStore.
isValid() && command_.needsSequenceNumber();
4154 if (commandType == Message::Command::SOW
4155 || commandType == Message::Command::SOWAndSubscribe
4156 || commandType == Message::Command::SOWAndDeltaSubscribe
4157 || commandType == Message::Command::StopTimer)
4159 systemAddedAcks |= Message::AckType::Completed;
4162 if (handler_.isValid() && cid.
empty())
4168 if (command_.isSubscribe())
4171 if (_bookmarkStore.isValid())
4173 systemAddedAcks |= Message::AckType::Persisted;
4181 _bookmarkStore.
log(message);
4182 if (!BookmarkRange::isRange(bookmark))
4184 _bookmarkStore.
discard(message);
4198 systemAddedAcks |= Message::AckType::Persisted;
4200 bool isSubscribe = command_.isSubscribe();
4201 if (handler_.isValid() && !isSubscribe)
4203 _registerHandler(command_, cid, handler_,
4204 requestedAcks, systemAddedAcks, isSubscribe);
4206 bool useSyncSend = cid.
len() > 0 && command_.hasProcessedAck();
4209 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4212 Unlock<Mutex> u(_lock);
4213 haSequenceNumber = _publishStore.
store(message);
4220 syncAckProcessing((
long)command_.getTimeout(), message,
4225 _send(message, haSequenceNumber);
4228 catch (
const DisconnectedException&)
4235 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4246 Unlock<Mutex> u(_lock);
4247 _subscriptionManager->subscribe(handler_,
4250 if (_badTimeToHASubscribe)
4253 return std::string(subId.
data(), subId.
len());
4256 if (handler_.isValid())
4258 _registerHandler(command_, cid, handler_,
4259 requestedAcks, systemAddedAcks, isSubscribe);
4266 syncAckProcessing((
long)command_.getTimeout(), message,
4274 catch (
const DisconnectedException&)
4276 if (!isHASubscribe_)
4278 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4279 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4280 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4285 catch (
const TimedOutException&)
4287 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4288 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4289 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.
getQueryId()));
4297 Unlock<Mutex> unlock(_lock);
4298 _subscriptionManager->unsubscribe(subId);
4304 _routes.removeRoute(cid);
4305 _routes.removeRoute(subId);
4308 if (subId.
len() > 0)
4311 return std::string(subId.
data(), subId.
len());
4321 syncAckProcessing((
long)(command_.getTimeout()), message);
4328 catch (
const DisconnectedException&)
4330 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4331 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4337 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4338 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
4351 bool isHASubscribe_ =
true)
4353 Lock<Mutex> lock(_lock);
4354 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4358 void setAutoAck(
bool isAutoAckEnabled_)
4360 _isAutoAckEnabled = isAutoAckEnabled_;
4362 bool getAutoAck(
void)
const 4364 return _isAutoAckEnabled;
// Sets the ack batch size. If no ack timeout is currently configured
// (_queueAckTimeout is zero), it is set to AMPS_DEFAULT_QUEUE_ACK_TIMEOUT.
// NOTE(review): brace lines (and possibly another statement inside the `if`)
// are missing from this listing -- confirm against the complete header.
4366 void setAckBatchSize(
const unsigned batchSize_)
4368 _ackBatchSize = batchSize_;
4369 if (!_queueAckTimeout)
4371 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4375 unsigned getAckBatchSize(
void)
const 4377 return _ackBatchSize;
4379 int getAckTimeout(
void)
const 4381 return _queueAckTimeout;
// Stores ackTimeout_ as the queue ack timeout.
// NOTE(review): one body line preceding the assignment is missing from this
// listing -- confirm against the complete header.
4383 void setAckTimeout(
const int ackTimeout_)
4386 _queueAckTimeout = ackTimeout_;
4388 size_t _ack(QueueBookmarks& queueBookmarks_)
4390 if (queueBookmarks_._bookmarkCount)
4392 if (!publishStoreMessage)
4394 publishStoreMessage =
new Message();
4395 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4397 publishStoreMessage->reset();
4398 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4399 .setTopic(queueBookmarks_._topic)
4400 .setBookmark(queueBookmarks_._data)
4401 .setCommandId(
"AMPS-queue-ack");
4402 amps_uint64_t haSequenceNumber = 0;
4405 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
4406 publishStoreMessage->setAckType(
"persisted")
4407 .setSequence(haSequenceNumber);
4408 queueBookmarks_._data.erase();
4409 queueBookmarks_._bookmarkCount = 0;
4411 _send(*publishStoreMessage, haSequenceNumber);
4414 queueBookmarks_._data.erase();
4415 queueBookmarks_._bookmarkCount = 0;
// Acknowledges the message identified by topic_ and bookmark_ by forwarding
// all three arguments to _ack().
// NOTE(review): the body of the `if (_isAutoAckEnabled)` branch is not shown
// in this listing; presumably it returns early so that manual acks are
// ignored while auto-ack is on -- confirm against the complete header.
4421 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4423 if (_isAutoAckEnabled)
4427 _ack(topic_, bookmark_, options_);
4429 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
4431 if (bookmark_.
len() == 0)
4435 Lock<Mutex> lock(_lock);
4436 if (_ackBatchSize < 2 || options_ != NULL)
4438 if (!publishStoreMessage)
4440 publishStoreMessage =
new Message();
4441 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4443 publishStoreMessage->reset();
4444 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4445 .setCommandId(
"AMPS-queue-ack")
4446 .setTopic(topic_).setBookmark(bookmark_);
4449 publishStoreMessage->setOptions(options_);
4451 amps_uint64_t haSequenceNumber = 0;
4454 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
4455 publishStoreMessage->setAckType(
"persisted")
4456 .setSequence(haSequenceNumber);
4458 _send(*publishStoreMessage, haSequenceNumber);
4462 topic_hash hash = CRC<0>::crcNoSSE(topic_.
data(), topic_.
len());
4463 TopicHashMap::iterator it = _topicHashMap.find(hash);
4464 if (it == _topicHashMap.end())
4467 #ifdef AMPS_USE_EMPLACE 4468 it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4470 it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4473 QueueBookmarks& queueBookmarks = it->second;
4474 if (queueBookmarks._data.length())
4476 queueBookmarks._data.append(
",");
4480 queueBookmarks._oldestTime = amps_now();
4482 queueBookmarks._data.append(bookmark_);
4483 if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4485 _ack(queueBookmarks);
4488 void flushAcks(
void)
4490 size_t sendCount = 0;
4497 Lock<Mutex> lock(_lock);
4498 typedef TopicHashMap::iterator iterator;
4499 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4501 QueueBookmarks& queueBookmarks = it->second;
4502 sendCount += _ack(queueBookmarks);
4505 if (sendCount && _connected)
4507 publishFlush(0, Message::AckType::Processed);
// Sends batched acks for every topic whose pending bookmarks are older than
// the configured ack timeout.
// NOTE(review): this listing omits brace lines, the `try` keyword, and the
// body of the first `if` (presumably an early return) -- confirm.
4511 void checkQueueAcks(
void)
// Emptiness is tested before _lock is taken.
4513 if (!_topicHashMap.size())
4517 Lock<Mutex> lock(_lock);
// Entries whose _oldestTime is below this value are due.
// NOTE(review): the subtraction is unsigned; it would wrap if amps_now()
// were ever smaller than _queueAckTimeout -- presumably it never is.
4520 amps_uint64_t threshold = amps_now()
4521 - (amps_uint64_t)_queueAckTimeout;
4522 typedef TopicHashMap::iterator iterator;
4523 for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4525 QueueBookmarks& queueBookmarks = it->second;
// Skip topics with nothing pending; flush the ones that have waited too long.
4526 if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4528 _ack(queueBookmarks);
// Exceptions are passed to AMPS_UNHANDLED_EXCEPTION.
4532 catch (std::exception& ex)
4534 AMPS_UNHANDLED_EXCEPTION(ex);
// Queues a (func_, userData_) request on _deferredExecutionList while holding
// _deferredExecutionLock. The request is appended with emplace_back when
// AMPS_USE_EMPLACE is defined and with push_back otherwise.
// NOTE(review): the #else / #endif lines separating the two append calls are
// missing from this listing.
4538 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
4540 Lock<Mutex> lock(_deferredExecutionLock);
4541 #ifdef AMPS_USE_EMPLACE 4542 _deferredExecutionList.emplace_back(
4543 DeferredExecutionRequest(func_, userData_));
4545 _deferredExecutionList.push_back(
4546 DeferredExecutionRequest(func_, userData_));
4550 inline void processDeferredExecutions(
void)
4552 if (_deferredExecutionList.size())
4554 Lock<Mutex> lock(_deferredExecutionLock);
4555 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4556 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4557 for (; it != end; ++it)
4561 it->_func(it->_userData);
4569 _deferredExecutionList.clear();
4570 _routes.invalidateCache();
4571 _routeCache.invalidateCache();
4575 bool getRetryOnDisconnect(
void)
const 4577 return _isRetryOnDisconnect;
4580 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4582 _isRetryOnDisconnect = isRetryOnDisconnect_;
4585 void setDefaultMaxDepth(
unsigned maxDepth_)
4587 _defaultMaxDepth = maxDepth_;
4590 unsigned getDefaultMaxDepth(
void)
const 4592 return _defaultMaxDepth;
4684 RefHandle<MessageStreamImpl> _body;
4694 inline void advance(
void);
4701 : _pStream(pStream_)
4706 bool operator==(
const iterator& rhs)
4708 return _pStream == rhs._pStream;
4710 bool operator!=(
const iterator& rhs)
4712 return _pStream != rhs._pStream;
4714 void operator++(
void)
4730 return _body.isValid();
4737 if (!_body.isValid())
4739 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
4771 unsigned getMaxDepth(
void)
const;
4774 unsigned getDepth(
void)
const;
4778 inline void setSOWOnly(
const std::string& commandId_,
4779 const std::string& queryId_ =
"");
4780 inline void setSubscription(
const std::string& subId_,
4781 const std::string& commandId_ =
"",
4782 const std::string& queryId_ =
"");
4783 inline void setStatsOnly(
const std::string& commandId_,
4784 const std::string& queryId_ =
"");
4785 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
4791 friend class Client;
4817 BorrowRefHandle<ClientImpl> _body;
4819 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
4820 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
4821 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
4832 : _body(new ClientImpl(clientName), true)
4835 Client(ClientImpl* existingClient)
4836 : _body(existingClient,
true)
4839 Client(ClientImpl* existingClient,
bool isRef)
4840 : _body(existingClient, isRef)
4843 Client(
const Client& rhs) : _body(rhs._body) {;}
4844 virtual ~Client(
void) {;}
4846 Client& operator=(
const Client& rhs)
4854 return _body.isValid();
4871 _body.get().setName(name);
4878 return _body.get().getName();
4886 return _body.get().getNameHash();
4894 return _body.get().getNameHashValue();
4905 _body.get().setLogonCorrelationData(logonCorrelationData_);
4912 return _body.get().getLogonCorrelationData();
4925 return _body.get().getServerVersion();
4936 return _body.get().getServerVersionInfo();
4950 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4965 return AMPS::convertVersionToNumber(data_, len_);
4972 return _body.get().getURI();
4996 _body.get().connect(uri);
5003 _body.get().disconnect();
5021 _body.get().send(message);
5034 unsigned requestedAcks_,
bool isSubscribe_)
5036 _body.get().addMessageHandler(commandId_, messageHandler_,
5037 requestedAcks_, isSubscribe_);
5045 return _body.get().removeMessageHandler(commandId_);
5073 return _body.get().send(messageHandler, message, timeout);
5087 _body.get().setDisconnectHandler(disconnectHandler);
5095 return _body.get().getDisconnectHandler();
5104 return _body.get().getConnectionInfo();
5117 _body.get().setBookmarkStore(bookmarkStore_);
5125 return _body.
get().getBookmarkStore();
5133 return _body.get().getSubscriptionManager();
5145 _body.get().setSubscriptionManager(subscriptionManager_);
5169 _body.get().setPublishStore(publishStore_);
5177 return _body.
get().getPublishStore();
5185 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5186 duplicateMessageHandler_);
5200 return _body.get().getDuplicateMessageHandler();
5214 _body.get().setFailedWriteHandler(handler_);
5222 return _body.get().getFailedWriteHandler();
5243 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
5245 return _body.get().publish(topic_.c_str(), topic_.length(),
5246 data_.c_str(), data_.length());
5268 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5269 const char* data_,
size_t dataLength_)
5271 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5292 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
5293 unsigned long expiration_)
5295 return _body.get().publish(topic_.c_str(), topic_.length(),
5296 data_.c_str(), data_.length(), expiration_);
5319 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
5320 const char* data_,
size_t dataLength_,
5321 unsigned long expiration_)
5323 return _body.get().publish(topic_, topicLength_,
5324 data_, dataLength_, expiration_);
5365 void publishFlush(
long timeout_ = 0,
unsigned ackType_ = Message::AckType::Processed)
5367 _body.get().publishFlush(timeout_, ackType_);
5386 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
5388 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5389 data_.c_str(), data_.length());
5410 const char* data_,
size_t dataLength_)
5412 return _body.get().deltaPublish(topic_, topicLength_,
5413 data_, dataLength_);
// Delta-publishes data_ to topic_ with an expiration value. Both strings
// are forwarded to the implementation object as pointer/length pairs.
// NOTE(review): the final argument line of the forwarded call is missing
// from this listing; presumably it passes expiration_ -- confirm.
5432 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
5433 unsigned long expiration_)
5435 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5436 data_.c_str(), data_.length(),
5459 const char* data_,
size_t dataLength_,
5460 unsigned long expiration_)
5462 return _body.get().deltaPublish(topic_, topicLength_,
5463 data_, dataLength_, expiration_);
5482 const char* options_ = NULL)
5484 return _body.get().logon(timeout_, authenticator_, options_);
5498 std::string
logon(
const char* options_,
int timeout_ = 0)
5516 std::string
logon(
const std::string& options_,
int timeout_ = 0)
5542 const std::string& topic_,
5544 const std::string& filter_ =
"",
5545 const std::string& options_ =
"",
5546 const std::string& subId_ =
"")
5548 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5549 filter_,
"", options_, subId_);
5568 long timeout_ = 0,
const std::string& filter_ =
"",
5569 const std::string& options_ =
"",
5570 const std::string& subId_ =
"")
5573 if (_body.get().getDefaultMaxDepth())
5575 result.
maxDepth(_body.get().getDefaultMaxDepth());
5577 result.setSubscription(_body.get().subscribe(
5579 topic_, timeout_, filter_,
"",
5580 options_, subId_,
false));
5600 long timeout_ = 0,
const std::string& filter_ =
"",
5601 const std::string& options_ =
"",
5602 const std::string& subId_ =
"")
5605 if (_body.get().getDefaultMaxDepth())
5607 result.
maxDepth(_body.get().getDefaultMaxDepth());
5609 result.setSubscription(_body.get().subscribe(
5611 topic_, timeout_, filter_,
"",
5612 options_, subId_,
false));
5629 const std::string& topic_,
5631 const std::string& filter_ =
"",
5632 const std::string& options_ =
"",
5633 const std::string& subId_ =
"")
5635 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5636 filter_,
"", options_, subId_);
5647 long timeout_,
const std::string& filter_ =
"",
5648 const std::string& options_ =
"",
5649 const std::string& subId_ =
"")
5652 if (_body.get().getDefaultMaxDepth())
5654 result.
maxDepth(_body.get().getDefaultMaxDepth());
5656 result.setSubscription(_body.get().deltaSubscribe(
5658 topic_, timeout_, filter_,
"",
5659 options_, subId_,
false));
5665 long timeout_,
const std::string& filter_ =
"",
5666 const std::string& options_ =
"",
5667 const std::string& subId_ =
"")
5670 if (_body.get().getDefaultMaxDepth())
5672 result.
maxDepth(_body.get().getDefaultMaxDepth());
5674 result.setSubscription(_body.get().deltaSubscribe(
5676 topic_, timeout_, filter_,
"",
5677 options_, subId_,
false));
5707 const std::string& topic_,
5709 const std::string& bookmark_,
5710 const std::string& filter_ =
"",
5711 const std::string& options_ =
"",
5712 const std::string& subId_ =
"")
5714 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5715 filter_, bookmark_, options_, subId_);
5736 const std::string& bookmark_,
5737 const std::string& filter_ =
"",
5738 const std::string& options_ =
"",
5739 const std::string& subId_ =
"")
5742 if (_body.get().getDefaultMaxDepth())
5744 result.
maxDepth(_body.get().getDefaultMaxDepth());
5746 result.setSubscription(_body.get().subscribe(
5748 topic_, timeout_, filter_,
5749 bookmark_, options_,
5757 const std::string& bookmark_,
5758 const std::string& filter_ =
"",
5759 const std::string& options_ =
"",
5760 const std::string& subId_ =
"")
5763 if (_body.get().getDefaultMaxDepth())
5765 result.
maxDepth(_body.get().getDefaultMaxDepth());
5767 result.setSubscription(_body.get().subscribe(
5769 topic_, timeout_, filter_,
5770 bookmark_, options_,
5785 return _body.get().unsubscribe(commandId);
5797 return _body.get().unsubscribe();
5831 const std::string& topic_,
5832 const std::string& filter_ =
"",
5833 const std::string& orderBy_ =
"",
5834 const std::string& bookmark_ =
"",
5835 int batchSize_ = DEFAULT_BATCH_SIZE,
5836 int topN_ = DEFAULT_TOP_N,
5837 const std::string& options_ =
"",
5838 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5840 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
5841 bookmark_, batchSize_, topN_, options_,
5869 const std::string& filter_ =
"",
5870 const std::string& orderBy_ =
"",
5871 const std::string& bookmark_ =
"",
5872 int batchSize_ = DEFAULT_BATCH_SIZE,
5873 int topN_ = DEFAULT_TOP_N,
5874 const std::string& options_ =
"",
5875 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5878 if (_body.get().getDefaultMaxDepth())
5880 result.
maxDepth(_body.get().getDefaultMaxDepth());
5882 result.setSOWOnly(sow(result.operator
MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5888 const std::string& filter_ =
"",
5889 const std::string& orderBy_ =
"",
5890 const std::string& bookmark_ =
"",
5891 int batchSize_ = DEFAULT_BATCH_SIZE,
5892 int topN_ = DEFAULT_TOP_N,
5893 const std::string& options_ =
"",
5894 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5897 if (_body.get().getDefaultMaxDepth())
5899 result.
maxDepth(_body.get().getDefaultMaxDepth());
5901 result.setSOWOnly(sow(result.operator
MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5927 const std::string& topic_,
5929 const std::string& filter_ =
"",
5930 int batchSize_ = DEFAULT_BATCH_SIZE,
5931 int topN_ = DEFAULT_TOP_N)
5933 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
5959 const std::string& topic_,
5961 const std::string& filter_ =
"",
5962 int batchSize_ = DEFAULT_BATCH_SIZE,
5963 bool oofEnabled_ =
false,
5964 int topN_ = DEFAULT_TOP_N)
5966 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5967 filter_, batchSize_, oofEnabled_,
5992 const std::string& filter_ =
"",
5993 int batchSize_ = DEFAULT_BATCH_SIZE,
5994 bool oofEnabled_ =
false,
5995 int topN_ = DEFAULT_TOP_N)
5998 if (_body.get().getDefaultMaxDepth())
6000 result.
maxDepth(_body.get().getDefaultMaxDepth());
6002 result.setSubscription(_body.get().sowAndSubscribe(
6004 topic_, timeout_, filter_,
6005 batchSize_, oofEnabled_,
6030 const std::string& filter_ =
"",
6031 int batchSize_ = DEFAULT_BATCH_SIZE,
6032 bool oofEnabled_ =
false,
6033 int topN_ = DEFAULT_TOP_N)
6036 if (_body.get().getDefaultMaxDepth())
6038 result.
maxDepth(_body.get().getDefaultMaxDepth());
6040 result.setSubscription(_body.get().sowAndSubscribe(
6042 topic_, timeout_, filter_,
6043 batchSize_, oofEnabled_,
6077 const std::string& topic_,
6078 const std::string& filter_ =
"",
6079 const std::string& orderBy_ =
"",
6080 const std::string& bookmark_ =
"",
6081 int batchSize_ = DEFAULT_BATCH_SIZE,
6082 int topN_ = DEFAULT_TOP_N,
6083 const std::string& options_ =
"",
6084 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6086 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6087 orderBy_, bookmark_, batchSize_,
6088 topN_, options_, timeout_);
6116 const std::string& filter_ =
"",
6117 const std::string& orderBy_ =
"",
6118 const std::string& bookmark_ =
"",
6119 int batchSize_ = DEFAULT_BATCH_SIZE,
6120 int topN_ = DEFAULT_TOP_N,
6121 const std::string& options_ =
"",
6122 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6125 if (_body.get().getDefaultMaxDepth())
6127 result.
maxDepth(_body.get().getDefaultMaxDepth());
6129 result.setSubscription(_body.get().sowAndSubscribe(
6131 topic_, filter_, orderBy_,
6132 bookmark_, batchSize_, topN_,
6133 options_, timeout_,
false));
6139 const std::string& filter_ =
"",
6140 const std::string& orderBy_ =
"",
6141 const std::string& bookmark_ =
"",
6142 int batchSize_ = DEFAULT_BATCH_SIZE,
6143 int topN_ = DEFAULT_TOP_N,
6144 const std::string& options_ =
"",
6145 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6148 if (_body.get().getDefaultMaxDepth())
6150 result.
maxDepth(_body.get().getDefaultMaxDepth());
6152 result.setSubscription(_body.get().sowAndSubscribe(
6154 topic_, filter_, orderBy_,
6155 bookmark_, batchSize_, topN_,
6156 options_, timeout_,
false));
6185 const std::string& topic_,
6186 const std::string& filter_ =
"",
6187 const std::string& orderBy_ =
"",
6188 int batchSize_ = DEFAULT_BATCH_SIZE,
6189 int topN_ = DEFAULT_TOP_N,
6190 const std::string& options_ =
"",
6191 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6193 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6194 filter_, orderBy_, batchSize_,
6195 topN_, options_, timeout_);
6218 const std::string& filter_ =
"",
6219 const std::string& orderBy_ =
"",
6220 int batchSize_ = DEFAULT_BATCH_SIZE,
6221 int topN_ = DEFAULT_TOP_N,
6222 const std::string& options_ =
"",
6223 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6226 if (_body.get().getDefaultMaxDepth())
6228 result.
maxDepth(_body.get().getDefaultMaxDepth());
6230 result.setSubscription(sowAndDeltaSubscribe(result.operator
MessageHandler(), topic_, filter_, orderBy_, batchSize_, topN_, options_, timeout_));
6231 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6233 topic_, filter_, orderBy_,
6234 batchSize_, topN_, options_,
6241 const std::string& filter_ =
"",
6242 const std::string& orderBy_ =
"",
6243 int batchSize_ = DEFAULT_BATCH_SIZE,
6244 int topN_ = DEFAULT_TOP_N,
6245 const std::string& options_ =
"",
6246 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6249 if (_body.get().getDefaultMaxDepth())
6251 result.
maxDepth(_body.get().getDefaultMaxDepth());
6253 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6255 topic_, filter_, orderBy_,
6256 batchSize_, topN_, options_,
6286 const std::string& topic_,
6288 const std::string& filter_ =
"",
6289 int batchSize_ = DEFAULT_BATCH_SIZE,
6290 bool oofEnabled_ =
false,
6291 bool sendEmpties_ =
false,
6292 int topN_ = DEFAULT_TOP_N)
6294 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6295 timeout_, filter_, batchSize_,
6296 oofEnabled_, sendEmpties_,
6323 const std::string& filter_ =
"",
6324 int batchSize_ = DEFAULT_BATCH_SIZE,
6325 bool oofEnabled_ =
false,
6326 bool sendEmpties_ =
false,
6327 int topN_ = DEFAULT_TOP_N)
6330 if (_body.get().getDefaultMaxDepth())
6332 result.
maxDepth(_body.get().getDefaultMaxDepth());
6334 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6336 topic_, timeout_, filter_,
6337 batchSize_, oofEnabled_,
6338 sendEmpties_, topN_,
false));
6364 const std::string& filter_ =
"",
6365 int batchSize_ = DEFAULT_BATCH_SIZE,
6366 bool oofEnabled_ =
false,
6367 bool sendEmpties_ =
false,
6368 int topN_ = DEFAULT_TOP_N)
6371 if (_body.get().getDefaultMaxDepth())
6373 result.
maxDepth(_body.get().getDefaultMaxDepth());
6375 result.setSubscription(_body.get().sowAndDeltaSubscribe(
6377 topic_, timeout_, filter_,
6378 batchSize_, oofEnabled_,
6379 sendEmpties_, topN_,
false));
6402 const std::string& topic,
6403 const std::string& filter,
6406 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6434 stream.setStatsOnly(cid);
6435 _body.get().sowDelete(stream.operator
MessageHandler(), topic, filter, timeout, cid);
6436 return *(stream.
begin());
6438 catch (
const DisconnectedException&)
6440 removeMessageHandler(cid);
6451 _body.get().startTimer();
6462 return _body.get().stopTimer(messageHandler);
6487 const std::string& topic_,
6488 const std::string& keys_,
6491 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6523 stream.setStatsOnly(cid);
6524 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(), topic_, keys_, timeout_, cid);
6525 return *(stream.
begin());
6527 catch (
const DisconnectedException&)
6529 removeMessageHandler(cid);
6549 const std::string& topic_,
const std::string& data_,
6552 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6579 stream.setStatsOnly(cid);
6580 _body.get().sowDeleteByData(stream.operator
MessageHandler(), topic_, data_, timeout_, cid);
6581 return *(stream.
begin());
6583 catch (
const DisconnectedException&)
6585 removeMessageHandler(cid);
6595 return _body.get().getHandle();
6608 _body.get().setExceptionListener(pListener_);
6621 _body.get().setExceptionListener(listener_);
6628 return _body.get().getExceptionListener();
6654 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6678 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6684 setLastChanceMessageHandler(messageHandler);
6691 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6717 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6742 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6825 _body.get().addConnectionStateListener(listener);
6833 _body.get().removeConnectionStateListener(listener);
6840 _body.get().clearConnectionStateListeners();
6870 return _body.get().executeAsync(command_, handler_);
6908 if (command_.isSubscribe())
6910 Message& message = command_.getMessage();
6913 if (useExistingHandler)
6916 if (_body.get()._routes.getRoute(subId, existingHandler))
6919 _body.get().executeAsync(command_, existingHandler,
false);
6924 id = _body.get().executeAsync(command_, handler_,
false);
6926 catch (
const DisconnectedException&)
6928 removeMessageHandler(command_.getMessage().
getCommandId());
6929 if (command_.isSubscribe())
6933 if (command_.isSow())
6935 removeMessageHandler(command_.getMessage().
getQueryID());
6966 _body.get().ack(topic_, bookmark_, options_);
6988 void ack(
const std::string& topic_,
const std::string& bookmark_,
6989 const char* options_ = NULL)
6991 _body.get().ack(
Field(topic_.data(), topic_.length()),
Field(bookmark_.data(), bookmark_.length()), options_);
6999 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
7001 _body.get()._ack(topic_, bookmark_, options_);
7014 _body.get().flushAcks();
7023 return _body.get().getAutoAck();
7033 _body.get().setAutoAck(isAutoAckEnabled_);
7041 return _body.get().getAckBatchSize();
7051 _body.get().setAckBatchSize(ackBatchSize_);
7062 return _body.get().getAckTimeout();
7072 _body.get().setAckTimeout(ackTimeout_);
7086 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7095 return _body.get().getRetryOnDisconnect();
7104 _body.get().setDefaultMaxDepth(maxDepth_);
7113 return _body.get().getDefaultMaxDepth();
7125 return _body.get().setTransportFilterFunction(filter_, userData_);
7139 return _body.get().setThreadCreatedCallback(callback_, userData_);
7147 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
7149 _body.get().deferredExecution(func_, userData_);
7159 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7165 unsigned deliveries = 0;
7177 const char* data = NULL;
7179 const char* status = NULL;
7180 size_t statusLen = 0;
7182 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7185 if (len == NotEntitled || len == Duplicate ||
7186 (statusLen == Failure && status[0] ==
'f'))
7188 if (_failedWriteHandler)
7190 if (_publishStore.isValid())
7192 amps_uint64_t sequence =
7194 FailedWriteStoreReplayer replayer(
this, data, len);
7195 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7196 replayer, sequence));
7202 AMPS_CALL_EXCEPTION_WRAPPER(
7203 _failedWriteHandler->failedWrite(emptyMessage,
7209 if (_publishStore.isValid())
7218 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7222 if (!deliveries && _bookmarkStore.isValid())
7229 const char* bookmarkData = NULL;
7230 size_t bookmarkLen = 0;
7236 if (bookmarkLen > 0 && _routes.hasRoute(subId))
7239 _bookmarkStore.persisted(subId,
Message::Field(bookmarkData, bookmarkLen));
7244 catch (std::exception& ex)
7246 AMPS_UNHANDLED_EXCEPTION(ex);
// Handles a processed-type ack (called from ClientImplMessageHandler for
// Message::AckType::Processed). Under _lock and _ackMapLock it looks up an
// entry in _ackMap keyed by a string built from (data, len); when an entry
// is found, status, reason, username, password, server version and options
// are copied into it and it is marked as responded.
// NOTE(review): the lines that fetch each (data, len) pair from the
// message and the return statement are missing from this listing.
7252 ClientImpl::processedAck(
Message& message)
7254 unsigned deliveries = 0;
7256 const char* data = NULL;
7260 Lock<Mutex> l(_lock);
7263 Lock<Mutex> guard(_ackMapLock);
7264 AckMap::iterator i = _ackMap.find(std::string(data, len));
7265 if (i != _ackMap.end())
7275 ack.setStatus(data, len);
7277 ack.setReason(data, len);
7279 ack.setUsername(data, len);
7281 ack.setPassword(data, len);
7283 ack.setServerVersion(data, len);
7285 ack.setOptions(data, len);
7288 ack.setResponded(
true);
// Sends the heartbeat message (_beatMessage) via sendWithoutRetry() when
// force is true or _heartbeatTimer.check() reports true; the timer is
// restarted before the send. AMPSException from the send is caught here.
// NOTE(review): the body of the catch block is not visible in this
// listing -- confirm how the exception is handled.
7295 ClientImpl::checkAndSendHeartbeat(
bool force)
7297 if (force || _heartbeatTimer.check())
7299 _heartbeatTimer.start();
7302 sendWithoutRetry(_beatMessage);
7304 catch (
const AMPSException&)
// Builds a ConnectionInfo map describing this client: "client.uri",
// "client.name" and "client.username" are taken from _lastUri, _name and
// _username. When a publish store is set, its unpersistedCount() is
// formatted through an ostringstream and stored under
// "publishStore.unpersistedCount".
// NOTE(review): the rest of the function (any further keys and the return
// statement) is missing from this listing.
7311 inline ConnectionInfo ClientImpl::getConnectionInfo()
const 7313 ConnectionInfo info;
7314 std::ostringstream writer;
7316 info[
"client.uri"] = _lastUri;
7317 info[
"client.name"] = _name;
7318 info[
"client.username"] = _username;
7319 if (_publishStore.isValid())
7321 writer << _publishStore.unpersistedCount();
7322 info[
"publishStore.unpersistedCount"] = writer.str();
// Message callback: userData_ is cast to the owning ClientImpl and the
// incoming handle is dispatched according to its command type.
// Order of work visible in this listing:
//   1. Pending deferred executions are processed first.
//   2. A null messageHandle_ triggers checkQueueAcks() when
//      _queueAckTimeout is non-zero.
//   3. Otherwise _readMessage is re-pointed at the handle and the command
//      type selects one of the branches below (SOW, publish, ack,
//      heartbeat, everything else).
//   4. checkAndSendHeartbeat() is called at the end.
// NOTE(review): many lines (braces, returns, the commandType/ackType
// extraction, #else/#endif) are missing from this listing; the comments
// below describe only what is visible.
7331 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
7333 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7334 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7335 ClientImpl* me = (ClientImpl*) userData_;
7336 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7337 if (!messageHandle_)
7339 if (me->_queueAckTimeout)
7341 me->checkQueueAcks();
7346 me->_readMessage.replace(messageHandle_);
7347 Message& message = me->_readMessage;
// SOW, group_begin and group_end messages are delivered through _routes.
7349 if (commandType & SOWMask)
7351 #if 0 // Not currently implemented, to avoid an extra branch in delivery 7355 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7356 me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7358 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
// Publish, delta-publish and OOF messages: the subscription ids carried by
// the message are parsed into _routeCache and each route is visited.
7361 else if (commandType & PublishMask)
7363 #if 0 // Not currently implemented, to avoid an extra branch in delivery 7364 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7365 me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7366 GlobalCommandTypeHandlers::Publish :
7367 GlobalCommandTypeHandlers::OOF)].invoke(message));
7369 const char* subIds = NULL;
7370 size_t subIdsLen = 0;
7373 &subIds, &subIdsLen);
7374 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
7375 for (
size_t i = 0; i < subIdCount; ++i)
7377 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7379 if (handler.isValid())
7382 AMPS_SubscriptionId,
7383 subIds + lookupResult.idOffset,
7384 lookupResult.idLength);
7387 bool isAutoAck = me->_isAutoAckEnabled;
// Non-queue messages that carry a bookmark go through the bookmark store:
// messages it reports as discarded are offered to the DuplicateMessage
// global handler (when one is set); others are logged and delivered.
7389 if (!isMessageQueue && !bookmark.
empty() &&
7390 me->_bookmarkStore.isValid())
7392 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7395 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7397 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7402 me->_bookmarkStore.log(me->_readMessage);
7403 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7404 handler.invoke(message));
// Queue messages with auto-ack enabled: the handler is invoked and an ack
// step follows unless the message asks to ignore auto-ack (the ack calls
// themselves are not visible in this listing).
7409 if (isMessageQueue && isAutoAck)
7413 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7414 if (!message.getIgnoreAutoAck())
7416 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7420 catch (std::exception& ex)
7422 if (!message.getIgnoreAutoAck())
7424 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7427 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7432 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7433 handler.invoke(message));
7439 me->lastChance(message);
// Acks: the global Ack handler runs first, then persisted/processed acks
// get their dedicated processing, then the ack is routed; lastChance()
// receives the message when nothing was delivered.
7443 else if (commandType == Message::Command::Ack)
7445 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7446 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7448 unsigned deliveries = 0U;
7451 case Message::AckType::Persisted:
7452 deliveries += me->persistedAck(message);
7454 case Message::AckType::Processed:
7455 deliveries += me->processedAck(message);
7458 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7459 if (deliveries == 0)
7461 me->lastChance(message);
// Heartbeats: the global Heartbeat handler runs, and a heartbeat is forced
// out when the heartbeat timer has a non-zero timeout.
7464 else if (commandType == Message::Command::Heartbeat)
7466 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7467 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7468 if (me->_heartbeatTimer.getTimeout() != 0.0)
7470 me->checkAndSendHeartbeat(
true);
7474 me->lastChance(message);
// Remaining command types: delivery by command id is attempted while the
// client is connected. A MessageStreamFullException leads to a
// non-forced heartbeat check; other exceptions go to _exceptionListener.
7480 unsigned deliveries = 0U;
7483 while (me->_connected)
7487 deliveries = me->_routes.deliverData(message, message.
getCommandId());
7491 catch (MessageStreamFullException&)
7493 catch (MessageStreamFullException& ex_)
7496 me->checkAndSendHeartbeat(
false);
7500 catch (std::exception& ex_)
7504 me->_exceptionListener->exceptionThrown(ex_);
7511 if (deliveries == 0)
7513 me->lastChance(message);
7516 me->checkAndSendHeartbeat();
// Pre-disconnect callback: userData is cast to the owning ClientImpl and
// clearAcks() is called with the version of the connection that failed.
// The amps_handle argument is unnamed and unused.
// NOTE(review): lines between the cast and the clearAcks call are missing
// from this listing.
7521 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
7523 ClientImpl* me = (ClientImpl*) userData;
7526 me->clearAcks(failedConnectionVersion);
// Disconnect callback: userData is cast to the owning ClientImpl. Holding
// me->_lock, it broadcasts the Disconnected state, marks the client as not
// connected, runs the user's disconnect handler, and -- if the client is
// connected again afterwards -- asks the subscription manager to
// resubscribe. The amps_handle argument is unnamed and unused.
// NOTE(review): braces and several lines are missing from this listing, so
// the exact scoping of the guards below cannot be confirmed here.
7530 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
7532 ClientImpl* me = (ClientImpl*) userData;
7533 Lock<Mutex> l(me->_lock);
// Wrapper handed to the disconnect handler and subscription manager; it
// is constructed with isRef == false.
7534 Client wrapper(me,
false);
7537 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
7541 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7544 AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
7545 me->_connected =
false;
// me->_lock is released around the user's disconnect handler call.
7549 Unlock<Mutex> unlock(me->_lock);
7550 me->_disconnectHandler.invoke(wrapper);
7553 catch (
const std::exception& ex)
7555 AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7557 me->_lock.signalAll();
// Still disconnected after the handler ran: report Shutdown and a
// "Reconnect failed." DisconnectedException.
7559 if (!me->_connected)
7561 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7562 AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException(
"Reconnect failed."));
// With a subscription manager set, resubscribe with the lock released and
// then broadcast Resubscribed; exceptions are reported, not propagated.
7568 if (me->_subscriptionManager)
7573 Unlock<Mutex> unlock(me->_lock);
7574 me->_subscriptionManager->resubscribe(wrapper);
7576 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
7580 catch (
const AMPSException& subEx)
7582 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7584 catch (
const std::exception& subEx)
7586 AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
// Constructs a forward iterator over the len_ bytes at data_, starting at
// offset pos_ and using fieldSep_ as the field separator. The loop visits
// any run of separator bytes found at the starting position.
// NOTE(review): the loop body is missing from this listing; presumably it
// advances _pos past the separators -- confirm.
7609 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
7610 : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
7612 while (_pos != _len && _data[_pos] == _fieldSep)
// Iterator trait typedefs: a forward iterator whose value is a pair of
// Message::Field (first and second are filled by operator*).
// NOTE(review): difference_type is void*; the standard iterator traits
// expect a signed integer type (e.g. std::ptrdiff_t), so algorithms that
// use difference_type may not compile -- confirm whether this is
// intentional.
7618 typedef void* difference_type;
7619 typedef std::forward_iterator_tag iterator_category;
7620 typedef std::pair<Message::Field, Message::Field> value_type;
7621 typedef value_type* pointer;
7622 typedef value_type& reference;
7623 bool operator==(
const iterator& rhs)
const 7625 return _pos == rhs._pos;
7627 bool operator!=(
const iterator& rhs)
const 7629 return _pos != rhs._pos;
// Pre-increment: moves to the next field. The first loop covers the bytes
// of the current field (everything up to a separator or the end of the
// buffer); the second covers the run of separator bytes that follows.
// NOTE(review): both loop bodies and the return statement are missing
// from this listing; presumably each loop advances _pos -- confirm.
7631 iterator& operator++()
7634 while (_pos != _len && _data[_pos] != _fieldSep)
7639 while (_pos != _len && _data[_pos] == _fieldSep)
// Dereference: returns the current field as a (key, value) pair of
// Message::Field. Scanning starts at _pos and stops at the first '=' (or
// the end of the buffer); result.first is assigned keyLength bytes from
// _pos. When an '=' was found, scanning continues to the next separator
// and result.second is assigned valueLength bytes from valueStart.
// NOTE(review): the lines that update keyLength, valueStart and
// valueLength and the return statement are missing from this listing.
7646 value_type operator*()
const 7649 size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
7650 for (; i < _len && _data[i] !=
'='; ++i)
7655 result.first.assign(_data + _pos, keyLength);
7657 if (i < _len && _data[i] ==
'=')
7661 for (; i < _len && _data[i] != _fieldSep; ++i)
7666 result.second.assign(_data + valueStart, valueLength);
// Iterator that walks the fields of a FIX buffer from the last field
// towards the first. Position is tracked as a pointer (_pos) into the
// buffer rather than as an offset.
// NOTE(review): braces, loop bodies, member declarations and return
// statements are missing from this listing; comments below describe only
// the visible conditions.
7672 class reverse_iterator
7679 typedef std::pair<Message::Field, Message::Field> value_type;
// Constructs the iterator over len bytes at data, starting from pos. The
// first loop covers separator bytes at or before pos, the second covers
// non-separator bytes back towards the start of the buffer, and the final
// test distinguishes stopping on a separator from stopping on the first
// byte of the buffer.
7680 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
7681 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7686 while (_pos >= _data && *_pos == _fieldSep)
7690 while (_pos > _data && *_pos != _fieldSep)
7697 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
// Equality and inequality compare the position pointers only.
7707 bool operator==(
const reverse_iterator& rhs)
const 7709 return _pos == rhs._pos;
7711 bool operator!=(
const reverse_iterator& rhs)
const 7713 return _pos != rhs._pos;
// Pre-increment: steps to the previous field using the same three
// conditions as the constructor.
7715 reverse_iterator& operator++()
7726 while (_pos >= _data && *_pos == _fieldSep)
7731 while (_pos > _data && *_pos != _fieldSep)
7735 if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
// Dereference: splits the field at _pos into key and value around the
// first '=' found before the end of the buffer; the value runs up to the
// next separator.
7746 value_type operator*()
const 7749 size_t keyLength = 0, valueStart = 0, valueLength = 0;
7750 size_t i = (size_t)(_pos - _data);
7751 for (; i < _len && _data[i] !=
'='; ++i)
7755 result.first.assign(_pos, keyLength);
7756 if (i < _len && _data[i] ==
'=')
7760 for (; i < _len && _data[i] != _fieldSep; ++i)
7765 result.second.assign(_data + valueStart, valueLength);
7770 : _data(data.
data()), _len(data.
len()),
7771 _fieldSep(fieldSeparator)
7775 FIX(
const char* data,
size_t len,
char fieldSeparator = 1)
7776 : _data(data), _len(len), _fieldSep(fieldSeparator)
7780 iterator begin()
const 7782 return iterator(_data, _len, 0, _fieldSep);
7784 iterator end()
const 7786 return iterator(_data, _len, _len, _fieldSep);
7790 reverse_iterator rbegin()
const 7792 return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
7795 reverse_iterator rend()
const 7797 return reverse_iterator(_data, _len, 0, _fieldSep);
7818 std::stringstream _data;
// Appends "tag=" followed by length bytes of value, starting at
// value + offset, to the builder's stream (_data). write() is used, so
// the bytes are copied verbatim regardless of embedded NUL characters.
// NOTE(review): the statement(s) after the write are missing from this
// listing; presumably the field separator is appended, as in the
// std::string overload -- confirm.
7835 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
7837 _data << tag <<
'=';
7838 _data.write(value + offset, (std::streamsize)length);
7846 void append(
const T& tag,
const std::string& value)
7848 _data << tag <<
'=' << value << _fs;
7857 operator std::string()
const 7865 _data.str(std::string());
7902 typedef std::map<Message::Field, Message::Field>
map_type;
7913 for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
7925 std::deque<Message> _q;
7926 std::string _commandId;
7928 std::string _queryId;
7932 unsigned _requestedAcks;
7935 volatile enum { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } _state;
7936 typedef std::map<std::string, Message*> SOWKeyMap;
7937 SOWKeyMap _sowKeyMap;
7939 MessageStreamImpl(
const Client& client_)
7942 _maxDepth((
unsigned)~0),
7946 if (_client.isValid())
7952 MessageStreamImpl(ClientImpl* client_)
7955 _maxDepth((
unsigned)~0),
7959 if (_client.isValid())
7965 ~MessageStreamImpl()
7969 virtual void destroy()
7975 catch (std::exception& e)
7979 if (_client.isValid())
7986 if (_client.isValid())
7990 _client = Client((ClientImpl*)NULL);
7991 c.deferredExecution(MessageStreamImpl::destroyer,
this);
7999 static void destroyer(
void* vpMessageStreamImpl_)
8001 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
// Configures this stream for a subscription, under _lock.
// commandId_ is stored only when it is non-empty and differs from subId_;
// queryId_ is stored only when it is non-empty and differs from both
// subId_ and commandId_. The state is then checked: a Disconnected stream
// gets separate handling, otherwise the stream is asserted to be Unset.
// NOTE(review): the line that records subId_, the body of the
// Disconnected branch and the state assignment are missing from this
// listing.
8004 void setSubscription(
const std::string& subId_,
8005 const std::string& commandId_ =
"",
8006 const std::string& queryId_ =
"")
8008 Lock<Mutex> lock(_lock);
8010 if (!commandId_.empty() && commandId_ != subId_)
8012 _commandId = commandId_;
8014 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8016 _queryId = queryId_;
8019 if (Disconnected == _state)
8023 assert(Unset == _state);
// Configures this stream for a SOW-only query, under _lock.
// commandId_ is always stored; queryId_ is stored only when it is
// non-empty and differs from commandId_. The state is then checked: a
// Disconnected stream gets separate handling, otherwise the stream is
// asserted to be Unset.
// NOTE(review): the body of the Disconnected branch and the state
// assignment are missing from this listing.
8027 void setSOWOnly(
const std::string& commandId_,
8028 const std::string& queryId_ =
"")
8030 Lock<Mutex> lock(_lock);
8031 _commandId = commandId_;
8032 if (!queryId_.empty() && queryId_ != commandId_)
8034 _queryId = queryId_;
8037 if (Disconnected == _state)
8041 assert(Unset == _state);
// Configures this stream to receive only a stats ack, under _lock.
// commandId_ is always stored; queryId_ is stored only when it is
// non-empty and differs from commandId_. After the state check,
// _requestedAcks is set to Message::AckType::Stats.
// NOTE(review): the body of the Disconnected branch and the state
// assignment are missing from this listing.
8045 void setStatsOnly(
const std::string& commandId_,
8046 const std::string& queryId_ =
"")
8048 Lock<Mutex> lock(_lock);
8049 _commandId = commandId_;
8050 if (!queryId_.empty() && queryId_ != commandId_)
8052 _queryId = queryId_;
8055 if (Disconnected == _state)
8059 assert(Unset == _state);
8061 _requestedAcks = Message::AckType::Stats;
// Configures this stream to receive only acks, under _lock.
// commandId_ is stored, the state is checked (Disconnected gets separate
// handling, otherwise the stream is asserted to be Unset), and acks_ is
// recorded in _requestedAcks.
// NOTE(review): the body of the Disconnected branch and the state
// assignment are missing from this listing.
8064 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
8066 Lock<Mutex> lock(_lock);
8067 _commandId = commandId_;
8069 if (Disconnected == _state)
8073 assert(Unset == _state);
8075 _requestedAcks = acks_;
8080 Lock<Mutex> lock(_lock);
8081 if (state_ == AMPS::ConnectionStateListener::Disconnected)
8083 _state = Disconnected;
8089 void timeout(
unsigned timeout_)
8091 _timeout = timeout_;
8095 if (_state == Subscribe)
// Sets the maximum depth of the stream: _maxDepth receives either
// maxDepth_ or (unsigned)~0, the largest unsigned value.
// NOTE(review): the condition that chooses between the two assignments is
// missing from this listing -- confirm which inputs select the
// (unsigned)~0 value.
8100 void maxDepth(
unsigned maxDepth_)
8104 _maxDepth = maxDepth_;
8108 _maxDepth = (unsigned)~0;
// Two const accessors appear fused in this listing: getMaxDepth(), whose
// body is not visible here, and getDepth(), which returns the number of
// messages currently held in _q, narrowed to unsigned.
8111 unsigned getMaxDepth(
void)
const 8115 unsigned getDepth(
void)
const 8117 return (
unsigned)(_q.size());
8122 Lock<Mutex> lock(_lock);
8123 if (!_previousTopic.
empty() && !_previousBookmark.
empty())
8127 if (_client.isValid())
8129 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8133 catch (AMPSException&)
8135 catch (AMPSException& e)
8138 current_.invalidate();
8139 _previousTopic.
clear();
8140 _previousBookmark.
clear();
8143 _previousTopic.
clear();
8144 _previousBookmark.
clear();
8146 double minWaitTime = (double)((_timeout && _timeout > 1000)
8148 Timer timer(minWaitTime);
8150 while (_q.empty() && _state & Running)
8153 _lock.wait((
long)minWaitTime);
8155 Unlock<Mutex> unlck(_lock);
8156 amps_invoke_waiting_function();
8161 if (timer.checkAndGetRemaining(&minWaitTime))
8169 current_ = _q.front();
8170 if (_q.size() == _maxDepth)
8175 if (_state == Conflate)
8177 std::string sowKey = current_.
getSowKey();
8178 if (sowKey.length())
8180 _sowKeyMap.erase(sowKey);
8183 else if (_state == AcksOnly)
8187 if ((_state == AcksOnly && _requestedAcks == 0) ||
8188 (_state == SOWOnly && current_.
getCommand() ==
"group_end"))
8192 else if (current_.
getCommandEnum() == Message::Command::Publish &&
8202 if (_state == Disconnected)
8204 throw DisconnectedException(
"Connection closed.");
8206 current_.invalidate();
8207 if (_state == Closed)
8211 return _timeout != 0;
8215 if (_client.isValid())
8217 if (_state == SOWOnly || _state == Subscribe)
8219 if (!_commandId.empty())
8223 if (!_subId.empty())
8227 if (!_queryId.empty())
8234 if (!_commandId.empty())
8238 if (!_subId.empty())
8242 if (!_queryId.empty())
8248 if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8253 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
8255 Lock<Mutex> lock(this_->_lock);
8256 if (this_->_state != Conflate)
8258 AMPS_TESTING_SLOW_MESSAGE_STREAM
8259 if (this_->_q.size() >= this_->_maxDepth)
8264 this_->_lock.signalAll();
8265 throw MessageStreamFullException(
"Stream is currently full.");
8267 #ifdef AMPS_USE_EMPLACE 8268 this_->_q.emplace_back(message_.
deepCopy());
8270 this_->_q.push_back(message_.
deepCopy());
8273 this_->_client.isValid() && this_->_client.getAutoAck() &&
8277 message_.setIgnoreAutoAck();
8282 std::string sowKey = message_.
getSowKey();
8283 if (sowKey.length())
8285 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8286 if (it != this_->_sowKeyMap.end())
8288 *(it->second) = message_.
deepCopy();
8292 if (this_->_q.size() >= this_->_maxDepth)
8298 this_->_lock.signalAll();
8299 throw MessageStreamFullException(
"Stream is currently full.");
8301 #ifdef AMPS_USE_EMPLACE 8302 this_->_q.emplace_back(message_.
deepCopy());
8304 this_->_q.push_back(message_.
deepCopy());
8306 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8311 while (this_->_q.size() >= this_->_maxDepth)
8313 this_->_lock.wait(1);
8315 #ifdef AMPS_USE_EMPLACE 8316 this_->_q.emplace_back(message_.
deepCopy());
8318 this_->_q.push_back(message_.
deepCopy());
8322 this_->_lock.signalAll();
8325 inline MessageStream::MessageStream(
void)
8328 inline MessageStream::MessageStream(
const Client& client_)
8329 : _body(
new MessageStreamImpl(client_))
8332 inline void MessageStream::iterator::advance(
void)
8334 _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8338 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
8343 if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8345 result._body = (MessageStreamImpl*)(handler_._userData);
8350 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
8351 const std::string& queryId_)
8353 _body->setSOWOnly(commandId_, queryId_);
8355 inline void MessageStream::setSubscription(
const std::string& subId_,
8356 const std::string& commandId_,
8357 const std::string& queryId_)
8359 _body->setSubscription(subId_, commandId_, queryId_);
8361 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
8362 const std::string& queryId_)
8364 _body->setStatsOnly(commandId_, queryId_);
8366 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
8369 _body->setAcksOnly(commandId_, acks_);
8388 return _body->getMaxDepth();
8392 return _body->getDepth();
8395 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
8397 return *(_pEmptyMessageStream.get());
8405 ClientImpl& body = _body.get();
8406 Message& message = command_.getMessage();
8410 if (useExistingHandler)
8416 if (body._routes.getRoute(subId, existingHandler))
8419 body.executeAsync(command_, existingHandler,
false);
8420 return MessageStream::fromExistingHandler(existingHandler);
8429 if ((command & Message::Command::NoDataCommands)
8430 && (ackTypes == Message::AckType::Persisted
8431 || ackTypes == Message::AckType::None))
8434 if (!body._pEmptyMessageStream)
8436 body._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
8437 body._pEmptyMessageStream.get()->_body->close();
8439 return body.getEmptyMessageStream();
8442 if (body.getDefaultMaxDepth())
8444 stream.
maxDepth(body.getDefaultMaxDepth());
8447 std::string commandID = body.executeAsync(command_, handler,
false);
8448 if (command_.hasStatsAck())
8450 stream.setStatsOnly(commandID, command_.getMessage().
getQueryId());
8452 else if (command_.isSow())
8454 stream.setSOWOnly(commandID, command_.getMessage().
getQueryId());
8456 else if (command_.isSubscribe())
8458 stream.setSubscription(commandID,
8465 if (command == Message::Command::Publish ||
8466 command == Message::Command::DeltaPublish ||
8467 command == Message::Command::SOWDelete)
8469 stream.setAcksOnly(commandID,
8470 ackTypes & (
unsigned)~Message::AckType::Persisted);
8474 stream.setAcksOnly(commandID, ackTypes);
8481 inline void Message::ack(
const char* options_)
const 8483 ClientImpl* pClient = _body.get().clientImpl();
8485 if (pClient && bookmark.
len() &&
8486 !pClient->getAutoAck())
8489 pClient->ack(getTopic(), bookmark, options_);
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:615
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:180
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1422
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:4831
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1399
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6486
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6460
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:522
std::string getAckType() const
Definition: ampsplusplus.hpp:783
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:5043
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1395
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:7816
void startTimer()
Definition: ampsplusplus.hpp:6449
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5990
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:927
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8376
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1368
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5071
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1250
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1363
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:517
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1174
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:6740
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:761
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7102
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:4892
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4948
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:5783
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:633
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1361
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:899
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7111
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6976
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5734
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5167
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5432
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4963
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1252
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:803
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5183
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:548
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6831
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:786
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7060
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4934
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:6513
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1129
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1126
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:7853
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7122
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5319
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4735
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1255
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6759
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:739
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6769
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5019
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1370
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5706
Field getFilter() const
Retrieves the value of the Filter header of the Message as a new Field.
Definition: Message.hpp:1252
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:982
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7084
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8386
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1399
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4994
void clear()
Deletes the data associated with this Field; should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1229
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6184
Success.
Definition: amps.h:206
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1234
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1262
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:873
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5409
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:7898
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:5830
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:196
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:563
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5032
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4728
amps_result
Return values from amps_xxx functions.
Definition: amps.h:201
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5220
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1076
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1141
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8400
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:627
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1395
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5175
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7136
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:788
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4923
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1264
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:666
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5480
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including index_ can be discarded from the store.
Definition: ampsplusplus.hpp:1059
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:587
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:4884
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6138
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4814
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6715
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:599
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6750
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1248
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:6823
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:532
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:688
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6838
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1368
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5268
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:575
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1399
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1395
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5131
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5541
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:524
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1258
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1248
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6626
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1363
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:890
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1260
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1373
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6548
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:641
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1363
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1251
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:127
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1080
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:885
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5646
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6779
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5498
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1249
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1370
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:867
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:4903
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1141
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:7826
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:6988
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:880
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1373
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:921
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6028
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1013
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7835
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1141
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1140
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5123
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1097
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7049
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1362
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1397
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1172
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6424
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1371
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5115
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1251
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:593
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6619
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:540
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5143
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6682
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7846
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7039
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5386
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6115
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:719
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4690
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6362
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1089
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:674
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1076
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5365
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6868
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6321
std::string logon(const std::string &options_, int timeout_=0)
Log on to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:5516
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:5795
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7031
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5664
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1250
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1050
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6285
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:703
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6606
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1154
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5198
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5093
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:7890
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1076
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5102
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1290
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6964
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:7902
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5085
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1034
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:7909
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:233
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6217
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6676
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5599
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4682
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:647
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6801
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:605
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1248
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:653
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6812
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1363
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:653
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1118
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:581
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8381
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1398
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5001
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5868
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8390
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1363
The operation has not succeeded, but ought to be retried.
Definition: amps.h:230
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:4910
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6652
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:694
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4970
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:725
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1068
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6790
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1140
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1106
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6593
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:7863
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5212
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6240
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7070
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:569
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1362
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7021
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5243
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:4869
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:836
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1110
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1140
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1202
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:989
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5887
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5292
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6689
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5458
The client and server are disconnected.
Definition: amps.h:234
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6569
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5628
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8371
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5926
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:4876
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:438
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6076
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1139
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5567
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5755
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:552
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6902
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:626
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:6401
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4746
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7093
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7012
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5958