AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps.h"
28 #include "ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <set>
39 #include <deque>
40 #include <vector>
41 #include <assert.h>
42 #ifndef _WIN32
43  #include <inttypes.h>
44 #endif
45 #if defined(sun)
46  #include <sys/atomic.h>
47 #endif
48 #include "BookmarkStore.hpp"
49 #include "MessageRouter.hpp"
50 #include "util.hpp"
51 #include "ampscrc.hpp"
52 
53 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
54  #define AMPS_TESTING_SLOW_MESSAGE_STREAM
55 #endif
56 
61 
62 
69 
80 
81 // For StoreBuffer implementations
82 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
83 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
84 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
85 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
86 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
87 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
88 #define AMPS_DEFAULT_TOP_N -1
89 #define AMPS_DEFAULT_BATCH_SIZE 10
90 #define AMPS_NUMBER_BUFFER_LEN 20
91 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
92 
93 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
94  #define AMPS_X64 1
95 #endif
96 
97 #ifdef _WIN32
98  static __declspec ( thread ) AMPS::Message* publishStoreMessage = 0;
99 #else
100  static __thread AMPS::Message* publishStoreMessage = 0;
101 #endif
102 
103 namespace AMPS
104 {
105 
106  typedef std::map<std::string, std::string> ConnectionInfo;
107 
108  class PerThreadMessageTracker
109  {
110  std::vector<AMPS::Message*> _messages;
111  public:
112  PerThreadMessageTracker() {}
113  ~PerThreadMessageTracker()
114  {
115  for (size_t i = 0; i < _messages.size(); ++i)
116  {
117  delete _messages[i];
118  }
119  }
120  void addMessage(AMPS::Message* message)
121  {
122  _messages.push_back(message);
123  }
124  static void addMessageToCleanupList(AMPS::Message* message)
125  {
126  static AMPS::Mutex _lock;
127  AMPS::Lock<Mutex> l(_lock);
128  _addMessageToCleanupList(message);
129  }
130  static void _addMessageToCleanupList(AMPS::Message* message)
131  {
132  static PerThreadMessageTracker tracker;
133  tracker.addMessage(message);
134  }
135  };
136 
137  template<class Type>
138  inline std::string asString(Type x_)
139  {
140  std::ostringstream os;
141  os << x_;
142  return os.str();
143  }
144 
145  inline
146  size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
147  {
148  size_t pos = AMPS_NUMBER_BUFFER_LEN;
149  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
150  {
151  if (seqNo_ > 0)
152  {
153  buf_[--pos] = (char)(seqNo_ % 10 + '0');
154  seqNo_ /= 10;
155  }
156  }
157  return pos;
158  }
159 
160 #ifdef _WIN32
161  inline
162  size_t convertToCharArray(char* buf_, unsigned long seqNo_)
163  {
164  size_t pos = AMPS_NUMBER_BUFFER_LEN;
165  for (int i = 0; i < AMPS_NUMBER_BUFFER_LEN; ++i)
166  {
167  if (seqNo_ > 0)
168  {
169  buf_[--pos] = (char)(seqNo_ % 10 + '0');
170  seqNo_ /= 10;
171  }
172  }
173  return pos;
174  }
175 #endif
176 
180  class Reason
181  {
182  public:
183  static const char* duplicate()
184  {
185  return "duplicate";
186  }
187  static const char* badFilter()
188  {
189  return "bad filter";
190  }
191  static const char* badRegexTopic()
192  {
193  return "bad regex topic";
194  }
195  static const char* subscriptionAlreadyExists()
196  {
197  return "subscription already exists";
198  }
199  static const char* nameInUse()
200  {
201  return "name in use";
202  }
203  static const char* authFailure()
204  {
205  return "auth failure";
206  }
207  static const char* notEntitled()
208  {
209  return "not entitled";
210  }
211  static const char* authDisabled()
212  {
213  return "authentication disabled";
214  }
215  static const char* subidInUse()
216  {
217  return "subid in use";
218  }
219  static const char* noTopic()
220  {
221  return "no topic";
222  }
223  };
224 
234  {
235  public:
236  virtual ~ExceptionListener() {;}
237  virtual void exceptionThrown(const std::exception&) const {;}
238  };
239 
241 
242 
243 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
244  try\
245  {\
246  x;\
247  }\
248  catch (std::exception& ex_)\
249  {\
250  try\
251  {\
252  _exceptionListener->exceptionThrown(ex_);\
253  }\
254  catch(...)\
255  {\
256  ;\
257  }\
258  }
259  /*
260  * Note : we don't attempt to trap non std::exception exceptions
261  * here because doing so interferes with pthread_exit on some OSes.
262  catch (...)\
263  {\
264  try\
265  {\
266  _exceptionListener->exceptionThrown(AMPS::AMPSException(\
267  "An unhandled exception of unknown type was thrown by "\
268  "the registered handler.", AMPS_E_USAGE));\
269  }\
270  catch(...)\
271  {\
272  ;\
273  }\
274  }
275  */
276 #ifdef _WIN32
277 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
278  try\
279  {\
280  while(me->_connected)\
281  {\
282  try\
283  {\
284  x;\
285  break;\
286  }\
287  catch(MessageStreamFullException&)\
288  {\
289  me->checkAndSendHeartbeat(false);\
290  }\
291  }\
292  }\
293  catch (std::exception& ex_)\
294  {\
295  try\
296  {\
297  me->_exceptionListener->exceptionThrown(ex_);\
298  }\
299  catch(...)\
300  {\
301  ;\
302  }\
303  }
304  /*
305  * Note : we don't attempt to trap non std::exception exceptions
306  * here because doing so interferes with pthread_exit on some OSes.
307  catch (...)\
308  {\
309  try\
310  {\
311  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
312  "An unhandled exception of unknown type was thrown by "\
313  "the registered handler.", AMPS_E_USAGE));\
314  }\
315  catch(...)\
316  {\
317  ;\
318  }\
319  }*/
320 
321 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
322  while(me->_connected)\
323  {\
324  try\
325  {\
326  x;\
327  break;\
328  }\
329  catch(MessageStreamFullException&)\
330  {\
331  me->checkAndSendHeartbeat(false);\
332  }\
333  }
334 #else
335 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
336  try\
337  {\
338  while(me->_connected)\
339  {\
340  try\
341  {\
342  x;\
343  break;\
344  }\
345  catch(MessageStreamFullException& ex_)\
346  {\
347  me->checkAndSendHeartbeat(false);\
348  }\
349  }\
350  }\
351  catch (std::exception& ex_)\
352  {\
353  try\
354  {\
355  me->_exceptionListener->exceptionThrown(ex_);\
356  }\
357  catch(...)\
358  {\
359  ;\
360  }\
361  }
362  /*
363  * Note : we don't attempt to trap non std::exception exceptions
364  * here because doing so interferes with pthread_exit on some OSes.
365  catch (...)\
366  {\
367  try\
368  {\
369  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
370  "An unhandled exception of unknown type was thrown by "\
371  "the registered handler.", AMPS_E_USAGE));\
372  }\
373  catch(...)\
374  {\
375  ;\
376  }\
377  }*/
378 
379 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
380  while(me->_connected)\
381  {\
382  try\
383  {\
384  x;\
385  break;\
386  }\
387  catch(MessageStreamFullException& ex_)\
388  {\
389  me->checkAndSendHeartbeat(false);\
390  }\
391  }
392 #endif
393 
394 #define AMPS_UNHANDLED_EXCEPTION(ex) \
395  try\
396  {\
397  _exceptionListener->exceptionThrown(ex);\
398  }\
399  catch(...)\
400  {;}
401 
402 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
403  try\
404  {\
405  me->_exceptionListener->exceptionThrown(ex);\
406  }\
407  catch(...)\
408  {;}
409 
410 
411  class Client;
412 
437 
438  class Command
439  {
440  Message _message;
441  unsigned _timeout;
442  unsigned _batchSize;
443  unsigned _flags;
444  static const unsigned Subscribe = 1;
445  static const unsigned SOW = 2;
446  static const unsigned NeedsSequenceNumber = 4;
447  static const unsigned ProcessedAck = 8;
448  static const unsigned StatsAck = 16;
449  void init(Message::Command::Type command_)
450  {
451  _timeout = 0;
452  _batchSize = 0;
453  _flags = 0;
454  _message.reset();
455  _message.setCommandEnum(command_);
456  _setIds();
457  }
458  void init(const std::string& command_)
459  {
460  _timeout = 0;
461  _batchSize = 0;
462  _flags = 0;
463  _message.reset();
464  _message.setCommand(command_);
465  _setIds();
466  }
467  void _setIds(void)
468  {
469  Message::Command::Type command = _message.getCommandEnum();
470  if (!(command & Message::Command::NoDataCommands))
471  {
472  _message.newCommandId();
473  if (command == Message::Command::Subscribe ||
474  command == Message::Command::SOWAndSubscribe ||
475  command == Message::Command::DeltaSubscribe ||
476  command == Message::Command::SOWAndDeltaSubscribe)
477  {
478  _message.setSubscriptionId(_message.getCommandId());
479  _flags |= Subscribe;
480  }
481  if (command == Message::Command::SOW
482  || command == Message::Command::SOWAndSubscribe
483  || command == Message::Command::SOWAndDeltaSubscribe)
484  {
485  _message.setQueryID(_message.getCommandId());
486  if (_batchSize == 0)
487  {
488  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
489  }
490  if (command == Message::Command::SOW)
491  {
492  _flags |= SOW;
493  }
494  }
495  _flags |= ProcessedAck;
496  }
497  else if (command == Message::Command::SOWDelete)
498  {
499  _message.newCommandId();
500  _flags |= ProcessedAck;
501  _flags |= NeedsSequenceNumber;
502  }
503  else if (command == Message::Command::Publish
504  || command == Message::Command::DeltaPublish)
505  {
506  _flags |= NeedsSequenceNumber;
507  }
508  else if (command == Message::Command::StopTimer)
509  {
510  _message.newCommandId();
511  }
512  }
513  public:
517  Command(const std::string& command_)
518  {
519  init(command_);
520  }
524  Command(Message::Command::Type command_)
525  {
526  init(command_);
527  }
528 
532  Command& reset(const std::string& command_)
533  {
534  init(command_);
535  return *this;
536  }
540  Command& reset(Message::Command::Type command_)
541  {
542  init(command_);
543  return *this;
544  }
552  Command& setSowKey(const std::string& sowKey_)
553  {
554  _message.setSowKey(sowKey_);
555  return *this;
556  }
569  Command& setSowKeys(const std::string& sowKeys_)
570  {
571  _message.setSowKeys(sowKeys_);
572  return *this;
573  }
575  Command& setCommandId(const std::string& cmdId_)
576  {
577  _message.setCommandId(cmdId_);
578  return *this;
579  }
581  Command& setTopic(const std::string& topic_)
582  {
583  _message.setTopic(topic_);
584  return *this;
585  }
587  Command& setFilter(const std::string& filter_)
588  {
589  _message.setFilter(filter_);
590  return *this;
591  }
593  Command& setOrderBy(const std::string& orderBy_)
594  {
595  _message.setOrderBy(orderBy_);
596  return *this;
597  }
599  Command& setSubId(const std::string& subId_)
600  {
601  _message.setSubscriptionId(subId_);
602  return *this;
603  }
605  Command& setQueryId(const std::string& queryId_)
606  {
607  _message.setQueryId(queryId_);
608  return *this;
609  }
615  Command& setBookmark(const std::string& bookmark_)
616  {
617  _message.setBookmark(bookmark_);
618  return *this;
619  }
626  Command& setCorrelationId(const std::string& correlationId_)
627  {
628  _message.setCorrelationId(correlationId_);
629  return *this;
630  }
633  Command& setOptions(const std::string& options_)
634  {
635  _message.setOptions(options_);
636  return *this;
637  }
641  Command& setOptions(const char* options_, size_t optionsLen_)
642  {
643  _message.setOptions(options_, optionsLen_);
644  return *this;
645  }
647  Command& setSequence(const std::string& seq_)
648  {
649  _message.setSequence(seq_);
650  return *this;
651  }
653  Command& setSequence(const amps_uint64_t seq_)
654  {
655  std::ostringstream os;
656  os << seq_;
657  _message.setSequence(os.str());
658  return *this;
659  }
660  amps_uint64_t getSequence() const
661  {
662  return amps_message_get_field_uint64(_message.getMessage(), AMPS_Sequence);
663  }
666  Command& setData(const std::string& data_)
667  {
668  _message.setData(data_);
669  return *this;
670  }
674  Command& setData(const char* data_, size_t dataLen_)
675  {
676  _message.setData(data_, dataLen_);
677  return *this;
678  }
688  Command& setTimeout(unsigned timeout_)
689  {
690  _timeout = timeout_;
691  return *this;
692  }
694  Command& setTopN(unsigned topN_)
695  {
696  _message.setTopNRecordsReturned(topN_);
697  return *this;
698  }
703  Command& setBatchSize(unsigned batchSize_)
704  {
705  _message.setBatchSize(batchSize_);
706  _batchSize = batchSize_;
707  return *this;
708  }
719  Command& setExpiration(unsigned expiration_)
720  {
721  _message.setExpiration(expiration_);
722  return *this;
723  }
725  Command& addAckType(const std::string& ackType_)
726  {
727  _message.setAckType(_message.getAckType() + "," + ackType_);
728  if (ackType_ == "processed")
729  {
730  _flags |= ProcessedAck;
731  }
732  else if (ackType_ == "stats")
733  {
734  _flags |= StatsAck;
735  }
736  return *this;
737  }
739  Command& setAckType(const std::string& ackType_)
740  {
741  _message.setAckType(ackType_);
742  if (ackType_.find("processed") != std::string::npos)
743  {
744  _flags |= ProcessedAck;
745  }
746  else
747  {
748  _flags &= ~ProcessedAck;
749  }
750  if (ackType_.find("stats") != std::string::npos)
751  {
752  _flags |= StatsAck;
753  }
754  else
755  {
756  _flags &= ~StatsAck;
757  }
758  return *this;
759  }
761  Command& setAckType(unsigned ackType_)
762  {
763  _message.setAckTypeEnum(ackType_);
764  if (ackType_ & Message::AckType::Processed)
765  {
766  _flags |= ProcessedAck;
767  }
768  else
769  {
770  _flags &= ~ProcessedAck;
771  }
772  if (ackType_ & Message::AckType::Stats)
773  {
774  _flags |= StatsAck;
775  }
776  else
777  {
778  _flags &= ~StatsAck;
779  }
780  return *this;
781  }
783  std::string getAckType() const
784  {
785  return (std::string)(_message.getAckType());
786  }
788  unsigned getAckTypeEnum() const
789  {
790  return _message.getAckTypeEnum();
791  }
792 
793  Message& getMessage(void)
794  {
795  return _message;
796  }
797  unsigned getTimeout(void) const
798  {
799  return _timeout;
800  }
801  unsigned getBatchSize(void) const
802  {
803  return _batchSize;
804  }
805  bool isSubscribe(void) const
806  {
807  return _flags & Subscribe;
808  }
809  bool isSow(void) const
810  {
811  return (_flags & SOW) != 0;
812  }
813  bool hasProcessedAck(void) const
814  {
815  return (_flags & ProcessedAck) != 0;
816  }
817  bool hasStatsAck(void) const
818  {
819  return (_flags & StatsAck) != 0;
820  }
821  bool needsSequenceNumber(void) const
822  {
823  return (_flags & NeedsSequenceNumber) != 0;
824  }
825  };
826 
829  typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
830 
831  class Message;
833 
837  {
838  public:
839  virtual ~Authenticator() {;}
840 
846  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;
854  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;
861  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
862  };
863 
868  {
869  public:
870  virtual ~DefaultAuthenticator() {;}
873  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
874  {
875  return password_;
876  }
877 
880  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
881  {
882  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
883  }
884 
885  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
886 
891  {
892  static DefaultAuthenticator d;
893  return d;
894  }
895  };
896 
900  {
901  public:
902 
906  virtual void execute(Message& message_) = 0;
907 
908  virtual ~StoreReplayer() {;}
909  };
910 
911  class Store;
912 
921  typedef bool (*PublishStoreResizeHandler)(Store store_,
922  size_t size_,
923  void* userData_);
924 
927  class StoreImpl : public RefBody
928  {
929  public:
930  StoreImpl()
931  : _resizeHandler(NULL)
932  , _resizeHandlerData(NULL)
933  {;}
934 
939  virtual amps_uint64_t store(const Message& message_) = 0;
940 
945  virtual void discardUpTo(amps_uint64_t index_) = 0;
946 
951  virtual void replay(StoreReplayer& replayer_) = 0;
952 
960  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
961 
966  virtual size_t unpersistedCount() const = 0;
967 
968  virtual ~StoreImpl() {;}
969 
978  virtual void flush(long timeout_) = 0;
979 
982  static inline size_t getUnsetPosition()
983  {
984  return AMPS_UNSET_INDEX;
985  }
986 
989  static inline amps_uint64_t getUnsetSequence()
990  {
991  return AMPS_UNSET_SEQUENCE;
992  }
993 
997  virtual amps_uint64_t getLowestUnpersisted() const = 0;
998 
1002  virtual amps_uint64_t getLastPersisted() = 0;
1003 
1013  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
1014  void* userData_)
1015  {
1016  _resizeHandler = handler_;
1017  _resizeHandlerData = userData_;
1018  }
1019 
1020  inline virtual PublishStoreResizeHandler getResizeHandler() const
1021  {
1022  return _resizeHandler;
1023  }
1024 
1025  bool callResizeHandler(size_t newSize_);
1026 
1027  private:
1028  PublishStoreResizeHandler _resizeHandler;
1029  void* _resizeHandlerData;
1030  };
1031 
1034  class Store
1035  {
1036  RefHandle<StoreImpl> _body;
1037  public:
1038  Store() {;}
1039  Store(StoreImpl* body_) : _body(body_) {;}
1040  Store(const Store& rhs) : _body(rhs._body) {;}
1041  Store& operator=(const Store& rhs)
1042  {
1043  _body = rhs._body;
1044  return *this;
1045  }
1046 
1050  amps_uint64_t store(const Message& message_)
1051  {
1052  return _body.get().store(message_);
1053  }
1054 
1059  void discardUpTo(amps_uint64_t index_)
1060  {
1061  _body.get().discardUpTo(index_);
1062  }
1063 
1068  void replay(StoreReplayer& replayer_)
1069  {
1070  _body.get().replay(replayer_);
1071  }
1072 
1080  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
1081  {
1082  return _body.get().replaySingle(replayer_, index_);
1083  }
1084 
1089  size_t unpersistedCount() const
1090  {
1091  return _body.get().unpersistedCount();
1092  }
1093 
1097  bool isValid() const
1098  {
1099  return _body.isValid();
1100  }
1101 
1110  void flush(long timeout_ = 0)
1111  {
1112  return _body.get().flush(timeout_);
1113  }
1114 
1118  amps_uint64_t getLowestUnpersisted()
1119  {
1120  return _body.get().getLowestUnpersisted();
1121  }
1122 
1126  amps_uint64_t getLastPersisted()
1127  {
1128  return _body.get().getLastPersisted();
1129  }
1130 
1140  void setResizeHandler(PublishStoreResizeHandler handler_,
1141  void* userData_)
1142  {
1143  _body.get().setResizeHandler(handler_, userData_);
1144  }
1145 
1146  PublishStoreResizeHandler getResizeHandler()
1147  {
1148  return _body.get().getResizeHandler();
1149  }
1150 
1154  StoreImpl* get()
1155  {
1156  if (_body.isValid())
1157  {
1158  return &_body.get();
1159  }
1160  else
1161  {
1162  return NULL;
1163  }
1164  }
1165 
1166  };
1167 
1173  {
1174  public:
1175  virtual ~FailedWriteHandler() {;}
1182  virtual void failedWrite(const Message& message_,
1183  const char* reason_, size_t reasonLength_) = 0;
1184  };
1185 
1186 
1187  inline bool StoreImpl::callResizeHandler(size_t newSize_)
1188  {
1189  if (_resizeHandler)
1190  {
1191  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1192  }
1193  return true;
1194  }
1195 
1202  inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1203  void* data_)
1204  {
1205  long* timeoutp = (long*)data_;
1206  size_t count = store_.unpersistedCount();
1207  if (count == 0)
1208  {
1209  return false;
1210  }
1211  try
1212  {
1213  store_.flush(*timeoutp);
1214  }
1215 #ifdef _WIN32
1216  catch (const TimedOutException&)
1217 #else
1218  catch (const TimedOutException& e)
1219 #endif
1220  {
1221  return true;
1222  }
1223  return (count == store_.unpersistedCount());
1224  }
1225 
1230  {
1231  public:
1232  virtual ~SubscriptionManager() {;}
1240  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1241  unsigned requestedAckTypes_) = 0;
1245  virtual void unsubscribe(const Message::Field& subId_) = 0;
1248  virtual void clear() = 0;
1252  virtual void resubscribe(Client& client_) = 0;
1254  };
1255 
1259 
1261  {
1262  public:
1264  typedef enum { Disconnected = 0,
1265  Shutdown = 1,
1266  Connected = 2,
1267  LoggedOn = 4,
1268  PublishReplayed = 8,
1269  HeartbeatInitiated = 16,
1270  Resubscribed = 32,
1271  UNKNOWN = 16384
1272  } State;
1273 
1283  virtual void connectionStateChanged(State newState_) = 0;
1284  virtual ~ConnectionStateListener() {;};
1285  };
1286 
1287 
1288  class MessageStreamImpl;
1289  class MessageStream;
1290 
1291  typedef void(*DeferredExecutionFunc)(void*);
1292 
1293  class ClientImpl : public RefBody // -V553
1294  {
1295  // Class to wrap turning of Nagle for things like flush and logon
1296  class NoDelay
1297  {
1298  private:
1299  AMPS_SOCKET _socket;
1300  int _noDelay;
1301  char* _valuePtr;
1302 #ifdef _WIN32
1303  int _valueLen;
1304 #else
1305  socklen_t _valueLen;
1306 #endif
1307  public:
1308  NoDelay(amps_handle client_)
1309  : _socket(AMPS_INVALID_SOCKET), _noDelay(0), _valueLen(sizeof(int))
1310  {
1311  _valuePtr = (char*)&_noDelay;
1312  _socket = amps_client_get_socket(client_);
1313  if (_socket != AMPS_INVALID_SOCKET)
1314  {
1315  getsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, &_valueLen);
1316  if (!_noDelay)
1317  {
1318  _noDelay = 1;
1319  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1320  }
1321  else
1322  {
1323  _socket = AMPS_INVALID_SOCKET;
1324  }
1325  }
1326  }
1327 
1328  ~NoDelay()
1329  {
1330  if (_socket != AMPS_INVALID_SOCKET)
1331  {
1332  _noDelay = 0;
1333  setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, _valuePtr, _valueLen);
1334  }
1335  }
1336  };
1337 
1338  friend class Client;
1339  protected:
1340  amps_handle _client;
1341  DisconnectHandler _disconnectHandler;
1342  enum GlobalCommandTypeHandlers : size_t
1343  {
1344  Publish = 0,
1345  SOW = 1,
1346  GroupBegin = 2,
1347  GroupEnd = 3,
1348  Heartbeat = 4,
1349  OOF = 5,
1350  Ack = 6,
1351  LastChance = 7,
1352  DuplicateMessage = 8,
1353  COUNT = 9
1354  };
1355  std::vector<MessageHandler> _globalCommandTypeHandlers;
1356  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1357  MessageRouter _routes;
1358  MessageRouter::RouteCache _routeCache;
1359  mutable Mutex _lock;
1360  std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1361  amps_uint64_t _nameHashValue;
1362  BookmarkStore _bookmarkStore;
1363  Store _publishStore;
1364  bool _isRetryOnDisconnect;
1365  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1366  volatile amps_uint64_t _lastSentHaSequenceNumber;
1367  ATOMIC_TYPE_8 _badTimeToHAPublish;
1368  ATOMIC_TYPE_8 _badTimeToHASubscribe;
1369  VersionInfo _serverVersion;
1370  Timer _heartbeatTimer;
1371  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1372 
1373  // queue data
1374  int _queueAckTimeout;
1375  bool _isAutoAckEnabled;
1376  unsigned _ackBatchSize;
1377  unsigned _queuedAckCount;
1378  unsigned _defaultMaxDepth;
1379  struct QueueBookmarks
1380  {
1381  QueueBookmarks(const std::string& topic_)
1382  : _topic(topic_)
1383  , _oldestTime(0)
1384  , _bookmarkCount(0)
1385  {;}
1386  std::string _topic;
1387  std::string _data;
1388  amps_uint64_t _oldestTime;
1389  unsigned _bookmarkCount;
1390  };
1391  typedef amps_uint64_t topic_hash;
1392  typedef std::map<topic_hash, QueueBookmarks> TopicHashMap;
1393  TopicHashMap _topicHashMap;
1394 
1395  class ClientStoreReplayer : public StoreReplayer
1396  {
1397  ClientImpl* _client;
1398  public:
1399  unsigned _version;
1400  amps_result _res;
1401 
1402  ClientStoreReplayer()
1403  : _client(NULL), _version(0), _res(AMPS_E_OK)
1404  {}
1405 
1406  ClientStoreReplayer(ClientImpl* client_)
1407  : _client(client_), _version(0), _res(AMPS_E_OK)
1408  {}
1409 
1410  void setClient(ClientImpl* client_)
1411  {
1412  _client = client_;
1413  }
1414 
1415  void execute(Message& message_)
1416  {
1417  if (!_client)
1418  {
1419  throw CommandException("Can't replay without a client.");
1420  }
1421  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1422  AMPS_Sequence);
1423  if (index > _client->_lastSentHaSequenceNumber)
1424  {
1425  _client->_lastSentHaSequenceNumber = index;
1426  }
1427 
1428  _res = AMPS_E_OK;
1429  // Don't replay a queue cancel message after a reconnect.
1430  // Currently, the only messages that will have anything in options
1431  // are cancel messages.
1432  if (!message_.getCommand().empty() &&
1433  (!_client->_badTimeToHAPublish ||
1434  message_.getOptions().len() < 6))
1435  {
1436  _res = amps_client_send_with_version(_client->_client,
1437  message_.getMessage(),
1438  &_version);
1439  if (_res != AMPS_E_OK)
1440  {
1441  throw DisconnectedException("AMPS Server disconnected during replay");
1442  }
1443  }
1444  }
1445 
1446  };
1447  ClientStoreReplayer _replayer;
1448 
1449  class FailedWriteStoreReplayer : public StoreReplayer
1450  {
1451  ClientImpl* _parent;
1452  const char* _reason;
1453  size_t _reasonLength;
1454  size_t _replayCount;
1455  public:
1456  FailedWriteStoreReplayer(ClientImpl* parent, const char* reason_, size_t reasonLength_)
1457  : _parent(parent),
1458  _reason(reason_),
1459  _reasonLength(reasonLength_),
1460  _replayCount(0)
1461  {;}
1462  void execute(Message& message_)
1463  {
1464  if (_parent->_failedWriteHandler)
1465  {
1466  ++_replayCount;
1467  _parent->_failedWriteHandler->failedWrite(message_,
1468  _reason, _reasonLength);
1469  }
1470  }
1471  size_t replayCount(void) const
1472  {
1473  return _replayCount;
1474  }
1475  };
1476 
1477  struct AckResponseImpl : public RefBody
1478  {
1479  std::string username, password, reason, status, bookmark, options;
1480  amps_uint64_t sequenceNo;
1481  amps_uint64_t nameHashValue;
1482  VersionInfo serverVersion;
1483  volatile bool responded, abandoned;
1484  unsigned connectionVersion;
1485  AckResponseImpl() :
1486  RefBody(),
1487  sequenceNo((amps_uint64_t)0),
1488  serverVersion(),
1489  responded(false),
1490  abandoned(false),
1491  connectionVersion(0)
1492  {
1493  }
1494  };
1495 
1496  class AckResponse
1497  {
1498  RefHandle<AckResponseImpl> _body;
1499  public:
1500  AckResponse() : _body(NULL) {;}
1501  AckResponse(const AckResponse& rhs) : _body(rhs._body) {;}
1502  static AckResponse create()
1503  {
1504  AckResponse r;
1505  r._body = new AckResponseImpl();
1506  return r;
1507  }
1508 
1509  const std::string& username()
1510  {
1511  return _body.get().username;
1512  }
1513  void setUsername(const char* data_, size_t len_)
1514  {
1515  if (data_)
1516  {
1517  _body.get().username.assign(data_, len_);
1518  }
1519  else
1520  {
1521  _body.get().username.clear();
1522  }
1523  }
1524  const std::string& password()
1525  {
1526  return _body.get().password;
1527  }
1528  void setPassword(const char* data_, size_t len_)
1529  {
1530  if (data_)
1531  {
1532  _body.get().password.assign(data_, len_);
1533  }
1534  else
1535  {
1536  _body.get().password.clear();
1537  }
1538  }
1539  const std::string& reason()
1540  {
1541  return _body.get().reason;
1542  }
1543  void setReason(const char* data_, size_t len_)
1544  {
1545  if (data_)
1546  {
1547  _body.get().reason.assign(data_, len_);
1548  }
1549  else
1550  {
1551  _body.get().reason.clear();
1552  }
1553  }
1554  const std::string& status()
1555  {
1556  return _body.get().status;
1557  }
1558  void setStatus(const char* data_, size_t len_)
1559  {
1560  if (data_)
1561  {
1562  _body.get().status.assign(data_, len_);
1563  }
1564  else
1565  {
1566  _body.get().status.clear();
1567  }
1568  }
1569  const std::string& bookmark()
1570  {
1571  return _body.get().bookmark;
1572  }
1573  void setBookmark(const Field& bookmark_)
1574  {
1575  if (!bookmark_.empty())
1576  {
1577  _body.get().bookmark.assign(bookmark_.data(), bookmark_.len());
1578  Field::parseBookmark(bookmark_, _body.get().nameHashValue,
1579  _body.get().sequenceNo);
1580  }
1581  else
1582  {
1583  _body.get().bookmark.clear();
1584  _body.get().sequenceNo = (amps_uint64_t)0;
1585  _body.get().nameHashValue = (amps_uint64_t)0;
1586  }
1587  }
1588  amps_uint64_t sequenceNo() const
1589  {
1590  return _body.get().sequenceNo;
1591  }
1592  amps_uint64_t nameHashValue() const
1593  {
1594  return _body.get().nameHashValue;
1595  }
1596  void setSequenceNo(const char* data_, size_t len_)
1597  {
1598  amps_uint64_t result = (amps_uint64_t)0;
1599  if (data_)
1600  {
1601  for (size_t i = 0; i < len_; ++i)
1602  {
1603  result *= (amps_uint64_t)10;
1604  result += (amps_uint64_t)(data_[i] - '0');
1605  }
1606  }
1607  _body.get().sequenceNo = result;
1608  }
1609  VersionInfo serverVersion() const
1610  {
1611  return _body.get().serverVersion;
1612  }
1613  void setServerVersion(const char* data_, size_t len_)
1614  {
1615  if (data_)
1616  {
1617  _body.get().serverVersion.setVersion(std::string(data_, len_));
1618  }
1619  }
1620  bool responded()
1621  {
1622  return _body.get().responded;
1623  }
1624  void setResponded(bool responded_)
1625  {
1626  _body.get().responded = responded_;
1627  }
1628  bool abandoned()
1629  {
1630  return _body.get().abandoned;
1631  }
1632  void setAbandoned(bool abandoned_)
1633  {
1634  if (_body.isValid())
1635  {
1636  _body.get().abandoned = abandoned_;
1637  }
1638  }
1639 
1640  void setConnectionVersion(unsigned connectionVersion)
1641  {
1642  _body.get().connectionVersion = connectionVersion;
1643  }
1644 
1645  unsigned getConnectionVersion()
1646  {
1647  return _body.get().connectionVersion;
1648  }
1649  void setOptions(const char* data_, size_t len_)
1650  {
1651  if (data_)
1652  {
1653  _body.get().options.assign(data_, len_);
1654  }
1655  else
1656  {
1657  _body.get().options.clear();
1658  }
1659  }
1660 
1661  const std::string& options()
1662  {
1663  return _body.get().options;
1664  }
1665 
1666  AckResponse& operator=(const AckResponse& rhs)
1667  {
1668  _body = rhs._body;
1669  return *this;
1670  }
1671  };
1672 
1673 
1674  typedef std::map<std::string, AckResponse> AckMap;
1675  AckMap _ackMap;
1676  Mutex _ackMapLock;
1677  DefaultExceptionListener _defaultExceptionListener;
1678  protected:
1679 
1680  struct DeferredExecutionRequest
1681  {
1682  DeferredExecutionRequest(DeferredExecutionFunc func_,
1683  void* userData_)
1684  : _func(func_),
1685  _userData(userData_)
1686  {;}
1687 
1688  DeferredExecutionFunc _func;
1689  void* _userData;
1690  };
1691  const ExceptionListener* _exceptionListener;
1692  std::shared_ptr<const ExceptionListener> _pExceptionListener;
1693  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1694  bool _connected;
1695  std::string _username;
1696  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1697  ConnectionStateListeners _connectionStateListeners;
1698  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1699  Mutex _deferredExecutionLock;
1700  DeferredExecutionList _deferredExecutionList;
1701  unsigned _heartbeatInterval;
1702  unsigned _readTimeout;
1703 
1704  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1705  {
1706  // If we disconnected before we got to notification, don't notify.
1707  // This should only be able to happen for Resubscribed, since the lock
1708  // is released to let the subscription manager run resubscribe so a
1709  // disconnect could be called before the change is broadcast.
1710  if (!_connected && newState_ > ConnectionStateListener::Connected)
1711  {
1712  return;
1713  }
1714  for (ConnectionStateListeners::iterator it = _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1715  {
1716  AMPS_CALL_EXCEPTION_WRAPPER(
1717  (*it)->connectionStateChanged(newState_));
1718  }
1719  }
1720  unsigned processedAck(Message& message);
1721  unsigned persistedAck(Message& meesage);
1722  void lastChance(Message& message);
1723  void checkAndSendHeartbeat(bool force = false);
1724  virtual ConnectionInfo getConnectionInfo() const;
1725  static amps_result
1726  ClientImplMessageHandler(amps_handle message, void* userData);
1727  static void
1728  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1729  static amps_result
1730  ClientImplDisconnectHandler(amps_handle client, void* userData);
1731 
1732  void unsubscribeInternal(const std::string& id)
1733  {
1734  if (id.empty())
1735  {
1736  return;
1737  }
1738  // remove the handler first to avoid any more message delivery
1739  Message::Field subId;
1740  subId.assign(id.data(), id.length());
1741  _routes.removeRoute(subId);
1742  // Lock is already acquired
1743  if (_subscriptionManager)
1744  {
1745  // Have to unlock before calling into sub manager to avoid deadlock
1746  Unlock<Mutex> unlock(_lock);
1747  _subscriptionManager->unsubscribe(subId);
1748  }
1749  _message.reset();
1750  _message.setCommandEnum(Message::Command::Unsubscribe);
1751  _message.newCommandId();
1752  _message.setSubscriptionId(id);
1753  _sendWithoutRetry(_message);
1754  deferredExecution(&amps_noOpFn, NULL);
1755  }
1756 
1757  AckResponse syncAckProcessing(long timeout_, Message& message_,
1758  bool isHASubscribe_)
1759  {
1760  return syncAckProcessing(timeout_, message_,
1761  (amps_uint64_t)0, isHASubscribe_);
1762  }
1763 
1764  AckResponse syncAckProcessing(long timeout_, Message& message_,
1765  amps_uint64_t haSeq = (amps_uint64_t)0,
1766  bool isHASubscribe_ = false)
1767  {
1768  // inv: we already have _lock locked up.
1769  AckResponse ack = AckResponse::create();
1770  if (1)
1771  {
1772  Lock<Mutex> guard(_ackMapLock);
1773  _ackMap[message_.getCommandId()] = ack;
1774  }
1775  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
1776  if (ack.getConnectionVersion() == 0)
1777  {
1778  // Send failed
1779  throw DisconnectedException("Connection closed while waiting for response.");
1780  }
1781  bool timedOut = false;
1782  AMPS_START_TIMER(timeout_)
1783  while (!timedOut && !ack.responded() && !ack.abandoned() && _connected)
1784  {
1785  if (timeout_)
1786  {
1787  timedOut = !_lock.wait(timeout_);
1788  // May have woken up early, check real time
1789  if (timedOut)
1790  {
1791  AMPS_RESET_TIMER(timedOut, timeout_);
1792  }
1793  }
1794  else
1795  {
1796  // Using a timeout version to ensure python can interrupt
1797  _lock.wait(1000);
1798  Unlock<Mutex> unlck(_lock);
1799  amps_invoke_waiting_function();
1800  }
1801  }
1802  if (ack.responded())
1803  {
1804  if (ack.status() != "failure")
1805  {
1806  if (message_.getCommand() == "logon")
1807  {
1808  amps_uint64_t ackSequence = ack.sequenceNo();
1809  if (_lastSentHaSequenceNumber < ackSequence)
1810  {
1811  _lastSentHaSequenceNumber = ackSequence;
1812  }
1813  if (_publishStore.isValid())
1814  {
1815  // If this throws, logon will fail and eitehr be
1816  // handled in HAClient/ServerChooser or by the caller
1817  // of logon.
1818  _publishStore.discardUpTo(ackSequence);
1819  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
1820  {
1821  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
1822  }
1823  }
1824  _nameHash = ack.bookmark().substr(0, ack.bookmark().find('|'));
1825  _nameHashValue = ack.nameHashValue();
1826  _serverVersion = ack.serverVersion();
1827  if (_bookmarkStore.isValid())
1828  {
1829  _bookmarkStore.setServerVersion(_serverVersion);
1830  }
1831  }
1832  if (_ackBatchSize)
1833  {
1834  const std::string& options = ack.options();
1835  size_t index = options.find_first_of("max_backlog=");
1836  if (index != std::string::npos)
1837  {
1838  unsigned data = 0;
1839  const char* c = options.c_str() + index + 12;
1840  while (*c && *c != ',')
1841  {
1842  data = (data * 10) + (unsigned)(*c++ -48);
1843  }
1844  if (_ackBatchSize > data)
1845  {
1846  _ackBatchSize = data;
1847  }
1848  }
1849  }
1850  return ack;
1851  }
1852  const size_t NotEntitled = 12;
1853  std::string ackReason = ack.reason();
1854  if (ackReason.length() == 0)
1855  {
1856  return ack; // none
1857  }
1858  if (ackReason.length() == NotEntitled &&
1859  ackReason[0] == 'n' &&
1860  message_.getUserId().len() == 0)
1861  {
1862  message_.assignUserId(_username);
1863  }
1864  message_.throwFor(_client, ackReason);
1865  }
1866  else // !ack.responded()
1867  {
1868  if (!ack.abandoned())
1869  {
1870  throw TimedOutException("timed out waiting for operation.");
1871  }
1872  else
1873  {
1874  throw DisconnectedException("Connection closed while waiting for response.");
1875  }
1876  }
1877  return ack;
1878  }
1879 
1880  void _cleanup(void)
1881  {
1882  if (!_client)
1883  {
1884  return;
1885  }
1886  amps_client_set_predisconnect_handler(_client, NULL, 0L);
1887  amps_client_set_disconnect_handler(_client, NULL, 0L);
1888  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1889  _pEmptyMessageStream.reset(NULL);
1890  amps_client_destroy(_client);
1891  _client = NULL;
1892  }
1893 
1894  public:
1895 
1896  ClientImpl(const std::string& clientName)
1897  : _client(NULL), _name(clientName)
1898  , _isRetryOnDisconnect(true)
1899  , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1900  , _badTimeToHASubscribe(0), _serverVersion()
1901  , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
1902  , _isAutoAckEnabled(false)
1903  , _ackBatchSize(0)
1904  , _queuedAckCount(0)
1905  , _defaultMaxDepth(0)
1906  , _connected(false)
1907  , _heartbeatInterval(0)
1908  , _readTimeout(0)
1909  {
1910  _replayer.setClient(this);
1911  _client = amps_client_create(clientName.c_str());
1913  (amps_handler)ClientImpl::ClientImplMessageHandler,
1914  this);
1916  (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler,
1917  this);
1919  (amps_handler)ClientImpl::ClientImplDisconnectHandler,
1920  this);
1921  _exceptionListener = &_defaultExceptionListener;
1922  for (size_t i = 0; i < GlobalCommandTypeHandlers::COUNT; ++i)
1923  {
1924 #ifdef AMPS_USE_EMPLACE
1925  _globalCommandTypeHandlers.emplace_back(MessageHandler());
1926 #else
1927  _globalCommandTypeHandlers.push_back(MessageHandler());
1928 #endif
1929  }
1930  }
1931 
1932  virtual ~ClientImpl()
1933  {
1934  _cleanup();
1935  }
1936 
1937  const std::string& getName() const
1938  {
1939  return _name;
1940  }
1941 
1942  const std::string& getNameHash() const
1943  {
1944  return _nameHash;
1945  }
1946 
1947  const amps_uint64_t getNameHashValue() const
1948  {
1949  return _nameHashValue;
1950  }
1951 
1952  void setName(const std::string& name)
1953  {
1954  // This operation will fail if the client's
1955  // name is already set.
1956  amps_result result = amps_client_set_name(_client, name.c_str());
1957  if (result != AMPS_E_OK)
1958  {
1959  AMPSException::throwFor(_client, result);
1960  }
1961  _name = name;
1962  }
1963 
1964  const std::string& getLogonCorrelationData() const
1965  {
1966  return _logonCorrelationData;
1967  }
1968 
1969  void setLogonCorrelationData(const std::string& logonCorrelationData_)
1970  {
1971  _logonCorrelationData = logonCorrelationData_;
1972  }
1973 
1974  size_t getServerVersion() const
1975  {
1976  return _serverVersion.getOldStyleVersion();
1977  }
1978 
1979  VersionInfo getServerVersionInfo() const
1980  {
1981  return _serverVersion;
1982  }
1983 
1984  const std::string& getURI() const
1985  {
1986  return _lastUri;
1987  }
1988 
1989  virtual void connect(const std::string& uri)
1990  {
1991  Lock<Mutex> l(_lock);
1992  _connect(uri);
1993  }
1994 
1995  virtual void _connect(const std::string& uri)
1996  {
1997  _lastUri = uri;
1998  amps_result result = amps_client_connect(_client, uri.c_str());
1999  if (result != AMPS_E_OK)
2000  {
2001  AMPSException::throwFor(_client, result);
2002  }
2003  _message.reset();
2004  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
2005  _publishMessage.setCommandEnum(Message::Command::Publish);
2006  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
2007  _beatMessage.setOptions("beat");
2008  _readMessage.setClientImpl(this);
2009  if (_queueAckTimeout)
2010  {
2011  amps_client_set_idle_time(_client, _queueAckTimeout);
2012  }
2013  _connected = true;
2014  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
2015  }
2016 
2017  void setDisconnected()
2018  {
2019  {
2020  Lock<Mutex> l(_lock);
2021  if (_connected)
2022  {
2023  AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
2024  }
2025  _connected = false;
2026  _heartbeatTimer.setTimeout(0.0);
2027  }
2028  clearAcks(INT_MAX);
2029  amps_client_disconnect(_client);
2030  _routes.clear();
2031  }
2032 
2033  virtual void disconnect()
2034  {
2035  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
2036  setDisconnected();
2037  AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
2038  Lock<Mutex> l(_lock);
2039  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
2040  }
2041 
2042  void clearAcks(unsigned failedVersion)
2043  {
2044  // Have to lock to prevent race conditions
2045  Lock<Mutex> guard(_ackMapLock);
2046  {
2047  // Go ahead and signal any waiters if they are around...
2048  std::vector<std::string> worklist;
2049  for (AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
2050  {
2051  if (i->second.getConnectionVersion() <= failedVersion)
2052  {
2053  i->second.setAbandoned(true);
2054  worklist.push_back(i->first);
2055  }
2056  }
2057 
2058  for (std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
2059  {
2060  _ackMap.erase(*j);
2061  }
2062  }
2063 
2064  _lock.signalAll();
2065  }
2066 
2067  int send(const Message& message)
2068  {
2069  Lock<Mutex> l(_lock);
2070  return _send(message);
2071  }
2072 
2073  void sendWithoutRetry(const Message& message_)
2074  {
2075  Lock<Mutex> l(_lock);
2076  _sendWithoutRetry(message_);
2077  }
2078 
2079  void _sendWithoutRetry(const Message& message_)
2080  {
2081  amps_result result = amps_client_send(_client, message_.getMessage());
2082  if (result != AMPS_E_OK)
2083  {
2084  AMPSException::throwFor(_client, result);
2085  }
2086  }
2087 
2088  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
2089  bool isHASubscribe_ = false)
2090  {
2091  // Lock is already acquired
2092  amps_result result = AMPS_E_RETRY;
2093 
2094  // Create a local reference to this message, as we'll need to hold on
2095  // to a reference to it in case reconnect occurs.
2096  Message localMessage = message;
2097  unsigned version = 0;
2098 
2099  while (result == AMPS_E_RETRY)
2100  {
2101  if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
2102  {
2103  // If retrySend is disabled, do not wait for the reconnect
2104  // to finish, just throw.
2105  if (!_isRetryOnDisconnect)
2106  {
2107  AMPSException::throwFor(_client, AMPS_E_RETRY);
2108  }
2109  if (!_lock.wait(1000))
2110  {
2111  amps_invoke_waiting_function();
2112  }
2113  }
2114  else
2115  {
2116  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
2117  (isHASubscribe_ && _badTimeToHASubscribe != 0))
2118  {
2119  return (int)version;
2120  }
2121  // It's possible to get here out of order, but this way we'll
2122  // always send in order.
2123  if (haSeq > _lastSentHaSequenceNumber)
2124  {
2125  while (haSeq > _lastSentHaSequenceNumber + 1)
2126  {
2127  try
2128  {
2129  // Replayer updates _lastSentHaSsequenceNumber
2130  if (!_publishStore.replaySingle(_replayer,
2131  _lastSentHaSequenceNumber + 1))
2132  {
2133  //++_lastSentHaSequenceNumber;
2134  continue;
2135  }
2136  result = AMPS_E_OK;
2137  version = _replayer._version;
2138  }
2139 #ifdef _WIN32
2140  catch (const DisconnectedException&)
2141 #else
2142  catch (const DisconnectedException& e)
2143 #endif
2144  {
2145  result = _replayer._res;
2146  break;
2147  }
2148  }
2149  result = amps_client_send_with_version(_client,
2150  localMessage.getMessage(),
2151  &version);
2152  ++_lastSentHaSequenceNumber;
2153  }
2154  else
2155  result = amps_client_send_with_version(_client,
2156  localMessage.getMessage(),
2157  &version);
2158  if (result != AMPS_E_OK)
2159  {
2160  if (!isHASubscribe_ && !haSeq &&
2161  localMessage.getMessage() == message.getMessage())
2162  {
2163  localMessage = message.deepCopy();
2164  }
2165  if (_isRetryOnDisconnect)
2166  {
2167  Unlock<Mutex> u(_lock);
2168  result = amps_client_attempt_reconnect(_client, version);
2169  // If this is an HA publish or subscribe command, it was
2170  // stored first and will have already been replayed by the
2171  // store or sub manager after reconnect, so just return.
2172  if ((isHASubscribe_ || haSeq) &&
2173  result == AMPS_E_RETRY)
2174  {
2175  return (int)version;
2176  }
2177  }
2178  else
2179  {
2180  // retrySend is disabled so throw the error
2181  // from the send as an exception, do not retry.
2182  AMPSException::throwFor(_client, result);
2183  }
2184  }
2185  }
2186  if (result == AMPS_E_RETRY)
2187  {
2188  amps_invoke_waiting_function();
2189  }
2190  }
2191 
2192  if (result != AMPS_E_OK)
2193  {
2194  AMPSException::throwFor(_client, result);
2195  }
2196  return (int)version;
2197  }
2198 
2199  void addMessageHandler(const Field& commandId_,
2200  const AMPS::MessageHandler& messageHandler_,
2201  unsigned requestedAcks_, bool isSubscribe_)
2202  {
2203  Lock<Mutex> lock(_lock);
2204  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
2205  0, isSubscribe_);
2206  }
2207 
2208  bool removeMessageHandler(const Field& commandId_)
2209  {
2210  Lock<Mutex> lock(_lock);
2211  return _routes.removeRoute(commandId_);
2212  }
2213 
2214  std::string send(const MessageHandler& messageHandler_, Message& message_, int timeout_ = 0)
2215  {
2216  Field id = message_.getCommandId();
2217  Field subId = message_.getSubscriptionId();
2218  Field qid = message_.getQueryId();
2219  bool isSubscribe = false;
2220  bool isSubscribeOnly = false;
2221  bool replace = false;
2222  unsigned requestedAcks = message_.getAckTypeEnum();
2223  unsigned systemAddedAcks = Message::AckType::None;
2224 
2225  switch (message_.getCommandEnum())
2226  {
2227  case Message::Command::Subscribe:
2228  case Message::Command::DeltaSubscribe:
2229  replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos;
2230  isSubscribeOnly = true;
2231  // fall through
2232  case Message::Command::SOWAndSubscribe:
2233  case Message::Command::SOWAndDeltaSubscribe:
2234  if (id.empty())
2235  {
2236  id = message_.newCommandId().getCommandId();
2237  }
2238  else
2239  {
2240  while (!replace && id != subId && _routes.hasRoute(id))
2241  {
2242  id = message_.newCommandId().getCommandId();
2243  }
2244  }
2245  if (subId.empty())
2246  {
2247  message_.setSubscriptionId(id);
2248  subId = id;
2249  }
2250  isSubscribe = true;
2251  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
2252  {
2253  systemAddedAcks |= Message::AckType::Persisted;
2254  }
2255  // fall through
2256  case Message::Command::SOW:
2257  if (id.empty())
2258  {
2259  id = message_.newCommandId().getCommandId();
2260  }
2261  else
2262  {
2263  while (!replace && id != subId && _routes.hasRoute(id))
2264  {
2265  message_.newCommandId();
2266  if (qid == id)
2267  {
2268  qid = message_.getCommandId();
2269  message_.setQueryId(qid);
2270  }
2271  id = message_.getCommandId();
2272  }
2273  }
2274  if (!isSubscribeOnly)
2275  {
2276  if (qid.empty())
2277  {
2278  message_.setQueryID(id);
2279  qid = id;
2280  }
2281  else
2282  {
2283  while (!replace && qid != subId && qid != id
2284  && _routes.hasRoute(qid))
2285  {
2286  qid = message_.newQueryId().getQueryId();
2287  }
2288  }
2289  }
2290  systemAddedAcks |= Message::AckType::Processed;
2291  // for SOW only, we get a completed ack so we know when to remove the handler.
2292  if (!isSubscribeOnly)
2293  {
2294  systemAddedAcks |= Message::AckType::Completed;
2295  }
2296  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
2297  {
2298  int routesAdded = 0;
2299  Lock<Mutex> l(_lock);
2300  if (!subId.empty() && messageHandler_.isValid())
2301  {
2302  if (!_routes.hasRoute(subId))
2303  {
2304  ++routesAdded;
2305  }
2306  // This can replace a non-subscribe with a matching id
2307  // with a subscription but not another subscription.
2308  _routes.addRoute(subId, messageHandler_, requestedAcks,
2309  systemAddedAcks, isSubscribe);
2310  }
2311  if (!isSubscribeOnly && !qid.empty()
2312  && messageHandler_.isValid() && qid != subId)
2313  {
2314  if (routesAdded == 0)
2315  {
2316  _routes.addRoute(qid, messageHandler_,
2317  requestedAcks, systemAddedAcks, false);
2318  }
2319  else
2320  {
2321  void* data = NULL;
2322  {
2323  Unlock<Mutex> u(_lock);
2324  data = amps_invoke_copy_route_function(
2325  messageHandler_.userData());
2326  }
2327  if (!data)
2328  {
2329  _routes.addRoute(qid, messageHandler_, requestedAcks,
2330  systemAddedAcks, false);
2331  }
2332  else
2333  {
2334  _routes.addRoute(qid,
2335  MessageHandler(messageHandler_.function(),
2336  data),
2337  requestedAcks, systemAddedAcks, false);
2338  }
2339  }
2340  ++routesAdded;
2341  }
2342  if (!id.empty() && messageHandler_.isValid()
2343  && requestedAcks & ~Message::AckType::Persisted
2344  && id != subId && id != qid)
2345  {
2346  if (routesAdded == 0)
2347  {
2348  _routes.addRoute(id, messageHandler_, requestedAcks,
2349  systemAddedAcks, false);
2350  }
2351  else
2352  {
2353  void* data = NULL;
2354  {
2355  Unlock<Mutex> u(_lock);
2356  data = amps_invoke_copy_route_function(
2357  messageHandler_.userData());
2358  }
2359  if (!data)
2360  {
2361  _routes.addRoute(id, messageHandler_, requestedAcks,
2362  systemAddedAcks, false);
2363  }
2364  else
2365  {
2366  _routes.addRoute(id,
2367  MessageHandler(messageHandler_.function(),
2368  data),
2369  requestedAcks,
2370  systemAddedAcks, false);
2371  }
2372  }
2373  ++routesAdded;
2374  }
2375  try
2376  {
2377  // We aren't adding to subscription manager, so this isn't
2378  // an HA subscribe.
2379  syncAckProcessing(timeout_, message_, 0, false);
2380  message_.setAckTypeEnum(requestedAcks);
2381  }
2382  catch (...)
2383  {
2384  _routes.removeRoute(message_.getQueryID());
2385  _routes.removeRoute(message_.getSubscriptionId());
2386  _routes.removeRoute(id);
2387  message_.setAckTypeEnum(requestedAcks);
2388  throw;
2389  }
2390  }
2391  break;
2392  // These are valid commands that are used as-is
2393  case Message::Command::Unsubscribe:
2394  case Message::Command::Heartbeat:
2395  case Message::Command::Logon:
2396  case Message::Command::StartTimer:
2397  case Message::Command::StopTimer:
2398  case Message::Command::SOWDelete:
2399  {
2400  Lock<Mutex> l(_lock);
2401  // if an ack is requested, it'll need a command ID.
2402  if (message_.getAckTypeEnum() != Message::AckType::None)
2403  {
2404  if (id.empty())
2405  {
2406  message_.newCommandId();
2407  id = message_.getCommandId();
2408  }
2409  if (messageHandler_.isValid())
2410  {
2411  _routes.addRoute(id, messageHandler_, requestedAcks,
2412  Message::AckType::None, false);
2413  }
2414  }
2415  _send(message_);
2416  }
2417  break;
2418  case Message::Command::DeltaPublish:
2419  case Message::Command::Publish:
2420  {
2421  bool useSync = message_.getFilter().len() > 0;
2422  Lock<Mutex> l(_lock);
2423  // if an ack is requested, it'll need a command ID.
2424  unsigned ackType = message_.getAckTypeEnum();
2425  if (ackType != Message::AckType::None
2426  || useSync)
2427  {
2428  if (id.empty())
2429  {
2430  message_.newCommandId();
2431  id = message_.getCommandId();
2432  }
2433  if (messageHandler_.isValid())
2434  {
2435  _routes.addRoute(id, messageHandler_, requestedAcks,
2436  Message::AckType::None, false);
2437  }
2438  }
2439  if (useSync)
2440  {
2441  message_.setAckTypeEnum(ackType | Message::AckType::Processed);
2442  syncAckProcessing(timeout_, message_, 0, false);
2443  }
2444  else
2445  {
2446  _send(message_);
2447  }
2448  }
2449  break;
2450  // These are things that shouldn't be sent (not meaningful)
2451  case Message::Command::GroupBegin:
2452  case Message::Command::GroupEnd:
2453  case Message::Command::OOF:
2454  case Message::Command::Ack:
2455  case Message::Command::Unknown:
2456  default:
2457  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2458  }
2459  message_.setAckTypeEnum(requestedAcks);
2460  return id;
2461  }
2462 
2463  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
2464  {
2465  Lock<Mutex> l(_lock);
2466  _disconnectHandler = disconnectHandler;
2467  }
2468 
2469  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2470  {
2471  switch (command_[0])
2472  {
2473 #if 0 // Not currently implemented to avoid an extra branch in delivery
2474  case 'p':
2475  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2476  break;
2477  case 's':
2478  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2479  break;
2480 #endif
2481  case 'h':
2482  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2483  break;
2484 #if 0 // Not currently implemented to avoid an extra branch in delivery
2485  case 'g':
2486  if (command_[6] == 'b')
2487  {
2488  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2489  }
2490  else if (command_[6] == 'e')
2491  {
2492  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2493  }
2494  else
2495  {
2496  std::ostringstream os;
2497  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2498  throw CommandException(os.str());
2499  }
2500  break;
2501  case 'o':
2502  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2503  break;
2504 #endif
2505  case 'a':
2506  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2507  break;
2508  case 'l':
2509  case 'L':
2510  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2511  break;
2512  case 'd':
2513  case 'D':
2514  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2515  break;
2516  default:
2517  std::ostringstream os;
2518  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2519  throw CommandException(os.str());
2520  break;
2521  }
2522  }
2523 
2524  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2525  {
2526  switch (command_)
2527  {
2528 #if 0 // Not currently implemented to avoid an extra branch in delivery
2529  case Message::Command::Publish:
2530  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2531  break;
2532  case Message::Command::SOW:
2533  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2534  break;
2535 #endif
2536  case Message::Command::Heartbeat:
2537  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2538  break;
2539 #if 0 // Not currently implemented to avoid an extra branch in delivery
2540  case Message::Command::GroupBegin:
2541  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2542  break;
2543  case Message::Command::GroupEnd:
2544  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2545  break;
2546  case Message::Command::OOF:
2547  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2548  break;
2549 #endif
2550  case Message::Command::Ack:
2551  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2552  break;
2553  default:
2554  unsigned bits = 0;
2555  unsigned command = command_;
2556  while (command > 0)
2557  {
2558  ++bits;
2559  command >>= 1;
2560  }
2561  char errBuf[128];
2562  AMPS_snprintf(errBuf, sizeof(errBuf),
2563  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2564  CommandConstants<0>::Lengths[bits],
2565  CommandConstants<0>::Values[bits]);
2566  throw CommandException(errBuf);
2567  break;
2568  }
2569  }
2570 
2571  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2572  {
2573  _globalCommandTypeHandlers[handlerType_] = handler_;
2574  }
2575 
2576  void setFailedWriteHandler(FailedWriteHandler* handler_)
2577  {
2578  Lock<Mutex> l(_lock);
2579  _failedWriteHandler.reset(handler_);
2580  }
2581 
2582  void setPublishStore(const Store& publishStore_)
2583  {
2584  Lock<Mutex> l(_lock);
2585  if (_connected)
2586  {
2587  throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2588  }
2589  _publishStore = publishStore_;
2590  }
2591 
2592  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2593  {
2594  Lock<Mutex> l(_lock);
2595  if (_connected)
2596  {
2597  throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2598  }
2599  _bookmarkStore = bookmarkStore_;
2600  }
2601 
2602  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2603  {
2604  Lock<Mutex> l(_lock);
2605  _subscriptionManager.reset(subscriptionManager_);
2606  }
2607 
2608  SubscriptionManager* getSubscriptionManager() const
2609  {
2610  return const_cast<SubscriptionManager*>(_subscriptionManager.get());
2611  }
2612 
2613  DisconnectHandler getDisconnectHandler() const
2614  {
2615  return _disconnectHandler;
2616  }
2617 
2618  MessageHandler getDuplicateMessageHandler() const
2619  {
2620  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2621  }
2622 
2623  FailedWriteHandler* getFailedWriteHandler() const
2624  {
2625  return const_cast<FailedWriteHandler*>(_failedWriteHandler.get());
2626  }
2627 
2628  Store getPublishStore() const
2629  {
2630  return _publishStore;
2631  }
2632 
2633  BookmarkStore getBookmarkStore() const
2634  {
2635  return _bookmarkStore;
2636  }
2637 
2638  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_, size_t dataLen_)
2639  {
2640  if (!_publishStore.isValid())
2641  {
2642  Lock<Mutex> l(_lock);
2643  _publishMessage.assignTopic(topic_, topicLen_);
2644  _publishMessage.assignData(data_, dataLen_);
2645  _send(_publishMessage);
2646  return 0;
2647  }
2648  else
2649  {
2650  if (!publishStoreMessage)
2651  {
2652  publishStoreMessage = new Message();
2653  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2654  }
2655  publishStoreMessage->reset();
2656  publishStoreMessage->setCommandEnum(Message::Command::Publish);
2657  return _publish(topic_, topicLen_, data_, dataLen_);
2658  }
2659  }
2660 
2661  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2662  size_t dataLen_, unsigned long expiration_)
2663  {
2664  if (!_publishStore.isValid())
2665  {
2666  Lock<Mutex> l(_lock);
2667  _publishMessage.assignTopic(topic_, topicLen_);
2668  _publishMessage.assignData(data_, dataLen_);
2669  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2670  size_t pos = convertToCharArray(exprBuf, expiration_);
2671  _publishMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2672  _send(_publishMessage);
2673  _publishMessage.assignExpiration(NULL, 0);
2674  return 0;
2675  }
2676  else
2677  {
2678  if (!publishStoreMessage)
2679  {
2680  publishStoreMessage = new Message();
2681  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2682  }
2683  publishStoreMessage->reset();
2684  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2685  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2686  publishStoreMessage->setCommandEnum(Message::Command::Publish)
2687  .assignExpiration(exprBuf + exprPos,
2688  AMPS_NUMBER_BUFFER_LEN - exprPos);
2689  return _publish(topic_, topicLen_, data_, dataLen_);
2690  }
2691  }
2692 
2693  class FlushAckHandler : ConnectionStateListener
2694  {
2695  private:
2696  ClientImpl* _pClient;
2697  Field _cmdId;
2698  volatile bool _acked;
2699  volatile bool _disconnected;
2700  public:
2701  FlushAckHandler(ClientImpl* pClient_)
2702  : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2703  {
2704  pClient_->addConnectionStateListener(this);
2705  }
2706  ~FlushAckHandler()
2707  {
2708  _pClient->removeConnectionStateListener(this);
2709  _pClient->removeMessageHandler(_cmdId);
2710  _cmdId.clear();
2711  }
2712  void setCommandId(const Field& cmdId_)
2713  {
2714  _cmdId.deepCopy(cmdId_);
2715  }
2716  void invoke(const Message&)
2717  {
2718  _acked = true;
2719  }
2720  void connectionStateChanged(State state_)
2721  {
2722  if (state_ <= Shutdown)
2723  {
2724  _disconnected = true;
2725  }
2726  }
2727  bool acked()
2728  {
2729  return _acked;
2730  }
2731  bool done()
2732  {
2733  return _acked || _disconnected;
2734  }
2735  };
2736 
2737  void publishFlush(long timeout_, unsigned ackType_)
2738  {
2739  static const char* processed = "processed";
2740  static const size_t processedLen = strlen(processed);
2741  static const char* persisted = "persisted";
2742  static const size_t persistedLen = strlen(persisted);
2743  static const char* flush = "flush";
2744  static const size_t flushLen = strlen(flush);
2745  static VersionInfo minPersisted("5.3.3.0");
2746  static VersionInfo minFlush("4");
2747  if (ackType_ != Message::AckType::Processed
2748  && ackType_ != Message::AckType::Persisted)
2749  {
2750  throw CommandException("Flush can only be used with processed or persisted acks.");
2751  }
2752  FlushAckHandler flushHandler(this);
2753  if (_serverVersion >= minFlush)
2754  {
2755  Lock<Mutex> l(_lock);
2756  if (!_connected)
2757  {
2758  throw DisconnectedException("Not connected trying to flush");
2759  }
2760  _message.reset();
2761  _message.newCommandId();
2762  _message.assignCommand(flush, flushLen);
2763  if (_serverVersion < minPersisted
2764  || ackType_ == Message::AckType::Processed)
2765  {
2766  _message.assignAckType(processed, processedLen);
2767  }
2768  else
2769  {
2770  _message.assignAckType(persisted, persistedLen);
2771  }
2772  flushHandler.setCommandId(_message.getCommandId());
2773  addMessageHandler(_message.getCommandId(),
2774  std::bind(&FlushAckHandler::invoke,
2775  std::ref(flushHandler),
2776  std::placeholders::_1),
2777  ackType_, false);
2778  NoDelay noDelay(_client);
2779  if (_send(_message) == -1)
2780  {
2781  throw DisconnectedException("Disconnected trying to flush");
2782  }
2783  }
2784  if (_publishStore.isValid())
2785  {
2786  try
2787  {
2788  _publishStore.flush(timeout_);
2789  }
2790  catch (const AMPSException& ex)
2791  {
2792  AMPS_UNHANDLED_EXCEPTION(ex);
2793  throw;
2794  }
2795  }
2796  else if (_serverVersion < minFlush)
2797  {
2798  if (timeout_ > 0)
2799  {
2800  AMPS_USLEEP(timeout_ * 1000);
2801  }
2802  else
2803  {
2804  AMPS_USLEEP(1000 * 1000);
2805  }
2806  return;
2807  }
2808  if (timeout_)
2809  {
2810  Timer timer((double)timeout_);
2811  timer.start();
2812  while (!timer.check() && !flushHandler.done())
2813  {
2814  AMPS_USLEEP(10000);
2815  amps_invoke_waiting_function();
2816  }
2817  }
2818  else
2819  {
2820  while (!flushHandler.done())
2821  {
2822  AMPS_USLEEP(10000);
2823  amps_invoke_waiting_function();
2824  }
2825  }
2826  // No response or disconnect in timeout interval
2827  if (!flushHandler.done())
2828  {
2829  throw TimedOutException("Timed out waiting for flush");
2830  }
2831  // We got disconnected and there is no publish store
2832  if (!flushHandler.acked() && !_publishStore.isValid())
2833  {
2834  throw DisconnectedException("Disconnected waiting for flush");
2835  }
2836  }
2837 
2838  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2839  const char* data_, size_t dataLength_)
2840  {
2841  if (!_publishStore.isValid())
2842  {
2843  Lock<Mutex> l(_lock);
2844  _deltaMessage.assignTopic(topic_, topicLength_);
2845  _deltaMessage.assignData(data_, dataLength_);
2846  _send(_deltaMessage);
2847  return 0;
2848  }
2849  else
2850  {
2851  if (!publishStoreMessage)
2852  {
2853  publishStoreMessage = new Message();
2854  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2855  }
2856  publishStoreMessage->reset();
2857  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2858  return _publish(topic_, topicLength_, data_, dataLength_);
2859  }
2860  }
2861 
2862  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2863  const char* data_, size_t dataLength_,
2864  unsigned long expiration_)
2865  {
2866  if (!_publishStore.isValid())
2867  {
2868  Lock<Mutex> l(_lock);
2869  _deltaMessage.assignTopic(topic_, topicLength_);
2870  _deltaMessage.assignData(data_, dataLength_);
2871  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2872  size_t pos = convertToCharArray(exprBuf, expiration_);
2873  _deltaMessage.assignExpiration(exprBuf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2874  _send(_deltaMessage);
2875  _deltaMessage.assignExpiration(NULL, 0);
2876  return 0;
2877  }
2878  else
2879  {
2880  if (!publishStoreMessage)
2881  {
2882  publishStoreMessage = new Message();
2883  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2884  }
2885  publishStoreMessage->reset();
2886  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2887  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2888  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2889  .assignExpiration(exprBuf + exprPos,
2890  AMPS_NUMBER_BUFFER_LEN - exprPos);
2891  return _publish(topic_, topicLength_, data_, dataLength_);
2892  }
2893  }
2894 
2895  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
2896  const char* data_, size_t dataLength_)
2897  {
2898  publishStoreMessage->assignTopic(topic_, topicLength_)
2899  .setAckTypeEnum(Message::AckType::Persisted)
2900  .assignData(data_, dataLength_);
2901  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
2902  char buf[AMPS_NUMBER_BUFFER_LEN];
2903  size_t pos = convertToCharArray(buf, haSequenceNumber);
2904  publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
2905  {
2906  Lock<Mutex> l(_lock);
2907  _send(*publishStoreMessage, haSequenceNumber);
2908  }
2909  return haSequenceNumber;
2910  }
2911 
2912  virtual std::string logon(long timeout_, Authenticator& authenticator_,
2913  const char* options_ = NULL)
2914  {
2915  Lock<Mutex> l(_lock);
2916  return _logon(timeout_, authenticator_, options_);
2917  }
2918 
2919  virtual std::string _logon(long timeout_, Authenticator& authenticator_,
2920  const char* options_ = NULL)
2921  {
2922  AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2923  _message.reset();
2924  _message.setCommandEnum(Message::Command::Logon);
2925  _message.newCommandId();
2926  std::string newCommandId = _message.getCommandId();
2927  _message.setClientName(_name);
2928 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
2929  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
2930  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
2931 #endif
2932  URI uri(_lastUri);
2933  if (uri.user().size())
2934  {
2935  _message.setUserId(uri.user());
2936  }
2937  if (uri.password().size())
2938  {
2939  _message.setPassword(uri.password());
2940  }
2941  if (uri.protocol() == "amps" && uri.messageType().size())
2942  {
2943  _message.setMessageType(uri.messageType());
2944  }
2945  if (uri.isTrue("pretty"))
2946  {
2947  _message.setOptions("pretty");
2948  }
2949 
2950  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
2951  if (!_logonCorrelationData.empty())
2952  {
2953  _message.assignCorrelationId(_logonCorrelationData);
2954  }
2955  if (options_)
2956  {
2957  _message.setOptions(options_);
2958  }
2959  _username = _message.getUserId();
2960  try
2961  {
2962  NoDelay noDelay(_client);
2963  while (true)
2964  {
2965  _message.setAckTypeEnum(Message::AckType::Processed);
2966  AckResponse ack = syncAckProcessing(timeout_, _message);
2967  if (ack.status() == "retry")
2968  {
2969  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
2970  _username = ack.username();
2971  _message.setUserId(_username);
2972  }
2973  else
2974  {
2975  authenticator_.completed(ack.username(), ack.password(), ack.reason());
2976  break;
2977  }
2978  }
2979  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2980 
2981  // Now re-send the heartbeat command if configured
2982  _sendHeartbeat();
2983  }
2984  catch (const AMPSException& ex)
2985  {
2986  _lock.signalAll();
2987  AMPS_UNHANDLED_EXCEPTION(ex);
2988  throw;
2989  }
2990  catch (...)
2991  {
2992  _lock.signalAll();
2993  throw;
2994  }
2995 
2996  if (_publishStore.isValid())
2997  {
2998  try
2999  {
3000  _publishStore.replay(_replayer);
3001  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
3002  }
3003  catch (const StoreException& ex)
3004  {
3005  _lock.signalAll();
3006  std::ostringstream os;
3007  os << "A local store exception occurred while logging on."
3008  << ex.toString();
3009  throw ConnectionException(os.str());
3010  }
3011  catch (const AMPSException& ex)
3012  {
3013  _lock.signalAll();
3014  AMPS_UNHANDLED_EXCEPTION(ex);
3015  throw ex;
3016  }
3017  catch (const std::exception& ex)
3018  {
3019  _lock.signalAll();
3020  AMPS_UNHANDLED_EXCEPTION(ex);
3021  throw ex;
3022  }
3023  catch (...)
3024  {
3025  _lock.signalAll();
3026  throw;
3027  }
3028  }
3029  _lock.signalAll();
3030  return newCommandId;
3031  }
3032 
3033  std::string subscribe(const MessageHandler& messageHandler_,
3034  const std::string& topic_,
3035  long timeout_,
3036  const std::string& filter_,
3037  const std::string& bookmark_,
3038  const std::string& options_,
3039  const std::string& subId_,
3040  bool isHASubscribe_ = true)
3041  {
3042  isHASubscribe_ &= (bool)_subscriptionManager;
3043  Lock<Mutex> l(_lock);
3044  _message.reset();
3045  _message.setCommandEnum(Message::Command::Subscribe);
3046  _message.newCommandId();
3047  std::string subId(subId_);
3048  if (subId.empty())
3049  {
3050  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE) - 1) != std::string::npos)
3051  {
3052  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
3053  }
3054 
3055  subId = _message.getCommandId();
3056  }
3057  _message.setSubscriptionId(subId);
3058  // we need to deep copy this before sending the message; while we are
3059  // waiting for a response, the fields in _message may get blown away for
3060  // other operations.
3061  AMPS::Message::Field subIdField(subId);
3062  unsigned ackTypes = Message::AckType::Processed;
3063 
3064  if (!bookmark_.empty() && _bookmarkStore.isValid())
3065  {
3066  ackTypes |= Message::AckType::Persisted;
3067  }
3068  _message.setTopic(topic_);
3069 
3070  if (filter_.length())
3071  {
3072  _message.setFilter(filter_);
3073  }
3074  if (bookmark_.length())
3075  {
3076  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3077  {
3078  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3079  _message.setBookmark(mostRecent);
3080  }
3081  else
3082  {
3083  _message.setBookmark(bookmark_);
3084  if (_bookmarkStore.isValid())
3085  {
3086  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3087  bookmark_ != AMPS_BOOKMARK_EPOCH)
3088  {
3089  _bookmarkStore.log(_message);
3090  _bookmarkStore.discard(_message);
3091  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3092  }
3093  }
3094  }
3095  }
3096  if (options_.length())
3097  {
3098  _message.setOptions(options_);
3099  }
3100 
3101  Message message = _message;
3102  if (isHASubscribe_)
3103  {
3104  message = _message.deepCopy();
3105  Unlock<Mutex> u(_lock);
3106  _subscriptionManager->subscribe(messageHandler_, message,
3107  Message::AckType::None);
3108  if (_badTimeToHASubscribe)
3109  {
3110  return subId;
3111  }
3112  }
3113  if (!_routes.hasRoute(_message.getSubscriptionId()))
3114  {
3115  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3116  Message::AckType::None, ackTypes, true);
3117  }
3118  message.setAckTypeEnum(ackTypes);
3119  if (!options_.empty())
3120  {
3121  message.setOptions(options_);
3122  }
3123  try
3124  {
3125  syncAckProcessing(timeout_, message, isHASubscribe_);
3126  }
3127  catch (const DisconnectedException&)
3128  {
3129  if (!isHASubscribe_)
3130  {
3131  _routes.removeRoute(subIdField);
3132  throw;
3133  }
3134  else
3135  {
3136  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3137  throw;
3138  }
3139  }
3140  catch (const TimedOutException&)
3141  {
3142  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3143  throw;
3144  }
3145  catch (...)
3146  {
3147  if (isHASubscribe_)
3148  {
3149  // Have to unlock before calling into sub manager to avoid deadlock
3150  Unlock<Mutex> unlock(_lock);
3151  _subscriptionManager->unsubscribe(subIdField);
3152  }
3153  _routes.removeRoute(subIdField);
3154  throw;
3155  }
3156 
3157  return subId;
3158  }
3159  std::string deltaSubscribe(const MessageHandler& messageHandler_,
3160  const std::string& topic_,
3161  long timeout_,
3162  const std::string& filter_,
3163  const std::string& bookmark_,
3164  const std::string& options_,
3165  const std::string& subId_ = "",
3166  bool isHASubscribe_ = true)
3167  {
3168  isHASubscribe_ &= (bool)_subscriptionManager;
3169  Lock<Mutex> l(_lock);
3170  _message.reset();
3171  _message.setCommandEnum(Message::Command::DeltaSubscribe);
3172  _message.newCommandId();
3173  std::string subId(subId_);
3174  if (subId.empty())
3175  {
3176  subId = _message.getCommandId();
3177  }
3178  _message.setSubscriptionId(subId);
3179  // we need to deep copy this before sending the message; while we are
3180  // waiting for a response, the fields in _message may get blown away for
3181  // other operations.
3182  AMPS::Message::Field subIdField(subId);
3183  unsigned ackTypes = Message::AckType::Processed;
3184 
3185  if (!bookmark_.empty() && _bookmarkStore.isValid())
3186  {
3187  ackTypes |= Message::AckType::Persisted;
3188  }
3189  _message.setTopic(topic_);
3190  if (filter_.length())
3191  {
3192  _message.setFilter(filter_);
3193  }
3194  if (bookmark_.length())
3195  {
3196  if (bookmark_ == AMPS_BOOKMARK_RECENT)
3197  {
3198  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
3199  _message.setBookmark(mostRecent);
3200  }
3201  else
3202  {
3203  _message.setBookmark(bookmark_);
3204  if (_bookmarkStore.isValid())
3205  {
3206  if (bookmark_ != AMPS_BOOKMARK_NOW &&
3207  bookmark_ != AMPS_BOOKMARK_EPOCH)
3208  {
3209  _bookmarkStore.log(_message);
3210  _bookmarkStore.discard(_message);
3211  _bookmarkStore.persisted(subIdField, _message.getBookmark());
3212  }
3213  }
3214  }
3215  }
3216  if (options_.length())
3217  {
3218  _message.setOptions(options_);
3219  }
3220  Message message = _message;
3221  if (isHASubscribe_)
3222  {
3223  message = _message.deepCopy();
3224  Unlock<Mutex> u(_lock);
3225  _subscriptionManager->subscribe(messageHandler_, message,
3226  Message::AckType::None);
3227  if (_badTimeToHASubscribe)
3228  {
3229  return subId;
3230  }
3231  }
3232  if (!_routes.hasRoute(_message.getSubscriptionId()))
3233  {
3234  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
3235  Message::AckType::None, ackTypes, true);
3236  }
3237  message.setAckTypeEnum(ackTypes);
3238  if (!options_.empty())
3239  {
3240  message.setOptions(options_);
3241  }
3242  try
3243  {
3244  syncAckProcessing(timeout_, message, isHASubscribe_);
3245  }
3246  catch (const DisconnectedException&)
3247  {
3248  if (!isHASubscribe_)
3249  {
3250  _routes.removeRoute(subIdField);
3251  throw;
3252  }
3253  }
3254  catch (const TimedOutException&)
3255  {
3256  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
3257  throw;
3258  }
3259  catch (...)
3260  {
3261  if (isHASubscribe_)
3262  {
3263  // Have to unlock before calling into sub manager to avoid deadlock
3264  Unlock<Mutex> unlock(_lock);
3265  _subscriptionManager->unsubscribe(subIdField);
3266  }
3267  _routes.removeRoute(subIdField);
3268  throw;
3269  }
3270  return subId;
3271  }
3272 
3273  void unsubscribe(const std::string& id)
3274  {
3275  Lock<Mutex> l(_lock);
3276  unsubscribeInternal(id);
3277  }
3278 
3279  void unsubscribe(void)
3280  {
3281  if (_subscriptionManager)
3282  {
3283  _subscriptionManager->clear();
3284  }
3285  {
3286  _routes.unsubscribeAll();
3287  Lock<Mutex> l(_lock);
3288  _message.reset();
3289  _message.setCommandEnum(Message::Command::Unsubscribe);
3290  _message.newCommandId();
3291  _message.setSubscriptionId("all");
3292  _sendWithoutRetry(_message);
3293  }
3294  deferredExecution(&amps_noOpFn, NULL);
3295  }
3296 
3297  std::string sow(const MessageHandler& messageHandler_,
3298  const std::string& topic_,
3299  const std::string& filter_ = "",
3300  const std::string& orderBy_ = "",
3301  const std::string& bookmark_ = "",
3302  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3303  int topN_ = AMPS_DEFAULT_TOP_N,
3304  const std::string& options_ = "",
3305  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
3306  {
3307  Lock<Mutex> l(_lock);
3308  _message.reset();
3309  _message.setCommandEnum(Message::Command::SOW);
3310  _message.newCommandId();
3311  // need to keep our own copy of the command ID.
3312  std::string commandId = _message.getCommandId();
3313  _message.setQueryID(_message.getCommandId());
3314  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
3315  _message.setAckTypeEnum(ackTypes);
3316  _message.setTopic(topic_);
3317  if (filter_.length())
3318  {
3319  _message.setFilter(filter_);
3320  }
3321  if (orderBy_.length())
3322  {
3323  _message.setOrderBy(orderBy_);
3324  }
3325  if (bookmark_.length())
3326  {
3327  _message.setBookmark(bookmark_);
3328  }
3329  _message.setBatchSize(AMPS::asString(batchSize_));
3330  if (topN_ != AMPS_DEFAULT_TOP_N)
3331  {
3332  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3333  }
3334  if (options_.length())
3335  {
3336  _message.setOptions(options_);
3337  }
3338 
3339  _routes.addRoute(_message.getQueryID(), messageHandler_,
3340  Message::AckType::None, ackTypes, false);
3341 
3342  try
3343  {
3344  syncAckProcessing(timeout_, _message);
3345  }
3346  catch (...)
3347  {
3348  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
3349  throw;
3350  }
3351 
3352  return commandId;
3353  }
3354 
3355  std::string sow(const MessageHandler& messageHandler_,
3356  const std::string& topic_,
3357  long timeout_,
3358  const std::string& filter_ = "",
3359  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3360  int topN_ = AMPS_DEFAULT_TOP_N)
3361  {
3362  std::string notSet;
3363  return sow(messageHandler_,
3364  topic_,
3365  filter_,
3366  notSet, // orderBy
3367  notSet, // bookmark
3368  batchSize_,
3369  topN_,
3370  notSet,
3371  timeout_);
3372  }
3373 
3374  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3375  const std::string& topic_,
3376  const std::string& filter_ = "",
3377  const std::string& orderBy_ = "",
3378  const std::string& bookmark_ = "",
3379  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3380  int topN_ = AMPS_DEFAULT_TOP_N,
3381  const std::string& options_ = "",
3382  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3383  bool isHASubscribe_ = true)
3384  {
3385  isHASubscribe_ &= (bool)_subscriptionManager;
3386  unsigned ackTypes = Message::AckType::Processed;
3387  Lock<Mutex> l(_lock);
3388  _message.reset();
3389  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
3390  _message.newCommandId();
3391  Field cid = _message.getCommandId();
3392  std::string subId = cid;
3393  _message.setQueryID(cid).setSubscriptionId(cid).setTopic(topic_);
3394  if (filter_.length())
3395  {
3396  _message.setFilter(filter_);
3397  }
3398  if (orderBy_.length())
3399  {
3400  _message.setOrderBy(orderBy_);
3401  }
3402  if (bookmark_.length())
3403  {
3404  _message.setBookmark(bookmark_);
3405  Message::Field bookmark = _message.getBookmark();
3406  if (_bookmarkStore.isValid())
3407  {
3408  ackTypes |= Message::AckType::Persisted;
3409  if (bookmark == AMPS_BOOKMARK_RECENT)
3410  {
3411  _message.setBookmark(_bookmarkStore.getMostRecent(_message.getSubscriptionId()));
3412  }
3413  else if (bookmark != AMPS_BOOKMARK_NOW &&
3414  bookmark != AMPS_BOOKMARK_EPOCH)
3415  {
3416  _bookmarkStore.log(_message);
3417  if (!BookmarkRange::isRange(bookmark))
3418  {
3419  _bookmarkStore.discard(_message);
3420  _bookmarkStore.persisted(_message.getSubscriptionId(),
3421  bookmark);
3422  }
3423  }
3424  }
3425  else if (bookmark == AMPS_BOOKMARK_RECENT)
3426  {
3427  _message.setBookmark(AMPS_BOOKMARK_EPOCH);
3428  }
3429  }
3430  _message.setBatchSize(AMPS::asString(batchSize_));
3431  if (topN_ != AMPS_DEFAULT_TOP_N)
3432  {
3433  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3434  }
3435  if (options_.length())
3436  {
3437  _message.setOptions(options_);
3438  }
3439 
3440  Message message = _message;
3441  if (isHASubscribe_)
3442  {
3443  message = _message.deepCopy();
3444  Unlock<Mutex> u(_lock);
3445  _subscriptionManager->subscribe(messageHandler_, message,
3446  Message::AckType::None);
3447  if (_badTimeToHASubscribe)
3448  {
3449  return subId;
3450  }
3451  }
3452  _routes.addRoute(cid, messageHandler_,
3453  Message::AckType::None, ackTypes, true);
3454  message.setAckTypeEnum(ackTypes);
3455  if (!options_.empty())
3456  {
3457  message.setOptions(options_);
3458  }
3459  try
3460  {
3461  syncAckProcessing(timeout_, message, isHASubscribe_);
3462  }
3463  catch (const DisconnectedException&)
3464  {
3465  if (!isHASubscribe_)
3466  {
3467  _routes.removeRoute(subId);
3468  throw;
3469  }
3470  }
3471  catch (const TimedOutException&)
3472  {
3473  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3474  throw;
3475  }
3476  catch (...)
3477  {
3478  if (isHASubscribe_)
3479  {
3480  // Have to unlock before calling into sub manager to avoid deadlock
3481  Unlock<Mutex> unlock(_lock);
3482  _subscriptionManager->unsubscribe(cid);
3483  }
3484  _routes.removeRoute(subId);
3485  throw;
3486  }
3487  return subId;
3488  }
3489 
3490  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3491  const std::string& topic_,
3492  long timeout_,
3493  const std::string& filter_ = "",
3494  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3495  bool oofEnabled_ = false,
3496  int topN_ = AMPS_DEFAULT_TOP_N,
3497  bool isHASubscribe_ = true)
3498  {
3499  std::string notSet;
3500  return sowAndSubscribe(messageHandler_,
3501  topic_,
3502  filter_,
3503  notSet, // orderBy
3504  notSet, // bookmark
3505  batchSize_,
3506  topN_,
3507  (oofEnabled_ ? "oof" : ""),
3508  timeout_,
3509  isHASubscribe_);
3510  }
3511 
3512  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3513  const std::string& topic_,
3514  const std::string& filter_ = "",
3515  const std::string& orderBy_ = "",
3516  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3517  int topN_ = AMPS_DEFAULT_TOP_N,
3518  const std::string& options_ = "",
3519  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3520  bool isHASubscribe_ = true)
3521  {
3522  isHASubscribe_ &= (bool)_subscriptionManager;
3523  Lock<Mutex> l(_lock);
3524  _message.reset();
3525  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3526  _message.newCommandId();
3527  _message.setQueryID(_message.getCommandId());
3528  _message.setSubscriptionId(_message.getCommandId());
3529  std::string subId = _message.getSubscriptionId();
3530  _message.setTopic(topic_);
3531  if (filter_.length())
3532  {
3533  _message.setFilter(filter_);
3534  }
3535  if (orderBy_.length())
3536  {
3537  _message.setOrderBy(orderBy_);
3538  }
3539  _message.setBatchSize(AMPS::asString(batchSize_));
3540  if (topN_ != AMPS_DEFAULT_TOP_N)
3541  {
3542  _message.setTopNRecordsReturned(AMPS::asString(topN_));
3543  }
3544  if (options_.length())
3545  {
3546  _message.setOptions(options_);
3547  }
3548  Message message = _message;
3549  if (isHASubscribe_)
3550  {
3551  message = _message.deepCopy();
3552  Unlock<Mutex> u(_lock);
3553  _subscriptionManager->subscribe(messageHandler_, message,
3554  Message::AckType::None);
3555  if (_badTimeToHASubscribe)
3556  {
3557  return subId;
3558  }
3559  }
3560  _routes.addRoute(message.getQueryID(), messageHandler_,
3561  Message::AckType::None, Message::AckType::Processed, true);
3562  message.setAckTypeEnum(Message::AckType::Processed);
3563  if (!options_.empty())
3564  {
3565  message.setOptions(options_);
3566  }
3567  try
3568  {
3569  syncAckProcessing(timeout_, message, isHASubscribe_);
3570  }
3571  catch (const DisconnectedException&)
3572  {
3573  if (!isHASubscribe_)
3574  {
3575  _routes.removeRoute(subId);
3576  throw;
3577  }
3578  }
3579  catch (const TimedOutException&)
3580  {
3581  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3582  throw;
3583  }
3584  catch (...)
3585  {
3586  if (isHASubscribe_)
3587  {
3588  // Have to unlock before calling into sub manager to avoid deadlock
3589  Unlock<Mutex> unlock(_lock);
3590  _subscriptionManager->unsubscribe(Field(subId));
3591  }
3592  _routes.removeRoute(subId);
3593  throw;
3594  }
3595  return subId;
3596  }
3597 
3598  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3599  const std::string& topic_,
3600  long timeout_,
3601  const std::string& filter_ = "",
3602  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3603  bool oofEnabled_ = false,
3604  bool sendEmpties_ = false,
3605  int topN_ = AMPS_DEFAULT_TOP_N,
3606  bool isHASubscribe_ = true)
3607  {
3608  std::string notSet;
3609  Message::Options options;
3610  if (oofEnabled_)
3611  {
3612  options.setOOF();
3613  }
3614  if (sendEmpties_ == false)
3615  {
3616  options.setNoEmpties();
3617  }
3618  return sowAndDeltaSubscribe(messageHandler_,
3619  topic_,
3620  filter_,
3621  notSet, // orderBy
3622  batchSize_,
3623  topN_,
3624  options,
3625  timeout_,
3626  isHASubscribe_);
3627  }
3628 
3629  std::string sowDelete(const MessageHandler& messageHandler_,
3630  const std::string& topic_,
3631  const std::string& filter_,
3632  long timeout_,
3633  Message::Field commandId_ = Message::Field())
3634  {
3635  if (_publishStore.isValid())
3636  {
3637  unsigned ackType = Message::AckType::Processed |
3638  Message::AckType::Stats |
3639  Message::AckType::Persisted;
3640  if (!publishStoreMessage)
3641  {
3642  publishStoreMessage = new Message();
3643  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3644  }
3645  publishStoreMessage->reset();
3646  if (commandId_.empty())
3647  {
3648  publishStoreMessage->newCommandId();
3649  commandId_ = publishStoreMessage->getCommandId();
3650  }
3651  else
3652  {
3653  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3654  }
3655  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3656  .assignSubscriptionId(commandId_.data(), commandId_.len())
3657  .assignQueryID(commandId_.data(), commandId_.len())
3658  .setAckTypeEnum(ackType)
3659  .assignTopic(topic_.c_str(), topic_.length())
3660  .assignFilter(filter_.c_str(), filter_.length());
3661  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3662  char buf[AMPS_NUMBER_BUFFER_LEN];
3663  size_t pos = convertToCharArray(buf, haSequenceNumber);
3664  publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3665  {
3666  try
3667  {
3668  Lock<Mutex> l(_lock);
3669  _routes.addRoute(commandId_, messageHandler_,
3670  Message::AckType::Stats,
3671  Message::AckType::Processed | Message::AckType::Persisted,
3672  false);
3673  syncAckProcessing(timeout_, *publishStoreMessage,
3674  haSequenceNumber);
3675  }
3676  catch (const DisconnectedException&)
3677  {
3678  // -V565
3679  // Pass - it will get replayed upon reconnect
3680  }
3681  catch (...)
3682  {
3683  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3684  throw;
3685  }
3686  }
3687  return (std::string)commandId_;
3688  }
3689  else
3690  {
3691  Lock<Mutex> l(_lock);
3692  _message.reset();
3693  if (commandId_.empty())
3694  {
3695  _message.newCommandId();
3696  commandId_ = _message.getCommandId();
3697  }
3698  else
3699  {
3700  _message.setCommandId(commandId_.data(), commandId_.len());
3701  }
3702  _message.setCommandEnum(Message::Command::SOWDelete)
3703  .assignSubscriptionId(commandId_.data(), commandId_.len())
3704  .assignQueryID(commandId_.data(), commandId_.len())
3705  .setAckTypeEnum(Message::AckType::Processed |
3706  Message::AckType::Stats)
3707  .assignTopic(topic_.c_str(), topic_.length())
3708  .assignFilter(filter_.c_str(), filter_.length());
3709  _routes.addRoute(commandId_, messageHandler_,
3710  Message::AckType::Stats,
3711  Message::AckType::Processed,
3712  false);
3713  try
3714  {
3715  syncAckProcessing(timeout_, _message);
3716  }
3717  catch (...)
3718  {
3719  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3720  throw;
3721  }
3722  return (std::string)commandId_;
3723  }
3724  }
3725 
3726  std::string sowDeleteByData(const MessageHandler& messageHandler_,
3727  const std::string& topic_,
3728  const std::string& data_,
3729  long timeout_,
3730  Message::Field commandId_ = Message::Field())
3731  {
3732  if (_publishStore.isValid())
3733  {
3734  unsigned ackType = Message::AckType::Processed |
3735  Message::AckType::Stats |
3736  Message::AckType::Persisted;
3737  if (!publishStoreMessage)
3738  {
3739  publishStoreMessage = new Message();
3740  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3741  }
3742  publishStoreMessage->reset();
3743  if (commandId_.empty())
3744  {
3745  publishStoreMessage->newCommandId();
3746  commandId_ = publishStoreMessage->getCommandId();
3747  }
3748  else
3749  {
3750  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3751  }
3752  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3753  .assignSubscriptionId(commandId_.data(), commandId_.len())
3754  .assignQueryID(commandId_.data(), commandId_.len())
3755  .setAckTypeEnum(ackType)
3756  .assignTopic(topic_.c_str(), topic_.length())
3757  .assignData(data_.c_str(), data_.length());
3758  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3759  char buf[AMPS_NUMBER_BUFFER_LEN];
3760  size_t pos = convertToCharArray(buf, haSequenceNumber);
3761  publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3762  {
3763  try
3764  {
3765  Lock<Mutex> l(_lock);
3766  _routes.addRoute(commandId_, messageHandler_,
3767  Message::AckType::Stats,
3768  Message::AckType::Processed | Message::AckType::Persisted,
3769  false);
3770  syncAckProcessing(timeout_, *publishStoreMessage,
3771  haSequenceNumber);
3772  }
3773  catch (const DisconnectedException&)
3774  {
3775  // -V565
3776  // Pass - it will get replayed upon reconnect
3777  }
3778  catch (...)
3779  {
3780  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3781  throw;
3782  }
3783  }
3784  return (std::string)commandId_;
3785  }
3786  else
3787  {
3788  Lock<Mutex> l(_lock);
3789  _message.reset();
3790  if (commandId_.empty())
3791  {
3792  _message.newCommandId();
3793  commandId_ = _message.getCommandId();
3794  }
3795  else
3796  {
3797  _message.setCommandId(commandId_.data(), commandId_.len());
3798  }
3799  _message.setCommandEnum(Message::Command::SOWDelete)
3800  .assignSubscriptionId(commandId_.data(), commandId_.len())
3801  .assignQueryID(commandId_.data(), commandId_.len())
3802  .setAckTypeEnum(Message::AckType::Processed |
3803  Message::AckType::Stats)
3804  .assignTopic(topic_.c_str(), topic_.length())
3805  .assignData(data_.c_str(), data_.length());
3806  _routes.addRoute(commandId_, messageHandler_,
3807  Message::AckType::Stats,
3808  Message::AckType::Processed,
3809  false);
3810  try
3811  {
3812  syncAckProcessing(timeout_, _message);
3813  }
3814  catch (...)
3815  {
3816  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3817  throw;
3818  }
3819  return (std::string)commandId_;
3820  }
3821  }
3822 
3823  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
3824  const std::string& topic_,
3825  const std::string& keys_,
3826  long timeout_,
3827  Message::Field commandId_ = Message::Field())
3828  {
3829  if (_publishStore.isValid())
3830  {
3831  unsigned ackType = Message::AckType::Processed |
3832  Message::AckType::Stats |
3833  Message::AckType::Persisted;
3834  if (!publishStoreMessage)
3835  {
3836  publishStoreMessage = new Message();
3837  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3838  }
3839  publishStoreMessage->reset();
3840  if (commandId_.empty())
3841  {
3842  publishStoreMessage->newCommandId();
3843  commandId_ = publishStoreMessage->getCommandId();
3844  }
3845  else
3846  {
3847  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3848  }
3849  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3850  .assignSubscriptionId(commandId_.data(), commandId_.len())
3851  .assignQueryID(commandId_.data(), commandId_.len())
3852  .setAckTypeEnum(ackType)
3853  .assignTopic(topic_.c_str(), topic_.length())
3854  .assignSowKeys(keys_.c_str(), keys_.length());
3855  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3856  char buf[AMPS_NUMBER_BUFFER_LEN];
3857  size_t pos = convertToCharArray(buf, haSequenceNumber);
3858  publishStoreMessage->assignSequence(buf + pos, AMPS_NUMBER_BUFFER_LEN - pos);
3859  {
3860  try
3861  {
3862  Lock<Mutex> l(_lock);
3863  _routes.addRoute(commandId_, messageHandler_,
3864  Message::AckType::Stats,
3865  Message::AckType::Processed | Message::AckType::Persisted,
3866  false);
3867  syncAckProcessing(timeout_, *publishStoreMessage,
3868  haSequenceNumber);
3869  }
3870  catch (const DisconnectedException&)
3871  {
3872  // -V565
3873  // Pass - it will get replayed upon reconnect
3874  }
3875  catch (...)
3876  {
3877  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3878  throw;
3879  }
3880  }
3881  return (std::string)commandId_;
3882  }
3883  else
3884  {
3885  Lock<Mutex> l(_lock);
3886  _message.reset();
3887  if (commandId_.empty())
3888  {
3889  _message.newCommandId();
3890  commandId_ = _message.getCommandId();
3891  }
3892  else
3893  {
3894  _message.setCommandId(commandId_.data(), commandId_.len());
3895  }
3896  _message.setCommandEnum(Message::Command::SOWDelete)
3897  .assignSubscriptionId(commandId_.data(), commandId_.len())
3898  .assignQueryID(commandId_.data(), commandId_.len())
3899  .setAckTypeEnum(Message::AckType::Processed |
3900  Message::AckType::Stats)
3901  .assignTopic(topic_.c_str(), topic_.length())
3902  .assignSowKeys(keys_.c_str(), keys_.length());
3903  _routes.addRoute(commandId_, messageHandler_,
3904  Message::AckType::Stats,
3905  Message::AckType::Processed,
3906  false);
3907  try
3908  {
3909  syncAckProcessing(timeout_, _message);
3910  }
3911  catch (...)
3912  {
3913  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3914  throw;
3915  }
3916  return (std::string)commandId_;
3917  }
3918  }
3919 
3920  void startTimer(void)
3921  {
3922  if (_serverVersion >= "5.3.2.0")
3923  {
3924  throw CommandException("The start_timer command is deprecated.");
3925  }
3926  Lock<Mutex> l(_lock);
3927  _message.reset();
3928  _message.setCommandEnum(Message::Command::StartTimer);
3929 
3930  _send(_message);
3931  }
3932 
3933  std::string stopTimer(MessageHandler messageHandler_)
3934  {
3935  if (_serverVersion >= "5.3.2.0")
3936  {
3937  throw CommandException("The stop_timer command is deprecated.");
3938  }
3939  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
3940  }
3941 
3942  amps_handle getHandle(void)
3943  {
3944  return _client;
3945  }
3946 
3954  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
3955  {
3956  _pExceptionListener = pListener_;
3957  _exceptionListener = _pExceptionListener.get();
3958  }
3959 
3960  void setExceptionListener(const ExceptionListener& listener_)
3961  {
3962  _exceptionListener = &listener_;
3963  }
3964 
3965  const ExceptionListener& getExceptionListener(void) const
3966  {
3967  return *_exceptionListener;
3968  }
3969 
3970  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
3971  {
3972  if (readTimeout_ < heartbeatInterval_)
3973  {
3974  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
3975  }
3976  Lock<Mutex> l(_lock);
3977  if (_heartbeatInterval != heartbeatInterval_ ||
3978  _readTimeout != readTimeout_)
3979  {
3980  _heartbeatInterval = heartbeatInterval_;
3981  _readTimeout = readTimeout_;
3982  _sendHeartbeat();
3983  }
3984  }
3985 
3986  void _sendHeartbeat(void)
3987  {
3988  if (_connected && _heartbeatInterval != 0)
3989  {
3990  std::ostringstream options;
3991  options << "start," << _heartbeatInterval;
3992  Message startMessage = Message()
3993  .setCommandEnum(Message::Command::Heartbeat)
3994  .setOptions(options.str());
3995 
3996  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3997  _heartbeatTimer.start();
3998  try
3999  {
4000  _sendWithoutRetry(startMessage);
4001  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
4002  }
4003  catch (ConnectionException& ex_)
4004  {
4005  // If we are disconnected when we attempt to send, that's OK;
4006  // we'll send this message after we re-connect (if we do).
4007  AMPS_UNHANDLED_EXCEPTION(ex_);
4008  }
4009  }
4010  amps_result result = AMPS_E_OK;
4011  if (_readTimeout && _connected)
4012  {
4013  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
4014  }
4015  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
4016  {
4017  AMPSException::throwFor(_client, result);
4018  }
4019  }
4020 
4021  void addConnectionStateListener(ConnectionStateListener* listener_)
4022  {
4023  Lock<Mutex> lock(_lock);
4024  _connectionStateListeners.insert(listener_);
4025  }
4026 
4027  void removeConnectionStateListener(ConnectionStateListener* listener_)
4028  {
4029  Lock<Mutex> lock(_lock);
4030  _connectionStateListeners.erase(listener_);
4031  }
4032 
4033  void clearConnectionStateListeners()
4034  {
4035  Lock<Mutex> lock(_lock);
4036  _connectionStateListeners.clear();
4037  }
4038 
4039  void _registerHandler(Command& command_, Message::Field& cid_,
4040  MessageHandler& handler_, unsigned requestedAcks_,
4041  unsigned systemAddedAcks_, bool isSubscribe_)
4042  {
4043  Message message = command_.getMessage();
4044  Message::Command::Type commandType = message.getCommandEnum();
4045  Message::Field subid = message.getSubscriptionId();
4046  Message::Field qid = message.getQueryID();
4047  // If we have an id, we're good, even if it's an existing route
4048  bool added = qid.len() || subid.len() || cid_.len();
4049  int addedCount = 0;
4050  if (subid.len() > 0)
4051  {
4052  // This can replace a non-subscribe with a matching id
4053  // with a subscription but not another subscription.
4054  addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
4055  systemAddedAcks_, isSubscribe_);
4056  }
4057  if (qid.len() > 0 && qid != subid)
4058  {
4059  while (_routes.hasRoute(qid))
4060  {
4061  message.newQueryId();
4062  if (cid_ == qid)
4063  {
4064  cid_ = message.getQueryId();
4065  }
4066  qid = message.getQueryId();
4067  }
4068  if (addedCount == 0)
4069  {
4070  _routes.addRoute(qid, handler_, requestedAcks_,
4071  systemAddedAcks_, isSubscribe_);
4072  }
4073  else
4074  {
4075  void* data = NULL;
4076  {
4077  Unlock<Mutex> u(_lock);
4078  data = amps_invoke_copy_route_function(handler_.userData());
4079  }
4080  if (!data)
4081  {
4082  _routes.addRoute(qid, handler_, requestedAcks_,
4083  systemAddedAcks_, false);
4084  }
4085  else
4086  {
4087  _routes.addRoute(qid,
4088  MessageHandler(handler_.function(),
4089  data),
4090  requestedAcks_,
4091  systemAddedAcks_, false);
4092  }
4093  }
4094  ++addedCount;
4095  }
4096  if (cid_.len() > 0 && cid_ != qid && cid_ != subid
4097  && requestedAcks_ & ~Message::AckType::Persisted)
4098  {
4099  while (_routes.hasRoute(cid_))
4100  {
4101  cid_ = message.newCommandId().getCommandId();
4102  }
4103  if (addedCount == 0)
4104  {
4105  _routes.addRoute(cid_, handler_, requestedAcks_,
4106  systemAddedAcks_, false);
4107  }
4108  else
4109  {
4110  void* data = NULL;
4111  {
4112  Unlock<Mutex> u(_lock);
4113  data = amps_invoke_copy_route_function(handler_.userData());
4114  }
4115  if (!data)
4116  {
4117  _routes.addRoute(cid_, handler_, requestedAcks_,
4118  systemAddedAcks_, false);
4119  }
4120  else
4121  {
4122  _routes.addRoute(cid_,
4123  MessageHandler(handler_.function(),
4124  data),
4125  requestedAcks_,
4126  systemAddedAcks_, false);
4127  }
4128  }
4129  }
4130  else if (commandType == Message::Command::Publish ||
4131  commandType == Message::Command::DeltaPublish)
4132  {
4133  cid_ = command_.getMessage().newCommandId().getCommandId();
4134  _routes.addRoute(cid_, handler_, requestedAcks_,
4135  systemAddedAcks_, false);
4136  added = true;
4137  }
4138  if (!added)
4139  {
4140  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
4141  }
4142  }
4143 
4144  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
4145  bool isHASubscribe_ = true)
4146  {
4147  isHASubscribe_ &= (bool)_subscriptionManager;
4148  Message& message = command_.getMessage();
4149  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
4150  Message::AckType::Processed : Message::AckType::None;
4151  unsigned requestedAcks = message.getAckTypeEnum();
4152  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
4153  Message::Command::Type commandType = message.getCommandEnum();
4154  if (commandType == Message::Command::SOW
4155  || commandType == Message::Command::SOWAndSubscribe
4156  || commandType == Message::Command::SOWAndDeltaSubscribe
4157  || commandType == Message::Command::StopTimer)
4158  {
4159  systemAddedAcks |= Message::AckType::Completed;
4160  }
4161  Message::Field cid = message.getCommandId();
4162  if (handler_.isValid() && cid.empty())
4163  {
4164  cid = message.newCommandId().getCommandId();
4165  }
4166  if (message.getBookmark().len() > 0)
4167  {
4168  if (command_.isSubscribe())
4169  {
4170  Message::Field bookmark = message.getBookmark();
4171  if (_bookmarkStore.isValid())
4172  {
4173  systemAddedAcks |= Message::AckType::Persisted;
4174  if (bookmark == AMPS_BOOKMARK_RECENT)
4175  {
4176  message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
4177  }
4178  else if (bookmark != AMPS_BOOKMARK_NOW &&
4179  bookmark != AMPS_BOOKMARK_EPOCH)
4180  {
4181  _bookmarkStore.log(message);
4182  if (!BookmarkRange::isRange(bookmark))
4183  {
4184  _bookmarkStore.discard(message);
4185  _bookmarkStore.persisted(message.getSubscriptionId(),
4186  bookmark);
4187  }
4188  }
4189  }
4190  else if (bookmark == AMPS_BOOKMARK_RECENT)
4191  {
4193  }
4194  }
4195  }
4196  if (isPublishStore)
4197  {
4198  systemAddedAcks |= Message::AckType::Persisted;
4199  }
4200  bool isSubscribe = command_.isSubscribe();
4201  if (handler_.isValid() && !isSubscribe)
4202  {
4203  _registerHandler(command_, cid, handler_,
4204  requestedAcks, systemAddedAcks, isSubscribe);
4205  }
4206  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
4207  if (isPublishStore)
4208  {
4209  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
4210  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4211  {
4212  Unlock<Mutex> u(_lock);
4213  haSequenceNumber = _publishStore.store(message);
4214  }
4215  message.setSequence(haSequenceNumber);
4216  try
4217  {
4218  if (useSyncSend)
4219  {
4220  syncAckProcessing((long)command_.getTimeout(), message,
4221  haSequenceNumber);
4222  }
4223  else
4224  {
4225  _send(message, haSequenceNumber);
4226  }
4227  }
4228  catch (const DisconnectedException&)
4229  {
4230  // -V565
4231  // Pass - message will get replayed when reconnected
4232  }
4233  catch (...)
4234  {
4235  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4236  throw;
4237  }
4238  }
4239  else
4240  {
4241  if (isSubscribe)
4242  {
4243  const Message::Field& subId = message.getSubscriptionId();
4244  if (isHASubscribe_)
4245  {
4246  Unlock<Mutex> u(_lock);
4247  _subscriptionManager->subscribe(handler_,
4248  message.deepCopy(),
4249  requestedAcks);
4250  if (_badTimeToHASubscribe)
4251  {
4252  message.setAckTypeEnum(requestedAcks);
4253  return std::string(subId.data(), subId.len());
4254  }
4255  }
4256  if (handler_.isValid())
4257  {
4258  _registerHandler(command_, cid, handler_,
4259  requestedAcks, systemAddedAcks, isSubscribe);
4260  }
4261  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4262  try
4263  {
4264  if (useSyncSend)
4265  {
4266  syncAckProcessing((long)command_.getTimeout(), message,
4267  isHASubscribe_);
4268  }
4269  else
4270  {
4271  _send(message);
4272  }
4273  }
4274  catch (const DisconnectedException&)
4275  {
4276  if (!isHASubscribe_)
4277  {
4278  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4279  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
4280  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4281  message.setAckTypeEnum(requestedAcks);
4282  throw;
4283  }
4284  }
4285  catch (const TimedOutException&)
4286  {
4287  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
4288  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
4289  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
4290  throw;
4291  }
4292  catch (...)
4293  {
4294  if (isHASubscribe_)
4295  {
4296  // Have to unlock before calling into sub manager to avoid deadlock
4297  Unlock<Mutex> unlock(_lock);
4298  _subscriptionManager->unsubscribe(subId);
4299  }
4300  if (message.getQueryID().len() > 0)
4301  {
4302  _routes.removeRoute(message.getQueryID());
4303  }
4304  _routes.removeRoute(cid);
4305  _routes.removeRoute(subId);
4306  throw;
4307  }
4308  if (subId.len() > 0)
4309  {
4310  message.setAckTypeEnum(requestedAcks);
4311  return std::string(subId.data(), subId.len());
4312  }
4313  }
4314  else
4315  {
4316  message.setAckTypeEnum(requestedAcks | systemAddedAcks);
4317  try
4318  {
4319  if (useSyncSend)
4320  {
4321  syncAckProcessing((long)(command_.getTimeout()), message);
4322  }
4323  else
4324  {
4325  _send(message);
4326  }
4327  }
4328  catch (const DisconnectedException&)
4329  {
4330  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4331  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4332  message.setAckTypeEnum(requestedAcks);
4333  throw;
4334  }
4335  catch (...)
4336  {
4337  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
4338  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
4339  message.setAckTypeEnum(requestedAcks);
4340  throw;
4341  }
4342  }
4343  }
4344  message.setAckTypeEnum(requestedAcks);
4345  return cid;
4346  }
4347 
4348  MessageStream getEmptyMessageStream(void);
4349 
4350  std::string executeAsync(Command& command_, MessageHandler& handler_,
4351  bool isHASubscribe_ = true)
4352  {
4353  Lock<Mutex> lock(_lock);
4354  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
4355  }
4356 
4357  // Queue Methods //
4358  void setAutoAck(bool isAutoAckEnabled_)
4359  {
4360  _isAutoAckEnabled = isAutoAckEnabled_;
4361  }
4362  bool getAutoAck(void) const
4363  {
4364  return _isAutoAckEnabled;
4365  }
4366  void setAckBatchSize(const unsigned batchSize_)
4367  {
4368  _ackBatchSize = batchSize_;
4369  if (!_queueAckTimeout)
4370  {
4371  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
4372  amps_client_set_idle_time(_client, _queueAckTimeout);
4373  }
4374  }
4375  unsigned getAckBatchSize(void) const
4376  {
4377  return _ackBatchSize;
4378  }
4379  int getAckTimeout(void) const
4380  {
4381  return _queueAckTimeout;
4382  }
4383  void setAckTimeout(const int ackTimeout_)
4384  {
4385  amps_client_set_idle_time(_client, ackTimeout_);
4386  _queueAckTimeout = ackTimeout_;
4387  }
4388  size_t _ack(QueueBookmarks& queueBookmarks_)
4389  {
4390  if (queueBookmarks_._bookmarkCount)
4391  {
4392  if (!publishStoreMessage)
4393  {
4394  publishStoreMessage = new Message();
4395  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4396  }
4397  publishStoreMessage->reset();
4398  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4399  .setTopic(queueBookmarks_._topic)
4400  .setBookmark(queueBookmarks_._data)
4401  .setCommandId("AMPS-queue-ack");
4402  amps_uint64_t haSequenceNumber = 0;
4403  if (_publishStore.isValid())
4404  {
4405  haSequenceNumber = _publishStore.store(*publishStoreMessage);
4406  publishStoreMessage->setAckType("persisted")
4407  .setSequence(haSequenceNumber);
4408  queueBookmarks_._data.erase();
4409  queueBookmarks_._bookmarkCount = 0;
4410  }
4411  _send(*publishStoreMessage, haSequenceNumber);
4412  if (!_publishStore.isValid())
4413  {
4414  queueBookmarks_._data.erase();
4415  queueBookmarks_._bookmarkCount = 0;
4416  }
4417  return 1;
4418  }
4419  return 0;
4420  }
4421  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4422  {
4423  if (_isAutoAckEnabled)
4424  {
4425  return;
4426  }
4427  _ack(topic_, bookmark_, options_);
4428  }
4429  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
4430  {
4431  if (bookmark_.len() == 0)
4432  {
4433  return;
4434  }
4435  Lock<Mutex> lock(_lock);
4436  if (_ackBatchSize < 2 || options_ != NULL)
4437  {
4438  if (!publishStoreMessage)
4439  {
4440  publishStoreMessage = new Message();
4441  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
4442  }
4443  publishStoreMessage->reset();
4444  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
4445  .setCommandId("AMPS-queue-ack")
4446  .setTopic(topic_).setBookmark(bookmark_);
4447  if (options_)
4448  {
4449  publishStoreMessage->setOptions(options_);
4450  }
4451  amps_uint64_t haSequenceNumber = 0;
4452  if (_publishStore.isValid())
4453  {
4454  haSequenceNumber = _publishStore.store(*publishStoreMessage);
4455  publishStoreMessage->setAckType("persisted")
4456  .setSequence(haSequenceNumber);
4457  }
4458  _send(*publishStoreMessage, haSequenceNumber);
4459  return;
4460  }
4461  // have we acked anything for this hash
4462  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(), topic_.len());
4463  TopicHashMap::iterator it = _topicHashMap.find(hash);
4464  if (it == _topicHashMap.end())
4465  {
4466  // add a new one to the map
4467 #ifdef AMPS_USE_EMPLACE
4468  it = _topicHashMap.emplace(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4469 #else
4470  it = _topicHashMap.insert(TopicHashMap::value_type(hash, QueueBookmarks(topic_))).first;
4471 #endif
4472  }
4473  QueueBookmarks& queueBookmarks = it->second;
4474  if (queueBookmarks._data.length())
4475  {
4476  queueBookmarks._data.append(",");
4477  }
4478  else
4479  {
4480  queueBookmarks._oldestTime = amps_now();
4481  }
4482  queueBookmarks._data.append(bookmark_);
4483  if (++queueBookmarks._bookmarkCount >= _ackBatchSize)
4484  {
4485  _ack(queueBookmarks);
4486  }
4487  }
4488  void flushAcks(void)
4489  {
4490  size_t sendCount = 0;
4491  if (!_connected)
4492  {
4493  return;
4494  }
4495  else
4496  {
4497  Lock<Mutex> lock(_lock);
4498  typedef TopicHashMap::iterator iterator;
4499  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4500  {
4501  QueueBookmarks& queueBookmarks = it->second;
4502  sendCount += _ack(queueBookmarks);
4503  }
4504  }
4505  if (sendCount && _connected)
4506  {
4507  publishFlush(0, Message::AckType::Processed);
4508  }
4509  }
4510  // called when there's idle time, to see if we need to flush out any "acks"
4511  void checkQueueAcks(void)
4512  {
4513  if (!_topicHashMap.size())
4514  {
4515  return;
4516  }
4517  Lock<Mutex> lock(_lock);
4518  try
4519  {
4520  amps_uint64_t threshold = amps_now()
4521  - (amps_uint64_t)_queueAckTimeout;
4522  typedef TopicHashMap::iterator iterator;
4523  for (iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it != end; ++it)
4524  {
4525  QueueBookmarks& queueBookmarks = it->second;
4526  if (queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold)
4527  {
4528  _ack(queueBookmarks);
4529  }
4530  }
4531  }
4532  catch (std::exception& ex)
4533  {
4534  AMPS_UNHANDLED_EXCEPTION(ex);
4535  }
4536  }
4537 
4538  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
4539  {
4540  Lock<Mutex> lock(_deferredExecutionLock);
4541 #ifdef AMPS_USE_EMPLACE
4542  _deferredExecutionList.emplace_back(
4543  DeferredExecutionRequest(func_, userData_));
4544 #else
4545  _deferredExecutionList.push_back(
4546  DeferredExecutionRequest(func_, userData_));
4547 #endif
4548  }
4549 
4550  inline void processDeferredExecutions(void)
4551  {
4552  if (_deferredExecutionList.size())
4553  {
4554  Lock<Mutex> lock(_deferredExecutionLock);
4555  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4556  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4557  for (; it != end; ++it)
4558  {
4559  try
4560  {
4561  it->_func(it->_userData);
4562  }
4563  catch (...)
4564  {
4565  // -V565
4566  // Intentionally ignore errors
4567  }
4568  }
4569  _deferredExecutionList.clear();
4570  _routes.invalidateCache();
4571  _routeCache.invalidateCache();
4572  }
4573  }
4574 
4575  bool getRetryOnDisconnect(void) const
4576  {
4577  return _isRetryOnDisconnect;
4578  }
4579 
4580  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4581  {
4582  _isRetryOnDisconnect = isRetryOnDisconnect_;
4583  }
4584 
4585  void setDefaultMaxDepth(unsigned maxDepth_)
4586  {
4587  _defaultMaxDepth = maxDepth_;
4588  }
4589 
4590  unsigned getDefaultMaxDepth(void) const
4591  {
4592  return _defaultMaxDepth;
4593  }
4594 
4595  void setTransportFilterFunction(amps_transport_filter_function filter_,
4596  void* userData_)
4597  {
4598  amps_client_set_transport_filter_function(_client, filter_, userData_);
4599  }
4600 
4601  void setThreadCreatedCallback(amps_thread_created_callback callback_,
4602  void* userData_)
4603  {
4604  amps_client_set_thread_created_callback(_client, callback_, userData_);
4605  }
4606  }; // class ClientImpl
4681 
4683  {
4684  RefHandle<MessageStreamImpl> _body;
4685  public:
4690  class iterator
4691  {
4692  MessageStream* _pStream;
4693  Message _current;
4694  inline void advance(void);
4695 
4696  public:
4697  iterator() // end
4698  : _pStream(NULL)
4699  {;}
4700  iterator(MessageStream* pStream_)
4701  : _pStream(pStream_)
4702  {
4703  advance();
4704  }
4705 
4706  bool operator==(const iterator& rhs)
4707  {
4708  return _pStream == rhs._pStream;
4709  }
4710  bool operator!=(const iterator& rhs)
4711  {
4712  return _pStream != rhs._pStream;
4713  }
4714  void operator++(void)
4715  {
4716  advance();
4717  }
4718  Message operator*(void)
4719  {
4720  return _current;
4721  }
4722  Message* operator->(void)
4723  {
4724  return &_current;
4725  }
4726  };
4728  bool isValid() const
4729  {
4730  return _body.isValid();
4731  }
4732 
4736  {
4737  if (!_body.isValid())
4738  {
4739  throw UsageException("This MessageStream is not valid and cannot be iterated.");
4740  }
4741  return iterator(this);
4742  }
4745  // For non-SOW queries, the end is never reached.
4747  {
4748  return iterator();
4749  }
4750  inline MessageStream(void);
4751 
4757  MessageStream timeout(unsigned timeout_);
4758 
4762  MessageStream conflate(void);
4768  MessageStream maxDepth(unsigned maxDepth_);
4771  unsigned getMaxDepth(void) const;
4774  unsigned getDepth(void) const;
4775 
4776  private:
4777  inline MessageStream(const Client& client_);
4778  inline void setSOWOnly(const std::string& commandId_,
4779  const std::string& queryId_ = "");
4780  inline void setSubscription(const std::string& subId_,
4781  const std::string& commandId_ = "",
4782  const std::string& queryId_ = "");
4783  inline void setStatsOnly(const std::string& commandId_,
4784  const std::string& queryId_ = "");
4785  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
4786 
4787  inline operator MessageHandler(void);
4788 
4789  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
4790 
4791  friend class Client;
4792 
4793  };
4794 
4814  class Client // -V553
4815  {
4816  protected:
4817  BorrowRefHandle<ClientImpl> _body;
4818  public:
4819  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
4820  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
4821  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
4822 
4831  Client(const std::string& clientName = "")
4832  : _body(new ClientImpl(clientName), true)
4833  {;}
4834 
4835  Client(ClientImpl* existingClient)
4836  : _body(existingClient, true)
4837  {;}
4838 
4839  Client(ClientImpl* existingClient, bool isRef)
4840  : _body(existingClient, isRef)
4841  {;}
4842 
4843  Client(const Client& rhs) : _body(rhs._body) {;}
4844  virtual ~Client(void) {;}
4845 
4846  Client& operator=(const Client& rhs)
4847  {
4848  _body = rhs._body;
4849  return *this;
4850  }
4851 
4852  bool isValid()
4853  {
4854  return _body.isValid();
4855  }
4856 
4869  void setName(const std::string& name)
4870  {
4871  _body.get().setName(name);
4872  }
4873 
4876  const std::string& getName() const
4877  {
4878  return _body.get().getName();
4879  }
4880 
4884  const std::string& getNameHash() const
4885  {
4886  return _body.get().getNameHash();
4887  }
4888 
4892  const amps_uint64_t getNameHashValue() const
4893  {
4894  return _body.get().getNameHashValue();
4895  }
4896 
4903  void setLogonCorrelationData(const std::string& logonCorrelationData_)
4904  {
4905  _body.get().setLogonCorrelationData(logonCorrelationData_);
4906  }
4907 
4910  const std::string& getLogonCorrelationData() const
4911  {
4912  return _body.get().getLogonCorrelationData();
4913  }
4914 
4923  size_t getServerVersion() const
4924  {
4925  return _body.get().getServerVersion();
4926  }
4927 
4934  VersionInfo getServerVersionInfo() const
4935  {
4936  return _body.get().getServerVersionInfo();
4937  }
4938 
4948  static size_t convertVersionToNumber(const std::string& version_)
4949  {
4950  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4951  }
4952 
4963  static size_t convertVersionToNumber(const char* data_, size_t len_)
4964  {
4965  return AMPS::convertVersionToNumber(data_, len_);
4966  }
4967 
4970  const std::string& getURI() const
4971  {
4972  return _body.get().getURI();
4973  }
4974 
4981 
4983 
4994  void connect(const std::string& uri)
4995  {
4996  _body.get().connect(uri);
4997  }
4998 
5001  void disconnect()
5002  {
5003  _body.get().disconnect();
5004  }
5005 
5019  void send(const Message& message)
5020  {
5021  _body.get().send(message);
5022  }
5023 
5032  void addMessageHandler(const Field& commandId_,
5033  const AMPS::MessageHandler& messageHandler_,
5034  unsigned requestedAcks_, bool isSubscribe_)
5035  {
5036  _body.get().addMessageHandler(commandId_, messageHandler_,
5037  requestedAcks_, isSubscribe_);
5038  }
5039 
5043  bool removeMessageHandler(const Field& commandId_)
5044  {
5045  return _body.get().removeMessageHandler(commandId_);
5046  }
5047 
5071  std::string send(const MessageHandler& messageHandler, Message& message, int timeout = 0)
5072  {
5073  return _body.get().send(messageHandler, message, timeout);
5074  }
5075 
5085  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
5086  {
5087  _body.get().setDisconnectHandler(disconnectHandler);
5088  }
5089 
5093  DisconnectHandler getDisconnectHandler(void) const
5094  {
5095  return _body.get().getDisconnectHandler();
5096  }
5097 
5102  virtual ConnectionInfo getConnectionInfo() const
5103  {
5104  return _body.get().getConnectionInfo();
5105  }
5106 
5115  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
5116  {
5117  _body.get().setBookmarkStore(bookmarkStore_);
5118  }
5119 
5124  {
5125  return _body.get().getBookmarkStore();
5126  }
5127 
5132  {
5133  return _body.get().getSubscriptionManager();
5134  }
5135 
5143  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
5144  {
5145  _body.get().setSubscriptionManager(subscriptionManager_);
5146  }
5147 
5167  void setPublishStore(const Store& publishStore_)
5168  {
5169  _body.get().setPublishStore(publishStore_);
5170  }
5171 
5176  {
5177  return _body.get().getPublishStore();
5178  }
5179 
5183  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
5184  {
5185  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
5186  duplicateMessageHandler_);
5187  }
5188 
5199  {
5200  return _body.get().getDuplicateMessageHandler();
5201  }
5202 
5213  {
5214  _body.get().setFailedWriteHandler(handler_);
5215  }
5216 
5221  {
5222  return _body.get().getFailedWriteHandler();
5223  }
5224 
5225 
5243  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
5244  {
5245  return _body.get().publish(topic_.c_str(), topic_.length(),
5246  data_.c_str(), data_.length());
5247  }
5248 
5268  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5269  const char* data_, size_t dataLength_)
5270  {
5271  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
5272  }
5273 
5292  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
5293  unsigned long expiration_)
5294  {
5295  return _body.get().publish(topic_.c_str(), topic_.length(),
5296  data_.c_str(), data_.length(), expiration_);
5297  }
5298 
5319  amps_uint64_t publish(const char* topic_, size_t topicLength_,
5320  const char* data_, size_t dataLength_,
5321  unsigned long expiration_)
5322  {
5323  return _body.get().publish(topic_, topicLength_,
5324  data_, dataLength_, expiration_);
5325  }
5326 
5365  void publishFlush(long timeout_ = 0, unsigned ackType_ = Message::AckType::Processed)
5366  {
5367  _body.get().publishFlush(timeout_, ackType_);
5368  }
5369 
5370 
5386  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
5387  {
5388  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5389  data_.c_str(), data_.length());
5390  }
5391 
5409  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5410  const char* data_, size_t dataLength_)
5411  {
5412  return _body.get().deltaPublish(topic_, topicLength_,
5413  data_, dataLength_);
5414  }
5415 
5432  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
5433  unsigned long expiration_)
5434  {
5435  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
5436  data_.c_str(), data_.length(),
5437  expiration_);
5438  }
5439 
5458  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
5459  const char* data_, size_t dataLength_,
5460  unsigned long expiration_)
5461  {
5462  return _body.get().deltaPublish(topic_, topicLength_,
5463  data_, dataLength_, expiration_);
5464  }
5465 
5480  std::string logon(int timeout_ = 0,
5481  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
5482  const char* options_ = NULL)
5483  {
5484  return _body.get().logon(timeout_, authenticator_, options_);
5485  }
5498  std::string logon(const char* options_, int timeout_ = 0)
5499  {
5500  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5501  options_);
5502  }
5503 
5516  std::string logon(const std::string& options_, int timeout_ = 0)
5517  {
5518  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
5519  options_.c_str());
5520  }
5521 
5541  std::string subscribe(const MessageHandler& messageHandler_,
5542  const std::string& topic_,
5543  long timeout_ = 0,
5544  const std::string& filter_ = "",
5545  const std::string& options_ = "",
5546  const std::string& subId_ = "")
5547  {
5548  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5549  filter_, "", options_, subId_);
5550  }
5551 
5567  MessageStream subscribe(const std::string& topic_,
5568  long timeout_ = 0, const std::string& filter_ = "",
5569  const std::string& options_ = "",
5570  const std::string& subId_ = "")
5571  {
5572  MessageStream result(*this);
5573  if (_body.get().getDefaultMaxDepth())
5574  {
5575  result.maxDepth(_body.get().getDefaultMaxDepth());
5576  }
5577  result.setSubscription(_body.get().subscribe(
5578  result.operator MessageHandler(),
5579  topic_, timeout_, filter_, "",
5580  options_, subId_, false));
5581  return result;
5582  }
5583 
5599  MessageStream subscribe(const char* topic_,
5600  long timeout_ = 0, const std::string& filter_ = "",
5601  const std::string& options_ = "",
5602  const std::string& subId_ = "")
5603  {
5604  MessageStream result(*this);
5605  if (_body.get().getDefaultMaxDepth())
5606  {
5607  result.maxDepth(_body.get().getDefaultMaxDepth());
5608  }
5609  result.setSubscription(_body.get().subscribe(
5610  result.operator MessageHandler(),
5611  topic_, timeout_, filter_, "",
5612  options_, subId_, false));
5613  return result;
5614  }
5615 
5628  std::string deltaSubscribe(const MessageHandler& messageHandler_,
5629  const std::string& topic_,
5630  long timeout_,
5631  const std::string& filter_ = "",
5632  const std::string& options_ = "",
5633  const std::string& subId_ = "")
5634  {
5635  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5636  filter_, "", options_, subId_);
5637  }
5646  MessageStream deltaSubscribe(const std::string& topic_,
5647  long timeout_, const std::string& filter_ = "",
5648  const std::string& options_ = "",
5649  const std::string& subId_ = "")
5650  {
5651  MessageStream result(*this);
5652  if (_body.get().getDefaultMaxDepth())
5653  {
5654  result.maxDepth(_body.get().getDefaultMaxDepth());
5655  }
5656  result.setSubscription(_body.get().deltaSubscribe(
5657  result.operator MessageHandler(),
5658  topic_, timeout_, filter_, "",
5659  options_, subId_, false));
5660  return result;
5661  }
5662 
5664  MessageStream deltaSubscribe(const char* topic_,
5665  long timeout_, const std::string& filter_ = "",
5666  const std::string& options_ = "",
5667  const std::string& subId_ = "")
5668  {
5669  MessageStream result(*this);
5670  if (_body.get().getDefaultMaxDepth())
5671  {
5672  result.maxDepth(_body.get().getDefaultMaxDepth());
5673  }
5674  result.setSubscription(_body.get().deltaSubscribe(
5675  result.operator MessageHandler(),
5676  topic_, timeout_, filter_, "",
5677  options_, subId_, false));
5678  return result;
5679  }
5680 
5706  std::string bookmarkSubscribe(const MessageHandler& messageHandler_,
5707  const std::string& topic_,
5708  long timeout_,
5709  const std::string& bookmark_,
5710  const std::string& filter_ = "",
5711  const std::string& options_ = "",
5712  const std::string& subId_ = "")
5713  {
5714  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5715  filter_, bookmark_, options_, subId_);
5716  }
5734  MessageStream bookmarkSubscribe(const std::string& topic_,
5735  long timeout_,
5736  const std::string& bookmark_,
5737  const std::string& filter_ = "",
5738  const std::string& options_ = "",
5739  const std::string& subId_ = "")
5740  {
5741  MessageStream result(*this);
5742  if (_body.get().getDefaultMaxDepth())
5743  {
5744  result.maxDepth(_body.get().getDefaultMaxDepth());
5745  }
5746  result.setSubscription(_body.get().subscribe(
5747  result.operator MessageHandler(),
5748  topic_, timeout_, filter_,
5749  bookmark_, options_,
5750  subId_, false));
5751  return result;
5752  }
5753 
5755  MessageStream bookmarkSubscribe(const char* topic_,
5756  long timeout_,
5757  const std::string& bookmark_,
5758  const std::string& filter_ = "",
5759  const std::string& options_ = "",
5760  const std::string& subId_ = "")
5761  {
5762  MessageStream result(*this);
5763  if (_body.get().getDefaultMaxDepth())
5764  {
5765  result.maxDepth(_body.get().getDefaultMaxDepth());
5766  }
5767  result.setSubscription(_body.get().subscribe(
5768  result.operator MessageHandler(),
5769  topic_, timeout_, filter_,
5770  bookmark_, options_,
5771  subId_, false));
5772  return result;
5773  }
5774 
5783  void unsubscribe(const std::string& commandId)
5784  {
5785  return _body.get().unsubscribe(commandId);
5786  }
5787 
5796  {
5797  return _body.get().unsubscribe();
5798  }
5799 
5800 
5830  std::string sow(const MessageHandler& messageHandler_,
5831  const std::string& topic_,
5832  const std::string& filter_ = "",
5833  const std::string& orderBy_ = "",
5834  const std::string& bookmark_ = "",
5835  int batchSize_ = DEFAULT_BATCH_SIZE,
5836  int topN_ = DEFAULT_TOP_N,
5837  const std::string& options_ = "",
5838  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5839  {
5840  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
5841  bookmark_, batchSize_, topN_, options_,
5842  timeout_);
5843  }
5868  MessageStream sow(const std::string& topic_,
5869  const std::string& filter_ = "",
5870  const std::string& orderBy_ = "",
5871  const std::string& bookmark_ = "",
5872  int batchSize_ = DEFAULT_BATCH_SIZE,
5873  int topN_ = DEFAULT_TOP_N,
5874  const std::string& options_ = "",
5875  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5876  {
5877  MessageStream result(*this);
5878  if (_body.get().getDefaultMaxDepth())
5879  {
5880  result.maxDepth(_body.get().getDefaultMaxDepth());
5881  }
5882  result.setSOWOnly(sow(result.operator MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5883  return result;
5884  }
5885 
5887  MessageStream sow(const char* topic_,
5888  const std::string& filter_ = "",
5889  const std::string& orderBy_ = "",
5890  const std::string& bookmark_ = "",
5891  int batchSize_ = DEFAULT_BATCH_SIZE,
5892  int topN_ = DEFAULT_TOP_N,
5893  const std::string& options_ = "",
5894  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5895  {
5896  MessageStream result(*this);
5897  if (_body.get().getDefaultMaxDepth())
5898  {
5899  result.maxDepth(_body.get().getDefaultMaxDepth());
5900  }
5901  result.setSOWOnly(sow(result.operator MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5902  return result;
5903  }
5926  std::string sow(const MessageHandler& messageHandler_,
5927  const std::string& topic_,
5928  long timeout_,
5929  const std::string& filter_ = "",
5930  int batchSize_ = DEFAULT_BATCH_SIZE,
5931  int topN_ = DEFAULT_TOP_N)
5932  {
5933  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
5934  batchSize_, topN_);
5935  }
5958  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
5959  const std::string& topic_,
5960  long timeout_,
5961  const std::string& filter_ = "",
5962  int batchSize_ = DEFAULT_BATCH_SIZE,
5963  bool oofEnabled_ = false,
5964  int topN_ = DEFAULT_TOP_N)
5965  {
5966  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5967  filter_, batchSize_, oofEnabled_,
5968  topN_);
5969  }
5970 
5990  MessageStream sowAndSubscribe(const std::string& topic_,
5991  long timeout_,
5992  const std::string& filter_ = "",
5993  int batchSize_ = DEFAULT_BATCH_SIZE,
5994  bool oofEnabled_ = false,
5995  int topN_ = DEFAULT_TOP_N)
5996  {
5997  MessageStream result(*this);
5998  if (_body.get().getDefaultMaxDepth())
5999  {
6000  result.maxDepth(_body.get().getDefaultMaxDepth());
6001  }
6002  result.setSubscription(_body.get().sowAndSubscribe(
6003  result.operator MessageHandler(),
6004  topic_, timeout_, filter_,
6005  batchSize_, oofEnabled_,
6006  topN_, false));
6007  return result;
6008  }
6028  MessageStream sowAndSubscribe(const char* topic_,
6029  long timeout_,
6030  const std::string& filter_ = "",
6031  int batchSize_ = DEFAULT_BATCH_SIZE,
6032  bool oofEnabled_ = false,
6033  int topN_ = DEFAULT_TOP_N)
6034  {
6035  MessageStream result(*this);
6036  if (_body.get().getDefaultMaxDepth())
6037  {
6038  result.maxDepth(_body.get().getDefaultMaxDepth());
6039  }
6040  result.setSubscription(_body.get().sowAndSubscribe(
6041  result.operator MessageHandler(),
6042  topic_, timeout_, filter_,
6043  batchSize_, oofEnabled_,
6044  topN_, false));
6045  return result;
6046  }
6047 
6048 
6076  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
6077  const std::string& topic_,
6078  const std::string& filter_ = "",
6079  const std::string& orderBy_ = "",
6080  const std::string& bookmark_ = "",
6081  int batchSize_ = DEFAULT_BATCH_SIZE,
6082  int topN_ = DEFAULT_TOP_N,
6083  const std::string& options_ = "",
6084  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6085  {
6086  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
6087  orderBy_, bookmark_, batchSize_,
6088  topN_, options_, timeout_);
6089  }
6090 
6115  MessageStream sowAndSubscribe(const std::string& topic_,
6116  const std::string& filter_ = "",
6117  const std::string& orderBy_ = "",
6118  const std::string& bookmark_ = "",
6119  int batchSize_ = DEFAULT_BATCH_SIZE,
6120  int topN_ = DEFAULT_TOP_N,
6121  const std::string& options_ = "",
6122  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6123  {
6124  MessageStream result(*this);
6125  if (_body.get().getDefaultMaxDepth())
6126  {
6127  result.maxDepth(_body.get().getDefaultMaxDepth());
6128  }
6129  result.setSubscription(_body.get().sowAndSubscribe(
6130  result.operator MessageHandler(),
6131  topic_, filter_, orderBy_,
6132  bookmark_, batchSize_, topN_,
6133  options_, timeout_, false));
6134  return result;
6135  }
6136 
6138  MessageStream sowAndSubscribe(const char* topic_,
6139  const std::string& filter_ = "",
6140  const std::string& orderBy_ = "",
6141  const std::string& bookmark_ = "",
6142  int batchSize_ = DEFAULT_BATCH_SIZE,
6143  int topN_ = DEFAULT_TOP_N,
6144  const std::string& options_ = "",
6145  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6146  {
6147  MessageStream result(*this);
6148  if (_body.get().getDefaultMaxDepth())
6149  {
6150  result.maxDepth(_body.get().getDefaultMaxDepth());
6151  }
6152  result.setSubscription(_body.get().sowAndSubscribe(
6153  result.operator MessageHandler(),
6154  topic_, filter_, orderBy_,
6155  bookmark_, batchSize_, topN_,
6156  options_, timeout_, false));
6157  return result;
6158  }
6159 
6184  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6185  const std::string& topic_,
6186  const std::string& filter_ = "",
6187  const std::string& orderBy_ = "",
6188  int batchSize_ = DEFAULT_BATCH_SIZE,
6189  int topN_ = DEFAULT_TOP_N,
6190  const std::string& options_ = "",
6191  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6192  {
6193  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6194  filter_, orderBy_, batchSize_,
6195  topN_, options_, timeout_);
6196  }
6217  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6218  const std::string& filter_ = "",
6219  const std::string& orderBy_ = "",
6220  int batchSize_ = DEFAULT_BATCH_SIZE,
6221  int topN_ = DEFAULT_TOP_N,
6222  const std::string& options_ = "",
6223  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6224  {
6225  MessageStream result(*this);
6226  if (_body.get().getDefaultMaxDepth())
6227  {
6228  result.maxDepth(_body.get().getDefaultMaxDepth());
6229  }
6230  result.setSubscription(sowAndDeltaSubscribe(result.operator MessageHandler(), topic_, filter_, orderBy_, batchSize_, topN_, options_, timeout_));
6231  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6232  result.operator MessageHandler(),
6233  topic_, filter_, orderBy_,
6234  batchSize_, topN_, options_,
6235  timeout_, false));
6236  return result;
6237  }
6238 
6241  const std::string& filter_ = "",
6242  const std::string& orderBy_ = "",
6243  int batchSize_ = DEFAULT_BATCH_SIZE,
6244  int topN_ = DEFAULT_TOP_N,
6245  const std::string& options_ = "",
6246  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
6247  {
6248  MessageStream result(*this);
6249  if (_body.get().getDefaultMaxDepth())
6250  {
6251  result.maxDepth(_body.get().getDefaultMaxDepth());
6252  }
6253  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6254  result.operator MessageHandler(),
6255  topic_, filter_, orderBy_,
6256  batchSize_, topN_, options_,
6257  timeout_, false));
6258  return result;
6259  }
6260 
6285  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
6286  const std::string& topic_,
6287  long timeout_,
6288  const std::string& filter_ = "",
6289  int batchSize_ = DEFAULT_BATCH_SIZE,
6290  bool oofEnabled_ = false,
6291  bool sendEmpties_ = false,
6292  int topN_ = DEFAULT_TOP_N)
6293  {
6294  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
6295  timeout_, filter_, batchSize_,
6296  oofEnabled_, sendEmpties_,
6297  topN_);
6298  }
6299 
6321  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
6322  long timeout_,
6323  const std::string& filter_ = "",
6324  int batchSize_ = DEFAULT_BATCH_SIZE,
6325  bool oofEnabled_ = false,
6326  bool sendEmpties_ = false,
6327  int topN_ = DEFAULT_TOP_N)
6328  {
6329  MessageStream result(*this);
6330  if (_body.get().getDefaultMaxDepth())
6331  {
6332  result.maxDepth(_body.get().getDefaultMaxDepth());
6333  }
6334  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6335  result.operator MessageHandler(),
6336  topic_, timeout_, filter_,
6337  batchSize_, oofEnabled_,
6338  sendEmpties_, topN_, false));
6339  return result;
6340  }
6363  long timeout_,
6364  const std::string& filter_ = "",
6365  int batchSize_ = DEFAULT_BATCH_SIZE,
6366  bool oofEnabled_ = false,
6367  bool sendEmpties_ = false,
6368  int topN_ = DEFAULT_TOP_N)
6369  {
6370  MessageStream result(*this);
6371  if (_body.get().getDefaultMaxDepth())
6372  {
6373  result.maxDepth(_body.get().getDefaultMaxDepth());
6374  }
6375  result.setSubscription(_body.get().sowAndDeltaSubscribe(
6376  result.operator MessageHandler(),
6377  topic_, timeout_, filter_,
6378  batchSize_, oofEnabled_,
6379  sendEmpties_, topN_, false));
6380  return result;
6381  }
6401  std::string sowDelete(const MessageHandler& messageHandler,
6402  const std::string& topic,
6403  const std::string& filter,
6404  long timeout)
6405  {
6406  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
6407  }
6424  Message sowDelete(const std::string& topic, const std::string& filter,
6425  long timeout = 0)
6426  {
6427  MessageStream stream(*this);
6428  char buf[Message::IdentifierLength + 1];
6429  buf[Message::IdentifierLength] = 0;
6430  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6431  Field cid(buf);
6432  try
6433  {
6434  stream.setStatsOnly(cid);
6435  _body.get().sowDelete(stream.operator MessageHandler(), topic, filter, timeout, cid);
6436  return *(stream.begin());
6437  }
6438  catch (const DisconnectedException&)
6439  {
6440  removeMessageHandler(cid);
6441  throw;
6442  }
6443  }
6444 
6449  void startTimer()
6450  {
6451  _body.get().startTimer();
6452  }
6453 
6460  std::string stopTimer(const MessageHandler& messageHandler)
6461  {
6462  return _body.get().stopTimer(messageHandler);
6463  }
6464 
6486  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
6487  const std::string& topic_,
6488  const std::string& keys_,
6489  long timeout_ = 0)
6490  {
6491  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
6492  }
6513  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
6514  long timeout_ = 0)
6515  {
6516  MessageStream stream(*this);
6517  char buf[Message::IdentifierLength + 1];
6518  buf[Message::IdentifierLength] = 0;
6519  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6520  Field cid(buf);
6521  try
6522  {
6523  stream.setStatsOnly(cid);
6524  _body.get().sowDeleteByKeys(stream.operator MessageHandler(), topic_, keys_, timeout_, cid);
6525  return *(stream.begin());
6526  }
6527  catch (const DisconnectedException&)
6528  {
6529  removeMessageHandler(cid);
6530  throw;
6531  }
6532  }
6533 
6548  std::string sowDeleteByData(const MessageHandler& messageHandler_,
6549  const std::string& topic_, const std::string& data_,
6550  long timeout_ = 0)
6551  {
6552  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
6553  }
6554 
6569  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
6570  long timeout_ = 0)
6571  {
6572  MessageStream stream(*this);
6573  char buf[Message::IdentifierLength + 1];
6574  buf[Message::IdentifierLength] = 0;
6575  AMPS_snprintf(buf, Message::IdentifierLength + 1, "%lx", MessageImpl::newId());
6576  Field cid(buf);
6577  try
6578  {
6579  stream.setStatsOnly(cid);
6580  _body.get().sowDeleteByData(stream.operator MessageHandler(), topic_, data_, timeout_, cid);
6581  return *(stream.begin());
6582  }
6583  catch (const DisconnectedException&)
6584  {
6585  removeMessageHandler(cid);
6586  throw;
6587  }
6588  }
6589 
6594  {
6595  return _body.get().getHandle();
6596  }
6597 
6606  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
6607  {
6608  _body.get().setExceptionListener(pListener_);
6609  }
6610 
6620  {
6621  _body.get().setExceptionListener(listener_);
6622  }
6623 
6627  {
6628  return _body.get().getExceptionListener();
6629  }
6630 
6638  // type of message) from the server for the specified interval (plus a grace period),
6652  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
6653  {
6654  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6655  }
6656 
6664  // type of message) from the server for the specified interval (plus a grace period),
6676  void setHeartbeat(unsigned heartbeatTime_)
6677  {
6678  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6679  }
6680 
6683  {
6684  setLastChanceMessageHandler(messageHandler);
6685  }
6686 
6690  {
6691  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6692  messageHandler);
6693  }
6694 
6715  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
6716  {
6717  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6718  }
6719 
6740  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
6741  {
6742  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6743  }
6744 
6750  static const char* BOOKMARK_NOW()
6751  {
6752  return AMPS_BOOKMARK_NOW;
6753  }
6759  static const char* NOW()
6760  {
6761  return AMPS_BOOKMARK_NOW;
6762  }
6763 
6769  static const char* BOOKMARK_EPOCH()
6770  {
6771  return AMPS_BOOKMARK_EPOCH;
6772  }
6773 
6779  static const char* EPOCH()
6780  {
6781  return AMPS_BOOKMARK_EPOCH;
6782  }
6783 
6790  static const char* BOOKMARK_MOST_RECENT()
6791  {
6792  return AMPS_BOOKMARK_RECENT;
6793  }
6794 
6801  static const char* MOST_RECENT()
6802  {
6803  return AMPS_BOOKMARK_RECENT;
6804  }
6805 
6812  static const char* BOOKMARK_RECENT()
6813  {
6814  return AMPS_BOOKMARK_RECENT;
6815  }
6816 
6817 
6824  {
6825  _body.get().addConnectionStateListener(listener);
6826  }
6827 
6832  {
6833  _body.get().removeConnectionStateListener(listener);
6834  }
6835 
6839  {
6840  _body.get().clearConnectionStateListeners();
6841  }
6842 
6868  std::string executeAsync(Command& command_, MessageHandler handler_)
6869  {
6870  return _body.get().executeAsync(command_, handler_);
6871  }
6872 
6902  std::string executeAsyncNoResubscribe(Command& command_,
6903  MessageHandler handler_)
6904  {
6905  std::string id;
6906  try
6907  {
6908  if (command_.isSubscribe())
6909  {
6910  Message& message = command_.getMessage();
6911  Field subId = message.getSubscriptionId();
6912  bool useExistingHandler = !subId.empty() && !message.getOptions().empty() && message.getOptions().contains("replace", 7);
6913  if (useExistingHandler)
6914  {
6915  MessageHandler existingHandler;
6916  if (_body.get()._routes.getRoute(subId, existingHandler))
6917  {
6918  // we found an existing handler.
6919  _body.get().executeAsync(command_, existingHandler, false);
6920  return id; // empty string indicates existing
6921  }
6922  }
6923  }
6924  id = _body.get().executeAsync(command_, handler_, false);
6925  }
6926  catch (const DisconnectedException&)
6927  {
6928  removeMessageHandler(command_.getMessage().getCommandId());
6929  if (command_.isSubscribe())
6930  {
6931  removeMessageHandler(command_.getMessage().getSubscriptionId());
6932  }
6933  if (command_.isSow())
6934  {
6935  removeMessageHandler(command_.getMessage().getQueryID());
6936  }
6937  throw;
6938  }
6939  return id;
6940  }
6941 
6954  MessageStream execute(Command& command_);
6955 
6964  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
6965  {
6966  _body.get().ack(topic_, bookmark_, options_);
6967  }
6968 
6976  void ack(Message& message_, const char* options_ = NULL)
6977  {
6978  _body.get().ack(message_.getTopic(), message_.getBookmark(), options_);
6979  }
6988  void ack(const std::string& topic_, const std::string& bookmark_,
6989  const char* options_ = NULL)
6990  {
6991  _body.get().ack(Field(topic_.data(), topic_.length()), Field(bookmark_.data(), bookmark_.length()), options_);
6992  }
6993 
6999  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
7000  {
7001  _body.get()._ack(topic_, bookmark_, options_);
7002  }
7012  void flushAcks(void)
7013  {
7014  _body.get().flushAcks();
7015  }
7016 
7021  bool getAutoAck(void) const
7022  {
7023  return _body.get().getAutoAck();
7024  }
7031  void setAutoAck(bool isAutoAckEnabled_)
7032  {
7033  _body.get().setAutoAck(isAutoAckEnabled_);
7034  }
7039  unsigned getAckBatchSize(void) const
7040  {
7041  return _body.get().getAckBatchSize();
7042  }
7049  void setAckBatchSize(const unsigned ackBatchSize_)
7050  {
7051  _body.get().setAckBatchSize(ackBatchSize_);
7052  }
7053 
7060  int getAckTimeout(void) const
7061  {
7062  return _body.get().getAckTimeout();
7063  }
7070  void setAckTimeout(const int ackTimeout_)
7071  {
7072  _body.get().setAckTimeout(ackTimeout_);
7073  }
7074 
7075 
7084  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
7085  {
7086  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
7087  }
7088 
7093  bool getRetryOnDisconnect(void) const
7094  {
7095  return _body.get().getRetryOnDisconnect();
7096  }
7097 
7102  void setDefaultMaxDepth(unsigned maxDepth_)
7103  {
7104  _body.get().setDefaultMaxDepth(maxDepth_);
7105  }
7106 
7111  unsigned getDefaultMaxDepth(void) const
7112  {
7113  return _body.get().getDefaultMaxDepth();
7114  }
7115 
7123  void* userData_)
7124  {
7125  return _body.get().setTransportFilterFunction(filter_, userData_);
7126  }
7127 
7137  void* userData_)
7138  {
7139  return _body.get().setThreadCreatedCallback(callback_, userData_);
7140  }
7141 
7147  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
7148  {
7149  _body.get().deferredExecution(func_, userData_);
7150  }
7154  };
7155 
7156  inline void
7157  ClientImpl::lastChance(AMPS::Message& message)
7158  {
7159  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
7160  }
7161 
7162  inline unsigned
7163  ClientImpl::persistedAck(AMPS::Message& message)
7164  {
7165  unsigned deliveries = 0;
7166  try
7167  {
7168  /*
7169  * Best Practice: If you don't care about the dupe acks that
7170  * occur during failover or rapid disconnect/reconnect, then just
7171  * ignore them. We could discard each duplicate from the
7172  * persisted store, but the storage costs of doing 1 record
7173  * discards is heavy. In most scenarios we'll just quickly blow
7174  * through the duplicates and get back to processing the
7175  * non-dupes.
7176  */
7177  const char* data = NULL;
7178  size_t len = 0;
7179  const char* status = NULL;
7180  size_t statusLen = 0;
7181  amps_handle messageHandle = message.getMessage();
7182  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
7183  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7184  amps_message_get_field_value(messageHandle, AMPS_Status, &status, &statusLen);
7185  if (len == NotEntitled || len == Duplicate ||
7186  (statusLen == Failure && status[0] == 'f'))
7187  {
7188  if (_failedWriteHandler)
7189  {
7190  if (_publishStore.isValid())
7191  {
7192  amps_uint64_t sequence =
7193  amps_message_get_field_uint64(messageHandle, AMPS_Sequence);
7194  FailedWriteStoreReplayer replayer(this, data, len);
7195  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
7196  replayer, sequence));
7197  }
7198  else // Call the handler with what little we have
7199  {
7200  static Message emptyMessage;
7201  emptyMessage.setSequence(message.getSequence());
7202  AMPS_CALL_EXCEPTION_WRAPPER(
7203  _failedWriteHandler->failedWrite(emptyMessage,
7204  data, len));
7205  }
7206  ++deliveries;
7207  }
7208  }
7209  if (_publishStore.isValid())
7210  {
7211  // Ack for publisher will have sequence while
7212  // ack for bookmark subscribe won't
7213  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
7214  AMPS_Sequence);
7215  if (seq > 0)
7216  {
7217  ++deliveries;
7218  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.discardUpTo(seq));
7219  }
7220  }
7221 
7222  if (!deliveries && _bookmarkStore.isValid())
7223  {
7224  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
7225  &data, &len);
7226  if (len > 0)
7227  {
7228  Message::Field subId(data, len);
7229  const char* bookmarkData = NULL;
7230  size_t bookmarkLen = 0;
7231  amps_message_get_field_value(messageHandle,
7232  AMPS_Bookmark,
7233  &bookmarkData,
7234  &bookmarkLen);
7235  // Everything is there and not unsubscribed AC-912
7236  if (bookmarkLen > 0 && _routes.hasRoute(subId))
7237  {
7238  ++deliveries;
7239  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
7240  }
7241  }
7242  }
7243  }
7244  catch (std::exception& ex)
7245  {
7246  AMPS_UNHANDLED_EXCEPTION(ex);
7247  }
7248  return deliveries;
7249  }
7250 
7251  inline unsigned
7252  ClientImpl::processedAck(Message& message)
7253  {
7254  unsigned deliveries = 0;
7255  AckResponse ack;
7256  const char* data = NULL;
7257  size_t len = 0;
7258  amps_handle messageHandle = message.getMessage();
7259  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
7260  Lock<Mutex> l(_lock);
7261  if (data && len)
7262  {
7263  Lock<Mutex> guard(_ackMapLock);
7264  AckMap::iterator i = _ackMap.find(std::string(data, len));
7265  if (i != _ackMap.end())
7266  {
7267  ++deliveries;
7268  ack = i->second;
7269  _ackMap.erase(i);
7270  }
7271  }
7272  if (deliveries)
7273  {
7274  amps_message_get_field_value(messageHandle, AMPS_Status, &data, &len);
7275  ack.setStatus(data, len);
7276  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
7277  ack.setReason(data, len);
7278  amps_message_get_field_value(messageHandle, AMPS_UserId, &data, &len);
7279  ack.setUsername(data, len);
7280  amps_message_get_field_value(messageHandle, AMPS_Password, &data, &len);
7281  ack.setPassword(data, len);
7282  amps_message_get_field_value(messageHandle, AMPS_Version, &data, &len);
7283  ack.setServerVersion(data, len);
7284  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
7285  ack.setOptions(data, len);
7286  // This sets bookmark, nameHashValue, and sequenceNo
7287  ack.setBookmark(message.getBookmark());
7288  ack.setResponded(true);
7289  _lock.signalAll();
7290  }
7291  return deliveries;
7292  }
7293 
7294  inline void
7295  ClientImpl::checkAndSendHeartbeat(bool force)
7296  {
7297  if (force || _heartbeatTimer.check())
7298  {
7299  _heartbeatTimer.start();
7300  try
7301  {
7302  sendWithoutRetry(_beatMessage);
7303  }
7304  catch (const AMPSException&)
7305  {
7306  ;
7307  }
7308  }
7309  }
7310 
7311  inline ConnectionInfo ClientImpl::getConnectionInfo() const
7312  {
7313  ConnectionInfo info;
7314  std::ostringstream writer;
7315 
7316  info["client.uri"] = _lastUri;
7317  info["client.name"] = _name;
7318  info["client.username"] = _username;
7319  if (_publishStore.isValid())
7320  {
7321  writer << _publishStore.unpersistedCount();
7322  info["publishStore.unpersistedCount"] = writer.str();
7323  writer.clear();
7324  writer.str("");
7325  }
7326 
7327  return info;
7328  }
7329 
7330  inline amps_result
7331  ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
7332  {
7333  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
7334  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
7335  ClientImpl* me = (ClientImpl*) userData_;
7336  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->processDeferredExecutions());
7337  if (!messageHandle_)
7338  {
7339  if (me->_queueAckTimeout)
7340  {
7341  me->checkQueueAcks();
7342  }
7343  return AMPS_E_OK;
7344  }
7345 
7346  me->_readMessage.replace(messageHandle_);
7347  Message& message = me->_readMessage;
7348  Message::Command::Type commandType = message.getCommandEnum();
7349  if (commandType & SOWMask)
7350  {
7351 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7352  // A small cheat here to get the right handler, using knowledge of the
7353  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
7354  // and their GlobalCommandTypeHandlers values 1, 2, 3.
7355  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7356  me->_globalCommandTypeHandlers[1 + (commandType / 8192)].invoke(message));
7357 #endif
7358  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_routes.deliverData(message,
7359  message.getQueryID()));
7360  }
7361  else if (commandType & PublishMask)
7362  {
7363 #if 0 // Not currently implemented, to avoid an extra branch in delivery
7364  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7365  me->_globalCommandTypeHandlers[(commandType == Message::Command::Publish ?
7366  GlobalCommandTypeHandlers::Publish :
7367  GlobalCommandTypeHandlers::OOF)].invoke(message));
7368 #endif
7369  const char* subIds = NULL;
7370  size_t subIdsLen = 0;
7371  // Publish command, send to subscriptions
7372  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds,
7373  &subIds, &subIdsLen);
7374  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
7375  for (size_t i = 0; i < subIdCount; ++i)
7376  {
7377  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
7378  MessageHandler& handler = lookupResult.handler;
7379  if (handler.isValid())
7380  {
7381  amps_message_set_field_value(messageHandle_,
7382  AMPS_SubscriptionId,
7383  subIds + lookupResult.idOffset,
7384  lookupResult.idLength);
7385  Message::Field bookmark = message.getBookmark();
7386  bool isMessageQueue = message.getLeasePeriod().len() != 0;
7387  bool isAutoAck = me->_isAutoAckEnabled;
7388 
7389  if (!isMessageQueue && !bookmark.empty() &&
7390  me->_bookmarkStore.isValid())
7391  {
7392  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
7393  {
7394  //Call duplicate message handler in handlers map
7395  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
7396  {
7397  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
7398  }
7399  }
7400  else
7401  {
7402  me->_bookmarkStore.log(me->_readMessage);
7403  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7404  handler.invoke(message));
7405  }
7406  }
7407  else
7408  {
7409  if (isMessageQueue && isAutoAck)
7410  {
7411  try
7412  {
7413  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
7414  if (!message.getIgnoreAutoAck())
7415  {
7416  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7417  me->_ack(message.getTopic(), message.getBookmark()));
7418  }
7419  }
7420  catch (std::exception& ex)
7421  {
7422  if (!message.getIgnoreAutoAck())
7423  {
7424  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7425  me->_ack(message.getTopic(), message.getBookmark(), "cancel"));
7426  }
7427  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7428  }
7429  }
7430  else
7431  {
7432  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7433  handler.invoke(message));
7434  }
7435  }
7436  }
7437  else
7438  {
7439  me->lastChance(message);
7440  }
7441  } // for (subidsEnd)
7442  }
7443  else if (commandType == Message::Command::Ack)
7444  {
7445  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7446  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
7447  unsigned ackType = message.getAckTypeEnum();
7448  unsigned deliveries = 0U;
7449  switch (ackType)
7450  {
7451  case Message::AckType::Persisted:
7452  deliveries += me->persistedAck(message);
7453  break;
7454  case Message::AckType::Processed: // processed
7455  deliveries += me->processedAck(message);
7456  break;
7457  }
7458  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
7459  if (deliveries == 0)
7460  {
7461  me->lastChance(message);
7462  }
7463  }
7464  else if (commandType == Message::Command::Heartbeat)
7465  {
7466  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
7467  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
7468  if (me->_heartbeatTimer.getTimeout() != 0.0) // -V550
7469  {
7470  me->checkAndSendHeartbeat(true);
7471  }
7472  else
7473  {
7474  me->lastChance(message);
7475  }
7476  return AMPS_E_OK;
7477  }
7478  else if (!message.getCommandId().empty())
7479  {
7480  unsigned deliveries = 0U;
7481  try
7482  {
7483  while (me->_connected) // Keep sending heartbeats when stream is full
7484  {
7485  try
7486  {
7487  deliveries = me->_routes.deliverData(message, message.getCommandId());
7488  break;
7489  }
7490 #ifdef _WIN32
7491  catch (MessageStreamFullException&)
7492 #else
7493  catch (MessageStreamFullException& ex_)
7494 #endif
7495  {
7496  me->checkAndSendHeartbeat(false);
7497  }
7498  }
7499  }
7500  catch (std::exception& ex_)
7501  {
7502  try
7503  {
7504  me->_exceptionListener->exceptionThrown(ex_);
7505  }
7506  catch (...)
7507  {
7508  ;
7509  }
7510  }
7511  if (deliveries == 0)
7512  {
7513  me->lastChance(message);
7514  }
7515  }
7516  me->checkAndSendHeartbeat();
7517  return AMPS_E_OK;
7518  }
7519 
7520  inline void
7521  ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
7522  {
7523  ClientImpl* me = (ClientImpl*) userData;
7524  //Client wrapper(me);
7525  // Go ahead and signal any waiters if they are around...
7526  me->clearAcks(failedConnectionVersion);
7527  }
7528 
7529  inline amps_result
7530  ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
7531  {
7532  ClientImpl* me = (ClientImpl*) userData;
7533  Lock<Mutex> l(me->_lock);
7534  Client wrapper(me, false);
7535  if (me->_connected)
7536  {
7537  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
7538  }
7539  while (true)
7540  {
7541  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
7542  try
7543  {
7544  AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
7545  me->_connected = false;
7546  {
7547  // Have to release the lock here or receive thread can't
7548  // invoke the message handler.
7549  Unlock<Mutex> unlock(me->_lock);
7550  me->_disconnectHandler.invoke(wrapper);
7551  }
7552  }
7553  catch (const std::exception& ex)
7554  {
7555  AMPS_UNHANDLED_EXCEPTION_2(me, ex);
7556  }
7557  me->_lock.signalAll();
7558 
7559  if (!me->_connected)
7560  {
7561  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
7562  AMPS_UNHANDLED_EXCEPTION_2(me, DisconnectedException("Reconnect failed."));
7563  return AMPS_E_DISCONNECTED;
7564  }
7565  try
7566  {
7567  // Resubscribe
7568  if (me->_subscriptionManager)
7569  {
7570  {
7571  // Have to release the lock here or receive thread can't
7572  // invoke the message handler.
7573  Unlock<Mutex> unlock(me->_lock);
7574  me->_subscriptionManager->resubscribe(wrapper);
7575  }
7576  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
7577  }
7578  return AMPS_E_OK;
7579  }
7580  catch (const AMPSException& subEx)
7581  {
7582  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7583  }
7584  catch (const std::exception& subEx)
7585  {
7586  AMPS_UNHANDLED_EXCEPTION_2(me, subEx);
7587  return AMPS_E_RETRY;
7588  }
7589  catch (...)
7590  {
7591  return AMPS_E_RETRY;
7592  }
7593  }
7594  return AMPS_E_RETRY;
7595  }
7596 
7597  class FIX
7598  {
7599  const char* _data;
7600  size_t _len;
7601  char _fieldSep;
7602  public:
7603  class iterator
7604  {
7605  const char* _data;
7606  size_t _len;
7607  size_t _pos;
7608  char _fieldSep;
7609  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
7610  : _data(data_), _len(len_), _pos(pos_), _fieldSep(fieldSep_)
7611  {
7612  while (_pos != _len && _data[_pos] == _fieldSep)
7613  {
7614  ++_pos;
7615  }
7616  }
7617  public:
7618  typedef void* difference_type;
7619  typedef std::forward_iterator_tag iterator_category;
7620  typedef std::pair<Message::Field, Message::Field> value_type;
7621  typedef value_type* pointer;
7622  typedef value_type& reference;
7623  bool operator==(const iterator& rhs) const
7624  {
7625  return _pos == rhs._pos;
7626  }
7627  bool operator!=(const iterator& rhs) const
7628  {
7629  return _pos != rhs._pos;
7630  }
7631  iterator& operator++()
7632  {
7633  // Skip through the data
7634  while (_pos != _len && _data[_pos] != _fieldSep)
7635  {
7636  ++_pos;
7637  }
7638  // Skip through any field separators
7639  while (_pos != _len && _data[_pos] == _fieldSep)
7640  {
7641  ++_pos;
7642  }
7643  return *this;
7644  }
7645 
7646  value_type operator*() const
7647  {
7648  value_type result;
7649  size_t i = _pos, keyLength = 0, valueStart = 0, valueLength = 0;
7650  for (; i < _len && _data[i] != '='; ++i)
7651  {
7652  ++keyLength;
7653  }
7654 
7655  result.first.assign(_data + _pos, keyLength);
7656 
7657  if (i < _len && _data[i] == '=')
7658  {
7659  ++i;
7660  valueStart = i;
7661  for (; i < _len && _data[i] != _fieldSep; ++i)
7662  {
7663  valueLength++;
7664  }
7665  }
7666  result.second.assign(_data + valueStart, valueLength);
7667  return result;
7668  }
7669 
7670  friend class FIX;
7671  };
7672  class reverse_iterator
7673  {
7674  const char* _data;
7675  size_t _len;
7676  const char* _pos;
7677  char _fieldSep;
7678  public:
7679  typedef std::pair<Message::Field, Message::Field> value_type;
7680  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
7681  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7682  {
7683  if (_pos)
7684  {
7685  // skip past meaningless trailing fieldseps
7686  while (_pos >= _data && *_pos == _fieldSep)
7687  {
7688  --_pos;
7689  }
7690  while (_pos > _data && *_pos != _fieldSep)
7691  {
7692  --_pos;
7693  }
7694  // if we stopped before the 0th character, it's because
7695  // it's a field sep. advance one to point to the first character
7696  // of a key.
7697  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
7698  {
7699  ++_pos;
7700  }
7701  if (_pos < _data)
7702  {
7703  _pos = 0;
7704  }
7705  }
7706  }
7707  bool operator==(const reverse_iterator& rhs) const
7708  {
7709  return _pos == rhs._pos;
7710  }
7711  bool operator!=(const reverse_iterator& rhs) const
7712  {
7713  return _pos != rhs._pos;
7714  }
7715  reverse_iterator& operator++()
7716  {
7717  if (_pos == _data)
7718  {
7719  _pos = 0;
7720  }
7721  else
7722  {
7723  // back up 1 to a field separator
7724  --_pos;
7725  // keep backing up through field separators
7726  while (_pos >= _data && *_pos == _fieldSep)
7727  {
7728  --_pos;
7729  }
7730  // now back up to the beginning of this field
7731  while (_pos > _data && *_pos != _fieldSep)
7732  {
7733  --_pos;
7734  }
7735  if (_pos > _data || (_pos == _data && *_pos == _fieldSep))
7736  {
7737  ++_pos;
7738  }
7739  if (_pos < _data)
7740  {
7741  _pos = 0;
7742  }
7743  }
7744  return *this;
7745  }
7746  value_type operator*() const
7747  {
7748  value_type result;
7749  size_t keyLength = 0, valueStart = 0, valueLength = 0;
7750  size_t i = (size_t)(_pos - _data);
7751  for (; i < _len && _data[i] != '='; ++i)
7752  {
7753  ++keyLength;
7754  }
7755  result.first.assign(_pos, keyLength);
7756  if (i < _len && _data[i] == '=')
7757  {
7758  ++i;
7759  valueStart = i;
7760  for (; i < _len && _data[i] != _fieldSep; ++i)
7761  {
7762  valueLength++;
7763  }
7764  }
7765  result.second.assign(_data + valueStart, valueLength);
7766  return result;
7767  }
7768  };
7769  FIX(const Message::Field& data, char fieldSeparator = 1)
7770  : _data(data.data()), _len(data.len()),
7771  _fieldSep(fieldSeparator)
7772  {
7773  }
7774 
7775  FIX(const char* data, size_t len, char fieldSeparator = 1)
7776  : _data(data), _len(len), _fieldSep(fieldSeparator)
7777  {
7778  }
7779 
7780  iterator begin() const
7781  {
7782  return iterator(_data, _len, 0, _fieldSep);
7783  }
7784  iterator end() const
7785  {
7786  return iterator(_data, _len, _len, _fieldSep);
7787  }
7788 
7789 
7790  reverse_iterator rbegin() const
7791  {
7792  return reverse_iterator(_data, _len, _data + (_len - 1), _fieldSep);
7793  }
7794 
7795  reverse_iterator rend() const
7796  {
7797  return reverse_iterator(_data, _len, 0, _fieldSep);
7798  }
7799  };
7800 
7801 
7814 
7815  template <class T>
7817  {
7818  std::stringstream _data;
7819  char _fs;
7820  public:
7826  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
7827 
7835  void append(const T& tag, const char* value, size_t offset, size_t length)
7836  {
7837  _data << tag << '=';
7838  _data.write(value + offset, (std::streamsize)length);
7839  _data << _fs;
7840  }
7846  void append(const T& tag, const std::string& value)
7847  {
7848  _data << tag << '=' << value << _fs;
7849  }
7850 
7853  std::string getString() const
7854  {
7855  return _data.str();
7856  }
7857  operator std::string() const
7858  {
7859  return _data.str();
7860  }
7861 
7863  void reset()
7864  {
7865  _data.str(std::string());
7866  }
7867  };
7868 
7872 
7874 
7878 
7880 
7881 
7889 
7891  {
7892  char _fs;
7893  public:
7898  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
7899 
7902  typedef std::map<Message::Field, Message::Field> map_type;
7903 
7909  map_type toMap(const Message::Field& data)
7910  {
7911  FIX fix(data, _fs);
7912  map_type retval;
7913  for (FIX::iterator a = fix.begin(); a != fix.end(); ++a)
7914  {
7915  retval.insert(*a);
7916  }
7917 
7918  return retval;
7919  }
7920  };
7921 
7922  class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
7923  {
7924  Mutex _lock;
7925  std::deque<Message> _q;
7926  std::string _commandId;
7927  std::string _subId;
7928  std::string _queryId;
7929  Client _client;
7930  unsigned _timeout;
7931  unsigned _maxDepth;
7932  unsigned _requestedAcks;
7933  Message::Field _previousTopic;
7934  Message::Field _previousBookmark;
7935  volatile enum { Unset = 0x0, Running = 0x10, Subscribe = 0x11, SOWOnly = 0x12, AcksOnly = 0x13, Conflate = 0x14, Closed = 0x1, Disconnected = 0x2 } _state;
7936  typedef std::map<std::string, Message*> SOWKeyMap;
7937  SOWKeyMap _sowKeyMap;
7938  public:
7939  MessageStreamImpl(const Client& client_)
7940  : _client(client_),
7941  _timeout(0),
7942  _maxDepth((unsigned)~0),
7943  _requestedAcks(0),
7944  _state(Unset)
7945  {
7946  if (_client.isValid())
7947  {
7948  _client.addConnectionStateListener(this);
7949  }
7950  }
7951 
7952  MessageStreamImpl(ClientImpl* client_)
7953  : _client(client_),
7954  _timeout(0),
7955  _maxDepth((unsigned)~0),
7956  _requestedAcks(0),
7957  _state(Unset)
7958  {
7959  if (_client.isValid())
7960  {
7961  _client.addConnectionStateListener(this);
7962  }
7963  }
7964 
7965  ~MessageStreamImpl()
7966  {
7967  }
7968 
7969  virtual void destroy()
7970  {
7971  try
7972  {
7973  close();
7974  }
7975  catch (std::exception& e)
7976  {
7977  try
7978  {
7979  if (_client.isValid())
7980  {
7981  _client.getExceptionListener().exceptionThrown(e);
7982  }
7983  }
7984  catch (...) {/*Ignore exception listener exceptions*/} // -V565
7985  }
7986  if (_client.isValid())
7987  {
7988  _client.removeConnectionStateListener(this);
7989  Client c = _client;
7990  _client = Client((ClientImpl*)NULL);
7991  c.deferredExecution(MessageStreamImpl::destroyer, this);
7992  }
7993  else
7994  {
7995  delete this;
7996  }
7997  }
7998 
7999  static void destroyer(void* vpMessageStreamImpl_)
8000  {
8001  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
8002  }
8003 
8004  void setSubscription(const std::string& subId_,
8005  const std::string& commandId_ = "",
8006  const std::string& queryId_ = "")
8007  {
8008  Lock<Mutex> lock(_lock);
8009  _subId = subId_;
8010  if (!commandId_.empty() && commandId_ != subId_)
8011  {
8012  _commandId = commandId_;
8013  }
8014  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
8015  {
8016  _queryId = queryId_;
8017  }
8018  // It's possible to disconnect between creation/registration and here.
8019  if (Disconnected == _state)
8020  {
8021  return;
8022  }
8023  assert(Unset == _state);
8024  _state = Subscribe;
8025  }
8026 
8027  void setSOWOnly(const std::string& commandId_,
8028  const std::string& queryId_ = "")
8029  {
8030  Lock<Mutex> lock(_lock);
8031  _commandId = commandId_;
8032  if (!queryId_.empty() && queryId_ != commandId_)
8033  {
8034  _queryId = queryId_;
8035  }
8036  // It's possible to disconnect between creation/registration and here.
8037  if (Disconnected == _state)
8038  {
8039  return;
8040  }
8041  assert(Unset == _state);
8042  _state = SOWOnly;
8043  }
8044 
8045  void setStatsOnly(const std::string& commandId_,
8046  const std::string& queryId_ = "")
8047  {
8048  Lock<Mutex> lock(_lock);
8049  _commandId = commandId_;
8050  if (!queryId_.empty() && queryId_ != commandId_)
8051  {
8052  _queryId = queryId_;
8053  }
8054  // It's possible to disconnect between creation/registration and here.
8055  if (Disconnected == _state)
8056  {
8057  return;
8058  }
8059  assert(Unset == _state);
8060  _state = AcksOnly;
8061  _requestedAcks = Message::AckType::Stats;
8062  }
8063 
8064  void setAcksOnly(const std::string& commandId_, unsigned acks_)
8065  {
8066  Lock<Mutex> lock(_lock);
8067  _commandId = commandId_;
8068  // It's possible to disconnect between creation/registration and here.
8069  if (Disconnected == _state)
8070  {
8071  return;
8072  }
8073  assert(Unset == _state);
8074  _state = AcksOnly;
8075  _requestedAcks = acks_;
8076  }
8077 
8078  void connectionStateChanged(ConnectionStateListener::State state_)
8079  {
8080  Lock<Mutex> lock(_lock);
8081  if (state_ == AMPS::ConnectionStateListener::Disconnected)
8082  {
8083  _state = Disconnected;
8084  close();
8085  }
8086  _lock.signalAll();
8087  }
8088 
8089  void timeout(unsigned timeout_)
8090  {
8091  _timeout = timeout_;
8092  }
8093  void conflate(void)
8094  {
8095  if (_state == Subscribe)
8096  {
8097  _state = Conflate;
8098  }
8099  }
8100  void maxDepth(unsigned maxDepth_)
8101  {
8102  if (maxDepth_)
8103  {
8104  _maxDepth = maxDepth_;
8105  }
8106  else
8107  {
8108  _maxDepth = (unsigned)~0;
8109  }
8110  }
8111  unsigned getMaxDepth(void) const
8112  {
8113  return _maxDepth;
8114  }
8115  unsigned getDepth(void) const
8116  {
8117  return (unsigned)(_q.size());
8118  }
8119 
8120  bool next(Message& current_)
8121  {
8122  Lock<Mutex> lock(_lock);
8123  if (!_previousTopic.empty() && !_previousBookmark.empty())
8124  {
8125  try
8126  {
8127  if (_client.isValid())
8128  {
8129  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
8130  }
8131  }
8132 #ifdef _WIN32
8133  catch (AMPSException&)
8134 #else
8135  catch (AMPSException& e)
8136 #endif
8137  {
8138  current_.invalidate();
8139  _previousTopic.clear();
8140  _previousBookmark.clear();
8141  return false;
8142  }
8143  _previousTopic.clear();
8144  _previousBookmark.clear();
8145  }
8146  double minWaitTime = (double)((_timeout && _timeout > 1000)
8147  ? _timeout : 1000);
8148  Timer timer(minWaitTime);
8149  timer.start();
8150  while (_q.empty() && _state & Running)
8151  {
8152  // Using timeout so python can interrupt
8153  _lock.wait((long)minWaitTime);
8154  {
8155  Unlock<Mutex> unlck(_lock);
8156  amps_invoke_waiting_function();
8157  }
8158  if (_timeout)
8159  {
8160  // In case we woke up early, see how much longer to wait
8161  if (timer.checkAndGetRemaining(&minWaitTime))
8162  {
8163  break;
8164  }
8165  }
8166  }
8167  if (!_q.empty())
8168  {
8169  current_ = _q.front();
8170  if (_q.size() == _maxDepth)
8171  {
8172  _lock.signalAll();
8173  }
8174  _q.pop_front();
8175  if (_state == Conflate)
8176  {
8177  std::string sowKey = current_.getSowKey();
8178  if (sowKey.length())
8179  {
8180  _sowKeyMap.erase(sowKey);
8181  }
8182  }
8183  else if (_state == AcksOnly)
8184  {
8185  _requestedAcks &= ~(current_.getAckTypeEnum());
8186  }
8187  if ((_state == AcksOnly && _requestedAcks == 0) ||
8188  (_state == SOWOnly && current_.getCommand() == "group_end"))
8189  {
8190  _state = Closed;
8191  }
8192  else if (current_.getCommandEnum() == Message::Command::Publish &&
8193  _client.isValid() && _client.getAutoAck() &&
8194  !current_.getLeasePeriod().empty() &&
8195  !current_.getBookmark().empty())
8196  {
8197  _previousTopic = current_.getTopic().deepCopy();
8198  _previousBookmark = current_.getBookmark().deepCopy();
8199  }
8200  return true;
8201  }
8202  if (_state == Disconnected)
8203  {
8204  throw DisconnectedException("Connection closed.");
8205  }
8206  current_.invalidate();
8207  if (_state == Closed)
8208  {
8209  return false;
8210  }
8211  return _timeout != 0;
8212  }
8213  void close(void)
8214  {
8215  if (_client.isValid())
8216  {
8217  if (_state == SOWOnly || _state == Subscribe) //not delete
8218  {
8219  if (!_commandId.empty())
8220  {
8221  _client.unsubscribe(_commandId);
8222  }
8223  if (!_subId.empty())
8224  {
8225  _client.unsubscribe(_subId);
8226  }
8227  if (!_queryId.empty())
8228  {
8229  _client.unsubscribe(_queryId);
8230  }
8231  }
8232  else
8233  {
8234  if (!_commandId.empty())
8235  {
8236  _client.removeMessageHandler(_commandId);
8237  }
8238  if (!_subId.empty())
8239  {
8240  _client.removeMessageHandler(_subId);
8241  }
8242  if (!_queryId.empty())
8243  {
8244  _client.removeMessageHandler(_queryId);
8245  }
8246  }
8247  }
8248  if (_state == SOWOnly || _state == Subscribe || _state == Unset)
8249  {
8250  _state = Closed;
8251  }
8252  }
8253  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
8254  {
8255  Lock<Mutex> lock(this_->_lock);
8256  if (this_->_state != Conflate)
8257  {
8258  AMPS_TESTING_SLOW_MESSAGE_STREAM
8259  if (this_->_q.size() >= this_->_maxDepth)
8260  {
8261  // We throw here so that heartbeats can be sent. The exception
8262  // will be handled internally only, and the same Message will
8263  // come back to try again. Make sure to signal.
8264  this_->_lock.signalAll();
8265  throw MessageStreamFullException("Stream is currently full.");
8266  }
8267 #ifdef AMPS_USE_EMPLACE
8268  this_->_q.emplace_back(message_.deepCopy());
8269 #else
8270  this_->_q.push_back(message_.deepCopy());
8271 #endif
8272  if (message_.getCommandEnum() == Message::Command::Publish &&
8273  this_->_client.isValid() && this_->_client.getAutoAck() &&
8274  !message_.getLeasePeriod().empty() &&
8275  !message_.getBookmark().empty())
8276  {
8277  message_.setIgnoreAutoAck();
8278  }
8279  }
8280  else
8281  {
8282  std::string sowKey = message_.getSowKey();
8283  if (sowKey.length())
8284  {
8285  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
8286  if (it != this_->_sowKeyMap.end())
8287  {
8288  *(it->second) = message_.deepCopy();
8289  }
8290  else
8291  {
8292  if (this_->_q.size() >= this_->_maxDepth)
8293  {
8294  // We throw here so that heartbeats can be sent. The
8295  // exception will be handled internally only, and the
8296  // same Message will come back to try again. Make sure
8297  // to signal.
8298  this_->_lock.signalAll();
8299  throw MessageStreamFullException("Stream is currently full.");
8300  }
8301 #ifdef AMPS_USE_EMPLACE
8302  this_->_q.emplace_back(message_.deepCopy());
8303 #else
8304  this_->_q.push_back(message_.deepCopy());
8305 #endif
8306  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
8307  }
8308  }
8309  else
8310  {
8311  while (this_->_q.size() >= this_->_maxDepth) // -V712
8312  {
8313  this_->_lock.wait(1);
8314  }
8315 #ifdef AMPS_USE_EMPLACE
8316  this_->_q.emplace_back(message_.deepCopy());
8317 #else
8318  this_->_q.push_back(message_.deepCopy());
8319 #endif
8320  }
8321  }
8322  this_->_lock.signalAll();
8323  }
8324  };
8325  inline MessageStream::MessageStream(void)
8326  {
8327  }
8328  inline MessageStream::MessageStream(const Client& client_)
8329  : _body(new MessageStreamImpl(client_))
8330  {
8331  }
8332  inline void MessageStream::iterator::advance(void)
8333  {
8334  _pStream = _pStream->_body->next(_current) ? _pStream : NULL;
8335  }
8336  inline MessageStream::operator MessageHandler(void)
8337  {
8338  return MessageHandler((void(*)(const Message&, void*))MessageStreamImpl::_messageHandler, &_body.get());
8339  }
8340  inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
8341  {
8342  MessageStream result;
8343  if (handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
8344  {
8345  result._body = (MessageStreamImpl*)(handler_._userData);
8346  }
8347  return result;
8348  }
8349 
8350  inline void MessageStream::setSOWOnly(const std::string& commandId_,
8351  const std::string& queryId_)
8352  {
8353  _body->setSOWOnly(commandId_, queryId_);
8354  }
8355  inline void MessageStream::setSubscription(const std::string& subId_,
8356  const std::string& commandId_,
8357  const std::string& queryId_)
8358  {
8359  _body->setSubscription(subId_, commandId_, queryId_);
8360  }
8361  inline void MessageStream::setStatsOnly(const std::string& commandId_,
8362  const std::string& queryId_)
8363  {
8364  _body->setStatsOnly(commandId_, queryId_);
8365  }
8366  inline void MessageStream::setAcksOnly(const std::string& commandId_,
8367  unsigned acks_)
8368  {
8369  _body->setAcksOnly(commandId_, acks_);
8370  }
8371  inline MessageStream MessageStream::timeout(unsigned timeout_)
8372  {
8373  _body->timeout(timeout_);
8374  return *this;
8375  }
8377  {
8378  _body->conflate();
8379  return *this;
8380  }
8381  inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
8382  {
8383  _body->maxDepth(maxDepth_);
8384  return *this;
8385  }
8386  inline unsigned MessageStream::getMaxDepth(void) const
8387  {
8388  return _body->getMaxDepth();
8389  }
8390  inline unsigned MessageStream::getDepth(void) const
8391  {
8392  return _body->getDepth();
8393  }
8394 
8395  inline MessageStream ClientImpl::getEmptyMessageStream(void)
8396  {
8397  return *(_pEmptyMessageStream.get());
8398  }
8399 
8401  {
8402  // If the command is sow and has a sub_id, OR
8403  // if the command has a replace option, return the existing
8404  // messagestream, don't create a new one.
8405  ClientImpl& body = _body.get();
8406  Message& message = command_.getMessage();
8407  Field subId = message.getSubscriptionId();
8408  unsigned ackTypes = message.getAckTypeEnum();
8409  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace", 7)) || message.getCommandEnum() == Message::Command::SOW);
8410  if (useExistingHandler)
8411  {
8412  // Try to find the existing message handler.
8413  if (!subId.empty())
8414  {
8415  MessageHandler existingHandler;
8416  if (body._routes.getRoute(subId, existingHandler))
8417  {
8418  // we found an existing handler. It might not be a message stream, but that's okay.
8419  body.executeAsync(command_, existingHandler, false);
8420  return MessageStream::fromExistingHandler(existingHandler);
8421  }
8422  }
8423  // fall through; we'll a new handler altogether.
8424  }
8425  // Make sure something will be returned to the stream or use the empty one
8426  // Check that: it's a command that doesn't normally return data, and there
8427  // are no acks requested for the cmd id
8428  Message::Command::Type command = message.getCommandEnum();
8429  if ((command & Message::Command::NoDataCommands)
8430  && (ackTypes == Message::AckType::Persisted
8431  || ackTypes == Message::AckType::None))
8432  {
8433  executeAsync(command_, MessageHandler());
8434  if (!body._pEmptyMessageStream)
8435  {
8436  body._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
8437  body._pEmptyMessageStream.get()->_body->close();
8438  }
8439  return body.getEmptyMessageStream();
8440  }
8441  MessageStream stream(*this);
8442  if (body.getDefaultMaxDepth())
8443  {
8444  stream.maxDepth(body.getDefaultMaxDepth());
8445  }
8446  MessageHandler handler = stream.operator MessageHandler();
8447  std::string commandID = body.executeAsync(command_, handler, false);
8448  if (command_.hasStatsAck())
8449  {
8450  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
8451  }
8452  else if (command_.isSow())
8453  {
8454  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
8455  }
8456  else if (command_.isSubscribe())
8457  {
8458  stream.setSubscription(commandID,
8459  command_.getMessage().getCommandId(),
8460  command_.getMessage().getQueryId());
8461  }
8462  else
8463  {
8464  // Persisted acks for writes don't come back with command id
8465  if (command == Message::Command::Publish ||
8466  command == Message::Command::DeltaPublish ||
8467  command == Message::Command::SOWDelete)
8468  {
8469  stream.setAcksOnly(commandID,
8470  ackTypes & (unsigned)~Message::AckType::Persisted);
8471  }
8472  else
8473  {
8474  stream.setAcksOnly(commandID, ackTypes);
8475  }
8476  }
8477  return stream;
8478  }
8479 
8480 // This is here because it uses api from Client.
8481  inline void Message::ack(const char* options_) const
8482  {
8483  ClientImpl* pClient = _body.get().clientImpl();
8484  Message::Field bookmark = getBookmark();
8485  if (pClient && bookmark.len() &&
8486  !pClient->getAutoAck())
8487  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
8488  {
8489  pClient->ack(getTopic(), bookmark, options_);
8490  }
8491  }
8492 }// end namespace AMPS
8493 #endif
Command & setBookmark(const std::string &bookmark_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:615
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:180
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1422
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:4831
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1399
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6486
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6460
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:522
std::string getAckType() const
Definition: ampsplusplus.hpp:783
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:5043
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1395
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:7816
void startTimer()
Definition: ampsplusplus.hpp:6449
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5990
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:927
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:8376
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1368
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5071
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1250
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1363
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:517
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:1174
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:6740
Command & setAckType(unsigned ackType_)
Definition: ampsplusplus.hpp:761
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:429
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:7102
const amps_uint64_t getNameHashValue() const
Returns the numeric name hash of this client as generated by the server and returned when the client ...
Definition: ampsplusplus.hpp:4892
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4948
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:5783
Command & setOptions(const std::string &options_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:633
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1361
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:899
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:405
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:7111
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6976
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5734
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
AMPSDLL AMPS_SOCKET amps_client_get_socket(amps_handle client)
Returns the socket from the underlying transport in client, or NULL if no transport is associated wit...
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:283
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:5167
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5432
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4963
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1252
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:803
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5183
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:548
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self&#39;s set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6831
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:786
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:7060
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4934
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6513
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
Message & setAckTypeEnum(unsigned ackType_)
Encode self&#39;s "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1129
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:1126
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:7853
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:7122
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5319
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4735
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1255
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6759
Command & setAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:739
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6769
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:5019
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1370
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5706
Field getFilter() const
Retrieves the value of the Filter header of the Message as a new Field.
Definition: Message.hpp:1252
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:982
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:7084
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8386
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1399
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4994
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1229
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6184
Success.
Definition: amps.h:206
Message & setCommandEnum(Command::Type command_)
Set self&#39;s "command" field from one of the values in Command.
Definition: Message.hpp:1234
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1262
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:873
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5409
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:7898
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:5830
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:196
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:563
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:5032
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4728
amps_result
Return values from amps_xxx functions.
Definition: amps.h:201
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:5220
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1076
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1141
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:8400
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:627
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1395
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:5175
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:7136
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:788
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4923
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1264
Command & setData(const std::string &data_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:666
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:5480
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:1059
Command & setFilter(const std::string &filter_)
Definition: ampsplusplus.hpp:587
const std::string & getNameHash() const
Returns the name hash string of this client as generated by the server and returned when the client l...
Definition: ampsplusplus.hpp:4884
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6138
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4814
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6715
Command & setSubId(const std::string &subId_)
Definition: ampsplusplus.hpp:599
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6750
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1248
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self&#39;s set of listeners.
Definition: ampsplusplus.hpp:6823
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:532
Command & setTimeout(unsigned timeout_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:688
void clearConnectionStateListeners()
Clear all listeners from self&#39;s set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6838
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1368
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5268
Command & setCommandId(const std::string &cmdId_)
Definition: ampsplusplus.hpp:575
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1399
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1395
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:5131
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5541
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:524
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1258
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1248
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6626
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1363
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:890
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1260
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1373
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6548
Command & setOptions(const char *options_, size_t optionsLen_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:641
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1363
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1251
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:127
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:1080
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:885
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5646
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6779
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:228
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5498
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1249
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1370
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:867
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:4903
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1141
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:7826
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:6988
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:880
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1373
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:921
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6028
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1013
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7835
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1141
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1140
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:5123
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:1097
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:7049
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1362
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1397
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:1172
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6424
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1371
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:5115
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:149
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1251
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setOrderBy(const std::string &orderBy_)
Definition: ampsplusplus.hpp:593
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6619
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:540
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:5143
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6682
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7846
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:7039
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5386
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6115
Command & setExpiration(unsigned expiration_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:719
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4690
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:268
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6362
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:1089
Command & setData(const char *data_, size_t dataLen_)
Sets the data for this command.
Definition: ampsplusplus.hpp:674
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1076
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:5365
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6868
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6321
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:5516
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:5795
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:7031
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5664
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1250
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:1050
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6285
Command & setBatchSize(unsigned batchSize_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:703
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6606
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:1154
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:5198
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:5093
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:7890
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1076
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:5102
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1290
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6964
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:7902
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:5085
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1034
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:7909
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:233
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6217
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6676
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5599
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4682
Command & setSequence(const std::string &seq_)
Definition: ampsplusplus.hpp:647
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6801
Command & setQueryId(const std::string &queryId_)
Definition: ampsplusplus.hpp:605
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1248
Command & setSequence(const amps_uint64_t seq_)
Definition: ampsplusplus.hpp:653
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6812
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1363
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:653
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:1118
Command & setTopic(const std::string &topic_)
Definition: ampsplusplus.hpp:581
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:8381
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1398
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:5001
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5868
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:8390
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1363
The operation has not succeeded, but ought to be retried.
Definition: amps.h:230
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:4910
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6652
Command & setTopN(unsigned topN_)
Definition: ampsplusplus.hpp:694
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4970
Command & addAckType(const std::string &ackType_)
Definition: ampsplusplus.hpp:725
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:1068
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6790
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1140
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1106
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:465
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:6593
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:7863
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:5212
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:6240
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:7070
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:569
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1362
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:7021
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:5243
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:4869
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:836
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:1110
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1140
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1202
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:989
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5887
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:5292
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6689
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:310
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:5458
The client and server are disconnected.
Definition: amps.h:234
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:6569
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5628
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:8371
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5926
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:4876
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:438
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:6076
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1139
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5567
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5755
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:552
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6902
Command & setCorrelationId(const std::string &correlationId_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:626
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:6401
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4746
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:7093
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:7012
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5958