25 #ifndef _AMPSPLUSPLUS_H_ 26 #define _AMPSPLUSPLUS_H_ 46 #include <sys/atomic.h> 48 #include "BookmarkStore.hpp" 49 #include "MessageRouter.hpp" 51 #include "ampscrc.hpp" 53 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM 54 #define AMPS_TESTING_SLOW_MESSAGE_STREAM 82 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10 83 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960 84 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0 85 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000 86 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200 87 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000 88 #define AMPS_DEFAULT_TOP_N -1 89 #define AMPS_DEFAULT_BATCH_SIZE 10 90 #define AMPS_NUMBER_BUFFER_LEN 20 91 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000 93 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64) 98 static __declspec ( thread )
AMPS::Message* publishStoreMessage = 0;
106 typedef std::map<std::string, std::string> ConnectionInfo;
// Keeps a list of AMPS::Message pointers that were registered through
// addMessage / addMessageToCleanupList.
// NOTE(review): several lines of this class are missing from this listing
// (the destructor loop body and the method signatures in particular), so what
// is done with each tracked pointer on destruction is not shown here --
// presumably they are deleted; confirm against the full header.
108 class PerThreadMessageTracker {
// Pointers registered for later cleanup.
109 std::vector<AMPS::Message*> _messages;
111 PerThreadMessageTracker() {}
// The destructor walks every registered pointer by index.
112 ~PerThreadMessageTracker()
114 for (
size_t i=0; i<_messages.size(); ++i)
// Registration appends the pointer to the tracked list.
121 _messages.push_back(message);
// A function-local static mutex is held while the pointer is forwarded to
// _addMessageToCleanupList.
125 static AMPS::Mutex _lock;
126 AMPS::Lock<Mutex> l(_lock);
127 _addMessageToCleanupList(message);
// One function-local static tracker instance receives every registered
// pointer.
131 static PerThreadMessageTracker tracker;
132 tracker.addMessage(message);
137 inline std::string asString(Type x_)
139 std::ostringstream os;
// Writes the decimal digits of seqNo_ into buf_ from the back of the buffer
// towards the front: pos starts at AMPS_NUMBER_BUFFER_LEN and is decremented
// before each digit is stored.
// buf_ is indexed up to AMPS_NUMBER_BUFFER_LEN - 1, so it must be at least
// AMPS_NUMBER_BUFFER_LEN chars long.
// NOTE(review): the loop guard, the division step and the return statement are
// not visible in this listing; presumably the returned value is the offset of
// the first digit written -- confirm against the full header.
145 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
147 size_t pos = AMPS_NUMBER_BUFFER_LEN;
148 for(
int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
// Lowest remaining decimal digit, stored as its ASCII character.
152 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
// Overload of convertToCharArray taking an unsigned long value; the visible
// statements are the same as in the amps_uint64_t overload: digits are stored
// from the back of buf_ towards the front, starting at
// AMPS_NUMBER_BUFFER_LEN.
// NOTE(review): the loop guard, the division step and the return statement are
// not visible in this listing; presumably the returned value is the offset of
// the first digit written -- confirm against the full header.
161 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
163 size_t pos = AMPS_NUMBER_BUFFER_LEN;
164 for(
int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
// Lowest remaining decimal digit, stored as its ASCII character.
168 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
// Fixed reason strings. Each accessor returns a pointer to the string literal
// shown in its body; nothing is allocated.
// NOTE(review): the enclosing class is not visible in this listing; presumably
// these are the reason texts carried by acknowledgements -- confirm.
182 static const char* duplicate() {
return "duplicate";}
183 static const char* badFilter() {
return "bad filter";}
184 static const char* badRegexTopic() {
return "bad regex topic";}
185 static const char* subscriptionAlreadyExists() {
return "subscription already exists";}
186 static const char* nameInUse() {
return "name in use";}
187 static const char* authFailure() {
return "auth failure";}
188 static const char* notEntitled() {
return "not entitled";}
189 static const char* authDisabled() {
return "authentication disabled";}
190 static const char* subidInUse() {
return "subid in use";}
191 static const char* noTopic() {
return "no topic";}
206 virtual void exceptionThrown(
const std::exception&)
const {;}
212 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \ 217 catch (std::exception& ex_)\ 221 _exceptionListener->exceptionThrown(ex_);\ 246 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 249 while(me->_connected)\ 256 catch(MessageStreamFullException&)\ 258 me->checkAndSendHeartbeat(false);\ 262 catch (std::exception& ex_)\ 266 me->_exceptionListener->exceptionThrown(ex_);\ 290 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 291 while(me->_connected)\ 298 catch(MessageStreamFullException&)\ 300 me->checkAndSendHeartbeat(false);\ 304 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 307 while(me->_connected)\ 314 catch(MessageStreamFullException& ex_)\ 316 me->checkAndSendHeartbeat(false);\ 320 catch (std::exception& ex_)\ 324 me->_exceptionListener->exceptionThrown(ex_);\ 348 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\ 349 while(me->_connected)\ 356 catch(MessageStreamFullException& ex_)\ 358 me->checkAndSendHeartbeat(false);\ 363 #define AMPS_UNHANDLED_EXCEPTION(ex) \ 366 _exceptionListener->exceptionThrown(ex);\ 371 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \ 374 me->_exceptionListener->exceptionThrown(ex);\ 413 static const unsigned Subscribe = 1;
414 static const unsigned SOW = 2;
415 static const unsigned NeedsSequenceNumber = 4;
416 static const unsigned ProcessedAck = 8;
417 static const unsigned StatsAck = 16;
418 void init(Message::Command::Type command_)
427 void init(
const std::string& command_)
439 if (!(command & Message::Command::NoDataCommands))
442 if (command == Message::Command::Subscribe ||
443 command == Message::Command::SOWAndSubscribe ||
444 command == Message::Command::DeltaSubscribe ||
445 command == Message::Command::SOWAndDeltaSubscribe)
450 if (command == Message::Command::SOW
451 || command == Message::Command::SOWAndSubscribe
452 || command == Message::Command::SOWAndDeltaSubscribe)
457 setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
459 if (command == Message::Command::SOW)
464 _flags |= ProcessedAck;
466 else if (command == Message::Command::SOWDelete)
469 _flags |= ProcessedAck;
470 _flags |= NeedsSequenceNumber;
472 else if (command == Message::Command::Publish
473 || command == Message::Command::DeltaPublish)
475 _flags |= NeedsSequenceNumber;
477 else if (command == Message::Command::StopTimer)
566 std::ostringstream os;
611 if (v_ ==
"processed") _flags |= ProcessedAck;
612 else if (v_ ==
"stats") _flags |= StatsAck;
619 if (v_.find(
"processed") != std::string::npos) _flags |= ProcessedAck;
620 else _flags &= ~ProcessedAck;
621 if (v_.find(
"stats") != std::string::npos) _flags |= StatsAck;
622 else _flags &= ~StatsAck;
629 if (v_ & Message::AckType::Processed) _flags |= ProcessedAck;
630 else _flags &= ~ProcessedAck;
631 if (v_ & Message::AckType::Stats) _flags |= StatsAck;
632 else _flags &= ~StatsAck;
646 Message& getMessage(
void) {
return _message; }
647 unsigned getTimeout(
void)
const {
return _timeout; }
648 unsigned getBatchSize(
void)
const {
return _batchSize; }
// True when the Subscribe bit is set in _flags.
// The masked value is compared against zero explicitly, the same way isSow(),
// hasProcessedAck(), hasStatsAck() and needsSequenceNumber() do, instead of
// relying on an implicit unsigned-to-bool conversion.
649 bool isSubscribe(
void)
const 651 return (_flags & Subscribe) != 0;
// Flag queries: each one reports whether the named bit is set in _flags.
// True when the SOW bit is set.
653 bool isSow(
void)
const {
return (_flags & SOW) != 0; }
// True when the ProcessedAck bit is set.
654 bool hasProcessedAck(
void)
const {
return (_flags & ProcessedAck) != 0; }
// True when the StatsAck bit is set.
655 bool hasStatsAck(
void)
const {
return (_flags & StatsAck) != 0; }
// True when the NeedsSequenceNumber bit is set.
656 bool needsSequenceNumber(
void)
const {
return (_flags & NeedsSequenceNumber) != 0; }
661 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
678 virtual std::string authenticate(
const std::string& userName_,
const std::string& password_) = 0;
686 virtual std::string retry(
const std::string& userName_,
const std::string& password_) = 0;
693 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
705 std::string
authenticate(
const std::string& ,
const std::string& password_)
712 std::string
retry(
const std::string& ,
const std::string& )
714 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
717 void completed(
const std::string& ,
const std::string& ,
const std::string& ) {;}
738 virtual void execute(
Message& message_) = 0;
753 typedef bool (*PublishStoreResizeHandler)(
Store store_,
762 StoreImpl() : _resizeHandler(NULL), _resizeHandlerData(NULL) {;}
768 virtual amps_uint64_t store(
const Message& message_) = 0;
774 virtual void discardUpTo(amps_uint64_t index_) = 0;
789 virtual bool replaySingle(
StoreReplayer& replayer_, amps_uint64_t index_) = 0;
795 virtual size_t unpersistedCount()
const = 0;
807 virtual void flush(
long timeout_) = 0;
820 virtual amps_uint64_t getLowestUnpersisted()
const = 0;
825 virtual amps_uint64_t getLastPersisted() = 0;
839 _resizeHandler = handler_;
840 _resizeHandlerData = userData_;
845 return _resizeHandler;
848 bool callResizeHandler(
size_t newSize_);
852 void* _resizeHandlerData;
859 RefHandle<StoreImpl> _body;
863 Store(
const Store& rhs) : _body(rhs._body) {;}
875 return _body.get().store(message_);
884 _body.get().discardUpTo(index_);
893 _body.get().replay(replayer_);
905 return _body.get().replaySingle(replayer_, index_);
914 return _body.get().unpersistedCount();
922 return _body.isValid();
935 return _body.get().flush(timeout_);
943 return _body.get().getLowestUnpersisted();
951 return _body.get().getLastPersisted();
966 _body.get().setResizeHandler(handler_, userData_);
971 return _body.get().getResizeHandler();
1001 virtual void failedWrite(
const Message& message_,
1002 const char* reason_,
size_t reasonLength_) = 0;
1006 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
1009 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
1022 long* timeoutp = (
long*)data_;
1024 if (count == 0)
return false;
1027 store_.
flush(*timeoutp);
1030 catch (
const TimedOutException&)
1032 catch (
const TimedOutException& e)
1055 unsigned requestedAckTypes_) = 0;
1062 virtual void clear() = 0;
1066 virtual void resubscribe(Client& client_) = 0;
1077 typedef enum { Disconnected = 0,
1081 PublishReplayed = 8,
1082 HeartbeatInitiated = 16,
1096 virtual void connectionStateChanged(
State newState_) = 0;
1101 class MessageStreamImpl;
1104 typedef void(*DeferredExecutionFunc)(
void*);
1106 class ClientImpl :
public RefBody
1108 friend class Client;
1111 DisconnectHandler _disconnectHandler;
1112 enum GlobalCommandTypeHandlers :
size_t 1122 DuplicateMessage = 8,
1125 std::vector<MessageHandler> _globalCommandTypeHandlers;
1126 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1128 MessageRouter::RouteCache _routeCache;
1129 mutable Mutex _lock;
1130 std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1132 Store _publishStore;
1133 bool _isRetryOnDisconnect;
1134 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1135 volatile amps_uint64_t _lastSentHaSequenceNumber;
1136 ATOMIC_TYPE_8 _badTimeToHAPublish;
1137 ATOMIC_TYPE_8 _badTimeToHASubscribe;
1138 VersionInfo _serverVersion;
1139 Timer _heartbeatTimer;
1140 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1143 int _queueAckTimeout;
1144 bool _isAutoAckEnabled;
1145 unsigned _ackBatchSize;
1146 unsigned _queuedAckCount;
1147 unsigned _defaultMaxDepth;
1148 struct QueueBookmarks
1150 QueueBookmarks(
const std::string& topic_)
1157 amps_uint64_t _oldestTime;
1158 unsigned _bookmarkCount;
1160 typedef amps_uint64_t topic_hash;
1161 typedef std::map<topic_hash,QueueBookmarks> TopicHashMap;
1162 TopicHashMap _topicHashMap;
1166 ClientImpl* _client;
1171 ClientStoreReplayer()
1172 : _client(NULL) , _version(0), _res(
AMPS_E_OK)
1175 ClientStoreReplayer(ClientImpl* client_)
1176 : _client(client_) , _version(0), _res(
AMPS_E_OK)
1179 void setClient(ClientImpl* client_) { _client = client_; }
1181 void execute(
Message& message_)
1183 if (!_client)
throw CommandException(
"Can't replay without a client.");
1186 if (index > _client->_lastSentHaSequenceNumber)
1187 _client->_lastSentHaSequenceNumber = index;
1194 (!_client->_badTimeToHAPublish ||
1198 message_.getMessage(),
1202 throw DisconnectedException(
"AMPS Server disconnected during replay");
1208 ClientStoreReplayer _replayer;
1212 ClientImpl* _parent;
1213 const char* _reason;
1214 size_t _reasonLength;
1215 size_t _replayCount;
1217 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1220 _reasonLength(reasonLength_),
1223 void execute(
Message& message_)
1225 if (_parent->_failedWriteHandler)
1228 _parent->_failedWriteHandler->failedWrite(message_,
1229 _reason, _reasonLength);
1232 size_t replayCount(
void)
const {
return _replayCount; }
1235 struct AckResponseImpl :
public RefBody
1237 std::string username, password, reason, status, bookmark, options;
1238 amps_uint64_t sequenceNo;
1239 VersionInfo serverVersion;
1240 volatile bool responded, abandoned;
1241 unsigned connectionVersion;
1244 sequenceNo((amps_uint64_t)0),
1248 connectionVersion(0)
1255 RefHandle<AckResponseImpl> _body;
1257 AckResponse() : _body(NULL) {;}
1258 AckResponse(
const AckResponse& rhs) : _body(rhs._body) {;}
1259 static AckResponse create()
1262 r._body =
new AckResponseImpl();
1266 const std::string& username()
1268 return _body.get().username;
1270 void setUsername(
const char* data_,
size_t len_)
1272 if (data_) _body.get().username.assign(data_, len_);
1273 else _body.get().username.clear();
1275 const std::string& password()
1277 return _body.get().password;
1279 void setPassword(
const char* data_,
size_t len_)
1281 if (data_) _body.get().password.assign(data_, len_);
1282 else _body.get().password.clear();
1284 const std::string& reason()
1286 return _body.get().reason;
1288 void setReason(
const char* data_,
size_t len_)
1290 if (data_) _body.get().reason.assign(data_, len_);
1291 else _body.get().reason.clear();
1293 const std::string& status()
1295 return _body.get().status;
1297 void setStatus(
const char* data_,
size_t len_)
1299 if (data_) _body.get().status.assign(data_, len_);
1300 else _body.get().status.clear();
1302 const std::string& bookmark()
1304 return _body.get().bookmark;
1306 void setBookmark(
const char* data_,
size_t len_)
1308 if (data_) _body.get().bookmark.assign(data_, len_);
1309 else _body.get().bookmark.clear();
1311 amps_uint64_t sequenceNo()
const 1313 return _body.get().sequenceNo;
// Parses len_ characters starting at data_ as an unsigned decimal number and
// stores the result in the shared response body as sequenceNo.
// NOTE(review): the visible loop does not check that each character is a
// decimal digit and does not guard against overflow; some lines of this
// function are missing from the listing (possibly a null check on data_), so
// confirm against the full header before relying on either.
1315 void setSequenceNo(
const char* data_,
size_t len_)
1317 amps_uint64_t result = (amps_uint64_t)0;
1320 for(
size_t i=0; i<len_; ++i)
// Shift the accumulated value one decimal place, then add the next digit.
1322 result *= (amps_uint64_t)10;
1323 result += (amps_uint64_t)(data_[i] -
'0');
1326 _body.get().sequenceNo = result;
1328 VersionInfo serverVersion()
const 1330 return _body.get().serverVersion;
1332 void setServerVersion(
const char* data_,
size_t len_)
1335 _body.get().serverVersion.setVersion(std::string(data_, len_));
1339 return _body.get().responded;
1341 void setResponded(
bool responded_)
1343 _body.get().responded = responded_;
1347 return _body.get().abandoned;
1349 void setAbandoned(
bool abandoned_)
1351 if (_body.isValid())
1352 _body.get().abandoned = abandoned_;
1355 void setConnectionVersion(
unsigned connectionVersion)
1357 _body.get().connectionVersion = connectionVersion;
1360 unsigned getConnectionVersion()
1362 return _body.get().connectionVersion;
1364 void setOptions(
const char* data_,
size_t len_)
1366 if (data_) _body.get().options.assign(data_,len_);
1367 else _body.get().options.clear();
1370 const std::string& options()
1372 return _body.get().options;
1375 AckResponse& operator=(
const AckResponse& rhs)
1383 typedef std::map<std::string, AckResponse> AckMap;
1386 DefaultExceptionListener _defaultExceptionListener;
1389 struct DeferredExecutionRequest
1391 DeferredExecutionRequest(DeferredExecutionFunc func_,
1394 _userData(userData_)
1397 DeferredExecutionFunc _func;
1401 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1402 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1404 std::string _username;
1405 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1406 ConnectionStateListeners _connectionStateListeners;
1407 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1408 Mutex _deferredExecutionLock;
1409 DeferredExecutionList _deferredExecutionList;
1410 unsigned _heartbeatInterval;
1411 unsigned _readTimeout;
1419 if (!_connected && newState_ > ConnectionStateListener::Connected)
1423 for(ConnectionStateListeners::iterator it= _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1425 AMPS_CALL_EXCEPTION_WRAPPER(
1426 (*it)->connectionStateChanged(newState_));
1429 unsigned processedAck(
Message& message);
// Declaration only; the definition is not part of this listing.
// The parameter name is spelled message, matching the neighbouring
// processedAck and lastChance declarations.
1430 unsigned persistedAck(
Message& message);
1431 void lastChance(
Message& message);
1432 void checkAndSendHeartbeat(
bool force=
false);
1433 virtual ConnectionInfo getConnectionInfo()
const;
1435 ClientImplMessageHandler(
amps_handle message,
void* userData);
1437 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1439 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
// Removes the route and subscription bookkeeping for the given id and sends
// the prepared message with _sendWithoutRetry.
// NOTE(review): several lines are missing from this listing, including the
// declaration of subId and the preparation of _message; presumably _message is
// set up as an unsubscribe command -- confirm against the full header.
1441 void unsubscribeInternal(
const std::string&
id)
// Nothing to do for an empty id.
1443 if (
id.empty())
return;
1446 subId.assign(
id.data(),
id.length());
1447 _routes.removeRoute(subId);
1449 if (_subscriptionManager)
// _lock is released through Unlock while the subscription manager is called.
1452 Unlock<Mutex> unlock(_lock);
1453 _subscriptionManager->unsubscribe(subId);
1459 _sendWithoutRetry(_message);
// Queues a call to s_noOpFn with no user data through deferredExecution.
1460 deferredExecution(&s_noOpFn, NULL);
1463 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1464 bool isHASubscribe_)
1466 return syncAckProcessing(timeout_, message_,
1467 (amps_uint64_t)0, isHASubscribe_);
// Sends message_ and waits until its acknowledgement has responded, has been
// abandoned, or the wait has timed out.
//   timeout_        passed to AMPS_START_TIMER and to _lock.wait.
//   message_        the message handed to _send.
//   haSeq           forwarded to _send; defaults to 0.
//   isHASubscribe_  forwarded to _send; defaults to false.
// Returns the AckResponse for the command.
// Throws DisconnectedException when _send reports connection version 0 or the
// acknowledgement was abandoned, and TimedOutException when the wait ends
// without a response and the acknowledgement was not abandoned. For a response
// with a non-empty reason, message_.throwFor is invoked with that reason.
// NOTE(review): many lines of this function are missing from this listing
// (braces, the ack map registration, the logon check, the declaration of
// data); only the visible statements are described here.
1470 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1471 amps_uint64_t haSeq = (amps_uint64_t)0,
1472 bool isHASubscribe_ =
false)
1475 AckResponse ack = AckResponse::create();
1478 Lock<Mutex> guard(_ackMapLock);
// The value returned by _send is recorded as the connection version of this
// acknowledgement; 0 is treated as a closed connection.
1481 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
1482 if (ack.getConnectionVersion() == 0)
1485 throw DisconnectedException(
"Connection closed while waiting for response.");
1487 bool timedOut =
false;
1488 AMPS_START_TIMER(timeout_)
1489 while(!timedOut && !ack.responded() && !ack.abandoned())
1493 timedOut = !_lock.wait(timeout_);
1495 if (timedOut) { AMPS_RESET_TIMER(timedOut, timeout_); }
// _lock is released through Unlock while the waiting function runs.
1501 Unlock<Mutex> unlck(_lock);
1502 amps_invoke_waiting_function();
1505 if (ack.responded())
1507 if (ack.status() !=
"failure")
// Never move the last sent HA sequence number backwards.
1511 amps_uint64_t ackSequence = ack.sequenceNo();
1512 if (_lastSentHaSequenceNumber < ackSequence)
1514 _lastSentHaSequenceNumber = ackSequence;
// The name hash is the part of the bookmark before the first '|'.
1524 _nameHash = ack.bookmark().substr(0, ack.bookmark().find(
'|'));
1525 _serverVersion = ack.serverVersion();
1526 if (_bookmarkStore.isValid())
// Look for the whole max_backlog= key. find_first_of would treat the argument
// as a set of characters and report the first position holding any one of
// them, after which the fixed +12 offset (the length of the key) would start
// parsing at an unrelated position in the options string.
1531 const std::string& options = ack.options();
1532 size_t index = options.find(
"max_backlog=");
1533 if(index != std::string::npos)
// Parse the decimal value that follows the key, up to the next comma or the
// end of the string.
1536 const char* c = options.c_str()+index+12;
1537 while(*c && *c!=
',')
1539 data = (data*10) + (
unsigned)(*c++-48);
// The parsed value can only lower _ackBatchSize, never raise it.
1541 if(_ackBatchSize > data) _ackBatchSize = data;
1546 const size_t NotEntitled = 12;
1547 std::string ackReason = ack.reason();
// An empty reason means there is nothing to report.
1548 if (ackReason.length() == 0)
return ack;
1549 if (ackReason.length() == NotEntitled &&
1550 ackReason[0] ==
'n' &&
1555 message_.throwFor(_client, ackReason);
1559 if (!ack.abandoned())
1561 throw TimedOutException(
"timed out waiting for operation.");
1565 throw DisconnectedException(
"Connection closed while waiting for response.");
1573 if (!_client)
return;
1580 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1581 _pEmptyMessageStream.reset(NULL);
1588 ClientImpl(
const std::string& clientName)
1589 : _client(NULL), _name(clientName)
1590 , _isRetryOnDisconnect(
true)
1591 , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1592 , _badTimeToHASubscribe(0), _serverVersion()
1593 , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
1594 , _isAutoAckEnabled(
false)
1596 , _queuedAckCount(0)
1597 , _defaultMaxDepth(0)
1599 , _heartbeatInterval(0)
1602 _replayer.setClient(
this);
1607 _exceptionListener = &_defaultExceptionListener;
1608 for (
size_t i=0; i<GlobalCommandTypeHandlers::COUNT; ++i)
1614 virtual ~ClientImpl()
1619 const std::string& getName()
const 1624 const std::string& getNameHash()
const 1629 void setName(
const std::string& name)
1634 _client, name.c_str());
1637 AMPSException::throwFor(_client, result);
1642 const std::string& getLogonCorrelationData()
const 1644 return _logonCorrelationData;
1647 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
1649 _logonCorrelationData = logonCorrelationData_;
1652 size_t getServerVersion()
const 1654 return _serverVersion.getOldStyleVersion();
1657 VersionInfo getServerVersionInfo()
const 1659 return _serverVersion;
1662 const std::string& getURI()
const 1667 virtual void connect(
const std::string& uri)
1669 Lock<Mutex> l(_lock);
1673 virtual void _connect(
const std::string& uri)
1677 _client, uri.c_str());
1680 AMPSException::throwFor(_client, result);
1687 _readMessage.setClientImpl(
this);
1688 if(_queueAckTimeout)
1693 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
1696 void setDisconnected()
1699 Lock<Mutex> l(_lock);
1702 AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
1706 _heartbeatTimer.setTimeout(0.0);
1712 virtual void disconnect()
1715 Lock<Mutex> l(_lock);
1720 AMPS_CALL_EXCEPTION_WRAPPER(_sendWithoutRetry(_message));
1722 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
1724 AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
1725 Lock<Mutex> l(_lock);
1726 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
// Marks every pending acknowledgement whose connection version is less than or
// equal to failedVersion as abandoned, and collects the keys of those entries
// in a work list that is then walked in a second pass.
// NOTE(review): the body of the second loop is missing from this listing;
// presumably it erases the collected keys from _ackMap -- confirm against the
// full header.
1729 void clearAcks(
unsigned failedVersion)
// _ackMapLock is held while _ackMap is traversed.
1732 Lock<Mutex> guard(_ackMapLock);
1735 std::vector<std::string> worklist;
1736 for(AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
1738 if (i->second.getConnectionVersion() <= failedVersion)
1740 i->second.setAbandoned(
true);
1741 worklist.push_back(i->first);
1745 for(std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
1754 int send(
const Message& message)
1756 Lock<Mutex> l(_lock);
1757 return _send(message);
1760 void sendWithoutRetry(
const Message& message_)
1762 Lock<Mutex> l(_lock);
1763 _sendWithoutRetry(message_);
1766 void _sendWithoutRetry(
const Message& message_)
1771 AMPSException::throwFor(_client,result);
1775 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
1776 bool isHASubscribe_ =
false)
1783 Message localMessage = message;
1784 unsigned version = 0;
1788 if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
1792 if(!_isRetryOnDisconnect)
1796 Unlock<Mutex> l(_lock);
1807 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
1808 (isHASubscribe_ && _badTimeToHASubscribe != 0))
1810 return (
int)version;
1814 if (haSeq > _lastSentHaSequenceNumber)
1816 while (haSeq > _lastSentHaSequenceNumber + 1)
1822 _lastSentHaSequenceNumber+1))
1828 version = _replayer._version;
1831 catch(
const DisconnectedException&)
1833 catch(
const DisconnectedException& e)
1836 result = _replayer._res;
1841 localMessage.getMessage(),
1843 ++_lastSentHaSequenceNumber;
1847 localMessage.getMessage(),
1851 if (!isHASubscribe_ && !haSeq &&
1852 localMessage.getMessage() == message.getMessage())
1856 if(_isRetryOnDisconnect)
1858 Unlock<Mutex> u(_lock);
1863 if ((isHASubscribe_ || haSeq) &&
1866 return (
int)version;
1873 AMPSException::throwFor(_client, result);
1879 amps_invoke_waiting_function();
1883 if (result !=
AMPS_E_OK) AMPSException::throwFor(_client, result);
1884 return (
int)version;
1887 void addMessageHandler(
const Field& commandId_,
1889 unsigned requestedAcks_,
bool isSubscribe_)
1891 Lock<Mutex> lock(_lock);
1892 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
1896 bool removeMessageHandler(
const Field& commandId_)
1898 Lock<Mutex> lock(_lock);
1899 return _routes.removeRoute(commandId_);
1907 bool isSubscribe =
false;
1908 bool isSubscribeOnly =
false;
1909 bool replace =
false;
1911 unsigned systemAddedAcks = Message::AckType::None;
1915 case Message::Command::Subscribe:
1916 case Message::Command::DeltaSubscribe:
1917 replace = message_.
getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos;
1920 systemAddedAcks |= Message::AckType::Persisted;
1922 isSubscribeOnly =
true;
1924 case Message::Command::SOWAndSubscribe:
1925 case Message::Command::SOWAndDeltaSubscribe:
1932 while (!replace &&
id != subId && _routes.hasRoute(
id))
1944 case Message::Command::SOW:
1951 while (!replace &&
id != subId && _routes.hasRoute(
id))
1962 if (!isSubscribeOnly)
1971 while (!replace && qid != subId && qid !=
id 1972 && _routes.hasRoute(qid))
1978 systemAddedAcks |= Message::AckType::Processed;
1980 if (!isSubscribeOnly) systemAddedAcks |= Message::AckType::Completed;
1983 int routesAdded = 0;
1984 Lock<Mutex> l(_lock);
1985 if (!subId.
empty() && messageHandler_.isValid())
1987 if (!_routes.hasRoute(subId))
1993 _routes.addRoute(subId, messageHandler_, requestedAcks,
1994 systemAddedAcks, isSubscribe);
1996 if (!isSubscribeOnly && !qid.
empty()
1997 && messageHandler_.isValid() && qid != subId)
1999 if (routesAdded == 0)
2001 _routes.addRoute(qid, messageHandler_,
2002 requestedAcks, systemAddedAcks,
false);
2008 Unlock<Mutex> u(_lock);
2009 data = amps_invoke_copy_route_function(
2010 messageHandler_.userData());
2014 _routes.addRoute(qid, messageHandler_, requestedAcks,
2015 systemAddedAcks,
false);
2019 _routes.addRoute(qid,
2022 requestedAcks, systemAddedAcks,
false);
2027 if (!
id.empty() && messageHandler_.isValid()
2028 && requestedAcks & ~
Message::AckType::Persisted
2029 &&
id != subId &&
id != qid)
2031 if (routesAdded == 0)
2033 _routes.addRoute(
id, messageHandler_, requestedAcks,
2034 systemAddedAcks,
false);
2040 Unlock<Mutex> u(_lock);
2041 data = amps_invoke_copy_route_function(
2042 messageHandler_.userData());
2046 _routes.addRoute(
id, messageHandler_, requestedAcks,
2047 systemAddedAcks,
false);
2051 _routes.addRoute(
id,
2055 systemAddedAcks,
false);
2064 syncAckProcessing(timeout_, message_, 0,
false);
2071 _routes.removeRoute(
id);
2078 case Message::Command::Unsubscribe:
2079 case Message::Command::Heartbeat:
2080 case Message::Command::Logon:
2081 case Message::Command::StartTimer:
2082 case Message::Command::StopTimer:
2083 case Message::Command::DeltaPublish:
2084 case Message::Command::Publish:
2085 case Message::Command::SOWDelete:
2087 Lock<Mutex> l(_lock);
2096 if (messageHandler_.isValid())
2098 _routes.addRoute(
id, messageHandler_, requestedAcks,
2099 Message::AckType::None,
false);
2106 case Message::Command::GroupBegin:
2107 case Message::Command::GroupEnd:
2108 case Message::Command::OOF:
2109 case Message::Command::Ack:
2110 case Message::Command::Unknown:
2112 throw CommandException(
"Command type " + message_.
getCommand() +
" can not be sent directly to AMPS");
2118 void setDisconnectHandler(
const DisconnectHandler& disconnectHandler)
2120 Lock<Mutex> l(_lock);
2121 _disconnectHandler = disconnectHandler;
2124 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
2126 switch (command_[0])
2128 #if 0 // Not currently implemented to avoid an extra branch in delivery 2130 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2133 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2137 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2139 #if 0 // Not currently implemented to avoid an extra branch in delivery 2141 if (command_[6] ==
'b')
2143 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2145 else if (command_[6] ==
'e')
2147 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2151 std::ostringstream os;
2152 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2153 throw CommandException(os.str());
2157 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2161 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2165 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2169 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2172 std::ostringstream os;
2173 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
2174 throw CommandException(os.str());
2179 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2183 #if 0 // Not currently implemented to avoid an extra branch in delivery 2184 case Message::Command::Publish:
2185 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2187 case Message::Command::SOW:
2188 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2191 case Message::Command::Heartbeat:
2192 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2194 #if 0 // Not currently implemented to avoid an extra branch in delivery 2195 case Message::Command::GroupBegin:
2196 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2198 case Message::Command::GroupEnd:
2199 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2201 case Message::Command::OOF:
2202 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2205 case Message::Command::Ack:
2206 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2210 unsigned command = command_;
2211 while (command > 0) { ++bits; command >>= 1; }
2213 AMPS_snprintf(errBuf,
sizeof(errBuf),
2214 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2215 CommandConstants<0>::Lengths[bits],
2216 CommandConstants<0>::Values[bits]);
2217 throw CommandException(errBuf);
2222 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2224 _globalCommandTypeHandlers[handlerType_] = handler_;
2229 Lock<Mutex> l(_lock);
2230 _failedWriteHandler.reset(handler_);
2233 void setPublishStore(
const Store& publishStore_)
2235 Lock<Mutex> l(_lock);
2236 if (_connected)
throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2237 _publishStore = publishStore_;
2242 Lock<Mutex> l(_lock);
2243 if (_connected)
throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2244 _bookmarkStore = bookmarkStore_;
2249 Lock<Mutex> l(_lock);
2250 _subscriptionManager.reset(subscriptionManager_);
2258 DisconnectHandler getDisconnectHandler()
const 2260 return _disconnectHandler;
2265 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2273 Store getPublishStore()
const 2275 return _publishStore;
2280 return _bookmarkStore;
// Publishes data_ (dataLen_ bytes) to topic_ (topicLen_ bytes).
// Two paths are visible: one fills the shared _publishMessage under _lock and
// sends it with _send, the other prepares publishStoreMessage and returns the
// result of _publish.
// NOTE(review): the condition that selects between the two paths, the topic
// assignment and the return of the first path are missing from this listing;
// presumably the choice depends on whether a publish store is set -- confirm
// against the full header.
2283 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2287 Lock<Mutex> l(_lock);
2289 _publishMessage.assignData(data_, dataLen_);
2290 _send(_publishMessage);
// publishStoreMessage is created on first use and registered with
// PerThreadMessageTracker so it can be cleaned up later; afterwards it is
// reset and reused on every call.
2295 if (!publishStoreMessage)
2297 publishStoreMessage =
new Message();
2298 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2300 publishStoreMessage->reset();
2301 publishStoreMessage->setCommandEnum(Message::Command::Publish);
2302 return _publish(topic_, topicLen_, data_, dataLen_);
2306 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2307 size_t dataLen_,
unsigned long expiration_)
2311 Lock<Mutex> l(_lock);
2313 _publishMessage.assignData(data_, dataLen_);
2314 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2315 size_t pos = convertToCharArray(exprBuf, expiration_);
2317 _send(_publishMessage);
2323 if (!publishStoreMessage)
2325 publishStoreMessage =
new Message();
2326 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2328 publishStoreMessage->reset();
2329 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2330 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2331 publishStoreMessage->setCommandEnum(Message::Command::Publish)
2332 .assignExpiration(exprBuf+exprPos,
2333 AMPS_NUMBER_BUFFER_LEN-exprPos);
2334 return _publish(topic_, topicLen_, data_, dataLen_);
// Helper used by publishFlush to learn when a flush has been acknowledged or
// the connection has gone away.
// NOTE(review): the class header, the _cmdId declaration and several method
// signatures are missing from this listing; only the visible statements are
// described.
// Client this handler is registered with.
2341 ClientImpl* _pClient;
// Completion flags read by done().
2343 volatile bool _acked;
2344 volatile bool _disconnected;
// Registers this object as a connection state listener on the client.
2346 FlushAckHandler(ClientImpl* pClient_)
2347 : _pClient(pClient_), _cmdId(), _acked(
false), _disconnected(
false)
2349 pClient_->addConnectionStateListener(
this);
// Unregisters the listener and removes the message handler stored under
// _cmdId.
2353 _pClient->removeConnectionStateListener(
this);
2354 _pClient->removeMessageHandler(_cmdId);
2357 void setCommandId(
const Field& cmdId_)
// Any state less than or equal to Shutdown marks the handler as disconnected.
2365 void connectionStateChanged(
State state_)
2367 if (state_ <= Shutdown)
2369 _disconnected =
true;
// Finished once either flag has been set.
2378 return _acked || _disconnected;
// Waits until previously published messages have been flushed.
//   timeout_  used for _publishStore.flush, for the sleep on servers older
//             than minFlush, and for the Timer that bounds the wait loop.
//   ackType_  must be Message::AckType::Processed or
//             Message::AckType::Persisted.
// Throws CommandException for any other ackType_, DisconnectedException when
// the client is not connected or disconnects while waiting, and
// TimedOutException when the flush handler is not done after the wait.
// NOTE(review): many lines of this function are missing from this listing
// (braces, the construction of the flush message, the branch conditions
// around the publish store and the wait loops); only the visible statements
// are described.
2382 void publishFlush(
long timeout_,
unsigned ackType_)
2384 static const char* processed =
"processed";
2385 static const size_t processedLen = strlen(processed);
2386 static const char* persisted =
"persisted";
2387 static const size_t persistedLen = strlen(persisted);
2388 static const char* flush =
"flush";
2389 static const size_t flushLen = strlen(flush);
2390 static VersionInfo minPersisted(
"5.3.3.0");
2391 static VersionInfo minFlush(
"4");
// Thrown by value, like every other exception in this function. Throwing the
// result of new would raise a pointer, which handlers written as
// catch (std::exception&) or catch (const AMPSException&) do not match, and
// the allocated object would never be released.
2392 if (ackType_ != Message::AckType::Processed
2393 && ackType_ != Message::AckType::Persisted)
2395 throw CommandException(
"Flush can only be used with processed or persisted acks.");
2397 FlushAckHandler flushHandler(
this);
2398 if (_serverVersion >= minFlush)
2400 Lock<Mutex> l(_lock);
// NOTE(review): the message text below contains a typo; it is left unchanged
// because it is runtime text that callers or logs may match on.
2402 throw DisconnectedException(
"Not cconnected trying to flush");
2406 if (_serverVersion < minPersisted
2407 || ackType_ == Message::AckType::Processed)
// flushHandler is bound by reference; it is a local of this function.
2417 std::bind(&FlushAckHandler::invoke,
2418 std::ref(flushHandler),
2419 std::placeholders::_1),
2421 if (_send(_message) == -1)
2422 throw DisconnectedException(
"Disconnected trying to flush");
2428 _publishStore.
flush(timeout_);
2430 catch (
const AMPSException& ex)
2432 AMPS_UNHANDLED_EXCEPTION(ex);
// Servers older than minFlush: sleep for timeout_ milliseconds, or for one
// second when no positive timeout was given.
2436 else if (_serverVersion < minFlush)
2438 if (timeout_ > 0) { AMPS_USLEEP(timeout_ * 1000); }
2439 else { AMPS_USLEEP(1000 * 1000); }
2444 Timer timer((
double)timeout_);
2446 while (!timer.check() && !flushHandler.done())
2449 amps_invoke_waiting_function();
2454 while (!flushHandler.done())
2457 amps_invoke_waiting_function();
2461 if (!flushHandler.done())
2462 throw TimedOutException(
"Timed out waiting for flush");
2464 if (!flushHandler.acked() && !_publishStore.
isValid())
2465 throw DisconnectedException(
"Disconnected waiting for flush");
2468 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2469 const char* data_,
size_t dataLength_)
2473 Lock<Mutex> l(_lock);
2475 _deltaMessage.assignData(data_, dataLength_);
2476 _send(_deltaMessage);
2481 if (!publishStoreMessage)
2483 publishStoreMessage =
new Message();
2484 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2486 publishStoreMessage->reset();
2487 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2488 return _publish(topic_, topicLength_, data_, dataLength_);
2492 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2493 const char* data_,
size_t dataLength_,
2494 unsigned long expiration_)
2498 Lock<Mutex> l(_lock);
2500 _deltaMessage.assignData(data_, dataLength_);
2501 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2502 size_t pos = convertToCharArray(exprBuf, expiration_);
2504 _send(_deltaMessage);
2510 if (!publishStoreMessage)
2512 publishStoreMessage =
new Message();
2513 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2515 publishStoreMessage->reset();
2516 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2517 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2518 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2519 .assignExpiration(exprBuf+exprPos,
2520 AMPS_NUMBER_BUFFER_LEN-exprPos);
2521 return _publish(topic_, topicLength_, data_, dataLength_);
2525 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
2526 const char* data_,
size_t dataLength_)
2528 publishStoreMessage->assignTopic(topic_, topicLength_)
2529 .setAckTypeEnum(Message::AckType::Persisted)
2530 .assignData(data_, dataLength_);
2531 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
2532 char buf[AMPS_NUMBER_BUFFER_LEN];
2533 size_t pos = convertToCharArray(buf, haSequenceNumber);
2534 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2536 Lock<Mutex> l(_lock);
2537 _send(*publishStoreMessage, haSequenceNumber);
2539 return haSequenceNumber;
2542 virtual std::string logon(
long timeout_,
Authenticator& authenticator_,
2543 const char* options_ = NULL)
2545 Lock<Mutex> l(_lock);
2546 return _logon(timeout_, authenticator_, options_);
2549 virtual std::string _logon(
long timeout_,
Authenticator& authenticator_,
2550 const char* options_ = NULL)
2552 AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2558 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE 2560 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
2563 if(uri.user().size()) _message.
setUserId(uri.user());
2564 if(uri.password().size()) _message.
setPassword(uri.password());
2565 if(uri.protocol() ==
"amps" && uri.messageType().size())
2569 if(uri.isTrue(
"pretty"))
2575 if (!_logonCorrelationData.empty())
2589 AckResponse ack = syncAckProcessing(timeout_, _message);
2590 if (ack.status() ==
"retry")
2592 _message.
setPassword(authenticator_.
retry(ack.username(), ack.password()));
2593 _username = ack.username();
2598 authenticator_.
completed(ack.username(), ack.password(), ack.reason());
2602 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2607 catch(
const AMPSException& ex)
2609 AMPS_UNHANDLED_EXCEPTION(ex);
2621 _publishStore.
replay(_replayer);
2622 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
2624 catch(
const StoreException& ex)
2626 std::ostringstream os;
2627 os <<
"A local store exception occurred while logging on." 2629 throw ConnectionException(os.str());
2631 catch(
const AMPSException& ex)
2633 AMPS_UNHANDLED_EXCEPTION(ex);
2636 catch(
const std::exception& ex)
2638 AMPS_UNHANDLED_EXCEPTION(ex);
2646 return newCommandId;
2650 const std::string& topic_,
2652 const std::string& filter_,
2653 const std::string& bookmark_,
2654 const std::string& options_,
2655 const std::string& subId_,
2656 bool isHASubscribe_ =
true)
2658 isHASubscribe_ &= (bool)_subscriptionManager;
2659 Lock<Mutex> l(_lock);
2663 std::string subId(subId_);
2666 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos)
2667 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
2676 unsigned ackTypes = Message::AckType::Processed;
2678 if (!bookmark_.empty() && _bookmarkStore.isValid())
2680 ackTypes |= Message::AckType::Persisted;
2684 if (filter_.length()) _message.
setFilter(filter_);
2685 if (bookmark_.length())
2695 if (_bookmarkStore.isValid())
2700 _bookmarkStore.
log(_message);
2701 _bookmarkStore.
discard(_message);
2707 if (options_.length()) _message.
setOptions(options_);
2713 Unlock<Mutex> u(_lock);
2714 _subscriptionManager->subscribe(messageHandler_, message,
2715 Message::AckType::None);
2716 if (_badTimeToHASubscribe)
return subId;
2721 Message::AckType::None, ackTypes,
true);
2724 if (!options_.empty()) message.
setOptions(options_);
2727 syncAckProcessing(timeout_, message, isHASubscribe_);
2729 catch (
const DisconnectedException&)
2731 if (!isHASubscribe_)
2733 _routes.removeRoute(subIdField);
2738 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2742 catch (
const TimedOutException&)
2744 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2752 Unlock<Mutex> unlock(_lock);
2753 _subscriptionManager->unsubscribe(subIdField);
2755 _routes.removeRoute(subIdField);
2761 std::string deltaSubscribe(
const MessageHandler& messageHandler_,
2762 const std::string& topic_,
2764 const std::string& filter_,
2765 const std::string& bookmark_,
2766 const std::string& options_,
2767 const std::string& subId_ =
"",
2768 bool isHASubscribe_ =
true)
2770 isHASubscribe_ &= (bool)_subscriptionManager;
2771 Lock<Mutex> l(_lock);
2775 std::string subId(subId_);
2785 unsigned ackTypes = Message::AckType::Processed;
2787 if (!bookmark_.empty() && _bookmarkStore.isValid())
2789 ackTypes |= Message::AckType::Persisted;
2792 if (filter_.length()) _message.
setFilter(filter_);
2793 if (bookmark_.length())
2803 if (_bookmarkStore.isValid())
2808 _bookmarkStore.
log(_message);
2809 _bookmarkStore.
discard(_message);
2815 if (options_.length()) _message.
setOptions(options_);
2820 Unlock<Mutex> u(_lock);
2821 _subscriptionManager->subscribe(messageHandler_, message,
2822 Message::AckType::None);
2823 if (_badTimeToHASubscribe)
return subId;
2828 Message::AckType::None, ackTypes,
true);
2831 if (!options_.empty()) message.
setOptions(options_);
2834 syncAckProcessing(timeout_, message, isHASubscribe_);
2836 catch (
const DisconnectedException&)
2838 if (!isHASubscribe_)
2840 _routes.removeRoute(subIdField);
2844 catch (
const TimedOutException&)
2846 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2854 Unlock<Mutex> unlock(_lock);
2855 _subscriptionManager->unsubscribe(subIdField);
2857 _routes.removeRoute(subIdField);
2863 void unsubscribe(
const std::string&
id)
2865 Lock<Mutex> l(_lock);
2866 unsubscribeInternal(
id);
2869 void unsubscribe(
void)
2871 if (_subscriptionManager)
2873 _subscriptionManager->clear();
2876 _routes.unsubscribeAll();
2877 Lock<Mutex> l(_lock);
2882 _sendWithoutRetry(_message);
2884 deferredExecution(&s_noOpFn, NULL);
2888 const std::string& topic_,
2889 const std::string& filter_ =
"",
2890 const std::string& orderBy_ =
"",
2891 const std::string& bookmark_ =
"",
2892 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2893 int topN_ = AMPS_DEFAULT_TOP_N,
2894 const std::string& options_ =
"",
2895 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
2897 Lock<Mutex> l(_lock);
2904 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
2907 if (filter_.length()) _message.
setFilter(filter_);
2908 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
2909 if (bookmark_.length()) _message.
setBookmark(bookmark_);
2912 if (options_.length()) _message.
setOptions(options_);
2914 _routes.addRoute(_message.
getQueryID(), messageHandler_,
2915 Message::AckType::None, ackTypes,
false);
2919 syncAckProcessing(timeout_, _message);
2923 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
2931 const std::string& topic_,
2933 const std::string& filter_ =
"",
2934 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2935 int topN_ = AMPS_DEFAULT_TOP_N)
2938 return sow(messageHandler_,
2949 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
2950 const std::string& topic_,
2951 const std::string& filter_ =
"",
2952 const std::string& orderBy_ =
"",
2953 const std::string& bookmark_ =
"",
2954 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2955 int topN_ = AMPS_DEFAULT_TOP_N,
2956 const std::string& options_ =
"",
2957 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
2958 bool isHASubscribe_ =
true)
2960 isHASubscribe_ &= (bool)_subscriptionManager;
2961 Lock<Mutex> l(_lock);
2968 std::string subId = cid;
2970 if (filter_.length()) _message.
setFilter(filter_);
2971 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
2972 if (bookmark_.length()) _message.
setBookmark(bookmark_);
2975 if (options_.length()) _message.
setOptions(options_);
2981 Unlock<Mutex> u(_lock);
2982 _subscriptionManager->subscribe(messageHandler_, message,
2983 Message::AckType::None);
2984 if (_badTimeToHASubscribe)
return subId;
2986 _routes.addRoute(cid, messageHandler_,
2987 Message::AckType::None, Message::AckType::Processed,
true);
2989 if (!options_.empty()) message.
setOptions(options_);
2992 syncAckProcessing(timeout_, message, isHASubscribe_);
2994 catch (
const DisconnectedException&)
2996 if (!isHASubscribe_)
2998 _routes.removeRoute(subId);
3002 catch (
const TimedOutException&)
3004 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3012 Unlock<Mutex> unlock(_lock);
3013 _subscriptionManager->unsubscribe(cid);
3015 _routes.removeRoute(subId);
3021 std::string sowAndSubscribe(
const MessageHandler& messageHandler_,
3022 const std::string& topic_,
3024 const std::string& filter_ =
"",
3025 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3026 bool oofEnabled_ =
false,
3027 int topN_ = AMPS_DEFAULT_TOP_N,
3028 bool isHASubscribe_ =
true)
3031 return sowAndSubscribe(messageHandler_,
3038 (oofEnabled_ ?
"oof" :
""),
3043 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3044 const std::string& topic_,
3045 const std::string& filter_ =
"",
3046 const std::string& orderBy_ =
"",
3047 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3048 int topN_ = AMPS_DEFAULT_TOP_N,
3049 const std::string& options_ =
"",
3050 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3051 bool isHASubscribe_ =
true)
3053 isHASubscribe_ &= (bool)_subscriptionManager;
3054 Lock<Mutex> l(_lock);
3062 if (filter_.length()) _message.
setFilter(filter_);
3063 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
3066 if (options_.length()) _message.
setOptions(options_);
3071 Unlock<Mutex> u(_lock);
3072 _subscriptionManager->subscribe(messageHandler_, message,
3073 Message::AckType::None);
3074 if (_badTimeToHASubscribe)
return subId;
3076 _routes.addRoute(message.
getQueryID(), messageHandler_,
3077 Message::AckType::None, Message::AckType::Processed,
true);
3079 if (!options_.empty()) message.
setOptions(options_);
3082 syncAckProcessing(timeout_, message, isHASubscribe_);
3084 catch (
const DisconnectedException&)
3086 if (!isHASubscribe_)
3088 _routes.removeRoute(subId);
3092 catch (
const TimedOutException&)
3094 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3102 Unlock<Mutex> unlock(_lock);
3103 _subscriptionManager->unsubscribe(
Field(subId));
3105 _routes.removeRoute(subId);
3111 std::string sowAndDeltaSubscribe(
const MessageHandler& messageHandler_,
3112 const std::string& topic_,
3114 const std::string& filter_ =
"",
3115 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3116 bool oofEnabled_ =
false,
3117 bool sendEmpties_ =
false,
3118 int topN_ = AMPS_DEFAULT_TOP_N,
3119 bool isHASubscribe_ =
true)
3123 if (oofEnabled_) options.
setOOF();
3125 return sowAndDeltaSubscribe(messageHandler_,
3137 const std::string& topic_,
3138 const std::string& filter_,
3144 unsigned ackType = Message::AckType::Processed |
3145 Message::AckType::Stats |
3146 Message::AckType::Persisted;
3147 if (!publishStoreMessage)
3149 publishStoreMessage =
new Message();
3150 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3152 publishStoreMessage->reset();
3153 if (commandId_.
empty())
3155 publishStoreMessage->newCommandId();
3156 commandId_ = publishStoreMessage->getCommandId();
3160 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3162 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3163 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3164 .assignQueryID(commandId_.
data(), commandId_.
len())
3165 .setAckTypeEnum(ackType)
3166 .assignTopic(topic_.c_str(), topic_.length())
3167 .assignFilter(filter_.c_str(), filter_.length());
3168 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3169 char buf[AMPS_NUMBER_BUFFER_LEN];
3170 size_t pos = convertToCharArray(buf, haSequenceNumber);
3171 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3175 Lock<Mutex> l(_lock);
3176 _routes.addRoute(commandId_, messageHandler_,
3177 Message::AckType::Stats,
3178 Message::AckType::Processed|Message::AckType::Persisted,
3180 syncAckProcessing(timeout_, *publishStoreMessage,
3183 catch (
const DisconnectedException&)
3189 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3193 return (std::string)commandId_;
3197 Lock<Mutex> l(_lock);
3199 if (commandId_.
empty())
3210 .assignQueryID(commandId_.
data(), commandId_.
len())
3211 .setAckTypeEnum(Message::AckType::Processed |
3212 Message::AckType::Stats)
3214 .assignFilter(filter_.c_str(), filter_.length());
3215 _routes.addRoute(commandId_, messageHandler_,
3216 Message::AckType::Stats,
3217 Message::AckType::Processed,
3221 syncAckProcessing(timeout_, _message);
3225 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3228 return (std::string)commandId_;
3232 std::string sowDeleteByData(
const MessageHandler& messageHandler_,
3233 const std::string& topic_,
3234 const std::string& data_,
3240 unsigned ackType = Message::AckType::Processed |
3241 Message::AckType::Stats |
3242 Message::AckType::Persisted;
3243 if (!publishStoreMessage)
3245 publishStoreMessage =
new Message();
3246 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3248 publishStoreMessage->reset();
3249 if (commandId_.
empty())
3251 publishStoreMessage->newCommandId();
3252 commandId_ = publishStoreMessage->getCommandId();
3256 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3258 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3259 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3260 .assignQueryID(commandId_.
data(), commandId_.
len())
3261 .setAckTypeEnum(ackType)
3262 .assignTopic(topic_.c_str(), topic_.length())
3263 .assignData(data_.c_str(), data_.length());
3264 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3265 char buf[AMPS_NUMBER_BUFFER_LEN];
3266 size_t pos = convertToCharArray(buf, haSequenceNumber);
3267 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3271 Lock<Mutex> l(_lock);
3272 _routes.addRoute(commandId_, messageHandler_,
3273 Message::AckType::Stats,
3274 Message::AckType::Processed|Message::AckType::Persisted,
3276 syncAckProcessing(timeout_, *publishStoreMessage,
3279 catch (
const DisconnectedException&)
3285 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3289 return (std::string)commandId_;
3293 Lock<Mutex> l(_lock);
3295 if (commandId_.
empty())
3306 .assignQueryID(commandId_.
data(), commandId_.
len())
3307 .setAckTypeEnum(Message::AckType::Processed |
3308 Message::AckType::Stats)
3310 .assignData(data_.c_str(), data_.length());
3311 _routes.addRoute(commandId_, messageHandler_,
3312 Message::AckType::Stats,
3313 Message::AckType::Processed,
3317 syncAckProcessing(timeout_, _message);
3321 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3324 return (std::string)commandId_;
3328 std::string sowDeleteByKeys(
const MessageHandler& messageHandler_,
3329 const std::string& topic_,
3330 const std::string& keys_,
3336 unsigned ackType = Message::AckType::Processed |
3337 Message::AckType::Stats |
3338 Message::AckType::Persisted;
3339 if (!publishStoreMessage)
3341 publishStoreMessage =
new Message();
3342 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3344 publishStoreMessage->reset();
3345 if (commandId_.
empty())
3347 publishStoreMessage->newCommandId();
3348 commandId_ = publishStoreMessage->getCommandId();
3352 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3354 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3355 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3356 .assignQueryID(commandId_.
data(), commandId_.
len())
3357 .setAckTypeEnum(ackType)
3358 .assignTopic(topic_.c_str(), topic_.length())
3359 .assignSowKeys(keys_.c_str(), keys_.length());
3360 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3361 char buf[AMPS_NUMBER_BUFFER_LEN];
3362 size_t pos = convertToCharArray(buf, haSequenceNumber);
3363 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3367 Lock<Mutex> l(_lock);
3368 _routes.addRoute(commandId_, messageHandler_,
3369 Message::AckType::Stats,
3370 Message::AckType::Processed|Message::AckType::Persisted,
3372 syncAckProcessing(timeout_, *publishStoreMessage,
3375 catch (
const DisconnectedException&)
3381 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3385 return (std::string)commandId_;
3389 Lock<Mutex> l(_lock);
3391 if (commandId_.
empty())
3402 .assignQueryID(commandId_.
data(), commandId_.
len())
3403 .setAckTypeEnum(Message::AckType::Processed |
3404 Message::AckType::Stats)
3406 .assignSowKeys(keys_.c_str(), keys_.length());
3407 _routes.addRoute(commandId_, messageHandler_,
3408 Message::AckType::Stats,
3409 Message::AckType::Processed,
3413 syncAckProcessing(timeout_, _message);
3417 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3420 return (std::string)commandId_;
3424 void startTimer(
void)
3426 if (_serverVersion >=
"5.3.2.0")
3428 throw CommandException(
"The start_timer command is deprecated.");
3430 Lock<Mutex> l(_lock);
3439 if (_serverVersion >=
"5.3.2.0")
3441 throw CommandException(
"The stop_timer command is deprecated.");
3443 return executeAsync(
Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
3458 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
3460 _pExceptionListener = pListener_;
3461 _exceptionListener = _pExceptionListener.get();
3466 _exceptionListener = &listener_;
3471 return *_exceptionListener;
3474 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
3476 if (readTimeout_ && readTimeout_ < heartbeatInterval_)
3478 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
3480 Lock<Mutex> l(_lock);
3481 if(_heartbeatInterval != heartbeatInterval_ ||
3482 _readTimeout != readTimeout_)
3484 _heartbeatInterval = heartbeatInterval_;
3485 _readTimeout = readTimeout_;
3490 void _sendHeartbeat(
void)
3492 if (_connected && _heartbeatInterval != 0)
3494 std::ostringstream options;
3495 options <<
"start," << _heartbeatInterval;
3500 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3501 _heartbeatTimer.start();
3504 _sendWithoutRetry(startMessage);
3505 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
3507 catch(ConnectionException &ex_)
3511 AMPS_UNHANDLED_EXCEPTION(ex_);
3515 if(_readTimeout && _connected)
3521 AMPSException::throwFor(_client, result);
3527 Lock<Mutex> lock(_lock);
3528 _connectionStateListeners.insert(listener_);
3533 Lock<Mutex> lock(_lock);
3534 _connectionStateListeners.erase(listener_);
3539 unsigned systemAddedAcks_,
bool isSubscribe_)
3541 Message message = command_.getMessage();
3546 bool added = qid.
len() || subid.
len() || cid_.
len();
3548 if (subid.
len() > 0)
3552 addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
3553 systemAddedAcks_, isSubscribe_);
3555 if (qid.
len() > 0 && qid != subid)
3557 while (_routes.hasRoute(qid))
3564 if (addedCount == 0)
3566 _routes.addRoute(qid, handler_, requestedAcks_,
3567 systemAddedAcks_, isSubscribe_);
3573 Unlock<Mutex> u(_lock);
3574 data = amps_invoke_copy_route_function(handler_.userData());
3578 _routes.addRoute(qid, handler_, requestedAcks_,
3579 systemAddedAcks_,
false);
3583 _routes.addRoute(qid,
3587 systemAddedAcks_,
false);
3592 if (cid_.
len() > 0 && cid_ != qid && cid_ != subid
3593 && requestedAcks_ & ~
Message::AckType::Persisted)
3595 while (_routes.hasRoute(cid_))
3599 if (addedCount == 0)
3601 _routes.addRoute(cid_, handler_, requestedAcks_,
3602 systemAddedAcks_,
false);
3608 Unlock<Mutex> u(_lock);
3609 data = amps_invoke_copy_route_function(handler_.userData());
3613 _routes.addRoute(cid_, handler_, requestedAcks_,
3614 systemAddedAcks_,
false);
3618 _routes.addRoute(cid_,
3622 systemAddedAcks_,
false);
3626 else if (commandType == Message::Command::Publish ||
3627 commandType == Message::Command::DeltaPublish)
3630 _routes.addRoute(cid_, handler_, requestedAcks_,
3631 systemAddedAcks_,
false);
3636 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
3641 bool isHASubscribe_ =
true)
3643 isHASubscribe_ &= (bool)_subscriptionManager;
3644 Message& message = command_.getMessage();
3645 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
3646 Message::AckType::Processed : Message::AckType::None;
3648 bool isPublishStore = _publishStore.
isValid() && command_.needsSequenceNumber();
3650 if (commandType == Message::Command::SOW
3651 || commandType == Message::Command::SOWAndSubscribe
3652 || commandType == Message::Command::SOWAndDeltaSubscribe
3653 || commandType == Message::Command::StopTimer)
3654 systemAddedAcks |= Message::AckType::Completed;
3656 if (handler_.isValid() && cid.
empty())
3662 if (command_.isSubscribe())
3665 if (_bookmarkStore.isValid())
3667 systemAddedAcks |= Message::AckType::Persisted;
3675 _bookmarkStore.
log(message);
3676 if (!BookmarkRange::isRange(bookmark))
3678 _bookmarkStore.
discard(message);
3692 systemAddedAcks |= Message::AckType::Persisted;
3694 bool isSubscribe = command_.isSubscribe();
3695 if (handler_.isValid() && !isSubscribe)
3697 _registerHandler(command_, cid, handler_,
3698 requestedAcks, systemAddedAcks, isSubscribe);
3700 bool useSyncSend = cid.
len() > 0 && command_.hasProcessedAck();
3703 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
3706 Unlock<Mutex> u(_lock);
3707 haSequenceNumber = _publishStore.
store(message);
3714 syncAckProcessing((
long)command_.getTimeout(), message,
3717 catch (
const DisconnectedException&)
3723 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3727 else _send(message, haSequenceNumber);
3731 if(command_.isSubscribe())
3736 Unlock<Mutex> u(_lock);
3737 _subscriptionManager->subscribe(handler_,
3740 if (_badTimeToHASubscribe)
3743 return std::string(subId.
data(), subId.
len());
3746 if (handler_.isValid())
3748 _registerHandler(command_, cid, handler_,
3749 requestedAcks, systemAddedAcks, isSubscribe);
3756 syncAckProcessing((
long)command_.getTimeout(), message,
3759 catch (
const DisconnectedException&)
3761 if (!isHASubscribe_)
3763 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3764 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
3765 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
3770 catch (
const TimedOutException&)
3772 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
3773 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3774 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.
getQueryId()));
3782 Unlock<Mutex> unlock(_lock);
3783 _subscriptionManager->unsubscribe(subId);
3787 _routes.removeRoute(cid);
3788 _routes.removeRoute(subId);
3792 else _send(message);
3793 if (subId.
len() > 0)
3796 return std::string(subId.
data(), subId.
len());
3806 syncAckProcessing((
long)(command_.getTimeout()), message);
3808 catch (
const DisconnectedException&)
3810 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3811 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
3817 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3818 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.
getQueryId()));
3823 else _send(message);
3833 bool isHASubscribe_ =
true)
3835 Lock<Mutex> lock(_lock);
3836 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
3840 void setAutoAck(
bool isAutoAckEnabled_)
3842 _isAutoAckEnabled = isAutoAckEnabled_;
3844 bool getAutoAck(
void)
const 3846 return _isAutoAckEnabled;
3848 void setAckBatchSize(
const unsigned batchSize_)
3850 _ackBatchSize = batchSize_;
3851 if (!_queueAckTimeout)
3853 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
3857 unsigned getAckBatchSize(
void)
const 3859 return _ackBatchSize;
3861 int getAckTimeout(
void)
const 3863 return _queueAckTimeout;
3865 void setAckTimeout(
const int ackTimeout_)
3868 _queueAckTimeout = ackTimeout_;
3870 size_t _ack(QueueBookmarks& queueBookmarks_)
3872 if(queueBookmarks_._bookmarkCount)
3874 if (!publishStoreMessage)
3876 publishStoreMessage =
new Message();
3877 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3879 publishStoreMessage->reset();
3880 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3881 .setTopic(queueBookmarks_._topic)
3882 .setBookmark(queueBookmarks_._data)
3883 .setCommandId(
"AMPS-queue-ack");
3884 amps_uint64_t haSequenceNumber = 0;
3887 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3888 publishStoreMessage->setAckType(
"persisted")
3889 .setSequence(haSequenceNumber);
3890 queueBookmarks_._data.erase();
3891 queueBookmarks_._bookmarkCount = 0;
3893 _send(*publishStoreMessage, haSequenceNumber);
3896 queueBookmarks_._data.erase();
3897 queueBookmarks_._bookmarkCount = 0;
3903 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
3905 if (_isAutoAckEnabled)
return;
3906 _ack(topic_, bookmark_, options_);
3908 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
3910 if (bookmark_.
len() == 0)
return;
3911 Lock<Mutex> lock(_lock);
3912 if(_ackBatchSize < 2 || options_ != NULL)
3914 if (!publishStoreMessage)
3916 publishStoreMessage =
new Message();
3917 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3919 publishStoreMessage->reset();
3920 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3921 .setCommandId(
"AMPS-queue-ack")
3922 .setTopic(topic_).setBookmark(bookmark_);
3923 if (options_) publishStoreMessage->setOptions(options_);
3924 amps_uint64_t haSequenceNumber = 0;
3927 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3928 publishStoreMessage->setAckType(
"persisted")
3929 .setSequence(haSequenceNumber);
3931 _send(*publishStoreMessage, haSequenceNumber);
3935 topic_hash hash = CRC<0>::crcNoSSE(topic_.
data(),topic_.
len());
3936 TopicHashMap::iterator it = _topicHashMap.find(hash);
3937 if(it == _topicHashMap.end())
3940 it = _topicHashMap.insert(TopicHashMap::value_type(hash,QueueBookmarks(topic_))).first;
3942 QueueBookmarks &queueBookmarks = it->second;
3943 if(queueBookmarks._data.length())
3945 queueBookmarks._data.append(
",");
3949 queueBookmarks._oldestTime = amps_now();
3951 queueBookmarks._data.append(bookmark_);
3952 if(++queueBookmarks._bookmarkCount >= _ackBatchSize) _ack(queueBookmarks);
3954 void flushAcks(
void)
3956 size_t sendCount = 0;
3963 Lock<Mutex> lock(_lock);
3964 typedef TopicHashMap::iterator iterator;
3965 for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3967 QueueBookmarks& queueBookmarks = it->second;
3968 sendCount += _ack(queueBookmarks);
3971 if(sendCount && _connected) publishFlush(0, Message::AckType::Processed);
3974 void checkQueueAcks(
void)
3976 if(!_topicHashMap.size())
return;
3977 Lock<Mutex> lock(_lock);
3980 amps_uint64_t threshold = amps_now() - (amps_uint64_t)_queueAckTimeout;
3981 typedef TopicHashMap::iterator iterator;
3982 for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3984 QueueBookmarks& queueBookmarks = it->second;
3985 if(queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold) _ack(queueBookmarks);
3988 catch(std::exception& ex)
3990 AMPS_UNHANDLED_EXCEPTION(ex);
3994 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
3996 Lock<Mutex> lock(_deferredExecutionLock);
3997 _deferredExecutionList.push_back(
3998 DeferredExecutionRequest(func_,userData_));
4001 inline void processDeferredExecutions(
void)
4003 if(_deferredExecutionList.size())
4005 Lock<Mutex> lock(_deferredExecutionLock);
4006 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4007 DeferredExecutionList::iterator end = _deferredExecutionList.end();
4008 for(; it != end; ++it)
4012 it->_func(it->_userData);
4019 _deferredExecutionList.clear();
4020 _routes.invalidateCache();
4021 _routeCache.invalidateCache();
4025 bool getRetryOnDisconnect(
void)
const 4027 return _isRetryOnDisconnect;
4030 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
4032 _isRetryOnDisconnect = isRetryOnDisconnect_;
4035 void setDefaultMaxDepth(
unsigned maxDepth_)
4037 _defaultMaxDepth = maxDepth_;
4040 unsigned getDefaultMaxDepth(
void)
const 4042 return _defaultMaxDepth;
4134 RefHandle<MessageStreamImpl> _body;
4144 inline void advance(
void);
4156 bool operator==(
const iterator& rhs)
4158 return _pStream == rhs._pStream;
4160 bool operator!=(
const iterator& rhs)
4162 return _pStream != rhs._pStream;
4164 void operator++(
void) { advance(); }
4165 Message operator*(
void) {
return _current; }
4166 Message* operator->(
void) {
return &_current; }
4175 if(!_body.isValid())
4177 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
4206 unsigned getMaxDepth(
void)
const;
4209 unsigned getDepth(
void)
const;
4213 inline void setSOWOnly(
const std::string& commandId_,
4214 const std::string& queryId_ =
"");
4215 inline void setSubscription(
const std::string& subId_,
4216 const std::string& commandId_ =
"",
4217 const std::string& queryId_ =
"");
4218 inline void setStatsOnly(
const std::string& commandId_,
4219 const std::string& queryId_ =
"");
4220 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
4226 friend class Client;
4252 BorrowRefHandle<ClientImpl> _body;
4254 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
4255 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
4256 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
4267 : _body(new ClientImpl(clientName), true)
4270 Client(ClientImpl* existingClient)
4271 : _body(existingClient,
true)
4274 Client(ClientImpl* existingClient,
bool isRef)
4275 : _body(existingClient, isRef)
4278 Client(
const Client& rhs) : _body(rhs._body) {;}
4279 virtual ~Client(
void) {;}
4281 Client& operator=(
const Client& rhs)
4289 return _body.isValid();
4306 _body.get().setName(name);
4313 return _body.get().getName();
4321 return _body.get().getNameHash();
4332 _body.get().setLogonCorrelationData(logonCorrelationData_);
4339 return _body.get().getLogonCorrelationData();
4352 return _body.get().getServerVersion();
4363 return _body.get().getServerVersionInfo();
4377 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4392 return AMPS::convertVersionToNumber(data_, len_);
4399 return _body.get().getURI();
4423 _body.get().connect(uri);
4430 _body.get().disconnect();
4448 _body.get().send(message);
4461 unsigned requestedAcks_,
bool isSubscribe_)
4463 _body.get().addMessageHandler(commandId_, messageHandler_,
4464 requestedAcks_, isSubscribe_);
4472 return _body.get().removeMessageHandler(commandId_);
4500 return _body.get().send(messageHandler, message, timeout);
4514 _body.get().setDisconnectHandler(disconnectHandler);
4522 return _body.get().getDisconnectHandler();
4531 return _body.get().getConnectionInfo();
4544 _body.get().setBookmarkStore(bookmarkStore_);
4552 return _body.
get().getBookmarkStore();
4560 return _body.get().getSubscriptionManager();
4572 _body.get().setSubscriptionManager(subscriptionManager_);
4596 _body.get().setPublishStore(publishStore_);
4604 return _body.
get().getPublishStore();
4612 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
4613 duplicateMessageHandler_);
4627 return _body.get().getDuplicateMessageHandler();
4641 _body.get().setFailedWriteHandler(handler_);
4649 return _body.get().getFailedWriteHandler();
4670 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
4672 return _body.get().publish(topic_.c_str(), topic_.length(),
4673 data_.c_str(), data_.length());
4695 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
4696 const char* data_,
size_t dataLength_)
4698 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
4719 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
4720 unsigned long expiration_)
4722 return _body.get().publish(topic_.c_str(), topic_.length(),
4723 data_.c_str(), data_.length(), expiration_);
4746 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
4747 const char* data_,
size_t dataLength_,
4748 unsigned long expiration_)
4750 return _body.get().publish(topic_, topicLength_,
4751 data_, dataLength_, expiration_);
4792 void publishFlush(
long timeout_ = 0,
unsigned ackType_ = Message::AckType::Processed)
4794 _body.get().publishFlush(timeout_, ackType_);
4813 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
4815 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4816 data_.c_str(), data_.length());
4837 const char* data_,
size_t dataLength_)
4839 return _body.get().deltaPublish(topic_, topicLength_,
4840 data_, dataLength_);
// Delta-publish overload that also takes an expiration value; forwards the
// string buffers and lengths to the implementation's deltaPublish.
// NOTE(review): the final argument line of the call is not visible here;
// presumably expiration_ is passed last, as in the char* overload.
4859 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
4860 unsigned long expiration_)
4862 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4863 data_.c_str(), data_.length(),
4886 const char* data_,
size_t dataLength_,
4887 unsigned long expiration_)
4889 return _body.get().deltaPublish(topic_, topicLength_,
4890 data_, dataLength_, expiration_);
4909 const char* options_ = NULL)
4911 return _body.get().logon(timeout_, authenticator_, options_);
4925 std::string
logon(
const char* options_,
int timeout_ = 0)
4943 std::string
logon(
const std::string& options_,
int timeout_ = 0)
4969 const std::string& topic_,
4971 const std::string& filter_=
"",
4972 const std::string& options_ =
"",
4973 const std::string& subId_ =
"")
4975 return _body.get().subscribe(messageHandler_, topic_, timeout_,
4976 filter_,
"", options_, subId_);
4995 long timeout_=0,
const std::string& filter_=
"",
4996 const std::string& options_ =
"",
4997 const std::string& subId_ =
"")
5000 if (_body.get().getDefaultMaxDepth())
5001 result.
maxDepth(_body.get().getDefaultMaxDepth());
5002 result.setSubscription(_body.get().subscribe(
5004 topic_, timeout_, filter_,
"",
5005 options_, subId_,
false));
5025 long timeout_ = 0,
const std::string& filter_ =
"",
5026 const std::string& options_ =
"",
5027 const std::string& subId_ =
"")
5030 if (_body.get().getDefaultMaxDepth())
5031 result.
maxDepth(_body.get().getDefaultMaxDepth());
5032 result.setSubscription(_body.get().subscribe(
5034 topic_, timeout_, filter_,
"",
5035 options_, subId_,
false));
5052 const std::string& topic_,
5054 const std::string& filter_=
"",
5055 const std::string& options_ =
"",
5056 const std::string& subId_ =
"")
5058 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5059 filter_,
"", options_, subId_);
5070 long timeout_,
const std::string& filter_=
"",
5071 const std::string& options_ =
"",
5072 const std::string& subId_ =
"")
5075 if (_body.get().getDefaultMaxDepth())
5076 result.
maxDepth(_body.get().getDefaultMaxDepth());
5077 result.setSubscription(_body.get().deltaSubscribe(
5079 topic_, timeout_, filter_,
"",
5080 options_, subId_,
false));
5086 long timeout_,
const std::string& filter_ =
"",
5087 const std::string& options_ =
"",
5088 const std::string& subId_ =
"")
5091 if (_body.get().getDefaultMaxDepth())
5092 result.
maxDepth(_body.get().getDefaultMaxDepth());
5093 result.setSubscription(_body.get().deltaSubscribe(
5095 topic_, timeout_, filter_,
"",
5096 options_, subId_,
false));
5126 const std::string& topic_,
5128 const std::string& bookmark_,
5129 const std::string& filter_=
"",
5130 const std::string& options_ =
"",
5131 const std::string& subId_ =
"")
5133 return _body.get().subscribe(messageHandler_, topic_, timeout_,
5134 filter_, bookmark_, options_, subId_);
5155 const std::string& bookmark_,
5156 const std::string& filter_=
"",
5157 const std::string& options_ =
"",
5158 const std::string& subId_ =
"")
5161 if (_body.get().getDefaultMaxDepth())
5162 result.
maxDepth(_body.get().getDefaultMaxDepth());
5163 result.setSubscription(_body.get().subscribe(
5165 topic_, timeout_, filter_,
5166 bookmark_, options_,
5174 const std::string& bookmark_,
5175 const std::string& filter_ =
"",
5176 const std::string& options_ =
"",
5177 const std::string& subId_ =
"")
5180 if (_body.get().getDefaultMaxDepth())
5181 result.
maxDepth(_body.get().getDefaultMaxDepth());
5182 result.setSubscription(_body.get().subscribe(
5184 topic_, timeout_, filter_,
5185 bookmark_, options_,
5200 return _body.get().unsubscribe(commandId);
5212 return _body.get().unsubscribe();
5246 const std::string& topic_,
5247 const std::string& filter_ =
"",
5248 const std::string& orderBy_ =
"",
5249 const std::string& bookmark_ =
"",
5250 int batchSize_ = DEFAULT_BATCH_SIZE,
5251 int topN_ = DEFAULT_TOP_N,
5252 const std::string& options_ =
"",
5253 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5255 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
5256 bookmark_, batchSize_, topN_, options_,
5284 const std::string& filter_ =
"",
5285 const std::string& orderBy_ =
"",
5286 const std::string& bookmark_ =
"",
5287 int batchSize_ = DEFAULT_BATCH_SIZE,
5288 int topN_ = DEFAULT_TOP_N,
5289 const std::string& options_ =
"",
5290 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5293 if (_body.get().getDefaultMaxDepth())
5294 result.
maxDepth(_body.get().getDefaultMaxDepth());
5295 result.setSOWOnly(sow(result.operator
MessageHandler(),topic_,filter_,orderBy_,bookmark_,batchSize_,topN_,options_,timeout_));
5301 const std::string& filter_ =
"",
5302 const std::string& orderBy_ =
"",
5303 const std::string& bookmark_ =
"",
5304 int batchSize_ = DEFAULT_BATCH_SIZE,
5305 int topN_ = DEFAULT_TOP_N,
5306 const std::string& options_ =
"",
5307 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5310 if (_body.get().getDefaultMaxDepth())
5311 result.
maxDepth(_body.get().getDefaultMaxDepth());
5312 result.setSOWOnly(sow(result.operator
MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5338 const std::string& topic_,
5340 const std::string& filter_ =
"",
5341 int batchSize_ = DEFAULT_BATCH_SIZE,
5342 int topN_ = DEFAULT_TOP_N)
5344 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
5370 const std::string& topic_,
5372 const std::string& filter_ =
"",
5373 int batchSize_ = DEFAULT_BATCH_SIZE,
5374 bool oofEnabled_ =
false,
5375 int topN_ = DEFAULT_TOP_N)
5377 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5378 filter_, batchSize_, oofEnabled_,
5403 const std::string& filter_ =
"",
5404 int batchSize_ = DEFAULT_BATCH_SIZE,
5405 bool oofEnabled_ =
false,
5406 int topN_ = DEFAULT_TOP_N)
5409 if (_body.get().getDefaultMaxDepth())
5410 result.
maxDepth(_body.get().getDefaultMaxDepth());
5411 result.setSubscription(_body.get().sowAndSubscribe(
5413 topic_, timeout_, filter_,
5414 batchSize_, oofEnabled_,
5439 const std::string& filter_ =
"",
5440 int batchSize_ = DEFAULT_BATCH_SIZE,
5441 bool oofEnabled_ =
false,
5442 int topN_ = DEFAULT_TOP_N)
5445 if (_body.get().getDefaultMaxDepth())
5446 result.
maxDepth(_body.get().getDefaultMaxDepth());
5447 result.setSubscription(_body.get().sowAndSubscribe(
5449 topic_, timeout_, filter_,
5450 batchSize_, oofEnabled_,
5484 const std::string& topic_,
5485 const std::string& filter_ =
"",
5486 const std::string& orderBy_ =
"",
5487 const std::string& bookmark_ =
"",
5488 int batchSize_ = DEFAULT_BATCH_SIZE,
5489 int topN_ = DEFAULT_TOP_N,
5490 const std::string& options_ =
"",
5491 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5493 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
5494 orderBy_, bookmark_, batchSize_,
5495 topN_, options_, timeout_);
5523 const std::string& filter_ =
"",
5524 const std::string& orderBy_ =
"",
5525 const std::string& bookmark_ =
"",
5526 int batchSize_ = DEFAULT_BATCH_SIZE,
5527 int topN_ = DEFAULT_TOP_N,
5528 const std::string& options_ =
"",
5529 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5532 if (_body.get().getDefaultMaxDepth())
5533 result.
maxDepth(_body.get().getDefaultMaxDepth());
5534 result.setSubscription(_body.get().sowAndSubscribe(
5536 topic_, filter_, orderBy_,
5537 bookmark_, batchSize_, topN_,
5538 options_, timeout_,
false));
5544 const std::string& filter_ =
"",
5545 const std::string& orderBy_ =
"",
5546 const std::string& bookmark_ =
"",
5547 int batchSize_ = DEFAULT_BATCH_SIZE,
5548 int topN_ = DEFAULT_TOP_N,
5549 const std::string& options_ =
"",
5550 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5553 if (_body.get().getDefaultMaxDepth())
5554 result.
maxDepth(_body.get().getDefaultMaxDepth());
5555 result.setSubscription(_body.get().sowAndSubscribe(
5557 topic_, filter_, orderBy_,
5558 bookmark_, batchSize_, topN_,
5559 options_, timeout_,
false));
5588 const std::string& topic_,
5589 const std::string& filter_ =
"",
5590 const std::string& orderBy_ =
"",
5591 int batchSize_ = DEFAULT_BATCH_SIZE,
5592 int topN_ = DEFAULT_TOP_N,
5593 const std::string& options_ =
"",
5594 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5596 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5597 filter_, orderBy_, batchSize_,
5598 topN_, options_, timeout_);
5621 const std::string& filter_ =
"",
5622 const std::string& orderBy_ =
"",
5623 int batchSize_ = DEFAULT_BATCH_SIZE,
5624 int topN_ = DEFAULT_TOP_N,
5625 const std::string& options_ =
"",
5626 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
// Apply the client's default maximum depth to the new stream when one is set.
5629 if (_body.get().getDefaultMaxDepth())
5630 result.
maxDepth(_body.get().getDefaultMaxDepth());
// Issue the command exactly once, directly on the implementation, as the
// sibling MessageStream overloads do. An earlier additional call through the
// public handler-taking overload sent the same command a second time and
// called setSubscription() twice on the same stream.
5632 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5634 topic_, filter_, orderBy_,
5635 batchSize_, topN_, options_,
5642 const std::string& filter_ =
"",
5643 const std::string& orderBy_ =
"",
5644 int batchSize_ = DEFAULT_BATCH_SIZE,
5645 int topN_ = DEFAULT_TOP_N,
5646 const std::string& options_ =
"",
5647 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5650 if (_body.get().getDefaultMaxDepth())
5651 result.
maxDepth(_body.get().getDefaultMaxDepth());
5652 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5654 topic_, filter_, orderBy_,
5655 batchSize_, topN_, options_,
5685 const std::string& topic_,
5687 const std::string& filter_ =
"",
5688 int batchSize_ = DEFAULT_BATCH_SIZE,
5689 bool oofEnabled_ =
false,
5690 bool sendEmpties_ =
false,
5691 int topN_ = DEFAULT_TOP_N)
5693 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5694 timeout_, filter_, batchSize_,
5695 oofEnabled_, sendEmpties_,
5722 const std::string& filter_ =
"",
5723 int batchSize_ = DEFAULT_BATCH_SIZE,
5724 bool oofEnabled_ =
false,
5725 bool sendEmpties_ =
false,
5726 int topN_ = DEFAULT_TOP_N)
5729 if (_body.get().getDefaultMaxDepth())
5730 result.
maxDepth(_body.get().getDefaultMaxDepth());
5731 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5733 topic_, timeout_, filter_,
5734 batchSize_, oofEnabled_,
5735 sendEmpties_, topN_,
false));
5761 const std::string& filter_ =
"",
5762 int batchSize_ = DEFAULT_BATCH_SIZE,
5763 bool oofEnabled_ =
false,
5764 bool sendEmpties_ =
false,
5765 int topN_ = DEFAULT_TOP_N)
5768 if (_body.get().getDefaultMaxDepth())
5769 result.
maxDepth(_body.get().getDefaultMaxDepth());
5770 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5772 topic_, timeout_, filter_,
5773 batchSize_, oofEnabled_,
5774 sendEmpties_, topN_,
false));
5797 const std::string& topic,
5798 const std::string& filter,
5801 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
5829 stream.setStatsOnly(cid);
5830 _body.get().sowDelete(stream.operator
MessageHandler(),topic,filter,timeout,cid);
5831 return *(stream.
begin());
5833 catch (
const DisconnectedException&)
5835 removeMessageHandler(cid);
5846 _body.get().startTimer();
5857 return _body.get().stopTimer(messageHandler);
5882 const std::string& topic_,
5883 const std::string& keys_,
5886 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
5918 stream.setStatsOnly(cid);
5919 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(),topic_,keys_,timeout_,cid);
5920 return *(stream.
begin());
5922 catch (
const DisconnectedException&)
5924 removeMessageHandler(cid);
5944 const std::string& topic_,
const std::string& data_,
5947 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
5974 stream.setStatsOnly(cid);
5975 _body.get().sowDeleteByData(stream.operator
MessageHandler(),topic_,data_,timeout_,cid);
5976 return *(stream.
begin());
5978 catch (
const DisconnectedException&)
5980 removeMessageHandler(cid);
5990 return _body.get().getHandle();
6003 _body.get().setExceptionListener(pListener_);
6016 _body.get().setExceptionListener(listener_);
6023 return _body.get().getExceptionListener();
6049 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6073 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6079 setLastChanceMessageHandler(messageHandler);
6086 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6112 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6137 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6156 _body.get().addConnectionStateListener(listener);
6164 _body.get().removeConnectionStateListener(listener);
6194 return _body.get().executeAsync(command_, handler_);
6232 if (command_.isSubscribe())
6234 Message& message = command_.getMessage();
6237 if(useExistingHandler)
6240 if (_body.get()._routes.getRoute(subId, existingHandler))
6243 _body.get().executeAsync(command_, existingHandler,
false);
6248 id = _body.get().executeAsync(command_, handler_,
false);
6250 catch (
const DisconnectedException&)
6252 removeMessageHandler(command_.getMessage().
getCommandId());
6253 if (command_.isSubscribe())
6257 if (command_.isSow())
6259 removeMessageHandler(command_.getMessage().
getQueryID());
6290 _body.get().ack(topic_,bookmark_,options_);
6312 void ack(
const std::string& topic_,
const std::string& bookmark_,
6313 const char* options_ = NULL)
6315 _body.get().ack(
Field(topic_.data(),topic_.length()),
Field(bookmark_.data(),bookmark_.length()),options_);
6323 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
6325 _body.get()._ack(topic_,bookmark_,options_);
6338 _body.get().flushAcks();
6347 return _body.get().getAutoAck();
6357 _body.get().setAutoAck(isAutoAckEnabled_);
6365 return _body.get().getAckBatchSize();
6375 _body.get().setAckBatchSize(ackBatchSize_);
6386 return _body.get().getAckTimeout();
6396 _body.get().setAckTimeout(ackTimeout_);
6410 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
6419 return _body.get().getRetryOnDisconnect();
6428 _body.get().setDefaultMaxDepth(maxDepth_);
6437 return _body.get().getDefaultMaxDepth();
6449 return _body.get().setTransportFilterFunction(filter_, userData_);
6463 return _body.get().setThreadCreatedCallback(callback_, userData_);
6471 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
6473 _body.get().deferredExecution(func_,userData_);
6483 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
6489 unsigned deliveries = 0;
6501 const char* data = NULL;
6503 const char* status = NULL;
6504 size_t statusLen = 0;
6506 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
6509 &status, &statusLen);
6510 if (len == NotEntitled || len == Duplicate ||
6511 (statusLen == Failure && status[0] ==
'f'))
6513 if (_failedWriteHandler)
6515 if (_publishStore.isValid())
6517 amps_uint64_t sequence =
6520 FailedWriteStoreReplayer replayer(
this, data, len);
6521 AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
6522 replayer, sequence));
6528 AMPS_CALL_EXCEPTION_WRAPPER(
6529 _failedWriteHandler->failedWrite(emptyMessage,
6535 if (_publishStore.isValid())
6544 _publishStore.discardUpTo(seq);
6548 if (!deliveries && _bookmarkStore.isValid())
6555 const char* bookmarkData = NULL;
6556 size_t bookmarkLen = 0;
6558 &bookmarkData, &bookmarkLen);
6560 if (bookmarkLen > 0 && _routes.hasRoute(subId))
6563 _bookmarkStore.persisted(subId,
Message::Field(bookmarkData, bookmarkLen));
6568 catch (std::exception& ex)
6570 AMPS_UNHANDLED_EXCEPTION(ex);
// Handles a processed ack: looks up the pending entry in _ackMap using a key
// built from (data, len), copies fields from the incoming message into it
// and marks it as responded.
// NOTE(review): the lines that extract each field into data/len and that
// bind `ack` are not visible here - confirm against the full source.
6576 ClientImpl::processedAck(
Message &message)
6578 unsigned deliveries = 0;
6580 const char* data = NULL;
6584 Lock<Mutex> l(_lock);
// The ack map is guarded by its own mutex.
6587 Lock<Mutex> guard(_ackMapLock);
6588 AckMap::iterator i = _ackMap.find(std::string(data, len));
6589 if (i != _ackMap.end())
6600 ack.setStatus(data, len);
6603 ack.setReason(data, len);
6606 ack.setUsername(data, len);
6609 ack.setPassword(data, len);
6612 ack.setSequenceNo(data, len);
6615 ack.setServerVersion(data, len);
6617 ack.setOptions(data,len);
6619 ack.setBookmark(data,len);
6620 ack.setResponded(
true);
// Sends the heartbeat message when forced or when the heartbeat timer
// check succeeds; the timer is restarted before sending.
// AMPSException from the send is caught (handler body not visible here).
6627 ClientImpl::checkAndSendHeartbeat(
bool force)
6629 if (force || _heartbeatTimer.check())
6631 _heartbeatTimer.start();
6634 sendWithoutRetry(_beatMessage);
6636 catch (
const AMPSException&)
// Builds a ConnectionInfo map describing this client: last URI, client name
// and username, plus the publish store's unpersisted count (as text) when a
// publish store is set.
6643 inline ConnectionInfo ClientImpl::getConnectionInfo()
const 6645 ConnectionInfo info;
6646 std::ostringstream writer;
6648 info[
"client.uri"] = _lastUri;
6649 info[
"client.name"] = _name;
6650 info[
"client.username"] = _username;
6651 if(_publishStore.isValid())
// Format the count through the stream to obtain its string form.
6653 writer << _publishStore.unpersistedCount();
6654 info[
"publishStore.unpersistedCount"] = writer.str();
// Callback invoked for each incoming message handle; userData_ is the owning
// ClientImpl. Runs deferred executions, loads the handle into _readMessage,
// then dispatches on command type: SOW-family, publish-family, ack,
// heartbeat, or anything else.
// NOTE(review): many lines of this function are not visible in this view;
// the comments below describe only what the visible statements show.
6663 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
// Bit masks grouping the command types handled by the first two branches.
6665 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
6666 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
6667 ClientImpl* me = (ClientImpl*) userData_;
6668 AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->processDeferredExecutions());
// Queue-ack checking only runs when a queue ack timeout is configured.
6671 if(me->_queueAckTimeout) me->checkQueueAcks();
6675 me->_readMessage.replace(messageHandle_);
6676 Message& message = me->_readMessage;
6678 if (commandType & SOWMask)
6680 #if 0 // Not currently implemented, to avoid an extra branch in delivery 6684 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6685 me->_globalCommandTypeHandlers[1+(commandType/8192)].invoke(message));
6687 AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->_routes.deliverData(message,
6690 else if (commandType & PublishMask)
6692 #if 0 // Not currently implemented, to avoid an extra branch in delivery 6693 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6694 me->_globalCommandTypeHandlers[(commandType==Message::Command::Publish ?
6695 GlobalCommandTypeHandlers::Publish :
6696 GlobalCommandTypeHandlers::OOF)].invoke(message));
6698 const char* subIds = NULL;
6699 size_t subIdsLen = 0;
// Resolve the subscription ids into cached routes, then visit each one.
6702 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
6703 for(
size_t i=0; i<subIdCount; ++i)
6705 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
6707 if (handler.isValid())
6710 subIds + lookupResult.idOffset, lookupResult.idLength);
6713 bool isAutoAck = me->_isAutoAckEnabled;
// Non-queue messages with a bookmark go through the bookmark store when
// one is set.
6715 if (!isMessageQueue && !bookmark.
empty() &&
6716 me->_bookmarkStore.isValid())
6718 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
6721 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
6723 AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
6728 me->_bookmarkStore.log(me->_readMessage);
6729 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6730 handler.invoke(message));
// Separate path for message-queue messages when auto-ack is enabled.
6735 if(isMessageQueue && isAutoAck)
6739 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
6740 if (!message.getIgnoreAutoAck())
6742 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6746 catch(std::exception& ex)
6748 if (!message.getIgnoreAutoAck())
6750 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6753 AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6758 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6759 handler.invoke(message));
// Messages with no valid handler go to the last-chance handler.
6763 else me->lastChance(message);
6766 else if (commandType == Message::Command::Ack)
6768 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6769 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
6771 unsigned deliveries = 0U;
6774 case Message::AckType::Persisted:
6775 deliveries += me->persistedAck(message);
6777 case Message::AckType::Processed:
6778 deliveries += me->processedAck(message);
6781 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
// Acks that no one consumed fall through to the last-chance handler.
6782 if (deliveries == 0)
6784 me->lastChance(message);
6787 else if (commandType == Message::Command::Heartbeat)
6789 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6790 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
// Reply with a forced heartbeat only when a heartbeat timeout is configured.
6791 if(me->_heartbeatTimer.getTimeout() != 0.0)
6793 me->checkAndSendHeartbeat(
true);
6797 me->lastChance(message);
6803 unsigned deliveries = 0U;
// Delivery loop that runs while the client is connected; the
// MessageStreamFullException handlers below send a non-forced heartbeat.
6806 while(me->_connected)
6810 deliveries = me->_routes.deliverData(message, message.
getCommandId());
6814 catch(MessageStreamFullException&)
6816 catch(MessageStreamFullException& ex_)
6819 me->checkAndSendHeartbeat(
false);
6823 catch (std::exception& ex_)
6827 me->_exceptionListener->exceptionThrown(ex_);
6834 if (deliveries == 0)
6835 me->lastChance(message);
6837 me->checkAndSendHeartbeat();
// Pre-disconnect callback; userData is the owning ClientImpl. Clears the
// acks associated with the failed connection version. The handle argument
// is unnamed and unused.
6842 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
6844 ClientImpl* me = (ClientImpl*) userData;
6847 me->clearAcks(failedConnectionVersion);
// Disconnect callback; userData is the owning ClientImpl. Broadcasts the
// Disconnected state, marks the client as not connected, invokes the user's
// disconnect handler, and, if still not connected afterwards, broadcasts
// Shutdown and reports "Reconnect failed.". Otherwise it asks the
// subscription manager (when present) to resubscribe.
// NOTE(review): control-flow lines (try blocks, returns) are not visible in
// this view; confirm the exact ordering against the full source.
6851 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
6853 ClientImpl* me = (ClientImpl*) userData;
6854 Lock<Mutex> l(me->_lock);
// Client wrapper around this implementation, constructed with isRef=false.
6855 Client wrapper(me,
false);
6857 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
6860 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
6863 AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
6864 me->_connected =
false;
// NOTE(review): Unlock presumably releases _lock for the duration of the
// user callback - confirm against the Unlock<> definition.
6868 Unlock<Mutex> unlock(me->_lock);
6869 me->_disconnectHandler.invoke(wrapper);
6872 catch(
const std::exception& ex)
6874 AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6877 if (!me->_connected)
6879 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
6880 AMPS_UNHANDLED_EXCEPTION_2(me,DisconnectedException(
"Reconnect failed."));
6886 if (me->_subscriptionManager)
6891 Unlock<Mutex> unlock(me->_lock);
6892 me->_subscriptionManager->resubscribe(wrapper);
6894 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
// Exceptions raised while resubscribing are reported, not propagated.
6898 catch(
const AMPSException& subEx)
6900 AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6902 catch(
const std::exception& subEx)
6904 AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6927 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
6928 : _data(data_), _len(len_),_pos(pos_), _fieldSep(fieldSep_)
6930 while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
// Iterator trait typedefs: a forward iterator whose value is a
// (key, value) pair of Message::Field.
6933 typedef void* difference_type;
6934 typedef std::forward_iterator_tag iterator_category;
6935 typedef std::pair<Message::Field, Message::Field> value_type;
6936 typedef value_type* pointer;
6937 typedef value_type& reference;
6938 bool operator==(
const iterator& rhs)
const 6940 return _pos == rhs._pos;
6942 bool operator!=(
const iterator& rhs)
const 6944 return _pos != rhs._pos;
// Pre-increment: moves to the start of the next field.
6946 iterator& operator++()
// Step over the remainder of the current field...
6949 while(_pos != _len && _data[_pos] != _fieldSep) ++_pos;
// ...then over the separator(s) that follow it.
6951 while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
// Dereference: returns the (key, value) pair at the current position.
// The key runs from _pos up to the first '='; when an '=' is found, the
// value runs up to the next field separator or the end of the data.
6955 value_type operator*()
const 6958 size_t i = _pos, keyLength =0, valueStart = 0, valueLength = 0;
6959 for(; i < _len && _data[i] !=
'='; ++i) ++keyLength;
6961 result.first.assign(_data+_pos, keyLength);
6963 if (i < _len && _data[i] ==
'=')
6967 for(; i < _len && _data[i] != _fieldSep; ++i)
6972 result.second.assign(_data+valueStart, valueLength);
// Iterates the key/value fields of the buffer from last to first.
// _pos is a pointer into the data; a null _pos is the end (rend) position.
6978 class reverse_iterator
6985 typedef std::pair<Message::Field, Message::Field> value_type;
// Constructs from the buffer, its length, a starting pointer and the field
// separator, then backs up to the first character of the field containing
// (or preceding) pos.
6986 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
6987 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
// Skip trailing separators, then walk back to the preceding separator.
6992 while(_pos >=_data && *_pos == _fieldSep) --_pos;
6993 while(_pos > _data && *_pos != _fieldSep) --_pos;
// If we stopped on a separator, step forward onto the key's first character.
6997 if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
// Walking off the front of the buffer yields the end position (null).
6998 if (_pos < _data) _pos = 0;
// Equality and inequality compare the current pointer only.
7001 bool operator==(
const reverse_iterator& rhs)
const 7003 return _pos == rhs._pos;
7005 bool operator!=(
const reverse_iterator& rhs)
const 7007 return _pos != rhs._pos;
// Pre-increment: moves to the start of the previous field using the same
// back-up steps as the constructor.
7009 reverse_iterator& operator++()
7020 while(_pos >=_data && *_pos == _fieldSep) --_pos;
7022 while(_pos >_data && *_pos != _fieldSep) --_pos;
7023 if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
7024 if (_pos < _data) _pos = 0;
// Dereference: key runs from _pos to the first '='; value (when an '=' is
// present) runs up to the next separator or the end of the data.
7028 value_type operator*()
const 7031 size_t keyLength = 0, valueStart = 0, valueLength = 0;
7032 size_t i = (size_t)(_pos - _data);
7033 for(; i < _len && _data[i] !=
'='; ++i) ++keyLength;
7034 result.first.assign(_pos, keyLength);
7035 if (i<_len && _data[i] ==
'=')
7039 for(; i<_len && _data[i] != _fieldSep; ++i)
7044 result.second.assign(_data+valueStart, valueLength);
7049 : _data(data.
data()), _len(data.
len()),
7050 _fieldSep(fieldSeparator)
7054 FIX(
const char* data,
size_t len,
char fieldSeparator=1)
7055 : _data(data), _len(len), _fieldSep(fieldSeparator)
7059 iterator begin()
const 7061 return iterator(_data, _len, 0, _fieldSep);
7063 iterator end()
const 7065 return iterator(_data, _len, _len, _fieldSep);
7069 reverse_iterator rbegin()
const 7071 return reverse_iterator(_data, _len, _data+(_len-1), _fieldSep);
7074 reverse_iterator rend()
const 7076 return reverse_iterator(_data, _len, 0, _fieldSep);
7097 std::stringstream _data;
// Appends a field whose value is the `length` bytes starting at
// value + offset, written raw into the underlying stream.
// NOTE(review): the lines writing the tag, '=' and the trailing separator
// are not visible here; presumably they mirror the std::string overload.
7114 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
7117 _data.write(value+offset, (std::streamsize)length);
7125 void append(
const T& tag,
const std::string& value)
7127 _data << tag <<
'=' << value << _fs;
7136 operator std::string()
const 7144 _data.str(std::string());
7181 typedef std::map<Message::Field, Message::Field>
map_type;
7192 for(FIX::iterator a = fix.begin(); a!= fix.end(); ++a)
7204 std::deque<Message> _q;
7205 std::string _commandId;
7207 std::string _queryId;
7211 unsigned _requestedAcks;
7214 volatile enum { Unset=0x0, Running=0x10, Subscribe=0x11, SOWOnly=0x12, AcksOnly=0x13, Conflate=0x14, Closed=0x1, Disconnected=0x2 } _state;
7215 typedef std::map<std::string, Message*> SOWKeyMap;
7216 SOWKeyMap _sowKeyMap;
7218 MessageStreamImpl(
const Client& client_)
7221 _maxDepth((
unsigned)~0),
7225 if (_client.isValid())
7231 MessageStreamImpl(ClientImpl* client_)
7234 _maxDepth((
unsigned)~0),
7238 if (_client.isValid())
7244 ~MessageStreamImpl()
// Tears down this stream. When a valid client is attached, the stream's
// client handle is reset to an empty Client and deletion of this object is
// handed to the client's deferredExecution() via destroyer().
// NOTE(review): several lines (the try body, the binding of `c`, and the
// branch taken without a client) are not visible here - confirm.
7248 virtual void destroy()
7254 catch(std::exception &e)
7258 if (_client.isValid())
7264 if (_client.isValid())
7268 _client = Client((ClientImpl*)NULL);
7269 c.deferredExecution(MessageStreamImpl::destroyer,
this);
7277 static void destroyer(
void* vpMessageStreamImpl_)
7279 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
// Records the ids for a subscription on this stream, under the stream lock.
// commandId_ is stored only when non-empty and different from subId_;
// queryId_ only when non-empty and different from both other ids.
// Does nothing further if the stream is already Disconnected; otherwise the
// stream is expected to still be Unset (asserted).
// NOTE(review): the lines that store subId_ and set the new state are not
// visible here.
7282 void setSubscription(
const std::string& subId_,
7283 const std::string& commandId_ =
"",
7284 const std::string& queryId_ =
"")
7286 Lock<Mutex> lock(_lock);
7288 if (!commandId_.empty() && commandId_ != subId_)
7289 _commandId = commandId_;
7290 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
7291 _queryId = queryId_;
7293 if (Disconnected == _state)
return;
7294 assert(Unset==_state);
// Records the command id (and the query id, when non-empty and different
// from the command id) for a SOW-only stream, under the stream lock.
// Returns early if the stream is already Disconnected; otherwise the stream
// is expected to still be Unset (asserted).
// NOTE(review): the line that sets the new state is not visible here.
7298 void setSOWOnly(
const std::string& commandId_,
7299 const std::string& queryId_ =
"")
7301 Lock<Mutex> lock(_lock);
7302 _commandId = commandId_;
7303 if (!queryId_.empty() && queryId_ != commandId_)
7304 _queryId = queryId_;
7306 if (Disconnected == _state)
return;
7307 assert(Unset==_state);
// Records the command id (and the query id, when non-empty and different
// from the command id) for a stream that expects only a stats ack; the
// requested acks are set to Message::AckType::Stats.
// Returns early if the stream is already Disconnected; otherwise the stream
// is expected to still be Unset (asserted).
// NOTE(review): the line that sets the new state is not visible here.
7311 void setStatsOnly(
const std::string& commandId_,
7312 const std::string& queryId_ =
"")
7314 Lock<Mutex> lock(_lock);
7315 _commandId = commandId_;
7316 if (!queryId_.empty() && queryId_ != commandId_)
7317 _queryId = queryId_;
7319 if (Disconnected == _state)
return;
7320 assert(Unset==_state);
7322 _requestedAcks = Message::AckType::Stats;
// Records the command id and the set of requested acks for a stream that
// only receives acks, under the stream lock.
// Returns early if the stream is already Disconnected; otherwise the stream
// is expected to still be Unset (asserted).
// NOTE(review): the line that sets the new state is not visible here.
7325 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
7327 Lock<Mutex> lock(_lock);
7328 _commandId = commandId_;
7330 if (Disconnected == _state)
return;
7331 assert(Unset==_state);
7333 _requestedAcks = acks_;
7338 Lock<Mutex> lock(_lock);
7339 if(state_ == AMPS::ConnectionStateListener::Disconnected)
7341 _state = Disconnected;
7347 void timeout(
unsigned timeout_)
7349 _timeout = timeout_;
7353 if(_state == Subscribe) _state = Conflate;
7355 void maxDepth(
unsigned maxDepth_)
7357 if(maxDepth_) _maxDepth = maxDepth_;
7358 else _maxDepth = (unsigned)~0;
// Const accessor for the maximum depth (body not visible in this view).
7360 unsigned getMaxDepth(
void)
// getDepth: returns the number of messages currently queued in _q.
const 7364 unsigned getDepth(
void)
const 7366 return (
unsigned)(_q.size());
7371 Lock<Mutex> lock(_lock);
7372 if (!_previousTopic.
empty() && !_previousBookmark.
empty())
7376 if (_client.isValid())
7378 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
7382 catch (AMPSException&)
7384 catch (AMPSException& e)
7387 current_.invalidate();
7388 _previousTopic.
clear();
7389 _previousBookmark.
clear();
7392 _previousTopic.
clear();
7393 _previousBookmark.
clear();
7395 double minWaitTime = (double)((_timeout && _timeout > 1000)
7397 Timer timer(minWaitTime);
7399 while(_q.empty() && _state & Running)
7402 _lock.wait((
long)minWaitTime);
7404 Unlock<Mutex> unlck(_lock);
7405 amps_invoke_waiting_function();
7410 if(timer.checkAndGetRemaining(&minWaitTime))
7418 current_ = _q.front();
7419 if(_q.size() == _maxDepth) _lock.signalAll();
7421 if(_state == Conflate)
7423 std::string sowKey = current_.
getSowKey();
7424 if(sowKey.length()) _sowKeyMap.erase(sowKey);
7426 else if(_state == AcksOnly)
7430 if((_state == AcksOnly && _requestedAcks == 0) ||
7431 (_state == SOWOnly && current_.
getCommand()==
"group_end"))
7435 else if (current_.
getCommandEnum() == Message::Command::Publish &&
7445 if(_state == Disconnected)
7447 throw DisconnectedException(
"Connection closed.");
7449 current_.invalidate();
7450 if(_state == Closed)
7454 return _timeout != 0;
7458 if (_client.isValid())
7460 if (_state == SOWOnly || _state == Subscribe)
7462 if (!_commandId.empty()) _client.
unsubscribe(_commandId);
7464 if (!_queryId.empty()) _client.
unsubscribe(_queryId);
7473 if(_state==SOWOnly || _state==Subscribe || _state==Unset)
// Handler installed on the client for this stream: enqueues a deep copy of
// each incoming message under the stream lock and signals waiters.
// In Conflate state a message whose SOW key is already queued replaces the
// queued copy in place instead of being appended.
// When the queue is at _maxDepth, the visible paths either throw
// MessageStreamFullException or wait on the lock until space is available.
// NOTE(review): the conditions selecting between those paths are not
// visible in this view.
7478 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
7480 Lock<Mutex> lock(this_->_lock);
7481 if(this_->_state != Conflate)
7483 AMPS_TESTING_SLOW_MESSAGE_STREAM
7484 if(this_->_q.size() >= this_->_maxDepth)
// Wake any waiter before reporting that the stream is full.
7489 this_->_lock.signalAll();
7490 throw MessageStreamFullException(
"Stream is currently full.");
7492 this_->_q.push_back(message_.
deepCopy());
7494 this_->_client.isValid() && this_->_client.getAutoAck() &&
7498 message_.setIgnoreAutoAck();
7503 std::string sowKey = message_.
getSowKey();
7506 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
7507 if(it != this_->_sowKeyMap.end())
// Overwrite the queued message that has the same SOW key.
7509 *(it->second) = message_.
deepCopy();
7513 if(this_->_q.size() >= this_->_maxDepth)
7519 this_->_lock.signalAll();
7520 throw MessageStreamFullException(
"Stream is currently full.");
7522 this_->_q.push_back(message_.
deepCopy());
// Remember where the queued copy lives so a later update can replace it.
7523 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
7528 while(this_->_q.size() >= this_->_maxDepth)
7530 this_->_lock.wait(1);
7532 this_->_q.push_back(message_.
deepCopy());
7535 this_->_lock.signalAll();
7538 inline MessageStream::MessageStream(
void)
7541 inline MessageStream::MessageStream(
const Client& client_)
7542 :_body(
new MessageStreamImpl(client_))
7545 inline void MessageStream::iterator::advance(
void)
7547 _pStream = _pStream->_body->next(_current) ? _pStream:NULL;
7551 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
7556 if(handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
7558 result._body = (MessageStreamImpl*)(handler_._userData);
7563 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
7564 const std::string& queryId_)
7566 _body->setSOWOnly(commandId_, queryId_);
7568 inline void MessageStream::setSubscription(
const std::string& subId_,
7569 const std::string& commandId_,
7570 const std::string& queryId_)
7572 _body->setSubscription(subId_, commandId_, queryId_);
7574 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
7575 const std::string& queryId_)
7577 _body->setStatsOnly(commandId_, queryId_);
7579 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
7582 _body->setAcksOnly(commandId_, acks_);
7601 return _body->getMaxDepth();
7605 return _body->getDepth();
7608 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
7610 return *(_pEmptyMessageStream.get());
7618 ClientImpl& body = _body.get();
7619 Message& message = command_.getMessage();
7623 if(useExistingHandler)
7629 if (body._routes.getRoute(subId, existingHandler))
7632 body.executeAsync(command_, existingHandler,
false);
7633 return MessageStream::fromExistingHandler(existingHandler);
7640 if ((command & Message::Command::NoDataCommands)
7641 && (ackTypes == Message::AckType::Persisted
7642 || ackTypes == Message::AckType::None))
7645 if (!body._pEmptyMessageStream)
7647 body._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
7648 body._pEmptyMessageStream.get()->_body->close();
7650 return body.getEmptyMessageStream();
7653 if (body.getDefaultMaxDepth())
7654 stream.
maxDepth(body.getDefaultMaxDepth());
7656 std::string commandID = body.executeAsync(command_, handler,
false);
7657 if (command_.hasStatsAck())
7659 stream.setStatsOnly(commandID, command_.getMessage().
getQueryId());
7661 else if (command_.isSow())
7663 stream.setSOWOnly(commandID, command_.getMessage().
getQueryId());
7665 else if (command_.isSubscribe())
7667 stream.setSubscription(commandID,
7674 if (command == Message::Command::Publish ||
7675 command == Message::Command::DeltaPublish ||
7676 command == Message::Command::SOWDelete)
7678 stream.setAcksOnly(commandID,
7679 ackTypes & (
unsigned)~Message::AckType::Persisted);
7683 stream.setAcksOnly(commandID, ackTypes);
7690 inline void Message::ack(
const char* options_)
const 7692 ClientImpl* pClient = _body.get().clientImpl();
7694 if(pClient && bookmark.
len() &&
7695 !pClient->getAutoAck())
7697 pClient->ack(getTopic(),bookmark,options_);
Command & setCorrelationId(const std::string &v_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:557
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:179
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1284
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:4266
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1261
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5881
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:5855
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:512
std::string getAckType() const
Definition: ampsplusplus.hpp:636
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:4470
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:7095
void startTimer()
Definition: ampsplusplus.hpp:5844
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5401
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:759
Command & addAckType(const std::string &v_)
Definition: ampsplusplus.hpp:608
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:7589
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1230
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4498
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:486
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1068
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:6135
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:404
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:6426
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4375
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:5198
Command & setQueryId(const std::string &v_)
Definition: ampsplusplus.hpp:546
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1223
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:731
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:384
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:6435
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6300
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5153
Command & setFilter(const std::string &v_)
Definition: ampsplusplus.hpp:540
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:278
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:4594
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4859
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4390
void AMPSDLL amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1142
amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds); if no message is received within that time, the connection is presumed dead...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:724
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4610
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:538
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6162
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:713
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:6384
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4361
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5908
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1023
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:949
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:7132
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:6446
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4746
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4173
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1145
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4446
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1232
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5125
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:811
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6408
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7599
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4421
void clear()
Deletes the data associated with this Field; should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1043
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5587
Success.
Definition: amps.h:189
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1128
amps_handle AMPSDLL amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1152
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:705
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4836
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:7177
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:5245
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:179
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:553
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:4459
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4169
amps_result
Return values from amps_xxx functions.
Definition: amps.h:184
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:4647
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:973
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1035
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:7613
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:601
amps_result AMPSDLL amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:4602
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:6460
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:641
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4350
Command & setExpiration(unsigned v_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:606
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1077
Command & setData(const char *v_, size_t length_)
Sets the data for this command.
Definition: ampsplusplus.hpp:578
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:4907
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including index_ can be discarded.
Definition: ampsplusplus.hpp:882
const std::string & getNameHash() const
Returns the name hash of this client as generated by the server and returned when the client logged o...
Definition: ampsplusplus.hpp:4319
Command & setAckType(unsigned v_)
Definition: ampsplusplus.hpp:626
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5543
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4249
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6110
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1138
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:6154
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:501
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1230
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4695
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1257
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:4558
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4968
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:493
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1148
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1138
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6021
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:722
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1073
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
Command & setTopic(const std::string &v_)
Definition: ampsplusplus.hpp:538
amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5943
amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1225
Command & setBatchSize(unsigned v_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:595
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
Command & setOrderBy(const std::string &v_)
Definition: ampsplusplus.hpp:542
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:903
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:717
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5069
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:225
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:4925
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Command & setTimeout(unsigned v_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:588
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1139
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1232
void AMPSDLL amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:699
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:4330
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:7105
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:6312
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:712
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:753
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5437
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:836
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7114
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:963
amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4550
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:920
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:6373
void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1224
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1259
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:991
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5819
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1233
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:4542
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:136
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
Command & setCommandId(const std::string &v_)
Definition: ampsplusplus.hpp:536
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6014
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:509
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:4570
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6077
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7125
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:6363
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4813
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5522
Command & setSequence(const std::string &v_)
Definition: ampsplusplus.hpp:562
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4140
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:265
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5759
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:912
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:4792
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6192
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5720
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:4943
Command & setSequence(const amps_uint64_t v_)
Definition: ampsplusplus.hpp:564
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:5210
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:6355
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5085
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:873
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5684
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6001
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:977
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4625
amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
amps_result AMPSDLL amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:4520
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:7169
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:4529
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1172
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6288
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:7181
amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:4512
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:857
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:7188
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:202
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5620
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6071
Command & setSubId(const std::string &v_)
Definition: ampsplusplus.hpp:544
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5024
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4132
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1138
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:627
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:941
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7594
void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
Command & setTopN(unsigned v_)
Definition: ampsplusplus.hpp:590
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1260
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:4428
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5283
Command & setAckType(const std::string &v_)
Definition: ampsplusplus.hpp:616
Command & setData(const std::string &v_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:574
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:7603
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
The operation has not succeeded, but ought to be retried.
Definition: amps.h:213
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:4337
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6047
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4397
Command & setOptions(const std::string &v_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:560
void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:891
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1034
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1003
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:434
Command & setBookmark(const std::string &v_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:550
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:5988
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:7142
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:4639
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5641
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:6394
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:534
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1224
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:6345
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4670
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:4304
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:668
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:933
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1019
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:815
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5300
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:4719
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6084
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:301
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4885
The client and server are disconnected.
Definition: amps.h:217
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5964
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5051
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:7584
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5337
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:4311
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:407
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5483
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1033
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4994
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5172
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:521
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6226
amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5796
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4184
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6417
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:6336
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5369