25 #ifndef _AMPSPLUSPLUS_H_ 26 #define _AMPSPLUSPLUS_H_ 46 #include <sys/atomic.h> 48 #include "BookmarkStore.hpp" 49 #include "MessageRouter.hpp" 51 #include "ampscrc.hpp" 53 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM 54 #define AMPS_TESTING_SLOW_MESSAGE_STREAM 82 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10 83 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960 84 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0 85 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000 86 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200 87 #define AMPS_FLUSH_MIN_VERSION 4000000 88 #define AMPS_MIN_SOW_KEY_PUBLISH_VERSION 4030100 89 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000 90 #define AMPS_DEFAULT_TOP_N -1 91 #define AMPS_DEFAULT_BATCH_SIZE 10 92 #define AMPS_NUMBER_BUFFER_LEN 20 93 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000 95 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64) 100 static __declspec ( thread )
AMPS::Message* publishStoreMessage = 0;
108 typedef std::map<std::string, std::string> ConnectionInfo;
// Holds a list of AMPS::Message pointers that were registered for later
// cleanup. The destructor walks every entry in _messages.
// NOTE(review): the per-element action in the destructor is not visible here;
// presumably each message is deleted -- confirm.
110 class PerThreadMessageTracker {
111 std::vector<AMPS::Message*> _messages;
113 PerThreadMessageTracker() {}
114 ~PerThreadMessageTracker()
116 for (
size_t i=0; i<_messages.size(); ++i)
// Registration simply appends the pointer to the list.
123 _messages.push_back(message);
// A function-local static mutex is taken before forwarding to
// _addMessageToCleanupList.
127 static AMPS::Mutex _lock;
128 AMPS::Lock<Mutex> l(_lock);
129 _addMessageToCleanupList(message);
// NOTE(review): despite the class name, the tracker declared here is a plain
// function-local static, i.e. one instance shared by all callers.
133 static PerThreadMessageTracker tracker;
134 tracker.addMessage(message);
139 inline std::string asString(Type x_)
141 std::ostringstream os;
// Writes the decimal digits of seqNo_ into buf_, filling from the end of the
// buffer towards the front: pos starts at AMPS_NUMBER_BUFFER_LEN and each
// digit is stored at buf_[--pos].
// buf_ must have room for AMPS_NUMBER_BUFFER_LEN chars; the visible code
// writes no terminating NUL.
// Callers in this file use the returned size_t as the offset of the first
// digit, with AMPS_NUMBER_BUFFER_LEN minus that offset as the length.
// NOTE(review): the step that divides seqNo_ and any guard that stops at the
// leading digit are not visible here -- confirm how a value of 0 is rendered.
147 size_t convertToCharArray(
char* buf_, amps_uint64_t seqNo_)
149 size_t pos = AMPS_NUMBER_BUFFER_LEN;
150 for(
int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
// Lowest remaining decimal digit, converted to its ASCII character.
154 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
// Overload of convertToCharArray for unsigned long values. Same scheme as the
// amps_uint64_t overload: digits are written backwards from the end of buf_,
// starting with pos == AMPS_NUMBER_BUFFER_LEN and storing at buf_[--pos].
// buf_ must have room for AMPS_NUMBER_BUFFER_LEN chars; no NUL is written by
// the visible code.
// NOTE(review): the division step and the return statement are not visible
// here -- confirm they match the amps_uint64_t overload.
163 size_t convertToCharArray(
char* buf_,
unsigned long seqNo_)
165 size_t pos = AMPS_NUMBER_BUFFER_LEN;
166 for(
int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
// Lowest remaining decimal digit, converted to its ASCII character.
170 buf_[--pos] = (char)(seqNo_ % 10 +
'0');
// Fixed reason strings, one static accessor per reason. Each accessor returns
// a pointer to a string literal, so the pointer stays valid for the lifetime
// of the program and must not be freed or modified by the caller.
// NOTE(review): presumably these are compared against the reason text of a
// server acknowledgement -- confirm against the code that consumes them.
184 static const char* duplicate() {
return "duplicate";}
185 static const char* badFilter() {
return "bad filter";}
186 static const char* badRegexTopic() {
return "bad regex topic";}
187 static const char* subscriptionAlreadyExists() {
return "subscription already exists";}
188 static const char* nameInUse() {
return "name in use";}
189 static const char* authFailure() {
return "auth failure";}
190 static const char* notEntitled() {
return "not entitled";}
// Note the accessor name is abbreviated; the text itself is spelled out.
191 static const char* authDisabled() {
return "authentication disabled";}
192 static const char* subidInUse() {
return "subid in use";}
193 static const char* noTopic() {
return "no topic";}
// Virtual hook that receives an exception by const reference. This
// implementation has an empty body: the exception is ignored. Being virtual,
// it can be overridden to report or log the exception instead.
208 virtual void exceptionThrown(
const std::exception&)
const {;}
214 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \ 219 catch (std::exception& ex_)\ 223 _exceptionListener->exceptionThrown(ex_);\ 248 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x) \ 249 while(me->_connected)\ 256 catch(MessageStreamFullException&)\ 258 me->checkAndSendHeartbeat(false);\ 262 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x) \ 263 while(me->_connected)\ 270 catch(MessageStreamFullException& ex_)\ 272 me->checkAndSendHeartbeat(false);\ 277 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \ 280 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x) \ 282 catch (std::exception& ex_)\ 286 me->_exceptionListener->exceptionThrown(ex_);\ 310 #define AMPS_UNHANDLED_EXCEPTION(ex) \ 313 _exceptionListener->exceptionThrown(ex);\ 318 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \ 321 me->_exceptionListener->exceptionThrown(ex);\ 360 static const unsigned Subscribe = 1;
361 static const unsigned SOW = 2;
362 static const unsigned NeedsSequenceNumber = 4;
363 static const unsigned ProcessedAck = 8;
364 static const unsigned StatsAck = 16;
365 void init(Message::Command::Type command_)
374 void init(
const std::string& command_)
386 if (!(command & Message::Command::NoDataCommands))
389 if (command == Message::Command::Subscribe ||
390 command == Message::Command::SOWAndSubscribe ||
391 command == Message::Command::DeltaSubscribe ||
392 command == Message::Command::SOWAndDeltaSubscribe)
397 if (command == Message::Command::SOW
398 || command == Message::Command::SOWAndSubscribe
399 || command == Message::Command::SOWAndDeltaSubscribe)
404 setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
406 if (command == Message::Command::SOW)
411 _flags |= ProcessedAck;
413 else if (command == Message::Command::SOWDelete)
416 _flags |= ProcessedAck;
417 _flags |= NeedsSequenceNumber;
419 else if (command == Message::Command::Publish
420 || command == Message::Command::DeltaPublish)
422 _flags |= NeedsSequenceNumber;
424 else if (command == Message::Command::StopTimer)
513 std::ostringstream os;
558 if (v_ ==
"processed") _flags |= ProcessedAck;
559 else if (v_ ==
"stats") _flags |= StatsAck;
566 if (v_.find(
"processed") != std::string::npos) _flags |= ProcessedAck;
567 else _flags &= ~ProcessedAck;
568 if (v_.find(
"stats") != std::string::npos) _flags |= StatsAck;
569 else _flags &= ~StatsAck;
576 if (v_ & Message::AckType::Processed) _flags |= ProcessedAck;
577 else _flags &= ~ProcessedAck;
578 if (v_ & Message::AckType::Stats) _flags |= StatsAck;
579 else _flags &= ~StatsAck;
// Returns a mutable reference to the stored _message, so callers can modify
// the message in place.
593 Message& getMessage(
void) {
return _message; }
// Returns the stored _timeout value.
594 unsigned getTimeout(
void)
const {
return _timeout; }
// Returns the stored _batchSize value.
595 unsigned getBatchSize(
void)
const {
return _batchSize; }
// True when the Subscribe bit is set in _flags.
// The mask result is compared against 0 explicitly, matching isSow(),
// hasProcessedAck(), hasStatsAck() and needsSequenceNumber(), rather than
// relying on an implicit unsigned-to-bool conversion.
596 bool isSubscribe(
void)
const 598 return (_flags & Subscribe) != 0;
// Flag predicates: each tests a single bit of _flags and returns true when
// that bit is set.

