25 #ifndef _AMPSPLUSPLUS_H_ 26 #define _AMPSPLUSPLUS_H_ 46 #include <sys/atomic.h> 48 #include "BookmarkStore.hpp" 49 #include "MessageRouter.hpp" 51 #include "ampscrc.hpp" 53 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM 54 #define AMPS_TESTING_SLOW_MESSAGE_STREAM 82 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10 83 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960 84 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0 85 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000 86 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200 87 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000 88 #define AMPS_DEFAULT_TOP_N -1 89 #define AMPS_DEFAULT_BATCH_SIZE 10 90 #define AMPS_NUMBER_BUFFER_LEN 20 91 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000 93 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64) 98 static __declspec ( thread )
AMPS::Message* publishStoreMessage = 0;
106 typedef std::map<std::string, std::string> ConnectionInfo;
108 class PerThreadMessageTracker {
109 std::vector<AMPS::Message*> _messages;
111 PerThreadMessageTracker() {}
112 ~PerThreadMessageTracker()
114 for (
size_t i=0; i<_messages.size(); ++i)
121 _messages.push_back(message);
125 static AMPS::Mutex _lock;
126 AMPS::Lock<Mutex> l(_lock);
127 _addMessageToCleanupList(message);
131 static PerThreadMessageTracker tracker;
132 tracker.addMessage(message);
137 inline std::string asString(Type x_)
139 std::ostringstream os;
145 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
147 size_t pos = AMPS_NUMBER_BUFFER_LEN;
148 for(
int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
152 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
161 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
163 size_t pos = AMPS_NUMBER_BUFFER_LEN;
164 for(
int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
168 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
182 static const char* duplicate() {
return "duplicate";}
183 static const char* badFilter() {
return "bad filter";}
184 static const char* badRegexTopic() {
return "bad regex topic";}
185 static const char* subscriptionAlreadyExists() {
return "subscription already exists";}
186 static const char* nameInUse() {
return "name in use";}
187 static const char* authFailure() {
return "auth failure";}
188 static const char* notEntitled() {
return "not entitled";}
189 static const char* authDisabled() {
return "authentication disabled";}
190 static const char* subidInUse() {
return "subid in use";}
191 static const char* noTopic() {
return "no topic";}
206 virtual void exceptionThrown(
const std::exception&)
const {;}
212 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \ 217 catch (std::exception& ex_)\ 221 _exceptionListener->exceptionThrown(ex_);\ 246 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 249 while(me->_connected)\ 256 catch(MessageStreamFullException&)\ 258 me->checkAndSendHeartbeat(false);\ 262 catch (std::exception& ex_)\ 266 me->_exceptionListener->exceptionThrown(ex_);\ 290 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 291 while(me->_connected)\ 298 catch(MessageStreamFullException&)\ 300 me->checkAndSendHeartbeat(false);\ 304 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 307 while(me->_connected)\ 314 catch(MessageStreamFullException& ex_)\ 316 me->checkAndSendHeartbeat(false);\ 320 catch (std::exception& ex_)\ 324 me->_exceptionListener->exceptionThrown(ex_);\ 348 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 349 while(me->_connected)\ 356 catch(MessageStreamFullException& ex_)\ 358 me->checkAndSendHeartbeat(false);\ 363 #define AMPS_UNHANDLED_EXCEPTION(ex) \ 366 _exceptionListener->exceptionThrown(ex);\ 371 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \ 374 me->_exceptionListener->exceptionThrown(ex);\ 413 static const unsigned Subscribe = 1;
414 static const unsigned SOW = 2;
415 static const unsigned NeedsSequenceNumber = 4;
416 static const unsigned ProcessedAck = 8;
417 static const unsigned StatsAck = 16;
418 void init(Message::Command::Type command_)
427 void init(
const std::string& command_)
439 if (!(command & Message::Command::NoDataCommands))
442 if (command == Message::Command::Subscribe ||
443 command == Message::Command::SOWAndSubscribe ||
444 command == Message::Command::DeltaSubscribe ||
445 command == Message::Command::SOWAndDeltaSubscribe)
450 if (command == Message::Command::SOW
451 || command == Message::Command::SOWAndSubscribe
452 || command == Message::Command::SOWAndDeltaSubscribe)
457 setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
459 if (command == Message::Command::SOW)
464 _flags |= ProcessedAck;
466 else if (command == Message::Command::SOWDelete)
469 _flags |= ProcessedAck;
470 _flags |= NeedsSequenceNumber;
472 else if (command == Message::Command::Publish
473 || command == Message::Command::DeltaPublish)
475 _flags |= NeedsSequenceNumber;
477 else if (command == Message::Command::StopTimer)
568 std::ostringstream os;
613 if (v_ ==
"processed") _flags |= ProcessedAck;
614 else if (v_ ==
"stats") _flags |= StatsAck;
621 if (v_.find(
"processed") != std::string::npos) _flags |= ProcessedAck;
622 else _flags &= ~ProcessedAck;
623 if (v_.find(
"stats") != std::string::npos) _flags |= StatsAck;
624 else _flags &= ~StatsAck;
631 if (v_ & Message::AckType::Processed) _flags |= ProcessedAck;
632 else _flags &= ~ProcessedAck;
633 if (v_ & Message::AckType::Stats) _flags |= StatsAck;
634 else _flags &= ~StatsAck;
648 Message& getMessage(
void) {
return _message; }
649 unsigned getTimeout(
void)
const {
return _timeout; }
650 unsigned getBatchSize(
void)
const {
return _batchSize; }
651 bool isSubscribe(
void)
const 653 return _flags & Subscribe;
655 bool isSow(
void)
const {
return (_flags & SOW) != 0; }
656 bool hasProcessedAck(
void)
const {
return (_flags & ProcessedAck) != 0; }
657 bool hasStatsAck(
void)
const {
return (_flags & StatsAck) != 0; }
658 bool needsSequenceNumber(
void)
const {
return (_flags & NeedsSequenceNumber) != 0; }
663 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
680 virtual std::string authenticate(
const std::string& userName_,
const std::string& password_) = 0;
688 virtual std::string retry(
const std::string& userName_,
const std::string& password_) = 0;
695 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
707 std::string
authenticate(
const std::string& ,
const std::string& password_)
714 std::string
retry(
const std::string& ,
const std::string& )
716 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
719 void completed(
const std::string& ,
const std::string& ,
const std::string& ) {;}
740 virtual void execute(
Message& message_) = 0;
755 typedef bool (*PublishStoreResizeHandler)(
Store store_,
764 StoreImpl() : _resizeHandler(NULL), _resizeHandlerData(NULL) {;}
770 virtual amps_uint64_t store(
const Message& message_) = 0;
776 virtual void discardUpTo(amps_uint64_t index_) = 0;
791 virtual bool replaySingle(
StoreReplayer& replayer_, amps_uint64_t index_) = 0;
797 virtual size_t unpersistedCount()
const = 0;
809 virtual void flush(
long timeout_) = 0;
822 virtual amps_uint64_t getLowestUnpersisted()
const = 0;
827 virtual amps_uint64_t getLastPersisted() = 0;
841 _resizeHandler = handler_;
842 _resizeHandlerData = userData_;
847 return _resizeHandler;
850 bool callResizeHandler(
size_t newSize_);
854 void* _resizeHandlerData;
861 RefHandle<StoreImpl> _body;
865 Store(
const Store& rhs) : _body(rhs._body) {;}
877 return _body.get().store(message_);
886 _body.get().discardUpTo(index_);
895 _body.get().replay(replayer_);
907 return _body.get().replaySingle(replayer_, index_);
916 return _body.get().unpersistedCount();
924 return _body.isValid();
937 return _body.get().flush(timeout_);
945 return _body.get().getLowestUnpersisted();
953 return _body.get().getLastPersisted();
968 _body.get().setResizeHandler(handler_, userData_);
973 return _body.get().getResizeHandler();
1003 virtual void failedWrite(
const Message& message_,
1004 const char* reason_,
size_t reasonLength_) = 0;
1008 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1011 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1024 long* timeoutp = (
long*)data_;
1026 if (count == 0)
return false;
1029 store_.
flush(*timeoutp);
1032 catch (
const TimedOutException&)
1034 catch (
const TimedOutException& e)
1057 unsigned requestedAckTypes_) = 0;
1064 virtual void clear() = 0;
1068 virtual void resubscribe(Client& client_) = 0;
1079 typedef enum { Disconnected = 0,
1083 PublishReplayed = 8,
1084 HeartbeatInitiated = 16,
1098 virtual void connectionStateChanged(
State newState_) = 0;
1103 class MessageStreamImpl;
1106 typedef void(*DeferredExecutionFunc)(
void*);
1108 class ClientImpl :
public RefBody
1110 friend class Client;
1113 DisconnectHandler _disconnectHandler;
1114 enum GlobalCommandTypeHandlers :
size_t 1124 DuplicateMessage = 8,
1127 std::vector<MessageHandler> _globalCommandTypeHandlers;
1128 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1130 MessageRouter::RouteCache _routeCache;
1131 mutable Mutex _lock;
1132 std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1134 Store _publishStore;
1135 bool _isRetryOnDisconnect;
1136 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1137 volatile amps_uint64_t _lastSentHaSequenceNumber;
1138 ATOMIC_TYPE_8 _badTimeToHAPublish;
1139 ATOMIC_TYPE_8 _badTimeToHASubscribe;
1140 VersionInfo _serverVersion;
1141 Timer _heartbeatTimer;
1142 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1145 int _queueAckTimeout;
1146 bool _isAutoAckEnabled;
1147 unsigned _ackBatchSize;
1148 unsigned _queuedAckCount;
1149 unsigned _defaultMaxDepth;
1150 struct QueueBookmarks
1152 QueueBookmarks(
const std::string& topic_)
1159 amps_uint64_t _oldestTime;
1160 unsigned _bookmarkCount;
1162 typedef amps_uint64_t topic_hash;
1163 typedef std::map<topic_hash,QueueBookmarks> TopicHashMap;
1164 TopicHashMap _topicHashMap;
1168 ClientImpl* _client;
1173 ClientStoreReplayer()
1174 : _client(NULL) , _version(0), _res(
AMPS_E_OK)
1177 ClientStoreReplayer(ClientImpl* client_)
1178 : _client(client_) , _version(0), _res(
AMPS_E_OK)
1181 void setClient(ClientImpl* client_) { _client = client_; }
1183 void execute(
Message& message_)
1185 if (!_client)
throw CommandException(
"Can't replay without a client.");
1188 if (index > _client->_lastSentHaSequenceNumber)
1189 _client->_lastSentHaSequenceNumber = index;
1196 (!_client->_badTimeToHAPublish ||
1200 message_.getMessage(),
1204 throw DisconnectedException(
"AMPS Server disconnected during replay");
1210 ClientStoreReplayer _replayer;
1214 ClientImpl* _parent;
1215 const char* _reason;
1216 size_t _reasonLength;
1217 size_t _replayCount;
1219 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1222 _reasonLength(reasonLength_),
1225 void execute(
Message& message_)
1227 if (_parent->_failedWriteHandler)
1230 _parent->_failedWriteHandler->failedWrite(message_,
1231 _reason, _reasonLength);
1234 size_t replayCount(
void)
const {
return _replayCount; }
1237 struct AckResponseImpl :
public RefBody
1239 std::string username, password, reason, status, bookmark, options;
1240 amps_uint64_t sequenceNo;
1241 VersionInfo serverVersion;
1242 volatile bool responded, abandoned;
1243 unsigned connectionVersion;
1246 sequenceNo((amps_uint64_t)0),
1250 connectionVersion(0)
1257 RefHandle<AckResponseImpl> _body;
1259 AckResponse() : _body(NULL) {;}
1260 AckResponse(
const AckResponse& rhs) : _body(rhs._body) {;}
1261 static AckResponse create()
1264 r._body =
new AckResponseImpl();
1268 const std::string& username()
1270 return _body.get().username;
1272 void setUsername(
const char* data_,
size_t len_)
1274 if (data_) _body.get().username.assign(data_, len_);
1275 else _body.get().username.clear();
1277 const std::string& password()
1279 return _body.get().password;
1281 void setPassword(
const char* data_,
size_t len_)
1283 if (data_) _body.get().password.assign(data_, len_);
1284 else _body.get().password.clear();
1286 const std::string& reason()
1288 return _body.get().reason;
1290 void setReason(
const char* data_,
size_t len_)
1292 if (data_) _body.get().reason.assign(data_, len_);
1293 else _body.get().reason.clear();
1295 const std::string& status()
1297 return _body.get().status;
1299 void setStatus(
const char* data_,
size_t len_)
1301 if (data_) _body.get().status.assign(data_, len_);
1302 else _body.get().status.clear();
1304 const std::string& bookmark()
1306 return _body.get().bookmark;
1308 void setBookmark(
const char* data_,
size_t len_)
1310 if (data_) _body.get().bookmark.assign(data_, len_);
1311 else _body.get().bookmark.clear();
1313 amps_uint64_t sequenceNo()
const 1315 return _body.get().sequenceNo;
1317 void setSequenceNo(
const char* data_,
size_t len_)
1319 amps_uint64_t result = (amps_uint64_t)0;
1322 for(
size_t i=0; i<len_; ++i)
1324 result *= (amps_uint64_t)10;
1325 result += (amps_uint64_t)(data_[i] -
'0');
1328 _body.get().sequenceNo = result;
1330 VersionInfo serverVersion()
const 1332 return _body.get().serverVersion;
1334 void setServerVersion(
const char* data_,
size_t len_)
1337 _body.get().serverVersion.setVersion(std::string(data_, len_));
1341 return _body.get().responded;
1343 void setResponded(
bool responded_)
1345 _body.get().responded = responded_;
1349 return _body.get().abandoned;
1351 void setAbandoned(
bool abandoned_)
1353 if (_body.isValid())
1354 _body.get().abandoned = abandoned_;
1357 void setConnectionVersion(
unsigned connectionVersion)
1359 _body.get().connectionVersion = connectionVersion;
1362 unsigned getConnectionVersion()
1364 return _body.get().connectionVersion;
1366 void setOptions(
const char* data_,
size_t len_)
1368 if (data_) _body.get().options.assign(data_,len_);
1369 else _body.get().options.clear();
1372 const std::string& options()
1374 return _body.get().options;
1377 AckResponse& operator=(
const AckResponse& rhs)
1385 typedef std::map<std::string, AckResponse> AckMap;
1388 DefaultExceptionListener _defaultExceptionListener;
1391 struct DeferredExecutionRequest
1393 DeferredExecutionRequest(DeferredExecutionFunc func_,
1396 _userData(userData_)
1399 DeferredExecutionFunc _func;
1403 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1404 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1406 std::string _username;
1407 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1408 ConnectionStateListeners _connectionStateListeners;
1409 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1410 Mutex _deferredExecutionLock;
1411 DeferredExecutionList _deferredExecutionList;
1412 unsigned _heartbeatInterval;
1413 unsigned _readTimeout;
1421 if (!_connected && newState_ > ConnectionStateListener::Connected)
1425 for(ConnectionStateListeners::iterator it= _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1427 AMPS_CALL_EXCEPTION_WRAPPER(
1428 (*it)->connectionStateChanged(newState_));
1431 unsigned processedAck(
Message& message);
1432 unsigned persistedAck(
Message& meesage);
1433 void lastChance(
Message& message);
1434 void checkAndSendHeartbeat(
bool force=
false);
1435 virtual ConnectionInfo getConnectionInfo()
const;
1437 ClientImplMessageHandler(
amps_handle message,
void* userData);
1439 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1441 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
1443 void unsubscribeInternal(
const std::string&
id)
1445 if (
id.empty())
return;
1448 subId.assign(
id.data(),
id.length());
1449 _routes.removeRoute(subId);
1451 if (_subscriptionManager)
1454 Unlock<Mutex> unlock(_lock);
1455 _subscriptionManager->unsubscribe(subId);
1461 _sendWithoutRetry(_message);
1462 deferredExecution(&s_noOpFn, NULL);
1465 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1466 bool isHASubscribe_)
1468 return syncAckProcessing(timeout_, message_,
1469 (amps_uint64_t)0, isHASubscribe_);
1472 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1473 amps_uint64_t haSeq = (amps_uint64_t)0,
1474 bool isHASubscribe_ =
false)
1477 AckResponse ack = AckResponse::create();
1480 Lock<Mutex> guard(_ackMapLock);
1483 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
1484 if (ack.getConnectionVersion() == 0)
1487 throw DisconnectedException(
"Connection closed while waiting for response.");
1489 bool timedOut =
false;
1490 AMPS_START_TIMER(timeout_)
1491 while(!timedOut && !ack.responded() && !ack.abandoned() && _connected)
1495 timedOut = !_lock.wait(timeout_);
1497 if (timedOut) { AMPS_RESET_TIMER(timedOut, timeout_); }
1503 Unlock<Mutex> unlck(_lock);
1504 amps_invoke_waiting_function();
1507 if (ack.responded())
1509 if (ack.status() !=
"failure")
1513 amps_uint64_t ackSequence = ack.sequenceNo();
1514 if (_lastSentHaSequenceNumber < ackSequence)
1516 _lastSentHaSequenceNumber = ackSequence;
1526 _nameHash = ack.bookmark().substr(0, ack.bookmark().find(
'|'));
1527 _serverVersion = ack.serverVersion();
1528 if (_bookmarkStore.isValid())
1533 const std::string& options = ack.options();
1534 size_t index = options.find_first_of(
"max_backlog=");
1535 if(index != std::string::npos)
1538 const char* c = options.c_str()+index+12;
1539 while(*c && *c!=
',')
1541 data = (data*10) + (
unsigned)(*c++-48);
1543 if(_ackBatchSize > data) _ackBatchSize = data;
1548 const size_t NotEntitled = 12;
1549 std::string ackReason = ack.reason();
1550 if (ackReason.length() == 0)
return ack;
1551 if (ackReason.length() == NotEntitled &&
1552 ackReason[0] ==
'n' &&
1557 message_.throwFor(_client, ackReason);
1561 if (!ack.abandoned())
1563 throw TimedOutException(
"timed out waiting for operation.");
1567 throw DisconnectedException(
"Connection closed while waiting for response.");
1575 if (!_client)
return;
1582 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1583 _pEmptyMessageStream.reset(NULL);
1590 ClientImpl(
const std::string& clientName)
1591 : _client(NULL), _name(clientName)
1592 , _isRetryOnDisconnect(
true)
1593 , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1594 , _badTimeToHASubscribe(0), _serverVersion()
1595 , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
1596 , _isAutoAckEnabled(
false)
1598 , _queuedAckCount(0)
1599 , _defaultMaxDepth(0)
1601 , _heartbeatInterval(0)
1604 _replayer.setClient(
this);
1609 _exceptionListener = &_defaultExceptionListener;
1610 for (
size_t i=0; i<GlobalCommandTypeHandlers::COUNT; ++i)
1616 virtual ~ClientImpl()
1621 const std::string& getName()
const 1626 const std::string& getNameHash()
const 1631 void setName(
const std::string& name)
1636 _client, name.c_str());
1639 AMPSException::throwFor(_client, result);
1644 const std::string& getLogonCorrelationData()
const 1646 return _logonCorrelationData;
1649 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
1651 _logonCorrelationData = logonCorrelationData_;
1654 size_t getServerVersion()
const 1656 return _serverVersion.getOldStyleVersion();
1659 VersionInfo getServerVersionInfo()
const 1661 return _serverVersion;
1664 const std::string& getURI()
const 1669 virtual void connect(
const std::string& uri)
1671 Lock<Mutex> l(_lock);
1675 virtual void _connect(
const std::string& uri)
1679 _client, uri.c_str());
1682 AMPSException::throwFor(_client, result);
1689 _readMessage.setClientImpl(
this);
1690 if(_queueAckTimeout)
1695 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
1698 void setDisconnected()
1701 Lock<Mutex> l(_lock);
1704 AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
1708 _heartbeatTimer.setTimeout(0.0);
1714 virtual void disconnect()
1717 Lock<Mutex> l(_lock);
1722 AMPS_CALL_EXCEPTION_WRAPPER(_sendWithoutRetry(_message));
1724 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
1726 AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
1727 Lock<Mutex> l(_lock);
1728 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
1731 void clearAcks(
unsigned failedVersion)
1734 Lock<Mutex> guard(_ackMapLock);
1737 std::vector<std::string> worklist;
1738 for(AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
1740 if (i->second.getConnectionVersion() <= failedVersion)
1742 i->second.setAbandoned(
true);
1743 worklist.push_back(i->first);
1747 for(std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
1756 int send(
const Message& message)
1758 Lock<Mutex> l(_lock);
1759 return _send(message);
1762 void sendWithoutRetry(
const Message& message_)
1764 Lock<Mutex> l(_lock);
1765 _sendWithoutRetry(message_);
1768 void _sendWithoutRetry(
const Message& message_)
1773 AMPSException::throwFor(_client,result);
1777 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
1778 bool isHASubscribe_ =
false)
1785 Message localMessage = message;
1786 unsigned version = 0;
1790 if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
1794 if(!_isRetryOnDisconnect)
1798 Unlock<Mutex> l(_lock);
1809 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
1810 (isHASubscribe_ && _badTimeToHASubscribe != 0))
1812 return (
int)version;
1816 if (haSeq > _lastSentHaSequenceNumber)
1818 while (haSeq > _lastSentHaSequenceNumber + 1)
1824 _lastSentHaSequenceNumber+1))
1830 version = _replayer._version;
1833 catch(
const DisconnectedException&)
1835 catch(
const DisconnectedException& e)
1838 result = _replayer._res;
1843 localMessage.getMessage(),
1845 ++_lastSentHaSequenceNumber;
1849 localMessage.getMessage(),
1853 if (!isHASubscribe_ && !haSeq &&
1854 localMessage.getMessage() == message.getMessage())
1858 if(_isRetryOnDisconnect)
1860 Unlock<Mutex> u(_lock);
1865 if ((isHASubscribe_ || haSeq) &&
1868 return (
int)version;
1875 AMPSException::throwFor(_client, result);
1881 amps_invoke_waiting_function();
1885 if (result !=
AMPS_E_OK) AMPSException::throwFor(_client, result);
1886 return (
int)version;
1889 void addMessageHandler(
const Field& commandId_,
1891 unsigned requestedAcks_,
bool isSubscribe_)
1893 Lock<Mutex> lock(_lock);
1894 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
1898 bool removeMessageHandler(
const Field& commandId_)
1900 Lock<Mutex> lock(_lock);
1901 return _routes.removeRoute(commandId_);
1909 bool isSubscribe =
false;
1910 bool isSubscribeOnly =
false;
1911 bool replace =
false;
1913 unsigned systemAddedAcks = Message::AckType::None;
1917 case Message::Command::Subscribe:
1918 case Message::Command::DeltaSubscribe:
1919 replace = message_.
getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos;
1922 systemAddedAcks |= Message::AckType::Persisted;
1924 isSubscribeOnly =
true;
1926 case Message::Command::SOWAndSubscribe:
1927 case Message::Command::SOWAndDeltaSubscribe:
1934 while (!replace &&
id != subId && _routes.hasRoute(
id))
1946 case Message::Command::SOW:
1953 while (!replace &&
id != subId && _routes.hasRoute(
id))
1964 if (!isSubscribeOnly)
1973 while (!replace && qid != subId && qid !=
id 1974 && _routes.hasRoute(qid))
1980 systemAddedAcks |= Message::AckType::Processed;
1982 if (!isSubscribeOnly) systemAddedAcks |= Message::AckType::Completed;
1985 int routesAdded = 0;
1986 Lock<Mutex> l(_lock);
1987 if (!subId.
empty() && messageHandler_.isValid())
1989 if (!_routes.hasRoute(subId))
1995 _routes.addRoute(subId, messageHandler_, requestedAcks,
1996 systemAddedAcks, isSubscribe);
1998 if (!isSubscribeOnly && !qid.
empty()
1999 && messageHandler_.isValid() && qid != subId)
2001 if (routesAdded == 0)
2003 _routes.addRoute(qid, messageHandler_,
2004 requestedAcks, systemAddedAcks,
false);
2010 Unlock<Mutex> u(_lock);
2011 data = amps_invoke_copy_route_function(
2012 messageHandler_.userData());
2016 _routes.addRoute(qid, messageHandler_, requestedAcks,
2017 systemAddedAcks,
false);
2021 _routes.addRoute(qid,
2024 requestedAcks, systemAddedAcks,
false);
2029 if (!
id.empty() && messageHandler_.isValid()
2030 && requestedAcks & ~
Message::AckType::Persisted
2031 &&
id != subId &&
id != qid)
2033 if (routesAdded == 0)
2035 _routes.addRoute(
id, messageHandler_, requestedAcks,
2036 systemAddedAcks,
false);
2042 Unlock<Mutex> u(_lock);
2043 data = amps_invoke_copy_route_function(
2044 messageHandler_.userData());
2048 _routes.addRoute(
id, messageHandler_, requestedAcks,
2049 systemAddedAcks,
false);
2053 _routes.addRoute(
id,
2057 systemAddedAcks,
false);
2066 syncAckProcessing(timeout_, message_, 0,
false);
2073 _routes.removeRoute(
id);
2080 case Message::Command::Unsubscribe:
2081 case Message::Command::Heartbeat:
2082 case Message::Command::Logon:
2083 case Message::Command::StartTimer:
2084 case Message::Command::StopTimer:
2085 case Message::Command::DeltaPublish:
2086 case Message::Command::Publish:
2087 case Message::Command::SOWDelete:
2089 Lock<Mutex> l(_lock);
2098 if (messageHandler_.isValid())
2100 _routes.addRoute(
id, messageHandler_, requestedAcks,
2101 Message::AckType::None,
false);
2108 case Message::Command::GroupBegin:
2109 case Message::Command::GroupEnd:
2110 case Message::Command::OOF:
2111 case Message::Command::Ack:
2112 case Message::Command::Unknown:
2114 throw CommandException(
"Command type " + message_.
getCommand() +
" can not be sent directly to AMPS");
2120 void setDisconnectHandler(
const DisconnectHandler& disconnectHandler)
2122 Lock<Mutex> l(_lock);
2123 _disconnectHandler = disconnectHandler;
2126 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2128 switch (command_[0])
2130 #if 0 // Not currently implemented to avoid an extra branch in delivery 2132 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2135 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2139 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2141 #if 0 // Not currently implemented to avoid an extra branch in delivery 2143 if (command_[6] ==
'b')
2145 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2147 else if (command_[6] ==
'e')
2149 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2153 std::ostringstream os;
2154 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2155 throw CommandException(os.str());
2159 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2163 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2167 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2171 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2174 std::ostringstream os;
2175 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2176 throw CommandException(os.str());
2181 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2185 #if 0 // Not currently implemented to avoid an extra branch in delivery 2186 case Message::Command::Publish:
2187 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2189 case Message::Command::SOW:
2190 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2193 case Message::Command::Heartbeat:
2194 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2196 #if 0 // Not currently implemented to avoid an extra branch in delivery 2197 case Message::Command::GroupBegin:
2198 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2200 case Message::Command::GroupEnd:
2201 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2203 case Message::Command::OOF:
2204 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2207 case Message::Command::Ack:
2208 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2212 unsigned command = command_;
2213 while (command > 0) { ++bits; command >>= 1; }
2215 AMPS_snprintf(errBuf,
sizeof(errBuf),
2216 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2217 CommandConstants<0>::Lengths[bits],
2218 CommandConstants<0>::Values[bits]);
2219 throw CommandException(errBuf);
2224 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2226 _globalCommandTypeHandlers[handlerType_] = handler_;
2231 Lock<Mutex> l(_lock);
2232 _failedWriteHandler.reset(handler_);
2235 void setPublishStore(
const Store& publishStore_)
2237 Lock<Mutex> l(_lock);
2238 if (_connected)
throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2239 _publishStore = publishStore_;
2244 Lock<Mutex> l(_lock);
2245 if (_connected)
throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2246 _bookmarkStore = bookmarkStore_;
2251 Lock<Mutex> l(_lock);
2252 _subscriptionManager.reset(subscriptionManager_);
2260 DisconnectHandler getDisconnectHandler()
const 2262 return _disconnectHandler;
2267 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2275 Store getPublishStore()
const 2277 return _publishStore;
2282 return _bookmarkStore;
2285 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2289 Lock<Mutex> l(_lock);
2291 _publishMessage.assignData(data_, dataLen_);
2292 _send(_publishMessage);
2297 if (!publishStoreMessage)
2299 publishStoreMessage =
new Message();
2300 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2302 publishStoreMessage->reset();
2303 publishStoreMessage->setCommandEnum(Message::Command::Publish);
2304 return _publish(topic_, topicLen_, data_, dataLen_);
2308 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2309 size_t dataLen_,
unsigned long expiration_)
2313 Lock<Mutex> l(_lock);
2315 _publishMessage.assignData(data_, dataLen_);
2316 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2317 size_t pos = convertToCharArray(exprBuf, expiration_);
2319 _send(_publishMessage);
2325 if (!publishStoreMessage)
2327 publishStoreMessage =
new Message();
2328 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2330 publishStoreMessage->reset();
2331 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2332 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2333 publishStoreMessage->setCommandEnum(Message::Command::Publish)
2334 .assignExpiration(exprBuf+exprPos,
2335 AMPS_NUMBER_BUFFER_LEN-exprPos);
2336 return _publish(topic_, topicLen_, data_, dataLen_);
2343 ClientImpl* _pClient;
2345 volatile bool _acked;
2346 volatile bool _disconnected;
2348 FlushAckHandler(ClientImpl* pClient_)
2349 : _pClient(pClient_), _cmdId(), _acked(
false), _disconnected(
false)
2351 pClient_->addConnectionStateListener(
this);
2355 _pClient->removeConnectionStateListener(
this);
2356 _pClient->removeMessageHandler(_cmdId);
2359 void setCommandId(
const Field& cmdId_)
2367 void connectionStateChanged(
State state_)
2369 if (state_ <= Shutdown)
2371 _disconnected =
true;
2380 return _acked || _disconnected;
2384 void publishFlush(
long timeout_,
unsigned ackType_)
2386 static const char* processed =
"processed";
2387 static const size_t processedLen = strlen(processed);
2388 static const char* persisted =
"persisted";
2389 static const size_t persistedLen = strlen(persisted);
2390 static const char* flush =
"flush";
2391 static const size_t flushLen = strlen(flush);
2392 static VersionInfo minPersisted(
"5.3.3.0");
2393 static VersionInfo minFlush(
"4");
2394 if (ackType_ != Message::AckType::Processed
2395 && ackType_ != Message::AckType::Persisted)
2397 throw new CommandException(
"Flush can only be used with processed or persisted acks.");
2399 FlushAckHandler flushHandler(
this);
2400 if (_serverVersion >= minFlush)
2402 Lock<Mutex> l(_lock);
2404 throw DisconnectedException(
"Not cconnected trying to flush");
2408 if (_serverVersion < minPersisted
2409 || ackType_ == Message::AckType::Processed)
2419 std::bind(&FlushAckHandler::invoke,
2420 std::ref(flushHandler),
2421 std::placeholders::_1),
2423 if (_send(_message) == -1)
2424 throw DisconnectedException(
"Disconnected trying to flush");
2430 _publishStore.
flush(timeout_);
2432 catch (
const AMPSException& ex)
2434 AMPS_UNHANDLED_EXCEPTION(ex);
2438 else if (_serverVersion < minFlush)
2440 if (timeout_ > 0) { AMPS_USLEEP(timeout_ * 1000); }
2441 else { AMPS_USLEEP(1000 * 1000); }
2446 Timer timer((
double)timeout_);
2448 while (!timer.check() && !flushHandler.done())
2451 amps_invoke_waiting_function();
2456 while (!flushHandler.done())
2459 amps_invoke_waiting_function();
2463 if (!flushHandler.done())
2464 throw TimedOutException(
"Timed out waiting for flush");
2466 if (!flushHandler.acked() && !_publishStore.
isValid())
2467 throw DisconnectedException(
"Disconnected waiting for flush");
2470 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2471 const char* data_,
size_t dataLength_)
2475 Lock<Mutex> l(_lock);
2477 _deltaMessage.assignData(data_, dataLength_);
2478 _send(_deltaMessage);
2483 if (!publishStoreMessage)
2485 publishStoreMessage =
new Message();
2486 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2488 publishStoreMessage->reset();
2489 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2490 return _publish(topic_, topicLength_, data_, dataLength_);
2494 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2495 const char* data_,
size_t dataLength_,
2496 unsigned long expiration_)
2500 Lock<Mutex> l(_lock);
2502 _deltaMessage.assignData(data_, dataLength_);
2503 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2504 size_t pos = convertToCharArray(exprBuf, expiration_);
2506 _send(_deltaMessage);
2512 if (!publishStoreMessage)
2514 publishStoreMessage =
new Message();
2515 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2517 publishStoreMessage->reset();
2518 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2519 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2520 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2521 .assignExpiration(exprBuf+exprPos,
2522 AMPS_NUMBER_BUFFER_LEN-exprPos);
2523 return _publish(topic_, topicLength_, data_, dataLength_);
2527 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
2528 const char* data_,
size_t dataLength_)
2530 publishStoreMessage->assignTopic(topic_, topicLength_)
2531 .setAckTypeEnum(Message::AckType::Persisted)
2532 .assignData(data_, dataLength_);
2533 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
2534 char buf[AMPS_NUMBER_BUFFER_LEN];
2535 size_t pos = convertToCharArray(buf, haSequenceNumber);
2536 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2538 Lock<Mutex> l(_lock);
2539 _send(*publishStoreMessage, haSequenceNumber);
2541 return haSequenceNumber;
2544 virtual std::string logon(
long timeout_,
Authenticator& authenticator_,
2545 const char* options_ = NULL)
2547 Lock<Mutex> l(_lock);
2548 return _logon(timeout_, authenticator_, options_);
2551 virtual std::string _logon(
long timeout_,
Authenticator& authenticator_,
2552 const char* options_ = NULL)
2554 AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2560 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE 2562 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
2565 if(uri.user().size()) _message.
setUserId(uri.user());
2566 if(uri.password().size()) _message.
setPassword(uri.password());
2567 if(uri.protocol() ==
"amps" && uri.messageType().size())
2571 if(uri.isTrue(
"pretty"))
2577 if (!_logonCorrelationData.empty())
2591 AckResponse ack = syncAckProcessing(timeout_, _message);
2592 if (ack.status() ==
"retry")
2594 _message.
setPassword(authenticator_.
retry(ack.username(), ack.password()));
2595 _username = ack.username();
2600 authenticator_.
completed(ack.username(), ack.password(), ack.reason());
2604 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2609 catch(
const AMPSException& ex)
2611 AMPS_UNHANDLED_EXCEPTION(ex);
2623 _publishStore.
replay(_replayer);
2624 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
2626 catch(
const StoreException& ex)
2628 std::ostringstream os;
2629 os <<
"A local store exception occurred while logging on." 2631 throw ConnectionException(os.str());
2633 catch(
const AMPSException& ex)
2635 AMPS_UNHANDLED_EXCEPTION(ex);
2638 catch(
const std::exception& ex)
2640 AMPS_UNHANDLED_EXCEPTION(ex);
2648 return newCommandId;
2652 const std::string& topic_,
2654 const std::string& filter_,
2655 const std::string& bookmark_,
2656 const std::string& options_,
2657 const std::string& subId_,
2658 bool isHASubscribe_ =
true)
2660 isHASubscribe_ &= (bool)_subscriptionManager;
2661 Lock<Mutex> l(_lock);
2665 std::string subId(subId_);
2668 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos)
2669 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
2678 unsigned ackTypes = Message::AckType::Processed;
2680 if (!bookmark_.empty() && _bookmarkStore.isValid())
2682 ackTypes |= Message::AckType::Persisted;
2686 if (filter_.length()) _message.
setFilter(filter_);
2687 if (bookmark_.length())
2697 if (_bookmarkStore.isValid())
2702 _bookmarkStore.
log(_message);
2703 _bookmarkStore.
discard(_message);
2709 if (options_.length()) _message.
setOptions(options_);
2715 Unlock<Mutex> u(_lock);
2716 _subscriptionManager->subscribe(messageHandler_, message,
2717 Message::AckType::None);
2718 if (_badTimeToHASubscribe)
return subId;
2723 Message::AckType::None, ackTypes,
true);
2726 if (!options_.empty()) message.
setOptions(options_);
2729 syncAckProcessing(timeout_, message, isHASubscribe_);
2731 catch (
const DisconnectedException&)
2733 if (!isHASubscribe_)
2735 _routes.removeRoute(subIdField);
2740 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2744 catch (
const TimedOutException&)
2746 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2754 Unlock<Mutex> unlock(_lock);
2755 _subscriptionManager->unsubscribe(subIdField);
2757 _routes.removeRoute(subIdField);
2763 std::string deltaSubscribe(
const MessageHandler& messageHandler_,
2764 const std::string& topic_,
2766 const std::string& filter_,
2767 const std::string& bookmark_,
2768 const std::string& options_,
2769 const std::string& subId_ =
"",
2770 bool isHASubscribe_ =
true)
2772 isHASubscribe_ &= (bool)_subscriptionManager;
2773 Lock<Mutex> l(_lock);
2777 std::string subId(subId_);
2787 unsigned ackTypes = Message::AckType::Processed;
2789 if (!bookmark_.empty() && _bookmarkStore.isValid())
2791 ackTypes |= Message::AckType::Persisted;
2794 if (filter_.length()) _message.
setFilter(filter_);
2795 if (bookmark_.length())
2805 if (_bookmarkStore.isValid())
2810 _bookmarkStore.
log(_message);
2811 _bookmarkStore.
discard(_message);
2817 if (options_.length()) _message.
setOptions(options_);
2822 Unlock<Mutex> u(_lock);
2823 _subscriptionManager->subscribe(messageHandler_, message,
2824 Message::AckType::None);
2825 if (_badTimeToHASubscribe)
return subId;
2830 Message::AckType::None, ackTypes,
true);
2833 if (!options_.empty()) message.
setOptions(options_);
2836 syncAckProcessing(timeout_, message, isHASubscribe_);
2838 catch (
const DisconnectedException&)
2840 if (!isHASubscribe_)
2842 _routes.removeRoute(subIdField);
2846 catch (
const TimedOutException&)
2848 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2856 Unlock<Mutex> unlock(_lock);
2857 _subscriptionManager->unsubscribe(subIdField);
2859 _routes.removeRoute(subIdField);
2865 void unsubscribe(
const std::string&
id)
2867 Lock<Mutex> l(_lock);
2868 unsubscribeInternal(
id);
2871 void unsubscribe(
void)
2873 if (_subscriptionManager)
2875 _subscriptionManager->clear();
2878 _routes.unsubscribeAll();
2879 Lock<Mutex> l(_lock);
2884 _sendWithoutRetry(_message);
2886 deferredExecution(&s_noOpFn, NULL);
2890 const std::string& topic_,
2891 const std::string& filter_ =
"",
2892 const std::string& orderBy_ =
"",
2893 const std::string& bookmark_ =
"",
2894 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2895 int topN_ = AMPS_DEFAULT_TOP_N,
2896 const std::string& options_ =
"",
2897 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
2899 Lock<Mutex> l(_lock);
2906 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
2909 if (filter_.length()) _message.
setFilter(filter_);
2910 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
2911 if (bookmark_.length()) _message.
setBookmark(bookmark_);
2914 if (options_.length()) _message.
setOptions(options_);
2916 _routes.addRoute(_message.
getQueryID(), messageHandler_,
2917 Message::AckType::None, ackTypes,
false);
2921 syncAckProcessing(timeout_, _message);
2925 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
2933 const std::string& topic_,
2935 const std::string& filter_ =
"",
2936 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2937 int topN_ = AMPS_DEFAULT_TOP_N)
2940 return sow(messageHandler_,
2951 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
2952 const std::string& topic_,
2953 const std::string& filter_ =
"",
2954 const std::string& orderBy_ =
"",
2955 const std::string& bookmark_ =
"",
2956 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2957 int topN_ = AMPS_DEFAULT_TOP_N,
2958 const std::string& options_ =
"",
2959 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
2960 bool isHASubscribe_ =
true)
2962 isHASubscribe_ &= (bool)_subscriptionManager;
2963 Lock<Mutex> l(_lock);
2970 std::string subId = cid;
2972 if (filter_.length()) _message.
setFilter(filter_);
2973 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
2974 if (bookmark_.length()) _message.
setBookmark(bookmark_);
2977 if (options_.length()) _message.
setOptions(options_);
2983 Unlock<Mutex> u(_lock);
2984 _subscriptionManager->subscribe(messageHandler_, message,
2985 Message::AckType::None);
2986 if (_badTimeToHASubscribe)
return subId;
2988 _routes.addRoute(cid, messageHandler_,
2989 Message::AckType::None, Message::AckType::Processed,
true);
2991 if (!options_.empty()) message.
setOptions(options_);
2994 syncAckProcessing(timeout_, message, isHASubscribe_);
2996 catch (
const DisconnectedException&)
2998 if (!isHASubscribe_)
3000 _routes.removeRoute(subId);
3004 catch (
const TimedOutException&)
3006 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3014 Unlock<Mutex> unlock(_lock);
3015 _subscriptionManager->unsubscribe(cid);
3017 _routes.removeRoute(subId);
3023 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3024 const std::string& topic_,
3026 const std::string& filter_ =
"",
3027 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3028 bool oofEnabled_ =
false,
3029 int topN_ = AMPS_DEFAULT_TOP_N,
3030 bool isHASubscribe_ =
true)
3033 return sowAndSubscribe(messageHandler_,
3040 (oofEnabled_ ?
"oof" :
""),
3045 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3046 const std::string& topic_,
3047 const std::string& filter_ =
"",
3048 const std::string& orderBy_ =
"",
3049 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3050 int topN_ = AMPS_DEFAULT_TOP_N,
3051 const std::string& options_ =
"",
3052 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3053 bool isHASubscribe_ =
true)
3055 isHASubscribe_ &= (bool)_subscriptionManager;
3056 Lock<Mutex> l(_lock);
3064 if (filter_.length()) _message.
setFilter(filter_);
3065 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
3068 if (options_.length()) _message.
setOptions(options_);
3073 Unlock<Mutex> u(_lock);
3074 _subscriptionManager->subscribe(messageHandler_, message,
3075 Message::AckType::None);
3076 if (_badTimeToHASubscribe)
return subId;
3078 _routes.addRoute(message.
getQueryID(), messageHandler_,
3079 Message::AckType::None, Message::AckType::Processed,
true);
3081 if (!options_.empty()) message.
setOptions(options_);
3084 syncAckProcessing(timeout_, message, isHASubscribe_);
3086 catch (
const DisconnectedException&)
3088 if (!isHASubscribe_)
3090 _routes.removeRoute(subId);
3094 catch (
const TimedOutException&)
3096 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3104 Unlock<Mutex> unlock(_lock);
3105 _subscriptionManager->unsubscribe(
Field(subId));
3107 _routes.removeRoute(subId);
3113 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3114 const std::string& topic_,
3116 const std::string& filter_ =
"",
3117 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3118 bool oofEnabled_ =
false,
3119 bool sendEmpties_ =
false,
3120 int topN_ = AMPS_DEFAULT_TOP_N,
3121 bool isHASubscribe_ =
true)
3125 if (oofEnabled_) options.
setOOF();
3127 return sowAndDeltaSubscribe(messageHandler_,
3139 const std::string& topic_,
3140 const std::string& filter_,
3146 unsigned ackType = Message::AckType::Processed |
3147 Message::AckType::Stats |
3148 Message::AckType::Persisted;
3149 if (!publishStoreMessage)
3151 publishStoreMessage =
new Message();
3152 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3154 publishStoreMessage->reset();
3155 if (commandId_.
empty())
3157 publishStoreMessage->newCommandId();
3158 commandId_ = publishStoreMessage->getCommandId();
3162 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3164 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3165 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3166 .assignQueryID(commandId_.
data(), commandId_.
len())
3167 .setAckTypeEnum(ackType)
3168 .assignTopic(topic_.c_str(), topic_.length())
3169 .assignFilter(filter_.c_str(), filter_.length());
3170 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3171 char buf[AMPS_NUMBER_BUFFER_LEN];
3172 size_t pos = convertToCharArray(buf, haSequenceNumber);
3173 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3177 Lock<Mutex> l(_lock);
3178 _routes.addRoute(commandId_, messageHandler_,
3179 Message::AckType::Stats,
3180 Message::AckType::Processed|Message::AckType::Persisted,
3182 syncAckProcessing(timeout_, *publishStoreMessage,
3185 catch (
const DisconnectedException&)
3191 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3195 return (std::string)commandId_;
3199 Lock<Mutex> l(_lock);
3201 if (commandId_.
empty())
3212 .assignQueryID(commandId_.
data(), commandId_.
len())
3213 .setAckTypeEnum(Message::AckType::Processed |
3214 Message::AckType::Stats)
3216 .assignFilter(filter_.c_str(), filter_.length());
3217 _routes.addRoute(commandId_, messageHandler_,
3218 Message::AckType::Stats,
3219 Message::AckType::Processed,
3223 syncAckProcessing(timeout_, _message);
3227 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3230 return (std::string)commandId_;
3234 std::string sowDeleteByData(
const MessageHandler& messageHandler_,
3235 const std::string& topic_,
3236 const std::string& data_,
3242 unsigned ackType = Message::AckType::Processed |
3243 Message::AckType::Stats |
3244 Message::AckType::Persisted;
3245 if (!publishStoreMessage)
3247 publishStoreMessage =
new Message();
3248 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3250 publishStoreMessage->reset();
3251 if (commandId_.
empty())
3253 publishStoreMessage->newCommandId();
3254 commandId_ = publishStoreMessage->getCommandId();
3258 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3260 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3261 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3262 .assignQueryID(commandId_.
data(), commandId_.
len())
3263 .setAckTypeEnum(ackType)
3264 .assignTopic(topic_.c_str(), topic_.length())
3265 .assignData(data_.c_str(), data_.length());
3266 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3267 char buf[AMPS_NUMBER_BUFFER_LEN];
3268 size_t pos = convertToCharArray(buf, haSequenceNumber);
3269 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3273 Lock<Mutex> l(_lock);
3274 _routes.addRoute(commandId_, messageHandler_,
3275 Message::AckType::Stats,
3276 Message::AckType::Processed|Message::AckType::Persisted,
3278 syncAckProcessing(timeout_, *publishStoreMessage,
3281 catch (
const DisconnectedException&)
3287 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3291 return (std::string)commandId_;
3295 Lock<Mutex> l(_lock);
3297 if (commandId_.
empty())
3308 .assignQueryID(commandId_.
data(), commandId_.
len())
3309 .setAckTypeEnum(Message::AckType::Processed |
3310 Message::AckType::Stats)
3312 .assignData(data_.c_str(), data_.length());
3313 _routes.addRoute(commandId_, messageHandler_,
3314 Message::AckType::Stats,
3315 Message::AckType::Processed,
3319 syncAckProcessing(timeout_, _message);
3323 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3326 return (std::string)commandId_;
3330 std::string sowDeleteByKeys(
const MessageHandler& messageHandler_,
3331 const std::string& topic_,
3332 const std::string& keys_,
3338 unsigned ackType = Message::AckType::Processed |
3339 Message::AckType::Stats |
3340 Message::AckType::Persisted;
3341 if (!publishStoreMessage)
3343 publishStoreMessage =
new Message();
3344 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3346 publishStoreMessage->reset();
3347 if (commandId_.
empty())
3349 publishStoreMessage->newCommandId();
3350 commandId_ = publishStoreMessage->getCommandId();
3354 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3356 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3357 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3358 .assignQueryID(commandId_.
data(), commandId_.
len())
3359 .setAckTypeEnum(ackType)
3360 .assignTopic(topic_.c_str(), topic_.length())
3361 .assignSowKeys(keys_.c_str(), keys_.length());
3362 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3363 char buf[AMPS_NUMBER_BUFFER_LEN];
3364 size_t pos = convertToCharArray(buf, haSequenceNumber);
3365 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3369 Lock<Mutex> l(_lock);
3370 _routes.addRoute(commandId_, messageHandler_,
3371 Message::AckType::Stats,
3372 Message::AckType::Processed|Message::AckType::Persisted,
3374 syncAckProcessing(timeout_, *publishStoreMessage,
3377 catch (
const DisconnectedException&)
3383 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3387 return (std::string)commandId_;
3391 Lock<Mutex> l(_lock);
3393 if (commandId_.
empty())
3404 .assignQueryID(commandId_.
data(), commandId_.
len())
3405 .setAckTypeEnum(Message::AckType::Processed |
3406 Message::AckType::Stats)
3408 .assignSowKeys(keys_.c_str(), keys_.length());
3409 _routes.addRoute(commandId_, messageHandler_,
3410 Message::AckType::Stats,
3411 Message::AckType::Processed,
3415 syncAckProcessing(timeout_, _message);
3419 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3422 return (std::string)commandId_;
3426 void startTimer(
void)
3428 if (_serverVersion >=
"5.3.2.0")
3430 throw CommandException(
"The start_timer command is deprecated.");
3432 Lock<Mutex> l(_lock);
3441 if (_serverVersion >=
"5.3.2.0")
3443 throw CommandException(
"The stop_timer command is deprecated.");
3445 return executeAsync(
Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
3460 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
3462 _pExceptionListener = pListener_;
3463 _exceptionListener = _pExceptionListener.get();
3468 _exceptionListener = &listener_;
3473 return *_exceptionListener;
3476 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
3478 if (readTimeout_ && readTimeout_ < heartbeatInterval_)
3480 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
3482 Lock<Mutex> l(_lock);
3483 if(_heartbeatInterval != heartbeatInterval_ ||
3484 _readTimeout != readTimeout_)
3486 _heartbeatInterval = heartbeatInterval_;
3487 _readTimeout = readTimeout_;
3492 void _sendHeartbeat(
void)
3494 if (_connected && _heartbeatInterval != 0)
3496 std::ostringstream options;
3497 options <<
"start," << _heartbeatInterval;
3502 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3503 _heartbeatTimer.start();
3506 _sendWithoutRetry(startMessage);
3507 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
3509 catch(ConnectionException &ex_)
3513 AMPS_UNHANDLED_EXCEPTION(ex_);
3517 if(_readTimeout && _connected)
3523 AMPSException::throwFor(_client, result);
3529 Lock<Mutex> lock(_lock);
3530 _connectionStateListeners.insert(listener_);
3535 Lock<Mutex> lock(_lock);
3536 _connectionStateListeners.erase(listener_);
3539 void clearConnectionStateListeners()
3541 Lock<Mutex> lock(_lock);
3542 _connectionStateListeners.clear();
3547 unsigned systemAddedAcks_,
bool isSubscribe_)
3549 Message message = command_.getMessage();
3554 bool added = qid.
len() || subid.
len() || cid_.
len();
3556 if (subid.
len() > 0)
3560 addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
3561 systemAddedAcks_, isSubscribe_);
3563 if (qid.
len() > 0 && qid != subid)
3565 while (_routes.hasRoute(qid))
3572 if (addedCount == 0)
3574 _routes.addRoute(qid, handler_, requestedAcks_,
3575 systemAddedAcks_, isSubscribe_);
3581 Unlock<Mutex> u(_lock);
3582 data = amps_invoke_copy_route_function(handler_.userData());
3586 _routes.addRoute(qid, handler_, requestedAcks_,
3587 systemAddedAcks_,
false);
3591 _routes.addRoute(qid,
3595 systemAddedAcks_,
false);
3600 if (cid_.
len() > 0 && cid_ != qid && cid_ != subid
3601 && requestedAcks_ & ~
Message::AckType::Persisted)
3603 while (_routes.hasRoute(cid_))
3607 if (addedCount == 0)
3609 _routes.addRoute(cid_, handler_, requestedAcks_,
3610 systemAddedAcks_,
false);
3616 Unlock<Mutex> u(_lock);
3617 data = amps_invoke_copy_route_function(handler_.userData());
3621 _routes.addRoute(cid_, handler_, requestedAcks_,
3622 systemAddedAcks_,
false);
3626 _routes.addRoute(cid_,
3630 systemAddedAcks_,
false);
3634 else if (commandType == Message::Command::Publish ||
3635 commandType == Message::Command::DeltaPublish)
3638 _routes.addRoute(cid_, handler_, requestedAcks_,
3639 systemAddedAcks_,
false);
3644 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
3649 bool isHASubscribe_ =
true)
3651 isHASubscribe_ &= (bool)_subscriptionManager;
3652 Message& message = command_.getMessage();
3653 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
3654 Message::AckType::Processed : Message::AckType::None;
3656 bool isPublishStore = _publishStore.
isValid() && command_.needsSequenceNumber();
3658 if (commandType == Message::Command::SOW
3659 || commandType == Message::Command::SOWAndSubscribe
3660 || commandType == Message::Command::SOWAndDeltaSubscribe
3661 || commandType == Message::Command::StopTimer)
3662 systemAddedAcks |= Message::AckType::Completed;
3664 if (handler_.isValid() && cid.
empty())
3670 if (command_.isSubscribe())
3673 if (_bookmarkStore.isValid())
3675 systemAddedAcks |= Message::AckType::Persisted;
3683 _bookmarkStore.
log(message);
3684 if (!BookmarkRange::isRange(bookmark))
3686 _bookmarkStore.
discard(message);
3700 systemAddedAcks |= Message::AckType::Persisted;
3702 bool isSubscribe = command_.isSubscribe();
3703 if (handler_.isValid() && !isSubscribe)
3705 _registerHandler(command_, cid, handler_,
3706 requestedAcks, systemAddedAcks, isSubscribe);
3708 bool useSyncSend = cid.
len() > 0 && command_.hasProcessedAck();
3711 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
3714 Unlock<Mutex> u(_lock);
3715 haSequenceNumber = _publishStore.
store(message);
3722 syncAckProcessing((
long)command_.getTimeout(), message,
3725 else _send(message, haSequenceNumber);
3727 catch (
const DisconnectedException&)
3733 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3744 Unlock<Mutex> u(_lock);
3745 _subscriptionManager->subscribe(handler_,
3748 if (_badTimeToHASubscribe)
3751 return std::string(subId.
data(), subId.
len());
3754 if (handler_.isValid())
3756 _registerHandler(command_, cid, handler_,
3757 requestedAcks, systemAddedAcks, isSubscribe);
3764 syncAckProcessing((
long)command_.getTimeout(), message,
3767 else _send(message);
3769 catch (
const DisconnectedException&)
3771 if (!isHASubscribe_)
3773 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3774 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
3775 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
3780 catch (
const TimedOutException&)
3782 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
3783 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3784 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.
getQueryId()));
3792 Unlock<Mutex> unlock(_lock);
3793 _subscriptionManager->unsubscribe(subId);
3797 _routes.removeRoute(cid);
3798 _routes.removeRoute(subId);
3801 if (subId.
len() > 0)
3804 return std::string(subId.
data(), subId.
len());
3814 syncAckProcessing((
long)(command_.getTimeout()), message);
3816 else _send(message);
3818 catch (
const DisconnectedException&)
3820 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3821 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
3827 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3828 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
3841 bool isHASubscribe_ =
true)
3843 Lock<Mutex> lock(_lock);
3844 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
3848 void setAutoAck(
bool isAutoAckEnabled_)
3850 _isAutoAckEnabled = isAutoAckEnabled_;
3852 bool getAutoAck(
void)
const 3854 return _isAutoAckEnabled;
3856 void setAckBatchSize(
const unsigned batchSize_)
3858 _ackBatchSize = batchSize_;
3859 if (!_queueAckTimeout)
3861 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
3865 unsigned getAckBatchSize(
void)
const 3867 return _ackBatchSize;
3869 int getAckTimeout(
void)
const 3871 return _queueAckTimeout;
3873 void setAckTimeout(
const int ackTimeout_)
3876 _queueAckTimeout = ackTimeout_;
3878 size_t _ack(QueueBookmarks& queueBookmarks_)
3880 if(queueBookmarks_._bookmarkCount)
3882 if (!publishStoreMessage)
3884 publishStoreMessage =
new Message();
3885 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3887 publishStoreMessage->reset();
3888 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3889 .setTopic(queueBookmarks_._topic)
3890 .setBookmark(queueBookmarks_._data)
3891 .setCommandId(
"AMPS-queue-ack");
3892 amps_uint64_t haSequenceNumber = 0;
3895 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3896 publishStoreMessage->setAckType(
"persisted")
3897 .setSequence(haSequenceNumber);
3898 queueBookmarks_._data.erase();
3899 queueBookmarks_._bookmarkCount = 0;
3901 _send(*publishStoreMessage, haSequenceNumber);
3904 queueBookmarks_._data.erase();
3905 queueBookmarks_._bookmarkCount = 0;
3911 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
3913 if (_isAutoAckEnabled)
return;
3914 _ack(topic_, bookmark_, options_);
3916 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
3918 if (bookmark_.
len() == 0)
return;
3919 Lock<Mutex> lock(_lock);
3920 if(_ackBatchSize < 2 || options_ != NULL)
3922 if (!publishStoreMessage)
3924 publishStoreMessage =
new Message();
3925 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3927 publishStoreMessage->reset();
3928 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3929 .setCommandId(
"AMPS-queue-ack")
3930 .setTopic(topic_).setBookmark(bookmark_);
3931 if (options_) publishStoreMessage->setOptions(options_);
3932 amps_uint64_t haSequenceNumber = 0;
3935 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3936 publishStoreMessage->setAckType(
"persisted")
3937 .setSequence(haSequenceNumber);
3939 _send(*publishStoreMessage, haSequenceNumber);
3943 topic_hash hash = CRC<0>::crcNoSSE(topic_.
data(),topic_.
len());
3944 TopicHashMap::iterator it = _topicHashMap.find(hash);
3945 if(it == _topicHashMap.end())
3948 it = _topicHashMap.insert(TopicHashMap::value_type(hash,QueueBookmarks(topic_))).first;
3950 QueueBookmarks &queueBookmarks = it->second;
3951 if(queueBookmarks._data.length())
3953 queueBookmarks._data.append(
",");
3957 queueBookmarks._oldestTime = amps_now();
3959 queueBookmarks._data.append(bookmark_);
3960 if(++queueBookmarks._bookmarkCount >= _ackBatchSize) _ack(queueBookmarks);
3962 void flushAcks(
void)
3964 size_t sendCount = 0;
3971 Lock<Mutex> lock(_lock);
3972 typedef TopicHashMap::iterator iterator;
3973 for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3975 QueueBookmarks& queueBookmarks = it->second;
3976 sendCount += _ack(queueBookmarks);
3979 if(sendCount && _connected) publishFlush(0, Message::AckType::Processed);
3982 void checkQueueAcks(
void)
3984 if(!_topicHashMap.size())
return;
3985 Lock<Mutex> lock(_lock);
3988 amps_uint64_t threshold = amps_now() - (amps_uint64_t)_queueAckTimeout;
3989 typedef TopicHashMap::iterator iterator;
3990 for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3992 QueueBookmarks& queueBookmarks = it->second;
3993 if(queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold) _ack(queueBookmarks);
3996 catch(std::exception& ex)
3998 AMPS_UNHANDLED_EXCEPTION(ex);
4002 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
4004 Lock<Mutex> lock(_deferredExecutionLock);
4005 _deferredExecutionList.push_back(
4006 DeferredExecutionRequest(func_,userData_));
4009 inline void processDeferredExecutions(
void)
4011 if(_deferredExecutionList.size())
4013 Lock<Mutex> lock(_deferredExecutionLock);
4014 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4015 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4016 for(; it != end; ++it)
4020 it->_func(it->_userData);
4027 _deferredExecutionList.clear();
4028 _routes.invalidateCache();
4029 _routeCache.invalidateCache();
4033 bool getRetryOnDisconnect(
void)
const 4035 return _isRetryOnDisconnect;
4038 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4040 _isRetryOnDisconnect = isRetryOnDisconnect_;
4043 void setDefaultMaxDepth(
unsigned maxDepth_)
4045 _defaultMaxDepth = maxDepth_;
4048 unsigned getDefaultMaxDepth(
void)
const 4050 return _defaultMaxDepth;
4142 RefHandle<MessageStreamImpl> _body;
4152 inline void advance(
void);
4164 bool operator==(
const iterator& rhs)
4166 return _pStream == rhs._pStream;
4168 bool operator!=(
const iterator& rhs)
4170 return _pStream != rhs._pStream;
4172 void operator++(
void) { advance(); }
4173 Message operator*(
void) {
return _current; }
4174 Message* operator->(
void) {
return &_current; }
4183 if(!_body.isValid())
4185 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
4214 unsigned getMaxDepth(
void)
const;
4217 unsigned getDepth(
void)
const;
4221 inline void setSOWOnly(
const std::string& commandId_,
4222 const std::string& queryId_ =
"");
4223 inline void setSubscription(
const std::string& subId_,
4224 const std::string& commandId_ =
"",
4225 const std::string& queryId_ =
"");
4226 inline void setStatsOnly(
const std::string& commandId_,
4227 const std::string& queryId_ =
"");
4228 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
4234 friend class Client;
4260 BorrowRefHandle<ClientImpl> _body;
4262 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
4263 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
4264 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
4275 : _body(new ClientImpl(clientName), true)
4278 Client(ClientImpl* existingClient)
4279 : _body(existingClient,
true)
4282 Client(ClientImpl* existingClient,
bool isRef)
4283 : _body(existingClient, isRef)
4286 Client(
const Client& rhs) : _body(rhs._body) {;}
4287 virtual ~Client(
void) {;}
4289 Client& operator=(
const Client& rhs)
4297 return _body.isValid();
4314 _body.get().setName(name);
4321 return _body.get().getName();
4329 return _body.get().getNameHash();
4340 _body.get().setLogonCorrelationData(logonCorrelationData_);
4347 return _body.get().getLogonCorrelationData();
4360 return _body.get().getServerVersion();
4371 return _body.get().getServerVersionInfo();
4385 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4400 return AMPS::convertVersionToNumber(data_, len_);
4407 return _body.get().getURI();
4431 _body.get().connect(uri);
4438 _body.get().disconnect();
4456 _body.get().send(message);
4469 unsigned requestedAcks_,
bool isSubscribe_)
4471 _body.get().addMessageHandler(commandId_, messageHandler_,
4472 requestedAcks_, isSubscribe_);
4480 return _body.get().removeMessageHandler(commandId_);
4508 return _body.get().send(messageHandler, message, timeout);
4522 _body.get().setDisconnectHandler(disconnectHandler);
4530 return _body.get().getDisconnectHandler();
4539 return _body.get().getConnectionInfo();
4552 _body.get().setBookmarkStore(bookmarkStore_);
4560 return _body.
get().getBookmarkStore();
4568 return _body.get().getSubscriptionManager();
4580 _body.get().setSubscriptionManager(subscriptionManager_);
4604 _body.get().setPublishStore(publishStore_);
4612 return _body.
get().getPublishStore();
4620 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
4621 duplicateMessageHandler_);
4635 return _body.get().getDuplicateMessageHandler();
4649 _body.get().setFailedWriteHandler(handler_);
4657 return _body.get().getFailedWriteHandler();
4678 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
4680 return _body.get().publish(topic_.c_str(), topic_.length(),
4681 data_.c_str(), data_.length());
4703 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
4704 const char* data_,
size_t dataLength_)
4706 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
4727 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
4728 unsigned long expiration_)
4730 return _body.get().publish(topic_.c_str(), topic_.length(),
4731 data_.c_str(), data_.length(), expiration_);
4754 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
4755 const char* data_,
size_t dataLength_,
4756 unsigned long expiration_)
4758 return _body.get().publish(topic_, topicLength_,
4759 data_, dataLength_, expiration_);
4800 void publishFlush(
long timeout_ = 0,
unsigned ackType_ = Message::AckType::Processed)
4802 _body.get().publishFlush(timeout_, ackType_);
4821 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
4823 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4824 data_.c_str(), data_.length());
4845 const char* data_,
size_t dataLength_)
4847 return _body.get().deltaPublish(topic_, topicLength_,
4848 data_, dataLength_);
4867 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
4868 unsigned long expiration_)
4870 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4871 data_.c_str(), data_.length(),
4894 const char* data_,
size_t dataLength_,
4895 unsigned long expiration_)
4897 return _body.get().deltaPublish(topic_, topicLength_,
4898 data_, dataLength_, expiration_);
4917 const char* options_ = NULL)
4919 return _body.get().logon(timeout_, authenticator_, options_);
4933 std::string
logon(
const char* options_,
int timeout_ = 0)
4951 std::string
logon(
const std::string& options_,
int timeout_ = 0)
4977 const std::string& topic_,
4979 const std::string& filter_=
"",
4980 const std::string& options_ =
"",
4981 const std::string& subId_ =
"")
4983 return _body.get().subscribe(messageHandler_, topic_, timeout_,
4984 filter_,
"", options_, subId_);
5003 long timeout_=0,
const std::string& filter_=
"",
5004 const std::string& options_ =
"",
5005 const std::string& subId_ =
"")
5008 if (_body.get().getDefaultMaxDepth())
5009 result.
maxDepth(_body.get().getDefaultMaxDepth());
5010 result.setSubscription(_body.get().subscribe(
5012 topic_, timeout_, filter_,
"",
5013 options_, subId_,
false));
5033 long timeout_ = 0,
const std::string& filter_ =
"",
5034 const std::string& options_ =
"",
5035 const std::string& subId_ =
"")
5038 if (_body.get().getDefaultMaxDepth())
5039 result.
maxDepth(_body.get().getDefaultMaxDepth());
5040 result.setSubscription(_body.get().subscribe(
5042 topic_, timeout_, filter_,
"",
5043 options_, subId_,
false));
5060 const std::string& topic_,
5062 const std::string& filter_=
"",
5063 const std::string& options_ =
"",
5064 const std::string& subId_ =
"")
5066 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5067 filter_,
"", options_, subId_);
5078 long timeout_,
const std::string& filter_=
"",
5079 const std::string& options_ =
"",
5080 const std::string& subId_ =
"")
5083 if (_body.get().getDefaultMaxDepth())
5084 result.
maxDepth(_body.get().getDefaultMaxDepth());
5085 result.setSubscription(_body.get().deltaSubscribe(
5087 topic_, timeout_, filter_,
"",
5088 options_, subId_,
false));
5094 long timeout_,
const std::string& filter_ =
"",
5095 const std::string& options_ =
"",
5096 const std::string& subId_ =
"")
5099 if (_body.get().getDefaultMaxDepth())
5100 result.
maxDepth(_body.get().getDefaultMaxDepth());
5101 result.setSubscription(_body.get().deltaSubscribe(
5103 topic_, timeout_, filter_,
"",
5104 options_, subId_,
false));
5134 const std::string& topic_,
5136 const std::string& bookmark_,
5137 const std::string& filter_=
"",
5138 const std::string& options_ =
"",
5139 const std::string& subId_ =
"")
5141 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5142 filter_, bookmark_, options_, subId_);
5163 const std::string& bookmark_,
5164 const std::string& filter_=
"",
5165 const std::string& options_ =
"",
5166 const std::string& subId_ =
"")
5169 if (_body.get().getDefaultMaxDepth())
5170 result.
maxDepth(_body.get().getDefaultMaxDepth());
5171 result.setSubscription(_body.get().subscribe(
5173 topic_, timeout_, filter_,
5174 bookmark_, options_,
5182 const std::string& bookmark_,
5183 const std::string& filter_ =
"",
5184 const std::string& options_ =
"",
5185 const std::string& subId_ =
"")
5188 if (_body.get().getDefaultMaxDepth())
5189 result.
maxDepth(_body.get().getDefaultMaxDepth());
5190 result.setSubscription(_body.get().subscribe(
5192 topic_, timeout_, filter_,
5193 bookmark_, options_,
5208 return _body.get().unsubscribe(commandId);
5220 return _body.get().unsubscribe();
5254 const std::string& topic_,
5255 const std::string& filter_ =
"",
5256 const std::string& orderBy_ =
"",
5257 const std::string& bookmark_ =
"",
5258 int batchSize_ = DEFAULT_BATCH_SIZE,
5259 int topN_ = DEFAULT_TOP_N,
5260 const std::string& options_ =
"",
5261 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5263 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
5264 bookmark_, batchSize_, topN_, options_,
5292 const std::string& filter_ =
"",
5293 const std::string& orderBy_ =
"",
5294 const std::string& bookmark_ =
"",
5295 int batchSize_ = DEFAULT_BATCH_SIZE,
5296 int topN_ = DEFAULT_TOP_N,
5297 const std::string& options_ =
"",
5298 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5301 if (_body.get().getDefaultMaxDepth())
5302 result.
maxDepth(_body.get().getDefaultMaxDepth());
5303 result.setSOWOnly(sow(result.operator
MessageHandler(),topic_,filter_,orderBy_,bookmark_,batchSize_,topN_,options_,timeout_));
5309 const std::string& filter_ =
"",
5310 const std::string& orderBy_ =
"",
5311 const std::string& bookmark_ =
"",
5312 int batchSize_ = DEFAULT_BATCH_SIZE,
5313 int topN_ = DEFAULT_TOP_N,
5314 const std::string& options_ =
"",
5315 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5318 if (_body.get().getDefaultMaxDepth())
5319 result.
maxDepth(_body.get().getDefaultMaxDepth());
5320 result.setSOWOnly(sow(result.operator
MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5346 const std::string& topic_,
5348 const std::string& filter_ =
"",
5349 int batchSize_ = DEFAULT_BATCH_SIZE,
5350 int topN_ = DEFAULT_TOP_N)
5352 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
5378 const std::string& topic_,
5380 const std::string& filter_ =
"",
5381 int batchSize_ = DEFAULT_BATCH_SIZE,
5382 bool oofEnabled_ =
false,
5383 int topN_ = DEFAULT_TOP_N)
5385 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5386 filter_, batchSize_, oofEnabled_,
5411 const std::string& filter_ =
"",
5412 int batchSize_ = DEFAULT_BATCH_SIZE,
5413 bool oofEnabled_ =
false,
5414 int topN_ = DEFAULT_TOP_N)
5417 if (_body.get().getDefaultMaxDepth())
5418 result.
maxDepth(_body.get().getDefaultMaxDepth());
5419 result.setSubscription(_body.get().sowAndSubscribe(
5421 topic_, timeout_, filter_,
5422 batchSize_, oofEnabled_,
5447 const std::string& filter_ =
"",
5448 int batchSize_ = DEFAULT_BATCH_SIZE,
5449 bool oofEnabled_ =
false,
5450 int topN_ = DEFAULT_TOP_N)
5453 if (_body.get().getDefaultMaxDepth())
5454 result.
maxDepth(_body.get().getDefaultMaxDepth());
5455 result.setSubscription(_body.get().sowAndSubscribe(
5457 topic_, timeout_, filter_,
5458 batchSize_, oofEnabled_,
5492 const std::string& topic_,
5493 const std::string& filter_ =
"",
5494 const std::string& orderBy_ =
"",
5495 const std::string& bookmark_ =
"",
5496 int batchSize_ = DEFAULT_BATCH_SIZE,
5497 int topN_ = DEFAULT_TOP_N,
5498 const std::string& options_ =
"",
5499 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5501 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
5502 orderBy_, bookmark_, batchSize_,
5503 topN_, options_, timeout_);
5531 const std::string& filter_ =
"",
5532 const std::string& orderBy_ =
"",
5533 const std::string& bookmark_ =
"",
5534 int batchSize_ = DEFAULT_BATCH_SIZE,
5535 int topN_ = DEFAULT_TOP_N,
5536 const std::string& options_ =
"",
5537 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5540 if (_body.get().getDefaultMaxDepth())
5541 result.
maxDepth(_body.get().getDefaultMaxDepth());
5542 result.setSubscription(_body.get().sowAndSubscribe(
5544 topic_, filter_, orderBy_,
5545 bookmark_, batchSize_, topN_,
5546 options_, timeout_,
false));
5552 const std::string& filter_ =
"",
5553 const std::string& orderBy_ =
"",
5554 const std::string& bookmark_ =
"",
5555 int batchSize_ = DEFAULT_BATCH_SIZE,
5556 int topN_ = DEFAULT_TOP_N,
5557 const std::string& options_ =
"",
5558 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5561 if (_body.get().getDefaultMaxDepth())
5562 result.
maxDepth(_body.get().getDefaultMaxDepth());
5563 result.setSubscription(_body.get().sowAndSubscribe(
5565 topic_, filter_, orderBy_,
5566 bookmark_, batchSize_, topN_,
5567 options_, timeout_,
false));
5596 const std::string& topic_,
5597 const std::string& filter_ =
"",
5598 const std::string& orderBy_ =
"",
5599 int batchSize_ = DEFAULT_BATCH_SIZE,
5600 int topN_ = DEFAULT_TOP_N,
5601 const std::string& options_ =
"",
5602 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5604 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5605 filter_, orderBy_, batchSize_,
5606 topN_, options_, timeout_);
5629 const std::string& filter_ =
"",
5630 const std::string& orderBy_ =
"",
5631 int batchSize_ = DEFAULT_BATCH_SIZE,
5632 int topN_ = DEFAULT_TOP_N,
5633 const std::string& options_ =
"",
5634 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5637 if (_body.get().getDefaultMaxDepth())
5638 result.
maxDepth(_body.get().getDefaultMaxDepth());
5639 result.setSubscription(sowAndDeltaSubscribe(result.operator
MessageHandler(), topic_, filter_, orderBy_, batchSize_, topN_, options_, timeout_));
5640 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5642 topic_, filter_, orderBy_,
5643 batchSize_, topN_, options_,
5650 const std::string& filter_ =
"",
5651 const std::string& orderBy_ =
"",
5652 int batchSize_ = DEFAULT_BATCH_SIZE,
5653 int topN_ = DEFAULT_TOP_N,
5654 const std::string& options_ =
"",
5655 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5658 if (_body.get().getDefaultMaxDepth())
5659 result.
maxDepth(_body.get().getDefaultMaxDepth());
5660 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5662 topic_, filter_, orderBy_,
5663 batchSize_, topN_, options_,
5693 const std::string& topic_,
5695 const std::string& filter_ =
"",
5696 int batchSize_ = DEFAULT_BATCH_SIZE,
5697 bool oofEnabled_ =
false,
5698 bool sendEmpties_ =
false,
5699 int topN_ = DEFAULT_TOP_N)
5701 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5702 timeout_, filter_, batchSize_,
5703 oofEnabled_, sendEmpties_,
5730 const std::string& filter_ =
"",
5731 int batchSize_ = DEFAULT_BATCH_SIZE,
5732 bool oofEnabled_ =
false,
5733 bool sendEmpties_ =
false,
5734 int topN_ = DEFAULT_TOP_N)
5737 if (_body.get().getDefaultMaxDepth())
5738 result.
maxDepth(_body.get().getDefaultMaxDepth());
5739 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5741 topic_, timeout_, filter_,
5742 batchSize_, oofEnabled_,
5743 sendEmpties_, topN_,
false));
5769 const std::string& filter_ =
"",
5770 int batchSize_ = DEFAULT_BATCH_SIZE,
5771 bool oofEnabled_ =
false,
5772 bool sendEmpties_ =
false,
5773 int topN_ = DEFAULT_TOP_N)
5776 if (_body.get().getDefaultMaxDepth())
5777 result.
maxDepth(_body.get().getDefaultMaxDepth());
5778 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5780 topic_, timeout_, filter_,
5781 batchSize_, oofEnabled_,
5782 sendEmpties_, topN_,
false));
5805 const std::string& topic,
5806 const std::string& filter,
5809 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
5837 stream.setStatsOnly(cid);
5838 _body.get().sowDelete(stream.operator
MessageHandler(),topic,filter,timeout,cid);
5839 return *(stream.
begin());
5841 catch (
const DisconnectedException&)
5843 removeMessageHandler(cid);
5854 _body.get().startTimer();
5865 return _body.get().stopTimer(messageHandler);
5890 const std::string& topic_,
5891 const std::string& keys_,
5894 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
5926 stream.setStatsOnly(cid);
5927 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(),topic_,keys_,timeout_,cid);
5928 return *(stream.
begin());
5930 catch (
const DisconnectedException&)
5932 removeMessageHandler(cid);
5952 const std::string& topic_,
const std::string& data_,
5955 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
5982 stream.setStatsOnly(cid);
5983 _body.get().sowDeleteByData(stream.operator
MessageHandler(),topic_,data_,timeout_,cid);
5984 return *(stream.
begin());
5986 catch (
const DisconnectedException&)
5988 removeMessageHandler(cid);
5998 return _body.get().getHandle();
6011 _body.get().setExceptionListener(pListener_);
6024 _body.get().setExceptionListener(listener_);
6031 return _body.get().getExceptionListener();
6057 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6081 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6087 setLastChanceMessageHandler(messageHandler);
6094 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6120 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6145 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6207 _body.get().addConnectionStateListener(listener);
6215 _body.get().removeConnectionStateListener(listener);
6222 _body.get().clearConnectionStateListeners();
6252 return _body.get().executeAsync(command_, handler_);
6290 if (command_.isSubscribe())
6292 Message& message = command_.getMessage();
6295 if(useExistingHandler)
6298 if (_body.get()._routes.getRoute(subId, existingHandler))
6301 _body.get().executeAsync(command_, existingHandler,
false);
6306 id = _body.get().executeAsync(command_, handler_,
false);
6308 catch (
const DisconnectedException&)
6310 removeMessageHandler(command_.getMessage().
getCommandId());
6311 if (command_.isSubscribe())
6315 if (command_.isSow())
6317 removeMessageHandler(command_.getMessage().
getQueryID());
6348 _body.get().ack(topic_,bookmark_,options_);
6370 void ack(
const std::string& topic_,
const std::string& bookmark_,
6371 const char* options_ = NULL)
6373 _body.get().ack(
Field(topic_.data(),topic_.length()),
Field(bookmark_.data(),bookmark_.length()),options_);
6381 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
6383 _body.get()._ack(topic_,bookmark_,options_);
6396 _body.get().flushAcks();
6405 return _body.get().getAutoAck();
6415 _body.get().setAutoAck(isAutoAckEnabled_);
6423 return _body.get().getAckBatchSize();
6433 _body.get().setAckBatchSize(ackBatchSize_);
6444 return _body.get().getAckTimeout();
6454 _body.get().setAckTimeout(ackTimeout_);
6468 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
6477 return _body.get().getRetryOnDisconnect();
6486 _body.get().setDefaultMaxDepth(maxDepth_);
6495 return _body.get().getDefaultMaxDepth();
6507 return _body.get().setTransportFilterFunction(filter_, userData_);
6521 return _body.get().setThreadCreatedCallback(callback_, userData_);
6529 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
6531 _body.get().deferredExecution(func_,userData_);
6541 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
6547 unsigned deliveries = 0;
6559 const char* data = NULL;
6561 const char* status = NULL;
6562 size_t statusLen = 0;
6564 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
6567 &status, &statusLen);
6568 if (len == NotEntitled || len == Duplicate ||
6569 (statusLen == Failure && status[0] ==
'f'))
6571 if (_failedWriteHandler)
6573 if (_publishStore.isValid())
6575 amps_uint64_t sequence =
6578 FailedWriteStoreReplayer replayer(
this, data, len);
6579 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
6580 replayer, sequence));
6586 AMPS_CALL_EXCEPTION_WRAPPER(
6587 _failedWriteHandler->failedWrite(emptyMessage,
6593 if (_publishStore.isValid())
6602 _publishStore.discardUpTo(seq);
6606 if (!deliveries && _bookmarkStore.isValid())
6613 const char* bookmarkData = NULL;
6614 size_t bookmarkLen = 0;
6616 &bookmarkData, &bookmarkLen);
6618 if (bookmarkLen > 0 && _routes.hasRoute(subId))
6621 _bookmarkStore.persisted(subId,
Message::Field(bookmarkData, bookmarkLen));
6626 catch (std::exception& ex)
6628 AMPS_UNHANDLED_EXCEPTION(ex);
6634 ClientImpl::processedAck(
Message &message)
6636 unsigned deliveries = 0;
6638 const char* data = NULL;
6642 Lock<Mutex> l(_lock);
6645 Lock<Mutex> guard(_ackMapLock);
6646 AckMap::iterator i = _ackMap.find(std::string(data, len));
6647 if (i != _ackMap.end())
6658 ack.setStatus(data, len);
6661 ack.setReason(data, len);
6664 ack.setUsername(data, len);
6667 ack.setPassword(data, len);
6670 ack.setSequenceNo(data, len);
6673 ack.setServerVersion(data, len);
6675 ack.setOptions(data,len);
6677 ack.setBookmark(data,len);
6678 ack.setResponded(
true);
6685 ClientImpl::checkAndSendHeartbeat(
bool force)
6687 if (force || _heartbeatTimer.check())
6689 _heartbeatTimer.start();
6692 sendWithoutRetry(_beatMessage);
6694 catch (
const AMPSException&)
6701 inline ConnectionInfo ClientImpl::getConnectionInfo()
const 6703 ConnectionInfo info;
6704 std::ostringstream writer;
6706 info[
"client.uri"] = _lastUri;
6707 info[
"client.name"] = _name;
6708 info[
"client.username"] = _username;
6709 if(_publishStore.isValid())
6711 writer << _publishStore.unpersistedCount();
6712 info[
"publishStore.unpersistedCount"] = writer.str();
6721 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
6723 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
6724 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
6725 ClientImpl* me = (ClientImpl*) userData_;
6726 AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->processDeferredExecutions());
6729 if(me->_queueAckTimeout) me->checkQueueAcks();
6733 me->_readMessage.replace(messageHandle_);
6734 Message& message = me->_readMessage;
6736 if (commandType & SOWMask)
6738 #if 0 // Not currently implemented, to avoid an extra branch in delivery 6742 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6743 me->_globalCommandTypeHandlers[1+(commandType/8192)].invoke(message));
6745 AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->_routes.deliverData(message,
6748 else if (commandType & PublishMask)
6750 #if 0 // Not currently implemented, to avoid an extra branch in delivery 6751 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6752 me->_globalCommandTypeHandlers[(commandType==Message::Command::Publish ?
6753 GlobalCommandTypeHandlers::Publish :
6754 GlobalCommandTypeHandlers::OOF)].invoke(message));
6756 const char* subIds = NULL;
6757 size_t subIdsLen = 0;
6760 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
6761 for(
size_t i=0; i<subIdCount; ++i)
6763 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
6765 if (handler.isValid())
6768 subIds + lookupResult.idOffset, lookupResult.idLength);
6771 bool isAutoAck = me->_isAutoAckEnabled;
6773 if (!isMessageQueue && !bookmark.
empty() &&
6774 me->_bookmarkStore.isValid())
6776 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
6779 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
6781 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
6786 me->_bookmarkStore.log(me->_readMessage);
6787 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6788 handler.invoke(message));
6793 if(isMessageQueue && isAutoAck)
6797 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
6798 if (!message.getIgnoreAutoAck())
6800 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6804 catch(std::exception& ex)
6806 if (!message.getIgnoreAutoAck())
6808 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6811 AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6816 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6817 handler.invoke(message));
6821 else me->lastChance(message);
6824 else if (commandType == Message::Command::Ack)
6826 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6827 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
6829 unsigned deliveries = 0U;
6832 case Message::AckType::Persisted:
6833 deliveries += me->persistedAck(message);
6835 case Message::AckType::Processed:
6836 deliveries += me->processedAck(message);
6839 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
6840 if (deliveries == 0)
6842 me->lastChance(message);
6845 else if (commandType == Message::Command::Heartbeat)
6847 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6848 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
6849 if(me->_heartbeatTimer.getTimeout() != 0.0)
6851 me->checkAndSendHeartbeat(
true);
6855 me->lastChance(message);
6861 unsigned deliveries = 0U;
6864 while(me->_connected)
6868 deliveries = me->_routes.deliverData(message, message.
getCommandId());
6872 catch(MessageStreamFullException&)
6874 catch(MessageStreamFullException& ex_)
6877 me->checkAndSendHeartbeat(
false);
6881 catch (std::exception& ex_)
6885 me->_exceptionListener->exceptionThrown(ex_);
6892 if (deliveries == 0)
6893 me->lastChance(message);
6895 me->checkAndSendHeartbeat();
6900 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
6902 ClientImpl* me = (ClientImpl*) userData;
6905 me->clearAcks(failedConnectionVersion);
6909 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
6911 ClientImpl* me = (ClientImpl*) userData;
6912 Lock<Mutex> l(me->_lock);
6913 Client wrapper(me,
false);
6915 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
6918 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
6921 AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
6922 me->_connected =
false;
6926 Unlock<Mutex> unlock(me->_lock);
6927 me->_disconnectHandler.invoke(wrapper);
6930 catch(
const std::exception& ex)
6932 AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6935 if (!me->_connected)
6937 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
6938 AMPS_UNHANDLED_EXCEPTION_2(me,DisconnectedException(
"Reconnect failed."));
6944 if (me->_subscriptionManager)
6949 Unlock<Mutex> unlock(me->_lock);
6950 me->_subscriptionManager->resubscribe(wrapper);
6952 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
6956 catch(
const AMPSException& subEx)
6958 AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6960 catch(
const std::exception& subEx)
6962 AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6985 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
6986 : _data(data_), _len(len_),_pos(pos_), _fieldSep(fieldSep_)
6988 while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6991 typedef void* difference_type;
6992 typedef std::forward_iterator_tag iterator_category;
6993 typedef std::pair<Message::Field, Message::Field> value_type;
6994 typedef value_type* pointer;
6995 typedef value_type& reference;
6996 bool operator==(
const iterator& rhs)
const 6998 return _pos == rhs._pos;
7000 bool operator!=(
const iterator& rhs)
const 7002 return _pos != rhs._pos;
7004 iterator& operator++()
7007 while(_pos != _len && _data[_pos] != _fieldSep) ++_pos;
7009 while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
7013 value_type operator*()
const 7016 size_t i = _pos, keyLength =0, valueStart = 0, valueLength = 0;
7017 for(; i < _len && _data[i] !=
'='; ++i) ++keyLength;
7019 result.first.assign(_data+_pos, keyLength);
7021 if (i < _len && _data[i] ==
'=')
7025 for(; i < _len && _data[i] != _fieldSep; ++i)
7030 result.second.assign(_data+valueStart, valueLength);
7036 class reverse_iterator
7043 typedef std::pair<Message::Field, Message::Field> value_type;
7044 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
7045 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7050 while(_pos >=_data && *_pos == _fieldSep) --_pos;
7051 while(_pos > _data && *_pos != _fieldSep) --_pos;
7055 if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
7056 if (_pos < _data) _pos = 0;
7059 bool operator==(
const reverse_iterator& rhs)
const 7061 return _pos == rhs._pos;
7063 bool operator!=(
const reverse_iterator& rhs)
const 7065 return _pos != rhs._pos;
7067 reverse_iterator& operator++()
7078 while(_pos >=_data && *_pos == _fieldSep) --_pos;
7080 while(_pos >_data && *_pos != _fieldSep) --_pos;
7081 if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
7082 if (_pos < _data) _pos = 0;
7086 value_type operator*()
const 7089 size_t keyLength = 0, valueStart = 0, valueLength = 0;
7090 size_t i = (size_t)(_pos - _data);
7091 for(; i < _len && _data[i] !=
'='; ++i) ++keyLength;
7092 result.first.assign(_pos, keyLength);
7093 if (i<_len && _data[i] ==
'=')
7097 for(; i<_len && _data[i] != _fieldSep; ++i)
7102 result.second.assign(_data+valueStart, valueLength);
7107 : _data(data.
data()), _len(data.
len()),
7108 _fieldSep(fieldSeparator)
7112 FIX(
const char* data,
size_t len,
char fieldSeparator=1)
7113 : _data(data), _len(len), _fieldSep(fieldSeparator)
7117 iterator begin()
const 7119 return iterator(_data, _len, 0, _fieldSep);
7121 iterator end()
const 7123 return iterator(_data, _len, _len, _fieldSep);
7127 reverse_iterator rbegin()
const 7129 return reverse_iterator(_data, _len, _data+(_len-1), _fieldSep);
7132 reverse_iterator rend()
const 7134 return reverse_iterator(_data, _len, 0, _fieldSep);
7155 std::stringstream _data;
7172 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
7175 _data.write(value+offset, (std::streamsize)length);
7183 void append(
const T& tag,
const std::string& value)
7185 _data << tag <<
'=' << value << _fs;
7194 operator std::string()
const 7202 _data.str(std::string());
7239 typedef std::map<Message::Field, Message::Field>
map_type;
7250 for(FIX::iterator a = fix.begin(); a!= fix.end(); ++a)
7262 std::deque<Message> _q;
7263 std::string _commandId;
7265 std::string _queryId;
7269 unsigned _requestedAcks;
7272 volatile enum { Unset=0x0, Running=0x10, Subscribe=0x11, SOWOnly=0x12, AcksOnly=0x13, Conflate=0x14, Closed=0x1, Disconnected=0x2 } _state;
7273 typedef std::map<std::string, Message*> SOWKeyMap;
7274 SOWKeyMap _sowKeyMap;
7276 MessageStreamImpl(
const Client& client_)
7279 _maxDepth((
unsigned)~0),
7283 if (_client.isValid())
7289 MessageStreamImpl(ClientImpl* client_)
7292 _maxDepth((
unsigned)~0),
7296 if (_client.isValid())
7302 ~MessageStreamImpl()
7306 virtual void destroy()
7312 catch(std::exception &e)
7316 if (_client.isValid())
7322 if (_client.isValid())
7326 _client = Client((ClientImpl*)NULL);
7327 c.deferredExecution(MessageStreamImpl::destroyer,
this);
7335 static void destroyer(
void* vpMessageStreamImpl_)
7337 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
7340 void setSubscription(
const std::string& subId_,
7341 const std::string& commandId_ =
"",
7342 const std::string& queryId_ =
"")
7344 Lock<Mutex> lock(_lock);
7346 if (!commandId_.empty() && commandId_ != subId_)
7347 _commandId = commandId_;
7348 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
7349 _queryId = queryId_;
7351 if (Disconnected == _state)
return;
7352 assert(Unset==_state);
7356 void setSOWOnly(
const std::string& commandId_,
7357 const std::string& queryId_ =
"")
7359 Lock<Mutex> lock(_lock);
7360 _commandId = commandId_;
7361 if (!queryId_.empty() && queryId_ != commandId_)
7362 _queryId = queryId_;
7364 if (Disconnected == _state)
return;
7365 assert(Unset==_state);
7369 void setStatsOnly(
const std::string& commandId_,
7370 const std::string& queryId_ =
"")
7372 Lock<Mutex> lock(_lock);
7373 _commandId = commandId_;
7374 if (!queryId_.empty() && queryId_ != commandId_)
7375 _queryId = queryId_;
7377 if (Disconnected == _state)
return;
7378 assert(Unset==_state);
7380 _requestedAcks = Message::AckType::Stats;
7383 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
7385 Lock<Mutex> lock(_lock);
7386 _commandId = commandId_;
7388 if (Disconnected == _state)
return;
7389 assert(Unset==_state);
7391 _requestedAcks = acks_;
7396 Lock<Mutex> lock(_lock);
7397 if(state_ == AMPS::ConnectionStateListener::Disconnected)
7399 _state = Disconnected;
7405 void timeout(
unsigned timeout_)
7407 _timeout = timeout_;
7411 if(_state == Subscribe) _state = Conflate;
7413 void maxDepth(
unsigned maxDepth_)
7415 if(maxDepth_) _maxDepth = maxDepth_;
7416 else _maxDepth = (unsigned)~0;
7418 unsigned getMaxDepth(
void)
const 7422 unsigned getDepth(
void)
const 7424 return (
unsigned)(_q.size());
7429 Lock<Mutex> lock(_lock);
7430 if (!_previousTopic.
empty() && !_previousBookmark.
empty())
7434 if (_client.isValid())
7436 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
7440 catch (AMPSException&)
7442 catch (AMPSException& e)
7445 current_.invalidate();
7446 _previousTopic.
clear();
7447 _previousBookmark.
clear();
7450 _previousTopic.
clear();
7451 _previousBookmark.
clear();
7453 double minWaitTime = (double)((_timeout && _timeout > 1000)
7455 Timer timer(minWaitTime);
7457 while(_q.empty() && _state & Running)
7460 _lock.wait((
long)minWaitTime);
7462 Unlock<Mutex> unlck(_lock);
7463 amps_invoke_waiting_function();
7468 if(timer.checkAndGetRemaining(&minWaitTime))
7476 current_ = _q.front();
7477 if(_q.size() == _maxDepth) _lock.signalAll();
7479 if(_state == Conflate)
7481 std::string sowKey = current_.
getSowKey();
7482 if(sowKey.length()) _sowKeyMap.erase(sowKey);
7484 else if(_state == AcksOnly)
7488 if((_state == AcksOnly && _requestedAcks == 0) ||
7489 (_state == SOWOnly && current_.
getCommand()==
"group_end"))
7493 else if (current_.
getCommandEnum() == Message::Command::Publish &&
7503 if(_state == Disconnected)
7505 throw DisconnectedException(
"Connection closed.");
7507 current_.invalidate();
7508 if(_state == Closed)
7512 return _timeout != 0;
7516 if (_client.isValid())
7518 if (_state == SOWOnly || _state == Subscribe)
7520 if (!_commandId.empty()) _client.
unsubscribe(_commandId);
7522 if (!_queryId.empty()) _client.
unsubscribe(_queryId);
7531 if(_state==SOWOnly || _state==Subscribe || _state==Unset)
7536 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
7538 Lock<Mutex> lock(this_->_lock);
7539 if(this_->_state != Conflate)
7541 AMPS_TESTING_SLOW_MESSAGE_STREAM
7542 if(this_->_q.size() >= this_->_maxDepth)
7547 this_->_lock.signalAll();
7548 throw MessageStreamFullException(
"Stream is currently full.");
7550 this_->_q.push_back(message_.
deepCopy());
7552 this_->_client.isValid() && this_->_client.getAutoAck() &&
7556 message_.setIgnoreAutoAck();
7561 std::string sowKey = message_.
getSowKey();
7564 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
7565 if(it != this_->_sowKeyMap.end())
7567 *(it->second) = message_.
deepCopy();
7571 if(this_->_q.size() >= this_->_maxDepth)
7577 this_->_lock.signalAll();
7578 throw MessageStreamFullException(
"Stream is currently full.");
7580 this_->_q.push_back(message_.
deepCopy());
7581 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
7586 while(this_->_q.size() >= this_->_maxDepth)
7588 this_->_lock.wait(1);
7590 this_->_q.push_back(message_.
deepCopy());
7593 this_->_lock.signalAll();
7596 inline MessageStream::MessageStream(
void)
7599 inline MessageStream::MessageStream(
const Client& client_)
7600 :_body(
new MessageStreamImpl(client_))
7603 inline void MessageStream::iterator::advance(
void)
7605 _pStream = _pStream->_body->next(_current) ? _pStream:NULL;
7609 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
7614 if(handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
7616 result._body = (MessageStreamImpl*)(handler_._userData);
7621 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
7622 const std::string& queryId_)
7624 _body->setSOWOnly(commandId_, queryId_);
7626 inline void MessageStream::setSubscription(
const std::string& subId_,
7627 const std::string& commandId_,
7628 const std::string& queryId_)
7630 _body->setSubscription(subId_, commandId_, queryId_);
7632 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
7633 const std::string& queryId_)
7635 _body->setStatsOnly(commandId_, queryId_);
7637 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
7640 _body->setAcksOnly(commandId_, acks_);
7659 return _body->getMaxDepth();
7663 return _body->getDepth();
7666 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
7668 return *(_pEmptyMessageStream.get());
7676 ClientImpl& body = _body.get();
7677 Message& message = command_.getMessage();
7681 if(useExistingHandler)
7687 if (body._routes.getRoute(subId, existingHandler))
7690 body.executeAsync(command_, existingHandler,
false);
7691 return MessageStream::fromExistingHandler(existingHandler);
7698 if ((command & Message::Command::NoDataCommands)
7699 && (ackTypes == Message::AckType::Persisted
7700 || ackTypes == Message::AckType::None))
7703 if (!body._pEmptyMessageStream)
7705 body._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
7706 body._pEmptyMessageStream.get()->_body->close();
7708 return body.getEmptyMessageStream();
7711 if (body.getDefaultMaxDepth())
7712 stream.
maxDepth(body.getDefaultMaxDepth());
7714 std::string commandID = body.executeAsync(command_, handler,
false);
7715 if (command_.hasStatsAck())
7717 stream.setStatsOnly(commandID, command_.getMessage().
getQueryId());
7719 else if (command_.isSow())
7721 stream.setSOWOnly(commandID, command_.getMessage().
getQueryId());
7723 else if (command_.isSubscribe())
7725 stream.setSubscription(commandID,
7732 if (command == Message::Command::Publish ||
7733 command == Message::Command::DeltaPublish ||
7734 command == Message::Command::SOWDelete)
7736 stream.setAcksOnly(commandID,
7737 ackTypes & (
unsigned)~Message::AckType::Persisted);
7741 stream.setAcksOnly(commandID, ackTypes);
7748 inline void Message::ack(
const char* options_)
const 7750 ClientImpl* pClient = _body.get().clientImpl();
7752 if(pClient && bookmark.
len() &&
7753 !pClient->getAutoAck())
7755 pClient->ack(getTopic(),bookmark,options_);
Command & setCorrelationId(const std::string &v_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:559
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:179
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1284
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:4274
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1261
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5889
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:5863
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:512
std::string getAckType() const
Definition: ampsplusplus.hpp:638
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:4478
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:7153
void startTimer()
Definition: ampsplusplus.hpp:5852
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5409
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:761
Command & addAckType(const std::string &v_)
Definition: ampsplusplus.hpp:610
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:7647
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1230
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4506
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:486
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1068
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:6143
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:404
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:6484
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4383
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:5206
Command & setQueryId(const std::string &v_)
Definition: ampsplusplus.hpp:546
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1223
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:733
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:384
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:6493
void ack(Message &message_, const char *options_=NULL)
Acknoweldge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6358
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5161
Command & setFilter(const std::string &v_)
Definition: ampsplusplus.hpp:540
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:278
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:4602
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4867
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4398
void AMPSDLL amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1142
amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:724
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4618
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:538
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6213
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:713
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:6442
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4369
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5916
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1023
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:951
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:7190
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:6504
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4754
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4181
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1145
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6159
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6166
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4454
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1232
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5133
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:813
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6466
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7657
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4429
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1045
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5595
Success.
Definition: amps.h:189
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1128
amps_handle AMPSDLL amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1152
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:707
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4844
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:7235
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:5253
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:179
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:553
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:4467
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4177
amps_result
Return values from amps_xxx functions.
Definition: amps.h:184
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:4655
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:973
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1035
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:7671
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:601
amps_result AMPSDLL amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:4610
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:6518
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:643
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4358
Command & setExpiration(unsigned v_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:608
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1079
Command & setData(const char *v_, size_t length_)
Sets the data for this command.
Definition: ampsplusplus.hpp:580
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:4915
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:884
const std::string & getNameHash() const
Returns the name hash of this client as generated by the server and returned when the client logged o...
Definition: ampsplusplus.hpp:4327
Command & setAckType(unsigned v_)
Definition: ampsplusplus.hpp:628
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5551
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4257
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6118
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6153
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1138
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:6205
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:501
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6220
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1230
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4703
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1257
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:4566
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4976
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:493
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1148
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1138
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6029
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:724
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1075
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
Command & setTopic(const std::string &v_)
Definition: ampsplusplus.hpp:538
amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5951
amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to set when a new thread is created...
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1225
Command & setBatchSize(unsigned v_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:597
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
Command & setOrderBy(const std::string &v_)
Definition: ampsplusplus.hpp:542
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:905
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:719
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5077
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6173
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:225
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:4933
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Command & setTimeout(unsigned v_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:590
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1139
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1232
void AMPSDLL amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:701
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:4338
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:7163
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:6370
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:714
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:755
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5445
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:838
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7172
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:965
amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4558
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:922
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:6431
void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1224
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1259
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:993
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5827
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1233
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:4550
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:136
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
Command & setCommandId(const std::string &v_)
Definition: ampsplusplus.hpp:536
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6022
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:509
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:4578
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6085
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7183
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:6421
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4821
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5530
Command & setSequence(const std::string &v_)
Definition: ampsplusplus.hpp:564
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4148
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:265
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5767
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:914
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:4800
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6250
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5728
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:4951
Command & setSequence(const amps_uint64_t v_)
Definition: ampsplusplus.hpp:566
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:5218
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:6413
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5093
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:875
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5692
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6009
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:979
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4633
amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
amps_result AMPSDLL amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:4528
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:7227
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:4537
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1172
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6346
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:7239
amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:4520
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:859
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:7246
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:202
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5628
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6079
Command & setSubId(const std::string &v_)
Definition: ampsplusplus.hpp:544
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5032
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4140
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6189
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1138
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6197
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:627
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:943
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7652
void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
Command & setTopN(unsigned v_)
Definition: ampsplusplus.hpp:592
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1260
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:4436
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5291
Command & setAckType(const std::string &v_)
Definition: ampsplusplus.hpp:618
Command & setData(const std::string &v_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:576
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:7661
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
The operation has not succeeded, but ought to be retried.
Definition: amps.h:213
const std::string & getLogonCorrelationData() const
Returns the currently set logoon correlation data for the client.
Definition: ampsplusplus.hpp:4345
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6055
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4405
Command & setOptions(const std::string &v_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:562
void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:893
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6181
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1034
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1003
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:434
Command & setBookmark(const std::string &v_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:552
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:5996
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:7200
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:4647
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5649
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:6452
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:534
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1224
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:6403
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4678
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:4312
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:670
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:935
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1021
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:817
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5308
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:4727
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6092
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:301
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4893
The client and server are disconnected.
Definition: amps.h:217
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5972
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5059
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:7642
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5345
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:4319
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:407
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5491
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1033
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5002
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5180
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:521
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6284
amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5804
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4192
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6475
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:6394
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5377