AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.0.5
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps.h"
28 #include "ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <set>
38 #include <deque>
39 #include <vector>
40 #include <assert.h>
41 #ifndef _WIN32
42 #include <inttypes.h>
43 #endif
44 #if defined(sun)
45 #include <sys/atomic.h>
46 #endif
47 #include "MessageRouter.hpp"
48 #include "util.hpp"
49 #include "ampscrc.hpp"
50 
51 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
52 #define AMPS_TESTING_SLOW_MESSAGE_STREAM
53 #endif
54 
59 
60 
67 
78 
79 #define AMPS_INITIAL_LOG_SIZE 40960UL
80 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
81 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
82 #define AMPS_RING_POSITIONS 3
83 #define AMPS_RING_BYTES_SUBID 128
84 #define AMPS_RING_BYTES_BOOKMARK 64
85 #define AMPS_RING_ENTRIES 16384
86 #define AMPS_RING_ENTRY_SIZE ( AMPS_RING_BYTES_SUBID + ( AMPS_RING_POSITIONS * AMPS_RING_BYTES_BOOKMARK ) )
87 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
88 
98 #define AMPS_BOOKMARK_RECENT "recent"
102 
105 #define AMPS_BOOKMARK_EPOCH "0"
106 
109 #define AMPS_BOOKMARK_NOW "0|1|"
110 
111 /* @} */
112 
113 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL
114 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
115 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
116 #define AMPS_PERSISTED_BOOKMARK_MIN_VERSION 3080000
117 #define AMPS_MULTI_BOOKMARK_MIN_VERSION 4000000
118 #define AMPS_FLUSH_MIN_VERSION 4000000
119 #define AMPS_MIN_SOW_KEY_PUBLISH_VERSION 4030100
120 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
121 #define AMPS_DEFAULT_TOP_N -1
122 #define AMPS_DEFAULT_BATCH_SIZE 10
123 #define AMPS_NUMBER_BUFFER_LEN 20
124 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
125 
126 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
127 #define AMPS_X64 1
128 #endif
129 
130 #ifdef _WIN32
131 static __declspec ( thread ) AMPS::Message* publishStoreMessage = 0;
132 #else
133 static __thread AMPS::Message* publishStoreMessage = 0;
134 #endif
135 
136 namespace AMPS
137 {
138 
139 typedef std::map<std::string, std::string> ConnectionInfo;
140 
141 class PerThreadMessageTracker {
142 std::vector<AMPS::Message*> _messages;
143 public:
144  PerThreadMessageTracker() {}
145  ~PerThreadMessageTracker()
146  {
147  for (size_t i=0; i<_messages.size(); ++i)
148  {
149  delete _messages[i];
150  }
151  }
152  void addMessage(AMPS::Message* message)
153  {
154  _messages.push_back(message);
155  }
156  static void addMessageToCleanupList(AMPS::Message* message)
157  {
158  static AMPS::Mutex _lock;
159  AMPS::Lock<Mutex> l(_lock);
160  _addMessageToCleanupList(message);
161  }
162  static void _addMessageToCleanupList(AMPS::Message* message)
163  {
164  static PerThreadMessageTracker tracker;
165  tracker.addMessage(message);
166  }
167 };
168 
169 template<class Type>
170 inline std::string asString(Type x_)
171 {
172  std::ostringstream os;
173  os << x_;
174  return os.str();
175 }
176 
177 inline
178 size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
179 {
180  size_t pos = AMPS_NUMBER_BUFFER_LEN;
181  for(int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
182  {
183  if (seqNo_ > 0)
184  {
185  buf_[--pos] = (char)(seqNo_ % 10 + '0');
186  seqNo_ /= 10;
187  }
188  }
189  return pos;
190 }
191 
192 #ifdef _WIN32
193 inline
194 size_t convertToCharArray(char* buf_, unsigned long seqNo_)
195 {
196  size_t pos = AMPS_NUMBER_BUFFER_LEN;
197  for(int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
198  {
199  if (seqNo_ > 0)
200  {
201  buf_[--pos] = (char)(seqNo_ % 10 + '0');
202  seqNo_ /= 10;
203  }
204  }
205  return pos;
206 }
207 #endif
208 
212 class Reason
213 {
214  public:
215  static const char* duplicate() { return "duplicate";}
216  static const char* badFilter() { return "bad filter";}
217  static const char* badRegexTopic() { return "bad regex topic";}
218  static const char* subscriptionAlreadyExists() { return "subscription already exists";}
219  static const char* nameInUse() { return "name in use";}
220  static const char* authFailure() { return "auth failure";}
221  static const char* notEntitled() { return "not entitled";}
222  static const char* authDisabled() { return "authentication disabled";}
223  static const char* subidInUse() { return "subid in use";}
224  static const char* noTopic() { return "no topic";}
225 };
226 
236 {
237 public:
238  virtual ~ExceptionListener() {;}
239  virtual void exceptionThrown(const std::exception&) const {;}
240 };
241 
243 
244 
245 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
246  try\
247  {\
248  x;\
249  }\
250  catch (std::exception& ex_)\
251  {\
252  try\
253  {\
254  _exceptionListener->exceptionThrown(ex_);\
255  }\
256  catch(...)\
257  {\
258  ;\
259  }\
260  }
261  /*
262  * Note : we don't attempt to trap non std::exception exceptions
263  * here because doing so interferes with pthread_exit on some OSes.
264  catch (...)\
265  {\
266  try\
267  {\
268  _exceptionListener->exceptionThrown(AMPS::AMPSException(\
269  "An unhandled exception of unknown type was thrown by "\
270  "the registered handler.", AMPS_E_USAGE));\
271  }\
272  catch(...)\
273  {\
274  ;\
275  }\
276  }
277  */
278 #ifdef _WIN32
279 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x) \
280  while(me->_connected)\
281  {\
282  try\
283  {\
284  x;\
285  break;\
286  }\
287  catch(MessageStreamFullException&)\
288  {\
289  me->checkAndSendHeartbeat(false);\
290  }\
291  }
292 #else
293 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x) \
294  while(me->_connected)\
295  {\
296  try\
297  {\
298  x;\
299  break;\
300  }\
301  catch(MessageStreamFullException& ex_)\
302  {\
303  me->checkAndSendHeartbeat(false);\
304  }\
305  }
306 #endif
307 
308 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
309  try\
310  {\
311  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x) \
312  }\
313  catch (std::exception& ex_)\
314  {\
315  try\
316  {\
317  me->_exceptionListener->exceptionThrown(ex_);\
318  }\
319  catch(...)\
320  {\
321  ;\
322  }\
323  }
324  /*
325  * Note : we don't attempt to trap non std::exception exceptions
326  * here because doing so interferes with pthread_exit on some OSes.
327  catch (...)\
328  {\
329  try\
330  {\
331  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
332  "An unhandled exception of unknown type was thrown by "\
333  "the registered handler.", AMPS_E_USAGE));\
334  }\
335  catch(...)\
336  {\
337  ;\
338  }\
339  }*/
340 
341 #define AMPS_UNHANDLED_EXCEPTION(ex) \
342  try\
343  {\
344  _exceptionListener->exceptionThrown(ex);\
345  }\
346  catch(...)\
347  {;}
348 
349 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
350  try\
351  {\
352  me->_exceptionListener->exceptionThrown(ex);\
353  }\
354  catch(...)\
355  {;}
356 
357 
358 class Client;
359 
384 
385 class Command
386 {
387  Message _message;
388  unsigned _timeout;
389  unsigned _batchSize;
390  unsigned _flags;
391  static const unsigned Subscribe = 1;
392  static const unsigned SOW = 2;
393  static const unsigned NeedsSequenceNumber = 4;
394  static const unsigned ProcessedAck = 8;
395  static const unsigned StatsAck = 16;
396  void init(Message::Command::Type command_)
397  {
398  _timeout = 0;
399  _batchSize = 0;
400  _flags = 0;
401  _message.reset();
402  _message.setCommandEnum(command_);
403  _setIds();
404  }
405  void init(const std::string& command_)
406  {
407  _timeout = 0;
408  _batchSize = 0;
409  _flags = 0;
410  _message.reset();
411  _message.setCommand(command_);
412  _setIds();
413  }
414  void _setIds(void)
415  {
416  Message::Command::Type command = _message.getCommandEnum();
417  if (command != Message::Command::Publish && command != Message::Command::DeltaPublish)
418  {
419  _message.newCommandId();
420  if (command == Message::Command::Subscribe ||
421  command == Message::Command::SOWAndSubscribe ||
422  command == Message::Command::DeltaSubscribe ||
423  command == Message::Command::SOWAndDeltaSubscribe)
424  {
425  _message.setSubscriptionId(_message.getCommandId());
426  _flags |= Subscribe;
427  }
428  if (command == Message::Command::SOWDelete)
429  {
430  _flags |= NeedsSequenceNumber;
431  }
432  else
433  {
434  _message.setQueryID(_message.getCommandId());
435  if (command == Message::Command::SOW)
436  {
437  _flags |= SOW;
438  if (_batchSize == 0)
439  {
440  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
441  }
442  }
443  else if ((command == Message::Command::SOWAndSubscribe ||
444  command == Message::Command::SOWAndDeltaSubscribe)
445  && _batchSize == 0)
446  {
447  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
448  }
449  }
450  _flags |= ProcessedAck;
451  }
452  else
453  {
454  _flags |= NeedsSequenceNumber;
455  }
456  }
457 public:
461  Command(const std::string& command_)
462  {
463  init(command_);
464  }
468  Command(Message::Command::Type command_)
469  {
470  init(command_);
471  }
472 
476  Command& reset(const std::string& command_)
477  {
478  init(command_);
479  return *this;
480  }
484  Command& reset(Message::Command::Type command_)
485  {
486  init(command_);
487  return *this;
488  }
496  Command& setSowKey(const std::string& sowKey_) { _message.setSowKey(sowKey_); return *this; }
509  Command& setSowKeys(const std::string& sowKeys_) { _message.setSowKeys(sowKeys_); return *this; }
511  Command& setCommandId(const std::string& v_) { _message.setCommandId(v_); return *this; }
513  Command& setTopic(const std::string& v_) { _message.setTopic(v_); return *this; }
515  Command& setFilter(const std::string& v_) { _message.setFilter(v_); return *this; }
517  Command& setOrderBy(const std::string& v_) { _message.setOrderBy(v_); return *this; }
519  Command& setSubId(const std::string& v_) { _message.setSubscriptionId(v_); return *this; }
523  Command& setBookmark(const std::string& v_) { _message.setBookmark(v_); return *this; }
530  Command& setCorrelationId(const std::string& v_) { _message.setCorrelationId(v_); return *this; }
533  Command& setOptions(const std::string& v_) { _message.setOptions(v_); return *this; }
536  Command& setData(const std::string& v_) { _message.setData(v_); return *this; }
540  Command& setData(const char* v_, size_t length_) { _message.setData(v_, length_); return *this; }
550  Command& setTimeout(unsigned v_) { _timeout = v_; return *this; }
552  Command& setTopN(unsigned v_) { _message.setTopNRecordsReturned(v_); return *this; }
557  Command& setBatchSize(unsigned v_) { _message.setBatchSize(v_); _batchSize = v_; return *this; }
568  Command& setExpiration(unsigned v_) { _message.setExpiration(v_); return *this; }
570  Command& addAckType(const std::string& v_)
571  {
572  _message.setAckType(_message.getAckType() + "," + v_);
573  if (v_ == "processed") _flags |= ProcessedAck;
574  else if (v_ == "stats") _flags |= StatsAck;
575  return *this;
576  }
577 
578  Message& getMessage(void) { return _message; }
579  unsigned getTimeout(void) const { return _timeout; }
580  unsigned getBatchSize(void) const { return _batchSize; }
581  bool isSubscribe(void) const
582  {
583  return _flags & Subscribe;
584  }
585  bool isSow(void) const { return (_flags & SOW) != 0; }
586  bool hasProcessedAck(void) const { return (_flags & ProcessedAck) != 0; }
587  bool hasStatsAck(void) const { return (_flags & StatsAck) != 0; }
588  bool needsSequenceNumber(void) const { return (_flags & NeedsSequenceNumber) != 0; }
589 };
590 
593 typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
594 
595 class Message;
597 
601 {
602 public:
603  virtual ~Authenticator() {;}
604 
610  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;
618  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;
625  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
626 };
627 
632 {
633 public:
634  virtual ~DefaultAuthenticator() {;}
637  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
638  {
639  return password_;
640  }
641 
644  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
645  {
646  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
647  }
648 
649  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
650 
655  {
656  static DefaultAuthenticator d;
657  return d;
658  }
659 };
660 
661 class BookmarkStore;
662 
672 typedef bool (*BookmarkStoreResizeHandler)(BookmarkStore store_,
673  const Message::Field& subId_,
674  size_t size_,
675  void* userData_);
676 
679 class BookmarkStoreImpl : public RefBody
680 {
681 public:
683  : _resizeHandler(NULL)
684  , _resizeHandlerData(NULL)
685  , _maxSubIdLength(AMPS_MAX_SUBID_LEN)
686  {;}
687 
688  virtual ~BookmarkStoreImpl() {;}
689 
696  virtual size_t log(Message& message_) = 0;
697 
704  virtual void discard(const Message::Field& subId_,
705  size_t bookmarkSeqNo_) = 0;
706 
712  virtual void discard(const Message& message_) = 0;
713 
720  virtual Message::Field getMostRecent(const Message::Field& subId_) = 0;
721 
730  virtual bool isDiscarded(Message& message_) = 0;
731 
737  virtual void purge() = 0;
738 
744  virtual void purge(const Message::Field& subId_) = 0;
745 
750  virtual size_t getOldestBookmarkSeq(const Message::Field& subId_) = 0;
751 
758  virtual void setResizeHandler(BookmarkStoreResizeHandler handler_, void* userData_)
759  {
760  _resizeHandler = handler_;
761  _resizeHandlerData = userData_;
762  }
763 
769  virtual void persisted(const Message::Field& subId_,
770  const Message::Field& bookmark_) = 0;
771 
778  virtual Message::Field persisted(const Message::Field& subId_, size_t bookmark_) = 0;
779 
784  virtual void noPersistedAcks(const Message::Field& subId_) = 0;
785 
790  virtual void setServerVersion(size_t version_) = 0;
791 
796  virtual void setServerVersion(const VersionInfo& version_) = 0;
797 
798  bool callResizeHandler(const Message::Field& subId_, size_t newSize_);
799 
800  inline void prune(std::string tmpFileName_ = std::string())
801  {
802  _prune(tmpFileName_);
803  }
804 
805  virtual void _prune(std::string) { return; }
806 
811  size_t getMaxSubIdLength() const
812  {
813  return _maxSubIdLength;
814  }
815 
820  void setMaxSubIdLength(size_t maxSubIdLength_)
821  {
822  _maxSubIdLength = maxSubIdLength_;
823  }
824 
825 private:
826  BookmarkStoreResizeHandler _resizeHandler;
827  void* _resizeHandlerData;
828  size_t _maxSubIdLength;
829 };
830 
834 {
835  RefHandle<BookmarkStoreImpl> _body;
836 public:
840 
843  BookmarkStore(BookmarkStoreImpl* impl_) : _body(impl_) {;}
844 
845  BookmarkStore(const BookmarkStore& rhs) : _body(rhs._body) {;}
846 
847  BookmarkStore& operator=(const BookmarkStore& rhs)
848  {
849  _body = rhs._body;
850  return *this;
851  }
852 
853  ~BookmarkStore() {;}
854 
858  {
859  _body = impl_;
860  }
861 
862  bool isValid() const
863  {
864  return _body.isValid();
865  }
866 
873  size_t log(Message& message_)
874  {
875  if (_body.isValid())
876  return _body.get().log(message_);
877  return Message::BOOKMARK_NONE;
878  }
879 
886  void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
887  {
888  if (_body.isValid())
889  _body.get().discard(subId_, bookmarkSeqNo_);
890  }
891 
897  void discard(const Message& message_)
898  {
899  if (_body.isValid())
900  _body.get().discard(message_);
901  }
902 
910  {
911  if (_body.isValid())
912  return _body.get().getMostRecent(subId_);
914  }
915 
924  bool isDiscarded(Message& message_)
925  {
926  if (_body.isValid())
927  return _body.get().isDiscarded(message_);
928  return false;
929  }
930 
936  void purge()
937  {
938  if (_body.isValid())
939  _body.get().purge();
940  }
941 
947  void purge(const Message::Field& subId_)
948  {
949  if (_body.isValid())
950  _body.get().purge(subId_);
951  }
952 
959  void setResizeHandler(BookmarkStoreResizeHandler handler_, void* userData_)
960  {
961  if (_body.isValid())
962  _body.get().setResizeHandler(handler_, userData_);
963  }
964 
969  size_t getOldestBookmarkSeq(const std::string& subId_)
970  {
971  if (_body.isValid())
972  return _body.get().getOldestBookmarkSeq(Message::Field(subId_.c_str(),
973  subId_.length()));
974  return AMPS_UNSET_INDEX;
975  }
976 
981  size_t getOldestBookmarkSeq(const Message::Field& subId_)
982  {
983  if (_body.isValid())
984  return _body.get().getOldestBookmarkSeq(subId_);
985  return AMPS_UNSET_INDEX;
986  }
987 
992  void persisted(const Message::Field& subId_, const Message::Field& bookmark_)
993  {
994  if (_body.isValid())
995  _body.get().persisted(subId_, bookmark_);
996  }
997 
1002  void persisted(const Message::Field& subId_, size_t bookmark_)
1003  {
1004  if (_body.isValid())
1005  _body.get().persisted(subId_, bookmark_);
1006  }
1007 
1012  void noPersistedAcks(const Message::Field& subId_)
1013  {
1014  if (_body.isValid())
1015  _body.get().noPersistedAcks(subId_);
1016  }
1017 
1022  void setServerVersion(size_t version_)
1023  {
1024  if (_body.isValid())
1025  _body.get().setServerVersion(version_);
1026  }
1027 
1032  void setServerVersion(const VersionInfo& version_)
1033  {
1034  if (_body.isValid())
1035  _body.get().setServerVersion(version_);
1036  }
1037 
1043  void prune(std::string tmpFileName_ = "")
1044  {
1045  if (_body.isValid())
1046  _body.get().prune(tmpFileName_);
1047  }
1048 
1053  {
1054  if (_body.isValid())
1055  return &_body.get();
1056  else
1057  return NULL;
1058  }
1059 
1064  size_t getMaxSubIdLength() const
1065  {
1066  if (_body.isValid())
1067  return _body.get().getMaxSubIdLength();
1068  else
1069  return 0;
1070  }
1071 
1076  void setMaxSubIdLength(size_t maxSubIdLength_)
1077  {
1078  if (_body.isValid())
1079  _body.get().setMaxSubIdLength(maxSubIdLength_);
1080  }
1081 
1082 };
1083 
1084 inline bool BookmarkStoreImpl::callResizeHandler(const Message::Field& subId_,
1085  size_t newSize_)
1086 {
1087  if (_resizeHandler)
1088  return _resizeHandler(BookmarkStore(this), subId_, newSize_, _resizeHandlerData);
1089  return true;
1090 }
1091 
1099  const Message::Field& subId_,
1100  size_t newSize_, void* data_)
1101 {
1102  size_t* maxSizep = (size_t*)data_;
1103  if (newSize_ > *maxSizep)
1104  {
1105  size_t discardSeq = store_.getOldestBookmarkSeq(subId_);
1106  store_.discard(subId_, discardSeq);
1107  store_.persisted(subId_, discardSeq);
1108  return false;
1109  }
1110  return true;
1111 }
1112 
1116 {
1117 public:
1118 
1122  virtual void execute(Message& message_) = 0;
1123 
1124  virtual ~StoreReplayer() {;}
1125 };
1126 
1127 class Store;
1128 
1137 typedef bool (*PublishStoreResizeHandler)(Store store_,
1138  size_t size_,
1139  void* userData_);
1140 
1143 class StoreImpl : public RefBody
1144 {
1145 public:
1146  StoreImpl() : _resizeHandler(NULL), _resizeHandlerData(NULL) {;}
1147 
1152  virtual amps_uint64_t store(const Message& message_) = 0;
1153 
1158  virtual void discardUpTo(amps_uint64_t index_) = 0;
1159 
1164  virtual void replay(StoreReplayer& replayer_) = 0;
1165 
1173  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
1174 
1179  virtual size_t unpersistedCount() const = 0;
1180 
1181  virtual ~StoreImpl() {;}
1182 
1191  virtual void flush(long timeout_) = 0;
1192 
1195  static inline size_t getUnsetPosition() { return AMPS_UNSET_INDEX; }
1196 
1199  static inline amps_uint64_t getUnsetSequence() { return AMPS_UNSET_SEQUENCE; }
1200 
1204  virtual amps_uint64_t getLowestUnpersisted() const = 0;
1205 
1209  virtual amps_uint64_t getLastPersisted() = 0;
1210 
1220  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
1221  void* userData_)
1222  {
1223  _resizeHandler = handler_;
1224  _resizeHandlerData = userData_;
1225  }
1226 
1227  inline virtual PublishStoreResizeHandler getResizeHandler() const
1228  {
1229  return _resizeHandler;
1230  }
1231 
1232  bool callResizeHandler(size_t newSize_);
1233 
1234 private:
1235  PublishStoreResizeHandler _resizeHandler;
1236  void* _resizeHandlerData;
1237 };
1238 
1241 class Store
1242 {
1243  RefHandle<StoreImpl> _body;
1244 public:
1245  Store() {;}
1246  Store(StoreImpl* body_) : _body(body_) {;}
1247  Store& operator=(const Store& rhs)
1248  {
1249  _body = rhs._body;
1250  return *this;
1251  }
1252 
1256  amps_uint64_t store(const Message& message_)
1257  {
1258  return _body.get().store(message_);
1259  }
1260 
1265  void discardUpTo(amps_uint64_t index_)
1266  {
1267  _body.get().discardUpTo(index_);
1268  }
1269 
1274  void replay(StoreReplayer& replayer_)
1275  {
1276  _body.get().replay(replayer_);
1277  }
1278 
1286  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
1287  {
1288  return _body.get().replaySingle(replayer_, index_);
1289  }
1290 
1295  size_t unpersistedCount() const
1296  {
1297  return _body.get().unpersistedCount();
1298  }
1299 
1303  bool isValid() const
1304  {
1305  return _body.isValid();
1306  }
1307 
1316  void flush(long timeout_ = 0)
1317  {
1318  return _body.get().flush(timeout_);
1319  }
1320 
1324  amps_uint64_t getLowestUnpersisted()
1325  {
1326  return _body.get().getLowestUnpersisted();
1327  }
1328 
1332  amps_uint64_t getLastPersisted()
1333  {
1334  return _body.get().getLastPersisted();
1335  }
1336 
1346  void setResizeHandler(PublishStoreResizeHandler handler_,
1347  void* userData_)
1348  {
1349  _body.get().setResizeHandler(handler_, userData_);
1350  }
1351 
1352  PublishStoreResizeHandler getResizeHandler()
1353  {
1354  return _body.get().getResizeHandler();
1355  }
1356 
1360  StoreImpl* get()
1361  {
1362  if (_body.isValid())
1363  return &_body.get();
1364  else
1365  return NULL;
1366  }
1367 
1368 };
1369 
1375 {
1376 public:
1377  virtual ~FailedWriteHandler() {;}
1384  virtual void failedWrite(const Message& message_,
1385  const char* reason_, size_t reasonLength_) = 0;
1386 };
1387 
1388 
1389 inline bool StoreImpl::callResizeHandler(size_t newSize_)
1390 {
1391  if(_resizeHandler)
1392  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1393  return true;
1394 }
1395 
1402 inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1403  void* data_)
1404 {
1405  long* timeoutp = (long*)data_;
1406  size_t count = store_.unpersistedCount();
1407  if (count == 0) return false;
1408  try
1409  {
1410  store_.flush(*timeoutp);
1411  }
1412 #ifdef _WIN32
1413  catch (const TimedOutException&)
1414 #else
1415  catch (const TimedOutException& e)
1416 #endif
1417  {
1418  return true;
1419  }
1420  return (count == store_.unpersistedCount());
1421 }
1422 
1427 {
1428 public:
1429  virtual ~SubscriptionManager() {;}
1437  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1438  unsigned requestedAckTypes_) = 0;
1442  virtual void unsubscribe(const Message::Field& subId_) = 0;
1445  virtual void clear() = 0;
1449  virtual void resubscribe(Client& client_) = 0;
1450 };
1451 
1455 
1457 {
1458 public:
1460  typedef enum { Disconnected = 0,
1461  Shutdown = 1,
1462  Connected = 2,
1463  LoggedOn = 4,
1464  PublishReplayed = 8,
1465  HeartbeatInitiated = 16,
1466  Resubscribed = 32,
1467  UNKNOWN = 16384
1468  } State;
1469 
1479  virtual void connectionStateChanged(State newState_) = 0;
1480  virtual ~ConnectionStateListener() {;};
1481 };
1482 
1483 
1484 class MessageStreamImpl;
1485 class MessageStream;
1486 
1487 typedef void(*DeferredExecutionFunc)(void*);
1488 
1489 class ClientImpl : public RefBody
1490 {
1491  friend class Client;
1492 protected:
1493  amps_handle _client;
1494  DisconnectHandler _disconnectHandler;
1495  enum GlobalCommandTypeHandlers : size_t
1496  {
1497  Publish = 0,
1498  SOW = 1,
1499  GroupBegin = 2,
1500  GroupEnd = 3,
1501  Heartbeat = 4,
1502  OOF = 5,
1503  Ack = 6,
1504  LastChance = 7,
1505  DuplicateMessage = 8,
1506  COUNT = 9
1507  };
1508  std::vector<MessageHandler> _globalCommandTypeHandlers;
1509  MessageHandler _lastChanceMessageHandler, _duplicateMessageHandler;
1510  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1511  MessageRouter _routes;
1512  MessageRouter::RouteCache _routeCache;
1513  mutable Mutex _lock;
1514  std::string _name, _lastUri, _logonCorrelationData;
1515  BookmarkStore _bookmarkStore;
1516  Store _publishStore;
1517  bool _isRetryOnDisconnect;
1518  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1519  volatile amps_uint64_t _lastSentHaSequenceNumber;
1520  ATOMIC_TYPE_8 _badTimeToHAPublish;
1521  ATOMIC_TYPE_8 _badTimeToHASubscribe;
1522  VersionInfo _serverVersion;
1523  Timer _heartbeatTimer;
1524  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1525 
1526  // queue data
1527  int _queueAckTimeout;
1528  bool _isAutoAckEnabled;
1529  unsigned _ackBatchSize;
1530  unsigned _queuedAckCount;
1531  unsigned _defaultMaxDepth;
1532  struct QueueBookmarks
1533  {
1534  QueueBookmarks(const std::string topic_)
1535  :_topic(topic_)
1536  ,_oldestTime(0)
1537  ,_bookmarkCount(0)
1538  {;}
1539  std::string _topic;
1540  std::string _data;
1541  amps_uint64_t _oldestTime;
1542  unsigned _bookmarkCount;
1543  };
1544  typedef amps_uint64_t topic_hash;
1545  typedef std::map<topic_hash,QueueBookmarks> TopicHashMap;
1546  TopicHashMap _topicHashMap;
1547 
1548  class ClientStoreReplayer : public StoreReplayer
1549  {
1550  ClientImpl* _client;
1551  public:
1552  unsigned _version;
1553  amps_result _res;
1554 
1555  ClientStoreReplayer()
1556  : _client(NULL) , _version(0), _res(AMPS_E_OK)
1557  {}
1558 
1559  ClientStoreReplayer(ClientImpl* client_)
1560  : _client(client_) , _version(0), _res(AMPS_E_OK)
1561  {}
1562 
1563  void setClient(ClientImpl* client_) { _client = client_; }
1564 
1565  void execute(Message& message_)
1566  {
1567  if (!_client) throw CommandException("Can't replay without a client.");
1568  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1569  AMPS_Sequence);
1570  if (index > _client->_lastSentHaSequenceNumber)
1571  _client->_lastSentHaSequenceNumber = index;
1572 
1573  _res = AMPS_E_OK;
1574  // Don't replay a queue cancel message after a reconnect.
1575  // Currently, the only messages that will have anything in options
1576  // are cancel messages.
1577  if (!message_.getCommand().empty() &&
1578  (!_client->_badTimeToHAPublish ||
1579  message_.getOptions().len() < 6))
1580  {
1581  _res= amps_client_send_with_version(_client->_client,
1582  message_.getMessage(),
1583  &_version);
1584  if (_res != AMPS_E_OK)
1585  {
1586  throw DisconnectedException("AMPS Server disconnected during replay");
1587  }
1588  }
1589  }
1590 
1591  };
1592  ClientStoreReplayer _replayer;
1593 
1594  class FailedWriteStoreReplayer : public StoreReplayer
1595  {
1596  ClientImpl* _parent;
1597  const char* _reason;
1598  size_t _reasonLength;
1599  size_t _replayCount;
1600  public:
1601  FailedWriteStoreReplayer(ClientImpl* parent,const char* reason_, size_t reasonLength_)
1602  : _parent(parent),
1603  _reason(reason_),
1604  _reasonLength(reasonLength_),
1605  _replayCount(0)
1606  {;}
1607  void execute(Message& message_)
1608  {
1609  if (_parent->_failedWriteHandler)
1610  {
1611  ++_replayCount;
1612  _parent->_failedWriteHandler->failedWrite(message_,
1613  _reason, _reasonLength);
1614  }
1615  }
1616  size_t replayCount(void) const { return _replayCount; }
1617  };
1618 
1619  struct AckResponseImpl : public RefBody
1620  {
1621  std::string username, password, reason, status, bookmark, options;
1622  amps_uint64_t sequenceNo;
1623  VersionInfo serverVersion;
1624  volatile bool responded, abandoned;
1625  unsigned connectionVersion;
1626  AckResponseImpl() :
1627  RefBody(),
1628  sequenceNo((amps_uint64_t)0),
1629  serverVersion(),
1630  responded(false),
1631  abandoned(false),
1632  connectionVersion(0)
1633  {
1634  }
1635  };
1636 
1637  class AckResponse
1638  {
1639  RefHandle<AckResponseImpl> _body;
1640  public:
1641  AckResponse() : _body(NULL) {;}
1642  static AckResponse create()
1643  {
1644  AckResponse r;
1645  r._body = new AckResponseImpl();
1646  return r;
1647  }
1648 
1649  const std::string& username()
1650  {
1651  return _body.get().username;
1652  }
1653  void setUsername(const char* data_, size_t len_)
1654  {
1655  if (data_) _body.get().username.assign(data_, len_);
1656  else _body.get().username.clear();
1657  }
1658  const std::string& password()
1659  {
1660  return _body.get().password;
1661  }
1662  void setPassword(const char* data_, size_t len_)
1663  {
1664  if (data_) _body.get().password.assign(data_, len_);
1665  else _body.get().password.clear();
1666  }
1667  const std::string& reason()
1668  {
1669  return _body.get().reason;
1670  }
1671  void setReason(const char* data_, size_t len_)
1672  {
1673  if (data_) _body.get().reason.assign(data_, len_);
1674  else _body.get().reason.clear();
1675  }
1676  const std::string& status()
1677  {
1678  return _body.get().status;
1679  }
1680  void setStatus(const char* data_, size_t len_)
1681  {
1682  if (data_) _body.get().status.assign(data_, len_);
1683  else _body.get().status.clear();
1684  }
1685  const std::string& bookmark()
1686  {
1687  return _body.get().bookmark;
1688  }
1689  void setBookmark(const char* data_, size_t len_)
1690  {
1691  if (data_) _body.get().bookmark.assign(data_, len_);
1692  else _body.get().bookmark.clear();
1693  }
1694  amps_uint64_t sequenceNo() const
1695  {
1696  return _body.get().sequenceNo;
1697  }
1698  void setSequenceNo(const char* data_, size_t len_)
1699  {
1700  amps_uint64_t result = (amps_uint64_t)0;
1701  if (data_)
1702  {
1703  for(size_t i=0; i<len_; ++i)
1704  {
1705  result *= (amps_uint64_t)10;
1706  result += (amps_uint64_t)(data_[i] - '0');
1707  }
1708  }
1709  _body.get().sequenceNo = result;
1710  }
1711  VersionInfo serverVersion() const
1712  {
1713  return _body.get().serverVersion;
1714  }
1715  void setServerVersion(const char* data_, size_t len_)
1716  {
1717  if (data_)
1718  _body.get().serverVersion.setVersion(std::string(data_, len_));
1719  }
1720  bool responded()
1721  {
1722  return _body.get().responded;
1723  }
1724  void setResponded(bool responded_)
1725  {
1726  _body.get().responded = responded_;
1727  }
1728  bool abandoned()
1729  {
1730  return _body.get().abandoned;
1731  }
1732  void setAbandoned(bool abandoned_)
1733  {
1734  if (_body.isValid())
1735  _body.get().abandoned = abandoned_;
1736  }
1737 
1738  void setConnectionVersion(unsigned connectionVersion)
1739  {
1740  _body.get().connectionVersion = connectionVersion;
1741  }
1742 
1743  unsigned getConnectionVersion()
1744  {
1745  return _body.get().connectionVersion;
1746  }
1747  void setOptions(const char* data_, size_t len_)
1748  {
1749  if (data_) _body.get().options.assign(data_,len_);
1750  else _body.get().options.clear();
1751  }
1752 
1753  const std::string& options()
1754  {
1755  return _body.get().options;
1756  }
1757 
1758  AckResponse& operator=(const AckResponse& rhs)
1759  {
1760  _body = rhs._body;
1761  return *this;
1762  }
1763  };
1764 
1765 
1766  typedef std::map<std::string, AckResponse> AckMap;
1767  AckMap _acks;
1768  DefaultExceptionListener _defaultExceptionListener;
1769 protected:
1770 
1771  struct DeferredExecutionRequest
1772  {
1773  DeferredExecutionRequest(DeferredExecutionFunc func_,
1774  void* userData_)
1775  : _func(func_),
1776  _userData(userData_)
1777  {;}
1778 
1779  DeferredExecutionFunc _func;
1780  void* _userData;
1781  };
1782  const ExceptionListener* _exceptionListener;
1783  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1784  bool _connected;
1785  std::string _username;
1786  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1787  ConnectionStateListeners _connectionStateListeners;
1788  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1789  DeferredExecutionList _deferredExecutionList;
1790  unsigned _heartbeatInterval;
1791  unsigned _readTimeout;
1792 
1793  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1794  {
1795  // If we disconnectd before we got to notification, don't notify.
1796  // This should only be able to happen for Resubscribed, since the lock
1797  // is released to let the subscription manager run resubscribe so a
1798  // disconnect could be called before the change is broadcast.
1799  if (!_connected && newState_ > ConnectionStateListener::Connected)
1800  {
1801  return;
1802  }
1803  for(ConnectionStateListeners::iterator it= _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1804  {
1805  AMPS_CALL_EXCEPTION_WRAPPER(
1806  (*it)->connectionStateChanged(newState_));
1807  }
1808  }
1809  unsigned processedAck(Message& message);
1810  unsigned persistedAck(Message& meesage);
1811  void lastChance(Message& message);
1812  void checkAndSendHeartbeat(bool force=false);
1813  virtual ConnectionInfo getConnectionInfo() const;
1814  static amps_result
1815  ClientImplMessageHandler(amps_handle message, void* userData);
1816  static void
1817  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1818  static amps_result
1819  ClientImplDisconnectHandler(amps_handle client, void* userData);
1820 
1821  void unsubscribeInternal(const std::string& id)
1822  {
1823  // Lock is already acquired
1824  if (_subscriptionManager)
1825  {
1826  Message::Field subId;
1827  subId.assign(id.data(), id.length());
1828  // Have to unlock before calling into sub manager to avoid deadlock
1829  Unlock<Mutex> unlock(_lock);
1830  _subscriptionManager->unsubscribe(subId);
1831  }
1832  _message.reset();
1833  _message.setCommandEnum(Message::Command::Unsubscribe);
1834  _message.newCommandId();
1835  _message.setSubscriptionId(id);
1836  // remove the handler before sending the message, in case send fails.
1837  _routes.removeRoute(_message.getSubscriptionId());
1838  _sendWithoutRetry(_message);
1839  }
1840 
1841  AckResponse syncAckProcessing(long timeout_, Message& message_,
1842  bool isHASubscribe_)
1843  {
1844  return syncAckProcessing(timeout_, message_,
1845  (amps_uint64_t)0, isHASubscribe_);
1846  }
1847 
1848  AckResponse syncAckProcessing(long timeout_, Message& message_,
1849  amps_uint64_t haSeq = (amps_uint64_t)0,
1850  bool isHASubscribe_ = false)
1851  {
1852  // inv: we already have _lock locked up.
1853  AckResponse ack = AckResponse::create();
1854  _acks[message_.getCommandId()] = ack;
1855  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
1856  if (ack.getConnectionVersion() == 0)
1857  {
1858  // Send failed
1859  throw DisconnectedException("Connection closed while waiting for response.");
1860  }
1861  bool timedOut = false;
1862  AMPS_START_TIMER(timeout_)
1863  while(!timedOut && !ack.responded() && !ack.abandoned())
1864  {
1865  if (timeout_)
1866  {
1867  timedOut = !_lock.wait(timeout_);
1868  // May have woken up early, check real time
1869  if (timedOut) { AMPS_RESET_TIMER(timedOut, timeout_); }
1870  }
1871  else
1872  {
1873  // Using a timeout version to ensure python can interrupt
1874  _lock.wait(1000);
1875  amps_invoke_waiting_function();
1876  }
1877  }
1878  if (ack.responded())
1879  {
1880  if (ack.status() != "failure")
1881  {
1882  if (message_.getCommand() == "logon")
1883  {
1884  amps_uint64_t ackSequence = ack.sequenceNo();
1885  if (_lastSentHaSequenceNumber < ackSequence)
1886  {
1887  _lastSentHaSequenceNumber = ackSequence;
1888  }
1889  if (_publishStore.isValid())
1890  {
1891  _publishStore.discardUpTo(ackSequence);
1892  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
1893  {
1894  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
1895  }
1896  }
1897  _serverVersion = ack.serverVersion();
1898  if (_bookmarkStore.isValid())
1899  _bookmarkStore.setServerVersion(_serverVersion);
1900  }
1901  if(_ackBatchSize)
1902  {
1903  const std::string& options = ack.options();
1904  size_t index = options.find_first_of("max_backlog=");
1905  if(index != std::string::npos)
1906  {
1907  unsigned data =0;
1908  const char* c = options.c_str()+index+12;
1909  while(*c && *c!=',')
1910  {
1911  data = (data*10) + (unsigned)(*c++-48);
1912  }
1913  if(_ackBatchSize > data) _ackBatchSize = data;
1914  }
1915  }
1916  return ack;
1917  }
1918  const size_t NotEntitled = 12;
1919  std::string ackReason = ack.reason();
1920  if (ackReason.length() == 0) return ack; // none
1921  if (ackReason.length() == NotEntitled &&
1922  ackReason[0] == 'n' &&
1923  message_.getUserId().len() == 0)
1924  {
1925  message_.assignUserId(_username);
1926  }
1927  message_.throwFor(_client, ackReason);
1928  }
1929  else // !ack.responded()
1930  {
1931  if (!ack.abandoned())
1932  {
1933  throw TimedOutException("timed out waiting for operation.");
1934  }
1935  else
1936  {
1937  throw DisconnectedException("Connection closed while waiting for response.");
1938  }
1939  }
1940  return ack;
1941  }
1942 
1943  void _cleanup(void)
1944  {
1945  if (!_client) return;
1947  NULL,
1948  0L);
1949  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1950  _pEmptyMessageStream.reset(NULL);
1951  amps_client_destroy(_client);
1952  processDeferredExecutions();
1953  _client = NULL;
1954  }
1955 
1956 public:
1957 
1958  ClientImpl(const std::string& clientName)
1959  : _client(NULL), _name(clientName)
1960  , _isRetryOnDisconnect(true)
1961  , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1962  , _badTimeToHASubscribe(0), _serverVersion()
1963  , _queueAckTimeout(0)
1964  , _isAutoAckEnabled(false)
1965  , _ackBatchSize(0)
1966  , _queuedAckCount(0)
1967  , _defaultMaxDepth(0)
1968  , _connected(false)
1969  , _heartbeatInterval(0)
1970  , _readTimeout(0)
1971  {
1972  _replayer.setClient(this);
1973  _client = amps_client_create(clientName.c_str());
1974  amps_client_set_message_handler(_client, (amps_handler)ClientImpl::ClientImplMessageHandler, this);
1975  amps_client_set_predisconnect_handler(_client, (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler, this);
1976  amps_client_set_disconnect_handler(_client, (amps_handler)ClientImpl::ClientImplDisconnectHandler, this);
1977  _exceptionListener = &_defaultExceptionListener;
1978  for (size_t i=0; i<GlobalCommandTypeHandlers::COUNT; ++i)
1979  {
1980  _globalCommandTypeHandlers.push_back(MessageHandler());
1981  }
1982  }
1983 
1984 
1985  virtual ~ClientImpl()
1986  {
1987  _cleanup();
1988  }
1989 
1990 
1991  const std::string& getName() const
1992  {
1993  return _name;
1994  }
1995 
1996  void setName(const std::string& name)
1997  {
1998  // This operation will fail if the client's
1999  // name is already set.
2001  _client, name.c_str());
2002  if (result != AMPS_E_OK)
2003  {
2004  AMPSException::throwFor(_client, result);
2005  }
2006  _name = name;
2007  }
2008 
2009  const std::string& getLogonCorrelationData() const
2010  {
2011  return _logonCorrelationData;
2012  }
2013 
2014  void setLogonCorrelationData(const std::string& logonCorrelationData_)
2015  {
2016  _logonCorrelationData = logonCorrelationData_;
2017  }
2018 
2019  size_t getServerVersion() const
2020  {
2021  return _serverVersion.getOldStyleVersion();
2022  }
2023 
2024  VersionInfo getServerVersionInfo() const
2025  {
2026  return _serverVersion;
2027  }
2028 
2029  const std::string& getURI() const
2030  {
2031  Lock<Mutex> l(_lock);
2032  return _lastUri;
2033  }
2034 
2035  virtual void connect(const std::string& uri)
2036  {
2037  Lock<Mutex> l(_lock);
2038  _lastUri = uri;
2040  _client, uri.c_str());
2041  if (result != AMPS_E_OK)
2042  {
2043  AMPSException::throwFor(_client, result);
2044  }
2045  _message.reset();
2046  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2047  _publishMessage.setCommandEnum(Message::Command::Publish);
2048  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2049  _beatMessage.setOptions("beat");
2050  _readMessage.setClientImpl(this);
2051  if(_queueAckTimeout)
2052  {
2053  amps_client_set_idle_time(_client,_queueAckTimeout);
2054  }
2055  _connected = true;
2056  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2057  }
2058 
2059  void setDisconnected()
2060  {
2061  {
2062  Lock<Mutex> l(_lock);
2063  if (_connected)
2064  broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
2065  _connected = false;
2066  _routes.clear();
2067  _heartbeatTimer.setTimeout(0.0);
2068  }
2069  clearAcks(INT_MAX);
2070  }
2071 
2072  virtual void disconnect()
2073  {
2074  {
2075  Lock<Mutex> l(_lock);
2076  _message.reset();
2077  _message.setCommandEnum(Message::Command::Unsubscribe);
2078  _message.newCommandId();
2079  _message.setSubscriptionId("all");
2080  AMPS_CALL_EXCEPTION_WRAPPER(_sendWithoutRetry(_message));
2081  }
2082  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2083  setDisconnected();
2084  amps_client_disconnect(_client);
2085  Lock<Mutex> l(_lock);
2086  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2087  }
2088 
2089  void clearAcks(unsigned failedVersion)
2090  {
2091  // Have to lock to prevent race conditions
2092  Lock<Mutex> l(_lock);
2093  {
2094  // Go ahead and signal any waiters if they are around...
2095  std::list<std::string> worklist;
2096  for(AckMap::iterator i = _acks.begin(); i != _acks.end(); i++)
2097  {
2098  if (i->second.getConnectionVersion() <= failedVersion)
2099  {
2100  i->second.setAbandoned(true);
2101  worklist.push_back(i->first);
2102  }
2103  }
2104 
2105  for(std::list<std::string>::iterator j = worklist.begin(); j != worklist.end(); j++)
2106  {
2107  _acks.erase(*j);
2108  }
2109  }
2110 
2111  _lock.signalAll();
2112  }
2113 
2114  int send(const Message& message)
2115  {
2116  Lock<Mutex> l(_lock);
2117  return _send(message);
2118  }
2119 
2120  void sendWithoutRetry(const Message& message_)
2121  {
2122  Lock<Mutex> l(_lock);
2123  _sendWithoutRetry(message_);
2124  }
2125 
2126  void _sendWithoutRetry(const Message& message_)
2127  {
2128  amps_result result = amps_client_send(_client, message_.getMessage());
2129  if(result != AMPS_E_OK)
2130  {
2131  AMPSException::throwFor(_client,result);
2132  }
2133  }
2134 
2135  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2136  bool isHASubscribe_ = false)
2137  {
2138  // Lock is already acquired
2139  amps_result result = AMPS_E_RETRY;
2140 
2141  // Create a local reference to this message, as we'll need to hold on
2142  // to a reference to it in case reconnect occurs.
2143  Message localMessage = message;
2144  unsigned version = 0;
2145 
2146  while(result == AMPS_E_RETRY)
2147  {
2148  if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
2149  {
2150  // If retrySend is disabled, do not wait for the reconnect
2151  // to finish, just throw.
2152  if(!_isRetryOnDisconnect)
2153  {
2154  AMPSException::throwFor(_client,AMPS_E_RETRY);
2155  }
2156  Unlock<Mutex> l(_lock);
2157 #ifdef _WIN32
2158  Sleep(0);
2159 #elif defined(sun)
2160  sched_yield();
2161 #else
2162  pthread_yield();
2163 #endif
2164  }
2165  else
2166  {
2167  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2168  (isHASubscribe_ && _badTimeToHASubscribe != 0))
2169  {
2170  return (int)version;
2171  }
2172  // It's possible to get here out of order, but this way we'll
2173  // always send in order.
2174  if (haSeq > _lastSentHaSequenceNumber)
2175  {
2176  while (haSeq > _lastSentHaSequenceNumber + 1)
2177  {
2178  try
2179  {
2180  // Replayer updates _lastSentHaSsequenceNumber
2181  if (!_publishStore.replaySingle(_replayer,
2182  _lastSentHaSequenceNumber+1))
2183  {
2184  //++_lastSentHaSequenceNumber;
2185  continue;
2186  }
2187  result = AMPS_E_OK;
2188  version = _replayer._version;
2189  }
2190  #ifdef _WIN32
2191  catch(const DisconnectedException&)
2192  #else
2193  catch(const DisconnectedException& e)
2194  #endif
2195  {
2196  result = _replayer._res;
2197  break;
2198  }
2199  }
2200  result = amps_client_send_with_version(_client,
2201  localMessage.getMessage(),
2202  &version);
2203  ++_lastSentHaSequenceNumber;
2204  }
2205  else
2206  result = amps_client_send_with_version(_client,
2207  localMessage.getMessage(),
2208  &version);
2209  if (result != AMPS_E_OK)
2210  {
2211  if (!isHASubscribe_ && !haSeq &&
2212  localMessage.getMessage() == message.getMessage())
2213  {
2214  localMessage = message.deepCopy();
2215  }
2216  if(_isRetryOnDisconnect)
2217  {
2218  Unlock<Mutex> u(_lock);
2219  result = amps_client_attempt_reconnect(_client, version);
2220  // If this is an HA publish or subscrbie command, it was
2221  // stored first and will have already been replayed by the
2222  // store or sub manager after reconnect, so just return.
2223  if ((isHASubscribe_ || haSeq) &&
2224  result == AMPS_E_RETRY)
2225  {
2226  return (int)version;
2227  }
2228  }
2229  else
2230  {
2231  // retrySend is disabled so throw the error
2232  // from the send as an exception, do not retry.
2233  AMPSException::throwFor(_client, result);
2234  }
2235  }
2236  }
2237  if (result == AMPS_E_RETRY)
2238  amps_invoke_waiting_function();
2239  }
2240 
2241  if (result != AMPS_E_OK) AMPSException::throwFor(_client, result);
2242  return (int)version;
2243  }
2244 
2245  void addMessageHandler(const Field& commandId_,
2246  const AMPS::MessageHandler& messageHandler_,
2247  unsigned requestedAcks_, bool isSubscribe_)
2248  {
2249  Lock<Mutex> lock(_lock);
2250  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2251  0, isSubscribe_);
2252  }
2253 
2254  bool removeMessageHandler(const Field& commandId_)
2255  {
2256  Lock<Mutex> lock(_lock);
2257  return _routes.removeRoute(commandId_);
2258  }
2259 
2260  std::string send(MessageHandler messageHandler_, Message& message_, int timeout_ = 0)
2261  {
2262  Field id = message_.getCommandId();
2263  bool isSubscribe = false;
2264  unsigned requestedAcks = message_.getAckTypeEnum();
2265  unsigned systemAddedAcks = Message::AckType::None;
2266  switch(message_.getCommandEnum())
2267  {
2268  case Message::Command::Subscribe:
2269  case Message::Command::DeltaSubscribe:
2270  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2271  {
2272  systemAddedAcks |= Message::AckType::Persisted;
2273  }
2274  // fall through
2275  case Message::Command::SOWAndSubscribe:
2276  case Message::Command::SOWAndDeltaSubscribe:
2277  if (id.empty())
2278  {
2279  message_.newCommandId();
2280  id = message_.getCommandId();
2281  }
2282  if (message_.getSubscriptionId().empty())
2283  {
2284  message_.setSubscriptionId(id);
2285  }
2286  isSubscribe = true;
2287  // fall through
2288  case Message::Command::SOW:
2289  if(id.empty())
2290  {
2291  message_.newCommandId();
2292  id = message_.getCommandId();
2293  }
2294  if (message_.getQueryID().empty())
2295  {
2296  message_.setQueryID(id);
2297  }
2298  systemAddedAcks |= Message::AckType::Processed;
2299  // for SOW only, we get a completed ack so we know when to remove the handler.
2300  if (!isSubscribe) systemAddedAcks |= Message::AckType::Completed;
2301  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2302  {
2303  Lock<Mutex> l(_lock);
2304  _routes.addRoute(message_.getQueryID(), messageHandler_, requestedAcks,
2305  systemAddedAcks, isSubscribe);
2306  if (!message_.getSubscriptionId().empty() &&
2307  message_.getQueryID() != message_.getSubscriptionId() &&
2308  messageHandler_.isValid() &&
2309  !_routes.hasRoute(message_.getSubscriptionId()))
2310  {
2311  _routes.addRoute(message_.getSubscriptionId(), messageHandler_,
2312  requestedAcks, systemAddedAcks, true);
2313  }
2314  try
2315  {
2316  // We aren't adding to subscription manager, so this isn't
2317  // an HA subscribe.
2318  syncAckProcessing(timeout_, message_, 0, false);
2319  message_.setAckTypeEnum(requestedAcks);
2320  }
2321  catch (...)
2322  {
2323  _routes.removeRoute(message_.getQueryID());
2324  _routes.removeRoute(message_.getSubscriptionId());
2325  _routes.removeRoute(id);
2326  message_.setAckTypeEnum(requestedAcks);
2327  throw;
2328  }
2329  }
2330  break;
2331  // These are valid commands that are used as-is
2332  case Message::Command::Unsubscribe:
2333  case Message::Command::Heartbeat:
2334  case Message::Command::Logon:
2335  case Message::Command::StartTimer:
2336  case Message::Command::StopTimer:
2337  case Message::Command::DeltaPublish:
2338  case Message::Command::Publish:
2339  case Message::Command::SOWDelete:
2340  {
2341  Lock<Mutex> l(_lock);
2342  // if an ack is requested, it'll need a command ID.
2343  if (message_.getAckTypeEnum() != Message::AckType::None)
2344  {
2345  if (id.empty())
2346  {
2347  message_.newCommandId();
2348  id = message_.getCommandId();
2349  }
2350  if (messageHandler_.isValid())
2351  {
2352  _routes.addRoute(id, messageHandler_, requestedAcks,
2353  Message::AckType::None, false);
2354  }
2355  }
2356  _send(message_);
2357  }
2358  break;
2359  // These are things that shouldn't be sent (not meaningful)
2360  case Message::Command::GroupBegin:
2361  case Message::Command::GroupEnd:
2362  case Message::Command::OOF:
2363  case Message::Command::Ack:
2364  case Message::Command::Unknown:
2365  default:
2366  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2367  }
2368  message_.setAckTypeEnum(requestedAcks);
2369  return id;
2370  }
2371 
2372  void setDisconnectHandler(DisconnectHandler disconnectHandler)
2373  {
2374  Lock<Mutex> l(_lock);
2375  _disconnectHandler = disconnectHandler;
2376  }
2377 
2378  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2379  {
2380  switch (command_[0])
2381  {
2382 #if 0 // Not currently implemented to avoid an extra branch in delivery
2383  case 'p':
2384  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2385  break;
2386  case 's':
2387  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2388  break;
2389 #endif
2390  case 'h':
2391  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2392  break;
2393 #if 0 // Not currently implemented to avoid an extra branch in delivery
2394  case 'g':
2395  if (command_[6] == 'b')
2396  {
2397  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2398  }
2399  else if (command_[6] == 'e')
2400  {
2401  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2402  }
2403  else
2404  {
2405  std::ostringstream os;
2406  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2407  throw CommandException(os.str());
2408  }
2409  break;
2410  case 'o':
2411  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2412  break;
2413 #endif
2414  case 'a':
2415  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2416  break;
2417  case 'l':
2418  case 'L':
2419  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2420  break;
2421  case 'd':
2422  case 'D':
2423  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2424  break;
2425  default:
2426  std::ostringstream os;
2427  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2428  throw CommandException(os.str());
2429  break;
2430  }
2431  }
2432 
2433  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2434  {
2435  switch (command_)
2436  {
2437 #if 0 // Not currently implemented to avoid an extra branch in delivery
2438  case Message::Command::Publish:
2439  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2440  break;
2441  case Message::Command::SOW:
2442  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2443  break;
2444 #endif
2445  case Message::Command::Heartbeat:
2446  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2447  break;
2448 #if 0 // Not currently implemented to avoid an extra branch in delivery
2449  case Message::Command::GroupBegin:
2450  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2451  break;
2452  case Message::Command::GroupEnd:
2453  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2454  break;
2455  case Message::Command::OOF:
2456  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2457  break;
2458 #endif
2459  case Message::Command::Ack:
2460  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2461  break;
2462  default:
2463  unsigned bits = 0;
2464  unsigned command = command_;
2465  while (command > 0) { ++bits; command >>= 1; }
2466  char errBuf[128];
2467  AMPS_snprintf(errBuf, sizeof(errBuf),
2468  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2469  CommandConstants<0>::Lengths[bits],
2470  CommandConstants<0>::Values[bits]);
2471  throw CommandException(errBuf);
2472  break;
2473  }
2474  }
2475 
2476  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2477  {
2478  _globalCommandTypeHandlers[handlerType_] = handler_;
2479  }
2480 
2481  void setFailedWriteHandler(FailedWriteHandler* handler_)
2482  {
2483  Lock<Mutex> l(_lock);
2484  _failedWriteHandler.reset(handler_);
2485  }
2486 
2487  void setPublishStore(const Store& publishStore_)
2488  {
2489  Lock<Mutex> l(_lock);
2490  if (_connected) throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2491  _publishStore = publishStore_;
2492  }
2493 
2494  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2495  {
2496  Lock<Mutex> l(_lock);
2497  if (_connected) throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2498  _bookmarkStore = bookmarkStore_;
2499  }
2500 
2501  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2502  {
2503  Lock<Mutex> l(_lock);
2504  _subscriptionManager.reset(subscriptionManager_);
2505  }
2506 
2507  SubscriptionManager* getSubscriptionManager()
2508  {
2509  Lock<Mutex> l(_lock);
2510  return _subscriptionManager.get();
2511  }
2512 
2513  DisconnectHandler getDisconnectHandler()
2514  {
2515  Lock<Mutex> l(_lock);
2516  return _disconnectHandler;
2517  }
2518 
2519  MessageHandler getDuplicateMessageHandler()
2520  {
2521  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2522  }
2523 
2524  FailedWriteHandler* getFailedWriteHandler()
2525  {
2526  Lock<Mutex> l(_lock);
2527  return _failedWriteHandler.get();
2528  }
2529 
2530  Store getPublishStore()
2531  {
2532  Lock<Mutex> l(_lock);
2533  return _publishStore;
2534  }
2535 
2536  BookmarkStore getBookmarkStore()
2537  {
2538  Lock<Mutex> l(_lock);
2539  return _bookmarkStore;
2540  }
2541 
2542  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,size_t dataLen_)
2543  {
2544  if (!_publishStore.isValid())
2545  {
2546  Lock<Mutex> l(_lock);
2547  _publishMessage.assignTopic(topic_, topicLen_);
2548  _publishMessage.assignData(data_, dataLen_);
2549  _send(_publishMessage);
2550  return 0;
2551  }
2552  else
2553  {
2554  if (!publishStoreMessage)
2555  {
2556  publishStoreMessage = new Message();
2557  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2558  }
2559  publishStoreMessage->reset();
2560  publishStoreMessage->setCommandEnum(Message::Command::Publish);
2561  return _publish(topic_, topicLen_, data_, dataLen_);
2562  }
2563  }
2564 
2565  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2566  size_t dataLen_, unsigned long expiration_)
2567  {
2568  if (!_publishStore.isValid())
2569  {
2570  Lock<Mutex> l(_lock);
2571  _publishMessage.assignTopic(topic_, topicLen_);
2572  _publishMessage.assignData(data_, dataLen_);
2573  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2574  size_t pos = convertToCharArray(exprBuf, expiration_);
2575  _publishMessage.assignExpiration(exprBuf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2576  _send(_publishMessage);
2577  _publishMessage.assignExpiration(NULL, 0);
2578  return 0;
2579  }
2580  else
2581  {
2582  if (!publishStoreMessage)
2583  {
2584  publishStoreMessage = new Message();
2585  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2586  }
2587  publishStoreMessage->reset();
2588  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2589  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2590  publishStoreMessage->setCommandEnum(Message::Command::Publish)
2591  .assignExpiration(exprBuf+exprPos,
2592  AMPS_NUMBER_BUFFER_LEN-exprPos);
2593  return _publish(topic_, topicLen_, data_, dataLen_);
2594  }
2595  }
2596 
2597  void publishFlush(long timeout_)
2598  {
2599  static const char* processed = "processed";
2600  static const size_t processedLen = strlen(processed);
2601  if (!_publishStore.isValid())
2602  {
2603  if (_serverVersion.getOldStyleVersion() >= AMPS_FLUSH_MIN_VERSION)
2604  {
2605  Lock<Mutex> l(_lock);
2606  _message.reset();
2607  _message.newCommandId();
2608  _message.assignAckType(processed, processedLen);
2609  static const char* flush = "flush";
2610  static const size_t flushLen = strlen(flush);
2611  _message.assignCommand(flush, flushLen);
2612  try
2613  {
2614  syncAckProcessing(timeout_, _message);
2615  }
2616  catch(const AMPSException& ex)
2617  {
2618  AMPS_UNHANDLED_EXCEPTION(ex);
2619  throw;
2620  }
2621  }
2622  else
2623  {
2624  if (timeout_ > 0) { AMPS_USLEEP(timeout_ * 1000); }
2625  else { AMPS_USLEEP(1000 * 1000); }
2626  return;
2627  }
2628  }
2629  else
2630  {
2631  try
2632  {
2633  _publishStore.flush(timeout_);
2634  }
2635  catch (const AMPSException& ex)
2636  {
2637  AMPS_UNHANDLED_EXCEPTION(ex);
2638  throw;
2639  }
2640  }
2641  }
2642 
2643  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2644  const char* data_, size_t dataLength_)
2645  {
2646  if (!_publishStore.isValid())
2647  {
2648  Lock<Mutex> l(_lock);
2649  _deltaMessage.assignTopic(topic_, topicLength_);
2650  _deltaMessage.assignData(data_, dataLength_);
2651  _send(_deltaMessage);
2652  return 0;
2653  }
2654  else
2655  {
2656  if (!publishStoreMessage)
2657  {
2658  publishStoreMessage = new Message();
2659  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2660  }
2661  publishStoreMessage->reset();
2662  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2663  return _publish(topic_, topicLength_, data_, dataLength_);
2664  }
2665  }
2666 
2667  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2668  const char* data_, size_t dataLength_,
2669  unsigned long expiration_)
2670  {
2671  if (!_publishStore.isValid())
2672  {
2673  Lock<Mutex> l(_lock);
2674  _deltaMessage.assignTopic(topic_, topicLength_);
2675  _deltaMessage.assignData(data_, dataLength_);
2676  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2677  size_t pos = convertToCharArray(exprBuf, expiration_);
2678  _deltaMessage.assignExpiration(exprBuf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2679  _send(_deltaMessage);
2680  _deltaMessage.assignExpiration(NULL, 0);
2681  return 0;
2682  }
2683  else
2684  {
2685  if (!publishStoreMessage)
2686  {
2687  publishStoreMessage = new Message();
2688  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2689  }
2690  publishStoreMessage->reset();
2691  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2692  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2693  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2694  .assignExpiration(exprBuf+exprPos,
2695  AMPS_NUMBER_BUFFER_LEN-exprPos);
2696  return _publish(topic_, topicLength_, data_, dataLength_);
2697  }
2698  }
2699 
2700  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
2701  const char* data_, size_t dataLength_)
2702  {
2703  publishStoreMessage->assignTopic(topic_, topicLength_)
2704  .setAckTypeEnum(Message::AckType::Persisted)
2705  .assignData(data_, dataLength_);
2706  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
2707  char buf[AMPS_NUMBER_BUFFER_LEN];
2708  size_t pos = convertToCharArray(buf, haSequenceNumber);
2709  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2710  {
2711  Lock<Mutex> l(_lock);
2712  _send(*publishStoreMessage, haSequenceNumber);
2713  }
2714  return haSequenceNumber;
2715  }
2716 
2717  virtual std::string logon(long timeout_, Authenticator& authenticator_,
2718  const char* options_ = NULL)
2719  {
2720  Lock<Mutex> l(_lock);
2721  AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2722  _message.reset();
2723  _message.setCommandEnum(Message::Command::Logon);
2724  _message.newCommandId();
2725  std::string newCommandId = _message.getCommandId();
2726  _message.setClientName(_name);
2727 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
2728  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
2729  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
2730 #endif
2731  URI uri(_lastUri);
2732  if(uri.user().size()) _message.setUserId(uri.user());
2733  if(uri.password().size()) _message.setPassword(uri.password());
2734  if(uri.protocol() == "amps" && uri.messageType().size())
2735  {
2736  _message.setMessageType(uri.messageType());
2737  }
2738  if(uri.isTrue("pretty"))
2739  {
2740  _message.setOptions("pretty");
2741  }
2742 
2743  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
2744  if (!_logonCorrelationData.empty())
2745  {
2746  _message.assignCorrelationId(_logonCorrelationData);
2747  }
2748  if (options_)
2749  {
2750  _message.setOptions(options_);
2751  }
2752  _username = _message.getUserId();
2753  try
2754  {
2755  while(true)
2756  {
2757  _message.setAckTypeEnum(Message::AckType::Processed);
2758  AckResponse ack = syncAckProcessing(timeout_, _message);
2759  if (ack.status() == "retry")
2760  {
2761  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
2762  _username = ack.username();
2763  _message.setUserId(_username);
2764  }
2765  else
2766  {
2767  authenticator_.completed(ack.username(), ack.password(), ack.reason());
2768  break;
2769  }
2770  }
2771  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2772 
2773  // Now re-send the heartbeat command if configured
2774  _sendHeartbeat();
2775  }
2776  catch(const AMPSException& ex)
2777  {
2778  AMPS_UNHANDLED_EXCEPTION(ex);
2779  throw;
2780  }
2781  catch(...)
2782  {
2783  throw;
2784  }
2785 
2786  if (_publishStore.isValid())
2787  {
2788  try
2789  {
2790  _publishStore.replay(_replayer);
2791  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
2792  }
2793  catch(const StoreException& ex)
2794  {
2795  std::ostringstream os;
2796  os << "A local store exception occurred while logging on."
2797  << ex.toString();
2798  throw ConnectionException(os.str());
2799  }
2800  catch(const AMPSException& ex)
2801  {
2802  AMPS_UNHANDLED_EXCEPTION(ex);
2803  throw ex;
2804  }
2805  catch(const std::exception& ex)
2806  {
2807  AMPS_UNHANDLED_EXCEPTION(ex);
2808  throw ex;
2809  }
2810  catch(...)
2811  {
2812  throw;
2813  }
2814  }
2815  return newCommandId;
2816  }
2817 
2818  std::string subscribe(MessageHandler messageHandler_,
2819  const std::string& topic_,
2820  long timeout_,
2821  const std::string& filter_,
2822  const std::string& bookmark_,
2823  const std::string& options_,
2824  const std::string& subId_,
2825  bool isHASubscribe_ = true)
2826  {
2827  isHASubscribe_ &= (bool)_subscriptionManager;
2828  Lock<Mutex> l(_lock);
2829  _message.reset();
2830  _message.setCommandEnum(Message::Command::Subscribe);
2831  _message.newCommandId();
2832  std::string subId(subId_);
2833  if (subId.empty())
2834  {
2835  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos)
2836  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
2837 
2838  subId = _message.getCommandId();
2839  }
2840  _message.setSubscriptionId(subId);
2841  // we need to deep copy this before sending the message; while we are
2842  // waiting for a response, the fields in _message may get blown away for
2843  // other operations.
2844  AMPS::Message::Field subIdField(subId);
2845  unsigned ackTypes = Message::AckType::Processed;
2846 
2847  if (!bookmark_.empty() && _bookmarkStore.isValid())
2848  {
2849  ackTypes |= Message::AckType::Persisted;
2850  }
2851  _message.setTopic(topic_);
2852 
2853  if (filter_.length()) _message.setFilter(filter_);
2854  if (bookmark_.length())
2855  {
2856  if (bookmark_ == AMPS_BOOKMARK_RECENT)
2857  {
2858  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
2859  _message.setBookmark(mostRecent);
2860  }
2861  else
2862  {
2863  _message.setBookmark(bookmark_);
2864  if (_bookmarkStore.isValid())
2865  {
2866  if (bookmark_ != AMPS_BOOKMARK_NOW &&
2867  bookmark_ != AMPS_BOOKMARK_EPOCH)
2868  {
2869  _bookmarkStore.log(_message);
2870  _bookmarkStore.discard(_message);
2871  _bookmarkStore.persisted(subIdField, _message.getBookmark());
2872  }
2873  }
2874  }
2875  }
2876  if (options_.length()) _message.setOptions(options_);
2877 
2878  Message message = _message;
2879  if (isHASubscribe_)
2880  {
2881  message = _message.deepCopy();
2882  Unlock<Mutex> u(_lock);
2883  _subscriptionManager->subscribe(messageHandler_, message,
2884  Message::AckType::None);
2885  if (_badTimeToHASubscribe) return subId;
2886  }
2887  if (!_routes.hasRoute(_message.getSubscriptionId()))
2888  {
2889  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
2890  Message::AckType::None, ackTypes, true);
2891  }
2892  message.setAckTypeEnum(ackTypes);
2893  if (!options_.empty()) message.setOptions(options_);
2894  try
2895  {
2896  syncAckProcessing(timeout_, message, isHASubscribe_);
2897  }
2898  catch (const DisconnectedException&)
2899  {
2900  if (!isHASubscribe_)
2901  {
2902  _routes.removeRoute(subIdField);
2903  throw;
2904  }
2905  else
2906  {
2907  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2908  throw;
2909  }
2910  }
2911  catch (const TimedOutException&)
2912  {
2913  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2914  throw;
2915  }
2916  catch (...)
2917  {
2918  if (isHASubscribe_)
2919  {
2920  // Have to unlock before calling into sub manager to avoid deadlock
2921  Unlock<Mutex> unlock(_lock);
2922  _subscriptionManager->unsubscribe(subIdField);
2923  }
2924  _routes.removeRoute(subIdField);
2925  throw;
2926  }
2927 
2928  return subId;
2929  }
2930  std::string deltaSubscribe(MessageHandler messageHandler_,
2931  const std::string& topic_,
2932  long timeout_,
2933  const std::string& filter_,
2934  const std::string& bookmark_,
2935  const std::string& options_,
2936  const std::string& subId_ = "",
2937  bool isHASubscribe_ = true)
2938  {
2939  isHASubscribe_ &= (bool)_subscriptionManager;
2940  Lock<Mutex> l(_lock);
2941  _message.reset();
2942  _message.setCommandEnum(Message::Command::DeltaSubscribe);
2943  _message.newCommandId();
2944  std::string subId(subId_);
2945  if (subId.empty())
2946  {
2947  subId = _message.getCommandId();
2948  }
2949  _message.setSubscriptionId(subId);
2950  // we need to deep copy this before sending the message; while we are
2951  // waiting for a response, the fields in _message may get blown away for
2952  // other operations.
2953  AMPS::Message::Field subIdField(subId);
2954  unsigned ackTypes = Message::AckType::Processed;
2955 
2956  if (!bookmark_.empty() && _bookmarkStore.isValid())
2957  {
2958  ackTypes |= Message::AckType::Persisted;
2959  }
2960  _message.setTopic(topic_);
2961  if (filter_.length()) _message.setFilter(filter_);
2962  if (bookmark_.length())
2963  {
2964  if (bookmark_ == AMPS_BOOKMARK_RECENT)
2965  {
2966  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
2967  _message.setBookmark(mostRecent);
2968  }
2969  else
2970  {
2971  _message.setBookmark(bookmark_);
2972  if (_bookmarkStore.isValid())
2973  {
2974  if (bookmark_ != AMPS_BOOKMARK_NOW &&
2975  bookmark_ != AMPS_BOOKMARK_EPOCH)
2976  {
2977  _bookmarkStore.log(_message);
2978  _bookmarkStore.discard(_message);
2979  _bookmarkStore.persisted(subIdField, _message.getBookmark());
2980  }
2981  }
2982  }
2983  }
2984  if (options_.length()) _message.setOptions(options_);
2985  Message message = _message;
2986  if (isHASubscribe_)
2987  {
2988  message = _message.deepCopy();
2989  Unlock<Mutex> u(_lock);
2990  _subscriptionManager->subscribe(messageHandler_, message,
2991  Message::AckType::None);
2992  if (_badTimeToHASubscribe) return subId;
2993  }
2994  if (!_routes.hasRoute(_message.getSubscriptionId()))
2995  {
2996  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
2997  Message::AckType::None, ackTypes, true);
2998  }
2999  message.setAckTypeEnum(ackTypes);
3000  if (!options_.empty()) message.setOptions(options_);
3001  try
3002  {
3003  syncAckProcessing(timeout_, message, isHASubscribe_);
3004  }
3005  catch (const DisconnectedException&)
3006  {
3007  if (!isHASubscribe_)
3008  {
3009  _routes.removeRoute(subIdField);
3010  throw;
3011  }
3012  }
3013  catch (const TimedOutException&)
3014  {
3015  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3016  throw;
3017  }
3018  catch (...)
3019  {
3020  if (isHASubscribe_)
3021  {
3022  // Have to unlock before calling into sub manager to avoid deadlock
3023  Unlock<Mutex> unlock(_lock);
3024  _subscriptionManager->unsubscribe(subIdField);
3025  }
3026  _routes.removeRoute(subIdField);
3027  throw;
3028  }
3029  return subId;
3030  }
3031 
3032  void unsubscribe(const std::string& id)
3033  {
3034  Lock<Mutex> l(_lock);
3035  unsubscribeInternal(id);
3036  }
3037 
3038  void unsubscribe(void)
3039  {
3040  if (_subscriptionManager)
3041  {
3042  _subscriptionManager->clear();
3043  }
3044  {
3045  _routes.unsubscribeAll();
3046  Lock<Mutex> l(_lock);
3047  _message.reset();
3048  _message.setCommandEnum(Message::Command::Unsubscribe);
3049  _message.newCommandId();
3050  _message.setSubscriptionId("all");
3051  _sendWithoutRetry(_message);
3052  }
3053  }
3054 
3055  std::string sow(MessageHandler messageHandler_,
3056  const std::string& topic_,
3057  const std::string& filter_ = "",
3058  const std::string& orderBy_ = "",
3059  const std::string& bookmark_ = "",
3060  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3061  int topN_ = AMPS_DEFAULT_TOP_N,
3062  const std::string& options_ = "",
3063  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3064  {
3065  Lock<Mutex> l(_lock);
3066  _message.reset();
3067  _message.setCommandEnum(Message::Command::SOW);
3068  _message.newCommandId();
3069  // need to keep our own copy of the command ID.
3070  std::string commandId = _message.getCommandId();
3071  _message.setQueryID(_message.getCommandId());
3072  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3073  _message.setAckTypeEnum(ackTypes);
3074  _message.setTopic(topic_);
3075  if (filter_.length()) _message.setFilter(filter_);
3076  if (orderBy_.length()) _message.setOrderBy(orderBy_);
3077  if (bookmark_.length()) _message.setBookmark(bookmark_);
3078  _message.setBatchSize(AMPS::asString(batchSize_));
3079  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
3080  if (options_.length()) _message.setOptions(options_);
3081 
3082  _routes.addRoute(_message.getQueryID(), messageHandler_,
3083  Message::AckType::None, ackTypes, false);
3084 
3085  try
3086  {
3087  syncAckProcessing(timeout_, _message);
3088  }
3089  catch (...)
3090  {
3091  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3092  throw;
3093  }
3094 
3095  return commandId;
3096  }
3097 
3098  std::string sow(MessageHandler messageHandler_,
3099  const std::string& topic_,
3100  long timeout_,
3101  const std::string& filter_ = "",
3102  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3103  int topN_ = AMPS_DEFAULT_TOP_N)
3104  {
3105  std::string notSet;
3106  return sow(messageHandler_,
3107  topic_,
3108  filter_,
3109  notSet, // orderBy
3110  notSet, // bookmark
3111  batchSize_,
3112  topN_,
3113  notSet,
3114  timeout_);
3115  }
3116 
3117  std::string sowAndSubscribe(MessageHandler messageHandler_,
3118  const std::string& topic_,
3119  const std::string& filter_ = "",
3120  const std::string& orderBy_ = "",
3121  const std::string& bookmark_ = "",
3122  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3123  int topN_ = AMPS_DEFAULT_TOP_N,
3124  const std::string& options_ = "",
3125  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3126  bool isHASubscribe_ = true)
3127  {
3128  isHASubscribe_ &= (bool)_subscriptionManager;
3129  Lock<Mutex> l(_lock);
3130  _message.reset();
3131  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3132  _message.newCommandId();
3133  Field cid = _message.getCommandId();
3134  _message.setQueryID(cid);
3135  _message.setSubscriptionId(cid);
3136  std::string subId = cid;
3137  _message.setTopic(topic_);
3138  if (filter_.length()) _message.setFilter(filter_);
3139  if (orderBy_.length()) _message.setOrderBy(orderBy_);
3140  if (bookmark_.length()) _message.setBookmark(bookmark_);
3141  _message.setBatchSize(AMPS::asString(batchSize_));
3142  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
3143  if (options_.length()) _message.setOptions(options_);
3144 
3145  Message message = _message;
3146  if (isHASubscribe_)
3147  {
3148  message = _message.deepCopy();
3149  Unlock<Mutex> u(_lock);
3150  _subscriptionManager->subscribe(messageHandler_, message,
3151  Message::AckType::None);
3152  if (_badTimeToHASubscribe) return subId;
3153  }
3154  if (!_routes.hasRoute(cid))
3155  {
3156  _routes.addRoute(cid, messageHandler_,
3157  Message::AckType::None, Message::AckType::Processed, true);
3158  }
3159  message.setAckTypeEnum(Message::AckType::Processed);
3160  if (!options_.empty()) message.setOptions(options_);
3161  try
3162  {
3163  syncAckProcessing(timeout_, message, isHASubscribe_);
3164  }
3165  catch (const DisconnectedException&)
3166  {
3167  if (!isHASubscribe_)
3168  {
3169  _routes.removeRoute(subId);
3170  throw;
3171  }
3172  }
3173  catch (const TimedOutException&)
3174  {
3175  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3176  throw;
3177  }
3178  catch (...)
3179  {
3180  if (isHASubscribe_)
3181  {
3182  // Have to unlock before calling into sub manager to avoid deadlock
3183  Unlock<Mutex> unlock(_lock);
3184  _subscriptionManager->unsubscribe(cid);
3185  }
3186  _routes.removeRoute(subId);
3187  throw;
3188  }
3189  return subId;
3190  }
3191 
3192  std::string sowAndSubscribe(MessageHandler messageHandler_,
3193  const std::string& topic_,
3194  long timeout_,
3195  const std::string& filter_ = "",
3196  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3197  bool oofEnabled_ = false,
3198  int topN_ = AMPS_DEFAULT_TOP_N,
3199  bool isHASubscribe_ = true)
3200  {
3201  std::string notSet;
3202  return sowAndSubscribe(messageHandler_,
3203  topic_,
3204  filter_,
3205  notSet, // orderBy
3206  notSet, // bookmark
3207  batchSize_,
3208  topN_,
3209  (oofEnabled_ ? "oof" : ""),
3210  timeout_,
3211  isHASubscribe_);
3212  }
3213 
3214  std::string sowAndDeltaSubscribe(MessageHandler messageHandler_,
3215  const std::string& topic_,
3216  const std::string& filter_ = "",
3217  const std::string& orderBy_ = "",
3218  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3219  int topN_ = AMPS_DEFAULT_TOP_N,
3220  const std::string& options_ = "",
3221  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3222  bool isHASubscribe_ = true)
3223  {
3224  isHASubscribe_ &= (bool)_subscriptionManager;
3225  Lock<Mutex> l(_lock);
3226  _message.reset();
3227  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3228  _message.newCommandId();
3229  _message.setQueryID(_message.getCommandId());
3230  _message.setSubscriptionId(_message.getCommandId());
3231  std::string subId = _message.getSubscriptionId();
3232  _message.setTopic(topic_);
3233  if (filter_.length()) _message.setFilter(filter_);
3234  if (orderBy_.length()) _message.setOrderBy(orderBy_);
3235  _message.setBatchSize(AMPS::asString(batchSize_));
3236  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
3237  if (options_.length()) _message.setOptions(options_);
3238  Message message = _message;
3239  if (isHASubscribe_)
3240  {
3241  message = _message.deepCopy();
3242  Unlock<Mutex> u(_lock);
3243  _subscriptionManager->subscribe(messageHandler_, message,
3244  Message::AckType::None);
3245  if (_badTimeToHASubscribe) return subId;
3246  }
3247  if (!_routes.hasRoute(message.getSubscriptionId()))
3248  {
3249  _routes.addRoute(message.getQueryID(), messageHandler_,
3250  Message::AckType::None, Message::AckType::Processed, true);
3251  }
3252  message.setAckTypeEnum(Message::AckType::Processed);
3253  if (!options_.empty()) message.setOptions(options_);
3254  try
3255  {
3256  syncAckProcessing(timeout_, message, isHASubscribe_);
3257  }
3258  catch (const DisconnectedException&)
3259  {
3260  if (!isHASubscribe_)
3261  {
3262  _routes.removeRoute(subId);
3263  throw;
3264  }
3265  }
3266  catch (const TimedOutException&)
3267  {
3268  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3269  throw;
3270  }
3271  catch (...)
3272  {
3273  if (isHASubscribe_)
3274  {
3275  // Have to unlock before calling into sub manager to avoid deadlock
3276  Unlock<Mutex> unlock(_lock);
3277  _subscriptionManager->unsubscribe(Field(subId));
3278  }
3279  _routes.removeRoute(subId);
3280  throw;
3281  }
3282  return subId;
3283  }
3284 
3285  std::string sowAndDeltaSubscribe(MessageHandler messageHandler_,
3286  const std::string& topic_,
3287  long timeout_,
3288  const std::string& filter_ = "",
3289  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3290  bool oofEnabled_ = false,
3291  bool sendEmpties_ = false,
3292  int topN_ = AMPS_DEFAULT_TOP_N,
3293  bool isHASubscribe_ = true)
3294  {
3295  std::string notSet;
3296  Message::Options options;
3297  if (oofEnabled_) options.setOOF();
3298  if (sendEmpties_ == false) options.setNoEmpties();
3299  return sowAndDeltaSubscribe(messageHandler_,
3300  topic_,
3301  filter_,
3302  notSet, // orderBy
3303  batchSize_,
3304  topN_,
3305  options,
3306  timeout_,
3307  isHASubscribe_);
3308  }
3309 
3310  std::string sowDelete(MessageHandler messageHandler_,
3311  const std::string& topic_,
3312  const std::string& filter_,
3313  long timeout_,
3314  Message::Field commandId_ = Message::Field())
3315  {
3316  if (_publishStore.isValid())
3317  {
3318  unsigned ackType = Message::AckType::Processed |
3319  Message::AckType::Stats |
3320  Message::AckType::Persisted;
3321  if (!publishStoreMessage)
3322  {
3323  publishStoreMessage = new Message();
3324  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3325  }
3326  publishStoreMessage->reset();
3327  if (commandId_.empty())
3328  {
3329  publishStoreMessage->newCommandId();
3330  commandId_ = publishStoreMessage->getCommandId();
3331  }
3332  else
3333  {
3334  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3335  }
3336  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3337  .assignSubscriptionId(commandId_.data(), commandId_.len())
3338  .assignQueryID(commandId_.data(), commandId_.len())
3339  .setAckTypeEnum(ackType)
3340  .assignTopic(topic_.c_str(), topic_.length())
3341  .assignFilter(filter_.c_str(), filter_.length());
3342  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3343  char buf[AMPS_NUMBER_BUFFER_LEN];
3344  size_t pos = convertToCharArray(buf, haSequenceNumber);
3345  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3346  {
3347  try
3348  {
3349  Lock<Mutex> l(_lock);
3350  _routes.addRoute(commandId_, messageHandler_,
3351  Message::AckType::Stats,
3352  Message::AckType::Processed|Message::AckType::Persisted,
3353  false);
3354  syncAckProcessing(timeout_, *publishStoreMessage,
3355  haSequenceNumber);
3356  }
3357  catch (const DisconnectedException&)
3358  {
3359  // Pass - it will get replayed upon reconnect
3360  }
3361  catch (...)
3362  {
3363  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3364  throw;
3365  }
3366  }
3367  return (std::string)commandId_;
3368  }
3369  else
3370  {
3371  Lock<Mutex> l(_lock);
3372  _message.reset();
3373  if (commandId_.empty())
3374  {
3375  _message.newCommandId();
3376  commandId_ = _message.getCommandId();
3377  }
3378  else
3379  {
3380  _message.setCommandId(commandId_.data(), commandId_.len());
3381  }
3382  _message.setCommandEnum(Message::Command::SOWDelete)
3383  .assignSubscriptionId(commandId_.data(), commandId_.len())
3384  .assignQueryID(commandId_.data(), commandId_.len())
3385  .setAckTypeEnum(Message::AckType::Processed |
3386  Message::AckType::Stats)
3387  .assignTopic(topic_.c_str(), topic_.length())
3388  .assignFilter(filter_.c_str(), filter_.length());
3389  _routes.addRoute(commandId_, messageHandler_,
3390  Message::AckType::Stats,
3391  Message::AckType::Processed,
3392  false);
3393  try
3394  {
3395  syncAckProcessing(timeout_, _message);
3396  }
3397  catch (...)
3398  {
3399  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3400  throw;
3401  }
3402  return (std::string)commandId_;
3403  }
3404  }
3405 
3406  std::string sowDeleteByData(MessageHandler messageHandler_,
3407  const std::string& topic_,
3408  const std::string& data_,
3409  long timeout_,
3410  Message::Field commandId_ = Message::Field())
3411  {
3412  if (_publishStore.isValid())
3413  {
3414  unsigned ackType = Message::AckType::Processed |
3415  Message::AckType::Stats |
3416  Message::AckType::Persisted;
3417  if (!publishStoreMessage)
3418  {
3419  publishStoreMessage = new Message();
3420  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3421  }
3422  publishStoreMessage->reset();
3423  if (commandId_.empty())
3424  {
3425  publishStoreMessage->newCommandId();
3426  commandId_ = publishStoreMessage->getCommandId();
3427  }
3428  else
3429  {
3430  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3431  }
3432  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3433  .assignSubscriptionId(commandId_.data(), commandId_.len())
3434  .assignQueryID(commandId_.data(), commandId_.len())
3435  .setAckTypeEnum(ackType)
3436  .assignTopic(topic_.c_str(), topic_.length())
3437  .assignData(data_.c_str(), data_.length());
3438  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3439  char buf[AMPS_NUMBER_BUFFER_LEN];
3440  size_t pos = convertToCharArray(buf, haSequenceNumber);
3441  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3442  {
3443  try
3444  {
3445  Lock<Mutex> l(_lock);
3446  _routes.addRoute(commandId_, messageHandler_,
3447  Message::AckType::Stats,
3448  Message::AckType::Processed|Message::AckType::Persisted,
3449  false);
3450  syncAckProcessing(timeout_, *publishStoreMessage,
3451  haSequenceNumber);
3452  }
3453  catch (const DisconnectedException&)
3454  {
3455  // Pass - it will get replayed upon reconnect
3456  }
3457  catch (...)
3458  {
3459  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3460  throw;
3461  }
3462  }
3463  return (std::string)commandId_;
3464  }
3465  else
3466  {
3467  Lock<Mutex> l(_lock);
3468  _message.reset();
3469  if (commandId_.empty())
3470  {
3471  _message.newCommandId();
3472  commandId_ = _message.getCommandId();
3473  }
3474  else
3475  {
3476  _message.setCommandId(commandId_.data(), commandId_.len());
3477  }
3478  _message.setCommandEnum(Message::Command::SOWDelete)
3479  .assignSubscriptionId(commandId_.data(), commandId_.len())
3480  .assignQueryID(commandId_.data(), commandId_.len())
3481  .setAckTypeEnum(Message::AckType::Processed |
3482  Message::AckType::Stats)
3483  .assignTopic(topic_.c_str(), topic_.length())
3484  .assignData(data_.c_str(), data_.length());
3485  _routes.addRoute(commandId_, messageHandler_,
3486  Message::AckType::Stats,
3487  Message::AckType::Processed,
3488  false);
3489  try
3490  {
3491  syncAckProcessing(timeout_, _message);
3492  }
3493  catch (...)
3494  {
3495  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3496  throw;
3497  }
3498  return (std::string)commandId_;
3499  }
3500  }
3501 
3502  std::string sowDeleteByKeys(MessageHandler messageHandler_,
3503  const std::string& topic_,
3504  const std::string& keys_,
3505  long timeout_,
3506  Message::Field commandId_ = Message::Field())
3507  {
3508  if (_publishStore.isValid())
3509  {
3510  unsigned ackType = Message::AckType::Processed |
3511  Message::AckType::Stats |
3512  Message::AckType::Persisted;
3513  if (!publishStoreMessage)
3514  {
3515  publishStoreMessage = new Message();
3516  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3517  }
3518  publishStoreMessage->reset();
3519  if (commandId_.empty())
3520  {
3521  publishStoreMessage->newCommandId();
3522  commandId_ = publishStoreMessage->getCommandId();
3523  }
3524  else
3525  {
3526  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3527  }
3528  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3529  .assignSubscriptionId(commandId_.data(), commandId_.len())
3530  .assignQueryID(commandId_.data(), commandId_.len())
3531  .setAckTypeEnum(ackType)
3532  .assignTopic(topic_.c_str(), topic_.length())
3533  .assignSowKeys(keys_.c_str(), keys_.length());
3534  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3535  char buf[AMPS_NUMBER_BUFFER_LEN];
3536  size_t pos = convertToCharArray(buf, haSequenceNumber);
3537  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3538  {
3539  try
3540  {
3541  Lock<Mutex> l(_lock);
3542  _routes.addRoute(commandId_, messageHandler_,
3543  Message::AckType::Stats,
3544  Message::AckType::Processed|Message::AckType::Persisted,
3545  false);
3546  syncAckProcessing(timeout_, *publishStoreMessage,
3547  haSequenceNumber);
3548  }
3549  catch (const DisconnectedException&)
3550  {
3551  // Pass - it will get replayed upon reconnect
3552  }
3553  catch (...)
3554  {
3555  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3556  throw;
3557  }
3558  }
3559  return (std::string)commandId_;
3560  }
3561  else
3562  {
3563  Lock<Mutex> l(_lock);
3564  _message.reset();
3565  if (commandId_.empty())
3566  {
3567  _message.newCommandId();
3568  commandId_ = _message.getCommandId();
3569  }
3570  else
3571  {
3572  _message.setCommandId(commandId_.data(), commandId_.len());
3573  }
3574  _message.setCommandEnum(Message::Command::SOWDelete)
3575  .assignSubscriptionId(commandId_.data(), commandId_.len())
3576  .assignQueryID(commandId_.data(), commandId_.len())
3577  .setAckTypeEnum(Message::AckType::Processed |
3578  Message::AckType::Stats)
3579  .assignTopic(topic_.c_str(), topic_.length())
3580  .assignSowKeys(keys_.c_str(), keys_.length());
3581  _routes.addRoute(commandId_, messageHandler_,
3582  Message::AckType::Stats,
3583  Message::AckType::Processed,
3584  false);
3585  try
3586  {
3587  syncAckProcessing(timeout_, _message);
3588  }
3589  catch (...)
3590  {
3591  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3592  throw;
3593  }
3594  return (std::string)commandId_;
3595  }
3596  }
3597 
3598  void startTimer(void)
3599  {
3600  Lock<Mutex> l(_lock);
3601  _message.reset();
3602  _message.setCommandEnum(Message::Command::StartTimer);
3603 
3604  _send(_message);
3605  }
3606 
3607  std::string stopTimer(MessageHandler messageHandler_)
3608  {
3609  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
3610  }
3611 
3612  amps_handle getHandle(void)
3613  {
3614  return _client;
3615  }
3616 
3617  void setExceptionListener(const ExceptionListener& listener_)
3618  {
3619  _exceptionListener = &listener_;
3620  }
3621 
3622  const ExceptionListener& getExceptionListener(void) const
3623  {
3624  return *_exceptionListener;
3625  }
3626 
3627  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
3628  {
3629  if (readTimeout_ && readTimeout_ < heartbeatInterval_)
3630  {
3631  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
3632  }
3633  Lock<Mutex> l(_lock);
3634  if(_heartbeatInterval != heartbeatInterval_ ||
3635  _readTimeout != readTimeout_)
3636  {
3637  _heartbeatInterval = heartbeatInterval_;
3638  _readTimeout = readTimeout_;
3639  _sendHeartbeat();
3640  }
3641  }
3642 
3643  void _sendHeartbeat(void)
3644  {
3645  if (_connected && _heartbeatInterval != 0)
3646  {
3647  std::ostringstream options;
3648  options << "start," << _heartbeatInterval;
3649  Message startMessage = Message()
3650  .setCommandEnum(Message::Command::Heartbeat)
3651  .setOptions(options.str());
3652 
3653  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3654  _heartbeatTimer.start();
3655  try
3656  {
3657  _sendWithoutRetry(startMessage);
3658  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
3659  }
3660  catch(ConnectionException &ex_)
3661  {
3662  // If we are disconnected when we attempt to send, that's OK;
3663  // we'll send this message after we re-connect (if we do).
3664  AMPS_UNHANDLED_EXCEPTION(ex_);
3665  }
3666  }
3667  amps_result result = AMPS_E_OK;
3668  if(_readTimeout && _connected)
3669  {
3670  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
3671  }
3672  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
3673  {
3674  AMPSException::throwFor(_client, result);
3675  }
3676  }
3677 
3678  void addConnectionStateListener(ConnectionStateListener *listener_)
3679  {
3680  Lock<Mutex> lock(_lock);
3681  _connectionStateListeners.insert(listener_);
3682  }
3683 
3684  void removeConnectionStateListener(ConnectionStateListener *listener_)
3685  {
3686  Lock<Mutex> lock(_lock);
3687  _connectionStateListeners.erase(listener_);
3688  }
3689 
3690  void _registerHandler(Command& command_, Message::Field& cid_,
3691  MessageHandler& handler_, unsigned requestedAcks_,
3692  unsigned systemAddedAcks_, bool isSubscribe_)
3693  {
3694  Message message = command_.getMessage();
3695  Message::Command::Type commandType = message.getCommandEnum();
3696  Message::Field subid = message.getSubscriptionId();
3697  Message::Field qid = message.getQueryID();
3698  // If we have an id, we're good, even if it's an existing route
3699  bool added = qid.len() || subid.len() || cid_.len();
3700  if (qid.len() > 0)
3701  {
3702  if (!_routes.hasRoute(qid))
3703  {
3704  _routes.addRoute(qid, handler_, requestedAcks_,
3705  systemAddedAcks_, isSubscribe_);
3706  }
3707  }
3708  if (subid.len() > 0)
3709  {
3710  if (!_routes.hasRoute(subid))
3711  {
3712  _routes.addRoute(subid, handler_, requestedAcks_,
3713  systemAddedAcks_, isSubscribe_);
3714  }
3715  }
3716  if (cid_.len() > 0)
3717  {
3718  if (!_routes.hasRoute(cid_))
3719  {
3720  _routes.addRoute(cid_, handler_, requestedAcks_,
3721  systemAddedAcks_, isSubscribe_);
3722  }
3723  }
3724  else if (commandType == Message::Command::Publish ||
3725  commandType == Message::Command::DeltaPublish)
3726  {
3727  cid_ = command_.getMessage().newCommandId().getCommandId();
3728  _routes.addRoute(cid_, handler_, requestedAcks_,
3729  systemAddedAcks_, isSubscribe_);
3730  added=true;
3731  }
3732  if (!added)
3733  {
3734  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
3735  }
3736  }
3737 
3738  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
3739  bool isHASubscribe_ = true)
3740  {
3741  isHASubscribe_ &= (bool)_subscriptionManager;
3742  Message& message = command_.getMessage();
3743  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
3744  Message::AckType::Processed : Message::AckType::None;
3745  unsigned requestedAcks = message.getAckTypeEnum();
3746  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
3747  Message::Command::Type commandType = message.getCommandEnum();
3748  if (commandType == Message::Command::SOW ||
3749  commandType == Message::Command::StopTimer)
3750  systemAddedAcks |= Message::AckType::Completed;
3751  Message::Field cid = message.getCommandId();
3752  if (message.getBookmark().len() > 0)
3753  {
3754  if (command_.isSubscribe())
3755  {
3756  Message::Field bookmark = message.getBookmark();
3757  if (_bookmarkStore.isValid())
3758  {
3759  systemAddedAcks |= Message::AckType::Persisted;
3760  if (bookmark == AMPS_BOOKMARK_RECENT)
3761  {
3762  message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
3763  }
3764  else if (bookmark != AMPS_BOOKMARK_NOW &&
3765  bookmark != AMPS_BOOKMARK_EPOCH)
3766  {
3767  _bookmarkStore.log(message);
3768  _bookmarkStore.discard(message);
3769  _bookmarkStore.persisted(message.getSubscriptionId(),
3770  bookmark);
3771  }
3772  }
3773  else if (bookmark == AMPS_BOOKMARK_RECENT)
3774  {
3776  }
3777  }
3778  }
3779  if (isPublishStore)
3780  {
3781  systemAddedAcks |= Message::AckType::Persisted;
3782  }
3783  bool isSubscribe = command_.isSubscribe();
3784  if (handler_.isValid() && !isSubscribe)
3785  {
3786  _registerHandler(command_, cid, handler_,
3787  requestedAcks, systemAddedAcks, isSubscribe);
3788  }
3789  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
3790  if (_publishStore.isValid() && command_.needsSequenceNumber())
3791  {
3792  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
3793  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3794  {
3795  Unlock<Mutex> u(_lock);
3796  haSequenceNumber = _publishStore.store(message);
3797  }
3798  message.setSequence(haSequenceNumber);
3799  if (useSyncSend)
3800  {
3801  try
3802  {
3803  syncAckProcessing((long)command_.getTimeout(), message,
3804  haSequenceNumber);
3805  }
3806  catch (const DisconnectedException&)
3807  {
3808  // Pass - message will get replayed when reconnected
3809  }
3810  catch (...)
3811  {
3812  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3813  throw;
3814  }
3815  }
3816  else _send(message, haSequenceNumber);
3817  }
3818  else
3819  {
3820  if(command_.isSubscribe())
3821  {
3822  const Message::Field& subId = message.getSubscriptionId();
3823  if (isHASubscribe_)
3824  {
3825  Unlock<Mutex> u(_lock);
3826  _subscriptionManager->subscribe(handler_,
3827  message.deepCopy(),
3828  requestedAcks);
3829  if (_badTimeToHASubscribe)
3830  {
3831  message.setAckTypeEnum(requestedAcks);
3832  return std::string(subId.data(), subId.len());
3833  }
3834  }
3835  if (handler_.isValid())
3836  {
3837  _registerHandler(command_, cid, handler_,
3838  requestedAcks, systemAddedAcks, isSubscribe);
3839  }
3840  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3841  if (useSyncSend)
3842  {
3843  try
3844  {
3845  syncAckProcessing((long)command_.getTimeout(), message,
3846  isHASubscribe_);
3847  }
3848  catch (const DisconnectedException&)
3849  {
3850  if (!isHASubscribe_)
3851  {
3852  message.setAckTypeEnum(requestedAcks);
3853  throw;
3854  }
3855  }
3856  catch (const TimedOutException&)
3857  {
3858  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
3859  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3860  throw;
3861  }
3862  catch (...)
3863  {
3864  if (isHASubscribe_)
3865  {
3866  // Have to unlock before calling into sub manager to avoid deadlock
3867  Unlock<Mutex> unlock(_lock);
3868  _subscriptionManager->unsubscribe(subId);
3869  }
3870  if (message.getQueryID().len() > 0)
3871  _routes.removeRoute(message.getQueryID());
3872  _routes.removeRoute(cid);
3873  _routes.removeRoute(subId);
3874  throw;
3875  }
3876  }
3877  else _send(message);
3878  if (subId.len() > 0)
3879  {
3880  message.setAckTypeEnum(requestedAcks);
3881  return std::string(subId.data(), subId.len());
3882  }
3883  }
3884  else
3885  {
3886  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3887  if (useSyncSend)
3888  {
3889  try
3890  {
3891  syncAckProcessing((long)(command_.getTimeout()), message);
3892  }
3893  catch (const DisconnectedException&)
3894  {
3895  message.setAckTypeEnum(requestedAcks);
3896  throw;
3897  }
3898  catch (...)
3899  {
3900  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3901  message.setAckTypeEnum(requestedAcks);
3902  throw;
3903  }
3904  }
3905  else _send(message);
3906  }
3907  }
3908  message.setAckTypeEnum(requestedAcks);
3909  return cid;
3910  }
3911 
3912  MessageStream getEmptyMessageStream(void);
3913 
3914  std::string executeAsync(Command& command_, MessageHandler& handler_,
3915  bool isHASubscribe_ = true)
3916  {
3917  Lock<Mutex> lock(_lock);
3918  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
3919  }
3920 
3921  // Queue Methods //
3922  void setAutoAck(bool isAutoAckEnabled_)
3923  {
3924  _isAutoAckEnabled = isAutoAckEnabled_;
3925  }
3926  bool getAutoAck(void) const
3927  {
3928  return _isAutoAckEnabled;
3929  }
3930  void setAckBatchSize(const unsigned batchSize_)
3931  {
3932  _ackBatchSize = batchSize_;
3933  if (!_queueAckTimeout)
3934  {
3935  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
3936  amps_client_set_idle_time(_client, _queueAckTimeout);
3937  }
3938  }
3939  unsigned getAckBatchSize(void) const
3940  {
3941  return _ackBatchSize;
3942  }
3943  int getAckTimeout(void) const
3944  {
3945  return _queueAckTimeout;
3946  }
3947  void setAckTimeout(const int ackTimeout_)
3948  {
3949  amps_client_set_idle_time(_client,ackTimeout_);
3950  _queueAckTimeout = ackTimeout_;
3951  }
3952  size_t _ack(QueueBookmarks& queueBookmarks_)
3953  {
3954  if(queueBookmarks_._bookmarkCount)
3955  {
3956  if (!publishStoreMessage)
3957  {
3958  publishStoreMessage = new Message();
3959  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3960  }
3961  publishStoreMessage->reset();
3962  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3963  .setTopic(queueBookmarks_._topic)
3964  .setBookmark(queueBookmarks_._data)
3965  .setCommandId("AMPS-queue-ack");
3966  amps_uint64_t haSequenceNumber = 0;
3967  if (_publishStore.isValid())
3968  {
3969  haSequenceNumber = _publishStore.store(*publishStoreMessage);
3970  publishStoreMessage->setAckType("persisted")
3971  .setSequence(haSequenceNumber);
3972  queueBookmarks_._data.erase();
3973  queueBookmarks_._bookmarkCount = 0;
3974  }
3975  _send(*publishStoreMessage, haSequenceNumber);
3976  if (!_publishStore.isValid())
3977  {
3978  queueBookmarks_._data.erase();
3979  queueBookmarks_._bookmarkCount = 0;
3980  }
3981  return 1;
3982  }
3983  return 0;
3984  }
3985  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
3986  {
3987  if (_isAutoAckEnabled) return;
3988  _ack(topic_, bookmark_, options_);
3989  }
3990  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
3991  {
3992  if (bookmark_.len() == 0) return;
3993  Lock<Mutex> lock(_lock);
3994  if(_ackBatchSize < 2 || options_ != NULL)
3995  {
3996  if (!publishStoreMessage)
3997  {
3998  publishStoreMessage = new Message();
3999  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4000  }
4001  publishStoreMessage->reset();
4002  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4003  .setCommandId("AMPS-queue-ack")
4004  .setTopic(topic_).setBookmark(bookmark_);
4005  if (options_) publishStoreMessage->setOptions(options_);
4006  amps_uint64_t haSequenceNumber = 0;
4007  if (_publishStore.isValid())
4008  {
4009  haSequenceNumber = _publishStore.store(*publishStoreMessage);
4010  publishStoreMessage->setAckType("persisted")
4011  .setSequence(haSequenceNumber);
4012  }
4013  _send(*publishStoreMessage, haSequenceNumber);
4014  return;
4015  }
4016  // have we acked anything for this hash
4017  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(),topic_.len());
4018  TopicHashMap::iterator it = _topicHashMap.find(hash);
4019  if(it == _topicHashMap.end())
4020  {
4021  // add a new one to the map
4022  it = _topicHashMap.insert(TopicHashMap::value_type(hash,QueueBookmarks(topic_))).first;
4023  }
4024  QueueBookmarks &queueBookmarks = it->second;
4025  if(queueBookmarks._data.length())
4026  {
4027  queueBookmarks._data.append(",");
4028  }
4029  else
4030  {
4031  queueBookmarks._oldestTime = amps_now();
4032  }
4033  queueBookmarks._data.append(bookmark_);
4034  if(++queueBookmarks._bookmarkCount >= _ackBatchSize) _ack(queueBookmarks);
4035  }
4036  void flushAcks(void)
4037  {
4038  size_t sendCount = 0;
4039  if(1)
4040  {
4041  Lock<Mutex> lock(_lock);
4042  typedef TopicHashMap::iterator iterator;
4043  for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
4044  {
4045  QueueBookmarks& queueBookmarks = it->second;
4046  sendCount += _ack(queueBookmarks);
4047  }
4048  }
4049  if(sendCount) publishFlush(0);
4050  }
4051  // called when there's idle time, to see if we need to flush out any "acks"
4052  void checkQueueAcks(void)
4053  {
4054  if(!_topicHashMap.size()) return;
4055  Lock<Mutex> lock(_lock);
4056  try
4057  {
4058  amps_uint64_t threshold = amps_now() - (amps_uint64_t)_queueAckTimeout;
4059  typedef TopicHashMap::iterator iterator;
4060  for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
4061  {
4062  QueueBookmarks& queueBookmarks = it->second;
4063  if(queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold) _ack(queueBookmarks);
4064  }
4065  }
4066  catch(std::exception& ex)
4067  {
4068  AMPS_UNHANDLED_EXCEPTION(ex);
4069  }
4070  }
4071 
4072  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
4073  {
4074  Lock<Mutex> lock(_lock);
4075  _deferredExecutionList.push_back(
4076  DeferredExecutionRequest(func_,userData_));
4077  }
4078 
4079  inline void processDeferredExecutions(void)
4080  {
4081  if(_deferredExecutionList.size())
4082  {
4083  Lock<Mutex> lock(_lock);
4084  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4085  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4086  for(; it != end; ++it)
4087  {
4088  it->_func(it->_userData);
4089  }
4090  _deferredExecutionList.clear();
4091  _routes.invalidateCache();
4092  _routeCache.invalidateCache();
4093  }
4094  }
4095 
4096  bool getRetryOnDisconnect(void) const
4097  {
4098  return _isRetryOnDisconnect;
4099  }
4100 
4101  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4102  {
4103  _isRetryOnDisconnect = isRetryOnDisconnect_;
4104  }
4105 
4106  void setDefaultMaxDepth(unsigned maxDepth_)
4107  {
4108  _defaultMaxDepth = maxDepth_;
4109  }
4110 
4111  unsigned getDefaultMaxDepth(void) const
4112  {
4113  return _defaultMaxDepth;
4114  }
4115 
4116  void setTransportFilterFunction(amps_transport_filter_function filter_,
4117  void* userData_)
4118  {
4119  amps_result result = amps_client_set_transport_filter_function(_client, filter_, userData_);
4120  if (result != AMPS_E_OK)
4121  {
4122  AMPSException::throwFor(_client, result);
4123  }
4124  }
4125 }; // class ClientImpl
4153 
4155 {
4156  RefHandle<MessageStreamImpl> _body;
4157  public:
4162  class iterator
4163  {
4164  MessageStream* _pStream;
4165  Message _current;
4166  inline void advance(void);
4167 
4168  public:
4169  iterator() // end
4170  :_pStream(NULL)
4171  {;}
4172  iterator(MessageStream* pStream_)
4173  :_pStream(pStream_)
4174  {
4175  advance();
4176  }
4177 
4178  bool operator==(const iterator& rhs)
4179  {
4180  return _pStream == rhs._pStream;
4181  }
4182  bool operator!=(const iterator& rhs)
4183  {
4184  return _pStream != rhs._pStream;
4185  }
4186  void operator++(void) { advance(); }
4187  Message operator*(void) { return _current; }
4188  Message* operator->(void) { return &_current; }
4189  };
4191  bool isValid() const { return _body.isValid(); }
4192 
4196  {
4197  if(!_body.isValid())
4198  {
4199  throw UsageException("This MessageStream is not valid and cannot be iterated.");
4200  }
4201  return iterator(this);
4202  }
4205  // For non-SOW queries, the end is never reached.
4206  iterator end(void) { return iterator(); }
4207  inline MessageStream(void);
4208 
4209  MessageStream timeout(unsigned timeout_);
4210  MessageStream conflate(void);
4211  MessageStream maxDepth(unsigned maxDepth_);
4212  unsigned getMaxDepth(void) const;
4213  unsigned getDepth(void) const;
4214 
4215  private:
4216  inline MessageStream(const Client& client_);
4217  inline void setSOWOnly(const std::string& commandId_);
4218  inline void setSubscription(const std::string& commandId_);
4219  inline void setStatsOnly(const std::string& commandId_);
4220  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
4221 
4222  inline operator MessageHandler(void);
4223  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
4224 
4225  friend class Client;
4226 
4227 };
4228 
4233 class Client
4234 {
4235 protected:
4236  BorrowRefHandle<ClientImpl> _body;
4237 public:
4238  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
4239  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
4240  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
4241 
4250  Client(const std::string& clientName = "")
4251  : _body(new ClientImpl(clientName), true)
4252  {;}
4253 
4254  Client(ClientImpl* existingClient)
4255  : _body(existingClient, true)
4256  {;}
4257 
4258  Client(ClientImpl* existingClient, bool isRef)
4259  : _body(existingClient, isRef)
4260  {;}
4261 
4262  Client(const Client& rhs) : _body(rhs._body) {;}
4263  virtual ~Client(void) {;}
4264 
4265  Client& operator=(const Client& rhs)
4266  {
4267  _body = rhs._body;
4268  return *this;
4269  }
4270 
4271  bool isValid()
4272  {
4273  return _body.isValid();
4274  }
4275 
4288  void setName(const std::string& name)
4289  {
4290  _body.get().setName(name);
4291  }
4292 
4295  const std::string& getName() const
4296  {
4297  return _body.get().getName();
4298  }
4299 
4306  void setLogonCorrelationData(const std::string& logonCorrelationData_)
4307  {
4308  _body.get().setLogonCorrelationData(logonCorrelationData_);
4309  }
4310 
4313  const std::string& getLogonCorrelationData() const
4314  {
4315  return _body.get().getLogonCorrelationData();
4316  }
4317 
4326  size_t getServerVersion() const
4327  {
4328  return _body.get().getServerVersion();
4329  }
4330 
4337  VersionInfo getServerVersionInfo() const
4338  {
4339  return _body.get().getServerVersionInfo();
4340  }
4341 
4351  static size_t convertVersionToNumber(const std::string& version_)
4352  {
4353  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4354  }
4355 
4366  static size_t convertVersionToNumber(const char* data_, size_t len_)
4367  {
4368  return AMPS::convertVersionToNumber(data_, len_);
4369  }
4370 
4373  const std::string& getURI() const
4374  {
4375  return _body.get().getURI();
4376  }
4377 
4384 
4386 
4397  void connect(const std::string& uri)
4398  {
4399  _body.get().connect(uri);
4400  }
4401 
4404  void disconnect()
4405  {
4406  _body.get().disconnect();
4407  }
4408 
4422  void send(const Message& message)
4423  {
4424  _body.get().send(message);
4425  }
4426 
4435  void addMessageHandler(const Field& commandId_,
4436  const AMPS::MessageHandler& messageHandler_,
4437  unsigned requestedAcks_, bool isSubscribe_)
4438  {
4439  _body.get().addMessageHandler(commandId_, messageHandler_,
4440  requestedAcks_, isSubscribe_);
4441  }
4442 
4446  bool removeMessageHandler(const Field& commandId_)
4447  {
4448  return _body.get().removeMessageHandler(commandId_);
4449  }
4450 
4474  std::string send(MessageHandler messageHandler, Message& message, int timeout = 0)
4475  {
4476  return _body.get().send(messageHandler, message, timeout);
4477  }
4478 
4488  void setDisconnectHandler(DisconnectHandler disconnectHandler)
4489  {
4490  _body.get().setDisconnectHandler(disconnectHandler);
4491  }
4492 
4496  DisconnectHandler getDisconnectHandler(void)
4497  {
4498  return _body.get().getDisconnectHandler();
4499  }
4500 
4505  virtual ConnectionInfo getConnectionInfo() const
4506  {
4507  return _body.get().getConnectionInfo();
4508  }
4509 
4518  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
4519  {
4520  _body.get().setBookmarkStore(bookmarkStore_);
4521  }
4522 
4527  {
4528  return _body.get().getBookmarkStore();
4529  }
4530 
4535  {
4536  return _body.get().getSubscriptionManager();
4537  }
4538 
4546  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
4547  {
4548  _body.get().setSubscriptionManager(subscriptionManager_);
4549  }
4550 
4570  void setPublishStore(const Store& publishStore_)
4571  {
4572  _body.get().setPublishStore(publishStore_);
4573  }
4574 
4579  {
4580  return _body.get().getPublishStore();
4581  }
4582 
4586  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
4587  {
4588  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
4589  duplicateMessageHandler_);
4590  }
4591 
4602  {
4603  return _body.get().getDuplicateMessageHandler();
4604  }
4605 
4616  {
4617  _body.get().setFailedWriteHandler(handler_);
4618  }
4619 
4624  {
4625  return _body.get().getFailedWriteHandler();
4626  }
4627 
4628 
4646  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
4647  {
4648  return _body.get().publish(topic_.c_str(), topic_.length(),
4649  data_.c_str(), data_.length());
4650  }
4651 
4671  amps_uint64_t publish(const char* topic_, size_t topicLength_,
4672  const char* data_, size_t dataLength_)
4673  {
4674  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
4675  }
4676 
4695  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
4696  unsigned long expiration_)
4697  {
4698  return _body.get().publish(topic_.c_str(), topic_.length(),
4699  data_.c_str(), data_.length(), expiration_);
4700  }
4701 
4722  amps_uint64_t publish(const char* topic_, size_t topicLength_,
4723  const char* data_, size_t dataLength_,
4724  unsigned long expiration_)
4725  {
4726  return _body.get().publish(topic_, topicLength_,
4727  data_, dataLength_, expiration_);
4728  }
4729 
4763  void publishFlush(long timeout_ = 0)
4764  {
4765  _body.get().publishFlush(timeout_);
4766  }
4767 
4768 
4782  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
4783  {
4784  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4785  data_.c_str(), data_.length());
4786  }
4787 
4803  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
4804  const char* data_, size_t dataLength_)
4805  {
4806  return _body.get().deltaPublish(topic_, topicLength_,
4807  data_, dataLength_);
4808  }
4809 
4824  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
4825  unsigned long expiration_)
4826  {
4827  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4828  data_.c_str(), data_.length(),
4829  expiration_);
4830  }
4831 
4848  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
4849  const char* data_, size_t dataLength_,
4850  unsigned long expiration_)
4851  {
4852  return _body.get().deltaPublish(topic_, topicLength_,
4853  data_, dataLength_, expiration_);
4854  }
4855 
4869  std::string logon(int timeout_ = 0,
4870  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
4871  const char* options_ = NULL)
4872  {
4873  return _body.get().logon(timeout_, authenticator_, options_);
4874  }
4875 
4886  std::string logon(const char* options_, int timeout_ = 0)
4887  {
4888  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
4889  options_);
4890  }
4891 
4902  std::string logon(const std::string& options_, int timeout_ = 0)
4903  {
4904  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
4905  options_.c_str());
4906  }
4907 
4927  std::string subscribe(MessageHandler messageHandler_,
4928  const std::string& topic_,
4929  long timeout_=0,
4930  const std::string& filter_="",
4931  const std::string& options_ = "",
4932  const std::string& subId_ = "")
4933  {
4934  return _body.get().subscribe(messageHandler_, topic_, timeout_,
4935  filter_, "", options_, subId_);
4936  }
4937 
4953  MessageStream subscribe(const std::string& topic_,
4954  long timeout_=0, const std::string& filter_="",
4955  const std::string& options_ = "",
4956  const std::string& subId_ = "")
4957  {
4958  MessageStream result(*this);
4959  if (_body.get().getDefaultMaxDepth())
4960  result.maxDepth(_body.get().getDefaultMaxDepth());
4961  result.setSubscription(_body.get().subscribe(
4962  result.operator MessageHandler(),
4963  topic_, timeout_, filter_, "",
4964  options_, subId_, false));
4965  return result;
4966  }
4967 
4983  MessageStream subscribe(const char* topic_,
4984  long timeout_ = 0, const std::string& filter_ = "",
4985  const std::string& options_ = "",
4986  const std::string& subId_ = "")
4987  {
4988  MessageStream result(*this);
4989  if (_body.get().getDefaultMaxDepth())
4990  result.maxDepth(_body.get().getDefaultMaxDepth());
4991  result.setSubscription(_body.get().subscribe(
4992  result.operator MessageHandler(),
4993  topic_, timeout_, filter_, "",
4994  options_, subId_, false));
4995  return result;
4996  }
4997 
5010  std::string deltaSubscribe(MessageHandler messageHandler_,
5011  const std::string& topic_,
5012  long timeout_,
5013  const std::string& filter_="",
5014  const std::string& options_ = "",
5015  const std::string& subId_ = "")
5016  {
5017  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5018  filter_, "", options_, subId_);
5019  }
5028  MessageStream deltaSubscribe(const std::string& topic_,
5029  long timeout_, const std::string& filter_="",
5030  const std::string& options_ = "",
5031  const std::string& subId_ = "")
5032  {
5033  MessageStream result(*this);
5034  if (_body.get().getDefaultMaxDepth())
5035  result.maxDepth(_body.get().getDefaultMaxDepth());
5036  result.setSubscription(_body.get().deltaSubscribe(
5037  result.operator MessageHandler(),
5038  topic_, timeout_, filter_, "",
5039  options_, subId_, false));
5040  return result;
5041  }
5042 
5044  MessageStream deltaSubscribe(const char* topic_,
5045  long timeout_, const std::string& filter_ = "",
5046  const std::string& options_ = "",
5047  const std::string& subId_ = "")
5048  {
5049  MessageStream result(*this);
5050  if (_body.get().getDefaultMaxDepth())
5051  result.maxDepth(_body.get().getDefaultMaxDepth());
5052  result.setSubscription(_body.get().deltaSubscribe(
5053  result.operator MessageHandler(),
5054  topic_, timeout_, filter_, "",
5055  options_, subId_, false));
5056  return result;
5057  }
5058 
5084  std::string bookmarkSubscribe(MessageHandler messageHandler_,
5085  const std::string& topic_,
5086  long timeout_,
5087  const std::string& bookmark_,
5088  const std::string& filter_="",
5089  const std::string& options_ = "",
5090  const std::string& subId_ = "")
5091  {
5092  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5093  filter_, bookmark_, options_, subId_);
5094  }
5112  MessageStream bookmarkSubscribe(const std::string& topic_,
5113  long timeout_,
5114  const std::string& bookmark_,
5115  const std::string& filter_="",
5116  const std::string& options_ = "",
5117  const std::string& subId_ = "")
5118  {
5119  MessageStream result(*this);
5120  if (_body.get().getDefaultMaxDepth())
5121  result.maxDepth(_body.get().getDefaultMaxDepth());
5122  result.setSubscription(_body.get().subscribe(
5123  result.operator MessageHandler(),
5124  topic_, timeout_, filter_,
5125  bookmark_, options_,
5126  subId_, false));
5127  return result;
5128  }
5129 
5131  MessageStream bookmarkSubscribe(const char* topic_,
5132  long timeout_,
5133  const std::string& bookmark_,
5134  const std::string& filter_ = "",
5135  const std::string& options_ = "",
5136  const std::string& subId_ = "")
5137  {
5138  MessageStream result(*this);
5139  if (_body.get().getDefaultMaxDepth())
5140  result.maxDepth(_body.get().getDefaultMaxDepth());
5141  result.setSubscription(_body.get().subscribe(
5142  result.operator MessageHandler(),
5143  topic_, timeout_, filter_,
5144  bookmark_, options_,
5145  subId_, false));
5146  return result;
5147  }
5148 
5157  void unsubscribe(const std::string& commandId)
5158  {
5159  return _body.get().unsubscribe(commandId);
5160  }
5161 
5170  {
5171  return _body.get().unsubscribe();
5172  }
5173 
5174 
5204  std::string sow(MessageHandler messageHandler_,
5205  const std::string& topic_,
5206  const std::string& filter_ = "",
5207  const std::string& orderBy_ = "",
5208  const std::string& bookmark_ = "",
5209  int batchSize_ = DEFAULT_BATCH_SIZE,
5210  int topN_ = DEFAULT_TOP_N,
5211  const std::string& options_ = "",
5212  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5213  {
5214  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
5215  bookmark_, batchSize_, topN_, options_,
5216  timeout_);
5217  }
5242  MessageStream sow(const std::string& topic_,
5243  const std::string& filter_ = "",
5244  const std::string& orderBy_ = "",
5245  const std::string& bookmark_ = "",
5246  int batchSize_ = DEFAULT_BATCH_SIZE,
5247  int topN_ = DEFAULT_TOP_N,
5248  const std::string& options_ = "",
5249  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5250  {
5251  MessageStream result(*this);
5252  if (_body.get().getDefaultMaxDepth())
5253  result.maxDepth(_body.get().getDefaultMaxDepth());
5254  result.setSOWOnly(sow(result.operator MessageHandler(),topic_,filter_,orderBy_,bookmark_,batchSize_,topN_,options_,timeout_));
5255  return result;
5256  }
5257 
5259  MessageStream sow(const char* topic_,
5260  const std::string& filter_ = "",
5261  const std::string& orderBy_ = "",
5262  const std::string& bookmark_ = "",
5263  int batchSize_ = DEFAULT_BATCH_SIZE,
5264  int topN_ = DEFAULT_TOP_N,
5265  const std::string& options_ = "",
5266  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5267  {
5268  MessageStream result(*this);
5269  if (_body.get().getDefaultMaxDepth())
5270  result.maxDepth(_body.get().getDefaultMaxDepth());
5271  result.setSOWOnly(sow(result.operator MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5272  return result;
5273  }
5296  std::string sow(MessageHandler messageHandler_,
5297  const std::string& topic_,
5298  long timeout_,
5299  const std::string& filter_ = "",
5300  int batchSize_ = DEFAULT_BATCH_SIZE,
5301  int topN_ = DEFAULT_TOP_N)
5302  {
5303  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
5304  batchSize_, topN_);
5305  }
5328  std::string sowAndSubscribe(MessageHandler messageHandler_,
5329  const std::string& topic_,
5330  long timeout_,
5331  const std::string& filter_ = "",
5332  int batchSize_ = DEFAULT_BATCH_SIZE,
5333  bool oofEnabled_ = false,
5334  int topN_ = DEFAULT_TOP_N)
5335  {
5336  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5337  filter_, batchSize_, oofEnabled_,
5338  topN_);
5339  }
5340 
5360  MessageStream sowAndSubscribe(const std::string& topic_,
5361  long timeout_,
5362  const std::string& filter_ = "",
5363  int batchSize_ = DEFAULT_BATCH_SIZE,
5364  bool oofEnabled_ = false,
5365  int topN_ = DEFAULT_TOP_N)
5366  {
5367  MessageStream result(*this);
5368  if (_body.get().getDefaultMaxDepth())
5369  result.maxDepth(_body.get().getDefaultMaxDepth());
5370  result.setSubscription(_body.get().sowAndSubscribe(
5371  result.operator MessageHandler(),
5372  topic_, timeout_, filter_,
5373  batchSize_, oofEnabled_,
5374  topN_, false));
5375  return result;
5376  }
5396  MessageStream sowAndSubscribe(const char *topic_,
5397  long timeout_,
5398  const std::string& filter_ = "",
5399  int batchSize_ = DEFAULT_BATCH_SIZE,
5400  bool oofEnabled_ = false,
5401  int topN_ = DEFAULT_TOP_N)
5402  {
5403  MessageStream result(*this);
5404  if (_body.get().getDefaultMaxDepth())
5405  result.maxDepth(_body.get().getDefaultMaxDepth());
5406  result.setSubscription(_body.get().sowAndSubscribe(
5407  result.operator MessageHandler(),
5408  topic_, timeout_, filter_,
5409  batchSize_, oofEnabled_,
5410  topN_, false));
5411  return result;
5412  }
5413 
5414 
5442  std::string sowAndSubscribe(MessageHandler messageHandler_,
5443  const std::string& topic_,
5444  const std::string& filter_ = "",
5445  const std::string& orderBy_ = "",
5446  const std::string& bookmark_ = "",
5447  int batchSize_ = DEFAULT_BATCH_SIZE,
5448  int topN_ = DEFAULT_TOP_N,
5449  const std::string& options_ = "",
5450  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5451  {
5452  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
5453  orderBy_, bookmark_, batchSize_,
5454  topN_, options_, timeout_);
5455  }
5456 
5481  MessageStream sowAndSubscribe(const std::string& topic_,
5482  const std::string& filter_ = "",
5483  const std::string& orderBy_ = "",
5484  const std::string& bookmark_ = "",
5485  int batchSize_ = DEFAULT_BATCH_SIZE,
5486  int topN_ = DEFAULT_TOP_N,
5487  const std::string& options_ = "",
5488  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5489  {
5490  MessageStream result(*this);
5491  if (_body.get().getDefaultMaxDepth())
5492  result.maxDepth(_body.get().getDefaultMaxDepth());
5493  result.setSubscription(_body.get().sowAndSubscribe(
5494  result.operator MessageHandler(),
5495  topic_, filter_, orderBy_,
5496  bookmark_, batchSize_, topN_,
5497  options_, timeout_, false));
5498  return result;
5499  }
5500 
5502  MessageStream sowAndSubscribe(const char* topic_,
5503  const std::string& filter_ = "",
5504  const std::string& orderBy_ = "",
5505  const std::string& bookmark_ = "",
5506  int batchSize_ = DEFAULT_BATCH_SIZE,
5507  int topN_ = DEFAULT_TOP_N,
5508  const std::string& options_ = "",
5509  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5510  {
5511  MessageStream result(*this);
5512  if (_body.get().getDefaultMaxDepth())
5513  result.maxDepth(_body.get().getDefaultMaxDepth());
5514  result.setSubscription(_body.get().sowAndSubscribe(
5515  result.operator MessageHandler(),
5516  topic_, filter_, orderBy_,
5517  bookmark_, batchSize_, topN_,
5518  options_, timeout_, false));
5519  return result;
5520  }
5521 
5546  std::string sowAndDeltaSubscribe(MessageHandler messageHandler_,
5547  const std::string& topic_,
5548  const std::string& filter_ = "",
5549  const std::string& orderBy_ = "",
5550  int batchSize_ = DEFAULT_BATCH_SIZE,
5551  int topN_ = DEFAULT_TOP_N,
5552  const std::string& options_ = "",
5553  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5554  {
5555  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5556  filter_, orderBy_, batchSize_,
5557  topN_, options_, timeout_);
5558  }
5579  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
5580  const std::string& filter_ = "",
5581  const std::string& orderBy_ = "",
5582  int batchSize_ = DEFAULT_BATCH_SIZE,
5583  int topN_ = DEFAULT_TOP_N,
5584  const std::string& options_ = "",
5585  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5586  {
5587  MessageStream result(*this);
5588  if (_body.get().getDefaultMaxDepth())
5589  result.maxDepth(_body.get().getDefaultMaxDepth());
5590  result.setSubscription(sowAndDeltaSubscribe(result.operator MessageHandler(), topic_, filter_, orderBy_, batchSize_, topN_, options_, timeout_));
5591  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5592  result.operator MessageHandler(),
5593  topic_, filter_, orderBy_,
5594  batchSize_, topN_, options_,
5595  timeout_, false));
5596  return result;
5597  }
5598 
5601  const std::string& filter_ = "",
5602  const std::string& orderBy_ = "",
5603  int batchSize_ = DEFAULT_BATCH_SIZE,
5604  int topN_ = DEFAULT_TOP_N,
5605  const std::string& options_ = "",
5606  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5607  {
5608  MessageStream result(*this);
5609  if (_body.get().getDefaultMaxDepth())
5610  result.maxDepth(_body.get().getDefaultMaxDepth());
5611  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5612  result.operator MessageHandler(),
5613  topic_, filter_, orderBy_,
5614  batchSize_, topN_, options_,
5615  timeout_, false));
5616  return result;
5617  }
5618 
5643  std::string sowAndDeltaSubscribe(MessageHandler messageHandler_,
5644  const std::string& topic_,
5645  long timeout_,
5646  const std::string& filter_ = "",
5647  int batchSize_ = DEFAULT_BATCH_SIZE,
5648  bool oofEnabled_ = false,
5649  bool sendEmpties_ = false,
5650  int topN_ = DEFAULT_TOP_N)
5651  {
5652  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5653  timeout_, filter_, batchSize_,
5654  oofEnabled_, sendEmpties_,
5655  topN_);
5656  }
5657 
5679  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
5680  long timeout_,
5681  const std::string& filter_ = "",
5682  int batchSize_ = DEFAULT_BATCH_SIZE,
5683  bool oofEnabled_ = false,
5684  bool sendEmpties_ = false,
5685  int topN_ = DEFAULT_TOP_N)
5686  {
5687  MessageStream result(*this);
5688  if (_body.get().getDefaultMaxDepth())
5689  result.maxDepth(_body.get().getDefaultMaxDepth());
5690  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5691  result.operator MessageHandler(),
5692  topic_, timeout_, filter_,
5693  batchSize_, oofEnabled_,
5694  sendEmpties_, topN_, false));
5695  return result;
5696  }
5719  long timeout_,
5720  const std::string& filter_ = "",
5721  int batchSize_ = DEFAULT_BATCH_SIZE,
5722  bool oofEnabled_ = false,
5723  bool sendEmpties_ = false,
5724  int topN_ = DEFAULT_TOP_N)
5725  {
5726  MessageStream result(*this);
5727  if (_body.get().getDefaultMaxDepth())
5728  result.maxDepth(_body.get().getDefaultMaxDepth());
5729  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5730  result.operator MessageHandler(),
5731  topic_, timeout_, filter_,
5732  batchSize_, oofEnabled_,
5733  sendEmpties_, topN_, false));
5734  return result;
5735  }
5755  std::string sowDelete(MessageHandler messageHandler,
5756  const std::string& topic,
5757  const std::string& filter,
5758  long timeout)
5759  {
5760  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
5761  }
5778  Message sowDelete(const std::string& topic, const std::string& filter,
5779  long timeout=0)
5780  {
5781  MessageStream stream(*this);
5782  char buf[Message::IdentifierLength+1];
5783  buf[Message::IdentifierLength] = 0;
5784  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5785  Field cid(buf);
5786  try
5787  {
5788  stream.setStatsOnly(cid);
5789  _body.get().sowDelete(stream.operator MessageHandler(),topic,filter,timeout,cid);
5790  return *(stream.begin());
5791  }
5792  catch (const DisconnectedException&)
5793  {
5794  removeMessageHandler(cid);
5795  throw;
5796  }
5797  }
5798 
5802 
5803  void startTimer()
5804  {
5805  _body.get().startTimer();
5806  }
5807 
5813 
5814  std::string stopTimer(MessageHandler messageHandler)
5815  {
5816  return _body.get().stopTimer(messageHandler);
5817  }
5818 
5840  std::string sowDeleteByKeys(MessageHandler messageHandler_,
5841  const std::string& topic_,
5842  const std::string& keys_,
5843  long timeout_=0)
5844  {
5845  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
5846  }
5867  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
5868  long timeout_ = 0)
5869  {
5870  MessageStream stream(*this);
5871  char buf[Message::IdentifierLength+1];
5872  buf[Message::IdentifierLength] = 0;
5873  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5874  Field cid(buf);
5875  try
5876  {
5877  stream.setStatsOnly(cid);
5878  _body.get().sowDeleteByKeys(stream.operator MessageHandler(),topic_,keys_,timeout_,cid);
5879  return *(stream.begin());
5880  }
5881  catch (const DisconnectedException&)
5882  {
5883  removeMessageHandler(cid);
5884  throw;
5885  }
5886  }
5887 
5902  std::string sowDeleteByData(MessageHandler messageHandler_,
5903  const std::string& topic_, const std::string& data_,
5904  long timeout_=0)
5905  {
5906  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
5907  }
5922  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
5923  long timeout_=0)
5924  {
5925  MessageStream stream(*this);
5926  char buf[Message::IdentifierLength+1];
5927  buf[Message::IdentifierLength] = 0;
5928  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5929  Field cid(buf);
5930  try
5931  {
5932  stream.setStatsOnly(cid);
5933  _body.get().sowDeleteByData(stream.operator MessageHandler(),topic_,data_,timeout_,cid);
5934  return *(stream.begin());
5935  }
5936  catch (const DisconnectedException&)
5937  {
5938  removeMessageHandler(cid);
5939  throw;
5940  }
5941  }
5942 
5947  {
5948  return _body.get().getHandle();
5949  }
5950 
5959  {
5960  _body.get().setExceptionListener(listener_);
5961  }
5962 
5966  {
5967  return _body.get().getExceptionListener();
5968  }
5969 
5983  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
5984  {
5985  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
5986  }
5987 
6000  void setHeartbeat(unsigned heartbeatTime_)
6001  {
6002  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6003  }
6004 
6007  {
6008  setLastChanceMessageHandler(messageHandler);
6009  }
6010 
6014  {
6015  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6016  messageHandler);
6017  }
6018 
6036  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
6037  {
6038  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6039  }
6040 
6057  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
6058  {
6059  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6060  }
6061 
6062  static const char* BOOKMARK_NOW() { return AMPS_BOOKMARK_NOW; }
6063  static const char* BOOKMARK_EPOCH() { return AMPS_BOOKMARK_EPOCH; }
6064  static const char* BOOKMARK_RECENT() { return AMPS_BOOKMARK_RECENT; }
6065  static const char* BOOKMARK_MOST_RECENT() { return AMPS_BOOKMARK_RECENT; }
6066  static const char* NOW() { return AMPS_BOOKMARK_NOW; }
6067  static const char* EPOCH() { return AMPS_BOOKMARK_EPOCH; }
6068  static const char* MOST_RECENT() { return AMPS_BOOKMARK_RECENT; }
6069 
6070 
6077  {
6078  _body.get().addConnectionStateListener(listener);
6079  }
6080 
6085  {
6086  _body.get().removeConnectionStateListener(listener);
6087  }
6088 
6110  std::string executeAsync(Command& command_, MessageHandler handler_)
6111  {
6112  return _body.get().executeAsync(command_, handler_);
6113  }
6114 
6144  std::string executeAsyncNoResubscribe(Command& command_,
6145  MessageHandler handler_)
6146  {
6147  std::string id;
6148  try
6149  {
6150  id = _body.get().executeAsync(command_, handler_, false);
6151  }
6152  catch (const DisconnectedException&)
6153  {
6154  removeMessageHandler(command_.getMessage().getCommandId());
6155  if (command_.isSubscribe())
6156  {
6157  removeMessageHandler(command_.getMessage().getSubscriptionId());
6158  }
6159  if (command_.isSow())
6160  {
6161  removeMessageHandler(command_.getMessage().getQueryID());
6162  }
6163  throw;
6164  }
6165  return id;
6166  }
6167 
6180  MessageStream execute(Command& command_);
6181 
6188  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
6189  {
6190  _body.get().ack(topic_,bookmark_,options_);
6191  }
6192 
6198  void ack(Message& message_, const char* options_ = NULL)
6199  {
6200  _body.get().ack(message_.getTopic(),message_.getBookmark(),options_);
6201  }
6208  void ack(const std::string& topic_, const std::string& bookmark_,
6209  const char* options_ = NULL)
6210  {
6211  _body.get().ack(Field(topic_.data(),topic_.length()), Field(bookmark_.data(),bookmark_.length()),options_);
6212  }
6213 
6219  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
6220  {
6221  _body.get()._ack(topic_,bookmark_,options_);
6222  }
6230  void flushAcks(void)
6231  {
6232  _body.get().flushAcks();
6233  }
6234 
6239  bool getAutoAck(void) const
6240  {
6241  return _body.get().getAutoAck();
6242  }
6249  void setAutoAck(bool isAutoAckEnabled_)
6250  {
6251  _body.get().setAutoAck(isAutoAckEnabled_);
6252  }
6257  unsigned getAckBatchSize(void) const
6258  {
6259  return _body.get().getAckBatchSize();
6260  }
6267  void setAckBatchSize(const unsigned ackBatchSize_)
6268  {
6269  _body.get().setAckBatchSize(ackBatchSize_);
6270  }
6271 
6276  int getAckTimeout(void) const
6277  {
6278  return _body.get().getAckTimeout();
6279  }
6286  void setAckTimeout(const int ackTimeout_)
6287  {
6288  _body.get().setAckTimeout(ackTimeout_);
6289  }
6290 
6291 
6300  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
6301  {
6302  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
6303  }
6304 
6309  bool getRetryOnDisconnect(void) const
6310  {
6311  return _body.get().getRetryOnDisconnect();
6312  }
6313 
6318  void setDefaultMaxDepth(unsigned maxDepth_)
6319  {
6320  _body.get().setDefaultMaxDepth(maxDepth_);
6321  }
6322 
6327  unsigned getDefaultMaxDepth(void) const
6328  {
6329  return _body.get().getDefaultMaxDepth();
6330  }
6331 
6339  void* userData_)
6340  {
6341  return _body.get().setTransportFilterFunction(filter_, userData_);
6342  }
6343 
6349  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
6350  {
6351  _body.get().deferredExecution(func_,userData_);
6352  }
6356 };
6357 
6358 inline void
6359 ClientImpl::lastChance(AMPS::Message& message)
6360 {
6361  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
6362 }
6363 
6364 inline unsigned
6365 ClientImpl::persistedAck(AMPS::Message& message)
6366 {
6367  unsigned deliveries = 0;
6368  try
6369  {
6370  /*
6371  * Best Practice: If you don't care about the dupe acks that
6372  * occur during failover or rapid disconnect/reconnect, then just
6373  * ignore them. We could discard each duplicate from the
6374  * persisted store, but the storage costs of doing 1 record
6375  * discards is heavy. In most scenarios we'll just quickly blow
6376  * through the duplicates and get back to processing the
6377  * non-dupes.
6378  */
6379  const char* data = NULL;
6380  size_t len = 0;
6381  const char* status = NULL;
6382  size_t statusLen = 0;
6383  amps_handle messageHandle = message.getMessage();
6384  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
6385  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
6386  amps_message_get_field_value(messageHandle, AMPS_Status,
6387  &status, &statusLen);
6388  if (len == NotEntitled || len == Duplicate ||
6389  (statusLen == Failure && status[0] == 'f'))
6390  {
6391  if (_failedWriteHandler)
6392  {
6393  amps_uint64_t sequence =
6394  amps_message_get_field_uint64(messageHandle,
6395  AMPS_Sequence);
6396  if (_publishStore.isValid())
6397  {
6398  FailedWriteStoreReplayer replayer(this, data, len);
6399  _publishStore.replaySingle(replayer, sequence);
6400  }
6401  else // Call the handler with what little we have
6402  {
6403  static Message emptyMessage;
6404  _failedWriteHandler->failedWrite(emptyMessage, data, len);
6405  }
6406  ++deliveries;
6407  }
6408  }
6409  if (_publishStore.isValid())
6410  {
6411  // Ack for publisher will have sequence while
6412  // ack for bookmark subscribe won't
6413  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
6414  AMPS_Sequence);
6415  if (seq > 0)
6416  {
6417  ++deliveries;
6418  _publishStore.discardUpTo(seq);
6419  }
6420  }
6421 
6422  if (!deliveries && _bookmarkStore.isValid())
6423  {
6424  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
6425  &data, &len);
6426  if (len > 0)
6427  {
6428  const char* bookmarkData = NULL;
6429  size_t bookmarkLen = 0;
6430  amps_message_get_field_value(messageHandle, AMPS_Bookmark,
6431  &bookmarkData, &bookmarkLen);
6432  if (bookmarkLen > 0)
6433  {
6434  ++deliveries;
6435  Message::Field subId(data, len);
6436  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
6437  }
6438  }
6439  }
6440  }
6441  catch (std::exception& ex)
6442  {
6443  AMPS_UNHANDLED_EXCEPTION(ex);
6444  }
6445  return deliveries;
6446 }
6447 
6448 inline unsigned
6449 ClientImpl::processedAck(Message &message)
6450 {
6451  unsigned deliveries = 0;
6452  AckResponse ack;
6453  const char* data = NULL;
6454  size_t len = 0;
6455  amps_handle messageHandle = message.getMessage();
6456  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
6457  Lock<Mutex> l(_lock);
6458  if (data && len)
6459  {
6460  AckMap::iterator i = _acks.find(std::string(data, len));
6461  if (i != _acks.end())
6462  {
6463  ++deliveries;
6464  ack = i->second;
6465  _acks.erase(i);
6466  }
6467  }
6468  if (deliveries)
6469  {
6470  amps_message_get_field_value(messageHandle, AMPS_Status, &data,
6471  &len);
6472  ack.setStatus(data, len);
6473  amps_message_get_field_value(messageHandle, AMPS_Reason, &data,
6474  &len);
6475  ack.setReason(data, len);
6476  amps_message_get_field_value(messageHandle, AMPS_UserId, &data,
6477  &len);
6478  ack.setUsername(data, len);
6479  amps_message_get_field_value(messageHandle, AMPS_Password, &data,
6480  &len);
6481  ack.setPassword(data, len);
6482  amps_message_get_field_value(messageHandle, AMPS_Sequence, &data,
6483  &len);
6484  ack.setSequenceNo(data, len);
6485  amps_message_get_field_value(messageHandle, AMPS_Version, &data,
6486  &len);
6487  ack.setServerVersion(data, len);
6488  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
6489  ack.setOptions(data,len);
6490  ack.setResponded(true);
6491  _lock.signalAll();
6492  }
6493  return deliveries;
6494 }
6495 
6496 inline void
6497 ClientImpl::checkAndSendHeartbeat(bool force)
6498 {
6499  if (force || _heartbeatTimer.check())
6500  {
6501  _heartbeatTimer.start();
6502  try
6503  {
6504  sendWithoutRetry(_beatMessage);
6505  }
6506  catch (const AMPSException&)
6507  {
6508  ;
6509  }
6510  }
6511 }
6512 
6513 inline ConnectionInfo ClientImpl::getConnectionInfo() const
6514 {
6515  ConnectionInfo info;
6516  std::ostringstream writer;
6517 
6518  info["client.uri"] = _lastUri;
6519  info["client.name"] = _name;
6520  info["client.username"] = _username;
6521  if(_publishStore.isValid())
6522  {
6523  writer << _publishStore.unpersistedCount();
6524  info["publishStore.unpersistedCount"] = writer.str();
6525  writer.clear();
6526  writer.str("");
6527  }
6528 
6529  return info;
6530 }
6531 
6532 inline amps_result
6533 ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
6534 {
6535  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
6536  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
6537  ClientImpl* me = (ClientImpl*) userData_;
6538  if(!messageHandle_)
6539  {
6540  if(me->_queueAckTimeout) me->checkQueueAcks();
6541  return AMPS_E_OK;
6542  }
6543  me->processDeferredExecutions();
6544 
6545  me->_readMessage.replace(messageHandle_);
6546  Message& message = me->_readMessage;
6547  Message::Command::Type commandType = message.getCommandEnum();
6548  if (commandType & SOWMask)
6549  {
6550 #if 0 // Not currently implemented, to avoid an extra branch in delivery
6551  // A small cheat here to get the right handler, using knowledge of the
6552  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
6553  // and their GlobalCommandTypeHandlers values 1, 2, 3.
6554  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6555  me->_globalCommandTypeHandlers[1+(commandType/8192)].invoke(message));
6556 #endif
6557  AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->_routes.deliverData(message,
6558  message.getQueryID()));
6559  }
6560  else if (commandType & PublishMask)
6561  {
6562 #if 0 // Not currently implemented, to avoid an extra branch in delivery
6563  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6564  me->_globalCommandTypeHandlers[(commandType==Message::Command::Publish ?
6565  GlobalCommandTypeHandlers::Publish :
6566  GlobalCommandTypeHandlers::OOF)].invoke(message));
6567 #endif
6568  const char* subIds = NULL;
6569  size_t subIdsLen = 0;
6570  // Publish command, send to subscriptions
6571  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds, &subIds, &subIdsLen);
6572  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
6573  for(size_t i=0; i<subIdCount; ++i)
6574  {
6575  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
6576  MessageHandler& handler = lookupResult.handler;
6577  if (handler.isValid())
6578  {
6579  amps_message_set_field_value(messageHandle_, AMPS_SubscriptionId,
6580  subIds + lookupResult.idOffset, lookupResult.idLength);
6581  Message::Field bookmark = message.getBookmark();
6582  bool isMessageQueue = message.getLeasePeriod().len() != 0;
6583  bool isAutoAck = me->_isAutoAckEnabled;
6584 
6585  if (!isMessageQueue && !bookmark.empty() &&
6586  me->_bookmarkStore.isValid())
6587  {
6588  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
6589  {
6590  //Call duplicate message handler in handlers map
6591  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
6592  {
6593  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message);
6594  }
6595  }
6596  else
6597  {
6598  me->_bookmarkStore.log(me->_readMessage);
6599  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6600  handler.invoke(message));
6601  }
6602  }
6603  else
6604  {
6605  if(isMessageQueue && isAutoAck)
6606  {
6607  try
6608  {
6609  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
6610  if (!message.getIgnoreAutoAck())
6611  {
6612  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6613  me->_ack(message.getTopic(),message.getBookmark()));
6614  }
6615  }
6616  catch(std::exception& ex)
6617  {
6618  if (!message.getIgnoreAutoAck())
6619  {
6620  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6621  me->_ack(message.getTopic(),message.getBookmark(),"cancel"));
6622  }
6623  AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6624  }
6625  }
6626  else
6627  {
6628  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6629  handler.invoke(message));
6630  }
6631  }
6632  }
6633  else me->lastChance(message);
6634  } // for (subidsEnd)
6635  }
6636  else if (commandType == Message::Command::Ack)
6637  {
6638  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6639  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
6640  unsigned ackType = message.getAckTypeEnum();
6641  unsigned deliveries = 0U;
6642  switch (ackType)
6643  {
6644  case Message::AckType::Persisted:
6645  deliveries += me->persistedAck(message);
6646  break;
6647  case Message::AckType::Processed: // processed
6648  deliveries += me->processedAck(message);
6649  break;
6650  }
6651  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
6652  if (deliveries == 0)
6653  {
6654  me->lastChance(message);
6655  }
6656  }
6657  else if (commandType == Message::Command::Heartbeat)
6658  {
6659  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6660  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
6661  if(me->_heartbeatTimer.getTimeout() != 0.0)
6662  {
6663  me->checkAndSendHeartbeat(true);
6664  }
6665  else
6666  {
6667  me->lastChance(message);
6668  }
6669  return AMPS_E_OK;
6670  }
6671  else if (!message.getCommandId().empty())
6672  {
6673  unsigned deliveries = 0U;
6674  try
6675  {
6676  while(me->_connected) // Keep sending heartbeats when stream is full
6677  {
6678  try
6679  {
6680  deliveries = me->_routes.deliverData(message, message.getCommandId());
6681  break;
6682  }
6683 #ifdef _WIN32
6684  catch(MessageStreamFullException&)
6685 #else
6686  catch(MessageStreamFullException& ex_)
6687 #endif
6688  {
6689  me->checkAndSendHeartbeat(false);
6690  }
6691  }
6692  }
6693  catch (std::exception& ex_)
6694  {
6695  try
6696  {
6697  me->_exceptionListener->exceptionThrown(ex_);
6698  }
6699  catch(...)
6700  {
6701  ;
6702  }
6703  }
6704  if (deliveries == 0)
6705  me->lastChance(message);
6706  }
6707  me->checkAndSendHeartbeat();
6708  return AMPS_E_OK;
6709 }
6710 
6711 inline void
6712 ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
6713 {
6714  ClientImpl* me = (ClientImpl*) userData;
6715  //Client wrapper(me);
6716  // Go ahead and signal any waiters if they are around...
6717  me->clearAcks(failedConnectionVersion);
6718 }
6719 
6720 inline amps_result
6721 ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
6722 {
6723  ClientImpl* me = (ClientImpl*) userData;
6724  Lock<Mutex> l(me->_lock);
6725  Client wrapper(me,false);
6726  if (me->_connected)
6727  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
6728  while(true)
6729  {
6730  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
6731  try
6732  {
6733  AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
6734  me->_connected = false;
6735  {
6736  // Have to release the lock here or receive thread can't
6737  // invoke the message handler.
6738  Unlock<Mutex> unlock(me->_lock);
6739  me->_disconnectHandler.invoke(wrapper);
6740  }
6741  }
6742  catch(const std::exception& ex)
6743  {
6744  AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6745  }
6746 
6747  if (!me->_connected)
6748  {
6749  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
6750  AMPS_UNHANDLED_EXCEPTION_2(me,DisconnectedException("Reconnect failed."));
6751  return AMPS_E_DISCONNECTED;
6752  }
6753  try
6754  {
6755  // Resubscribe
6756  if (me->_subscriptionManager)
6757  {
6758  {
6759  // Have to release the lock here or receive thread can't
6760  // invoke the message handler.
6761  Unlock<Mutex> unlock(me->_lock);
6762  me->_subscriptionManager->resubscribe(wrapper);
6763  }
6764  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
6765  }
6766  return AMPS_E_OK;
6767  }
6768  catch(const AMPSException& subEx)
6769  {
6770  AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6771  }
6772  catch(const std::exception& subEx)
6773  {
6774  AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6775  return AMPS_E_RETRY;
6776  }
6777  catch(...)
6778  {
6779  return AMPS_E_RETRY;
6780  }
6781  }
6782  return AMPS_E_RETRY;
6783 }
6784 
6785 class FIX
6786 {
6787  const char* _data;
6788  size_t _len;
6789  char _fieldSep;
6790 public:
6791  class iterator
6792  {
6793  const char* _data;
6794  size_t _len;
6795  size_t _pos;
6796  char _fieldSep;
6797  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
6798  : _data(data_), _len(len_),_pos(pos_), _fieldSep(fieldSep_)
6799  {
6800  while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6801  }
6802  public:
6803  typedef void* difference_type;
6804  typedef std::forward_iterator_tag iterator_category;
6805  typedef std::pair<Message::Field, Message::Field> value_type;
6806  typedef value_type* pointer;
6807  typedef value_type& reference;
6808  bool operator==(const iterator& rhs) const
6809  {
6810  return _pos == rhs._pos;
6811  }
6812  bool operator!=(const iterator& rhs) const
6813  {
6814  return _pos != rhs._pos;
6815  }
6816  iterator& operator++()
6817  {
6818  // Skip through the data
6819  while(_pos != _len && _data[_pos] != _fieldSep) ++_pos;
6820  // Skip through any field separators
6821  while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6822  return *this;
6823  }
6824 
6825  value_type operator*() const
6826  {
6827  value_type result;
6828  size_t i = _pos, keyLength =0, valueStart = 0, valueLength = 0;
6829  for(; i < _len && _data[i] != '='; ++i) ++keyLength;
6830 
6831  result.first.assign(_data+_pos, keyLength);
6832 
6833  if (i < _len && _data[i] == '=')
6834  {
6835  ++i;
6836  valueStart = i;
6837  for(; i < _len && _data[i] != _fieldSep; ++i)
6838  {
6839  valueLength++;
6840  }
6841  }
6842  result.second.assign(_data+valueStart, valueLength);
6843  return result;
6844  }
6845 
6846  friend class FIX;
6847  };
6848  class reverse_iterator
6849  {
6850  const char* _data;
6851  size_t _len;
6852  const char* _pos;
6853  char _fieldSep;
6854  public:
6855  typedef std::pair<Message::Field, Message::Field> value_type;
6856  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
6857  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
6858  {
6859  if (_pos)
6860  {
6861  // skip past meaningless trailing fieldseps
6862  while(_pos >=_data && *_pos == _fieldSep) --_pos;
6863  while(_pos > _data && *_pos != _fieldSep) --_pos;
6864  // if we stopped before the 0th character, it's because
6865  // it's a field sep. advance one to point to the first character
6866  // of a key.
6867  if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
6868  if (_pos < _data) _pos = 0;
6869  }
6870  }
6871  bool operator==(const reverse_iterator& rhs) const
6872  {
6873  return _pos == rhs._pos;
6874  }
6875  bool operator!=(const reverse_iterator& rhs) const
6876  {
6877  return _pos != rhs._pos;
6878  }
6879  reverse_iterator& operator++()
6880  {
6881  if (_pos == _data)
6882  {
6883  _pos = 0;
6884  }
6885  else
6886  {
6887  // back up 1 to a field separator
6888  --_pos;
6889  // keep backing up through field separators
6890  while(_pos >=_data && *_pos == _fieldSep) --_pos;
6891  // now back up to the beginning of this field
6892  while(_pos >_data && *_pos != _fieldSep) --_pos;
6893  if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
6894  if (_pos < _data) _pos = 0;
6895  }
6896  return *this;
6897  }
6898  value_type operator*() const
6899  {
6900  value_type result;
6901  size_t keyLength = 0, valueStart = 0, valueLength = 0;
6902  size_t i = (size_t)(_pos - _data);
6903  for(; i < _len && _data[i] != '='; ++i) ++keyLength;
6904  result.first.assign(_pos, keyLength);
6905  if (i<_len && _data[i] == '=')
6906  {
6907  ++i;
6908  valueStart = i;
6909  for(; i<_len && _data[i] != _fieldSep; ++i)
6910  {
6911  valueLength++;
6912  }
6913  }
6914  result.second.assign(_data+valueStart, valueLength);
6915  return result;
6916  }
6917  };
6918  FIX(const Message::Field& data, char fieldSeparator=1)
6919  : _data(data.data()), _len(data.len()),
6920  _fieldSep(fieldSeparator)
6921  {
6922  }
6923 
6924  FIX(const char* data, size_t len, char fieldSeparator=1)
6925  : _data(data), _len(len), _fieldSep(fieldSeparator)
6926  {
6927  }
6928 
6929  iterator begin() const
6930  {
6931  return iterator(_data, _len, 0, _fieldSep);
6932  }
6933  iterator end() const
6934  {
6935  return iterator(_data, _len, _len, _fieldSep);
6936  }
6937 
6938 
6939  reverse_iterator rbegin() const
6940  {
6941  return reverse_iterator(_data, _len, _data+(_len-1), _fieldSep);
6942  }
6943 
6944  reverse_iterator rend() const
6945  {
6946  return reverse_iterator(_data, _len, 0, _fieldSep);
6947  }
6948 };
6949 
6950 
6963 
6964 template <class T>
6966 {
6967  std::stringstream _data;
6968  char _fs;
6969 public:
6975  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
6976 
6984  void append(const T& tag, const char* value, size_t offset, size_t length)
6985  {
6986  _data << tag<<'=';
6987  _data.write(value+offset, (std::streamsize)length);
6988  _data << _fs;
6989  }
6995  void append(const T& tag, const std::string& value)
6996  {
6997  _data << tag << '=' << value << _fs;
6998  }
6999 
7002  std::string getString() const
7003  {
7004  return _data.str();
7005  }
7006  operator std::string() const
7007  {
7008  return _data.str();
7009  }
7010 
7012  void reset()
7013  {
7014  _data.str(std::string());
7015  }
7016 };
7017 
7021 
7023 
7027 
7029 
7030 
7038 
7040 {
7041  char _fs;
7042 public:
7047  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
7048 
7051  typedef std::map<Message::Field, Message::Field> map_type;
7052 
7058  map_type toMap(const Message::Field& data)
7059  {
7060  FIX fix(data, _fs);
7061  map_type retval;
7062  for(FIX::iterator a = fix.begin(); a!= fix.end(); ++a)
7063  {
7064  retval.insert(*a);
7065  }
7066 
7067  return retval;
7068  }
7069 };
7070 
7071 class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
7072 {
7073  Mutex _lock;
7074  std::deque<Message> _q;
7075  std::string _commandId;
7076  Client _client;
7077  unsigned _timeout;
7078  unsigned _maxDepth;
7079  unsigned _requestedAcks;
7080  Message::Field _previousTopic;
7081  Message::Field _previousBookmark;
7082  volatile enum { Unset=0x0, Running=0x10, Subscribe=0x11, SOWOnly=0x12, AcksOnly=0x13, Conflate=0x14, Closed=0x1, Disconnected=0x2 } _state;
7083  typedef std::map<std::string, Message*> SOWKeyMap;
7084  SOWKeyMap _sowKeyMap;
7085  public:
7086  MessageStreamImpl(const Client& client_)
7087  : _client(client_),
7088  _timeout(0),
7089  _maxDepth((unsigned)~0),
7090  _requestedAcks(0),
7091  _state(Unset)
7092  {
7093  if (_client.isValid())
7094  {
7095  _client.addConnectionStateListener(this);
7096  }
7097  }
7098 
7099  MessageStreamImpl(ClientImpl* client_)
7100  : _client(client_),
7101  _timeout(0),
7102  _maxDepth((unsigned)~0),
7103  _requestedAcks(0),
7104  _state(Unset)
7105  {
7106  if (_client.isValid())
7107  {
7108  _client.addConnectionStateListener(this);
7109  }
7110  }
7111 
7112  ~MessageStreamImpl()
7113  {
7114  }
7115 
7116  virtual void destroy()
7117  {
7118  try
7119  {
7120  close();
7121  }
7122  catch(std::exception &e)
7123  {
7124  try
7125  {
7126  if (_client.isValid())
7127  {
7128  _client.getExceptionListener().exceptionThrown(e);
7129  }
7130  } catch (...) {}
7131  }
7132  if (_client.isValid())
7133  {
7134  _client.removeConnectionStateListener(this);
7135  _client.deferredExecution(MessageStreamImpl::destroyer, this);
7136  _client = Client((ClientImpl*)NULL);
7137  }
7138  else
7139  {
7140  delete this;
7141  }
7142  }
7143 
7144  static void destroyer(void* vpMessageStreamImpl_)
7145  {
7146  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
7147  }
7148 
7149  void setSubscription(const std::string& commandId_)
7150  {
7151  Lock<Mutex> lock(_lock);
7152  // It's possible to disconnect between creation/registration and here.
7153  if (Disconnected == _state) return;
7154  assert(Unset==_state);
7155  _state = Subscribe;
7156  _commandId = commandId_;
7157  }
7158 
7159  void setSOWOnly(const std::string& commandId_)
7160  {
7161  Lock<Mutex> lock(_lock);
7162  // It's possible to disconnect between creation/registration and here.
7163  if (Disconnected == _state) return;
7164  assert(Unset==_state);
7165  _state = SOWOnly;
7166  _commandId = commandId_;
7167  }
7168 
7169  void setStatsOnly(const std::string& commandId_)
7170  {
7171  Lock<Mutex> lock(_lock);
7172  // It's possible to disconnect between creation/registration and here.
7173  if (Disconnected == _state) return;
7174  assert(Unset==_state);
7175  _state = AcksOnly;
7176  _requestedAcks = Message::AckType::Stats;
7177  _commandId = commandId_;
7178  }
7179 
7180  void setAcksOnly(const std::string& commandId_, unsigned acks_)
7181  {
7182  Lock<Mutex> lock(_lock);
7183  // It's possible to disconnect between creation/registration and here.
7184  if (Disconnected == _state) return;
7185  assert(Unset==_state);
7186  _state = AcksOnly;
7187  _requestedAcks = acks_;
7188  _commandId = commandId_;
7189  }
7190 
7191  void connectionStateChanged(ConnectionStateListener::State state_)
7192  {
7193  Lock<Mutex> lock(_lock);
7194  if(state_ == AMPS::ConnectionStateListener::Disconnected)
7195  {
7196  _state = Disconnected;
7197  }
7198  _lock.signalAll();
7199  }
7200 
7201  void timeout(unsigned timeout_)
7202  {
7203  _timeout = timeout_;
7204  }
7205  void conflate(void)
7206  {
7207  if(_state == Subscribe) _state = Conflate;
7208  }
7209  void maxDepth(unsigned maxDepth_)
7210  {
7211  if(maxDepth_) _maxDepth = maxDepth_;
7212  else _maxDepth = (unsigned)~0;
7213  }
7214  unsigned getMaxDepth(void) const
7215  {
7216  return _maxDepth;
7217  }
7218  unsigned getDepth(void) const
7219  {
7220  return (unsigned)(_q.size());
7221  }
7222 
7223  bool next(Message& current_)
7224  {
7225  Lock<Mutex> lock(_lock);
7226  if (!_previousTopic.empty() && !_previousBookmark.empty())
7227  {
7228  try
7229  {
7230  if (_client.isValid())
7231  {
7232  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
7233  }
7234  }
7235 #ifdef _WIN32
7236  catch (AMPSException&)
7237 #else
7238  catch (AMPSException& e)
7239 #endif
7240  {
7241  current_.invalidate();
7242  _previousTopic.clear();
7243  _previousBookmark.clear();
7244  return false;
7245  }
7246  _previousTopic.clear();
7247  _previousBookmark.clear();
7248  }
7249  unsigned minWaitTime = _timeout ? _timeout : 1000;
7250  size_t waited = 0;
7251  while(_q.empty() && _state & Running)
7252  {
7253  // Using timeout so python can interrupt
7254  _lock.wait((long)minWaitTime);
7255  amps_invoke_waiting_function();
7256  if (_timeout)
7257  {
7258  waited += minWaitTime;
7259  if(waited >= _timeout) break;
7260  }
7261  }
7262  if(!_q.empty())
7263  {
7264  current_ = _q.front();
7265  if(_q.size() == _maxDepth) _lock.signalAll();
7266  _q.pop_front();
7267  if(_state == Conflate)
7268  {
7269  std::string sowKey = current_.getSowKey();
7270  if(sowKey.length()) _sowKeyMap.erase(sowKey);
7271  }
7272  else if(_state == AcksOnly)
7273  {
7274  _requestedAcks &= ~(current_.getAckTypeEnum());
7275  }
7276  if((_state == AcksOnly && _requestedAcks == 0) ||
7277  (_state == SOWOnly && current_.getCommand()=="group_end"))
7278  {
7279  _state = Closed;
7280  }
7281  else if (current_.getCommandEnum() == Message::Command::Publish &&
7282  _client.isValid() && _client.getAutoAck() &&
7283  !current_.getLeasePeriod().empty() &&
7284  !current_.getBookmark().empty())
7285  {
7286  _previousTopic = current_.getTopic().deepCopy();
7287  _previousBookmark = current_.getBookmark().deepCopy();
7288  }
7289  return true;
7290  }
7291  if(_state == Disconnected)
7292  {
7293  throw DisconnectedException("Connection closed.");
7294  }
7295  current_.invalidate();
7296  if(_state == Closed)
7297  {
7298  return false;
7299  }
7300  return _timeout != 0;
7301  }
7302  void close(void)
7303  {
7304  if((_state==SOWOnly || _state==Subscribe) && _client.isValid()) //not delete
7305  {
7306  _client.unsubscribe(_commandId);
7307  }
7308  if(_state==SOWOnly || _state==Subscribe || _state==Unset)
7309  {
7310  _state = Closed;
7311  }
7312  }
7313  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
7314  {
7315  Lock<Mutex> lock(this_->_lock);
7316  if(this_->_state != Conflate)
7317  {
7318  AMPS_TESTING_SLOW_MESSAGE_STREAM
7319  if(this_->_q.size() >= this_->_maxDepth)
7320  {
7321  // We throw here so that heartbeats can be sent. The exception
7322  // will be handled internally only, and the same Message will
7323  // come back to try again. Make sure to signal.
7324  this_->_lock.signalAll();
7325  throw MessageStreamFullException("Stream is currently full.");
7326  }
7327  this_->_q.push_back(message_.deepCopy());
7328  if (message_.getCommandEnum() == Message::Command::Publish &&
7329  this_->_client.isValid() && this_->_client.getAutoAck() &&
7330  !message_.getLeasePeriod().empty() &&
7331  !message_.getBookmark().empty())
7332  {
7333  message_.setIgnoreAutoAck();
7334  }
7335  }
7336  else
7337  {
7338  std::string sowKey = message_.getSowKey();
7339  if(sowKey.length())
7340  {
7341  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
7342  if(it != this_->_sowKeyMap.end())
7343  {
7344  *(it->second) = message_.deepCopy();
7345  }
7346  else
7347  {
7348  if(this_->_q.size() >= this_->_maxDepth)
7349  {
7350  // We throw here so that heartbeats can be sent. The
7351  // exception will be handled internally only, and the
7352  // same Message will come back to try again. Make sure
7353  // to signal.
7354  this_->_lock.signalAll();
7355  throw MessageStreamFullException("Stream is currently full.");
7356  }
7357  this_->_q.push_back(message_.deepCopy());
7358  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
7359  }
7360  }
7361  else
7362  {
7363  while(this_->_q.size() >= this_->_maxDepth)
7364  {
7365  this_->_lock.wait(1);
7366  }
7367  this_->_q.push_back(message_.deepCopy());
7368  }
7369  }
7370  this_->_lock.signalAll();
7371  }
7372 };
7373 inline MessageStream::MessageStream(void)
7374 {
7375 }
7376 inline MessageStream::MessageStream(const Client& client_)
7377  :_body(new MessageStreamImpl(client_))
7378 {
7379 }
7380 inline void MessageStream::iterator::advance(void)
7381 {
7382  _pStream = _pStream->_body->next(_current) ? _pStream:NULL;
7383 }
7384 inline MessageStream::operator MessageHandler(void)
7385 {
7386  return MessageHandler((void(*)(const Message&,void*))MessageStreamImpl::_messageHandler, &_body.get());
7387 }
7388 inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
7389 {
7390  MessageStream result;
7391  if(handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
7392  {
7393  result._body = (MessageStreamImpl*)(handler_._userData);
7394  }
7395  return result;
7396 }
7397 
7398 inline void MessageStream::setSOWOnly(const std::string& commandId_)
7399 {
7400  _body->setSOWOnly(commandId_);
7401 }
7402 inline void MessageStream::setSubscription(const std::string& commandId_)
7403 {
7404  _body->setSubscription(commandId_);
7405 }
7406 inline void MessageStream::setStatsOnly(const std::string& commandId_)
7407 {
7408  _body->setStatsOnly(commandId_);
7409 }
7410 inline void MessageStream::setAcksOnly(const std::string& commandId_,
7411  unsigned acks_)
7412 {
7413  _body->setAcksOnly(commandId_, acks_);
7414 }
7415 inline MessageStream MessageStream::timeout(unsigned timeout_)
7416 {
7417  _body->timeout(timeout_);
7418  return *this;
7419 }
7420 inline MessageStream MessageStream::conflate(void)
7421 {
7422  _body->conflate();
7423  return *this;
7424 }
7425 inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
7426 {
7427  _body->maxDepth(maxDepth_);
7428  return *this;
7429 }
7430 inline unsigned MessageStream::getMaxDepth(void) const
7431 {
7432  return _body->getMaxDepth();
7433 }
7434 inline unsigned MessageStream::getDepth(void) const
7435 {
7436  return _body->getDepth();
7437 }
7438 
7439 inline MessageStream ClientImpl::getEmptyMessageStream(void)
7440 {
7441  return *(_pEmptyMessageStream.get());
7442 }
7443 
7445 {
7446  // If the command is sow and has a sub_id, OR
7447  // if the command has a replace option, return the existing
7448  // messagestream, don't create a new one.
7449  Message& message = command_.getMessage();
7450  Field subId = message.getSubscriptionId();
7451  unsigned ackTypes = message.getAckTypeEnum();
7452  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace",7)) || message.getCommandEnum() == Message::Command::SOW);
7453  if(useExistingHandler)
7454  {
7455  // Try to find the existing message handler.
7456  if(!subId.empty())
7457  {
7458  MessageHandler existingHandler;
7459  if (_body.get()._routes.getRoute(subId, existingHandler))
7460  {
7461  // we found an existing handler. It might not be a message stream, but that's okay.
7462  _body.get().executeAsync(command_, existingHandler, false);
7463  return MessageStream::fromExistingHandler(existingHandler);
7464  }
7465  }
7466  // fall through; we'll a new handler altogether.
7467  }
7468  Message::Command::Type command = command_.getMessage().getCommandEnum();
7469  if ((command == Message::Command::Publish ||
7470  command == Message::Command::DeltaPublish) &&
7471  (ackTypes == Message::AckType::Persisted ||
7472  ackTypes == Message::AckType::None))
7473  {
7474  executeAsync(command_, MessageHandler());
7475  if (!_body.get()._pEmptyMessageStream)
7476  {
7477  _body.get()._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
7478  _body.get()._pEmptyMessageStream.get()->_body->close();
7479  }
7480  return _body.get().getEmptyMessageStream();
7481  }
7482  MessageStream stream(*this);
7483  if (_body.get().getDefaultMaxDepth())
7484  stream.maxDepth(_body.get().getDefaultMaxDepth());
7485  MessageHandler handler = stream.operator MessageHandler();
7486  std::string commandID = _body.get().executeAsync(command_, handler, false);
7487  if (command_.hasStatsAck())
7488  {
7489  stream.setStatsOnly(commandID);
7490  }
7491  else if (command_.isSow())
7492  {
7493  stream.setSOWOnly(commandID);
7494  }
7495  else if (command_.isSubscribe())
7496  {
7497  stream.setSubscription(commandID);
7498  }
7499  else
7500  {
7501  // Persisted acks for writes don't come back with command id
7502  if (command == Message::Command::Publish ||
7503  command == Message::Command::DeltaPublish ||
7504  command == Message::Command::SOWDelete)
7505  {
7506  stream.setAcksOnly(commandID,
7507  ackTypes & (unsigned)~Message::AckType::Persisted);
7508  }
7509  else
7510  {
7511  stream.setAcksOnly(commandID, ackTypes);
7512  }
7513  }
7514  return stream;
7515 }
7516 
7517 // This is here because it uses api from Client.
7518 inline void Message::ack(const char* options_) const
7519 {
7520  ClientImpl* pClient = _body.get().clientImpl();
7521  Message::Field bookmark = getBookmark();
7522  if(pClient && bookmark.len() &&
7523  !pClient->getAutoAck())
7524  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
7525  pClient->ack(getTopic(),bookmark,options_);
7526 }
7527 }// end namespace AMPS
7528 #endif
Command & setCorrelationId(const std::string &v_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:530
void prune(std::string tmpFileName_="")
Used to trim the size of a store&#39;s storage.
Definition: ampsplusplus.hpp:1043
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:212
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1173
void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: ampsplusplus.hpp:947
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:4250
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:958
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:404
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:4446
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:978
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:6965
void startTimer()
Sends a message to AMPS requesting that AMPS start a server-side timer.
Definition: ampsplusplus.hpp:5803
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5360
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1143
void setMaxSubIdLength(size_t maxSubIdLength_)
Sets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: ampsplusplus.hpp:1076
Command & addAckType(const std::string &v_)
Definition: ampsplusplus.hpp:570
std::string subscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4927
Abstract base class for storing received bookmarks for HA clients.
Definition: ampsplusplus.hpp:679
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1062
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1057
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:461
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:885
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:6057
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: ampsplusplus.hpp:1022
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a defualt max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:6318
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4351
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:5157
void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a Message.
Definition: ampsplusplus.hpp:897
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1060
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1115
virtual void setResizeHandler(BookmarkStoreResizeHandler handler_, void *userData_)
Set a handler on the bookmark store that will get called whenever a resize of the store is required d...
Definition: ampsplusplus.hpp:758
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: ampsplusplus.hpp:992
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:6327
void ack(Message &message_, const char *options_=NULL)
ACK a message queue message by supplying the message directly.
Definition: ampsplusplus.hpp:6198
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5112
Command & setFilter(const std::string &v_)
Definition: ampsplusplus.hpp:515
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: ampsplusplus.hpp:886
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:4570
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4824
bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: ampsplusplus.hpp:924
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4366
DisconnectHandler getDisconnectHandler(void)
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:4496
void AMPSDLL amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:979
amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:615
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4586
bool(* BookmarkStoreResizeHandler)(BookmarkStore store_, const Message::Field &subId_, size_t size_, void *userData_)
Function type for BookmarkStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:672
void purge()
Called to purge the contents of this store.
Definition: ampsplusplus.hpp:936
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:430
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self&#39;s set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6084
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:604
int getAckTimeout(void) const
Returns the current value of the ack timeout setting.
Definition: ampsplusplus.hpp:6276
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4337
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:5867
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
Message & setAckTypeEnum(unsigned ackType_)
Encode self&#39;s "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1041
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1332
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:7002
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:6338
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4722
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4195
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1055
std::string sowAndDeltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5546
bool ThrowawayBookmarkResizeHandler(BookmarkStore store_, const Message::Field &subId_, size_t newSize_, void *data_)
A BookmarkStoreResizeHandler that discards the oldest bookmark assuming that it was used but not disc...
Definition: ampsplusplus.hpp:1098
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4422
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1072
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:1195
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6300
BookmarkStore(BookmarkStoreImpl *impl_)
Creates a BookmarkStore based on the given implementation.
Definition: ampsplusplus.hpp:843
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:958
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4397
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:196
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1426
Success.
Definition: amps.h:116
Message & setCommandEnum(Command::Type command_)
Set self&#39;s "command" field from one of the values in Command.
Definition: Message.hpp:945
std::string deltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5010
amps_handle AMPSDLL amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: ampsplusplus.hpp:1032
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:637
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4803
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:7047
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:106
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:445
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:4435
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4191
amps_result
Return values from amps_xxx functions.
Definition: amps.h:111
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:4623
void setResizeHandler(BookmarkStoreResizeHandler handler_, void *userData_)
Set a handler on the bookmark store that will get called whenever a resize of the store is required d...
Definition: ampsplusplus.hpp:959
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:991
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:955
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:7444
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:528
amps_result AMPSDLL amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:978
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:4578
std::string sowAndSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5328
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4326
Command & setExpiration(unsigned v_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:568
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1460
Command & setData(const char *v_, size_t length_)
Sets the data for this command.
Definition: ampsplusplus.hpp:540
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server.
Definition: ampsplusplus.hpp:4869
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:1265
std::string sowAndDeltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5643
std::string send(MessageHandler messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4474
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5502
The main class for interacting with AMPS.
Definition: ampsplusplus.hpp:4233
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6036
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:956
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self&#39;s set of listeners.
Definition: ampsplusplus.hpp:6076
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:476
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1063
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4671
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:958
void publishFlush(long timeout_=0)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:4763
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:978
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:4534
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:468
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:980
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:956
std::string sowDelete(MessageHandler messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:5755
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:5965
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:654
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1456
void setImplementation(BookmarkStoreImpl *impl_)
Sets the BookmarkStore to use the given implementation.
Definition: ampsplusplus.hpp:857
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1051
Command & setTopic(const std::string &v_)
Definition: ampsplusplus.hpp:513
amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
Command & setBatchSize(unsigned v_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:557
std::string sowAndSubscribe(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5442
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: ampsplusplus.hpp:101
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1052
void setDisconnectHandler(DisconnectHandler disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:4488
Command & setOrderBy(const std::string &v_)
Definition: ampsplusplus.hpp:517
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:93
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1286
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:649
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5028
Interface for BookmarkStoreImpl classes.
Definition: ampsplusplus.hpp:833
std::string logon(const char *options_, int timeout_=0)
Logon to the server.
Definition: ampsplusplus.hpp:4886
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Command & setTimeout(unsigned v_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:550
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:957
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1072
void AMPSDLL amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: ampsplusplus.hpp:105
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:631
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:4306
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:955
std::string bookmarkSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5084
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:6975
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
ACK a message queue message by supplying a topic and bookmark string.
Definition: ampsplusplus.hpp:6208
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:644
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1051
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:1137
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5396
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1220
void persisted(const Message::Field &subId_, size_t bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: ampsplusplus.hpp:1002
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:6984
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:955
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: ampsplusplus.hpp:109
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1346
amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4526
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1303
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the ack batch size setting.
Definition: ampsplusplus.hpp:6267
void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1076
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1059
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1374
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:5778
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1061
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:4518
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:121
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1052
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
Command & setCommandId(const std::string &v_)
Definition: ampsplusplus.hpp:511
void setExceptionListener(const ExceptionListener &listener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:5958
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:484
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:4546
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Deprecated. Use setLastChanceMessageHandler instead.
Definition: ampsplusplus.hpp:6006
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:6995
unsigned getAckBatchSize(void) const
Returns the value of the ack batch size setting.
Definition: ampsplusplus.hpp:6257
size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark sequence in the store.
Definition: ampsplusplus.hpp:981
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4782
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5481
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4162
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: ampsplusplus.hpp:873
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5718
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1295
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:991
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6110
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5679
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server.
Definition: ampsplusplus.hpp:4902
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:5169
void setAutoAck(bool isAutoAckEnabled_)
Sets the auto-ack setting on this client.
Definition: ampsplusplus.hpp:6249
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5044
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1062
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1256
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1360
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4601
amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
amps_result AMPSDLL amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:7039
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:991
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:4505
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
ACK a message queue message by supplying a topic and bookmark.
Definition: ampsplusplus.hpp:6188
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:7051
amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1241
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:7058
void noPersistedAcks(const Message::Field &subId_)
Internally used to indicate when a subscription is placed without requesting persisted acks...
Definition: ampsplusplus.hpp:1012
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:235
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5579
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6000
Command & setSubId(const std::string &v_)
Definition: ampsplusplus.hpp:519
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4983
An iterable object representing the contents of an AMPS topic.
Definition: ampsplusplus.hpp:4154
std::string sow(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:5204
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:956
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1324
void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
std::string sow(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5296
Command & setTopN(unsigned v_)
Definition: ampsplusplus.hpp:552
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1077
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: ampsplusplus.hpp:811
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:4404
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5242
Command & setData(const std::string &v_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:536
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1057
The operation has not succeeded, but ought to be retried.
Definition: amps.h:140
const std::string & getLogonCorrelationData() const
Returns the currently set logoon correlation data for the client.
Definition: ampsplusplus.hpp:4313
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:5983
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4373
Command & setOptions(const std::string &v_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:533
void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1274
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1064
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:167
std::string sowDeleteByData(MessageHandler messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5902
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1021
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1052
Command & setBookmark(const std::string &v_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:523
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:5946
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:7012
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:4615
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5600
void setAckTimeout(const int ackTimeout_)
Sets the message queue ACK timeout value.
Definition: ampsplusplus.hpp:6286
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:509
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1076
bool getAutoAck(void) const
Returns the value of the auto-ack setting.
Definition: ampsplusplus.hpp:6239
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4646
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: ampsplusplus.hpp:1064
Definition: ampsplusplus.hpp:136
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:4288
std::string stopTimer(MessageHandler messageHandler)
Sends a message to AMPS requesting that AMPS stop the previously started timer.
Definition: ampsplusplus.hpp:5814
size_t getOldestBookmarkSeq(const std::string &subId_)
Called to find the oldest bookmark in the store.
Definition: ampsplusplus.hpp:969
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:600
std::string sowDeleteByKeys(MessageHandler messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:5840
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1316
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1064
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1402
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1199
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5259
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:4695
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6013
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: ampsplusplus.hpp:909
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4848
The client and server are disconnected.
Definition: amps.h:144
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5922
void setMaxSubIdLength(size_t maxSubIdLength_)
Sets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: ampsplusplus.hpp:820
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:4295
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:385
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1058
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4953
static const size_t BOOKMARK_NONE
An indicator of no bookmark value.
Definition: Message.hpp:408
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5131
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:496
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6144
amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4206
BookmarkStore()
Creates a BookmarkStore that does nothing.
Definition: ampsplusplus.hpp:839
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6309
void flushAcks(void)
Sends any queued message queue ACK messages to the server immediately.
Definition: ampsplusplus.hpp:6230