// True when the SOW bit is set.
600 bool isSow(
void)
const {
return (_flags & SOW) != 0; }
// True when the ProcessedAck bit is set.
601 bool hasProcessedAck(
void)
const {
return (_flags & ProcessedAck) != 0; }
// True when the StatsAck bit is set.
602 bool hasStatsAck(
void)
const {
return (_flags & StatsAck) != 0; }
// True when the NeedsSequenceNumber bit is set.
603 bool needsSequenceNumber(
void)
const {
return (_flags & NeedsSequenceNumber) != 0; }
608 typedef void(*DisconnectHandlerFunc)(
Client&,
void* userData);
625 virtual std::string authenticate(
const std::string& userName_,
const std::string& password_) = 0;
633 virtual std::string retry(
const std::string& userName_,
const std::string& password_) = 0;
640 virtual void completed(
const std::string& userName_,
const std::string& password_,
const std::string& reason_) = 0;
652 std::string
authenticate(
const std::string& ,
const std::string& password_)
// Retry hook of the default authenticator. Both arguments are unnamed and
// unused: this implementation always throws AuthenticationException and never
// returns a value.
659 std::string
retry(
const std::string& ,
const std::string& )
661 throw AuthenticationException(
"retry not implemented by DefaultAuthenticator.");
// Completion hook with an empty body: all three arguments are unnamed and
// ignored, so nothing happens when it is called.
664 void completed(
const std::string& ,
const std::string& ,
const std::string& ) {;}
685 virtual void execute(
Message& message_) = 0;
700 typedef bool (*PublishStoreResizeHandler)(
Store store_,
709 StoreImpl() : _resizeHandler(NULL), _resizeHandlerData(NULL) {;}
715 virtual amps_uint64_t store(
const Message& message_) = 0;
721 virtual void discardUpTo(amps_uint64_t index_) = 0;
736 virtual bool replaySingle(
StoreReplayer& replayer_, amps_uint64_t index_) = 0;
742 virtual size_t unpersistedCount()
const = 0;
754 virtual void flush(
long timeout_) = 0;
767 virtual amps_uint64_t getLowestUnpersisted()
const = 0;
772 virtual amps_uint64_t getLastPersisted() = 0;
786 _resizeHandler = handler_;
787 _resizeHandlerData = userData_;
792 return _resizeHandler;
795 bool callResizeHandler(
size_t newSize_);
799 void* _resizeHandlerData;
806 RefHandle<StoreImpl> _body;
821 return _body.get().store(message_);
830 _body.get().discardUpTo(index_);
839 _body.get().replay(replayer_);
851 return _body.get().replaySingle(replayer_, index_);
860 return _body.get().unpersistedCount();
868 return _body.isValid();
881 return _body.get().flush(timeout_);
889 return _body.get().getLowestUnpersisted();
897 return _body.get().getLastPersisted();
912 _body.get().setResizeHandler(handler_, userData_);
917 return _body.get().getResizeHandler();
947 virtual void failedWrite(
const Message& message_,
948 const char* reason_,
size_t reasonLength_) = 0;
// Invokes the registered resize handler, passing a Store handle that wraps
// this implementation, the requested new size, and the user data registered
// with the handler. The handler's bool result is returned.
// NOTE(review): any guard for a NULL _resizeHandler is not visible here --
// confirm the handler is checked before this call is reached.
952 inline bool StoreImpl::callResizeHandler(
size_t newSize_)
955 return _resizeHandler(
Store(
this), newSize_, _resizeHandlerData);
968 long* timeoutp = (
long*)data_;
970 if (count == 0)
return false;
973 store_.
flush(*timeoutp);
976 catch (
const TimedOutException&)
978 catch (
const TimedOutException& e)
1001 unsigned requestedAckTypes_) = 0;
1008 virtual void clear() = 0;
1012 virtual void resubscribe(Client& client_) = 0;
1023 typedef enum { Disconnected = 0,
1027 PublishReplayed = 8,
1028 HeartbeatInitiated = 16,
1042 virtual void connectionStateChanged(
State newState_) = 0;
1047 class MessageStreamImpl;
1050 typedef void(*DeferredExecutionFunc)(
void*);
1052 class ClientImpl :
public RefBody
1054 friend class Client;
1057 DisconnectHandler _disconnectHandler;
1058 enum GlobalCommandTypeHandlers :
size_t 1068 DuplicateMessage = 8,
1071 std::vector<MessageHandler> _globalCommandTypeHandlers;
1072 Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1074 MessageRouter::RouteCache _routeCache;
1075 mutable Mutex _lock;
1076 std::string _name, _lastUri, _logonCorrelationData;
1078 Store _publishStore;
1079 bool _isRetryOnDisconnect;
1080 amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1081 volatile amps_uint64_t _lastSentHaSequenceNumber;
1082 ATOMIC_TYPE_8 _badTimeToHAPublish;
1083 ATOMIC_TYPE_8 _badTimeToHASubscribe;
1084 VersionInfo _serverVersion;
1085 Timer _heartbeatTimer;
1086 amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1089 int _queueAckTimeout;
1090 bool _isAutoAckEnabled;
1091 unsigned _ackBatchSize;
1092 unsigned _queuedAckCount;
1093 unsigned _defaultMaxDepth;
// Per-topic bookkeeping record; used as the mapped type of TopicHashMap.
// NOTE(review): field meanings are inferred from their names -- _oldestTime
// looks like a timestamp and _bookmarkCount a counter; confirm.
1094 struct QueueBookmarks
// The topic is taken by const reference: the previous by-value
// `const std::string` parameter forced a string copy on every construction
// and could not be moved from anyway.
1096 QueueBookmarks(
const std::string& topic_)
1103 amps_uint64_t _oldestTime;
1104 unsigned _bookmarkCount;
1106 typedef amps_uint64_t topic_hash;
1107 typedef std::map<topic_hash,QueueBookmarks> TopicHashMap;
1108 TopicHashMap _topicHashMap;
1112 ClientImpl* _client;
1117 ClientStoreReplayer()
1118 : _client(NULL) , _version(0), _res(
AMPS_E_OK)
1121 ClientStoreReplayer(ClientImpl* client_)
1122 : _client(client_) , _version(0), _res(
AMPS_E_OK)
1125 void setClient(ClientImpl* client_) { _client = client_; }
1127 void execute(
Message& message_)
1129 if (!_client)
throw CommandException(
"Can't replay without a client.");
1132 if (index > _client->_lastSentHaSequenceNumber)
1133 _client->_lastSentHaSequenceNumber = index;
1140 (!_client->_badTimeToHAPublish ||
1141 message_.getOptions().len() < 6))
1144 message_.getMessage(),
1148 throw DisconnectedException(
"AMPS Server disconnected during replay");
1154 ClientStoreReplayer _replayer;
1158 ClientImpl* _parent;
1159 const char* _reason;
1160 size_t _reasonLength;
1161 size_t _replayCount;
1163 FailedWriteStoreReplayer(ClientImpl* parent,
const char* reason_,
size_t reasonLength_)
1166 _reasonLength(reasonLength_),
1169 void execute(
Message& message_)
1171 if (_parent->_failedWriteHandler)
1174 _parent->_failedWriteHandler->failedWrite(message_,
1175 _reason, _reasonLength);
1178 size_t replayCount(
void)
const {
return _replayCount; }
1181 struct AckResponseImpl :
public RefBody
1183 std::string username, password, reason, status, bookmark, options;
1184 amps_uint64_t sequenceNo;
1185 VersionInfo serverVersion;
1186 volatile bool responded, abandoned;
1187 unsigned connectionVersion;
1190 sequenceNo((amps_uint64_t)0),
1194 connectionVersion(0)
1201 RefHandle<AckResponseImpl> _body;
1203 AckResponse() : _body(NULL) {;}
1204 static AckResponse create()
1207 r._body =
new AckResponseImpl();
1211 const std::string& username()
1213 return _body.get().username;
1215 void setUsername(
const char* data_,
size_t len_)
1217 if (data_) _body.get().username.assign(data_, len_);
1218 else _body.get().username.clear();
1220 const std::string& password()
1222 return _body.get().password;
1224 void setPassword(
const char* data_,
size_t len_)
1226 if (data_) _body.get().password.assign(data_, len_);
1227 else _body.get().password.clear();
1229 const std::string& reason()
1231 return _body.get().reason;
1233 void setReason(
const char* data_,
size_t len_)
1235 if (data_) _body.get().reason.assign(data_, len_);
1236 else _body.get().reason.clear();
1238 const std::string& status()
1240 return _body.get().status;
1242 void setStatus(
const char* data_,
size_t len_)
1244 if (data_) _body.get().status.assign(data_, len_);
1245 else _body.get().status.clear();
1247 const std::string& bookmark()
1249 return _body.get().bookmark;
1251 void setBookmark(
const char* data_,
size_t len_)
1253 if (data_) _body.get().bookmark.assign(data_, len_);
1254 else _body.get().bookmark.clear();
1256 amps_uint64_t sequenceNo()
const 1258 return _body.get().sequenceNo;
// Parses len_ characters starting at data_ as an unsigned decimal number and
// stores the result in the shared body's sequenceNo.
// data_ is not required to be NUL-terminated; exactly len_ characters are read.
// NOTE(review): the visible loop does not check that each character is a
// digit, and the accumulator can wrap on very long input -- confirm the
// input is always a well-formed sequence number.
1260 void setSequenceNo(
const char* data_,
size_t len_)
1262 amps_uint64_t result = (amps_uint64_t)0;
1265 for(
size_t i=0; i<len_; ++i)
// Shift the accumulated value one decimal place, then add the next digit.
1267 result *= (amps_uint64_t)10;
1268 result += (amps_uint64_t)(data_[i] -
'0');
1271 _body.get().sequenceNo = result;
1273 VersionInfo serverVersion()
const 1275 return _body.get().serverVersion;
1277 void setServerVersion(
const char* data_,
size_t len_)
1280 _body.get().serverVersion.setVersion(std::string(data_, len_));
1284 return _body.get().responded;
1286 void setResponded(
bool responded_)
1288 _body.get().responded = responded_;
1292 return _body.get().abandoned;
// Sets the abandoned flag on the shared body. Unlike the other setters shown
// for this class, the write is skipped when the handle has no valid body, so
// calling this on an empty AckResponse is a no-op.
1294 void setAbandoned(
bool abandoned_)
1296 if (_body.isValid())
1297 _body.get().abandoned = abandoned_;
1300 void setConnectionVersion(
unsigned connectionVersion)
1302 _body.get().connectionVersion = connectionVersion;
1305 unsigned getConnectionVersion()
1307 return _body.get().connectionVersion;
1309 void setOptions(
const char* data_,
size_t len_)
1311 if (data_) _body.get().options.assign(data_,len_);
1312 else _body.get().options.clear();
1315 const std::string& options()
1317 return _body.get().options;
1320 AckResponse& operator=(
const AckResponse& rhs)
1328 typedef std::map<std::string, AckResponse> AckMap;
1330 DefaultExceptionListener _defaultExceptionListener;
1333 struct DeferredExecutionRequest
1335 DeferredExecutionRequest(DeferredExecutionFunc func_,
1338 _userData(userData_)
1341 DeferredExecutionFunc _func;
1345 std::shared_ptr<const ExceptionListener> _pExceptionListener;
1346 amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1348 std::string _username;
1349 typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1350 ConnectionStateListeners _connectionStateListeners;
1351 typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1352 DeferredExecutionList _deferredExecutionList;
1353 unsigned _heartbeatInterval;
1354 unsigned _readTimeout;
1362 if (!_connected && newState_ > ConnectionStateListener::Connected)
1366 for(ConnectionStateListeners::iterator it= _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1368 AMPS_CALL_EXCEPTION_WRAPPER(
1369 (*it)->connectionStateChanged(newState_));
1372 unsigned processedAck(
Message& message);
// Declaration only; the definition is not in this part of the file.
// NOTE(review): by its name this handles a "persisted" acknowledgement
// message and returns an unsigned value -- confirm against the definition.
// The parameter name was misspelled ("meesage"); it now matches the sibling
// declarations processedAck() and lastChance().
1373 unsigned persistedAck(
Message& message);
1374 void lastChance(
Message& message);
1375 void checkAndSendHeartbeat(
bool force=
false);
1376 virtual ConnectionInfo getConnectionInfo()
const;
1378 ClientImplMessageHandler(
amps_handle message,
void* userData);
1380 ClientImplPreDisconnectHandler(
amps_handle client,
unsigned failedConnectionVersion,
void* userData);
1382 ClientImplDisconnectHandler(
amps_handle client,
void* userData);
// Removes the subscription identified by id from this client.
// An empty id is ignored (early return).
1384 void unsubscribeInternal(
const std::string&
id)
1386 if (
id.empty())
return;
// The id is copied into subId, which is then used to drop the route.
1389 subId.assign(
id.data(),
id.length());
1390 _routes.removeRoute(subId);
// When a subscription manager is installed it is told about the unsubscribe
// as well; _lock is released (Unlock) around that call.
1392 if (_subscriptionManager)
1395 Unlock<Mutex> unlock(_lock);
1396 _subscriptionManager->unsubscribe(subId);
// The prepared _message is sent without the retry logic of _send().
// NOTE(review): how _message is populated is not visible here -- confirm.
1402 _sendWithoutRetry(_message);
// Convenience overload: forwards to the four-argument syncAckProcessing with
// an HA sequence number of 0, passing timeout_, message_ and isHASubscribe_
// through unchanged, and returns that call's AckResponse.
1405 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1406 bool isHASubscribe_)
1408 return syncAckProcessing(timeout_, message_,
1409 (amps_uint64_t)0, isHASubscribe_);
// Sends message_ and waits for its acknowledgement.
//
// timeout_        passed to _lock.wait() and to the AMPS_*_TIMER macros.
// message_        the command to send; also used to raise the failure
//                 exception via message_.throwFor().
// haSeq           forwarded to _send(); defaults to 0.
// isHASubscribe_  forwarded to _send(); defaults to false.
//
// Returns the AckResponse for the command.
// Throws DisconnectedException when _send() reports connection version 0 or
// the ack was abandoned, TimedOutException when the wait ends without a
// response, and otherwise whatever message_.throwFor() raises for a
// non-empty failure reason.
1412 AckResponse syncAckProcessing(
long timeout_,
Message& message_,
1413 amps_uint64_t haSeq = (amps_uint64_t)0,
1414 bool isHASubscribe_ =
false)
1417 AckResponse ack = AckResponse::create();
// The value returned by _send() is recorded as the ack's connection version;
// 0 is treated as "not connected".
1419 ack.setConnectionVersion((
unsigned)_send(message_, haSeq, isHASubscribe_));
1420 if (ack.getConnectionVersion() == 0)
1423 throw DisconnectedException(
"Connection closed while waiting for response.");
// Wait until the ack is answered or abandoned, or the wait times out.
1425 bool timedOut =
false;
1426 AMPS_START_TIMER(timeout_)
1427 while(!timedOut && !ack.responded() && !ack.abandoned())
1431 timedOut = !_lock.wait(timeout_);
1433 if (timedOut) { AMPS_RESET_TIMER(timedOut, timeout_); }
1439 amps_invoke_waiting_function();
1442 if (ack.responded())
1444 if (ack.status() !=
"failure")
// Never move the last-sent HA sequence number backwards.
1448 amps_uint64_t ackSequence = ack.sequenceNo();
1449 if (_lastSentHaSequenceNumber < ackSequence)
1451 _lastSentHaSequenceNumber = ackSequence;
1461 _serverVersion = ack.serverVersion();
1462 if (_bookmarkStore.isValid())
1467 const std::string& options = ack.options();
// Locate the "max_backlog=" token. This must be find(), not find_first_of():
// find_first_of() returns the position of the first character that is ANY of
// 'm','a','x','_','b','c','k','l','o','g','=', so the fixed +12 skip below
// could land on unrelated text or past the end of the string.
1468 size_t index = options.find(
"max_backlog=");
1469 if(index != std::string::npos)
// 12 == strlen("max_backlog="); c points at the first character of the value.
1472 const char* c = options.c_str()+index+12;
// Accumulate decimal digits up to the next ',' or the end of the string.
// NOTE(review): characters are not checked for being digits -- confirm the
// server always sends a numeric value here.
1473 while(*c && *c!=
',')
1475 data = (data*10) + (
unsigned)(*c++-48);
// The ack batch size is only ever lowered to the parsed backlog, never raised.
1477 if(_ackBatchSize > data) _ackBatchSize = data;
// 12 is the length of the reason text "not entitled".
1482 const size_t NotEntitled = 12;
1483 std::string ackReason = ack.reason();
// A response without a reason is returned to the caller as-is.
1484 if (ackReason.length() == 0)
return ack;
1485 if (ackReason.length() == NotEntitled &&
1486 ackReason[0] ==
'n' &&
1491 message_.throwFor(_client, ackReason);
// No response arrived: distinguish a timeout from an abandoned ack.
1495 if (!ack.abandoned())
1497 throw TimedOutException(
"timed out waiting for operation.");
1501 throw DisconnectedException(
"Connection closed while waiting for response.");
1509 if (!_client)
return;
1513 AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1514 _pEmptyMessageStream.reset(NULL);
1516 processDeferredExecutions();
1522 ClientImpl(
const std::string& clientName)
1523 : _client(NULL), _name(clientName)
1524 , _isRetryOnDisconnect(
true)
1525 , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1526 , _badTimeToHASubscribe(0), _serverVersion()
1527 , _queueAckTimeout(0)
1528 , _isAutoAckEnabled(
false)
1530 , _queuedAckCount(0)
1531 , _defaultMaxDepth(0)
1533 , _heartbeatInterval(0)
1536 _replayer.setClient(
this);
1541 _exceptionListener = &_defaultExceptionListener;
1542 for (
size_t i=0; i<GlobalCommandTypeHandlers::COUNT; ++i)
1549 virtual ~ClientImpl()
1555 const std::string& getName()
const 1560 void setName(
const std::string& name)
1565 _client, name.c_str());
1568 AMPSException::throwFor(_client, result);
1573 const std::string& getLogonCorrelationData()
const 1575 return _logonCorrelationData;
1578 void setLogonCorrelationData(
const std::string& logonCorrelationData_)
1580 _logonCorrelationData = logonCorrelationData_;
1583 size_t getServerVersion()
const 1585 return _serverVersion.getOldStyleVersion();
1588 VersionInfo getServerVersionInfo()
const 1590 return _serverVersion;
1593 const std::string& getURI()
const 1595 Lock<Mutex> l(_lock);
1599 virtual void connect(
const std::string& uri)
1601 Lock<Mutex> l(_lock);
1604 _client, uri.c_str());
1607 AMPSException::throwFor(_client, result);
1613 _beatMessage.setOptions(
"beat");
1614 _readMessage.setClientImpl(
this);
1615 if(_queueAckTimeout)
1620 broadcastConnectionStateChanged(ConnectionStateListener::Connected);
1623 void setDisconnected()
1626 Lock<Mutex> l(_lock);
1628 broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
1631 _heartbeatTimer.setTimeout(0.0);
1636 virtual void disconnect()
1639 Lock<Mutex> l(_lock);
1644 AMPS_CALL_EXCEPTION_WRAPPER(_sendWithoutRetry(_message));
1646 AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
1649 Lock<Mutex> l(_lock);
1650 broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
// Marks as abandoned every pending ack whose connection version is less than
// or equal to failedVersion, i.e. acks issued on the failed connection or an
// earlier one. _lock is taken first.
1653 void clearAcks(
unsigned failedVersion)
1656 Lock<Mutex> l(_lock);
// Keys of the affected entries are collected first rather than modifying
// _acks while iterating over it.
1659 std::list<std::string> worklist;
1660 for(AckMap::iterator i = _acks.begin(); i != _acks.end(); i++)
1662 if (i->second.getConnectionVersion() <= failedVersion)
1664 i->second.setAbandoned(
true);
1665 worklist.push_back(i->first);
// Second pass over the collected keys.
// NOTE(review): the loop body is not visible here; presumably each key is
// erased from _acks -- confirm.
1669 for(std::list<std::string>::iterator j = worklist.begin(); j != worklist.end(); j++)
// Public send entry point: takes _lock, then delegates to _send() with its
// default HA arguments and returns _send()'s int result.
1678 int send(
const Message& message)
1680 Lock<Mutex> l(_lock);
1681 return _send(message);
// Public wrapper around _sendWithoutRetry(): takes _lock, then forwards
// message_ unchanged.
1684 void sendWithoutRetry(
const Message& message_)
1686 Lock<Mutex> l(_lock);
1687 _sendWithoutRetry(message_);
1690 void _sendWithoutRetry(
const Message& message_)
1695 AMPSException::throwFor(_client,result);
1699 int _send(
const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
1700 bool isHASubscribe_ =
false)
1707 Message localMessage = message;
1708 unsigned version = 0;
1712 if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
1716 if(!_isRetryOnDisconnect)
1720 Unlock<Mutex> l(_lock);
1731 if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
1732 (isHASubscribe_ && _badTimeToHASubscribe != 0))
1734 return (
int)version;
1738 if (haSeq > _lastSentHaSequenceNumber)
1740 while (haSeq > _lastSentHaSequenceNumber + 1)
1746 _lastSentHaSequenceNumber+1))
1752 version = _replayer._version;
1755 catch(
const DisconnectedException&)
1757 catch(
const DisconnectedException& e)
1760 result = _replayer._res;
1765 localMessage.getMessage(),
1767 ++_lastSentHaSequenceNumber;
1771 localMessage.getMessage(),
1775 if (!isHASubscribe_ && !haSeq &&
1776 localMessage.getMessage() == message.getMessage())
1780 if(_isRetryOnDisconnect)
1782 Unlock<Mutex> u(_lock);
1787 if ((isHASubscribe_ || haSeq) &&
1790 return (
int)version;
1797 AMPSException::throwFor(_client, result);
1802 amps_invoke_waiting_function();
1805 if (result !=
AMPS_E_OK) AMPSException::throwFor(_client, result);
1806 return (
int)version;
1809 void addMessageHandler(
const Field& commandId_,
1811 unsigned requestedAcks_,
bool isSubscribe_)
1813 Lock<Mutex> lock(_lock);
1814 _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
// Removes the route registered for commandId_ while holding _lock, and
// returns the bool result of _routes.removeRoute().
// NOTE(review): presumably true means a route was found and removed -- confirm
// against MessageRouter.
1818 bool removeMessageHandler(
const Field& commandId_)
1820 Lock<Mutex> lock(_lock);
1821 return _routes.removeRoute(commandId_);
1827 bool isSubscribe =
false;
1829 unsigned systemAddedAcks = Message::AckType::None;
1832 case Message::Command::Subscribe:
1833 case Message::Command::DeltaSubscribe:
1836 systemAddedAcks |= Message::AckType::Persisted;
1839 case Message::Command::SOWAndSubscribe:
1840 case Message::Command::SOWAndDeltaSubscribe:
1852 case Message::Command::SOW:
1862 systemAddedAcks |= Message::AckType::Processed;
1864 if (!isSubscribe) systemAddedAcks |= Message::AckType::Completed;
1867 Lock<Mutex> l(_lock);
1868 _routes.addRoute(message_.
getQueryID(), messageHandler_,
1869 requestedAcks, systemAddedAcks, isSubscribe);
1872 messageHandler_.isValid() &&
1876 messageHandler_, requestedAcks,
1877 systemAddedAcks,
true);
1883 syncAckProcessing(timeout_, message_, 0,
false);
1890 _routes.removeRoute(
id);
1897 case Message::Command::Unsubscribe:
1898 case Message::Command::Heartbeat:
1899 case Message::Command::Logon:
1900 case Message::Command::StartTimer:
1901 case Message::Command::StopTimer:
1902 case Message::Command::DeltaPublish:
1903 case Message::Command::Publish:
1904 case Message::Command::SOWDelete:
1906 Lock<Mutex> l(_lock);
1915 if (messageHandler_.isValid())
1917 _routes.addRoute(
id, messageHandler_, requestedAcks,
1918 Message::AckType::None,
false);
1925 case Message::Command::GroupBegin:
1926 case Message::Command::GroupEnd:
1927 case Message::Command::OOF:
1928 case Message::Command::Ack:
1929 case Message::Command::Unknown:
1931 throw CommandException(
"Command type " + message_.
getCommand() +
" can not be sent directly to AMPS");
// Replaces the stored disconnect handler with disconnectHandler; the
// assignment is made while holding _lock.
1937 void setDisconnectHandler(DisconnectHandler disconnectHandler)
1939 Lock<Mutex> l(_lock);
1940 _disconnectHandler = disconnectHandler;
1943 void setGlobalCommandTypeMessageHandler(
const std::string& command_,
const MessageHandler& handler_)
1945 switch (command_[0])
1947 #if 0 // Not currently implemented to avoid an extra branch in delivery 1949 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
1952 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
1956 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
1958 #if 0 // Not currently implemented to avoid an extra branch in delivery 1960 if (command_[6] ==
'b')
1962 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
1964 else if (command_[6] ==
'e')
1966 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
1970 std::ostringstream os;
1971 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
1972 throw CommandException(os.str());
1976 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
1980 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
1984 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
1988 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
1991 std::ostringstream os;
1992 os <<
"Invalid command '" << command_ <<
"' passed to setGlobalCommandTypeHandler";
1993 throw CommandException(os.str());
1998 void setGlobalCommandTypeMessageHandler(
const Message::Command::Type command_,
const MessageHandler& handler_)
2002 #if 0 // Not currently implemented to avoid an extra branch in delivery 2003 case Message::Command::Publish:
2004 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2006 case Message::Command::SOW:
2007 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2010 case Message::Command::Heartbeat:
2011 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2013 #if 0 // Not currently implemented to avoid an extra branch in delivery 2014 case Message::Command::GroupBegin:
2015 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2017 case Message::Command::GroupEnd:
2018 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2020 case Message::Command::OOF:
2021 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2024 case Message::Command::Ack:
2025 _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2029 unsigned command = command_;
2030 while (command > 0) { ++bits; command >>= 1; }
2032 AMPS_snprintf(errBuf,
sizeof(errBuf),
2033 "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2034 CommandConstants<0>::Lengths[bits],
2035 CommandConstants<0>::Values[bits]);
2036 throw CommandException(errBuf);
// Installs handler_ directly into the slot of _globalCommandTypeHandlers
// selected by handlerType_.
// The visible code indexes the vector without a range check and without
// taking _lock.
// NOTE(review): assumes the vector was sized to cover every
// GlobalCommandTypeHandlers value -- confirm in the constructor.
2041 void setGlobalCommandTypeMessageHandler(
const GlobalCommandTypeHandlers handlerType_,
const MessageHandler& handler_)
2043 _globalCommandTypeHandlers[handlerType_] = handler_;
2048 Lock<Mutex> l(_lock);
2049 _failedWriteHandler.reset(handler_);
// Replaces the client's publish store with publishStore_ while holding _lock.
// Throws AlreadyConnectedException when the client is currently connected;
// the store may only be changed while disconnected.
2052 void setPublishStore(
const Store& publishStore_)
2054 Lock<Mutex> l(_lock);
2055 if (_connected)
throw AlreadyConnectedException(
"Setting a publish store on a connected client is undefined behavior");
2056 _publishStore = publishStore_;
2061 Lock<Mutex> l(_lock);
2062 if (_connected)
throw AlreadyConnectedException(
"Setting a bookmark store on a connected client is undefined behavior");
2063 _bookmarkStore = bookmarkStore_;
2068 Lock<Mutex> l(_lock);
2069 _subscriptionManager.reset(subscriptionManager_);
2074 Lock<Mutex> l(_lock);
2075 return _subscriptionManager.get();
2078 DisconnectHandler getDisconnectHandler()
2080 Lock<Mutex> l(_lock);
2081 return _disconnectHandler;
2086 return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2091 Lock<Mutex> l(_lock);
2092 return _failedWriteHandler.get();
2095 Store getPublishStore()
2097 Lock<Mutex> l(_lock);
2098 return _publishStore;
2103 Lock<Mutex> l(_lock);
2104 return _bookmarkStore;
2107 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
size_t dataLen_)
2111 Lock<Mutex> l(_lock);
2113 _publishMessage.assignData(data_, dataLen_);
2114 _send(_publishMessage);
2119 if (!publishStoreMessage)
2121 publishStoreMessage =
new Message();
2122 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2124 publishStoreMessage->reset();
2125 publishStoreMessage->setCommandEnum(Message::Command::Publish);
2126 return _publish(topic_, topicLen_, data_, dataLen_);
2130 amps_uint64_t publish(
const char* topic_,
size_t topicLen_,
const char* data_,
2131 size_t dataLen_,
unsigned long expiration_)
2135 Lock<Mutex> l(_lock);
2137 _publishMessage.assignData(data_, dataLen_);
2138 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2139 size_t pos = convertToCharArray(exprBuf, expiration_);
2141 _send(_publishMessage);
2147 if (!publishStoreMessage)
2149 publishStoreMessage =
new Message();
2150 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2152 publishStoreMessage->reset();
2153 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2154 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2155 publishStoreMessage->setCommandEnum(Message::Command::Publish)
2156 .assignExpiration(exprBuf+exprPos,
2157 AMPS_NUMBER_BUFFER_LEN-exprPos);
2158 return _publish(topic_, topicLen_, data_, dataLen_);
2162 void publishFlush(
long timeout_)
2164 static const char* processed =
"processed";
2165 static const size_t processedLen = strlen(processed);
2168 if (_serverVersion.getOldStyleVersion() >= AMPS_FLUSH_MIN_VERSION)
2170 Lock<Mutex> l(_lock);
2174 static const char* flush =
"flush";
2175 static const size_t flushLen = strlen(flush);
2179 syncAckProcessing(timeout_, _message);
2181 catch(
const AMPSException& ex)
2183 AMPS_UNHANDLED_EXCEPTION(ex);
2189 if (timeout_ > 0) { AMPS_USLEEP(timeout_ * 1000); }
2190 else { AMPS_USLEEP(1000 * 1000); }
2198 _publishStore.
flush(timeout_);
2200 catch (
const AMPSException& ex)
2202 AMPS_UNHANDLED_EXCEPTION(ex);
2208 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2209 const char* data_,
size_t dataLength_)
2213 Lock<Mutex> l(_lock);
2215 _deltaMessage.assignData(data_, dataLength_);
2216 _send(_deltaMessage);
2221 if (!publishStoreMessage)
2223 publishStoreMessage =
new Message();
2224 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2226 publishStoreMessage->reset();
2227 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2228 return _publish(topic_, topicLength_, data_, dataLength_);
2232 amps_uint64_t deltaPublish(
const char* topic_,
size_t topicLength_,
2233 const char* data_,
size_t dataLength_,
2234 unsigned long expiration_)
2238 Lock<Mutex> l(_lock);
2240 _deltaMessage.assignData(data_, dataLength_);
2241 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2242 size_t pos = convertToCharArray(exprBuf, expiration_);
2244 _send(_deltaMessage);
2250 if (!publishStoreMessage)
2252 publishStoreMessage =
new Message();
2253 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2255 publishStoreMessage->reset();
2256 char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2257 size_t exprPos = convertToCharArray(exprBuf, expiration_);
2258 publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2259 .assignExpiration(exprBuf+exprPos,
2260 AMPS_NUMBER_BUFFER_LEN-exprPos);
2261 return _publish(topic_, topicLength_, data_, dataLength_);
// Publishes through the publish store using the publishStoreMessage object.
//
// topic_/topicLength_  topic bytes assigned to the message.
// data_/dataLength_    payload bytes assigned to the message.
// Returns the sequence number that _publishStore.store() assigned.
//
// publishStoreMessage is dereferenced without a null check; the publish and
// deltaPublish callers in this file allocate it and set the command before
// calling this function.
2265 amps_uint64_t _publish(
const char* topic_,
size_t topicLength_,
2266 const char* data_,
size_t dataLength_)
// Fill in topic, a Persisted ack request, and the payload.
2268 publishStoreMessage->assignTopic(topic_, topicLength_)
2269 .setAckTypeEnum(Message::AckType::Persisted)
2270 .assignData(data_, dataLength_);
// The message is written to the store first; this happens before _lock is
// taken below.
2271 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
// Render the sequence number as decimal text; digits occupy buf[pos .. end).
2272 char buf[AMPS_NUMBER_BUFFER_LEN];
2273 size_t pos = convertToCharArray(buf, haSequenceNumber);
2274 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
// Only the actual send is performed under _lock.
2276 Lock<Mutex> l(_lock);
2277 _send(*publishStoreMessage, haSequenceNumber);
2279 return haSequenceNumber;
// Logs on to the server using the supplied authenticator.
//   timeout_       - passed to syncAckProcessing for the logon ack.
//   authenticator_ - consulted on a "retry" ack and notified on completion.
//   options_       - optional logon options set on the message.
// Returns newCommandId (its assignment is not visible in this excerpt).
// A StoreException raised in the visible try region is rethrown as a
// ConnectionException.
2282 virtual std::string logon(
long timeout_,
Authenticator& authenticator_,
2283 const char* options_ = NULL)
2285 Lock<Mutex> l(_lock);
// NOTE(review): presumably flags that HA publishes should not run during logon - confirm.
2286 AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2292 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE 2294 strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
// Credentials embedded in the connection URI are copied onto the logon message.
2297 if(uri.user().size()) _message.
setUserId(uri.user());
2298 if(uri.password().size()) _message.
setPassword(uri.password());
2299 if(uri.protocol() ==
"amps" && uri.messageType().size())
2303 if(uri.isTrue(
"pretty"))
2305 _message.setOptions(
"pretty");
2309 if (!_logonCorrelationData.empty())
2315 _message.setOptions(options_);
2323 AckResponse ack = syncAckProcessing(timeout_, _message);
// A "retry" status asks the authenticator for a new password to try again with.
2324 if (ack.status() ==
"retry")
2326 _message.
setPassword(authenticator_.
retry(ack.username(), ack.password()));
2327 _username = ack.username();
2332 authenticator_.
completed(ack.username(), ack.password(), ack.reason());
2336 broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2341 catch(
const AMPSException& ex)
2343 AMPS_UNHANDLED_EXCEPTION(ex);
// Stored publishes are replayed through _replayer, then listeners are told.
2355 _publishStore.
replay(_replayer);
2356 broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
2358 catch(
const StoreException& ex)
2360 std::ostringstream os;
2361 os <<
"A local store exception occurred while logging on." 2363 throw ConnectionException(os.str());
2365 catch(
const AMPSException& ex)
2367 AMPS_UNHANDLED_EXCEPTION(ex);
2370 catch(
const std::exception& ex)
2372 AMPS_UNHANDLED_EXCEPTION(ex);
2380 return newCommandId;
// NOTE(review): the first line of this signature is not visible here; the
// argument order matches the Client wrapper's _body.get().subscribe(...) call,
// so this is presumably ClientImpl::subscribe - confirm.
// Builds a subscribe request on _message, registers it with the subscription
// manager when HA subscribe is in effect, and waits for the ack through
// syncAckProcessing. Returns subId on the early-exit path shown below.
2384 const std::string& topic_,
2386 const std::string& filter_,
2387 const std::string& bookmark_,
2388 const std::string& options_,
2389 const std::string& subId_,
2390 bool isHASubscribe_ =
true)
// HA behaviour only applies when a subscription manager is installed.
2392 isHASubscribe_ &= (bool)_subscriptionManager;
2393 Lock<Mutex> l(_lock);
2397 std::string subId(subId_);
// A "replace" option is rejected here; the guarding condition on the sub id
// is not visible, but the message says a valid subscription id is required.
2400 if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos)
2401 throw ConnectionException(
"Cannot issue a replacement subscription; a valid subscription id is required.");
2410 unsigned ackTypes = Message::AckType::Processed;
// Bookmark subscriptions backed by a valid bookmark store also get Persisted acks.
2412 if (!bookmark_.empty() && _bookmarkStore.isValid())
2414 ackTypes |= Message::AckType::Persisted;
2418 if (filter_.length()) _message.
setFilter(filter_);
2419 if (bookmark_.length())
2429 if (_bookmarkStore.isValid())
2434 _bookmarkStore.
log(_message);
2435 _bookmarkStore.
discard(_message);
2441 if (options_.length()) _message.setOptions(options_);
// NOTE(review): Unlock presumably releases _lock for this scope while the
// subscription manager is called - confirm.
2447 Unlock<Mutex> u(_lock);
2448 _subscriptionManager->subscribe(messageHandler_, message,
2449 Message::AckType::None);
2450 if (_badTimeToHASubscribe)
return subId;
2455 Message::AckType::None, ackTypes,
true);
2458 if (!options_.empty()) message.setOptions(options_);
2461 syncAckProcessing(timeout_, message, isHASubscribe_);
// On disconnect, a non-HA subscription has its route removed.
2463 catch (
const DisconnectedException&)
2465 if (!isHASubscribe_)
2467 _routes.removeRoute(subIdField);
2472 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
// A timed-out subscribe is undone with an internal unsubscribe.
2476 catch (
const TimedOutException&)
2478 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2486 Unlock<Mutex> unlock(_lock);
2487 _subscriptionManager->unsubscribe(subIdField);
2489 _routes.removeRoute(subIdField);
// NOTE(review): the first line of this signature is not visible here; the
// argument order matches the Client wrapper's _body.get().deltaSubscribe(...)
// call, so this is presumably ClientImpl::deltaSubscribe - confirm.
// Follows the same visible steps as the subscribe method above it: build the
// request on _message, register with the subscription manager for HA, wait for
// the ack, and clean up the route on disconnect or timeout.
2496 const std::string& topic_,
2498 const std::string& filter_,
2499 const std::string& bookmark_,
2500 const std::string& options_,
2501 const std::string& subId_ =
"",
2502 bool isHASubscribe_ =
true)
// HA behaviour only applies when a subscription manager is installed.
2504 isHASubscribe_ &= (bool)_subscriptionManager;
2505 Lock<Mutex> l(_lock);
2509 std::string subId(subId_);
2519 unsigned ackTypes = Message::AckType::Processed;
// Bookmark subscriptions backed by a valid bookmark store also get Persisted acks.
2521 if (!bookmark_.empty() && _bookmarkStore.isValid())
2523 ackTypes |= Message::AckType::Persisted;
2526 if (filter_.length()) _message.
setFilter(filter_);
2527 if (bookmark_.length())
2537 if (_bookmarkStore.isValid())
2542 _bookmarkStore.
log(_message);
2543 _bookmarkStore.
discard(_message);
2549 if (options_.length()) _message.setOptions(options_);
2554 Unlock<Mutex> u(_lock);
2555 _subscriptionManager->subscribe(messageHandler_, message,
2556 Message::AckType::None);
// Early exit returns the subscription id without sending.
2557 if (_badTimeToHASubscribe)
return subId;
2562 Message::AckType::None, ackTypes,
true);
2565 if (!options_.empty()) message.setOptions(options_);
2568 syncAckProcessing(timeout_, message, isHASubscribe_);
2570 catch (
const DisconnectedException&)
2572 if (!isHASubscribe_)
2574 _routes.removeRoute(subIdField);
2578 catch (
const TimedOutException&)
2580 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2588 Unlock<Mutex> unlock(_lock);
2589 _subscriptionManager->unsubscribe(subIdField);
2591 _routes.removeRoute(subIdField);
// Unsubscribes the subscription identified by id: takes _lock and delegates to
// unsubscribeInternal.
2597 void unsubscribe(
const std::string&
id)
2599 Lock<Mutex> l(_lock);
2600 unsubscribeInternal(
id);
// Unsubscribes from everything: clears the subscription manager (when one is
// set), drops all subscription routes, then sends _message under _lock without
// retry. The lines that populate _message are not visible in this excerpt.
2603 void unsubscribe(
void)
2605 if (_subscriptionManager)
2607 _subscriptionManager->clear();
2610 _routes.unsubscribeAll();
2611 Lock<Mutex> l(_lock);
2616 _sendWithoutRetry(_message);
// NOTE(review): the first line of this signature is not visible; the Client
// wrapper calls _body.get().sow(...) with these arguments, so this is
// presumably ClientImpl::sow - confirm.
// Issues a query under _lock: optional filter, order-by, bookmark and options
// are set only when non-empty, a route is registered for the query id, and the
// call waits for the ack via syncAckProcessing.
2621 const std::string& topic_,
2622 const std::string& filter_ =
"",
2623 const std::string& orderBy_ =
"",
2624 const std::string& bookmark_ =
"",
2625 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2626 int topN_ = AMPS_DEFAULT_TOP_N,
2627 const std::string& options_ =
"",
2628 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
2630 Lock<Mutex> l(_lock);
// Both Processed and Completed acks are requested for the query.
2637 unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
2640 if (filter_.length()) _message.
setFilter(filter_);
2641 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
2642 if (bookmark_.length()) _message.
setBookmark(bookmark_);
2645 if (options_.length()) _message.setOptions(options_);
// Final argument false: the route is not registered as a subscription.
2647 _routes.addRoute(_message.
getQueryID(), messageHandler_,
2648 Message::AckType::None, ackTypes,
false);
2652 syncAckProcessing(timeout_, _message);
// Cleanup path: the route is removed again (the enclosing catch is not visible).
2656 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
// Shorter overload (signature start not visible) that forwards to the full
// sow(...) form; the forwarded argument list is cut off in this excerpt.
2664 const std::string& topic_,
2666 const std::string& filter_ =
"",
2667 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2668 int topN_ = AMPS_DEFAULT_TOP_N)
2671 return sow(messageHandler_,
// NOTE(review): signature start not visible; the overload that follows calls
// sowAndSubscribe(messageHandler_, ...), so this is presumably the full
// sowAndSubscribe - confirm.
// Sets optional filter/order-by/bookmark/options on _message, registers with
// the subscription manager for HA, adds a route for cid if none exists, and
// waits for the Processed ack.
2683 const std::string& topic_,
2684 const std::string& filter_ =
"",
2685 const std::string& orderBy_ =
"",
2686 const std::string& bookmark_ =
"",
2687 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2688 int topN_ = AMPS_DEFAULT_TOP_N,
2689 const std::string& options_ =
"",
2690 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
2691 bool isHASubscribe_ =
true)
// HA behaviour only applies when a subscription manager is installed.
2693 isHASubscribe_ &= (bool)_subscriptionManager;
2694 Lock<Mutex> l(_lock);
// The command id doubles as the subscription id here.
2701 std::string subId = cid;
2703 if (filter_.length()) _message.
setFilter(filter_);
2704 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
2705 if (bookmark_.length()) _message.
setBookmark(bookmark_);
2708 if (options_.length()) _message.setOptions(options_);
2714 Unlock<Mutex> u(_lock);
2715 _subscriptionManager->subscribe(messageHandler_, message,
2716 Message::AckType::None);
2717 if (_badTimeToHASubscribe)
return subId;
// Avoid registering a second route for the same command id.
2719 if (!_routes.hasRoute(cid))
2721 _routes.addRoute(cid, messageHandler_,
2722 Message::AckType::None, Message::AckType::Processed,
true);
2725 if (!options_.empty()) message.setOptions(options_);
2728 syncAckProcessing(timeout_, message, isHASubscribe_);
2730 catch (
const DisconnectedException&)
2732 if (!isHASubscribe_)
2734 _routes.removeRoute(subId);
2738 catch (
const TimedOutException&)
2740 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
2748 Unlock<Mutex> unlock(_lock);
2749 _subscriptionManager->unsubscribe(cid);
2751 _routes.removeRoute(subId);
// Convenience overload (signature start not visible) that forwards to the full
// sowAndSubscribe. oofEnabled_ is translated into the option string "oof", or
// an empty string when false.
2758 const std::string& topic_,
2760 const std::string& filter_ =
"",
2761 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2762 bool oofEnabled_ =
false,
2763 int topN_ = AMPS_DEFAULT_TOP_N,
2764 bool isHASubscribe_ =
true)
2767 return sowAndSubscribe(messageHandler_,
2774 (oofEnabled_ ?
"oof" :
""),
// NOTE(review): signature start not visible; the overload that follows calls
// sowAndDeltaSubscribe(messageHandler_, ...), so this is presumably the full
// sowAndDeltaSubscribe - confirm.
// Same visible flow as sowAndSubscribe, except the route is keyed by the
// message's query id and no bookmark parameter is taken.
2780 const std::string& topic_,
2781 const std::string& filter_ =
"",
2782 const std::string& orderBy_ =
"",
2783 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2784 int topN_ = AMPS_DEFAULT_TOP_N,
2785 const std::string& options_ =
"",
2786 long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
2787 bool isHASubscribe_ =
true)
// HA behaviour only applies when a subscription manager is installed.
2789 isHASubscribe_ &= (bool)_subscriptionManager;
2790 Lock<Mutex> l(_lock);
2798 if (filter_.length()) _message.
setFilter(filter_);
2799 if (orderBy_.length()) _message.
setOrderBy(orderBy_);
2802 if (options_.length()) _message.setOptions(options_);
2807 Unlock<Mutex> u(_lock);
2808 _subscriptionManager->subscribe(messageHandler_, message,
2809 Message::AckType::None);
2810 if (_badTimeToHASubscribe)
return subId;
2814 _routes.addRoute(message.
getQueryID(), messageHandler_,
2815 Message::AckType::None, Message::AckType::Processed,
true);
2818 if (!options_.empty()) message.setOptions(options_);
2821 syncAckProcessing(timeout_, message, isHASubscribe_);
2823 catch (
const DisconnectedException&)
2825 if (!isHASubscribe_)
2827 _routes.removeRoute(subId);
2831 catch (
const TimedOutException&)
2833 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
2841 Unlock<Mutex> unlock(_lock);
2842 _subscriptionManager->unsubscribe(
Field(subId));
2844 _routes.removeRoute(subId);
// Convenience overload (signature start not visible) that forwards to the full
// sowAndDeltaSubscribe. oofEnabled_ turns on OOF in a local options object;
// how sendEmpties_ is applied is not visible in this excerpt.
2851 const std::string& topic_,
2853 const std::string& filter_ =
"",
2854 int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2855 bool oofEnabled_ =
false,
2856 bool sendEmpties_ =
false,
2857 int topN_ = AMPS_DEFAULT_TOP_N,
2858 bool isHASubscribe_ =
true)
2862 if (oofEnabled_) options.
setOOF();
2864 return sowAndDeltaSubscribe(messageHandler_,
// NOTE(review): signature start not visible; this builds a SOWDelete command
// selected by filter_, so it is presumably sowDelete - confirm.
// Two paths are visible: one that goes through the publish store using
// publishStoreMessage (Processed|Stats|Persisted acks plus a sequence number),
// and one that uses _message under _lock (Processed|Stats acks). The condition
// choosing between them is not visible. Both return the command id as a string.
2876 const std::string& topic_,
2877 const std::string& filter_,
2883 unsigned ackType = Message::AckType::Processed |
2884 Message::AckType::Stats |
2885 Message::AckType::Persisted;
// Lazily create the shared message and register it for cleanup.
2886 if (!publishStoreMessage)
2888 publishStoreMessage =
new Message();
2889 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2891 publishStoreMessage->reset();
// Generate a command id when the caller did not supply one.
2892 if (commandId_.
empty())
2894 publishStoreMessage->newCommandId();
2895 commandId_ = publishStoreMessage->getCommandId();
2899 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
// The same id is used as command id, subscription id and query id.
2901 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
2902 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
2903 .assignQueryID(commandId_.
data(), commandId_.
len())
2904 .setAckTypeEnum(ackType)
2905 .assignTopic(topic_.c_str(), topic_.length())
2906 .assignFilter(filter_.c_str(), filter_.length());
2907 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
2908 char buf[AMPS_NUMBER_BUFFER_LEN];
2909 size_t pos = convertToCharArray(buf, haSequenceNumber);
2910 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2914 Lock<Mutex> l(_lock);
// Stats is the caller-requested ack; Processed and Persisted are system-added.
2915 _routes.addRoute(commandId_, messageHandler_,
2916 Message::AckType::Stats,
2917 Message::AckType::Processed|Message::AckType::Persisted,
2919 syncAckProcessing(timeout_, *publishStoreMessage,
2922 catch (
const DisconnectedException&)
2928 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
2932 return (std::string)commandId_;
// Path that does not use the publish store: works on _message under the lock.
2936 Lock<Mutex> l(_lock);
2938 if (commandId_.
empty())
2949 .assignQueryID(commandId_.
data(), commandId_.
len())
2950 .setAckTypeEnum(Message::AckType::Processed |
2951 Message::AckType::Stats)
2953 .assignFilter(filter_.c_str(), filter_.length());
2954 _routes.addRoute(commandId_, messageHandler_,
2955 Message::AckType::Stats,
2956 Message::AckType::Processed,
2960 syncAckProcessing(timeout_, _message);
2964 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
2967 return (std::string)commandId_;
// NOTE(review): signature start not visible; this builds a SOWDelete command
// whose payload is data_, so it is presumably sowDeleteByData - confirm.
// Mirrors the filter-based delete above it: a publish-store path using
// publishStoreMessage with a sequence number, and a path using _message under
// _lock. Both return the command id as a string.
2972 const std::string& topic_,
2973 const std::string& data_,
2979 unsigned ackType = Message::AckType::Processed |
2980 Message::AckType::Stats |
2981 Message::AckType::Persisted;
// Lazily create the shared message and register it for cleanup.
2982 if (!publishStoreMessage)
2984 publishStoreMessage =
new Message();
2985 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2987 publishStoreMessage->reset();
// Generate a command id when the caller did not supply one.
2988 if (commandId_.
empty())
2990 publishStoreMessage->newCommandId();
2991 commandId_ = publishStoreMessage->getCommandId();
2995 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
2997 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
2998 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
2999 .assignQueryID(commandId_.
data(), commandId_.
len())
3000 .setAckTypeEnum(ackType)
3001 .assignTopic(topic_.c_str(), topic_.length())
3002 .assignData(data_.c_str(), data_.length());
3003 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3004 char buf[AMPS_NUMBER_BUFFER_LEN];
3005 size_t pos = convertToCharArray(buf, haSequenceNumber);
3006 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3010 Lock<Mutex> l(_lock);
3011 _routes.addRoute(commandId_, messageHandler_,
3012 Message::AckType::Stats,
3013 Message::AckType::Processed|Message::AckType::Persisted,
3015 syncAckProcessing(timeout_, *publishStoreMessage,
3018 catch (
const DisconnectedException&)
3024 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3028 return (std::string)commandId_;
// Path that does not use the publish store: works on _message under the lock.
3032 Lock<Mutex> l(_lock);
3034 if (commandId_.
empty())
3045 .assignQueryID(commandId_.
data(), commandId_.
len())
3046 .setAckTypeEnum(Message::AckType::Processed |
3047 Message::AckType::Stats)
3049 .assignData(data_.c_str(), data_.length());
3050 _routes.addRoute(commandId_, messageHandler_,
3051 Message::AckType::Stats,
3052 Message::AckType::Processed,
3056 syncAckProcessing(timeout_, _message);
3060 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3063 return (std::string)commandId_;
// NOTE(review): signature start not visible; this builds a SOWDelete command
// selected by keys_ (assignSowKeys), so it is presumably sowDeleteByKeys - confirm.
// Mirrors the other delete variants: a publish-store path using
// publishStoreMessage with a sequence number, and a path using _message under
// _lock. Both return the command id as a string.
3068 const std::string& topic_,
3069 const std::string& keys_,
3075 unsigned ackType = Message::AckType::Processed |
3076 Message::AckType::Stats |
3077 Message::AckType::Persisted;
// Lazily create the shared message and register it for cleanup.
3078 if (!publishStoreMessage)
3080 publishStoreMessage =
new Message();
3081 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3083 publishStoreMessage->reset();
// Generate a command id when the caller did not supply one.
3084 if (commandId_.
empty())
3086 publishStoreMessage->newCommandId();
3087 commandId_ = publishStoreMessage->getCommandId();
3091 publishStoreMessage->setCommandId(commandId_.
data(), commandId_.
len());
3093 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3094 .assignSubscriptionId(commandId_.
data(), commandId_.
len())
3095 .assignQueryID(commandId_.
data(), commandId_.
len())
3096 .setAckTypeEnum(ackType)
3097 .assignTopic(topic_.c_str(), topic_.length())
3098 .assignSowKeys(keys_.c_str(), keys_.length());
3099 amps_uint64_t haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3100 char buf[AMPS_NUMBER_BUFFER_LEN];
3101 size_t pos = convertToCharArray(buf, haSequenceNumber);
3102 publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3106 Lock<Mutex> l(_lock);
3107 _routes.addRoute(commandId_, messageHandler_,
3108 Message::AckType::Stats,
3109 Message::AckType::Processed|Message::AckType::Persisted,
3111 syncAckProcessing(timeout_, *publishStoreMessage,
3114 catch (
const DisconnectedException&)
3120 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3124 return (std::string)commandId_;
// Path that does not use the publish store: works on _message under the lock.
3128 Lock<Mutex> l(_lock);
3130 if (commandId_.
empty())
3141 .assignQueryID(commandId_.
data(), commandId_.
len())
3142 .setAckTypeEnum(Message::AckType::Processed |
3143 Message::AckType::Stats)
3145 .assignSowKeys(keys_.c_str(), keys_.length());
3146 _routes.addRoute(commandId_, messageHandler_,
3147 Message::AckType::Stats,
3148 Message::AckType::Processed,
3152 syncAckProcessing(timeout_, _message);
3156 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3159 return (std::string)commandId_;
// Sends the start_timer command. Throws CommandException when _serverVersion
// compares >= "5.3.2.0", where the command is reported as deprecated.
// The lines that build and send the message under _lock are not visible here.
3163 void startTimer(
void)
3165 if (_serverVersion >=
"5.3.2.0")
3167 throw CommandException(
"The start_timer command is deprecated.");
3169 Lock<Mutex> l(_lock);
// Body of the stop_timer counterpart (signature not visible - presumably
// stopTimer(messageHandler_); confirm). Throws CommandException when
// _serverVersion compares >= "5.3.2.0"; otherwise runs a "stop_timer" command
// with a "completed" ack through executeAsync and returns its result.
3178 if (_serverVersion >=
"5.3.2.0")
3180 throw CommandException(
"The stop_timer command is deprecated.");
3182 return executeAsync(
Command(
"stop_timer").addAckType(
"completed"), messageHandler_);
// Installs an exception listener held by shared_ptr. The shared_ptr is kept in
// _pExceptionListener and its raw pointer is cached in _exceptionListener,
// which is the member the other accessors use.
3197 void setExceptionListener(
const std::shared_ptr<const ExceptionListener>& pListener_)
3199 _pExceptionListener = pListener_;
3200 _exceptionListener = _pExceptionListener.get();
// Bodies of two accessors whose signatures are not visible in this excerpt:
// one stores the address of a caller-supplied listener_ (no ownership is taken
// by this statement), the other returns the current listener by dereference.
3205 _exceptionListener = &listener_;
3210 return *_exceptionListener;
// Configures heartbeating.
//   heartbeatInterval_ - stored in _heartbeatInterval.
//   readTimeout_       - stored in _readTimeout; when non-zero it must be
//                        >= heartbeatInterval_, otherwise UsageException is thrown.
// The new values are applied under _lock and only when either one differs from
// the current setting; any follow-up after the assignment is not visible here.
3213 void setHeartbeat(
unsigned heartbeatInterval_,
unsigned readTimeout_)
3215 if (readTimeout_ && readTimeout_ < heartbeatInterval_)
3217 throw UsageException(
"The socket read timeout must be >= the heartbeat interval.");
3219 Lock<Mutex> l(_lock);
3220 if(_heartbeatInterval != heartbeatInterval_ ||
3221 _readTimeout != readTimeout_)
3223 _heartbeatInterval = heartbeatInterval_;
3224 _readTimeout = readTimeout_;
// Starts heartbeating when connected and an interval is configured: builds the
// options string "start,<interval>", arms _heartbeatTimer, sends the start
// message without retry and broadcasts HeartbeatInitiated. A
// ConnectionException from the send is routed to AMPS_UNHANDLED_EXCEPTION.
// When a read timeout is set and the client is connected, a failing result is
// raised through AMPSException::throwFor (the call producing result is not
// visible in this excerpt).
3229 void _sendHeartbeat(
void)
3231 if (_connected && _heartbeatInterval != 0)
3233 std::ostringstream options;
3234 options <<
"start," << _heartbeatInterval;
3237 .setOptions(options.str());
// NOTE(review): the * 1000.0 suggests the interval is in seconds and the timer
// takes milliseconds - confirm.
3239 _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3240 _heartbeatTimer.start();
3243 _sendWithoutRetry(startMessage);
3244 broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
3246 catch(ConnectionException &ex_)
3250 AMPS_UNHANDLED_EXCEPTION(ex_);
3254 if(_readTimeout && _connected)
3260 AMPSException::throwFor(_client, result);
// Bodies of the add/remove connection-state-listener methods (signatures not
// visible): each takes _lock and then inserts into or erases from
// _connectionStateListeners.
3266 Lock<Mutex> lock(_lock);
3267 _connectionStateListeners.insert(listener_);
3272 Lock<Mutex> lock(_lock);
3273 _connectionStateListeners.erase(listener_);
// NOTE(review): the start of this signature is not visible; executeAsyncNoLock
// calls _registerHandler(command_, cid, handler_, requestedAcks,
// systemAddedAcks, isSubscribe) with matching parameters, so this is presumably
// _registerHandler - confirm.
// Registers handler_ in _routes under each id the command carries (query id,
// subscription id, command id), skipping ids that already have a route.
// Publish and DeltaPublish commands are routed by command id. Throws
// UsageException when no id is available to key the handler on.
3278 unsigned systemAddedAcks_,
bool isSubscribe_)
3280 Message message = command_.getMessage();
// True when at least one of the three ids is non-empty.
3285 bool added = qid.
len() || subid.
len() || cid_.
len();
3288 if (!_routes.hasRoute(qid))
3290 _routes.addRoute(qid, handler_, requestedAcks_,
3291 systemAddedAcks_, isSubscribe_);
3294 if (subid.
len() > 0)
3296 if (!_routes.hasRoute(subid))
3298 _routes.addRoute(subid, handler_, requestedAcks_,
3299 systemAddedAcks_, isSubscribe_);
3304 if (!_routes.hasRoute(cid_))
3306 _routes.addRoute(cid_, handler_, requestedAcks_,
3307 systemAddedAcks_, isSubscribe_);
3310 else if (commandType == Message::Command::Publish ||
3311 commandType == Message::Command::DeltaPublish)
3314 _routes.addRoute(cid_, handler_, requestedAcks_,
3315 systemAddedAcks_, isSubscribe_);
3320 throw UsageException(
"To use a messagehandler, you must also supply a command or subscription ID.");
// NOTE(review): the start of this signature is not visible; executeAsync calls
// executeAsyncNoLock(command_, handler_, isHASubscribe_) after taking _lock, so
// this is presumably executeAsyncNoLock and expects _lock to be held - confirm.
// Executes an arbitrary Command. Three visible paths:
//   * publish-store commands: stored to obtain a sequence number, then sent
//     either synchronously (syncAckProcessing) or with _send;
//   * subscribe commands: registered with the subscription manager for HA,
//     handler registered, then sent with route cleanup on disconnect/timeout;
//   * everything else: optionally synchronous, otherwise _send.
3325 bool isHASubscribe_ =
true)
// HA behaviour only applies when a subscription manager is installed.
3327 isHASubscribe_ &= (bool)_subscriptionManager;
3328 Message& message = command_.getMessage();
// A Processed ack is added by the system whenever a handler is supplied or the
// command already asks for one.
3329 unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
3330 Message::AckType::Processed : Message::AckType::None;
3332 bool isPublishStore = _publishStore.
isValid() && command_.needsSequenceNumber();
// SOW and StopTimer commands additionally get a Completed ack.
3334 if (commandType == Message::Command::SOW ||
3335 commandType == Message::Command::StopTimer)
3336 systemAddedAcks |= Message::AckType::Completed;
3338 if (handler_.isValid() && cid.
empty())
3344 if (command_.isSubscribe())
3347 if (_bookmarkStore.isValid())
3349 systemAddedAcks |= Message::AckType::Persisted;
3357 _bookmarkStore.
log(message);
// Range bookmarks are logged but not discarded.
3358 if (!BookmarkRange::isRange(bookmark))
3360 _bookmarkStore.
discard(message);
3374 systemAddedAcks |= Message::AckType::Persisted;
3376 bool isSubscribe = command_.isSubscribe();
3377 if (handler_.isValid() && !isSubscribe)
3379 _registerHandler(command_, cid, handler_,
3380 requestedAcks, systemAddedAcks, isSubscribe);
// Wait for the ack only when there is a command id and a Processed ack to wait on.
3382 bool useSyncSend = cid.
len() > 0 && command_.hasProcessedAck();
3385 amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
// The store call is made with _lock released for this scope.
3388 Unlock<Mutex> u(_lock);
3389 haSequenceNumber = _publishStore.
store(message);
3396 syncAckProcessing((
long)command_.getTimeout(), message,
3399 catch (
const DisconnectedException&)
3405 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3409 else _send(message, haSequenceNumber);
3413 if(command_.isSubscribe())
3418 Unlock<Mutex> u(_lock);
3419 _subscriptionManager->subscribe(handler_,
3422 if (_badTimeToHASubscribe)
3425 return std::string(subId.
data(), subId.
len());
3428 if (handler_.isValid())
3430 _registerHandler(command_, cid, handler_,
3431 requestedAcks, systemAddedAcks, isSubscribe);
3438 syncAckProcessing((
long)command_.getTimeout(), message,
// Non-HA subscriptions drop every route that may have been registered.
3441 catch (
const DisconnectedException&)
3443 if (!isHASubscribe_)
3445 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3446 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
3447 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3452 catch (
const TimedOutException&)
3454 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
3455 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3456 AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
3464 Unlock<Mutex> unlock(_lock);
3465 _subscriptionManager->unsubscribe(subId);
3469 _routes.removeRoute(cid);
3470 _routes.removeRoute(subId);
3474 else _send(message);
// Subscriptions report their subscription id back to the caller.
3475 if (subId.
len() > 0)
3478 return std::string(subId.
data(), subId.
len());
3488 syncAckProcessing((
long)(command_.getTimeout()), message);
3490 catch (
const DisconnectedException&)
3492 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3493 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3499 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3500 AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3505 else _send(message);
// Locking entry point (signature start not visible; presumably executeAsync):
// takes _lock and delegates to executeAsyncNoLock, returning its result.
3515 bool isHASubscribe_ =
true)
3517 Lock<Mutex> lock(_lock);
3518 return executeAsyncNoLock(command_, handler_, isHASubscribe_);
// Enables or disables auto-ack by storing the flag in _isAutoAckEnabled.
3522 void setAutoAck(
bool isAutoAckEnabled_)
3524 _isAutoAckEnabled = isAutoAckEnabled_;
// Returns the current auto-ack flag (_isAutoAckEnabled).
3526 bool getAutoAck(
void)
const 3528 return _isAutoAckEnabled;
// Sets the number of acks to batch. If no ack timeout has been configured yet
// (_queueAckTimeout is zero), it is set to AMPS_DEFAULT_QUEUE_ACK_TIMEOUT.
3530 void setAckBatchSize(
const unsigned batchSize_)
3532 _ackBatchSize = batchSize_;
3533 if (!_queueAckTimeout)
3535 _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
// Returns the configured ack batch size (_ackBatchSize).
3539 unsigned getAckBatchSize(
void)
const 3541 return _ackBatchSize;
// Returns the configured queue ack timeout (_queueAckTimeout).
3543 int getAckTimeout(
void)
const 3545 return _queueAckTimeout;
// Stores ackTimeout_ as the queue ack timeout. Any validation between the
// signature and the assignment is not visible in this excerpt.
3547 void setAckTimeout(
const int ackTimeout_)
3550 _queueAckTimeout = ackTimeout_;
// Sends the batched acks accumulated in queueBookmarks_ as one SOWDelete
// command with command id "AMPS-queue-ack", using the batch's topic and its
// accumulated bookmark data. Does nothing when the batch is empty.
// The batch is cleared on both visible paths (before the send on the
// publish-store path). NOTE(review): the condition selecting the publish-store
// path and the returned size_t value are not visible here.
3552 size_t _ack(QueueBookmarks& queueBookmarks_)
3554 if(queueBookmarks_._bookmarkCount)
// Lazily create the shared message and register it for cleanup.
3556 if (!publishStoreMessage)
3558 publishStoreMessage =
new Message();
3559 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3561 publishStoreMessage->reset();
3562 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3563 .setTopic(queueBookmarks_._topic)
3564 .setBookmark(queueBookmarks_._data)
3565 .setCommandId(
"AMPS-queue-ack");
3566 amps_uint64_t haSequenceNumber = 0;
3569 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3570 publishStoreMessage->setAckType(
"persisted")
3571 .setSequence(haSequenceNumber);
3572 queueBookmarks_._data.erase();
3573 queueBookmarks_._bookmarkCount = 0;
3575 _send(*publishStoreMessage, haSequenceNumber);
3578 queueBookmarks_._data.erase();
3579 queueBookmarks_._bookmarkCount = 0;
// Public ack entry point. It is a no-op while auto-ack is enabled; otherwise
// it forwards to _ack with the same arguments.
3585 void ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
3587 if (_isAutoAckEnabled)
return;
3588 _ack(topic_, bookmark_, options_);
// Acknowledges one bookmark on a topic. Empty bookmarks are ignored.
// When batching is effectively off (_ackBatchSize < 2) or options are given,
// the ack is sent immediately as a SOWDelete with command id "AMPS-queue-ack".
// Otherwise the bookmark is appended, comma separated, to the per-topic batch
// in _topicHashMap, and the batch is flushed once it reaches _ackBatchSize.
3590 void _ack(
const Field& topic_,
const Field& bookmark_,
const char* options_ = NULL)
3592 if (bookmark_.
len() == 0)
return;
3593 Lock<Mutex> lock(_lock);
3594 if(_ackBatchSize < 2 || options_ != NULL)
// Lazily create the shared message and register it for cleanup.
3596 if (!publishStoreMessage)
3598 publishStoreMessage =
new Message();
3599 PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3601 publishStoreMessage->reset();
3602 publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3603 .setCommandId(
"AMPS-queue-ack")
3604 .setTopic(topic_).setBookmark(bookmark_);
3605 if (options_) publishStoreMessage->setOptions(options_);
3606 amps_uint64_t haSequenceNumber = 0;
3609 haSequenceNumber = _publishStore.
store(*publishStoreMessage);
3610 publishStoreMessage->setAckType(
"persisted")
3611 .setSequence(haSequenceNumber);
3613 _send(*publishStoreMessage, haSequenceNumber);
// Batches are keyed by a CRC hash of the topic bytes.
3617 topic_hash hash = CRC<0>::crcNoSSE(topic_.
data(),topic_.
len());
3618 TopicHashMap::iterator it = _topicHashMap.find(hash);
3619 if(it == _topicHashMap.end())
3622 it = _topicHashMap.insert(TopicHashMap::value_type(hash,QueueBookmarks(topic_))).first;
3624 QueueBookmarks &queueBookmarks = it->second;
// Non-empty batch: separate entries with a comma.
3625 if(queueBookmarks._data.length())
3627 queueBookmarks._data.append(
",");
// NOTE(review): presumably the else branch - the first entry of a batch
// records when the batch started; confirm.
3631 queueBookmarks._oldestTime = amps_now();
3633 queueBookmarks._data.append(bookmark_);
3634 if(++queueBookmarks._bookmarkCount >= _ackBatchSize) _ack(queueBookmarks);
// Flushes every per-topic ack batch: under _lock, calls _ack on each entry of
// _topicHashMap and sums the returned counts. If anything was sent, follows up
// with publishFlush(0).
3636 void flushAcks(
void)
3638 size_t sendCount = 0;
3641 Lock<Mutex> lock(_lock);
3642 typedef TopicHashMap::iterator iterator;
3643 for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3645 QueueBookmarks& queueBookmarks = it->second;
3646 sendCount += _ack(queueBookmarks);
3649 if(sendCount) publishFlush(0);
// Flushes ack batches that have waited longer than the ack timeout.
// Returns immediately when there are no batches. Otherwise, under _lock, any
// non-empty batch whose _oldestTime is older than now - _queueAckTimeout is
// sent via _ack. std::exception is routed to AMPS_UNHANDLED_EXCEPTION.
3652 void checkQueueAcks(
void)
// Note: this emptiness check happens before _lock is taken.
3654 if(!_topicHashMap.size())
return;
3655 Lock<Mutex> lock(_lock);
3658 amps_uint64_t threshold = amps_now() - (amps_uint64_t)_queueAckTimeout;
3659 typedef TopicHashMap::iterator iterator;
3660 for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3662 QueueBookmarks& queueBookmarks = it->second;
3663 if(queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold) _ack(queueBookmarks);
3666 catch(std::exception& ex)
3668 AMPS_UNHANDLED_EXCEPTION(ex);
// Queues func_ with its userData_ for later execution: under _lock, appends a
// DeferredExecutionRequest to _deferredExecutionList. The queue is drained by
// processDeferredExecutions.
3672 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
3674 Lock<Mutex> lock(_lock);
3675 _deferredExecutionList.push_back(
3676 DeferredExecutionRequest(func_,userData_));
// Runs and clears all queued deferred requests. When the list is non-empty,
// takes _lock, invokes each request's _func with its _userData in order,
// clears the list, and invalidates both the route table cache and _routeCache.
3679 inline void processDeferredExecutions(
void)
// Note: this size check happens before _lock is taken.
3681 if(_deferredExecutionList.size())
3683 Lock<Mutex> lock(_lock);
3684 DeferredExecutionList::iterator it = _deferredExecutionList.begin();
3685 DeferredExecutionList::iterator end = _deferredExecutionList.end();
3686 for(; it != end; ++it)
3688 it->_func(it->_userData);
3690 _deferredExecutionList.clear();
3691 _routes.invalidateCache();
3692 _routeCache.invalidateCache();
// Returns the retry-on-disconnect flag (_isRetryOnDisconnect).
3696 bool getRetryOnDisconnect(
void)
const 3698 return _isRetryOnDisconnect;
// Stores the retry-on-disconnect flag in _isRetryOnDisconnect.
3701 void setRetryOnDisconnect(
bool isRetryOnDisconnect_)
3703 _isRetryOnDisconnect = isRetryOnDisconnect_;
// Stores the default max depth in _defaultMaxDepth; the Client wrappers apply
// it to new MessageStreams when it is non-zero.
3706 void setDefaultMaxDepth(
unsigned maxDepth_)
3708 _defaultMaxDepth = maxDepth_;
// Returns the default max depth (_defaultMaxDepth).
3711 unsigned getDefaultMaxDepth(
void)
const 3713 return _defaultMaxDepth;
3805 RefHandle<MessageStreamImpl> _body;
3815 inline void advance(
void);
// Two iterators are equal when they refer to the same stream (_pStream);
// the current message is not compared.
3827 bool operator==(
const iterator& rhs)
3829 return _pStream == rhs._pStream;
// Two iterators differ when they refer to different streams (_pStream).
3831 bool operator!=(
const iterator& rhs)
3833 return _pStream != rhs._pStream;
// Pre-increment: moves to the next message by calling advance(). Returns void,
// so the result cannot be chained.
3835 void operator++(
void) { advance(); }
// Returns the current message by value.
3836 Message operator*(
void) {
return _current; }
// Returns a pointer to the iterator's own current message (_current).
3837 Message* operator->(
void) {
return &_current; }
// Guard from a MessageStream method whose signature is not visible here:
// iterating a stream with no valid body throws UsageException.
3846 if(!_body.isValid())
3848 throw UsageException(
"This MessageStream is not valid and cannot be iterated.");
// Declarations only; the definitions are outside this excerpt.
// Accessors for the stream's maximum and current depth.
3877 unsigned getMaxDepth(
void)
const;
3880 unsigned getDepth(
void)
const;
// Configuration hooks naming the command/query/subscription ids the stream is
// tied to. NOTE(review): presumably used by Client when it creates a stream
// for a sow, subscribe, stats or ack-only command - confirm against definitions.
3884 inline void setSOWOnly(
const std::string& commandId_,
3885 const std::string& queryId_ =
"");
3886 inline void setSubscription(
const std::string& subId_,
3887 const std::string& commandId_ =
"",
3888 const std::string& queryId_ =
"");
3889 inline void setStatsOnly(
const std::string& commandId_,
3890 const std::string& queryId_ =
"");
3891 inline void setAcksOnly(
const std::string& commandId_,
unsigned acks_);
3897 friend class Client;
3923 BorrowRefHandle<ClientImpl> _body;
3925 static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
3926 static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
3927 static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
// Initializer of the Client constructor taking a client name (signature not
// visible): creates a new ClientImpl and hands it to _body with the flag true.
3938 : _body(new ClientImpl(clientName), true)
// Wraps an existing ClientImpl, passing true as the handle's second argument
// (the same value the two-argument overload receives as isRef).
3941 Client(ClientImpl* existingClient)
3942 : _body(existingClient,
true)
// Wraps an existing ClientImpl; isRef is forwarded to the _body handle.
// NOTE(review): isRef presumably controls whether the handle takes a counted
// reference - confirm against BorrowRefHandle.
3945 Client(ClientImpl* existingClient,
bool isRef)
3946 : _body(existingClient, isRef)
// Copy constructor: the new Client shares rhs's body handle.
3949 Client(
const Client& rhs) : _body(rhs._body) {;}
// Virtual destructor with an empty body; _body is released by its own destructor.
3950 virtual ~Client(
void) {;}
// Copy assignment; the body is not visible in this excerpt.
3952 Client& operator=(
const Client& rhs)
// Bodies of a run of Client methods whose signatures are not visible in this
// excerpt. Apart from the validity check and the two version-number helpers,
// each one forwards directly to the ClientImpl held in _body.
// Validity of the underlying handle.
3960 return _body.isValid();
// Client name and logon correlation data.
3977 _body.get().setName(name);
3984 return _body.get().getName();
3995 _body.get().setLogonCorrelationData(logonCorrelationData_);
4002 return _body.get().getLogonCorrelationData();
// Server version information.
4015 return _body.get().getServerVersion();
4026 return _body.get().getServerVersionInfo();
// Version-string helpers: delegate to AMPS::convertVersionToNumber and do not
// touch _body.
4040 return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4055 return AMPS::convertVersionToNumber(data_, len_);
// Connection management.
4062 return _body.get().getURI();
4086 _body.get().connect(uri);
4093 _body.get().disconnect();
// Raw message send and handler registration.
4111 _body.get().send(message);
4124 unsigned requestedAcks_,
bool isSubscribe_)
4126 _body.get().addMessageHandler(commandId_, messageHandler_,
4127 requestedAcks_, isSubscribe_);
4135 return _body.get().removeMessageHandler(commandId_);
4163 return _body.get().send(messageHandler, message, timeout);
// Disconnect handler and connection info.
4177 _body.get().setDisconnectHandler(disconnectHandler);
4185 return _body.get().getDisconnectHandler();
4194 return _body.get().getConnectionInfo();
// Bookmark store, subscription manager and publish store.
4207 _body.get().setBookmarkStore(bookmarkStore_);
4215 return _body.
get().getBookmarkStore();
4223 return _body.get().getSubscriptionManager();
4235 _body.get().setSubscriptionManager(subscriptionManager_);
4259 _body.get().setPublishStore(publishStore_);
4267 return _body.
get().getPublishStore();
// The duplicate-message handler is installed as a global command-type handler.
4275 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
4276 duplicateMessageHandler_);
4290 return _body.get().getDuplicateMessageHandler();
// Failed-write handler.
4304 _body.get().setFailedWriteHandler(handler_);
4312 return _body.get().getFailedWriteHandler();
// Publishes data_ to topic_. Convenience overload for std::string that
// forwards the buffers and lengths to ClientImpl::publish.
// Returns the amps_uint64_t value produced by the implementation.
4333 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_)
4335 return _body.get().publish(topic_.c_str(), topic_.length(),
4336 data_.c_str(), data_.length());
// Publishes a raw buffer to a topic given as pointer + length, forwarding
// directly to ClientImpl::publish.
// Returns the amps_uint64_t value produced by the implementation.
4358 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
4359 const char* data_,
size_t dataLength_)
4361 return _body.get().publish(topic_, topicLength_, data_, dataLength_);
// Publishes data_ to topic_ with a message expiration. std::string overload
// that forwards buffers, lengths and expiration_ to ClientImpl::publish.
// NOTE(review): the unit of expiration_ is not shown here - confirm.
4382 amps_uint64_t
publish(
const std::string& topic_,
const std::string& data_,
4383 unsigned long expiration_)
4385 return _body.get().publish(topic_.c_str(), topic_.length(),
4386 data_.c_str(), data_.length(), expiration_);
// Publishes a raw buffer to a topic with a message expiration, forwarding all
// arguments to ClientImpl::publish.
// NOTE(review): the unit of expiration_ is not shown here - confirm.
4409 amps_uint64_t
publish(
const char* topic_,
size_t topicLength_,
4410 const char* data_,
size_t dataLength_,
4411 unsigned long expiration_)
4413 return _body.get().publish(topic_, topicLength_,
4414 data_, dataLength_, expiration_);
// Body of the publishFlush wrapper (signature not visible): forwards timeout_
// to ClientImpl::publishFlush.
4452 _body.get().publishFlush(timeout_);
// Delta-publishes data_ to topic_. std::string overload that forwards the
// buffers and lengths to ClientImpl::deltaPublish.
4469 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_)
4471 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4472 data_.c_str(), data_.length());
// Pointer + length overload of deltaPublish (first signature line not visible):
// forwards directly to ClientImpl::deltaPublish.
4491 const char* data_,
size_t dataLength_)
4493 return _body.get().deltaPublish(topic_, topicLength_,
4494 data_, dataLength_);
// Delta-publishes data_ to topic_ with an expiration. std::string overload
// forwarding to ClientImpl::deltaPublish; the final argument line of the call
// is not visible in this excerpt.
4511 amps_uint64_t
deltaPublish(
const std::string& topic_,
const std::string& data_,
4512 unsigned long expiration_)
4514 return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4515 data_.c_str(), data_.length(),
// Pointer + length overload of deltaPublish with an expiration (first
// signature line not visible): forwards all arguments to ClientImpl::deltaPublish.
4536 const char* data_,
size_t dataLength_,
4537 unsigned long expiration_)
4539 return _body.get().deltaPublish(topic_, topicLength_,
4540 data_, dataLength_, expiration_);
// Logon wrapper taking an authenticator (start of signature not visible):
// forwards timeout_, authenticator_ and options_ to ClientImpl::logon and
// returns its result.
4559 const char* options_ = NULL)
4561 return _body.get().logon(timeout_, authenticator_, options_);
// Logon overload taking options as a C string, with timeout_ defaulting to 0.
// The body is not visible in this excerpt.
4575 std::string
logon(
const char* options_,
int timeout_ = 0)
// Logon overload taking options as a std::string, with timeout_ defaulting to 0.
// The body is not visible in this excerpt.
4593 std::string
logon(
const std::string& options_,
int timeout_ = 0)
// Handler-based subscribe wrapper (start of signature not visible): forwards
// to ClientImpl::subscribe with an empty bookmark ("") and returns its result.
4619 const std::string& topic_,
4621 const std::string& filter_=
"",
4622 const std::string& options_ =
"",
4623 const std::string& subId_ =
"")
4625 return _body.get().subscribe(messageHandler_, topic_, timeout_,
4626 filter_,
"", options_, subId_);
// MessageStream-returning subscribe (start of signature not visible).
// Applies the client's default max depth to the stream when one is set, then
// subscribes through ClientImpl::subscribe with an empty bookmark and a final
// argument of false (the isHASubscribe_ position), and records the returned
// id on the stream.
4645 long timeout_=0,
const std::string& filter_=
"",
4646 const std::string& options_ =
"",
4647 const std::string& subId_ =
"")
4650 if (_body.get().getDefaultMaxDepth())
4651 result.
maxDepth(_body.get().getDefaultMaxDepth());
4652 result.setSubscription(_body.get().subscribe(
4654 topic_, timeout_, filter_,
"",
4655 options_, subId_,
false));
// Second MessageStream-returning subscribe overload (start of signature not
// visible; presumably differs in the topic parameter type - confirm).
// Same visible steps as the overload before it: apply default max depth,
// subscribe with an empty bookmark and HA disabled, record the id on the stream.
4675 long timeout_ = 0,
const std::string& filter_ =
"",
4676 const std::string& options_ =
"",
4677 const std::string& subId_ =
"")
4680 if (_body.get().getDefaultMaxDepth())
4681 result.
maxDepth(_body.get().getDefaultMaxDepth());
4682 result.setSubscription(_body.get().subscribe(
4684 topic_, timeout_, filter_,
"",
4685 options_, subId_,
false));
// Handler-based deltaSubscribe wrapper (start of signature not visible):
// forwards to ClientImpl::deltaSubscribe with an empty bookmark ("") and
// returns its result.
4702 const std::string& topic_,
4704 const std::string& filter_=
"",
4705 const std::string& options_ =
"",
4706 const std::string& subId_ =
"")
4708 return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
4709 filter_,
"", options_, subId_);
// MessageStream-returning deltaSubscribe (start of signature not visible).
// Applies the client's default max depth when set, then subscribes through
// ClientImpl::deltaSubscribe with an empty bookmark and a final argument of
// false (the isHASubscribe_ position), recording the returned id on the stream.
4720 long timeout_,
const std::string& filter_=
"",
4721 const std::string& options_ =
"",
4722 const std::string& subId_ =
"")
4725 if (_body.get().getDefaultMaxDepth())
4726 result.
maxDepth(_body.get().getDefaultMaxDepth());
4727 result.setSubscription(_body.get().deltaSubscribe(
4729 topic_, timeout_, filter_,
"",
4730 options_, subId_,
false));
// Second MessageStream-returning deltaSubscribe overload (start of signature
// not visible; presumably differs in the topic parameter type - confirm).
// Same visible steps as the overload before it.
4736 long timeout_,
const std::string& filter_ =
"",
4737 const std::string& options_ =
"",
4738 const std::string& subId_ =
"")
4741 if (_body.get().getDefaultMaxDepth())
4742 result.
maxDepth(_body.get().getDefaultMaxDepth());
4743 result.setSubscription(_body.get().deltaSubscribe(
4745 topic_, timeout_, filter_,
"",
4746 options_, subId_,
false));
4776 const std::string& topic_,
4778 const std::string& bookmark_,
4779 const std::string& filter_=
"",
4780 const std::string& options_ =
"",
4781 const std::string& subId_ =
"")
4783 return _body.get().subscribe(messageHandler_, topic_, timeout_,
4784 filter_, bookmark_, options_, subId_);
4805 const std::string& bookmark_,
4806 const std::string& filter_=
"",
4807 const std::string& options_ =
"",
4808 const std::string& subId_ =
"")
4811 if (_body.get().getDefaultMaxDepth())
4812 result.
maxDepth(_body.get().getDefaultMaxDepth());
4813 result.setSubscription(_body.get().subscribe(
4815 topic_, timeout_, filter_,
4816 bookmark_, options_,
4824 const std::string& bookmark_,
4825 const std::string& filter_ =
"",
4826 const std::string& options_ =
"",
4827 const std::string& subId_ =
"")
4830 if (_body.get().getDefaultMaxDepth())
4831 result.
maxDepth(_body.get().getDefaultMaxDepth());
4832 result.setSubscription(_body.get().subscribe(
4834 topic_, timeout_, filter_,
4835 bookmark_, options_,
4850 return _body.get().unsubscribe(commandId);
4862 return _body.get().unsubscribe();
4896 const std::string& topic_,
4897 const std::string& filter_ =
"",
4898 const std::string& orderBy_ =
"",
4899 const std::string& bookmark_ =
"",
4900 int batchSize_ = DEFAULT_BATCH_SIZE,
4901 int topN_ = DEFAULT_TOP_N,
4902 const std::string& options_ =
"",
4903 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
4905 return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
4906 bookmark_, batchSize_, topN_, options_,
4934 const std::string& filter_ =
"",
4935 const std::string& orderBy_ =
"",
4936 const std::string& bookmark_ =
"",
4937 int batchSize_ = DEFAULT_BATCH_SIZE,
4938 int topN_ = DEFAULT_TOP_N,
4939 const std::string& options_ =
"",
4940 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
4943 if (_body.get().getDefaultMaxDepth())
4944 result.
maxDepth(_body.get().getDefaultMaxDepth());
4945 result.setSOWOnly(sow(result.operator
MessageHandler(),topic_,filter_,orderBy_,bookmark_,batchSize_,topN_,options_,timeout_));
4951 const std::string& filter_ =
"",
4952 const std::string& orderBy_ =
"",
4953 const std::string& bookmark_ =
"",
4954 int batchSize_ = DEFAULT_BATCH_SIZE,
4955 int topN_ = DEFAULT_TOP_N,
4956 const std::string& options_ =
"",
4957 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
4960 if (_body.get().getDefaultMaxDepth())
4961 result.
maxDepth(_body.get().getDefaultMaxDepth());
4962 result.setSOWOnly(sow(result.operator
MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
4988 const std::string& topic_,
4990 const std::string& filter_ =
"",
4991 int batchSize_ = DEFAULT_BATCH_SIZE,
4992 int topN_ = DEFAULT_TOP_N)
4994 return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
5020 const std::string& topic_,
5022 const std::string& filter_ =
"",
5023 int batchSize_ = DEFAULT_BATCH_SIZE,
5024 bool oofEnabled_ =
false,
5025 int topN_ = DEFAULT_TOP_N)
5027 return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5028 filter_, batchSize_, oofEnabled_,
5053 const std::string& filter_ =
"",
5054 int batchSize_ = DEFAULT_BATCH_SIZE,
5055 bool oofEnabled_ =
false,
5056 int topN_ = DEFAULT_TOP_N)
5059 if (_body.get().getDefaultMaxDepth())
5060 result.
maxDepth(_body.get().getDefaultMaxDepth());
5061 result.setSubscription(_body.get().sowAndSubscribe(
5063 topic_, timeout_, filter_,
5064 batchSize_, oofEnabled_,
5089 const std::string& filter_ =
"",
5090 int batchSize_ = DEFAULT_BATCH_SIZE,
5091 bool oofEnabled_ =
false,
5092 int topN_ = DEFAULT_TOP_N)
5095 if (_body.get().getDefaultMaxDepth())
5096 result.
maxDepth(_body.get().getDefaultMaxDepth());
5097 result.setSubscription(_body.get().sowAndSubscribe(
5099 topic_, timeout_, filter_,
5100 batchSize_, oofEnabled_,
5134 const std::string& topic_,
5135 const std::string& filter_ =
"",
5136 const std::string& orderBy_ =
"",
5137 const std::string& bookmark_ =
"",
5138 int batchSize_ = DEFAULT_BATCH_SIZE,
5139 int topN_ = DEFAULT_TOP_N,
5140 const std::string& options_ =
"",
5141 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5143 return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
5144 orderBy_, bookmark_, batchSize_,
5145 topN_, options_, timeout_);
5173 const std::string& filter_ =
"",
5174 const std::string& orderBy_ =
"",
5175 const std::string& bookmark_ =
"",
5176 int batchSize_ = DEFAULT_BATCH_SIZE,
5177 int topN_ = DEFAULT_TOP_N,
5178 const std::string& options_ =
"",
5179 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5182 if (_body.get().getDefaultMaxDepth())
5183 result.
maxDepth(_body.get().getDefaultMaxDepth());
5184 result.setSubscription(_body.get().sowAndSubscribe(
5186 topic_, filter_, orderBy_,
5187 bookmark_, batchSize_, topN_,
5188 options_, timeout_,
false));
5194 const std::string& filter_ =
"",
5195 const std::string& orderBy_ =
"",
5196 const std::string& bookmark_ =
"",
5197 int batchSize_ = DEFAULT_BATCH_SIZE,
5198 int topN_ = DEFAULT_TOP_N,
5199 const std::string& options_ =
"",
5200 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5203 if (_body.get().getDefaultMaxDepth())
5204 result.
maxDepth(_body.get().getDefaultMaxDepth());
5205 result.setSubscription(_body.get().sowAndSubscribe(
5207 topic_, filter_, orderBy_,
5208 bookmark_, batchSize_, topN_,
5209 options_, timeout_,
false));
5238 const std::string& topic_,
5239 const std::string& filter_ =
"",
5240 const std::string& orderBy_ =
"",
5241 int batchSize_ = DEFAULT_BATCH_SIZE,
5242 int topN_ = DEFAULT_TOP_N,
5243 const std::string& options_ =
"",
5244 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5246 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5247 filter_, orderBy_, batchSize_,
5248 topN_, options_, timeout_);
5271 const std::string& filter_ =
"",
5272 const std::string& orderBy_ =
"",
5273 int batchSize_ = DEFAULT_BATCH_SIZE,
5274 int topN_ = DEFAULT_TOP_N,
5275 const std::string& options_ =
"",
5276 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5279 if (_body.get().getDefaultMaxDepth())
5280 result.
maxDepth(_body.get().getDefaultMaxDepth());
5281 result.setSubscription(sowAndDeltaSubscribe(result.operator
MessageHandler(), topic_, filter_, orderBy_, batchSize_, topN_, options_, timeout_));
5282 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5284 topic_, filter_, orderBy_,
5285 batchSize_, topN_, options_,
5292 const std::string& filter_ =
"",
5293 const std::string& orderBy_ =
"",
5294 int batchSize_ = DEFAULT_BATCH_SIZE,
5295 int topN_ = DEFAULT_TOP_N,
5296 const std::string& options_ =
"",
5297 long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5300 if (_body.get().getDefaultMaxDepth())
5301 result.
maxDepth(_body.get().getDefaultMaxDepth());
5302 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5304 topic_, filter_, orderBy_,
5305 batchSize_, topN_, options_,
5335 const std::string& topic_,
5337 const std::string& filter_ =
"",
5338 int batchSize_ = DEFAULT_BATCH_SIZE,
5339 bool oofEnabled_ =
false,
5340 bool sendEmpties_ =
false,
5341 int topN_ = DEFAULT_TOP_N)
5343 return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5344 timeout_, filter_, batchSize_,
5345 oofEnabled_, sendEmpties_,
5372 const std::string& filter_ =
"",
5373 int batchSize_ = DEFAULT_BATCH_SIZE,
5374 bool oofEnabled_ =
false,
5375 bool sendEmpties_ =
false,
5376 int topN_ = DEFAULT_TOP_N)
5379 if (_body.get().getDefaultMaxDepth())
5380 result.
maxDepth(_body.get().getDefaultMaxDepth());
5381 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5383 topic_, timeout_, filter_,
5384 batchSize_, oofEnabled_,
5385 sendEmpties_, topN_,
false));
5411 const std::string& filter_ =
"",
5412 int batchSize_ = DEFAULT_BATCH_SIZE,
5413 bool oofEnabled_ =
false,
5414 bool sendEmpties_ =
false,
5415 int topN_ = DEFAULT_TOP_N)
5418 if (_body.get().getDefaultMaxDepth())
5419 result.
maxDepth(_body.get().getDefaultMaxDepth());
5420 result.setSubscription(_body.get().sowAndDeltaSubscribe(
5422 topic_, timeout_, filter_,
5423 batchSize_, oofEnabled_,
5424 sendEmpties_, topN_,
false));
5447 const std::string& topic,
5448 const std::string& filter,
5451 return _body.get().sowDelete(messageHandler, topic, filter, timeout);
5479 stream.setStatsOnly(cid);
5480 _body.get().sowDelete(stream.operator
MessageHandler(),topic,filter,timeout,cid);
5481 return *(stream.
begin());
5483 catch (
const DisconnectedException&)
5485 removeMessageHandler(cid);
5496 _body.get().startTimer();
5507 return _body.get().stopTimer(messageHandler);
5532 const std::string& topic_,
5533 const std::string& keys_,
5536 return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
5568 stream.setStatsOnly(cid);
5569 _body.get().sowDeleteByKeys(stream.operator
MessageHandler(),topic_,keys_,timeout_,cid);
5570 return *(stream.
begin());
5572 catch (
const DisconnectedException&)
5574 removeMessageHandler(cid);
5594 const std::string& topic_,
const std::string& data_,
5597 return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
5624 stream.setStatsOnly(cid);
5625 _body.get().sowDeleteByData(stream.operator
MessageHandler(),topic_,data_,timeout_,cid);
5626 return *(stream.
begin());
5628 catch (
const DisconnectedException&)
5630 removeMessageHandler(cid);
5640 return _body.get().getHandle();
5653 _body.get().setExceptionListener(pListener_);
5666 _body.get().setExceptionListener(listener_);
5673 return _body.get().getExceptionListener();
5699 _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
5723 _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
5729 setLastChanceMessageHandler(messageHandler);
5736 _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
5762 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
5787 _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
5806 _body.get().addConnectionStateListener(listener);
5814 _body.get().removeConnectionStateListener(listener);
5844 return _body.get().executeAsync(command_, handler_);
5882 id = _body.get().executeAsync(command_, handler_,
false);
5884 catch (
const DisconnectedException&)
5886 removeMessageHandler(command_.getMessage().
getCommandId());
5887 if (command_.isSubscribe())
5891 if (command_.isSow())
5893 removeMessageHandler(command_.getMessage().
getQueryID());
5924 _body.get().ack(topic_,bookmark_,options_);
5946 void ack(
const std::string& topic_,
const std::string& bookmark_,
5947 const char* options_ = NULL)
5949 _body.get().ack(
Field(topic_.data(),topic_.length()),
Field(bookmark_.data(),bookmark_.length()),options_);
5957 void ackDeferredAutoAck(
Field& topic_,
Field& bookmark_,
const char* options_ = NULL)
5959 _body.get()._ack(topic_,bookmark_,options_);
5972 _body.get().flushAcks();
5981 return _body.get().getAutoAck();
5991 _body.get().setAutoAck(isAutoAckEnabled_);
5999 return _body.get().getAckBatchSize();
6009 _body.get().setAckBatchSize(ackBatchSize_);
6020 return _body.get().getAckTimeout();
6030 _body.get().setAckTimeout(ackTimeout_);
6044 _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
6053 return _body.get().getRetryOnDisconnect();
6062 _body.get().setDefaultMaxDepth(maxDepth_);
6071 return _body.get().getDefaultMaxDepth();
6083 return _body.get().setTransportFilterFunction(filter_, userData_);
6095 return _body.get().setThreadCreatedCallback(callback_, userData_);
6103 void deferredExecution(DeferredExecutionFunc func_,
void* userData_)
6105 _body.get().deferredExecution(func_,userData_);
6115 AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
6121 unsigned deliveries = 0;
6133 const char* data = NULL;
6135 const char* status = NULL;
6136 size_t statusLen = 0;
6138 const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
6141 &status, &statusLen);
6142 if (len == NotEntitled || len == Duplicate ||
6143 (statusLen == Failure && status[0] ==
'f'))
6145 if (_failedWriteHandler)
6147 if (_publishStore.isValid())
6149 amps_uint64_t sequence =
6152 FailedWriteStoreReplayer replayer(
this, data, len);
6153 _publishStore.replaySingle(replayer, sequence);
6159 _failedWriteHandler->failedWrite(emptyMessage, data, len);
6164 if (_publishStore.isValid())
6173 _publishStore.discardUpTo(seq);
6177 if (!deliveries && _bookmarkStore.isValid())
6184 const char* bookmarkData = NULL;
6185 size_t bookmarkLen = 0;
6187 &bookmarkData, &bookmarkLen);
6189 if (bookmarkLen > 0 && _routes.hasRoute(subId))
6192 _bookmarkStore.persisted(subId,
Message::Field(bookmarkData, bookmarkLen));
6197 catch (std::exception& ex)
6199 AMPS_UNHANDLED_EXCEPTION(ex);
6205 ClientImpl::processedAck(
Message &message)
6207 unsigned deliveries = 0;
6209 const char* data = NULL;
6213 Lock<Mutex> l(_lock);
6216 AckMap::iterator i = _acks.find(std::string(data, len));
6217 if (i != _acks.end())
6228 ack.setStatus(data, len);
6231 ack.setReason(data, len);
6234 ack.setUsername(data, len);
6237 ack.setPassword(data, len);
6240 ack.setSequenceNo(data, len);
6243 ack.setServerVersion(data, len);
6245 ack.setOptions(data,len);
6246 ack.setResponded(
true);
6253 ClientImpl::checkAndSendHeartbeat(
bool force)
6255 if (force || _heartbeatTimer.check())
6257 _heartbeatTimer.start();
6260 sendWithoutRetry(_beatMessage);
6262 catch (
const AMPSException&)
6269 inline ConnectionInfo ClientImpl::getConnectionInfo()
const 6271 ConnectionInfo info;
6272 std::ostringstream writer;
6274 info[
"client.uri"] = _lastUri;
6275 info[
"client.name"] = _name;
6276 info[
"client.username"] = _username;
6277 if(_publishStore.isValid())
6279 writer << _publishStore.unpersistedCount();
6280 info[
"publishStore.unpersistedCount"] = writer.str();
6289 ClientImpl::ClientImplMessageHandler(
amps_handle messageHandle_,
void* userData_)
6291 const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
6292 const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
6293 ClientImpl* me = (ClientImpl*) userData_;
6296 if(me->_queueAckTimeout) me->checkQueueAcks();
6299 me->processDeferredExecutions();
6301 me->_readMessage.replace(messageHandle_);
6302 Message& message = me->_readMessage;
6304 if (commandType & SOWMask)
6306 #if 0 // Not currently implemented, to avoid an extra branch in delivery 6310 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6311 me->_globalCommandTypeHandlers[1+(commandType/8192)].invoke(message));
6313 AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->_routes.deliverData(message,
6316 else if (commandType & PublishMask)
6318 #if 0 // Not currently implemented, to avoid an extra branch in delivery 6319 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6320 me->_globalCommandTypeHandlers[(commandType==Message::Command::Publish ?
6321 GlobalCommandTypeHandlers::Publish :
6322 GlobalCommandTypeHandlers::OOF)].invoke(message));
6324 const char* subIds = NULL;
6325 size_t subIdsLen = 0;
6328 size_t subIdCount = me->_routes.parseRoutes(
AMPS::Field(subIds, subIdsLen), me->_routeCache);
6329 for(
size_t i=0; i<subIdCount; ++i)
6331 MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
6333 if (handler.isValid())
6336 subIds + lookupResult.idOffset, lookupResult.idLength);
6339 bool isAutoAck = me->_isAutoAckEnabled;
6341 if (!isMessageQueue && !bookmark.
empty() &&
6342 me->_bookmarkStore.isValid())
6344 if (me->_bookmarkStore.isDiscarded(me->_readMessage))
6347 if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
6349 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message);
6354 me->_bookmarkStore.log(me->_readMessage);
6355 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6356 handler.invoke(message));
6361 if(isMessageQueue && isAutoAck)
6365 AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
6366 if (!message.getIgnoreAutoAck())
6368 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6372 catch(std::exception& ex)
6374 if (!message.getIgnoreAutoAck())
6376 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6379 AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6384 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6385 handler.invoke(message));
6389 else me->lastChance(message);
6392 else if (commandType == Message::Command::Ack)
6394 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6395 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
6397 unsigned deliveries = 0U;
6400 case Message::AckType::Persisted:
6401 deliveries += me->persistedAck(message);
6403 case Message::AckType::Processed:
6404 deliveries += me->processedAck(message);
6407 AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
6408 if (deliveries == 0)
6410 me->lastChance(message);
6413 else if (commandType == Message::Command::Heartbeat)
6415 AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6416 me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
6417 if(me->_heartbeatTimer.getTimeout() != 0.0)
6419 me->checkAndSendHeartbeat(
true);
6423 me->lastChance(message);
6429 unsigned deliveries = 0U;
6432 while(me->_connected)
6436 deliveries = me->_routes.deliverData(message, message.
getCommandId());
6440 catch(MessageStreamFullException&)
6442 catch(MessageStreamFullException& ex_)
6445 me->checkAndSendHeartbeat(
false);
6449 catch (std::exception& ex_)
6453 me->_exceptionListener->exceptionThrown(ex_);
6460 if (deliveries == 0)
6461 me->lastChance(message);
6463 me->checkAndSendHeartbeat();
6468 ClientImpl::ClientImplPreDisconnectHandler(
amps_handle ,
unsigned failedConnectionVersion,
void* userData)
6470 ClientImpl* me = (ClientImpl*) userData;
6473 me->clearAcks(failedConnectionVersion);
6477 ClientImpl::ClientImplDisconnectHandler(
amps_handle ,
void* userData)
6479 ClientImpl* me = (ClientImpl*) userData;
6480 Lock<Mutex> l(me->_lock);
6481 Client wrapper(me,
false);
6483 me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
6486 AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
6489 AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
6490 me->_connected =
false;
6494 Unlock<Mutex> unlock(me->_lock);
6495 me->_disconnectHandler.invoke(wrapper);
6498 catch(
const std::exception& ex)
6500 AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6503 if (!me->_connected)
6505 me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
6506 AMPS_UNHANDLED_EXCEPTION_2(me,DisconnectedException(
"Reconnect failed."));
6512 if (me->_subscriptionManager)
6517 Unlock<Mutex> unlock(me->_lock);
6518 me->_subscriptionManager->resubscribe(wrapper);
6520 me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
6524 catch(
const AMPSException& subEx)
6526 AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6528 catch(
const std::exception& subEx)
6530 AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6553 iterator(
const char* data_,
size_t len_,
size_t pos_,
char fieldSep_)
6554 : _data(data_), _len(len_),_pos(pos_), _fieldSep(fieldSep_)
6556 while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6559 typedef void* difference_type;
6560 typedef std::forward_iterator_tag iterator_category;
6561 typedef std::pair<Message::Field, Message::Field> value_type;
6562 typedef value_type* pointer;
6563 typedef value_type& reference;
6564 bool operator==(
const iterator& rhs)
const 6566 return _pos == rhs._pos;
6568 bool operator!=(
const iterator& rhs)
const 6570 return _pos != rhs._pos;
6572 iterator& operator++()
6575 while(_pos != _len && _data[_pos] != _fieldSep) ++_pos;
6577 while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6581 value_type operator*()
const 6584 size_t i = _pos, keyLength =0, valueStart = 0, valueLength = 0;
6585 for(; i < _len && _data[i] !=
'='; ++i) ++keyLength;
6587 result.first.assign(_data+_pos, keyLength);
6589 if (i < _len && _data[i] ==
'=')
6593 for(; i < _len && _data[i] != _fieldSep; ++i)
6598 result.second.assign(_data+valueStart, valueLength);
6604 class reverse_iterator
6611 typedef std::pair<Message::Field, Message::Field> value_type;
6612 reverse_iterator(
const char* data,
size_t len,
const char* pos,
char fieldsep)
6613 : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
6618 while(_pos >=_data && *_pos == _fieldSep) --_pos;
6619 while(_pos > _data && *_pos != _fieldSep) --_pos;
6623 if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
6624 if (_pos < _data) _pos = 0;
6627 bool operator==(
const reverse_iterator& rhs)
const 6629 return _pos == rhs._pos;
6631 bool operator!=(
const reverse_iterator& rhs)
const 6633 return _pos != rhs._pos;
6635 reverse_iterator& operator++()
6646 while(_pos >=_data && *_pos == _fieldSep) --_pos;
6648 while(_pos >_data && *_pos != _fieldSep) --_pos;
6649 if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
6650 if (_pos < _data) _pos = 0;
6654 value_type operator*()
const 6657 size_t keyLength = 0, valueStart = 0, valueLength = 0;
6658 size_t i = (size_t)(_pos - _data);
6659 for(; i < _len && _data[i] !=
'='; ++i) ++keyLength;
6660 result.first.assign(_pos, keyLength);
6661 if (i<_len && _data[i] ==
'=')
6665 for(; i<_len && _data[i] != _fieldSep; ++i)
6670 result.second.assign(_data+valueStart, valueLength);
6675 : _data(data.
data()), _len(data.
len()),
6676 _fieldSep(fieldSeparator)
6680 FIX(
const char* data,
size_t len,
char fieldSeparator=1)
6681 : _data(data), _len(len), _fieldSep(fieldSeparator)
6685 iterator begin()
const 6687 return iterator(_data, _len, 0, _fieldSep);
6689 iterator end()
const 6691 return iterator(_data, _len, _len, _fieldSep);
6695 reverse_iterator rbegin()
const 6697 return reverse_iterator(_data, _len, _data+(_len-1), _fieldSep);
6700 reverse_iterator rend()
const 6702 return reverse_iterator(_data, _len, 0, _fieldSep);
6723 std::stringstream _data;
6740 void append(
const T& tag,
const char* value,
size_t offset,
size_t length)
6743 _data.write(value+offset, (std::streamsize)length);
6751 void append(
const T& tag,
const std::string& value)
6753 _data << tag <<
'=' << value << _fs;
6762 operator std::string()
const 6770 _data.str(std::string());
6807 typedef std::map<Message::Field, Message::Field>
map_type;
6818 for(FIX::iterator a = fix.begin(); a!= fix.end(); ++a)
6830 std::deque<Message> _q;
6831 std::string _commandId;
6833 std::string _queryId;
6837 unsigned _requestedAcks;
6840 volatile enum { Unset=0x0, Running=0x10, Subscribe=0x11, SOWOnly=0x12, AcksOnly=0x13, Conflate=0x14, Closed=0x1, Disconnected=0x2 } _state;
6841 typedef std::map<std::string, Message*> SOWKeyMap;
6842 SOWKeyMap _sowKeyMap;
6844 MessageStreamImpl(
const Client& client_)
6847 _maxDepth((
unsigned)~0),
6851 if (_client.isValid())
6857 MessageStreamImpl(ClientImpl* client_)
6860 _maxDepth((
unsigned)~0),
6864 if (_client.isValid())
6870 ~MessageStreamImpl()
6874 virtual void destroy()
6880 catch(std::exception &e)
6884 if (_client.isValid())
6890 if (_client.isValid())
6894 _client = Client((ClientImpl*)NULL);
6895 c.deferredExecution(MessageStreamImpl::destroyer,
this);
6903 static void destroyer(
void* vpMessageStreamImpl_)
6905 delete ((MessageStreamImpl*)vpMessageStreamImpl_);
6908 void setSubscription(
const std::string& subId_,
6909 const std::string& commandId_ =
"",
6910 const std::string& queryId_ =
"")
6912 Lock<Mutex> lock(_lock);
6914 if (!commandId_.empty() && commandId_ != subId_)
6915 _commandId = commandId_;
6916 if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
6917 _queryId = queryId_;
6919 if (Disconnected == _state)
return;
6920 assert(Unset==_state);
6924 void setSOWOnly(
const std::string& commandId_,
6925 const std::string& queryId_ =
"")
6927 Lock<Mutex> lock(_lock);
6928 _commandId = commandId_;
6929 if (!queryId_.empty() && queryId_ != commandId_)
6930 _queryId = queryId_;
6932 if (Disconnected == _state)
return;
6933 assert(Unset==_state);
6937 void setStatsOnly(
const std::string& commandId_,
6938 const std::string& queryId_ =
"")
6940 Lock<Mutex> lock(_lock);
6941 _commandId = commandId_;
6942 if (!queryId_.empty() && queryId_ != commandId_)
6943 _queryId = queryId_;
6945 if (Disconnected == _state)
return;
6946 assert(Unset==_state);
6948 _requestedAcks = Message::AckType::Stats;
6951 void setAcksOnly(
const std::string& commandId_,
unsigned acks_)
6953 Lock<Mutex> lock(_lock);
6954 _commandId = commandId_;
6956 if (Disconnected == _state)
return;
6957 assert(Unset==_state);
6959 _requestedAcks = acks_;
6964 Lock<Mutex> lock(_lock);
6965 if(state_ == AMPS::ConnectionStateListener::Disconnected)
6967 _state = Disconnected;
6972 void timeout(
unsigned timeout_)
6974 _timeout = timeout_;
6978 if(_state == Subscribe) _state = Conflate;
6980 void maxDepth(
unsigned maxDepth_)
6982 if(maxDepth_) _maxDepth = maxDepth_;
6983 else _maxDepth = (unsigned)~0;
6985 unsigned getMaxDepth(
void)
const 6989 unsigned getDepth(
void)
const 6991 return (
unsigned)(_q.size());
6996 Lock<Mutex> lock(_lock);
6997 if (!_previousTopic.
empty() && !_previousBookmark.
empty())
7001 if (_client.isValid())
7003 _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
7007 catch (AMPSException&)
7009 catch (AMPSException& e)
7012 current_.invalidate();
7013 _previousTopic.
clear();
7014 _previousBookmark.
clear();
7017 _previousTopic.
clear();
7018 _previousBookmark.
clear();
7020 double minWaitTime = (double)(_timeout ? _timeout : 1000);
7021 Timer timer(minWaitTime);
7022 while(_q.empty() && _state & Running)
7025 _lock.wait((
long)minWaitTime);
7026 amps_invoke_waiting_function();
7030 if(timer.checkAndGetRemaining(&minWaitTime))
7038 current_ = _q.front();
7039 if(_q.size() == _maxDepth) _lock.signalAll();
7041 if(_state == Conflate)
7043 std::string sowKey = current_.
getSowKey();
7044 if(sowKey.length()) _sowKeyMap.erase(sowKey);
7046 else if(_state == AcksOnly)
7050 if((_state == AcksOnly && _requestedAcks == 0) ||
7051 (_state == SOWOnly && current_.
getCommand()==
"group_end"))
7055 else if (current_.
getCommandEnum() == Message::Command::Publish &&
7065 if(_state == Disconnected)
7067 throw DisconnectedException(
"Connection closed.");
7069 current_.invalidate();
7070 if(_state == Closed)
7074 return _timeout != 0;
7078 if (_client.isValid())
7080 if (_state == SOWOnly || _state == Subscribe)
7082 if (!_commandId.empty()) _client.
unsubscribe(_commandId);
7084 if (!_queryId.empty()) _client.
unsubscribe(_queryId);
7093 if(_state==SOWOnly || _state==Subscribe || _state==Unset)
7098 static void _messageHandler(
const Message& message_, MessageStreamImpl* this_)
7100 Lock<Mutex> lock(this_->_lock);
7101 if(this_->_state != Conflate)
7103 AMPS_TESTING_SLOW_MESSAGE_STREAM
7104 if(this_->_q.size() >= this_->_maxDepth)
7109 this_->_lock.signalAll();
7110 throw MessageStreamFullException(
"Stream is currently full.");
7112 this_->_q.push_back(message_.
deepCopy());
7114 this_->_client.isValid() && this_->_client.getAutoAck() &&
7118 message_.setIgnoreAutoAck();
7123 std::string sowKey = message_.
getSowKey();
7126 SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
7127 if(it != this_->_sowKeyMap.end())
7129 *(it->second) = message_.
deepCopy();
7133 if(this_->_q.size() >= this_->_maxDepth)
7139 this_->_lock.signalAll();
7140 throw MessageStreamFullException(
"Stream is currently full.");
7142 this_->_q.push_back(message_.
deepCopy());
7143 this_->_sowKeyMap[sowKey] = &(this_->_q.back());
7148 while(this_->_q.size() >= this_->_maxDepth)
7150 this_->_lock.wait(1);
7152 this_->_q.push_back(message_.
deepCopy());
7155 this_->_lock.signalAll();
7158 inline MessageStream::MessageStream(
void)
7161 inline MessageStream::MessageStream(
const Client& client_)
7162 :_body(
new MessageStreamImpl(client_))
7165 inline void MessageStream::iterator::advance(
void)
7167 _pStream = _pStream->_body->next(_current) ? _pStream:NULL;
7171 return MessageHandler((
void(*)(
const Message&,
void*))MessageStreamImpl::_messageHandler, &_body.get());
7176 if(handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
7178 result._body = (MessageStreamImpl*)(handler_._userData);
7183 inline void MessageStream::setSOWOnly(
const std::string& commandId_,
7184 const std::string& queryId_)
7186 _body->setSOWOnly(commandId_, queryId_);
7188 inline void MessageStream::setSubscription(
const std::string& subId_,
7189 const std::string& commandId_,
7190 const std::string& queryId_)
7192 _body->setSubscription(subId_, commandId_, queryId_);
7194 inline void MessageStream::setStatsOnly(
const std::string& commandId_,
7195 const std::string& queryId_)
7197 _body->setStatsOnly(commandId_, queryId_);
7199 inline void MessageStream::setAcksOnly(
const std::string& commandId_,
7202 _body->setAcksOnly(commandId_, acks_);
7221 return _body->getMaxDepth();
7225 return _body->getDepth();
7228 inline MessageStream ClientImpl::getEmptyMessageStream(
void)
7230 return *(_pEmptyMessageStream.get());
7238 Message& message = command_.getMessage();
7241 bool useExistingHandler = !subId.
empty() && ((!message.getOptions().empty() && message.getOptions().contains(
"replace",7)) || message.
getCommandEnum() == Message::Command::SOW);
7242 if(useExistingHandler)
7248 if (_body.get()._routes.getRoute(subId, existingHandler))
7251 _body.get().executeAsync(command_, existingHandler,
false);
7252 return MessageStream::fromExistingHandler(existingHandler);
7258 Message::Command::Type command = command_.getMessage().
getCommandEnum();
7259 if ((command & Message::Command::NoDataCommands)
7260 && (ackTypes == Message::AckType::Persisted
7261 || ackTypes == Message::AckType::None))
7264 if (!_body.get()._pEmptyMessageStream)
7266 _body.get()._pEmptyMessageStream.reset(
new MessageStream((ClientImpl*)0));
7267 _body.get()._pEmptyMessageStream.get()->_body->close();
7269 return _body.get().getEmptyMessageStream();
7272 if (_body.get().getDefaultMaxDepth())
7273 stream.
maxDepth(_body.get().getDefaultMaxDepth());
7275 std::string commandID = _body.get().executeAsync(command_, handler,
false);
7276 if (command_.hasStatsAck())
7278 stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
7280 else if (command_.isSow())
7282 stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
7284 else if (command_.isSubscribe())
7286 stream.setSubscription(commandID,
7288 command_.getMessage().getQueryId());
7293 if (command == Message::Command::Publish ||
7294 command == Message::Command::DeltaPublish ||
7295 command == Message::Command::SOWDelete)
7297 stream.setAcksOnly(commandID,
7298 ackTypes & (
unsigned)~Message::AckType::Persisted);
7302 stream.setAcksOnly(commandID, ackTypes);
7309 inline void Message::ack(
const char* options_)
const 7311 ClientImpl* pClient = _body.get().clientImpl();
7313 if(pClient && bookmark.
len() &&
7314 !pClient->getAutoAck())
7316 pClient->ack(getTopic(),bookmark,options_);
Command & setCorrelationId(const std::string &v_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:504
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:181
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1229
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:3937
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1012
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:457
std::string getAckType() const
Definition: ampsplusplus.hpp:583
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:4133
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1032
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:6721
void startTimer()
Definition: ampsplusplus.hpp:5494
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5051
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:706
Command & addAckType(const std::string &v_)
Definition: ampsplusplus.hpp:555
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:7209
std::string subscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4618
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1119
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1118
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1112
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:433
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:939
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:5785
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:404
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:6060
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1105
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4038
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:4848
Command & setQueryId(const std::string &v_)
Definition: ampsplusplus.hpp:493
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1116
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:678
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:384
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:6069
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:5934
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:4803
Command & setFilter(const std::string &v_)
Definition: ampsplusplus.hpp:487
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:278
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:4257
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4511
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4053
DisconnectHandler getDisconnectHandler(void)
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:4183
void AMPSDLL amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1033
amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:668
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4273
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:483
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:5812
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:657
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:6018
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4024
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5558
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:446
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1095
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:895
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:6758
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:6080
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4409
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:3844
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1110
std::string sowAndDeltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5237
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4109
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1128
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:758
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6042
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7219
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1012
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4084
void clear()
Deletes the data associated with this Field; should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:989
Success.
Definition: amps.h:189
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:999
std::string deltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:4701
amps_handle AMPSDLL amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:652
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4490
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:6803
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:179
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:498
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:4122
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:3840
amps_result
Return values from amps_xxx functions.
Definition: amps.h:184
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:4310
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1045
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1009
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:7233
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:601
amps_result AMPSDLL amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1032
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:4265
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:6092
std::string sowAndSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5019
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:588
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4013
Command & setExpiration(unsigned v_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:553
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1023
Command & setData(const char *v_, size_t length_)
Sets the data for this command.
Definition: ampsplusplus.hpp:525
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:4557
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including index_ can be discarded.
Definition: ampsplusplus.hpp:828
std::string sowAndDeltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5334
Command & setAckType(unsigned v_)
Definition: ampsplusplus.hpp:573
std::string send(MessageHandler messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4161
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5193
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:3920
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:5760
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1010
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:5804
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:448
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1119
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4358
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1012
void publishFlush(long timeout_=0)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:4450
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1032
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:4221
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:440
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1034
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1010
std::string sowDelete(MessageHandler messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5446
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:5671
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:669
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1019
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1105
Command & setTopic(const std::string &v_)
Definition: ampsplusplus.hpp:485
amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Command & setBatchSize(unsigned v_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:542
std::string sowAndSubscribe(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5133
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1107
void setDisconnectHandler(DisconnectHandler disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:4175
Command & setOrderBy(const std::string &v_)
Definition: ampsplusplus.hpp:489
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:849
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:664
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:4719
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:225
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:4575
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Command & setTimeout(unsigned v_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:535
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1011
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1128
void AMPSDLL amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:646
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:3993
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1009
std::string bookmarkSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:4775
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:6731
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:5946
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:659
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1105
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:700
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5087
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:783
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:6740
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1009
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:909
amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4213
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:866
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:6007
void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1132
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1115
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:937
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5469
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1117
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:4205
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:121
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1107
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
Command & setCommandId(const std::string &v_)
Definition: ampsplusplus.hpp:483
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:5664
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:456
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:4233
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:5727
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:6751
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:5997
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4469
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5172
Command & setSequence(const std::string &v_)
Definition: ampsplusplus.hpp:509
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:3811
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:265
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5409
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:858
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1045
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:5842
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5370
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:4593
Command & setSequence(const amps_uint64_t v_)
Definition: ampsplusplus.hpp:511
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:4860
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:5989
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:4735
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1118
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:819
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:5651
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:923
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4288
amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
amps_result AMPSDLL amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:6795
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1045
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:4192
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:5922
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:6807
amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:804
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:6814
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:204
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5270
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:5721
Command & setSubId(const std::string &v_)
Definition: ampsplusplus.hpp:491
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4674
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:3803
std::string sow(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:4895
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1010
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:627
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:887
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7214
void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
std::string sow(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:4987
Command & setTopN(unsigned v_)
Definition: ampsplusplus.hpp:537
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1133
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:4091
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:4933
Command & setAckType(const std::string &v_)
Definition: ampsplusplus.hpp:563
Command & setData(const std::string &v_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:521
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:7223
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1112
The operation has not succeeded, but ought to be retried.
Definition: amps.h:213
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:4000
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:5697
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4060
Command & setOptions(const std::string &v_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:507
void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:837
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1120
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
std::string sowDeleteByData(MessageHandler messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5593
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1075
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:434
Command & setBookmark(const std::string &v_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:497
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:5638
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:6768
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:4302
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5291
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:6028
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:481
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1132
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:5979
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4333
Definition: ampsplusplus.hpp:105
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:3975
std::string stopTimer(MessageHandler messageHandler)
Definition: ampsplusplus.hpp:5505
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:615
std::string sowDeleteByKeys(MessageHandler messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5531
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:879
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1120
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:965
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:762
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:4950
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:4382
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:5734
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:301
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4535
The client and server are disconnected.
Definition: amps.h:217
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5614
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:7204
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:3982
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:354
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1114
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4644
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:4822
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:468
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:5876
amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:3855
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6051
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:5970