25 #ifndef _AMPSPLUSPLUS_H_ 26 #define _AMPSPLUSPLUS_H_ 46 #include <sys/atomic.h> 48 #include "BookmarkStore.hpp" 49 #include "MessageRouter.hpp" 51 #include "ampscrc.hpp" 53 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM 54 #define AMPS_TESTING_SLOW_MESSAGE_STREAM 82 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10 83 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960 84 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0 85 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000 86 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200 87 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000 88 #define AMPS_DEFAULT_TOP_N -1 89 #define AMPS_DEFAULT_BATCH_SIZE 10 90 #define AMPS_NUMBER_BUFFER_LEN 20 91 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000 93 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64) 98 static __declspec ( thread )
AMPS::Message* publishStoreMessage = 0;
106 typedef std::map<std::string, std::string> ConnectionInfo;
108 class PerThreadMessageTracker {
109 std::vector<AMPS::Message*> _messages;
111 PerThreadMessageTracker() {}
112 ~PerThreadMessageTracker()
114 for (
size_t i=0; i<_messages.size(); ++i)
121 _messages.push_back(message);
125 static AMPS::Mutex _lock;
126 AMPS::Lock<Mutex> l(_lock);
127 _addMessageToCleanupList(message);
131 static PerThreadMessageTracker tracker;
132 tracker.addMessage(message);
137 inline std::string asString(Type x_)
139 std::ostringstream os;
145 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
147 size_t pos = AMPS_NUMBER_BUFFER_LEN;
148 for(
int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
152 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
161 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
163 size_t pos = AMPS_NUMBER_BUFFER_LEN;
164 for(
int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
168 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
182 static const char* duplicate() {
return "duplicate";}
183 static const char* badFilter() {
return "bad filter";}
184 static const char* badRegexTopic() {
return "bad regex topic";}
185 static const char* subscriptionAlreadyExists() {
return "subscription already exists";}
186 static const char* nameInUse() {
return "name in use";}
187 static const char* authFailure() {
return "auth failure";}
188 static const char* notEntitled() {
return "not entitled";}
189 static const char* authDisabled() {
return "authentication disabled";}
190 static const char* subidInUse() {
return "subid in use";}
191 static const char* noTopic() {
return "no topic";}
206 virtual void exceptionThrown(
const std::exception&)
const {;}
212 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \ 217 catch (std::exception& ex_)\ 221 _exceptionListener->exceptionThrown(ex_);\ 246 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 249 while(me->_connected)\ 256 catch(MessageStreamFullException&)\ 258 me->checkAndSendHeartbeat(false);\ 262 catch (std::exception& ex_)\ 266 me->_exceptionListener->exceptionThrown(ex_);\ 290 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 291 while(me->_connected)\ 298 catch(MessageStreamFullException&)\ 300 me->checkAndSendHeartbeat(false);\ 304 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 307 while(me->_connected)\ 314 catch(MessageStreamFullException& ex_)\ 316 me->checkAndSendHeartbeat(false);\ 320 catch (std::exception& ex_)\ 324 me->_exceptionListener->exceptionThrown(ex_);\ 348 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 349 while(me->_connected)\ 356 catch(MessageStreamFullException& ex_)\ 358 me->checkAndSendHeartbeat(false);\ 363 #define AMPS_UNHANDLED_EXCEPTION(ex) \ 366 _exceptionListener->exceptionThrown(ex);\ 371 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \ 374 me->_exceptionListener->exceptionThrown(ex);\ 413 static const unsigned Subscribe = 1;
414 static const unsigned SOW = 2;
415 static const unsigned NeedsSequenceNumber = 4;
416 static const unsigned ProcessedAck = 8;
417 static const unsigned StatsAck = 16;
418 void init(Message::Command::Type command_)
427 void init(
const std::string& command_)
439 if (!(command & Message::Command::NoDataCommands))
442 if (command == Message::Command::Subscribe ||
443 command == Message::Command::SOWAndSubscribe ||
444 command == Message::Command::DeltaSubscribe ||
445 command == Message::Command::SOWAndDeltaSubscribe)
450 if (command == Message::Command::SOW
451 || command == Message::Command::SOWAndSubscribe
452 || command == Message::Command::SOWAndDeltaSubscribe)
457 setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
459 if (command == Message::Command::SOW)
464 _flags |= ProcessedAck;
466 else if (command == Message::Command::SOWDelete)
469 _flags |= ProcessedAck;
470 _flags |= NeedsSequenceNumber;
472 else if (command == Message::Command::Publish
473 || command == Message::Command::DeltaPublish)
475 _flags |= NeedsSequenceNumber;
477 else if (command == Message::Command::StopTimer)
568 std::ostringstream os;
613 if (v_ ==
"processed") _flags |= ProcessedAck;
614 else if (v_ ==
"stats") _flags |= StatsAck;
621 if (v_.find(
"processed") != std::string::npos) _flags |= ProcessedAck;
622 else _flags &= ~ProcessedAck;
623 if (v_.find(
"stats") != std::string::npos) _flags |= StatsAck;
624 else _flags &= ~StatsAck;
631 if (v_ & Message::AckType::Processed) _flags |= ProcessedAck;
632 else _flags &= ~ProcessedAck;
633 if (v_ & Message::AckType::Stats) _flags |= StatsAck;
634 else _flags &= ~StatsAck;
648 Message& getMessage(
void) {
return _message; }
649 unsigned getTimeout(
void)
const {
return _timeout; }
650 unsigned getBatchSize(
void)
const {
return _batchSize; }
651 bool isSubscribe(
void)
const 653 return _flags & Subscribe;
655 bool isSow(
void)
const {
return (_flags & SOW) != 0; }
656 bool hasProcessedAck(
void)
const {
return (_flags & ProcessedAck) != 0; }
657 bool hasStatsAck(
void)
const {
return (_flags & StatsAck) != 0; }
658 bool needsSequenceNumber(
void)
const {
return (_flags & NeedsSequenceNumber) != 0; }
663 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
680 virtual std::string authenticate(
const std::string& userName_,
const std::string& password_) = 0;
688 virtual std::string retry(
const std::string& userName_,
const std::string& password_) = 0;
695 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
707 std::string
authenticate(
const std::string& ,
const std::string& password_)
714 std::string
retry(
const std::string& ,
const std::string& )
716 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
719 void completed(
const std::string& ,
const std::string& ,
const std::string& ) {;}
740 virtual void execute(
Message& message_) = 0;
755 typedef bool (*PublishStoreResizeHandler)(
Store store_,
764 StoreImpl() : _resizeHandler(NULL), _resizeHandlerData(NULL) {;}
770 virtual amps_uint64_t store(
const Message& message_) = 0;
776 virtual void discardUpTo(amps_uint64_t index_) = 0;
791 virtual bool replaySingle(
StoreReplayer& replayer_, amps_uint64_t index_) = 0;
797 virtual size_t unpersistedCount()
const = 0;
809 virtual void flush(
long timeout_) = 0;
822 virtual amps_uint64_t getLowestUnpersisted()
const = 0;
827 virtual amps_uint64_t getLastPersisted() = 0;
841 _resizeHandler = handler_;
842 _resizeHandlerData = userData_;
847 return _resizeHandler;
850 bool callResizeHandler(
size_t newSize_);
854 void* _resizeHandlerData;
861 RefHandle<StoreImpl> _body;
865 Store(
const Store& rhs) : _body(rhs._body) {;}
877 return _body.get().store(message_);
886 _body.get().discardUpTo(index_);
895 _body.get().replay(replayer_);
907 return _body.get().replaySingle(replayer_, index_);
916 return _body.get().unpersistedCount();
924 return _body.isValid();
937 return _body.get().flush(timeout_);
945 return _body.get().getLowestUnpersisted();
953 return _body.get().getLastPersisted();
968 _body.get().setResizeHandler(handler_, userData_);
973 return _body.get().getResizeHandler();
1003 virtual void failedWrite(
const Message& message_,
1004 const char* reason_,
size_t reasonLength_) = 0;
1008 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1011 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1024 long* timeoutp = (
long*)data_;
1026 if (count == 0)
return false;
1029 store_.
flush(*timeoutp);
1032 catch (
const TimedOutException&)
1034 catch (
const TimedOutException& e)
1057 unsigned requestedAckTypes_) = 0;
1064 virtual void clear() = 0;
1068 virtual void resubscribe(Client& client_) = 0;
1079 typedef enum { Disconnected = 0,
1083 PublishReplayed = 8,
1084 HeartbeatInitiated = 16,
1098 virtual void connectionStateChanged(
State newState_) = 0;
1103 class MessageStreamImpl;
1106 typedef void(*DeferredExecutionFunc)(
void*);
1108 class ClientImpl :
public RefBody
1110 friend class Client;
1113 DisconnectHandler _disconnectHandler;
1114 enum GlobalCommandTypeHandlers :
size_t 1124 DuplicateMessage = 8,
1127 std::vector<MessageHandler> _globalCommandTypeHandlers;
1128 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1130 MessageRouter::RouteCache _routeCache;
1131 mutable Mutex _lock;
1132 std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1134 Store _publishStore;
1135 bool _isRetryOnDisconnect;
1136 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1137 volatile amps_uint64_t _lastSentHaSequenceNumber;
1138 ATOMIC_TYPE_8 _badTimeToHAPublish;
1139 ATOMIC_TYPE_8 _badTimeToHASubscribe;
1140 VersionInfo _serverVersion;
1141 Timer _heartbeatTimer;
1142 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1145 int _queueAckTimeout;
1146 bool _isAutoAckEnabled;
1147 unsigned _ackBatchSize;
1148 unsigned _queuedAckCount;
1149 unsigned _defaultMaxDepth;
1150 struct QueueBookmarks
1152 QueueBookmarks(
const std::string& topic_)
1159 amps_uint64_t _oldestTime;
1160 unsigned _bookmarkCount;
1162 typedef amps_uint64_t topic_hash;
1163 typedef std::map<topic_hash,QueueBookmarks> TopicHashMap;
1164 TopicHashMap _topicHashMap;
1168 ClientImpl* _client;
1173 ClientStoreReplayer()
1174 : _client(NULL) , _version(0), _res(
AMPS_E_OK)
1177 ClientStoreReplayer(ClientImpl* client_)
1178 : _client(client_) , _version(0), _res(
AMPS_E_OK)
1181 void setClient(ClientImpl* client_) { _client = client_; }
1183 void execute(
Message& message_)
1185 if (!_client)
throw CommandException(
"Can't replay without a client.");
1188 if (index > _client->_lastSentHaSequenceNumber)
1189 _client->_lastSentHaSequenceNumber = index;
1196 (!_client->_badTimeToHAPublish ||
1200 message_.getMessage(),
1204 throw DisconnectedException(
"AMPS Server disconnected during replay");
1210 ClientStoreReplayer _replayer;
1214 ClientImpl* _parent;
1215 const char* _reason;
1216 size_t _reasonLength;
1217 size_t _replayCount;
1219 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1222 _reasonLength(reasonLength_),
1225 void execute(
Message& message_)
1227 if (_parent->_failedWriteHandler)
1230 _parent->_failedWriteHandler->failedWrite(message_,
1231 _reason, _reasonLength);
1234 size_t replayCount(
void)
const {
return _replayCount; }
1237 struct AckResponseImpl :
public RefBody
1239 std::string username, password, reason, status, bookmark, options;
1240 amps_uint64_t sequenceNo;
1241 VersionInfo serverVersion;
1242 volatile bool responded, abandoned;
1243 unsigned connectionVersion;
1246 sequenceNo((amps_uint64_t)0),
1250 connectionVersion(0)
1257 RefHandle<AckResponseImpl> _body;
1259 AckResponse() : _body(NULL) {;}
1260 AckResponse(
const AckResponse& rhs) : _body(rhs._body) {;}
1261 static AckResponse create()
1264 r._body =
new AckResponseImpl();
1268 const std::string& username()
1270 return _body.get().username;
1272 void setUsername(
const char* data_,
size_t len_)
1274 if (data_) _body.get().username.assign(data_, len_);
1275 else _body.get().username.clear();
1277 const std::string& password()
1279 return _body.get().password;
1281 void setPassword(
const char* data_,
size_t len_)
1283 if (data_) _body.get().password.assign(data_, len_);
1284 else _body.get().password.clear();
1286 const std::string& reason()
1288 return _body.get().reason;
1290 void setReason(
const char* data_,
size_t len_)
1292 if (data_) _body.get().reason.assign(data_, len_);
1293 else _body.get().reason.clear();
1295 const std::string& status()
1297 return _body.get().status;
1299 void setStatus(
const char* data_,
size_t len_)
1301 if (data_) _body.get().status.assign(data_, len_);
1302 else _body.get().status.clear();
1304 const std::string& bookmark()
1306 return _body.get().bookmark;
1308 void setBookmark(
const char* data_,
size_t len_)
1310 if (data_) _body.get().bookmark.assign(data_, len_);
1311 else _body.get().bookmark.clear();
1313 amps_uint64_t sequenceNo()
const 1315 return _body.get().sequenceNo;
1317 void setSequenceNo(
const char* data_,
size_t len_)
1319 amps_uint64_t result = (amps_uint64_t)0;
1322 for(
size_t i=0; i<len_; ++i)
1324 result *= (amps_uint64_t)10;
1325 result += (amps_uint64_t)(data_[i] -
'0');
1328 _body.get().sequenceNo = result;
1330 VersionInfo serverVersion()
const 1332 return _body.get().serverVersion;
1334 void setServerVersion(
const char* data_,
size_t len_)
1337 _body.get().serverVersion.setVersion(std::string(data_, len_));
1341 return _body.get().responded;
1343 void setResponded(
bool responded_)
1345 _body.get().responded = responded_;
1349 return _body.get().abandoned;
1351 void setAbandoned(
bool abandoned_)
1353 if (_body.isValid())
1354 _body.get().abandoned = abandoned_;
1357 void setConnectionVersion(
unsigned connectionVersion)
1359 _body.get().connectionVersion = connectionVersion;
1362 unsigned getConnectionVersion()
1364 return _body.get().connectionVersion;
1366 void setOptions(
const char* data_,
size_t len_)
1368 if (data_) _body.get().options.assign(data_,len_);
1369 else _body.get().options.clear();
1372 const std::string& options()
1374 return _body.get().options;
1377 AckResponse& operator=(
const AckResponse& rhs)
1385 typedef std::map<std::string, AckResponse> AckMap;
1388 DefaultExceptionListener _defaultExceptionListener;
1391 struct DeferredExecutionRequest
1393 DeferredExecutionRequest(DeferredExecutionFunc func_,
1396 _userData(userData_)
1399 DeferredExecutionFunc _func;
1403 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1404 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1406 std::string _username;
1407 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1408 ConnectionStateListeners _connectionStateListeners;
1409 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1410 Mutex _deferredExecutionLock;
1411 DeferredExecutionList _deferredExecutionList;
1412 unsigned _heartbeatInterval;
1413 unsigned _readTimeout;
1421 if (!_connected && newState_ > ConnectionStateListener::Connected)
1425 for(ConnectionStateListeners::iterator it= _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1427 AMPS_CALL_EXCEPTION_WRAPPER(
1428 (*it)->connectionStateChanged(newState_));
1431 unsigned processedAck(
Message& message);
1432 unsigned persistedAck(
Message& meesage);
1433 void lastChance(
Message& message);
1434 void checkAndSendHeartbeat(
bool force=
false);
1435 virtual ConnectionInfo getConnectionInfo()
const;
1437 ClientImplMessageHandler(
amps_handle message,
void* userData);
1439 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1441 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
1443 void unsubscribeInternal(
const std::string&
id)
1445 if (
id.empty())
return;
1448 subId.assign(
id.data(),
id.length());
1449 _routes.removeRoute(subId);
1451 if (_subscriptionManager)
1454 Unlock<Mutex> unlock(_lock);
1455 _subscriptionManager->unsubscribe(subId);
1461 _sendWithoutRetry(_message);
1462 deferredExecution(&s_noOpFn, NULL);
1465 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1466 bool isHASubscribe_)
1468 return syncAckProcessing(timeout_, message_,
1469 (amps_uint64_t)0, isHASubscribe_);
1472 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1473 amps_uint64_t haSeq = (amps_uint64_t)0,
1474 bool isHASubscribe_ =
false)
1477 AckResponse ack = AckResponse::create();
1480 Lock<Mutex> guard(_ackMapLock);
1483 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
1484 if (ack.getConnectionVersion() == 0)
1487 throw DisconnectedException(
"Connection closed while waiting for response.");
1489 bool timedOut =
false;
1490 AMPS_START_TIMER(timeout_)
1491 while(!timedOut && !ack.responded() && !ack.abandoned() && _connected)
1495 timedOut = !_lock.wait(timeout_);
1497 if (timedOut) { AMPS_RESET_TIMER(timedOut, timeout_); }
1503 Unlock<Mutex> unlck(_lock);
1504 amps_invoke_waiting_function();
1507 if (ack.responded())
1509 if (ack.status() !=
"failure")
1513 amps_uint64_t ackSequence = ack.sequenceNo();
1514 if (_lastSentHaSequenceNumber < ackSequence)
1516 _lastSentHaSequenceNumber = ackSequence;
1526 _nameHash = ack.bookmark().substr(0, ack.bookmark().find(
'|'));
1527 _serverVersion = ack.serverVersion();
1528 if (_bookmarkStore.isValid())
1533 const std::string& options = ack.options();
1534 size_t index = options.find_first_of(
"max_backlog=");
1535 if(index != std::string::npos)
1538 const char* c = options.c_str()+index+12;
1539 while(*c && *c!=
',')
1541 data = (data*10) + (
unsigned)(*c++-48);
1543 if(_ackBatchSize > data) _ackBatchSize = data;
1548 const size_t NotEntitled = 12;
1549 std::string ackReason = ack.reason();
1550 if (ackReason.length() == 0)
return ack;
1551 if (ackReason.length() == NotEntitled &&
1552 ackReason[0] ==
'n' &&
1557 message_.throwFor(_client, ackReason);
1561 if (!ack.abandoned())
1563 throw TimedOutException(
"timed out waiting for operation.");
1567 throw DisconnectedException(
"Connection closed while waiting for response.");
1575 if (!_client)
return;
1582 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1583 _pEmptyMessageStream.reset(NULL);
1590 ClientImpl(
const std::string& clientName)
1591 : _client(NULL), _name(clientName)
1592 , _isRetryOnDisconnect(
true)
1593 , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1594 , _badTimeToHASubscribe(0), _serverVersion()
1595 , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
1596 , _isAutoAckEnabled(
false)
1598 , _queuedAckCount(0)
1599 , _defaultMaxDepth(0)
1601 , _heartbeatInterval(0)
1604 _replayer.setClient(
this);
1609 _exceptionListener = &_defaultExceptionListener;
1610 for (
size_t i=0; i<GlobalCommandTypeHandlers::COUNT; ++i)
1616 virtual ~ClientImpl()
1621 const std::string& getName()
const 1626 const std::string& getNameHash()
const 1631 void setName(
const std::string& name)
1636 _client, name.c_str());
1639 AMPSException::throwFor(_client, result);
1644 const std::string& getLogonCorrelationData()
const 1646 return _logonCorrelationData;
1649 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
1651 _logonCorrelationData = logonCorrelationData_;
1654 size_t getServerVersion()
const 1656 return _serverVersion.getOldStyleVersion();
1659 VersionInfo getServerVersionInfo()
const 1661 return _serverVersion;
1664 const std::string& getURI()
const 1669 virtual void connect(
const std::string& uri)
1671 Lock<Mutex> l(_lock);
1675 virtual void _connect(
const std::string& uri)
1679 _client, uri.c_str());
1682 AMPSException::throwFor(_client, result);
1689 _readMessage.setClientImpl(
this);
1690 if(_queueAckTimeout)
1695 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
1698 void setDisconnected()
1701 Lock<Mutex> l(_lock);
1704 AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
1708 _heartbeatTimer.setTimeout(0.0);
1714 virtual void disconnect()
1717 Lock<Mutex> l(_lock);
1722 AMPS_CALL_EXCEPTION_WRAPPER(_sendWithoutRetry(_message));
1724 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
1726 AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
1727 Lock<Mutex> l(_lock);
1728 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
1731 void clearAcks(
unsigned failedVersion)
1734 Lock<Mutex> guard(_ackMapLock);
1737 std::vector<std::string> worklist;
1738 for(AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
1740 if (i->second.getConnectionVersion() <= failedVersion)
1742 i->second.setAbandoned(
true);
1743 worklist.push_back(i->first);
1747 for(std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
1756 int send(
const Message& message)
1758 Lock<Mutex> l(_lock);
1759 return _send(message);
1762 void sendWithoutRetry(
const Message& message_)
1764 Lock<Mutex> l(_lock);
1765 _sendWithoutRetry(message_);
1768 void _sendWithoutRetry(
const Message& message_)
1773 AMPSException::throwFor(_client,result);
1777 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
1778 bool isHASubscribe_ =
false)
1785 Message localMessage = message;
1786 unsigned version = 0;
1790 if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
1794 if(!_isRetryOnDisconnect)
1798 if (!_lock.wait(1000))
1800 amps_invoke_waiting_function();
1805 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
1806 (isHASubscribe_ && _badTimeToHASubscribe != 0))
1808 return (
int)version;
1812 if (haSeq > _lastSentHaSequenceNumber)
1814 while (haSeq > _lastSentHaSequenceNumber + 1)
1820 _lastSentHaSequenceNumber+1))
1826 version = _replayer._version;
1829 catch(
const DisconnectedException&)
1831 catch(
const DisconnectedException& e)
1834 result = _replayer._res;
1839 localMessage.getMessage(),
1841 ++_lastSentHaSequenceNumber;
1845 localMessage.getMessage(),
1849 if (!isHASubscribe_ && !haSeq &&
1850 localMessage.getMessage() == message.getMessage())
1854 if(_isRetryOnDisconnect)
1856 Unlock<Mutex> u(_lock);
1861 if ((isHASubscribe_ || haSeq) &&
1864 return (
int)version;
1871 AMPSException::throwFor(_client, result);
1877 amps_invoke_waiting_function();
1881 if (result !=
AMPS_E_OK) AMPSException::throwFor(_client, result);
1882 return (
int)version;
1885 void addMessageHandler(
const Field& commandId_,
1887 unsigned requestedAcks_,
bool isSubscribe_)
1889 Lock<Mutex> lock(_lock);
1890 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
1894 bool removeMessageHandler(
const Field& commandId_)
1896 Lock<Mutex> lock(_lock);
1897 return _routes.removeRoute(commandId_);
1905 bool isSubscribe =
false;
1906 bool isSubscribeOnly =
false;
1907 bool replace =
false;
1909 unsigned systemAddedAcks = Message::AckType::None;
1913 case Message::Command::Subscribe:
1914 case Message::Command::DeltaSubscribe:
1915 replace = message_.
getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos;
1918 systemAddedAcks |= Message::AckType::Persisted;
1920 isSubscribeOnly =
true;
1922 case Message::Command::SOWAndSubscribe:
1923 case Message::Command::SOWAndDeltaSubscribe:
1930 while (!replace &&
id != subId && _routes.hasRoute(
id))
1942 case Message::Command::SOW:
1949 while (!replace &&
id != subId && _routes.hasRoute(
id))
1960 if (!isSubscribeOnly)
1969 while (!replace && qid != subId && qid !=
id 1970 && _routes.hasRoute(qid))
1976 systemAddedAcks |= Message::AckType::Processed;
1978 if (!isSubscribeOnly) systemAddedAcks |= Message::AckType::Completed;
1981 int routesAdded = 0;
1982 Lock<Mutex> l(_lock);
1983 if (!subId.
empty() && messageHandler_.isValid())
1985 if (!_routes.hasRoute(subId))
1991 _routes.addRoute(subId, messageHandler_, requestedAcks,
1992 systemAddedAcks, isSubscribe);
1994 if (!isSubscribeOnly && !qid.
empty()
1995 && messageHandler_.isValid() && qid != subId)
1997 if (routesAdded == 0)
1999 _routes.addRoute(qid, messageHandler_,
2000 requestedAcks, systemAddedAcks,
false);
2006 Unlock<Mutex> u(_lock);
2007 data = amps_invoke_copy_route_function(
2008 messageHandler_.userData());
2012 _routes.addRoute(qid, messageHandler_, requestedAcks,
2013 systemAddedAcks,
false);
2017 _routes.addRoute(qid,
2020 requestedAcks, systemAddedAcks,
false);
2025 if (!
id.empty() && messageHandler_.isValid()
2026 && requestedAcks & ~
Message::AckType::Persisted
2027 &&
id != subId &&
id != qid)
2029 if (routesAdded == 0)
2031 _routes.addRoute(
id, messageHandler_, requestedAcks,
2032 systemAddedAcks,
false);
2038 Unlock<Mutex> u(_lock);
2039 data = amps_invoke_copy_route_function(
2040 messageHandler_.userData());
2044 _routes.addRoute(
id, messageHandler_, requestedAcks,
2045 systemAddedAcks,
false);
2049 _routes.addRoute(
id,
2053 systemAddedAcks,
false);
2062 syncAckProcessing(timeout_, message_, 0,
false);
2069 _routes.removeRoute(
id);
2076 case Message::Command::Unsubscribe:
2077 case Message::Command::Heartbeat:
2078 case Message::Command::Logon:
2079 case Message::Command::StartTimer:
2080 case Message::Command::StopTimer:
2081 case Message::Command::DeltaPublish:
2082 case Message::Command::Publish:
2083 case Message::Command::SOWDelete:
2085 Lock<Mutex> l(_lock);
2094 if (messageHandler_.isValid())
2096 _routes.addRoute(
id, messageHandler_, requestedAcks,
2097 Message::AckType::None,
false);
2104 case Message::Command::GroupBegin:
2105 case Message::Command::GroupEnd:
2106 case Message::Command::OOF:
2107 case Message::Command::Ack:
2108 case Message::Command::Unknown:
2110 throw CommandException(
"Command type " + message_.
getCommand() +
" can not be sent directly to AMPS");
2116 void setDisconnectHandler(
const DisconnectHandler& disconnectHandler)
2118 Lock<Mutex> l(_lock);
2119 _disconnectHandler = disconnectHandler;
2122 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2124 switch (command_[0])
2126 #if 0 // Not currently implemented to avoid an extra branch in delivery 2128 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2131 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2135 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2137 #if 0 // Not currently implemented to avoid an extra branch in delivery 2139 if (command_[6] ==
'b')
2141 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2143 else if (command_[6] ==
'e')
2145 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2149 std::ostringstream os;
2150 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2151 throw CommandException(os.str());
2155 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2159 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2163 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2167 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2170 std::ostringstream os;
2171 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2172 throw CommandException(os.str());
2177 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2181 #if 0 // Not currently implemented to avoid an extra branch in delivery 2182 case Message::Command::Publish:
2183 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2185 case Message::Command::SOW:
2186 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2189 case Message::Command::Heartbeat:
2190 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2192 #if 0 // Not currently implemented to avoid an extra branch in delivery 2193 case Message::Command::GroupBegin:
2194 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2196 case Message::Command::GroupEnd:
2197 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2199 case Message::Command::OOF:
2200 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2203 case Message::Command::Ack:
2204 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2208 unsigned command = command_;
2209 while (command > 0) { ++bits; command >>= 1; }
2211 AMPS_snprintf(errBuf,
sizeof(errBuf),
2212 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2213 CommandConstants<0>::Lengths[bits],
2214 CommandConstants<0>::Values[bits]);
2215 throw CommandException(errBuf);
2220 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2222 _globalCommandTypeHandlers[handlerType_] = handler_;
2227 Lock<Mutex> l(_lock);
2228 _failedWriteHandler.reset(handler_);
2231 void setPublishStore(
const Store& publishStore_)
2233 Lock<Mutex> l(_lock);
2234 if (_connected)
throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2235 _publishStore = publishStore_;
2240 Lock<Mutex> l(_lock);
2241 if (_connected)
throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2242 _bookmarkStore = bookmarkStore_;
2247 Lock<Mutex> l(_lock);
2248 _subscriptionManager.reset(subscriptionManager_);
2256 DisconnectHandler getDisconnectHandler()
const 2258 return _disconnectHandler;
2263 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2271 Store getPublishStore()
const 2273 return _publishStore;
2278 return _bookmarkStore;
2281 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2285 Lock<Mutex> l(_lock);
2287 _publishMessage.assignData(data_, dataLen_);
2288 _send(_publishMessage);
2293 if (!publishStoreMessage)
2295 publishStoreMessage =
new Message();
2296 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2298 publishStoreMessage->reset();
2299 publishStoreMessage->setCommandEnum(Message::Command::Publish);
2300 return _publish(topic_, topicLen_, data_, dataLen_);
2304 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2305 size_t dataLen_,
unsigned long expiration_)
2309 Lock<Mutex> l(_lock);
2311 _publishMessage.assignData(data_, dataLen_);
2312 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2313 size_t pos = convertToCharArray(exprBuf, expiration_);
2315 _send(_publishMessage);
2321 if (!publishStoreMessage)
2323 publishStoreMessage =
new Message();
2324 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2326 publishStoreMessage->reset();
2327 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2328 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2329 publishStoreMessage->setCommandEnum(Message::Command::Publish)
2330 .assignExpiration(exprBuf+exprPos,
2331 AMPS_NUMBER_BUFFER_LEN-exprPos);
2332 return _publish(topic_, topicLen_, data_, dataLen_);
2339 ClientImpl* _pClient;
2341 volatile bool _acked;
2342 volatile bool _disconnected;
2344 FlushAckHandler(ClientImpl* pClient_)
2345 : _pClient(pClient_), _cmdId(), _acked(
false), _disconnected(
false)
2347 pClient_->addConnectionStateListener(
this);
2351 _pClient->removeConnectionStateListener(
this);
2352 _pClient->removeMessageHandler(_cmdId);
2355 void setCommandId(
const Field& cmdId_)
2363 void connectionStateChanged(
State state_)
2365 if (state_ <= Shutdown)
2367 _disconnected =
true;
2376 return _acked || _disconnected;
2380 void publishFlush(
long timeout_,
unsigned ackType_)
2382 static const char* processed =
"processed";
2383 static const size_t processedLen = strlen(processed);
2384 static const char* persisted =
"persisted";
2385 static const size_t persistedLen = strlen(persisted);
2386 static const char* flush =
"flush";
2387 static const size_t flushLen = strlen(flush);
2388 static VersionInfo minPersisted(
"5.3.3.0");
2389 static VersionInfo minFlush(
"4");
2390 if (ackType_ != Message::AckType::Processed
2391 && ackType_ != Message::AckType::Persisted)
2393 throw new CommandException(
"Flush can only be used with processed or persisted acks.");
2395 FlushAckHandler flushHandler(
this);
2396 if (_serverVersion >= minFlush)
2398 Lock<Mutex> l(_lock);
2400 throw DisconnectedException(
"Not cconnected trying to flush");
2404 if (_serverVersion < minPersisted
2405 || ackType_ == Message::AckType::Processed)
2415 std::bind(&FlushAckHandler::invoke,
2416 std::ref(flushHandler),
2417 std::placeholders::_1),
2419 if (_send(_message) == -1)
2420 throw DisconnectedException(
"Disconnected trying to flush");
2426 _publishStore.
flush(timeout_);
2428 catch (
const AMPSException& ex)
2430 AMPS_UNHANDLED_EXCEPTION(ex);
2434 else if (_serverVersion < minFlush)
2436 if (timeout_ > 0) { AMPS_USLEEP(timeout_ * 1000); }
2437 else { AMPS_USLEEP(1000 * 1000); }
2442 Timer timer((
double)timeout_);
2444 while (!timer.check() && !flushHandler.done())
2447 amps_invoke_waiting_function();
2452 while (!flushHandler.done())
2455 amps_invoke_waiting_function();
2459 if (!flushHandler.done())
2460 throw TimedOutException(
"Timed out waiting for flush");
2462 if (!flushHandler.acked() && !_publishStore.
isValid())
2463 throw DisconnectedException(
"Disconnected waiting for flush");
2466 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2467 const char* data_,
size_t dataLength_)
2471 Lock<Mutex> l(_lock);
2473 _deltaMessage.assignData(data_, dataLength_);
2474 _send(_deltaMessage);
2479 if (!publishStoreMessage)
2481 publishStoreMessage =
new Message();
2482 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2484 publishStoreMessage->reset();
2485 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2486 return _publish(topic_, topicLength_, data_, dataLength_);
2490 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2491 const char* data_,
size_t dataLength_,
2492 unsigned long expiration_)
2496 Lock<Mutex> l(_lock);
2498 _deltaMessage.assignData(data_, dataLength_);
2499 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2500 size_t pos = convertToCharArray(exprBuf, expiration_);
2502 _send(_deltaMessage);
2508 if (!publishStoreMessage)
2510 publishStoreMessage =
new Message();
2511 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2513 publishStoreMessage->reset();
2514 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2515 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2516 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2517 .assignExpiration(exprBuf+exprPos,
2518 AMPS_NUMBER_BUFFER_LEN-exprPos);
2519 return _publish(topic_, topicLength_, data_, dataLength_);
2523 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
2524 const char* data_,
size_t dataLength_)
2526 publishStoreMessage->assignTopic(topic_, topicLength_)
2527 .setAckTypeEnum(Message::AckType::Persisted)
2528 .assignData(data_, dataLength_);
2529 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
2530 char buf[AMPS_NUMBER_BUFFER_LEN];
2531 size_t pos = convertToCharArray(buf, haSequenceNumber);
2532 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2534 Lock<Mutex> l(_lock);
2535 _send(*publishStoreMessage, haSequenceNumber);
2537 return haSequenceNumber;
2540 virtual std::string logon(
long timeout_,
Authenticator& authenticator_,
2541 const char* options_ = NULL)
2543 Lock<Mutex> l(_lock);
2544 return _logon(timeout_, authenticator_, options_);
2547 virtual std::string _logon(
long timeout_,
Authenticator& authenticator_,
2548 const char* options_ = NULL)
2550 AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2556 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE 2558 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
2561 if(uri.user().size()) _message.
setUserId(uri.user());
2562 if(uri.password().size()) _message.
setPassword(uri.password());
2563 if(uri.protocol() ==
"amps" && uri.messageType().size())
2567 if(uri.isTrue(
"pretty"))
2573 if (!_logonCorrelationData.empty())
2587 AckResponse ack = syncAckProcessing(timeout_, _message);
2588 if (ack.status() ==
"retry")
2590 _message.
setPassword(authenticator_.
retry(ack.username(), ack.password()));
2591 _username = ack.username();
2596 authenticator_.
completed(ack.username(), ack.password(), ack.reason());
2600 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2605 catch(
const AMPSException& ex)
2608 AMPS_UNHANDLED_EXCEPTION(ex);
2621 _publishStore.
replay(_replayer);
2622 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
2624 catch(
const StoreException& ex)
2627 std::ostringstream os;
2628 os <<
"A local store exception occurred while logging on." 2630 throw ConnectionException(os.str());
2632 catch(
const AMPSException& ex)
2635 AMPS_UNHANDLED_EXCEPTION(ex);
2638 catch(
const std::exception& ex)
2641 AMPS_UNHANDLED_EXCEPTION(ex);
2651 return newCommandId;
2655 const std::string& topic_,
2657 const std::string& filter_,
2658 const std::string& bookmark_,
2659 const std::string& options_,
2660 const std::string& subId_,
2661 bool isHASubscribe_ =
true)
2663 isHASubscribe_ &= (bool)_subscriptionManager;
2664 Lock<Mutex> l(_lock);
2668 std::string subId(subId_);
2671 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos)
2672 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
2681 unsigned ackTypes = Message::AckType::Processed;
2683 if (!bookmark_.empty() && _bookmarkStore.isValid())
2685 ackTypes |= Message::AckType::Persisted;
2689 if (filter_.length()) _message.
setFilter(filter_);
2690 if (bookmark_.length())
2700 if (_bookmarkStore.isValid())
2705 _bookmarkStore.
log(_message);
2706 _bookmarkStore.
discard(_message);
2712 if (options_.length()) _message.
setOptions(options_);
2718 Unlock<Mutex> u(_lock);
2719 _subscriptionManager->subscribe(messageHandler_, message,
2720 Message::AckType::None);
2721 if (_badTimeToHASubscribe)
return subId;
2726 Message::AckType::None, ackTypes,
true);
2729 if (!options_.empty()) message.
setOptions(options_);
2732 syncAckProcessing(timeout_, message, isHASubscribe_);
2734 catch (
const DisconnectedException&)
2736 if (!isHASubscribe_)
2738 _routes.removeRoute(subIdField);
2743 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2747 catch (
const TimedOutException&)
2749 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2757 Unlock<Mutex> unlock(_lock);
2758 _subscriptionManager->unsubscribe(subIdField);
2760 _routes.removeRoute(subIdField);
2766 std::string deltaSubscribe(
const MessageHandler& messageHandler_,
2767 const std::string& topic_,
2769 const std::string& filter_,
2770 const std::string& bookmark_,
2771 const std::string& options_,
2772 const std::string& subId_ =
"",
2773 bool isHASubscribe_ =
true)
2775 isHASubscribe_ &= (bool)_subscriptionManager;
2776 Lock<Mutex> l(_lock);
2780 std::string subId(subId_);
2790 unsigned ackTypes = Message::AckType::Processed;
2792 if (!bookmark_.empty() && _bookmarkStore.isValid())
2794 ackTypes |= Message::AckType::Persisted;
2797 if (filter_.length()) _message.
setFilter(filter_);
2798 if (bookmark_.length())
2808 if (_bookmarkStore.isValid())
2813 _bookmarkStore.
log(_message);
2814 _bookmarkStore.
discard(_message);
2820 if (options_.length()) _message.
setOptions(options_);
2825 Unlock<Mutex> u(_lock);
2826 _subscriptionManager->subscribe(messageHandler_, message,
2827 Message::AckType::None);
2828 if (_badTimeToHASubscribe)
return subId;
2833 Message::AckType::None, ackTypes,
true);
2836 if (!options_.empty()) message.
setOptions(options_);
2839 syncAckProcessing(timeout_, message, isHASubscribe_);
2841 catch (
const DisconnectedException&)
2843 if (!isHASubscribe_)
2845 _routes.removeRoute(subIdField);
2849 catch (
const TimedOutException&)
2851 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2859 Unlock<Mutex> unlock(_lock);
2860 _subscriptionManager->unsubscribe(subIdField);
2862 _routes.removeRoute(subIdField);
2868 void unsubscribe(
const std::string&
id)
2870 Lock<Mutex> l(_lock);
2871 unsubscribeInternal(
id);
2874 void unsubscribe(
void)
2876 if (_subscriptionManager)
2878 _subscriptionManager->clear();
2881 _routes.unsubscribeAll();
2882 Lock<Mutex> l(_lock);
2887 _sendWithoutRetry(_message);
2889 deferredExecution(&s_noOpFn, NULL);
2893 const std::string& topic_,
2894 const std::string& filter_ =
"",
2895 const std::string& orderBy_ =
"",
2896 const std::string& bookmark_ =
"",
2897 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2898 int topN_ = AMPS_DEFAULT_TOP_N,
2899 const std::string& options_ =
"",
2900 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
2902 Lock<Mutex> l(_lock);
2909 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
2912 if (filter_.length()) _message.
setFilter(filter_);
2913 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
2914 if (bookmark_.length()) _message.
setBookmark(bookmark_);
2917 if (options_.length()) _message.
setOptions(options_);
2919 _routes.addRoute(_message.
getQueryID(), messageHandler_,
2920 Message::AckType::None, ackTypes,
false);
2924 syncAckProcessing(timeout_, _message);
2928 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
2936 const std::string& topic_,
2938 const std::string& filter_ =
"",
2939 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2940 int topN_ = AMPS_DEFAULT_TOP_N)
2943 return sow(messageHandler_,
2954 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
2955 const std::string& topic_,
2956 const std::string& filter_ =
"",
2957 const std::string& orderBy_ =
"",
2958 const std::string& bookmark_ =
"",
2959 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2960 int topN_ = AMPS_DEFAULT_TOP_N,
2961 const std::string& options_ =
"",
2962 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
2963 bool isHASubscribe_ =
true)
2965 isHASubscribe_ &= (bool)_subscriptionManager;
2966 Lock<Mutex> l(_lock);
2973 std::string subId = cid;
2975 if (filter_.length()) _message.
setFilter(filter_);
2976 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
2977 if (bookmark_.length()) _message.
setBookmark(bookmark_);
2980 if (options_.length()) _message.
setOptions(options_);
2986 Unlock<Mutex> u(_lock);
2987 _subscriptionManager->subscribe(messageHandler_, message,
2988 Message::AckType::None);
2989 if (_badTimeToHASubscribe)
return subId;
2991 _routes.addRoute(cid, messageHandler_,
2992 Message::AckType::None, Message::AckType::Processed,
true);
2994 if (!options_.empty()) message.
setOptions(options_);
2997 syncAckProcessing(timeout_, message, isHASubscribe_);
2999 catch (
const DisconnectedException&)
3001 if (!isHASubscribe_)
3003 _routes.removeRoute(subId);
3007 catch (
const TimedOutException&)
3009 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3017 Unlock<Mutex> unlock(_lock);
3018 _subscriptionManager->unsubscribe(cid);
3020 _routes.removeRoute(subId);
3026 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3027 const std::string& topic_,
3029 const std::string& filter_ =
"",
3030 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3031 bool oofEnabled_ =
false,
3032 int topN_ = AMPS_DEFAULT_TOP_N,
3033 bool isHASubscribe_ =
true)
3036 return sowAndSubscribe(messageHandler_,
3043 (oofEnabled_ ?
"oof" :
""),
3048 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3049 const std::string& topic_,
3050 const std::string& filter_ =
"",
3051 const std::string& orderBy_ =
"",
3052 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3053 int topN_ = AMPS_DEFAULT_TOP_N,
3054 const std::string& options_ =
"",
3055 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3056 bool isHASubscribe_ =
true)
3058 isHASubscribe_ &= (bool)_subscriptionManager;
3059 Lock<Mutex> l(_lock);
3067 if (filter_.length()) _message.
setFilter(filter_);
3068 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
3071 if (options_.length()) _message.
setOptions(options_);
3076 Unlock<Mutex> u(_lock);
3077 _subscriptionManager->subscribe(messageHandler_, message,
3078 Message::AckType::None);
3079 if (_badTimeToHASubscribe)
return subId;
3081 _routes.addRoute(message.
getQueryID(), messageHandler_,
3082 Message::AckType::None, Message::AckType::Processed,
true);
3084 if (!options_.empty()) message.
setOptions(options_);
3087 syncAckProcessing(timeout_, message, isHASubscribe_);
3089 catch (
const DisconnectedException&)
3091 if (!isHASubscribe_)
3093 _routes.removeRoute(subId);
3097 catch (
const TimedOutException&)
3099 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3107 Unlock<Mutex> unlock(_lock);
3108 _subscriptionManager->unsubscribe(
Field(subId));
3110 _routes.removeRoute(subId);
3116 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3117 const std::string& topic_,
3119 const std::string& filter_ =
"",
3120 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3121 bool oofEnabled_ =
false,
3122 bool sendEmpties_ =
false,
3123 int topN_ = AMPS_DEFAULT_TOP_N,
3124 bool isHASubscribe_ =
true)
3128 if (oofEnabled_) options.
setOOF();
3130 return sowAndDeltaSubscribe(messageHandler_,
3142 const std::string& topic_,
3143 const std::string& filter_,
3149 unsigned ackType = Message::AckType::Processed |
3150 Message::AckType::Stats |
3151 Message::AckType::Persisted;
3152 if (!publishStoreMessage)
3154 publishStoreMessage =
new Message();
3155 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3157 publishStoreMessage->reset();
3158 if (commandId_.
empty())
3160 publishStoreMessage->newCommandId();
3161 commandId_ = publishStoreMessage->getCommandId();
3165 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3167 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3168 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3169 .assignQueryID(commandId_.
data(), commandId_.
len())
3170 .setAckTypeEnum(ackType)
3171 .assignTopic(topic_.c_str(), topic_.length())
3172 .assignFilter(filter_.c_str(), filter_.length());
3173 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3174 char buf[AMPS_NUMBER_BUFFER_LEN];
3175 size_t pos = convertToCharArray(buf, haSequenceNumber);
3176 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3180 Lock<Mutex> l(_lock);
3181 _routes.addRoute(commandId_, messageHandler_,
3182 Message::AckType::Stats,
3183 Message::AckType::Processed|Message::AckType::Persisted,
3185 syncAckProcessing(timeout_, *publishStoreMessage,
3188 catch (
const DisconnectedException&)
3194 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3198 return (std::string)commandId_;
3202 Lock<Mutex> l(_lock);
3204 if (commandId_.
empty())
3215 .assignQueryID(commandId_.
data(), commandId_.
len())
3216 .setAckTypeEnum(Message::AckType::Processed |
3217 Message::AckType::Stats)
3219 .assignFilter(filter_.c_str(), filter_.length());
3220 _routes.addRoute(commandId_, messageHandler_,
3221 Message::AckType::Stats,
3222 Message::AckType::Processed,
3226 syncAckProcessing(timeout_, _message);
3230 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3233 return (std::string)commandId_;
3237 std::string sowDeleteByData(
const MessageHandler& messageHandler_,
3238 const std::string& topic_,
3239 const std::string& data_,
3245 unsigned ackType = Message::AckType::Processed |
3246 Message::AckType::Stats |
3247 Message::AckType::Persisted;
3248 if (!publishStoreMessage)
3250 publishStoreMessage =
new Message();
3251 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3253 publishStoreMessage->reset();
3254 if (commandId_.
empty())
3256 publishStoreMessage->newCommandId();
3257 commandId_ = publishStoreMessage->getCommandId();
3261 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3263 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3264 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3265 .assignQueryID(commandId_.
data(), commandId_.
len())
3266 .setAckTypeEnum(ackType)
3267 .assignTopic(topic_.c_str(), topic_.length())
3268 .assignData(data_.c_str(), data_.length());
3269 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3270 char buf[AMPS_NUMBER_BUFFER_LEN];
3271 size_t pos = convertToCharArray(buf, haSequenceNumber);
3272 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3276 Lock<Mutex> l(_lock);
3277 _routes.addRoute(commandId_, messageHandler_,
3278 Message::AckType::Stats,
3279 Message::AckType::Processed|Message::AckType::Persisted,
3281 syncAckProcessing(timeout_, *publishStoreMessage,
3284 catch (
const DisconnectedException&)
3290 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3294 return (std::string)commandId_;
3298 Lock<Mutex> l(_lock);
3300 if (commandId_.
empty())
3311 .assignQueryID(commandId_.
data(), commandId_.
len())
3312 .setAckTypeEnum(Message::AckType::Processed |
3313 Message::AckType::Stats)
3315 .assignData(data_.c_str(), data_.length());
3316 _routes.addRoute(commandId_, messageHandler_,
3317 Message::AckType::Stats,
3318 Message::AckType::Processed,
3322 syncAckProcessing(timeout_, _message);
3326 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3329 return (std::string)commandId_;
3333 std::string sowDeleteByKeys(
const MessageHandler& messageHandler_,
3334 const std::string& topic_,
3335 const std::string& keys_,
3341 unsigned ackType = Message::AckType::Processed |
3342 Message::AckType::Stats |
3343 Message::AckType::Persisted;
3344 if (!publishStoreMessage)
3346 publishStoreMessage =
new Message();
3347 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3349 publishStoreMessage->reset();
3350 if (commandId_.
empty())
3352 publishStoreMessage->newCommandId();
3353 commandId_ = publishStoreMessage->getCommandId();
3357 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3359 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3360 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3361 .assignQueryID(commandId_.
data(), commandId_.
len())
3362 .setAckTypeEnum(ackType)
3363 .assignTopic(topic_.c_str(), topic_.length())
3364 .assignSowKeys(keys_.c_str(), keys_.length());
3365 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3366 char buf[AMPS_NUMBER_BUFFER_LEN];
3367 size_t pos = convertToCharArray(buf, haSequenceNumber);
3368 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3372 Lock<Mutex> l(_lock);
3373 _routes.addRoute(commandId_, messageHandler_,
3374 Message::AckType::Stats,
3375 Message::AckType::Processed|Message::AckType::Persisted,
3377 syncAckProcessing(timeout_, *publishStoreMessage,
3380 catch (
const DisconnectedException&)
3386 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3390 return (std::string)commandId_;
3394 Lock<Mutex> l(_lock);
3396 if (commandId_.
empty())
3407 .assignQueryID(commandId_.
data(), commandId_.
len())
3408 .setAckTypeEnum(Message::AckType::Processed |
3409 Message::AckType::Stats)
3411 .assignSowKeys(keys_.c_str(), keys_.length());
3412 _routes.addRoute(commandId_, messageHandler_,
3413 Message::AckType::Stats,
3414 Message::AckType::Processed,
3418 syncAckProcessing(timeout_, _message);
3422 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3425 return (std::string)commandId_;
3429 void startTimer(
void)
3431 if (_serverVersion >=
"5.3.2.0")
3433 throw CommandException(
"The start_timer command is deprecated.");
3435 Lock<Mutex> l(_lock);
3444 if (_serverVersion >=
"5.3.2.0")
3446 throw CommandException(
"The stop_timer command is deprecated.");
3448 return executeAsync(
Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
3463 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
3465 _pExceptionListener = pListener_;
3466 _exceptionListener = _pExceptionListener.get();
3471 _exceptionListener = &listener_;
3476 return *_exceptionListener;
3479 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
3481 if (readTimeout_ && readTimeout_ < heartbeatInterval_)
3483 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
3485 Lock<Mutex> l(_lock);
3486 if(_heartbeatInterval != heartbeatInterval_ ||
3487 _readTimeout != readTimeout_)
3489 _heartbeatInterval = heartbeatInterval_;
3490 _readTimeout = readTimeout_;
3495 void _sendHeartbeat(
void)
3497 if (_connected && _heartbeatInterval != 0)
3499 std::ostringstream options;
3500 options <<
"start," << _heartbeatInterval;
3505 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3506 _heartbeatTimer.start();
3509 _sendWithoutRetry(startMessage);
3510 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
3512 catch(ConnectionException &ex_)
3516 AMPS_UNHANDLED_EXCEPTION(ex_);
3520 if(_readTimeout && _connected)
3526 AMPSException::throwFor(_client, result);
3532 Lock<Mutex> lock(_lock);
3533 _connectionStateListeners.insert(listener_);
3538 Lock<Mutex> lock(_lock);
3539 _connectionStateListeners.erase(listener_);
3542 void clearConnectionStateListeners()
3544 Lock<Mutex> lock(_lock);
3545 _connectionStateListeners.clear();
3550 unsigned systemAddedAcks_,
bool isSubscribe_)
3552 Message message = command_.getMessage();
3557 bool added = qid.
len() || subid.
len() || cid_.
len();
3559 if (subid.
len() > 0)
3563 addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
3564 systemAddedAcks_, isSubscribe_);
3566 if (qid.
len() > 0 && qid != subid)
3568 while (_routes.hasRoute(qid))
3575 if (addedCount == 0)
3577 _routes.addRoute(qid, handler_, requestedAcks_,
3578 systemAddedAcks_, isSubscribe_);
3584 Unlock<Mutex> u(_lock);
3585 data = amps_invoke_copy_route_function(handler_.userData());
3589 _routes.addRoute(qid, handler_, requestedAcks_,
3590 systemAddedAcks_,
false);
3594 _routes.addRoute(qid,
3598 systemAddedAcks_,
false);
3603 if (cid_.
len() > 0 && cid_ != qid && cid_ != subid
3604 && requestedAcks_ & ~
Message::AckType::Persisted)
3606 while (_routes.hasRoute(cid_))
3610 if (addedCount == 0)
3612 _routes.addRoute(cid_, handler_, requestedAcks_,
3613 systemAddedAcks_,
false);
3619 Unlock<Mutex> u(_lock);
3620 data = amps_invoke_copy_route_function(handler_.userData());
3624 _routes.addRoute(cid_, handler_, requestedAcks_,
3625 systemAddedAcks_,
false);
3629 _routes.addRoute(cid_,
3633 systemAddedAcks_,
false);
3637 else if (commandType == Message::Command::Publish ||
3638 commandType == Message::Command::DeltaPublish)
3641 _routes.addRoute(cid_, handler_, requestedAcks_,
3642 systemAddedAcks_,
false);
3647 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
3652 bool isHASubscribe_ =
true)
3654 isHASubscribe_ &= (bool)_subscriptionManager;
3655 Message& message = command_.getMessage();
3656 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
3657 Message::AckType::Processed : Message::AckType::None;
3659 bool isPublishStore = _publishStore.
isValid() && command_.needsSequenceNumber();
3661 if (commandType == Message::Command::SOW
3662 || commandType == Message::Command::SOWAndSubscribe
3663 || commandType == Message::Command::SOWAndDeltaSubscribe
3664 || commandType == Message::Command::StopTimer)
3665 systemAddedAcks |= Message::AckType::Completed;
3667 if (handler_.isValid() && cid.
empty())
3673 if (command_.isSubscribe())
3676 if (_bookmarkStore.isValid())
3678 systemAddedAcks |= Message::AckType::Persisted;
3686 _bookmarkStore.
log(message);
3687 if (!BookmarkRange::isRange(bookmark))
3689 _bookmarkStore.
discard(message);
3703 systemAddedAcks |= Message::AckType::Persisted;
3705 bool isSubscribe = command_.isSubscribe();
3706 if (handler_.isValid() && !isSubscribe)
3708 _registerHandler(command_, cid, handler_,
3709 requestedAcks, systemAddedAcks, isSubscribe);
3711 bool useSyncSend = cid.
len() > 0 && command_.hasProcessedAck();
3714 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
3717 Unlock<Mutex> u(_lock);
3718 haSequenceNumber = _publishStore.
store(message);
3725 syncAckProcessing((
long)command_.getTimeout(), message,
3728 else _send(message, haSequenceNumber);
3730 catch (
const DisconnectedException&)
3736 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3747 Unlock<Mutex> u(_lock);
3748 _subscriptionManager->subscribe(handler_,
3751 if (_badTimeToHASubscribe)
3754 return std::string(subId.
data(), subId.
len());
3757 if (handler_.isValid())
3759 _registerHandler(command_, cid, handler_,
3760 requestedAcks, systemAddedAcks, isSubscribe);
3767 syncAckProcessing((
long)command_.getTimeout(), message,
3770 else _send(message);
3772 catch (
const DisconnectedException&)
3774 if (!isHASubscribe_)
3776 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3777 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
3778 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
3783 catch (
const TimedOutException&)
3785 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
3786 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3787 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.
getQueryId()));
3795 Unlock<Mutex> unlock(_lock);
3796 _subscriptionManager->unsubscribe(subId);
3800 _routes.removeRoute(cid);
3801 _routes.removeRoute(subId);
3804 if (subId.
len() > 0)
3807 return std::string(subId.
data(), subId.
len());
3817 syncAckProcessing((
long)(command_.getTimeout()), message);
3819 else _send(message);
3821 catch (
const DisconnectedException&)
3823 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3824 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
3830 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3831 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
3844 bool isHASubscribe_ =
true)
3846 Lock<Mutex> lock(_lock);
3847 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
3851 void setAutoAck(
bool isAutoAckEnabled_)
3853 _isAutoAckEnabled = isAutoAckEnabled_;
3855 bool getAutoAck(
void)
const 3857 return _isAutoAckEnabled;
3859 void setAckBatchSize(
const unsigned batchSize_)
3861 _ackBatchSize = batchSize_;
3862 if (!_queueAckTimeout)
3864 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
3868 unsigned getAckBatchSize(
void)
const 3870 return _ackBatchSize;
3872 int getAckTimeout(
void)
const 3874 return _queueAckTimeout;
3876 void setAckTimeout(
const int ackTimeout_)
3879 _queueAckTimeout = ackTimeout_;
3881 size_t _ack(QueueBookmarks& queueBookmarks_)
3883 if(queueBookmarks_._bookmarkCount)
3885 if (!publishStoreMessage)
3887 publishStoreMessage =
new Message();
3888 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3890 publishStoreMessage->reset();
3891 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3892 .setTopic(queueBookmarks_._topic)
3893 .setBookmark(queueBookmarks_._data)
3894 .setCommandId(
"AMPS-queue-ack");
3895 amps_uint64_t haSequenceNumber = 0;
3898 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3899 publishStoreMessage->setAckType(
"persisted")
3900 .setSequence(haSequenceNumber);
3901 queueBookmarks_._data.erase();
3902 queueBookmarks_._bookmarkCount = 0;
3904 _send(*publishStoreMessage, haSequenceNumber);
3907 queueBookmarks_._data.erase();
3908 queueBookmarks_._bookmarkCount = 0;
3914 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
3916 if (_isAutoAckEnabled)
return;
3917 _ack(topic_, bookmark_, options_);
3919 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
3921 if (bookmark_.
len() == 0)
return;
3922 Lock<Mutex> lock(_lock);
3923 if(_ackBatchSize < 2 || options_ != NULL)
3925 if (!publishStoreMessage)
3927 publishStoreMessage =
new Message();
3928 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3930 publishStoreMessage->reset();
3931 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3932 .setCommandId(
"AMPS-queue-ack")
3933 .setTopic(topic_).setBookmark(bookmark_);
3934 if (options_) publishStoreMessage->setOptions(options_);
3935 amps_uint64_t haSequenceNumber = 0;
3938 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3939 publishStoreMessage->setAckType(
"persisted")
3940 .setSequence(haSequenceNumber);
3942 _send(*publishStoreMessage, haSequenceNumber);
3946 topic_hash hash = CRC<0>::crcNoSSE(topic_.
data(),topic_.
len());
3947 TopicHashMap::iterator it = _topicHashMap.find(hash);
3948 if(it == _topicHashMap.end())
3951 it = _topicHashMap.insert(TopicHashMap::value_type(hash,QueueBookmarks(topic_))).first;
3953 QueueBookmarks &queueBookmarks = it->second;
3954 if(queueBookmarks._data.length())
3956 queueBookmarks._data.append(
",");
3960 queueBookmarks._oldestTime = amps_now();
3962 queueBookmarks._data.append(bookmark_);
3963 if(++queueBookmarks._bookmarkCount >= _ackBatchSize) _ack(queueBookmarks);
3965 void flushAcks(
void)
3967 size_t sendCount = 0;
3974 Lock<Mutex> lock(_lock);
3975 typedef TopicHashMap::iterator iterator;
3976 for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3978 QueueBookmarks& queueBookmarks = it->second;
3979 sendCount += _ack(queueBookmarks);
3982 if(sendCount && _connected) publishFlush(0, Message::AckType::Processed);
3985 void checkQueueAcks(
void)
3987 if(!_topicHashMap.size())
return;
3988 Lock<Mutex> lock(_lock);
3991 amps_uint64_t threshold = amps_now() - (amps_uint64_t)_queueAckTimeout;
3992 typedef TopicHashMap::iterator iterator;
3993 for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3995 QueueBookmarks& queueBookmarks = it->second;
3996 if(queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold) _ack(queueBookmarks);
3999 catch(std::exception& ex)
4001 AMPS_UNHANDLED_EXCEPTION(ex);
4005 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
4007 Lock<Mutex> lock(_deferredExecutionLock);
4008 _deferredExecutionList.push_back(
4009 DeferredExecutionRequest(func_,userData_));
4012 inline void processDeferredExecutions(
void)
4014 if(_deferredExecutionList.size())
4016 Lock<Mutex> lock(_deferredExecutionLock);
4017 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4018 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4019 for(; it != end; ++it)
4023 it->_func(it->_userData);
4030 _deferredExecutionList.clear();
4031 _routes.invalidateCache();
4032 _routeCache.invalidateCache();
4036 bool getRetryOnDisconnect(
void)
const 4038 return _isRetryOnDisconnect;
4041 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4043 _isRetryOnDisconnect = isRetryOnDisconnect_;
4046 void setDefaultMaxDepth(
unsigned maxDepth_)
4048 _defaultMaxDepth = maxDepth_;
4051 unsigned getDefaultMaxDepth(
void)
const 4053 return _defaultMaxDepth;
4145 RefHandle<MessageStreamImpl> _body;
4155 inline void advance(
void);
4167 bool operator==(
const iterator& rhs)
4169 return _pStream == rhs._pStream;
4171 bool operator!=(
const iterator& rhs)
4173 return _pStream != rhs._pStream;
4175 void operator++(
void) { advance(); }
4176 Message operator*(
void) {
return _current; }
4177 Message* operator->(
void) {
return &_current; }
4186 if(!_body.isValid())
4188 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
4217 unsigned getMaxDepth(
void)
const;
4220 unsigned getDepth(
void)
const;
4224 inline void setSOWOnly(
const std::string& commandId_,
4225 const std::string& queryId_ =
"");
4226 inline void setSubscription(
const std::string& subId_,
4227 const std::string& commandId_ =
"",
4228 const std::string& queryId_ =
"");
4229 inline void setStatsOnly(
const std::string& commandId_,
4230 const std::string& queryId_ =
"");
4231 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
4237 friend class Client;
4263 BorrowRefHandle<ClientImpl> _body;
4265 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
4266 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
4267 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
4278 : _body(new ClientImpl(clientName), true)
4281 Client(ClientImpl* existingClient)
4282 : _body(existingClient,
true)
4285 Client(ClientImpl* existingClient,
bool isRef)
4286 : _body(existingClient, isRef)
4289 Client(
const Client& rhs) : _body(rhs._body) {;}
4290 virtual ~Client(
void) {;}
4292 Client& operator=(
const Client& rhs)
4300 return _body.isValid();
4317 _body.get().setName(name);
4324 return _body.get().getName();
4332 return _body.get().getNameHash();
4343 _body.get().setLogonCorrelationData(logonCorrelationData_);
4350 return _body.get().getLogonCorrelationData();
4363 return _body.get().getServerVersion();
4374 return _body.get().getServerVersionInfo();
4388 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4403 return AMPS::convertVersionToNumber(data_, len_);
4410 return _body.get().getURI();
4434 _body.get().connect(uri);
4441 _body.get().disconnect();
4459 _body.get().send(message);
4472 unsigned requestedAcks_,
bool isSubscribe_)
4474 _body.get().addMessageHandler(commandId_, messageHandler_,
4475 requestedAcks_, isSubscribe_);
4483 return _body.get().removeMessageHandler(commandId_);
4511 return _body.get().send(messageHandler, message, timeout);
4525 _body.get().setDisconnectHandler(disconnectHandler);
4533 return _body.get().getDisconnectHandler();
4542 return _body.get().getConnectionInfo();
4555 _body.get().setBookmarkStore(bookmarkStore_);
4563 return _body.
get().getBookmarkStore();
4571 return _body.get().getSubscriptionManager();
4583 _body.get().setSubscriptionManager(subscriptionManager_);
4607 _body.get().setPublishStore(publishStore_);
4615 return _body.
get().getPublishStore();
4623 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
4624 duplicateMessageHandler_);
4638 return _body.get().getDuplicateMessageHandler();
4652 _body.get().setFailedWriteHandler(handler_);
4660 return _body.get().getFailedWriteHandler();
4681 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
4683 return _body.get().publish(topic_.c_str(), topic_.length(),
4684 data_.c_str(), data_.length());
4706 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
4707 const char* data_,
size_t dataLength_)
4709 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
4730 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
4731 unsigned long expiration_)
4733 return _body.get().publish(topic_.c_str(), topic_.length(),
4734 data_.c_str(), data_.length(), expiration_);
4757 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
4758 const char* data_,
size_t dataLength_,
4759 unsigned long expiration_)
4761 return _body.get().publish(topic_, topicLength_,
4762 data_, dataLength_, expiration_);
4803 void publishFlush(
long timeout_ = 0,
unsigned ackType_ = Message::AckType::Processed)
4805 _body.get().publishFlush(timeout_, ackType_);
4824 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
4826 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4827 data_.c_str(), data_.length());
4848 const char* data_,
size_t dataLength_)
4850 return _body.get().deltaPublish(topic_, topicLength_,
4851 data_, dataLength_);
4870 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
4871 unsigned long expiration_)
4873 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4874 data_.c_str(), data_.length(),
4897 const char* data_,
size_t dataLength_,
4898 unsigned long expiration_)
4900 return _body.get().deltaPublish(topic_, topicLength_,
4901 data_, dataLength_, expiration_);
4920 const char* options_ = NULL)
4922 return _body.get().logon(timeout_, authenticator_, options_);
4936 std::string
logon(
const char* options_,
int timeout_ = 0)
4954 std::string
logon(
const std::string& options_,
int timeout_ = 0)
4980 const std::string& topic_,
4982 const std::string& filter_=
"",
4983 const std::string& options_ =
"",
4984 const std::string& subId_ =
"")
4986 return _body.get().subscribe(messageHandler_, topic_, timeout_,
4987 filter_,
"", options_, subId_);
5006 long timeout_=0,
const std::string& filter_=
"",
5007 const std::string& options_ =
"",
5008 const std::string& subId_ =
"")
5011 if (_body.get().getDefaultMaxDepth())
5012 result.
maxDepth(_body.get().getDefaultMaxDepth());
5013 result.setSubscription(_body.get().subscribe(
5015 topic_, timeout_, filter_,
"",
5016 options_, subId_,
false));
5036 long timeout_ = 0,
const std::string& filter_ =
"",
5037 const std::string& options_ =
"",
5038 const std::string& subId_ =
"")
5041 if (_body.get().getDefaultMaxDepth())
5042 result.
maxDepth(_body.get().getDefaultMaxDepth());
5043 result.setSubscription(_body.get().subscribe(
5045 topic_, timeout_, filter_,
"",
5046 options_, subId_,
false));
5063 const std::string& topic_,
5065 const std::string& filter_=
"",
5066 const std::string& options_ =
"",
5067 const std::string& subId_ =
"")
5069 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5070 filter_,
"", options_, subId_);
5081 long timeout_,
const std::string& filter_=
"",
5082 const std::string& options_ =
"",
5083 const std::string& subId_ =
"")
5086 if (_body.get().getDefaultMaxDepth())
5087 result.
maxDepth(_body.get().getDefaultMaxDepth());
5088 result.setSubscription(_body.get().deltaSubscribe(
5090 topic_, timeout_, filter_,
"",
5091 options_, subId_,
false));
5097 long timeout_,
const std::string& filter_ =
"",
5098 const std::string& options_ =
"",
5099 const std::string& subId_ =
"")
5102 if (_body.get().getDefaultMaxDepth())
5103 result.
maxDepth(_body.get().getDefaultMaxDepth());
5104 result.setSubscription(_body.get().deltaSubscribe(
5106 topic_, timeout_, filter_,
"",
5107 options_, subId_,
false));
5137 const std::string& topic_,
5139 const std::string& bookmark_,
5140 const std::string& filter_=
"",
5141 const std::string& options_ =
"",
5142 const std::string& subId_ =
"")
5144 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5145 filter_, bookmark_, options_, subId_);
5166 const std::string& bookmark_,
5167 const std::string& filter_=
"",
5168 const std::string& options_ =
"",
5169 const std::string& subId_ =
"")
5172 if (_body.get().getDefaultMaxDepth())
5173 result.
maxDepth(_body.get().getDefaultMaxDepth());
5174 result.setSubscription(_body.get().subscribe(
5176 topic_, timeout_, filter_,
5177 bookmark_, options_,
5185 const std::string& bookmark_,
5186 const std::string& filter_ =
"",
5187 const std::string& options_ =
"",
5188 const std::string& subId_ =
"")
5191 if (_body.get().getDefaultMaxDepth())
5192 result.
maxDepth(_body.get().getDefaultMaxDepth());
5193 result.setSubscription(_body.get().subscribe(
5195 topic_, timeout_, filter_,
5196 bookmark_, options_,
5211 return _body.get().unsubscribe(commandId);
5223 return _body.get().unsubscribe();
5257 const std::string& topic_,
5258 const std::string& filter_ =
"",
5259 const std::string& orderBy_ =
"",
5260 const std::string& bookmark_ =
"",
5261 int batchSize_ = DEFAULT_BATCH_SIZE,
5262 int topN_ = DEFAULT_TOP_N,
5263 const std::string& options_ =
"",
5264 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5266 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
5267 bookmark_, batchSize_, topN_, options_,
5295 const std::string& filter_ =
"",
5296 const std::string& orderBy_ =
"",
5297 const std::string& bookmark_ =
"",
5298 int batchSize_ = DEFAULT_BATCH_SIZE,
5299 int topN_ = DEFAULT_TOP_N,
5300 const std::string& options_ =
"",
5301 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5304 if (_body.get().getDefaultMaxDepth())
5305 result.
maxDepth(_body.get().getDefaultMaxDepth());
5306 result.setSOWOnly(sow(result.operator
MessageHandler(),topic_,filter_,orderBy_,bookmark_,batchSize_,topN_,options_,timeout_));
5312 const std::string& filter_ =
"",
5313 const std::string& orderBy_ =
"",
5314 const std::string& bookmark_ =
"",
5315 int batchSize_ = DEFAULT_BATCH_SIZE,
5316 int topN_ = DEFAULT_TOP_N,
5317 const std::string& options_ =
"",
5318 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5321 if (_body.get().getDefaultMaxDepth())
5322 result.
maxDepth(_body.get().getDefaultMaxDepth());
5323 result.setSOWOnly(sow(result.operator
MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5349 const std::string& topic_,
5351 const std::string& filter_ =
"",
5352 int batchSize_ = DEFAULT_BATCH_SIZE,
5353 int topN_ = DEFAULT_TOP_N)
5355 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
5381 const std::string& topic_,
5383 const std::string& filter_ =
"",
5384 int batchSize_ = DEFAULT_BATCH_SIZE,
5385 bool oofEnabled_ =
false,
5386 int topN_ = DEFAULT_TOP_N)
5388 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5389 filter_, batchSize_, oofEnabled_,
5414 const std::string& filter_ =
"",
5415 int batchSize_ = DEFAULT_BATCH_SIZE,
5416 bool oofEnabled_ =
false,
5417 int topN_ = DEFAULT_TOP_N)
5420 if (_body.get().getDefaultMaxDepth())
5421 result.
maxDepth(_body.get().getDefaultMaxDepth());
5422 result.setSubscription(_body.get().sowAndSubscribe(
5424 topic_, timeout_, filter_,
5425 batchSize_, oofEnabled_,
5450 const std::string& filter_ =
"",
5451 int batchSize_ = DEFAULT_BATCH_SIZE,
5452 bool oofEnabled_ =
false,
5453 int topN_ = DEFAULT_TOP_N)
5456 if (_body.get().getDefaultMaxDepth())
5457 result.
maxDepth(_body.get().getDefaultMaxDepth());
5458 result.setSubscription(_body.get().sowAndSubscribe(
5460 topic_, timeout_, filter_,
5461 batchSize_, oofEnabled_,
5495 const std::string& topic_,
5496 const std::string& filter_ =
"",
5497 const std::string& orderBy_ =
"",
5498 const std::string& bookmark_ =
"",
5499 int batchSize_ = DEFAULT_BATCH_SIZE,
5500 int topN_ = DEFAULT_TOP_N,
5501 const std::string& options_ =
"",
5502 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5504 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
5505 orderBy_, bookmark_, batchSize_,
5506 topN_, options_, timeout_);
5534 const std::string& filter_ =
"",
5535 const std::string& orderBy_ =
"",
5536 const std::string& bookmark_ =
"",
5537 int batchSize_ = DEFAULT_BATCH_SIZE,
5538 int topN_ = DEFAULT_TOP_N,
5539 const std::string& options_ =
"",
5540 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5543 if (_body.get().getDefaultMaxDepth())
5544 result.
maxDepth(_body.get().getDefaultMaxDepth());
5545 result.setSubscription(_body.get().sowAndSubscribe(
5547 topic_, filter_, orderBy_,
5548 bookmark_, batchSize_, topN_,
5549 options_, timeout_,
false));
5555 const std::string& filter_ =
"",
5556 const std::string& orderBy_ =
"",
5557 const std::string& bookmark_ =
"",
5558 int batchSize_ = DEFAULT_BATCH_SIZE,
5559 int topN_ = DEFAULT_TOP_N,
5560 const std::string& options_ =
"",
5561 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5564 if (_body.get().getDefaultMaxDepth())
5565 result.
maxDepth(_body.get().getDefaultMaxDepth());
5566 result.setSubscription(_body.get().sowAndSubscribe(
5568 topic_, filter_, orderBy_,
5569 bookmark_, batchSize_, topN_,
5570 options_, timeout_,
false));
5599 const std::string& topic_,
5600 const std::string& filter_ =
"",
5601 const std::string& orderBy_ =
"",
5602 int batchSize_ = DEFAULT_BATCH_SIZE,
5603 int topN_ = DEFAULT_TOP_N,
5604 const std::string& options_ =
"",
5605 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5607 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5608 filter_, orderBy_, batchSize_,
5609 topN_, options_, timeout_);
5632 const std::string& filter_ =
"",
5633 const std::string& orderBy_ =
"",
5634 int batchSize_ = DEFAULT_BATCH_SIZE,
5635 int topN_ = DEFAULT_TOP_N,
5636 const std::string& options_ =
"",
5637 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5640 if (_body.get().getDefaultMaxDepth())
5641 result.
maxDepth(_body.get().getDefaultMaxDepth());
5642 result.setSubscription(sowAndDeltaSubscribe(result.operator
MessageHandler(), topic_, filter_, orderBy_, batchSize_, topN_, options_, timeout_));
5643 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5645 topic_, filter_, orderBy_,
5646 batchSize_, topN_, options_,
5653 const std::string& filter_ =
"",
5654 const std::string& orderBy_ =
"",
5655 int batchSize_ = DEFAULT_BATCH_SIZE,
5656 int topN_ = DEFAULT_TOP_N,
5657 const std::string& options_ =
"",
5658 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5661 if (_body.get().getDefaultMaxDepth())
5662 result.
maxDepth(_body.get().getDefaultMaxDepth());
5663 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5665 topic_, filter_, orderBy_,
5666 batchSize_, topN_, options_,
5696 const std::string& topic_,
5698 const std::string& filter_ =
"",
5699 int batchSize_ = DEFAULT_BATCH_SIZE,
5700 bool oofEnabled_ =
false,
5701 bool sendEmpties_ =
false,
5702 int topN_ = DEFAULT_TOP_N)
5704 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5705 timeout_, filter_, batchSize_,
5706 oofEnabled_, sendEmpties_,
5733 const std::string& filter_ =
"",
5734 int batchSize_ = DEFAULT_BATCH_SIZE,
5735 bool oofEnabled_ =
false,
5736 bool sendEmpties_ =
false,
5737 int topN_ = DEFAULT_TOP_N)
5740 if (_body.get().getDefaultMaxDepth())
5741 result.
maxDepth(_body.get().getDefaultMaxDepth());
5742 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5744 topic_, timeout_, filter_,
5745 batchSize_, oofEnabled_,
5746 sendEmpties_, topN_,
false));
5772 const std::string& filter_ =
"",
5773 int batchSize_ = DEFAULT_BATCH_SIZE,
5774 bool oofEnabled_ =
false,
5775 bool sendEmpties_ =
false,
5776 int topN_ = DEFAULT_TOP_N)
5779 if (_body.get().getDefaultMaxDepth())
5780 result.
maxDepth(_body.get().getDefaultMaxDepth());
5781 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5783 topic_, timeout_, filter_,
5784 batchSize_, oofEnabled_,
5785 sendEmpties_, topN_,
false));
5808 const std::string& topic,
5809 const std::string& filter,
5812 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
5840 stream.setStatsOnly(cid);
5841 _body.get().sowDelete(stream.operator
MessageHandler(),topic,filter,timeout,cid);
5842 return *(stream.
begin());
5844 catch (
const DisconnectedException&)
5846 removeMessageHandler(cid);
5857 _body.get().startTimer();
5868 return _body.get().stopTimer(messageHandler);
5893 const std::string& topic_,
5894 const std::string& keys_,
5897 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
5929 stream.setStatsOnly(cid);
5930 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(),topic_,keys_,timeout_,cid);
5931 return *(stream.
begin());
5933 catch (
const DisconnectedException&)
5935 removeMessageHandler(cid);
5955 const std::string& topic_,
const std::string& data_,
5958 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
5985 stream.setStatsOnly(cid);
5986 _body.get().sowDeleteByData(stream.operator
MessageHandler(),topic_,data_,timeout_,cid);
5987 return *(stream.
begin());
5989 catch (
const DisconnectedException&)
5991 removeMessageHandler(cid);
6001 return _body.get().getHandle();
6014 _body.get().setExceptionListener(pListener_);
6027 _body.get().setExceptionListener(listener_);
6034 return _body.get().getExceptionListener();
6060 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6084 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6090 setLastChanceMessageHandler(messageHandler);
6097 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6123 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6148 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6210 _body.get().addConnectionStateListener(listener);
6218 _body.get().removeConnectionStateListener(listener);
6225 _body.get().clearConnectionStateListeners();
6255 return _body.get().executeAsync(command_, handler_);
6293 if (command_.isSubscribe())
6295 Message& message = command_.getMessage();
6298 if(useExistingHandler)
6301 if (_body.get()._routes.getRoute(subId, existingHandler))
6304 _body.get().executeAsync(command_, existingHandler,
false);
6309 id = _body.get().executeAsync(command_, handler_,
false);
6311 catch (
const DisconnectedException&)
6313 removeMessageHandler(command_.getMessage().
getCommandId());
6314 if (command_.isSubscribe())
6318 if (command_.isSow())
6320 removeMessageHandler(command_.getMessage().
getQueryID());
6351 _body.get().ack(topic_,bookmark_,options_);
6373 void ack(
const std::string& topic_,
const std::string& bookmark_,
6374 const char* options_ = NULL)
6376 _body.get().ack(
Field(topic_.data(),topic_.length()),
Field(bookmark_.data(),bookmark_.length()),options_);
6384 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
6386 _body.get()._ack(topic_,bookmark_,options_);
6399 _body.get().flushAcks();
6408 return _body.get().getAutoAck();
6418 _body.get().setAutoAck(isAutoAckEnabled_);
6426 return _body.get().getAckBatchSize();
6436 _body.get().setAckBatchSize(ackBatchSize_);
6447 return _body.get().getAckTimeout();
6457 _body.get().setAckTimeout(ackTimeout_);
6471 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
6480 return _body.get().getRetryOnDisconnect();
6489 _body.get().setDefaultMaxDepth(maxDepth_);
6498 return _body.get().getDefaultMaxDepth();
6510 return _body.get().setTransportFilterFunction(filter_, userData_);
6524 return _body.get().setThreadCreatedCallback(callback_, userData_);
6532 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
6534 _body.get().deferredExecution(func_,userData_);
6544 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
6550 unsigned deliveries = 0;
6562 const char* data = NULL;
6564 const char* status = NULL;
6565 size_t statusLen = 0;
6567 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
6570 &status, &statusLen);
6571 if (len == NotEntitled || len == Duplicate ||
6572 (statusLen == Failure && status[0] ==
'f'))
6574 if (_failedWriteHandler)
6576 if (_publishStore.isValid())
6578 amps_uint64_t sequence =
6581 FailedWriteStoreReplayer replayer(
this, data, len);
6582 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
6583 replayer, sequence));
6589 AMPS_CALL_EXCEPTION_WRAPPER(
6590 _failedWriteHandler->failedWrite(emptyMessage,
6596 if (_publishStore.isValid())
6605 _publishStore.discardUpTo(seq);
6609 if (!deliveries && _bookmarkStore.isValid())
6616 const char* bookmarkData = NULL;
6617 size_t bookmarkLen = 0;
6619 &bookmarkData, &bookmarkLen);
6621 if (bookmarkLen > 0 && _routes.hasRoute(subId))
6624 _bookmarkStore.persisted(subId,
Message::Field(bookmarkData, bookmarkLen));
6629 catch (std::exception& ex)
6631 AMPS_UNHANDLED_EXCEPTION(ex);
6637 ClientImpl::processedAck(
Message &message)
6639 unsigned deliveries = 0;
6641 const char* data = NULL;
6645 Lock<Mutex> l(_lock);
6648 Lock<Mutex> guard(_ackMapLock);
6649 AckMap::iterator i = _ackMap.find(std::string(data, len));
6650 if (i != _ackMap.end())
6661 ack.setStatus(data, len);
6664 ack.setReason(data, len);
6667 ack.setUsername(data, len);
6670 ack.setPassword(data, len);
6673 ack.setSequenceNo(data, len);
6676 ack.setServerVersion(data, len);
6678 ack.setOptions(data,len);
6680 ack.setBookmark(data,len);
6681 ack.setResponded(
true);
6688 ClientImpl::checkAndSendHeartbeat(
bool force)
6690 if (force || _heartbeatTimer.check())
6692 _heartbeatTimer.start();
6695 sendWithoutRetry(_beatMessage);
6697 catch (
const AMPSException&)
6704 inline ConnectionInfo ClientImpl::getConnectionInfo()
const 6706 ConnectionInfo info;
6707 std::ostringstream writer;
6709 info[
"client.uri"] = _lastUri;
6710 info[
"client.name"] = _name;
6711 info[
"client.username"] = _username;
6712 if(_publishStore.isValid())
6714 writer << _publishStore.unpersistedCount();
6715 info[
"publishStore.unpersistedCount"] = writer.str();
6724 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
6726 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
6727 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
6728 ClientImpl* me = (ClientImpl*) userData_;
6729 AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->processDeferredExecutions());
6732 if(me->_queueAckTimeout) me->checkQueueAcks();
6736 me->_readMessage.replace(messageHandle_);
6737 Message& message = me->_readMessage;
6739 if (commandType & SOWMask)
6741 #if 0 // Not currently implemented, to avoid an extra branch in delivery 6745 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6746 me->_globalCommandTypeHandlers[1+(commandType/8192)].invoke(message));
6748 AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->_routes.deliverData(message,
6751 else if (commandType & PublishMask)
6753 #if 0 // Not currently implemented, to avoid an extra branch in delivery 6754 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6755 me->_globalCommandTypeHandlers[(commandType==Message::Command::Publish ?
6756 GlobalCommandTypeHandlers::Publish :
6757 GlobalCommandTypeHandlers::OOF)].invoke(message));
6759 const char* subIds = NULL;
6760 size_t subIdsLen = 0;
6763 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
6764 for(
size_t i=0; i<subIdCount; ++i)
6766 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
6768 if (handler.isValid())
6771 subIds + lookupResult.idOffset, lookupResult.idLength);
6774 bool isAutoAck = me->_isAutoAckEnabled;
6776 if (!isMessageQueue && !bookmark.
empty() &&
6777 me->_bookmarkStore.isValid())
6779 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
6782 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
6784 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
6789 me->_bookmarkStore.log(me->_readMessage);
6790 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6791 handler.invoke(message));
6796 if(isMessageQueue && isAutoAck)
6800 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
6801 if (!message.getIgnoreAutoAck())
6803 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6807 catch(std::exception& ex)
6809 if (!message.getIgnoreAutoAck())
6811 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6814 AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6819 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6820 handler.invoke(message));
6824 else me->lastChance(message);
6827 else if (commandType == Message::Command::Ack)
6829 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6830 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
6832 unsigned deliveries = 0U;
6835 case Message::AckType::Persisted:
6836 deliveries += me->persistedAck(message);
6838 case Message::AckType::Processed:
6839 deliveries += me->processedAck(message);
6842 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
6843 if (deliveries == 0)
6845 me->lastChance(message);
6848 else if (commandType == Message::Command::Heartbeat)
6850 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6851 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
6852 if(me->_heartbeatTimer.getTimeout() != 0.0)
6854 me->checkAndSendHeartbeat(
true);
6858 me->lastChance(message);
6864 unsigned deliveries = 0U;
6867 while(me->_connected)
6871 deliveries = me->_routes.deliverData(message, message.
getCommandId());
6875 catch(MessageStreamFullException&)
6877 catch(MessageStreamFullException& ex_)
6880 me->checkAndSendHeartbeat(
false);
6884 catch (std::exception& ex_)
6888 me->_exceptionListener->exceptionThrown(ex_);
6895 if (deliveries == 0)
6896 me->lastChance(message);
6898 me->checkAndSendHeartbeat();
6903 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
6905 ClientImpl* me = (ClientImpl*) userData;
6908 me->clearAcks(failedConnectionVersion);
6912 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
6914 ClientImpl* me = (ClientImpl*) userData;
6915 Lock<Mutex> l(me->_lock);
6916 Client wrapper(me,
false);
6918 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
6921 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
6924 AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
6925 me->_connected =
false;
6929 Unlock<Mutex> unlock(me->_lock);
6930 me->_disconnectHandler.invoke(wrapper);
6933 catch(
const std::exception& ex)
6935 AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6937 me->_lock.signalAll();
6939 if (!me->_connected)
6941 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
6942 AMPS_UNHANDLED_EXCEPTION_2(me,DisconnectedException(
"Reconnect failed."));
6948 if (me->_subscriptionManager)
6953 Unlock<Mutex> unlock(me->_lock);
6954 me->_subscriptionManager->resubscribe(wrapper);
6956 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
6960 catch(
const AMPSException& subEx)
6962 AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6964 catch(
const std::exception& subEx)
6966 AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6989 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
6990 : _data(data_), _len(len_),_pos(pos_), _fieldSep(fieldSep_)
6992 while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6995 typedef void* difference_type;
6996 typedef std::forward_iterator_tag iterator_category;
6997 typedef std::pair<Message::Field, Message::Field> value_type;
6998 typedef value_type* pointer;
6999 typedef value_type& reference;
7000 bool operator==(
const iterator& rhs)
const 7002 return _pos == rhs._pos;
7004 bool operator!=(
const iterator& rhs)
const 7006 return _pos != rhs._pos;
7008 iterator& operator++()
7011 while(_pos != _len && _data[_pos] != _fieldSep) ++_pos;
7013 while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
7017 value_type operator*()
const 7020 size_t i = _pos, keyLength =0, valueStart = 0, valueLength = 0;
7021 for(; i < _len && _data[i] !=
'='; ++i) ++keyLength;
7023 result.first.assign(_data+_pos, keyLength);
7025 if (i < _len && _data[i] ==
'=')
7029 for(; i < _len && _data[i] != _fieldSep; ++i)
7034 result.second.assign(_data+valueStart, valueLength);
7040 class reverse_iterator
7047 typedef std::pair<Message::Field, Message::Field> value_type;
7048 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
7049 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7054 while(_pos >=_data && *_pos == _fieldSep) --_pos;
7055 while(_pos > _data && *_pos != _fieldSep) --_pos;
7059 if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
7060 if (_pos < _data) _pos = 0;
7063 bool operator==(
const reverse_iterator& rhs)
const 7065 return _pos == rhs._pos;
7067 bool operator!=(
const reverse_iterator& rhs)
const 7069 return _pos != rhs._pos;
7071 reverse_iterator& operator++()
7082 while(_pos >=_data && *_pos == _fieldSep) --_pos;
7084 while(_pos >_data && *_pos != _fieldSep) --_pos;
7085 if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
7086 if (_pos < _data) _pos = 0;
7090 value_type operator*()
const 7093 size_t keyLength = 0, valueStart = 0, valueLength = 0;
7094 size_t i = (size_t)(_pos - _data);
7095 for(; i < _len && _data[i] !=
'='; ++i) ++keyLength;
7096 result.first.assign(_pos, keyLength);
7097 if (i<_len && _data[i] ==
'=')
7101 for(; i<_len && _data[i] != _fieldSep; ++i)
7106 result.second.assign(_data+valueStart, valueLength);
7111 : _data(data.
data()), _len(data.
len()),
7112 _fieldSep(fieldSeparator)
7116 FIX(
const char* data,
size_t len,
char fieldSeparator=1)
7117 : _data(data), _len(len), _fieldSep(fieldSeparator)
7121 iterator begin()
const 7123 return iterator(_data, _len, 0, _fieldSep);
7125 iterator end()
const 7127 return iterator(_data, _len, _len, _fieldSep);
7131 reverse_iterator rbegin()
const 7133 return reverse_iterator(_data, _len, _data+(_len-1), _fieldSep);
7136 reverse_iterator rend()
const 7138 return reverse_iterator(_data, _len, 0, _fieldSep);
7159 std::stringstream _data;
7176 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
7179 _data.write(value+offset, (std::streamsize)length);
7187 void append(
const T& tag,
const std::string& value)
7189 _data << tag <<
'=' << value << _fs;
7198 operator std::string()
const 7206 _data.str(std::string());
7243 typedef std::map<Message::Field, Message::Field>
map_type;
7254 for(FIX::iterator a = fix.begin(); a!= fix.end(); ++a)
7266 std::deque<Message> _q;
7267 std::string _commandId;
7269 std::string _queryId;
7273 unsigned _requestedAcks;
7276 volatile enum { Unset=0x0, Running=0x10, Subscribe=0x11, SOWOnly=0x12, AcksOnly=0x13, Conflate=0x14, Closed=0x1, Disconnected=0x2 } _state;
7277 typedef std::map<std::string, Message*> SOWKeyMap;
7278 SOWKeyMap _sowKeyMap;
7280 MessageStreamImpl(
const Client& client_)
7283 _maxDepth((
unsigned)~0),
7287 if (_client.isValid())
7293 MessageStreamImpl(ClientImpl* client_)
7296 _maxDepth((
unsigned)~0),
7300 if (_client.isValid())
7306 ~MessageStreamImpl()
7310 virtual void destroy()
7316 catch(std::exception &e)
7320 if (_client.isValid())
7326 if (_client.isValid())
7330 _client = Client((ClientImpl*)NULL);
7331 c.deferredExecution(MessageStreamImpl::destroyer,
this);
7339 static void destroyer(
void* vpMessageStreamImpl_)
7341 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
7344 void setSubscription(
const std::string& subId_,
7345 const std::string& commandId_ =
"",
7346 const std::string& queryId_ =
"")
7348 Lock<Mutex> lock(_lock);
7350 if (!commandId_.empty() && commandId_ != subId_)
7351 _commandId = commandId_;
7352 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
7353 _queryId = queryId_;
7355 if (Disconnected == _state)
return;
7356 assert(Unset==_state);
7360 void setSOWOnly(
const std::string& commandId_,
7361 const std::string& queryId_ =
"")
7363 Lock<Mutex> lock(_lock);
7364 _commandId = commandId_;
7365 if (!queryId_.empty() && queryId_ != commandId_)
7366 _queryId = queryId_;
7368 if (Disconnected == _state)
return;
7369 assert(Unset==_state);
7373 void setStatsOnly(
const std::string& commandId_,
7374 const std::string& queryId_ =
"")
7376 Lock<Mutex> lock(_lock);
7377 _commandId = commandId_;
7378 if (!queryId_.empty() && queryId_ != commandId_)
7379 _queryId = queryId_;
7381 if (Disconnected == _state)
return;
7382 assert(Unset==_state);
7384 _requestedAcks = Message::AckType::Stats;
7387 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
7389 Lock<Mutex> lock(_lock);
7390 _commandId = commandId_;
7392 if (Disconnected == _state)
return;
7393 assert(Unset==_state);
7395 _requestedAcks = acks_;
7400 Lock<Mutex> lock(_lock);
7401 if(state_ == AMPS::ConnectionStateListener::Disconnected)
7403 _state = Disconnected;
7409 void timeout(
unsigned timeout_)
7411 _timeout = timeout_;
7415 if(_state == Subscribe) _state = Conflate;
7417 void maxDepth(
unsigned maxDepth_)
7419 if(maxDepth_) _maxDepth = maxDepth_;
7420 else _maxDepth = (unsigned)~0;
7422 unsigned getMaxDepth(
void)
const 7426 unsigned getDepth(
void)
const 7428 return (
unsigned)(_q.size());
7433 Lock<Mutex> lock(_lock);
7434 if (!_previousTopic.
empty() && !_previousBookmark.
empty())
7438 if (_client.isValid())
7440 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
7444 catch (AMPSException&)
7446 catch (AMPSException& e)
7449 current_.invalidate();
7450 _previousTopic.
clear();
7451 _previousBookmark.
clear();
7454 _previousTopic.
clear();
7455 _previousBookmark.
clear();
7457 double minWaitTime = (double)((_timeout && _timeout > 1000)
7459 Timer timer(minWaitTime);
7461 while(_q.empty() && _state & Running)
7464 _lock.wait((
long)minWaitTime);
7466 Unlock<Mutex> unlck(_lock);
7467 amps_invoke_waiting_function();
7472 if(timer.checkAndGetRemaining(&minWaitTime))
7480 current_ = _q.front();
7481 if(_q.size() == _maxDepth) _lock.signalAll();
7483 if(_state == Conflate)
7485 std::string sowKey = current_.
getSowKey();
7486 if(sowKey.length()) _sowKeyMap.erase(sowKey);
7488 else if(_state == AcksOnly)
7492 if((_state == AcksOnly && _requestedAcks == 0) ||
7493 (_state == SOWOnly && current_.
getCommand()==
"group_end"))
7497 else if (current_.
getCommandEnum() == Message::Command::Publish &&
7507 if(_state == Disconnected)
7509 throw DisconnectedException(
"Connection closed.");
7511 current_.invalidate();
7512 if(_state == Closed)
7516 return _timeout != 0;
7520 if (_client.isValid())
7522 if (_state == SOWOnly || _state == Subscribe)
7524 if (!_commandId.empty()) _client.
unsubscribe(_commandId);
7526 if (!_queryId.empty()) _client.
unsubscribe(_queryId);
7535 if(_state==SOWOnly || _state==Subscribe || _state==Unset)
7540 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
7542 Lock<Mutex> lock(this_->_lock);
7543 if(this_->_state != Conflate)
7545 AMPS_TESTING_SLOW_MESSAGE_STREAM
7546 if(this_->_q.size() >= this_->_maxDepth)
7551 this_->_lock.signalAll();
7552 throw MessageStreamFullException(
"Stream is currently full.");
7554 this_->_q.push_back(message_.
deepCopy());
7556 this_->_client.isValid() && this_->_client.getAutoAck() &&
7560 message_.setIgnoreAutoAck();
7565 std::string sowKey = message_.
getSowKey();
7568 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
7569 if(it != this_->_sowKeyMap.end())
7571 *(it->second) = message_.
deepCopy();
7575 if(this_->_q.size() >= this_->_maxDepth)
7581 this_->_lock.signalAll();
7582 throw MessageStreamFullException(
"Stream is currently full.");
7584 this_->_q.push_back(message_.
deepCopy());
7585 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
7590 while(this_->_q.size() >= this_->_maxDepth)
7592 this_->_lock.wait(1);
7594 this_->_q.push_back(message_.
deepCopy());
7597 this_->_lock.signalAll();
7600 inline MessageStream::MessageStream(
void)
7603 inline MessageStream::MessageStream(
const Client& client_)
7604 :_body(
new MessageStreamImpl(client_))
7607 inline void MessageStream::iterator::advance(
void)
7609 _pStream = _pStream->_body->next(_current) ? _pStream:NULL;
7613 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
7618 if(handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
7620 result._body = (MessageStreamImpl*)(handler_._userData);
7625 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
7626 const std::string& queryId_)
7628 _body->setSOWOnly(commandId_, queryId_);
7630 inline void MessageStream::setSubscription(
const std::string& subId_,
7631 const std::string& commandId_,
7632 const std::string& queryId_)
7634 _body->setSubscription(subId_, commandId_, queryId_);
7636 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
7637 const std::string& queryId_)
7639 _body->setStatsOnly(commandId_, queryId_);
7641 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
7644 _body->setAcksOnly(commandId_, acks_);
7663 return _body->getMaxDepth();
7667 return _body->getDepth();
7670 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
7672 return *(_pEmptyMessageStream.get());
7680 ClientImpl& body = _body.get();
7681 Message& message = command_.getMessage();
7685 if(useExistingHandler)
7691 if (body._routes.getRoute(subId, existingHandler))
7694 body.executeAsync(command_, existingHandler,
false);
7695 return MessageStream::fromExistingHandler(existingHandler);
7702 if ((command & Message::Command::NoDataCommands)
7703 && (ackTypes == Message::AckType::Persisted
7704 || ackTypes == Message::AckType::None))
7707 if (!body._pEmptyMessageStream)
7709 body._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
7710 body._pEmptyMessageStream.get()->_body->close();
7712 return body.getEmptyMessageStream();
7715 if (body.getDefaultMaxDepth())
7716 stream.
maxDepth(body.getDefaultMaxDepth());
7718 std::string commandID = body.executeAsync(command_, handler,
false);
7719 if (command_.hasStatsAck())
7721 stream.setStatsOnly(commandID, command_.getMessage().
getQueryId());
7723 else if (command_.isSow())
7725 stream.setSOWOnly(commandID, command_.getMessage().
getQueryId());
7727 else if (command_.isSubscribe())
7729 stream.setSubscription(commandID,
7736 if (command == Message::Command::Publish ||
7737 command == Message::Command::DeltaPublish ||
7738 command == Message::Command::SOWDelete)
7740 stream.setAcksOnly(commandID,
7741 ackTypes & (
unsigned)~Message::AckType::Persisted);
7745 stream.setAcksOnly(commandID, ackTypes);
7752 inline void Message::ack(
const char* options_)
const 7754 ClientImpl* pClient = _body.get().clientImpl();
7756 if(pClient && bookmark.
len() &&
7757 !pClient->getAutoAck())
7759 pClient->ack(getTopic(),bookmark,options_);
Command & setCorrelationId(const std::string &v_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:559
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:179
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1284
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:4277
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1261
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5892
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:5866
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:512
std::string getAckType() const
Definition: ampsplusplus.hpp:638
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:4481
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:7157
void startTimer()
Definition: ampsplusplus.hpp:5855
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5412
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:761
Command & addAckType(const std::string &v_)
Definition: ampsplusplus.hpp:610
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:7651
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1230
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4509
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:486
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1068
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:6146
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:404
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:6487
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4386
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:5209
Command & setQueryId(const std::string &v_)
Definition: ampsplusplus.hpp:546
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1223
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:733
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:384
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:6496
void ack(Message &message_, const char *options_=NULL)
Acknoweldge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6361
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5164
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Command & setFilter(const std::string &v_)
Definition: ampsplusplus.hpp:540
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:278
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:4605
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4870
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4401
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1142
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:724
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4621
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:538
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6216
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:713
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:6445
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4372
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5919
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1023
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:951
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:7194
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:6507
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4757
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4184
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1145
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6162
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6169
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4457
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1232
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5136
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:813
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6469
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7661
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4432
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1045
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5598
Success.
Definition: amps.h:197
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1128
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1152
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:707
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4847
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:7239
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:5256
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:187
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:553
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:4470
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4180
amps_result
Return values from amps_xxx functions.
Definition: amps.h:192
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:4658
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:973
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1035
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:7675
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:609
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:4613
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:6521
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:643
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4361
Command & setExpiration(unsigned v_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:608
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1079
Command & setData(const char *v_, size_t length_)
Sets the data for this command.
Definition: ampsplusplus.hpp:580
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:4918
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:884
const std::string & getNameHash() const
Returns the name hash of this client as generated by the server and returned when the client logged o...
Definition: ampsplusplus.hpp:4330
Command & setAckType(unsigned v_)
Definition: ampsplusplus.hpp:628
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5554
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4260
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6121
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6156
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1138
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:6208
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:501
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6223
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1230
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4706
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1257
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:4569
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4979
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:493
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1148
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1138
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6032
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:724
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1075
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
Command & setTopic(const std::string &v_)
Definition: ampsplusplus.hpp:538
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5954
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1225
Command & setBatchSize(unsigned v_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:597
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
Command & setOrderBy(const std::string &v_)
Definition: ampsplusplus.hpp:542
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:905
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:719
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5080
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6176
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:225
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:4936
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Command & setTimeout(unsigned v_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:590
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1139
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1232
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:701
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:4341
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:7167
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:6373
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:714
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:755
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5448
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:838
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7176
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:965
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4561
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:922
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:6434
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1224
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1259
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:993
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5830
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1233
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:4553
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:136
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setCommandId(const std::string &v_)
Definition: ampsplusplus.hpp:536
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6025
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:509
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:4581
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6088
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7187
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:6424
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4824
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5533
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Command & setSequence(const std::string &v_)
Definition: ampsplusplus.hpp:564
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4151
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:265
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5770
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:914
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:4803
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6253
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5731
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:4954
Command & setSequence(const amps_uint64_t v_)
Definition: ampsplusplus.hpp:566
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:5221
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:6416
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5096
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:875
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5695
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6012
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:979
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4636
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:4531
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:7231
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:4540
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1172
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6349
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:7243
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:4523
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:859
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:7250
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:202
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5631
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6082
Command & setSubId(const std::string &v_)
Definition: ampsplusplus.hpp:544
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5035
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4143
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6192
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1138
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6200
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:635
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:943
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7656
Command & setTopN(unsigned v_)
Definition: ampsplusplus.hpp:592
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1260
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:4439
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5294
Command & setAckType(const std::string &v_)
Definition: ampsplusplus.hpp:618
Command & setData(const std::string &v_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:576
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:7665
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
The operation has not succeeded, but ought to be retried.
Definition: amps.h:221
const std::string & getLogonCorrelationData() const
Returns the currently set logoon correlation data for the client.
Definition: ampsplusplus.hpp:4348
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6058
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4408
Command & setOptions(const std::string &v_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:562
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:893
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6184
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1034
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1003
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:434
Command & setBookmark(const std::string &v_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:552
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:5999
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:7204
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:4650
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5652
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:6455
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:534
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1224
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:6406
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4681
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:4315
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:670
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:935
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1021
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:817
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5311
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:4730
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6095
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:301
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4896
The client and server are disconnected.
Definition: amps.h:225
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5975
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5062
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:7646
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5348
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:4322
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:407
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5494
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1033
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5005
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5183
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:521
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6287
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5807
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4195
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6478
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:6397
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5380