AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.3
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps.h"
28 #include "ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <set>
39 #include <deque>
40 #include <vector>
41 #include <assert.h>
42 #ifndef _WIN32
43 #include <inttypes.h>
44 #endif
45 #if defined(sun)
46 #include <sys/atomic.h>
47 #endif
48 #include "BookmarkStore.hpp"
49 #include "MessageRouter.hpp"
50 #include "util.hpp"
51 #include "ampscrc.hpp"
52 
53 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
54 #define AMPS_TESTING_SLOW_MESSAGE_STREAM
55 #endif
56 
61 
62 
69 
80 
81 // For StoreBuffer implementations
82 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
83 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
84 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
85 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
86 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
87 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
88 #define AMPS_DEFAULT_TOP_N -1
89 #define AMPS_DEFAULT_BATCH_SIZE 10
90 #define AMPS_NUMBER_BUFFER_LEN 20
91 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
92 
93 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
94 #define AMPS_X64 1
95 #endif
96 
// Per-thread Message pointer; every thread starts with it set to null.
// It is declared `static` at namespace scope in a header, so each translation
// unit that includes this file gets its own independent variable.
// NOTE(review): presumably scratch storage for publish-store code elsewhere
// in this header - confirm against its users.
97 #ifdef _WIN32
98 static __declspec ( thread ) AMPS::Message* publishStoreMessage = 0;
99 #else
100 static __thread AMPS::Message* publishStoreMessage = 0;
101 #endif
102 
103 namespace AMPS
104 {
105 
106 typedef std::map<std::string, std::string> ConnectionInfo;
107 
108 class PerThreadMessageTracker {
109 std::vector<AMPS::Message*> _messages;
110 public:
111  PerThreadMessageTracker() {}
112  ~PerThreadMessageTracker()
113  {
114  for (size_t i=0; i<_messages.size(); ++i)
115  {
116  delete _messages[i];
117  }
118  }
119  void addMessage(AMPS::Message* message)
120  {
121  _messages.push_back(message);
122  }
123  static void addMessageToCleanupList(AMPS::Message* message)
124  {
125  static AMPS::Mutex _lock;
126  AMPS::Lock<Mutex> l(_lock);
127  _addMessageToCleanupList(message);
128  }
129  static void _addMessageToCleanupList(AMPS::Message* message)
130  {
131  static PerThreadMessageTracker tracker;
132  tracker.addMessage(message);
133  }
134 };
135 
/// Formats x_ with its stream insertion operator and returns the text.
/// @param x_ Any value that can be written to a std::ostream with <<.
/// @return The characters produced by streaming x_.
template<class Type>
inline std::string asString(Type x_)
{
  std::ostringstream formatted;
  formatted << x_;
  const std::string text = formatted.str();
  return text;
}
143 
144 inline
145 size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
146 {
147  size_t pos = AMPS_NUMBER_BUFFER_LEN;
148  for(int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
149  {
150  if (seqNo_ > 0)
151  {
152  buf_[--pos] = (char)(seqNo_ % 10 + '0');
153  seqNo_ /= 10;
154  }
155  }
156  return pos;
157 }
158 
159 #ifdef _WIN32
160 inline
161 size_t convertToCharArray(char* buf_, unsigned long seqNo_)
162 {
163  size_t pos = AMPS_NUMBER_BUFFER_LEN;
164  for(int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
165  {
166  if (seqNo_ > 0)
167  {
168  buf_[--pos] = (char)(seqNo_ % 10 + '0');
169  seqNo_ /= 10;
170  }
171  }
172  return pos;
173 }
174 #endif
175 
/// Named accessors for fixed reason strings, so callers can compare against
/// a function result rather than repeating a string literal.
class Reason
{
public:
  static const char* duplicate()
  {
    return "duplicate";
  }
  static const char* badFilter()
  {
    return "bad filter";
  }
  static const char* badRegexTopic()
  {
    return "bad regex topic";
  }
  static const char* subscriptionAlreadyExists()
  {
    return "subscription already exists";
  }
  static const char* nameInUse()
  {
    return "name in use";
  }
  static const char* authFailure()
  {
    return "auth failure";
  }
  static const char* notEntitled()
  {
    return "not entitled";
  }
  static const char* authDisabled()
  {
    return "authentication disabled";
  }
  static const char* subidInUse()
  {
    return "subid in use";
  }
  static const char* noTopic()
  {
    return "no topic";
  }
};
193 
203 {
204 public:
205  virtual ~ExceptionListener() {;}
206  virtual void exceptionThrown(const std::exception&) const {;}
207 };
208 
210 
211 
// Runs statement `x`. If it throws a std::exception, the exception is passed
// to `_exceptionListener->exceptionThrown()`, so a pointer named
// `_exceptionListener` must be in scope where the macro is expanded.
// Anything the listener itself throws is swallowed. Exceptions that do not
// derive from std::exception are not caught here.
212 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
213  try\
214  {\
215  x;\
216  }\
217  catch (std::exception& ex_)\
218  {\
219  try\
220  {\
221  _exceptionListener->exceptionThrown(ex_);\
222  }\
223  catch(...)\
224  {\
225  ;\
226  }\
227  }
228  /*
229  * Note : we don't attempt to trap non std::exception exceptions
230  * here because doing so interferes with pthread_exit on some OSes.
231  catch (...)\
232  {\
233  try\
234  {\
235  _exceptionListener->exceptionThrown(AMPS::AMPSException(\
236  "An unhandled exception of unknown type was thrown by "\
237  "the registered handler.", AMPS_E_USAGE));\
238  }\
239  catch(...)\
240  {\
241  ;\
242  }\
243  }
244  */
// AMPS_CALL_EXCEPTION_WRAPPER_2(me, x)
//   Runs statement `x` while `me->_connected` is true. Each time `x` throws
//   MessageStreamFullException, `me->checkAndSendHeartbeat(false)` is called
//   and `x` is tried again; the loop ends as soon as `x` completes.
//   Any std::exception that escapes the loop is passed to
//   `me->_exceptionListener->exceptionThrown()`, and whatever the listener
//   throws is swallowed.
//
// The inner catch parameter is left unnamed because it is never used; that
// lets one definition serve every platform instead of a _WIN32 copy and a
// non-_WIN32 copy that differed only by that unused name.
#define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
  try\
  {\
    while(me->_connected)\
    {\
      try\
      {\
        x;\
        break;\
      }\
      catch(MessageStreamFullException&)\
      {\
        me->checkAndSendHeartbeat(false);\
      }\
    }\
  }\
  catch (std::exception& ex_)\
  {\
    try\
    {\
      me->_exceptionListener->exceptionThrown(ex_);\
    }\
    catch(...)\
    {\
      ;\
    }\
  }
/*
 * Note : we don't attempt to trap non std::exception exceptions
 * here because doing so interferes with pthread_exit on some OSes.
 */

// AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)
//   The retry loop of AMPS_CALL_EXCEPTION_WRAPPER_2 without the outer
//   std::exception handler: only MessageStreamFullException is caught, and
//   every other exception propagates to the caller.
#define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
  while(me->_connected)\
  {\
    try\
    {\
      x;\
      break;\
    }\
    catch(MessageStreamFullException&)\
    {\
      me->checkAndSendHeartbeat(false);\
    }\
  }
362 
// Reports `ex` to `_exceptionListener->exceptionThrown()` and swallows
// anything the listener throws. A pointer named `_exceptionListener` must be
// in scope where the macro is expanded.
363 #define AMPS_UNHANDLED_EXCEPTION(ex) \
364  try\
365  {\
366  _exceptionListener->exceptionThrown(ex);\
367  }\
368  catch(...)\
369  {;}
370 
// Reports `ex` to `me->_exceptionListener->exceptionThrown()` and swallows
// anything the listener throws. `me` is a pointer-like expression whose
// target has an `_exceptionListener` member.
371 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
372  try\
373  {\
374  me->_exceptionListener->exceptionThrown(ex);\
375  }\
376  catch(...)\
377  {;}
378 
379 
380 class Client;
381 
406 
407 class Command
408 {
409  Message _message;
410  unsigned _timeout;
411  unsigned _batchSize;
412  unsigned _flags;
413  static const unsigned Subscribe = 1;
414  static const unsigned SOW = 2;
415  static const unsigned NeedsSequenceNumber = 4;
416  static const unsigned ProcessedAck = 8;
417  static const unsigned StatsAck = 16;
418  void init(Message::Command::Type command_)
419  {
420  _timeout = 0;
421  _batchSize = 0;
422  _flags = 0;
423  _message.reset();
424  _message.setCommandEnum(command_);
425  _setIds();
426  }
427  void init(const std::string& command_)
428  {
429  _timeout = 0;
430  _batchSize = 0;
431  _flags = 0;
432  _message.reset();
433  _message.setCommand(command_);
434  _setIds();
435  }
436  void _setIds(void)
437  {
438  Message::Command::Type command = _message.getCommandEnum();
439  if (!(command & Message::Command::NoDataCommands))
440  {
441  _message.newCommandId();
442  if (command == Message::Command::Subscribe ||
443  command == Message::Command::SOWAndSubscribe ||
444  command == Message::Command::DeltaSubscribe ||
445  command == Message::Command::SOWAndDeltaSubscribe)
446  {
447  _message.setSubscriptionId(_message.getCommandId());
448  _flags |= Subscribe;
449  }
450  if (command == Message::Command::SOW
451  || command == Message::Command::SOWAndSubscribe
452  || command == Message::Command::SOWAndDeltaSubscribe)
453  {
454  _message.setQueryID(_message.getCommandId());
455  if (_batchSize == 0)
456  {
457  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
458  }
459  if (command == Message::Command::SOW)
460  {
461  _flags |= SOW;
462  }
463  }
464  _flags |= ProcessedAck;
465  }
466  else if (command == Message::Command::SOWDelete)
467  {
468  _message.newCommandId();
469  _flags |= ProcessedAck;
470  _flags |= NeedsSequenceNumber;
471  }
472  else if (command == Message::Command::Publish
473  || command == Message::Command::DeltaPublish)
474  {
475  _flags |= NeedsSequenceNumber;
476  }
477  else if (command == Message::Command::StopTimer)
478  {
479  _message.newCommandId();
480  }
481  }
482 public:
486  Command(const std::string& command_)
487  {
488  init(command_);
489  }
493  Command(Message::Command::Type command_)
494  {
495  init(command_);
496  }
497 
501  Command& reset(const std::string& command_)
502  {
503  init(command_);
504  return *this;
505  }
509  Command& reset(Message::Command::Type command_)
510  {
511  init(command_);
512  return *this;
513  }
521  Command& setSowKey(const std::string& sowKey_) { _message.setSowKey(sowKey_); return *this; }
534  Command& setSowKeys(const std::string& sowKeys_) { _message.setSowKeys(sowKeys_); return *this; }
536  Command& setCommandId(const std::string& v_) { _message.setCommandId(v_); return *this; }
538  Command& setTopic(const std::string& v_) { _message.setTopic(v_); return *this; }
540  Command& setFilter(const std::string& v_) { _message.setFilter(v_); return *this; }
542  Command& setOrderBy(const std::string& v_) { _message.setOrderBy(v_); return *this; }
544  Command& setSubId(const std::string& v_) { _message.setSubscriptionId(v_); return *this; }
546  Command& setQueryId(const std::string& v_) { _message.setQueryId(v_); return *this; }
552  Command& setBookmark(const std::string& v_) { _message.setBookmark(v_); return *this; }
559  Command& setCorrelationId(const std::string& v_) { _message.setCorrelationId(v_); return *this; }
562  Command& setOptions(const std::string& v_) { _message.setOptions(v_); return *this; }
564  Command& setSequence(const std::string& v_) { _message.setSequence(v_); return *this; }
566  Command& setSequence(const amps_uint64_t v_)
567  {
568  std::ostringstream os;
569  os << v_;
570  _message.setSequence(os.str());
571  return *this;
572  }
573  amps_uint64_t getSequence() const { return amps_message_get_field_uint64(_message.getMessage(),AMPS_Sequence); }
576  Command& setData(const std::string& v_) { _message.setData(v_); return *this; }
580  Command& setData(const char* v_, size_t length_) { _message.setData(v_, length_); return *this; }
590  Command& setTimeout(unsigned v_) { _timeout = v_; return *this; }
592  Command& setTopN(unsigned v_) { _message.setTopNRecordsReturned(v_); return *this; }
597  Command& setBatchSize(unsigned v_) { _message.setBatchSize(v_); _batchSize = v_; return *this; }
608  Command& setExpiration(unsigned v_) { _message.setExpiration(v_); return *this; }
610  Command& addAckType(const std::string& v_)
611  {
612  _message.setAckType(_message.getAckType() + "," + v_);
613  if (v_ == "processed") _flags |= ProcessedAck;
614  else if (v_ == "stats") _flags |= StatsAck;
615  return *this;
616  }
618  Command& setAckType(const std::string& v_)
619  {
620  _message.setAckType(v_);
621  if (v_.find("processed") != std::string::npos) _flags |= ProcessedAck;
622  else _flags &= ~ProcessedAck;
623  if (v_.find("stats") != std::string::npos) _flags |= StatsAck;
624  else _flags &= ~StatsAck;
625  return *this;
626  }
628  Command& setAckType(unsigned v_)
629  {
630  _message.setAckTypeEnum(v_);
631  if (v_ & Message::AckType::Processed) _flags |= ProcessedAck;
632  else _flags &= ~ProcessedAck;
633  if (v_ & Message::AckType::Stats) _flags |= StatsAck;
634  else _flags &= ~StatsAck;
635  return *this;
636  }
638  std::string getAckType() const
639  {
640  return (std::string)(_message.getAckType());
641  }
643  unsigned getAckTypeEnum() const
644  {
645  return _message.getAckTypeEnum();
646  }
647 
648  Message& getMessage(void) { return _message; }
649  unsigned getTimeout(void) const { return _timeout; }
650  unsigned getBatchSize(void) const { return _batchSize; }
651  bool isSubscribe(void) const
652  {
653  return _flags & Subscribe;
654  }
655  bool isSow(void) const { return (_flags & SOW) != 0; }
656  bool hasProcessedAck(void) const { return (_flags & ProcessedAck) != 0; }
657  bool hasStatsAck(void) const { return (_flags & StatsAck) != 0; }
658  bool needsSequenceNumber(void) const { return (_flags & NeedsSequenceNumber) != 0; }
659 };
660 
663 typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
664 
665 class Message;
667 
671 {
672 public:
673  virtual ~Authenticator() {;}
674 
680  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;
688  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;
695  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
696 };
697 
702 {
703 public:
704  virtual ~DefaultAuthenticator() {;}
707  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
708  {
709  return password_;
710  }
711 
714  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
715  {
716  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
717  }
718 
719  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
720 
725  {
726  static DefaultAuthenticator d;
727  return d;
728  }
729 };
730 
734 {
735 public:
736 
740  virtual void execute(Message& message_) = 0;
741 
742  virtual ~StoreReplayer() {;}
743 };
744 
745 class Store;
746 
755 typedef bool (*PublishStoreResizeHandler)(Store store_,
756  size_t size_,
757  void* userData_);
758 
761 class StoreImpl : public RefBody
762 {
763 public:
764  StoreImpl() : _resizeHandler(NULL), _resizeHandlerData(NULL) {;}
765 
770  virtual amps_uint64_t store(const Message& message_) = 0;
771 
776  virtual void discardUpTo(amps_uint64_t index_) = 0;
777 
782  virtual void replay(StoreReplayer& replayer_) = 0;
783 
791  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
792 
797  virtual size_t unpersistedCount() const = 0;
798 
799  virtual ~StoreImpl() {;}
800 
809  virtual void flush(long timeout_) = 0;
810 
813  static inline size_t getUnsetPosition() { return AMPS_UNSET_INDEX; }
814 
817  static inline amps_uint64_t getUnsetSequence() { return AMPS_UNSET_SEQUENCE; }
818 
822  virtual amps_uint64_t getLowestUnpersisted() const = 0;
823 
827  virtual amps_uint64_t getLastPersisted() = 0;
828 
838  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
839  void* userData_)
840  {
841  _resizeHandler = handler_;
842  _resizeHandlerData = userData_;
843  }
844 
845  inline virtual PublishStoreResizeHandler getResizeHandler() const
846  {
847  return _resizeHandler;
848  }
849 
850  bool callResizeHandler(size_t newSize_);
851 
852 private:
853  PublishStoreResizeHandler _resizeHandler;
854  void* _resizeHandlerData;
855 };
856 
859 class Store
860 {
861  RefHandle<StoreImpl> _body;
862 public:
863  Store() {;}
864  Store(StoreImpl* body_) : _body(body_) {;}
865  Store(const Store& rhs) : _body(rhs._body) {;}
866  Store& operator=(const Store& rhs)
867  {
868  _body = rhs._body;
869  return *this;
870  }
871 
875  amps_uint64_t store(const Message& message_)
876  {
877  return _body.get().store(message_);
878  }
879 
884  void discardUpTo(amps_uint64_t index_)
885  {
886  _body.get().discardUpTo(index_);
887  }
888 
893  void replay(StoreReplayer& replayer_)
894  {
895  _body.get().replay(replayer_);
896  }
897 
905  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
906  {
907  return _body.get().replaySingle(replayer_, index_);
908  }
909 
914  size_t unpersistedCount() const
915  {
916  return _body.get().unpersistedCount();
917  }
918 
922  bool isValid() const
923  {
924  return _body.isValid();
925  }
926 
935  void flush(long timeout_ = 0)
936  {
937  return _body.get().flush(timeout_);
938  }
939 
943  amps_uint64_t getLowestUnpersisted()
944  {
945  return _body.get().getLowestUnpersisted();
946  }
947 
951  amps_uint64_t getLastPersisted()
952  {
953  return _body.get().getLastPersisted();
954  }
955 
965  void setResizeHandler(PublishStoreResizeHandler handler_,
966  void* userData_)
967  {
968  _body.get().setResizeHandler(handler_, userData_);
969  }
970 
971  PublishStoreResizeHandler getResizeHandler()
972  {
973  return _body.get().getResizeHandler();
974  }
975 
979  StoreImpl* get()
980  {
981  if (_body.isValid())
982  return &_body.get();
983  else
984  return NULL;
985  }
986 
987 };
988 
994 {
995 public:
996  virtual ~FailedWriteHandler() {;}
1003  virtual void failedWrite(const Message& message_,
1004  const char* reason_, size_t reasonLength_) = 0;
1005 };
1006 
1007 
1008 inline bool StoreImpl::callResizeHandler(size_t newSize_)
1009 {
1010  if(_resizeHandler)
1011  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1012  return true;
1013 }
1014 
1021 inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1022  void* data_)
1023 {
1024  long* timeoutp = (long*)data_;
1025  size_t count = store_.unpersistedCount();
1026  if (count == 0) return false;
1027  try
1028  {
1029  store_.flush(*timeoutp);
1030  }
1031 #ifdef _WIN32
1032  catch (const TimedOutException&)
1033 #else
1034  catch (const TimedOutException& e)
1035 #endif
1036  {
1037  return true;
1038  }
1039  return (count == store_.unpersistedCount());
1040 }
1041 
1046 {
1047 public:
1048  virtual ~SubscriptionManager() {;}
1056  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1057  unsigned requestedAckTypes_) = 0;
1061  virtual void unsubscribe(const Message::Field& subId_) = 0;
1064  virtual void clear() = 0;
1068  virtual void resubscribe(Client& client_) = 0;
1069 };
1070 
1074 
1076 {
1077 public:
1079  typedef enum { Disconnected = 0,
1080  Shutdown = 1,
1081  Connected = 2,
1082  LoggedOn = 4,
1083  PublishReplayed = 8,
1084  HeartbeatInitiated = 16,
1085  Resubscribed = 32,
1086  UNKNOWN = 16384
1087  } State;
1088 
1098  virtual void connectionStateChanged(State newState_) = 0;
1099  virtual ~ConnectionStateListener() {;};
1100 };
1101 
1102 
1103 class MessageStreamImpl;
1104 class MessageStream;
1105 
1106 typedef void(*DeferredExecutionFunc)(void*);
1107 
1108 class ClientImpl : public RefBody // -V553
1109 {
1110  friend class Client;
1111 protected:
1112  amps_handle _client;
1113  DisconnectHandler _disconnectHandler;
1114  enum GlobalCommandTypeHandlers : size_t
1115  {
1116  Publish = 0,
1117  SOW = 1,
1118  GroupBegin = 2,
1119  GroupEnd = 3,
1120  Heartbeat = 4,
1121  OOF = 5,
1122  Ack = 6,
1123  LastChance = 7,
1124  DuplicateMessage = 8,
1125  COUNT = 9
1126  };
1127  std::vector<MessageHandler> _globalCommandTypeHandlers;
1128  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1129  MessageRouter _routes;
1130  MessageRouter::RouteCache _routeCache;
1131  mutable Mutex _lock;
1132  std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1133  BookmarkStore _bookmarkStore;
1134  Store _publishStore;
1135  bool _isRetryOnDisconnect;
1136  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1137  volatile amps_uint64_t _lastSentHaSequenceNumber;
1138  ATOMIC_TYPE_8 _badTimeToHAPublish;
1139  ATOMIC_TYPE_8 _badTimeToHASubscribe;
1140  VersionInfo _serverVersion;
1141  Timer _heartbeatTimer;
1142  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1143 
1144  // queue data
1145  int _queueAckTimeout;
1146  bool _isAutoAckEnabled;
1147  unsigned _ackBatchSize;
1148  unsigned _queuedAckCount;
1149  unsigned _defaultMaxDepth;
1150  struct QueueBookmarks
1151  {
1152  QueueBookmarks(const std::string& topic_)
1153  :_topic(topic_)
1154  ,_oldestTime(0)
1155  ,_bookmarkCount(0)
1156  {;}
1157  std::string _topic;
1158  std::string _data;
1159  amps_uint64_t _oldestTime;
1160  unsigned _bookmarkCount;
1161  };
1162  typedef amps_uint64_t topic_hash;
1163  typedef std::map<topic_hash,QueueBookmarks> TopicHashMap;
1164  TopicHashMap _topicHashMap;
1165 
1166  class ClientStoreReplayer : public StoreReplayer
1167  {
1168  ClientImpl* _client;
1169  public:
1170  unsigned _version;
1171  amps_result _res;
1172 
1173  ClientStoreReplayer()
1174  : _client(NULL) , _version(0), _res(AMPS_E_OK)
1175  {}
1176 
1177  ClientStoreReplayer(ClientImpl* client_)
1178  : _client(client_) , _version(0), _res(AMPS_E_OK)
1179  {}
1180 
1181  void setClient(ClientImpl* client_) { _client = client_; }
1182 
1183  void execute(Message& message_)
1184  {
1185  if (!_client) throw CommandException("Can't replay without a client.");
1186  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1187  AMPS_Sequence);
1188  if (index > _client->_lastSentHaSequenceNumber)
1189  _client->_lastSentHaSequenceNumber = index;
1190 
1191  _res = AMPS_E_OK;
1192  // Don't replay a queue cancel message after a reconnect.
1193  // Currently, the only messages that will have anything in options
1194  // are cancel messages.
1195  if (!message_.getCommand().empty() &&
1196  (!_client->_badTimeToHAPublish ||
1197  message_.getOptions().len() < 6))
1198  {
1199  _res= amps_client_send_with_version(_client->_client,
1200  message_.getMessage(),
1201  &_version);
1202  if (_res != AMPS_E_OK)
1203  {
1204  throw DisconnectedException("AMPS Server disconnected during replay");
1205  }
1206  }
1207  }
1208 
1209  };
1210  ClientStoreReplayer _replayer;
1211 
1212  class FailedWriteStoreReplayer : public StoreReplayer
1213  {
1214  ClientImpl* _parent;
1215  const char* _reason;
1216  size_t _reasonLength;
1217  size_t _replayCount;
1218  public:
1219  FailedWriteStoreReplayer(ClientImpl* parent,const char* reason_, size_t reasonLength_)
1220  : _parent(parent),
1221  _reason(reason_),
1222  _reasonLength(reasonLength_),
1223  _replayCount(0)
1224  {;}
1225  void execute(Message& message_)
1226  {
1227  if (_parent->_failedWriteHandler)
1228  {
1229  ++_replayCount;
1230  _parent->_failedWriteHandler->failedWrite(message_,
1231  _reason, _reasonLength);
1232  }
1233  }
1234  size_t replayCount(void) const { return _replayCount; }
1235  };
1236 
1237  struct AckResponseImpl : public RefBody
1238  {
1239  std::string username, password, reason, status, bookmark, options;
1240  amps_uint64_t sequenceNo;
1241  VersionInfo serverVersion;
1242  volatile bool responded, abandoned;
1243  unsigned connectionVersion;
1244  AckResponseImpl() :
1245  RefBody(),
1246  sequenceNo((amps_uint64_t)0),
1247  serverVersion(),
1248  responded(false),
1249  abandoned(false),
1250  connectionVersion(0)
1251  {
1252  }
1253  };
1254 
1255  class AckResponse
1256  {
1257  RefHandle<AckResponseImpl> _body;
1258  public:
1259  AckResponse() : _body(NULL) {;}
1260  AckResponse(const AckResponse& rhs) : _body(rhs._body) {;}
1261  static AckResponse create()
1262  {
1263  AckResponse r;
1264  r._body = new AckResponseImpl();
1265  return r;
1266  }
1267 
1268  const std::string& username()
1269  {
1270  return _body.get().username;
1271  }
1272  void setUsername(const char* data_, size_t len_)
1273  {
1274  if (data_) _body.get().username.assign(data_, len_);
1275  else _body.get().username.clear();
1276  }
1277  const std::string& password()
1278  {
1279  return _body.get().password;
1280  }
1281  void setPassword(const char* data_, size_t len_)
1282  {
1283  if (data_) _body.get().password.assign(data_, len_);
1284  else _body.get().password.clear();
1285  }
1286  const std::string& reason()
1287  {
1288  return _body.get().reason;
1289  }
1290  void setReason(const char* data_, size_t len_)
1291  {
1292  if (data_) _body.get().reason.assign(data_, len_);
1293  else _body.get().reason.clear();
1294  }
1295  const std::string& status()
1296  {
1297  return _body.get().status;
1298  }
1299  void setStatus(const char* data_, size_t len_)
1300  {
1301  if (data_) _body.get().status.assign(data_, len_);
1302  else _body.get().status.clear();
1303  }
1304  const std::string& bookmark()
1305  {
1306  return _body.get().bookmark;
1307  }
1308  void setBookmark(const char* data_, size_t len_)
1309  {
1310  if (data_) _body.get().bookmark.assign(data_, len_);
1311  else _body.get().bookmark.clear();
1312  }
1313  amps_uint64_t sequenceNo() const
1314  {
1315  return _body.get().sequenceNo;
1316  }
1317  void setSequenceNo(const char* data_, size_t len_)
1318  {
1319  amps_uint64_t result = (amps_uint64_t)0;
1320  if (data_)
1321  {
1322  for(size_t i=0; i<len_; ++i)
1323  {
1324  result *= (amps_uint64_t)10;
1325  result += (amps_uint64_t)(data_[i] - '0');
1326  }
1327  }
1328  _body.get().sequenceNo = result;
1329  }
1330  VersionInfo serverVersion() const
1331  {
1332  return _body.get().serverVersion;
1333  }
1334  void setServerVersion(const char* data_, size_t len_)
1335  {
1336  if (data_)
1337  _body.get().serverVersion.setVersion(std::string(data_, len_));
1338  }
1339  bool responded()
1340  {
1341  return _body.get().responded;
1342  }
1343  void setResponded(bool responded_)
1344  {
1345  _body.get().responded = responded_;
1346  }
1347  bool abandoned()
1348  {
1349  return _body.get().abandoned;
1350  }
1351  void setAbandoned(bool abandoned_)
1352  {
1353  if (_body.isValid())
1354  _body.get().abandoned = abandoned_;
1355  }
1356 
1357  void setConnectionVersion(unsigned connectionVersion)
1358  {
1359  _body.get().connectionVersion = connectionVersion;
1360  }
1361 
1362  unsigned getConnectionVersion()
1363  {
1364  return _body.get().connectionVersion;
1365  }
1366  void setOptions(const char* data_, size_t len_)
1367  {
1368  if (data_) _body.get().options.assign(data_,len_);
1369  else _body.get().options.clear();
1370  }
1371 
1372  const std::string& options()
1373  {
1374  return _body.get().options;
1375  }
1376 
1377  AckResponse& operator=(const AckResponse& rhs)
1378  {
1379  _body = rhs._body;
1380  return *this;
1381  }
1382  };
1383 
1384 
1385  typedef std::map<std::string, AckResponse> AckMap;
1386  AckMap _ackMap;
1387  Mutex _ackMapLock;
1388  DefaultExceptionListener _defaultExceptionListener;
1389 protected:
1390 
1391  struct DeferredExecutionRequest
1392  {
1393  DeferredExecutionRequest(DeferredExecutionFunc func_,
1394  void* userData_)
1395  : _func(func_),
1396  _userData(userData_)
1397  {;}
1398 
1399  DeferredExecutionFunc _func;
1400  void* _userData;
1401  };
1402  const ExceptionListener* _exceptionListener;
1403  std::shared_ptr<const ExceptionListener> _pExceptionListener;
1404  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1405  bool _connected;
1406  std::string _username;
1407  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1408  ConnectionStateListeners _connectionStateListeners;
1409  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1410  Mutex _deferredExecutionLock;
1411  DeferredExecutionList _deferredExecutionList;
1412  unsigned _heartbeatInterval;
1413  unsigned _readTimeout;
1414 
1415  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1416  {
1417  // If we disconnectd before we got to notification, don't notify.
1418  // This should only be able to happen for Resubscribed, since the lock
1419  // is released to let the subscription manager run resubscribe so a
1420  // disconnect could be called before the change is broadcast.
1421  if (!_connected && newState_ > ConnectionStateListener::Connected)
1422  {
1423  return;
1424  }
1425  for(ConnectionStateListeners::iterator it= _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1426  {
1427  AMPS_CALL_EXCEPTION_WRAPPER(
1428  (*it)->connectionStateChanged(newState_));
1429  }
1430  }
1431  unsigned processedAck(Message& message);
1432  unsigned persistedAck(Message& meesage);
1433  void lastChance(Message& message);
1434  void checkAndSendHeartbeat(bool force=false);
1435  virtual ConnectionInfo getConnectionInfo() const;
1436  static amps_result
1437  ClientImplMessageHandler(amps_handle message, void* userData);
1438  static void
1439  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1440  static amps_result
1441  ClientImplDisconnectHandler(amps_handle client, void* userData);
1442 
// Removes the subscription with the given id: drops its route, tells the
// subscription manager (if one is set) to forget it, then sends an
// unsubscribe command built in _message. An empty id is ignored.
// The caller is expected to hold _lock; it is released only for the
// duration of the subscription manager call.
1443  void unsubscribeInternal(const std::string& id)
1444  {
1445  if (id.empty()) return;
1446  // remove the handler first to avoid any more message delivery
1447  Message::Field subId;
1448  subId.assign(id.data(), id.length());
1449  _routes.removeRoute(subId);
1450  // Lock is already acquired
1451  if (_subscriptionManager)
1452  {
1453  // Have to unlock before calling into sub manager to avoid deadlock
1454  Unlock<Mutex> unlock(_lock);
1455  _subscriptionManager->unsubscribe(subId);
1456  }
// Reuse the shared _message to build and send the unsubscribe command.
1457  _message.reset();
1458  _message.setCommandEnum(Message::Command::Unsubscribe);
1459  _message.newCommandId();
1460  _message.setSubscriptionId(id);
1461  _sendWithoutRetry(_message);
// NOTE(review): queues a no-op deferred call; presumably this wakes or
// flushes the deferred-execution machinery - confirm against deferredExecution().
1462  deferredExecution(&amps_noOpFn, NULL);
1463  }
1464 
// Convenience overload: forwards to the four-argument syncAckProcessing
// with the HA sequence number fixed at 0.
1465  AckResponse syncAckProcessing(long timeout_, Message& message_,
1466  bool isHASubscribe_)
1467  {
1468  return syncAckProcessing(timeout_, message_,
1469  (amps_uint64_t)0, isHASubscribe_);
1470  }
1471 
1472  AckResponse syncAckProcessing(long timeout_, Message& message_,
1473  amps_uint64_t haSeq = (amps_uint64_t)0,
1474  bool isHASubscribe_ = false)
1475  {
1476  // inv: we already have _lock locked up.
1477  AckResponse ack = AckResponse::create();
1478  if (1)
1479  {
1480  Lock<Mutex> guard(_ackMapLock);
1481  _ackMap[message_.getCommandId()] = ack;
1482  }
1483  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
1484  if (ack.getConnectionVersion() == 0)
1485  {
1486  // Send failed
1487  throw DisconnectedException("Connection closed while waiting for response.");
1488  }
1489  bool timedOut = false;
1490  AMPS_START_TIMER(timeout_)
1491  while(!timedOut && !ack.responded() && !ack.abandoned() && _connected)
1492  {
1493  if (timeout_)
1494  {
1495  timedOut = !_lock.wait(timeout_);
1496  // May have woken up early, check real time
1497  if (timedOut) { AMPS_RESET_TIMER(timedOut, timeout_); }
1498  }
1499  else
1500  {
1501  // Using a timeout version to ensure python can interrupt
1502  _lock.wait(1000);
1503  Unlock<Mutex> unlck(_lock);
1504  amps_invoke_waiting_function();
1505  }
1506  }
1507  if (ack.responded())
1508  {
1509  if (ack.status() != "failure")
1510  {
1511  if (message_.getCommand() == "logon")
1512  {
1513  amps_uint64_t ackSequence = ack.sequenceNo();
1514  if (_lastSentHaSequenceNumber < ackSequence)
1515  {
1516  _lastSentHaSequenceNumber = ackSequence;
1517  }
1518  if (_publishStore.isValid())
1519  {
1520  _publishStore.discardUpTo(ackSequence);
1521  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
1522  {
1523  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
1524  }
1525  }
1526  _nameHash = ack.bookmark().substr(0, ack.bookmark().find('|'));
1527  _serverVersion = ack.serverVersion();
1528  if (_bookmarkStore.isValid())
1529  _bookmarkStore.setServerVersion(_serverVersion);
1530  }
1531  if(_ackBatchSize)
1532  {
1533  const std::string& options = ack.options();
1534  size_t index = options.find_first_of("max_backlog=");
1535  if(index != std::string::npos)
1536  {
1537  unsigned data =0;
1538  const char* c = options.c_str()+index+12;
1539  while(*c && *c!=',')
1540  {
1541  data = (data*10) + (unsigned)(*c++-48);
1542  }
1543  if(_ackBatchSize > data) _ackBatchSize = data;
1544  }
1545  }
1546  return ack;
1547  }
1548  const size_t NotEntitled = 12;
1549  std::string ackReason = ack.reason();
1550  if (ackReason.length() == 0) return ack; // none
1551  if (ackReason.length() == NotEntitled &&
1552  ackReason[0] == 'n' &&
1553  message_.getUserId().len() == 0)
1554  {
1555  message_.assignUserId(_username);
1556  }
1557  message_.throwFor(_client, ackReason);
1558  }
1559  else // !ack.responded()
1560  {
1561  if (!ack.abandoned())
1562  {
1563  throw TimedOutException("timed out waiting for operation.");
1564  }
1565  else
1566  {
1567  throw DisconnectedException("Connection closed while waiting for response.");
1568  }
1569  }
1570  return ack;
1571  }
1572 
1573  void _cleanup(void)
1574  {
1575  if (!_client) return;
1577  NULL,
1578  0L);
1580  NULL,
1581  0L);
1582  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1583  _pEmptyMessageStream.reset(NULL);
1584  amps_client_destroy(_client);
1585  _client = NULL;
1586  }
1587 
1588 public:
1589 
1590  ClientImpl(const std::string& clientName)
1591  : _client(NULL), _name(clientName)
1592  , _isRetryOnDisconnect(true)
1593  , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1594  , _badTimeToHASubscribe(0), _serverVersion()
1595  , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
1596  , _isAutoAckEnabled(false)
1597  , _ackBatchSize(0)
1598  , _queuedAckCount(0)
1599  , _defaultMaxDepth(0)
1600  , _connected(false)
1601  , _heartbeatInterval(0)
1602  , _readTimeout(0)
1603  {
1604  _replayer.setClient(this);
1605  _client = amps_client_create(clientName.c_str());
1606  amps_client_set_message_handler(_client, (amps_handler)ClientImpl::ClientImplMessageHandler, this);
1607  amps_client_set_predisconnect_handler(_client, (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler, this);
1608  amps_client_set_disconnect_handler(_client, (amps_handler)ClientImpl::ClientImplDisconnectHandler, this);
1609  _exceptionListener = &_defaultExceptionListener;
1610  for (size_t i=0; i<GlobalCommandTypeHandlers::COUNT; ++i)
1611  {
1612  _globalCommandTypeHandlers.push_back(MessageHandler());
1613  }
1614  }
1615 
1616  virtual ~ClientImpl()
1617  {
1618  _cleanup();
1619  }
1620 
1621  const std::string& getName() const
1622  {
1623  return _name;
1624  }
1625 
1626  const std::string& getNameHash() const
1627  {
1628  return _nameHash;
1629  }
1630 
1631  void setName(const std::string& name)
1632  {
1633  // This operation will fail if the client's
1634  // name is already set.
1636  _client, name.c_str());
1637  if (result != AMPS_E_OK)
1638  {
1639  AMPSException::throwFor(_client, result);
1640  }
1641  _name = name;
1642  }
1643 
1644  const std::string& getLogonCorrelationData() const
1645  {
1646  return _logonCorrelationData;
1647  }
1648 
1649  void setLogonCorrelationData(const std::string& logonCorrelationData_)
1650  {
1651  _logonCorrelationData = logonCorrelationData_;
1652  }
1653 
1654  size_t getServerVersion() const
1655  {
1656  return _serverVersion.getOldStyleVersion();
1657  }
1658 
1659  VersionInfo getServerVersionInfo() const
1660  {
1661  return _serverVersion;
1662  }
1663 
1664  const std::string& getURI() const
1665  {
1666  return _lastUri;
1667  }
1668 
1669  virtual void connect(const std::string& uri)
1670  {
1671  Lock<Mutex> l(_lock);
1672  _connect(uri);
1673  }
1674 
1675  virtual void _connect(const std::string& uri)
1676  {
1677  _lastUri = uri;
1679  _client, uri.c_str());
1680  if (result != AMPS_E_OK)
1681  {
1682  AMPSException::throwFor(_client, result);
1683  }
1684  _message.reset();
1685  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
1686  _publishMessage.setCommandEnum(Message::Command::Publish);
1687  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
1688  _beatMessage.setOptions("beat");
1689  _readMessage.setClientImpl(this);
1690  if(_queueAckTimeout)
1691  {
1692  amps_client_set_idle_time(_client,_queueAckTimeout);
1693  }
1694  _connected = true;
1695  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
1696  }
1697 
1698  void setDisconnected()
1699  {
1700  {
1701  Lock<Mutex> l(_lock);
1702  if (_connected)
1703  {
1704  AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
1705  }
1706  _connected = false;
1707  _routes.clear();
1708  _heartbeatTimer.setTimeout(0.0);
1709  }
1710  clearAcks(INT_MAX);
1711  amps_client_disconnect(_client);
1712  }
1713 
1714  virtual void disconnect()
1715  {
1716  {
1717  Lock<Mutex> l(_lock);
1718  _message.reset();
1719  _message.setCommandEnum(Message::Command::Unsubscribe);
1720  _message.newCommandId();
1721  _message.setSubscriptionId("all");
1722  AMPS_CALL_EXCEPTION_WRAPPER(_sendWithoutRetry(_message));
1723  }
1724  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
1725  setDisconnected();
1726  AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
1727  Lock<Mutex> l(_lock);
1728  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
1729  }
1730 
1731  void clearAcks(unsigned failedVersion)
1732  {
1733  // Have to lock to prevent race conditions
1734  Lock<Mutex> guard(_ackMapLock);
1735  {
1736  // Go ahead and signal any waiters if they are around...
1737  std::vector<std::string> worklist;
1738  for(AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
1739  {
1740  if (i->second.getConnectionVersion() <= failedVersion)
1741  {
1742  i->second.setAbandoned(true);
1743  worklist.push_back(i->first);
1744  }
1745  }
1746 
1747  for(std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
1748  {
1749  _ackMap.erase(*j);
1750  }
1751  }
1752 
1753  _lock.signalAll();
1754  }
1755 
1756  int send(const Message& message)
1757  {
1758  Lock<Mutex> l(_lock);
1759  return _send(message);
1760  }
1761 
1762  void sendWithoutRetry(const Message& message_)
1763  {
1764  Lock<Mutex> l(_lock);
1765  _sendWithoutRetry(message_);
1766  }
1767 
1768  void _sendWithoutRetry(const Message& message_)
1769  {
1770  amps_result result = amps_client_send(_client, message_.getMessage());
1771  if(result != AMPS_E_OK)
1772  {
1773  AMPSException::throwFor(_client,result);
1774  }
1775  }
1776 
// Sends `message`, looping for as long as the outcome is AMPS_E_RETRY.
//
//   message         the command to send.
//   haSeq           HA sequence number of a stored publish, or 0 for none.
//                   Non-zero values are kept in order relative to
//                   _lastSentHaSequenceNumber, replaying missing sequence
//                   numbers from the publish store first.
//   isHASubscribe_  true for a subscribe tracked elsewhere; such a send is
//                   skipped while _badTimeToHASubscribe is non-zero.
//
// Returns the connection version the message went out on, cast to int, or
// 0 when the send was skipped (already-sent sequence number, or HA
// subscribe during a bad time). Throws via AMPSException::throwFor when
// the final result is not AMPS_E_OK, or immediately when retry on
// disconnect is disabled and a send fails.
//
// Precondition: the caller already holds _lock.
1777  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
1778  bool isHASubscribe_ = false)
1779  {
1780  // Lock is already acquired
1781  amps_result result = AMPS_E_RETRY;
1782 
1783  // Create a local reference to this message, as we'll need to hold on
1784  // to a reference to it in case reconnect occurs.
1785  Message localMessage = message;
1786  unsigned version = 0;
1787 
1788  while(result == AMPS_E_RETRY)
1789  {
        // HA publish while _badTimeToHAPublish is raised (_logon raises it
        // for its duration): wait on _lock rather than sending now.
1790  if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
1791  {
1792  // If retrySend is disabled, do not wait for the reconnect
1793  // to finish, just throw.
1794  if(!_isRetryOnDisconnect)
1795  {
1796  AMPSException::throwFor(_client,AMPS_E_RETRY);
1797  }
1798  if (!_lock.wait(1000))
1799  {
1800  amps_invoke_waiting_function();
1801  }
1802  }
1803  else
1804  {
        // Nothing to do if this sequence number was already sent, or if
        // this is an HA subscribe during a bad time; `version` is still 0.
1805  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
1806  (isHASubscribe_ && _badTimeToHASubscribe != 0))
1807  {
1808  return (int)version;
1809  }
1810  // It's possible to get here out of order, but this way we'll
1811  // always send in order.
1812  if (haSeq > _lastSentHaSequenceNumber)
1813  {
        // Fill any gap between the last sent sequence number and haSeq by
        // replaying from the publish store.
        // NOTE(review): when replaySingle() returns false the loop simply
        // continues; termination relies on the replayer advancing
        // _lastSentHaSequenceNumber -- confirm against the store.
1814  while (haSeq > _lastSentHaSequenceNumber + 1)
1815  {
1816  try
1817  {
1818  // Replayer updates _lastSentHaSequenceNumber
1819  if (!_publishStore.replaySingle(_replayer,
1820  _lastSentHaSequenceNumber+1))
1821  {
1822  //++_lastSentHaSequenceNumber;
1823  continue;
1824  }
1825  result = AMPS_E_OK;
1826  version = _replayer._version;
1827  }
        // The exception object is unnamed on Windows and named elsewhere;
        // it is not used in the handler either way.
1828  #ifdef _WIN32
1829  catch(const DisconnectedException&)
1830  #else
1831  catch(const DisconnectedException& e)
1832  #endif
1833  {
1834  result = _replayer._res;
1835  break;
1836  }
1837  }
1838  result = amps_client_send_with_version(_client,
1839  localMessage.getMessage(),
1840  &version);
1841  ++_lastSentHaSequenceNumber;
1842  }
1843  else
1844  result = amps_client_send_with_version(_client,
1845  localMessage.getMessage(),
1846  &version);
1847  if (result != AMPS_E_OK)
1848  {
        // For a plain (non-HA) send that still shares the caller's
        // underlying message, take a deep copy before reconnecting.
1849  if (!isHASubscribe_ && !haSeq &&
1850  localMessage.getMessage() == message.getMessage())
1851  {
1852  localMessage = message.deepCopy();
1853  }
1854  if(_isRetryOnDisconnect)
1855  {
        // The reconnect attempt runs with _lock released.
1856  Unlock<Mutex> u(_lock);
1857  result = amps_client_attempt_reconnect(_client, version);
1858  // If this is an HA publish or subscribe command, it was
1859  // stored first and will have already been replayed by the
1860  // store or sub manager after reconnect, so just return.
1861  if ((isHASubscribe_ || haSeq) &&
1862  result == AMPS_E_RETRY)
1863  {
1864  return (int)version;
1865  }
1866  }
1867  else
1868  {
1869  // retrySend is disabled so throw the error
1870  // from the send as an exception, do not retry.
1871  AMPSException::throwFor(_client, result);
1872  }
1873  }
1874  }
1875  if (result == AMPS_E_RETRY)
1876  {
1877  amps_invoke_waiting_function();
1878  }
1879  }
1880 
1881  if (result != AMPS_E_OK) AMPSException::throwFor(_client, result);
1882  return (int)version;
1883  }
1884 
1885  void addMessageHandler(const Field& commandId_,
1886  const AMPS::MessageHandler& messageHandler_,
1887  unsigned requestedAcks_, bool isSubscribe_)
1888  {
1889  Lock<Mutex> lock(_lock);
1890  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
1891  0, isSubscribe_);
1892  }
1893 
1894  bool removeMessageHandler(const Field& commandId_)
1895  {
1896  Lock<Mutex> lock(_lock);
1897  return _routes.removeRoute(commandId_);
1898  }
1899 
// Sends message_ to the server, registering messageHandler_ as the route
// for whichever ids the command uses.
//
// Subscribe / SOW family commands: missing command, subscription and
//   query ids are generated (and regenerated on collision with an existing
//   route unless the "replace" option is present), the Processed ack (plus
//   Completed for anything with a SOW part, plus Persisted for bookmark
//   subscriptions when a bookmark store is set) is added on top of the
//   caller's acks, routes are installed, and the command is sent through
//   syncAckProcessing() with timeout_. On any exception the routes are
//   removed again and the exception is rethrown.
// Unsubscribe / Heartbeat / Logon / timer / publish / SOWDelete commands:
//   sent as-is through _send(); a command id and route are only set up
//   when an ack type was requested.
// Any other command type throws CommandException.
//
// In all non-throwing paths the caller's original ack type is restored on
// message_ before returning. Returns the command id used.
1900  std::string send(const MessageHandler& messageHandler_, Message& message_, int timeout_ = 0)
1901  {
1902  Field id = message_.getCommandId();
1903  Field subId = message_.getSubscriptionId();
1904  Field qid = message_.getQueryId();
1905  bool isSubscribe = false;
1906  bool isSubscribeOnly = false;
1907  bool replace = false;
1908  unsigned requestedAcks = message_.getAckTypeEnum();
1909  unsigned systemAddedAcks = Message::AckType::None;
1910 
      // The cases below fall through deliberately: Subscribe-only setup,
      // then setup shared by every subscribe flavor, then setup shared
      // with plain SOW.
1911  switch(message_.getCommandEnum())
1912  {
1913  case Message::Command::Subscribe:
1914  case Message::Command::DeltaSubscribe:
1915  replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos;
1916  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
1917  {
1918  systemAddedAcks |= Message::AckType::Persisted;
1919  }
1920  isSubscribeOnly = true;
1921  // fall through
1922  case Message::Command::SOWAndSubscribe:
1923  case Message::Command::SOWAndDeltaSubscribe:
1924  if (id.empty())
1925  {
1926  id = message_.newCommandId().getCommandId();
1927  }
1928  else
1929  {
1930  while (!replace && id != subId && _routes.hasRoute(id))
1931  {
1932  id = message_.newCommandId().getCommandId();
1933  }
1934  }
      // A subscription without an explicit sub id uses the command id.
1935  if (subId.empty())
1936  {
1937  message_.setSubscriptionId(id);
1938  subId = id;
1939  }
1940  isSubscribe = true;
1941  // fall through
1942  case Message::Command::SOW:
1943  if(id.empty())
1944  {
1945  id = message_.newCommandId().getCommandId();
1946  }
1947  else
1948  {
1949  while (!replace && id != subId && _routes.hasRoute(id))
1950  {
1951  message_.newCommandId();
      // Keep the query id in step when it was the same as the command id.
1952  if (qid == id)
1953  {
1954  qid = message_.getCommandId();
1955  message_.setQueryId(qid);
1956  }
1957  id = message_.getCommandId();
1958  }
1959  }
1960  if (!isSubscribeOnly)
1961  {
1962  if (qid.empty())
1963  {
1964  message_.setQueryID(id);
1965  qid = id;
1966  }
1967  else
1968  {
1969  while (!replace && qid != subId && qid != id
1970  && _routes.hasRoute(qid))
1971  {
1972  qid = message_.newQueryId().getQueryId();
1973  }
1974  }
1975  }
1976  systemAddedAcks |= Message::AckType::Processed;
1977  // for SOW only, we get a completed ack so we know when to remove the handler.
1978  if (!isSubscribeOnly) systemAddedAcks |= Message::AckType::Completed;
1979  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
1980  {
      // routesAdded counts routes installed with this handler so far. Once
      // it is non-zero, further routes get their own copy of the handler's
      // user data via amps_invoke_copy_route_function(), when that
      // returns non-NULL.
1981  int routesAdded = 0;
1982  Lock<Mutex> l(_lock);
1983  if (!subId.empty() && messageHandler_.isValid())
1984  {
1985  if (!_routes.hasRoute(subId))
1986  {
1987  ++routesAdded;
1988  }
1989  // This can replace a non-subscribe with a matching id
1990  // with a subscription but not another subscription.
1991  _routes.addRoute(subId, messageHandler_, requestedAcks,
1992  systemAddedAcks, isSubscribe);
1993  }
1994  if (!isSubscribeOnly && !qid.empty()
1995  && messageHandler_.isValid() && qid != subId)
1996  {
1997  if (routesAdded == 0)
1998  {
1999  _routes.addRoute(qid, messageHandler_,
2000  requestedAcks, systemAddedAcks, false);
2001  }
2002  else
2003  {
2004  void* data = NULL;
2005  {
      // The copy function is invoked with _lock released.
2006  Unlock<Mutex> u(_lock);
2007  data = amps_invoke_copy_route_function(
2008  messageHandler_.userData());
2009  }
2010  if (!data)
2011  {
2012  _routes.addRoute(qid, messageHandler_, requestedAcks,
2013  systemAddedAcks, false);
2014  }
2015  else
2016  {
2017  _routes.addRoute(qid,
2018  MessageHandler(messageHandler_.function(),
2019  data),
2020  requestedAcks, systemAddedAcks, false);
2021  }
2022  }
2023  ++routesAdded;
2024  }
      // A separate route for the command id is only needed when the caller
      // asked for acks other than Persisted and the id differs from both
      // the sub id and the query id.
2025  if (!id.empty() && messageHandler_.isValid()
2026  && requestedAcks & ~Message::AckType::Persisted
2027  && id != subId && id != qid)
2028  {
2029  if (routesAdded == 0)
2030  {
2031  _routes.addRoute(id, messageHandler_, requestedAcks,
2032  systemAddedAcks, false);
2033  }
2034  else
2035  {
2036  void* data = NULL;
2037  {
2038  Unlock<Mutex> u(_lock);
2039  data = amps_invoke_copy_route_function(
2040  messageHandler_.userData());
2041  }
2042  if (!data)
2043  {
2044  _routes.addRoute(id, messageHandler_, requestedAcks,
2045  systemAddedAcks, false);
2046  }
2047  else
2048  {
2049  _routes.addRoute(id,
2050  MessageHandler(messageHandler_.function(),
2051  data),
2052  requestedAcks,
2053  systemAddedAcks, false);
2054  }
2055  }
2056  ++routesAdded;
2057  }
2058  try
2059  {
2060  // We aren't adding to subscription manager, so this isn't
2061  // an HA subscribe.
2062  syncAckProcessing(timeout_, message_, 0, false);
2063  message_.setAckTypeEnum(requestedAcks);
2064  }
2065  catch (...)
2066  {
      // Undo the route registration and restore the caller's ack type
      // before propagating the failure.
2067  _routes.removeRoute(message_.getQueryID());
2068  _routes.removeRoute(message_.getSubscriptionId());
2069  _routes.removeRoute(id);
2070  message_.setAckTypeEnum(requestedAcks);
2071  throw;
2072  }
2073  }
2074  break;
2075  // These are valid commands that are used as-is
2076  case Message::Command::Unsubscribe:
2077  case Message::Command::Heartbeat:
2078  case Message::Command::Logon:
2079  case Message::Command::StartTimer:
2080  case Message::Command::StopTimer:
2081  case Message::Command::DeltaPublish:
2082  case Message::Command::Publish:
2083  case Message::Command::SOWDelete:
2084  {
2085  Lock<Mutex> l(_lock);
2086  // if an ack is requested, it'll need a command ID.
2087  if (message_.getAckTypeEnum() != Message::AckType::None)
2088  {
2089  if (id.empty())
2090  {
2091  message_.newCommandId();
2092  id = message_.getCommandId();
2093  }
2094  if (messageHandler_.isValid())
2095  {
2096  _routes.addRoute(id, messageHandler_, requestedAcks,
2097  Message::AckType::None, false);
2098  }
2099  }
2100  _send(message_);
2101  }
2102  break;
2103  // These are things that shouldn't be sent (not meaningful)
2104  case Message::Command::GroupBegin:
2105  case Message::Command::GroupEnd:
2106  case Message::Command::OOF:
2107  case Message::Command::Ack:
2108  case Message::Command::Unknown:
2109  default:
2110  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2111  }
2112  message_.setAckTypeEnum(requestedAcks);
2113  return id;
2114  }
2115 
2116  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
2117  {
2118  Lock<Mutex> l(_lock);
2119  _disconnectHandler = disconnectHandler;
2120  }
2121 
2122  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2123  {
2124  switch (command_[0])
2125  {
2126 #if 0 // Not currently implemented to avoid an extra branch in delivery
2127  case 'p':
2128  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2129  break;
2130  case 's':
2131  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2132  break;
2133 #endif
2134  case 'h':
2135  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2136  break;
2137 #if 0 // Not currently implemented to avoid an extra branch in delivery
2138  case 'g':
2139  if (command_[6] == 'b')
2140  {
2141  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2142  }
2143  else if (command_[6] == 'e')
2144  {
2145  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2146  }
2147  else
2148  {
2149  std::ostringstream os;
2150  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2151  throw CommandException(os.str());
2152  }
2153  break;
2154  case 'o':
2155  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2156  break;
2157 #endif
2158  case 'a':
2159  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2160  break;
2161  case 'l':
2162  case 'L':
2163  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2164  break;
2165  case 'd':
2166  case 'D':
2167  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2168  break;
2169  default:
2170  std::ostringstream os;
2171  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2172  throw CommandException(os.str());
2173  break;
2174  }
2175  }
2176 
2177  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2178  {
2179  switch (command_)
2180  {
2181 #if 0 // Not currently implemented to avoid an extra branch in delivery
2182  case Message::Command::Publish:
2183  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2184  break;
2185  case Message::Command::SOW:
2186  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2187  break;
2188 #endif
2189  case Message::Command::Heartbeat:
2190  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2191  break;
2192 #if 0 // Not currently implemented to avoid an extra branch in delivery
2193  case Message::Command::GroupBegin:
2194  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2195  break;
2196  case Message::Command::GroupEnd:
2197  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2198  break;
2199  case Message::Command::OOF:
2200  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2201  break;
2202 #endif
2203  case Message::Command::Ack:
2204  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2205  break;
2206  default:
2207  unsigned bits = 0;
2208  unsigned command = command_;
2209  while (command > 0) { ++bits; command >>= 1; }
2210  char errBuf[128];
2211  AMPS_snprintf(errBuf, sizeof(errBuf),
2212  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2213  CommandConstants<0>::Lengths[bits],
2214  CommandConstants<0>::Values[bits]);
2215  throw CommandException(errBuf);
2216  break;
2217  }
2218  }
2219 
2220  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2221  {
2222  _globalCommandTypeHandlers[handlerType_] = handler_;
2223  }
2224 
2225  void setFailedWriteHandler(FailedWriteHandler* handler_)
2226  {
2227  Lock<Mutex> l(_lock);
2228  _failedWriteHandler.reset(handler_);
2229  }
2230 
2231  void setPublishStore(const Store& publishStore_)
2232  {
2233  Lock<Mutex> l(_lock);
2234  if (_connected) throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2235  _publishStore = publishStore_;
2236  }
2237 
2238  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2239  {
2240  Lock<Mutex> l(_lock);
2241  if (_connected) throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2242  _bookmarkStore = bookmarkStore_;
2243  }
2244 
2245  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2246  {
2247  Lock<Mutex> l(_lock);
2248  _subscriptionManager.reset(subscriptionManager_);
2249  }
2250 
2251  SubscriptionManager* getSubscriptionManager() const
2252  {
2253  return const_cast<SubscriptionManager*>(_subscriptionManager.get());
2254  }
2255 
2256  DisconnectHandler getDisconnectHandler() const
2257  {
2258  return _disconnectHandler;
2259  }
2260 
2261  MessageHandler getDuplicateMessageHandler() const
2262  {
2263  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2264  }
2265 
2266  FailedWriteHandler* getFailedWriteHandler() const
2267  {
2268  return const_cast<FailedWriteHandler*>(_failedWriteHandler.get());
2269  }
2270 
2271  Store getPublishStore() const
2272  {
2273  return _publishStore;
2274  }
2275 
2276  BookmarkStore getBookmarkStore() const
2277  {
2278  return _bookmarkStore;
2279  }
2280 
2281  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,size_t dataLen_)
2282  {
2283  if (!_publishStore.isValid())
2284  {
2285  Lock<Mutex> l(_lock);
2286  _publishMessage.assignTopic(topic_, topicLen_);
2287  _publishMessage.assignData(data_, dataLen_);
2288  _send(_publishMessage);
2289  return 0;
2290  }
2291  else
2292  {
2293  if (!publishStoreMessage)
2294  {
2295  publishStoreMessage = new Message();
2296  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2297  }
2298  publishStoreMessage->reset();
2299  publishStoreMessage->setCommandEnum(Message::Command::Publish);
2300  return _publish(topic_, topicLen_, data_, dataLen_);
2301  }
2302  }
2303 
2304  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2305  size_t dataLen_, unsigned long expiration_)
2306  {
2307  if (!_publishStore.isValid())
2308  {
2309  Lock<Mutex> l(_lock);
2310  _publishMessage.assignTopic(topic_, topicLen_);
2311  _publishMessage.assignData(data_, dataLen_);
2312  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2313  size_t pos = convertToCharArray(exprBuf, expiration_);
2314  _publishMessage.assignExpiration(exprBuf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2315  _send(_publishMessage);
2316  _publishMessage.assignExpiration(NULL, 0);
2317  return 0;
2318  }
2319  else
2320  {
2321  if (!publishStoreMessage)
2322  {
2323  publishStoreMessage = new Message();
2324  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2325  }
2326  publishStoreMessage->reset();
2327  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2328  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2329  publishStoreMessage->setCommandEnum(Message::Command::Publish)
2330  .assignExpiration(exprBuf+exprPos,
2331  AMPS_NUMBER_BUFFER_LEN-exprPos);
2332  return _publish(topic_, topicLen_, data_, dataLen_);
2333  }
2334  }
2335 
2336  class FlushAckHandler : ConnectionStateListener
2337  {
2338  private:
2339  ClientImpl* _pClient;
2340  Field _cmdId;
2341  volatile bool _acked;
2342  volatile bool _disconnected;
2343  public:
2344  FlushAckHandler(ClientImpl* pClient_)
2345  : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2346  {
2347  pClient_->addConnectionStateListener(this);
2348  }
2349  ~FlushAckHandler()
2350  {
2351  _pClient->removeConnectionStateListener(this);
2352  _pClient->removeMessageHandler(_cmdId);
2353  _cmdId.clear();
2354  }
2355  void setCommandId(const Field& cmdId_)
2356  {
2357  _cmdId.deepCopy(cmdId_);
2358  }
2359  void invoke(const Message&)
2360  {
2361  _acked = true;
2362  }
2363  void connectionStateChanged(State state_)
2364  {
2365  if (state_ <= Shutdown)
2366  {
2367  _disconnected = true;
2368  }
2369  }
2370  bool acked()
2371  {
2372  return _acked;
2373  }
2374  bool done()
2375  {
2376  return _acked || _disconnected;
2377  }
2378  };
2379 
2380  void publishFlush(long timeout_, unsigned ackType_)
2381  {
2382  static const char* processed = "processed";
2383  static const size_t processedLen = strlen(processed);
2384  static const char* persisted = "persisted";
2385  static const size_t persistedLen = strlen(persisted);
2386  static const char* flush = "flush";
2387  static const size_t flushLen = strlen(flush);
2388  static VersionInfo minPersisted("5.3.3.0");
2389  static VersionInfo minFlush("4");
2390  if (ackType_ != Message::AckType::Processed
2391  && ackType_ != Message::AckType::Persisted)
2392  {
2393  throw new CommandException("Flush can only be used with processed or persisted acks.");
2394  }
2395  FlushAckHandler flushHandler(this);
2396  if (_serverVersion >= minFlush)
2397  {
2398  Lock<Mutex> l(_lock);
2399  if (!_connected)
2400  throw DisconnectedException("Not cconnected trying to flush");
2401  _message.reset();
2402  _message.newCommandId();
2403  _message.assignCommand(flush, flushLen);
2404  if (_serverVersion < minPersisted
2405  || ackType_ == Message::AckType::Processed)
2406  {
2407  _message.assignAckType(processed, processedLen);
2408  }
2409  else
2410  {
2411  _message.assignAckType(persisted, persistedLen);
2412  }
2413  flushHandler.setCommandId(_message.getCommandId());
2414  addMessageHandler(_message.getCommandId(),
2415  std::bind(&FlushAckHandler::invoke,
2416  std::ref(flushHandler),
2417  std::placeholders::_1),
2418  ackType_, false);
2419  if (_send(_message) == -1)
2420  throw DisconnectedException("Disconnected trying to flush");
2421  }
2422  if (_publishStore.isValid())
2423  {
2424  try
2425  {
2426  _publishStore.flush(timeout_);
2427  }
2428  catch (const AMPSException& ex)
2429  {
2430  AMPS_UNHANDLED_EXCEPTION(ex);
2431  throw;
2432  }
2433  }
2434  else if (_serverVersion < minFlush)
2435  {
2436  if (timeout_ > 0) { AMPS_USLEEP(timeout_ * 1000); }
2437  else { AMPS_USLEEP(1000 * 1000); }
2438  return;
2439  }
2440  if (timeout_)
2441  {
2442  Timer timer((double)timeout_);
2443  timer.start();
2444  while (!timer.check() && !flushHandler.done())
2445  {
2446  AMPS_USLEEP(10000);
2447  amps_invoke_waiting_function();
2448  }
2449  }
2450  else
2451  {
2452  while (!flushHandler.done())
2453  {
2454  AMPS_USLEEP(10000);
2455  amps_invoke_waiting_function();
2456  }
2457  }
2458  // No response or disconnect in timeout interval
2459  if (!flushHandler.done())
2460  throw TimedOutException("Timed out waiting for flush");
2461  // We got disconnected and there is no publish store
2462  if (!flushHandler.acked() && !_publishStore.isValid())
2463  throw DisconnectedException("Disconnected waiting for flush");
2464  }
2465 
2466  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2467  const char* data_, size_t dataLength_)
2468  {
2469  if (!_publishStore.isValid())
2470  {
2471  Lock<Mutex> l(_lock);
2472  _deltaMessage.assignTopic(topic_, topicLength_);
2473  _deltaMessage.assignData(data_, dataLength_);
2474  _send(_deltaMessage);
2475  return 0;
2476  }
2477  else
2478  {
2479  if (!publishStoreMessage)
2480  {
2481  publishStoreMessage = new Message();
2482  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2483  }
2484  publishStoreMessage->reset();
2485  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2486  return _publish(topic_, topicLength_, data_, dataLength_);
2487  }
2488  }
2489 
2490  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2491  const char* data_, size_t dataLength_,
2492  unsigned long expiration_)
2493  {
2494  if (!_publishStore.isValid())
2495  {
2496  Lock<Mutex> l(_lock);
2497  _deltaMessage.assignTopic(topic_, topicLength_);
2498  _deltaMessage.assignData(data_, dataLength_);
2499  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2500  size_t pos = convertToCharArray(exprBuf, expiration_);
2501  _deltaMessage.assignExpiration(exprBuf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2502  _send(_deltaMessage);
2503  _deltaMessage.assignExpiration(NULL, 0);
2504  return 0;
2505  }
2506  else
2507  {
2508  if (!publishStoreMessage)
2509  {
2510  publishStoreMessage = new Message();
2511  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2512  }
2513  publishStoreMessage->reset();
2514  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2515  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2516  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2517  .assignExpiration(exprBuf+exprPos,
2518  AMPS_NUMBER_BUFFER_LEN-exprPos);
2519  return _publish(topic_, topicLength_, data_, dataLength_);
2520  }
2521  }
2522 
2523  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
2524  const char* data_, size_t dataLength_)
2525  {
2526  publishStoreMessage->assignTopic(topic_, topicLength_)
2527  .setAckTypeEnum(Message::AckType::Persisted)
2528  .assignData(data_, dataLength_);
2529  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
2530  char buf[AMPS_NUMBER_BUFFER_LEN];
2531  size_t pos = convertToCharArray(buf, haSequenceNumber);
2532  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2533  {
2534  Lock<Mutex> l(_lock);
2535  _send(*publishStoreMessage, haSequenceNumber);
2536  }
2537  return haSequenceNumber;
2538  }
2539 
2540  virtual std::string logon(long timeout_, Authenticator& authenticator_,
2541  const char* options_ = NULL)
2542  {
2543  Lock<Mutex> l(_lock);
2544  return _logon(timeout_, authenticator_, options_);
2545  }
2546 
2547  virtual std::string _logon(long timeout_, Authenticator& authenticator_,
2548  const char* options_ = NULL)
2549  {
2550  AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2551  _message.reset();
2552  _message.setCommandEnum(Message::Command::Logon);
2553  _message.newCommandId();
2554  std::string newCommandId = _message.getCommandId();
2555  _message.setClientName(_name);
2556 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
2557  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
2558  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
2559 #endif
2560  URI uri(_lastUri);
2561  if(uri.user().size()) _message.setUserId(uri.user());
2562  if(uri.password().size()) _message.setPassword(uri.password());
2563  if(uri.protocol() == "amps" && uri.messageType().size())
2564  {
2565  _message.setMessageType(uri.messageType());
2566  }
2567  if(uri.isTrue("pretty"))
2568  {
2569  _message.setOptions("pretty");
2570  }
2571 
2572  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
2573  if (!_logonCorrelationData.empty())
2574  {
2575  _message.assignCorrelationId(_logonCorrelationData);
2576  }
2577  if (options_)
2578  {
2579  _message.setOptions(options_);
2580  }
2581  _username = _message.getUserId();
2582  try
2583  {
2584  while(true)
2585  {
2586  _message.setAckTypeEnum(Message::AckType::Processed);
2587  AckResponse ack = syncAckProcessing(timeout_, _message);
2588  if (ack.status() == "retry")
2589  {
2590  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
2591  _username = ack.username();
2592  _message.setUserId(_username);
2593  }
2594  else
2595  {
2596  authenticator_.completed(ack.username(), ack.password(), ack.reason());
2597  break;
2598  }
2599  }
2600  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2601 
2602  // Now re-send the heartbeat command if configured
2603  _sendHeartbeat();
2604  }
2605  catch(const AMPSException& ex)
2606  {
2607  _lock.signalAll();
2608  AMPS_UNHANDLED_EXCEPTION(ex);
2609  throw;
2610  }
2611  catch(...)
2612  {
2613  _lock.signalAll();
2614  throw;
2615  }
2616 
2617  if (_publishStore.isValid())
2618  {
2619  try
2620  {
2621  _publishStore.replay(_replayer);
2622  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
2623  }
2624  catch(const StoreException& ex)
2625  {
2626  _lock.signalAll();
2627  std::ostringstream os;
2628  os << "A local store exception occurred while logging on."
2629  << ex.toString();
2630  throw ConnectionException(os.str());
2631  }
2632  catch(const AMPSException& ex)
2633  {
2634  _lock.signalAll();
2635  AMPS_UNHANDLED_EXCEPTION(ex);
2636  throw ex;
2637  }
2638  catch(const std::exception& ex)
2639  {
2640  _lock.signalAll();
2641  AMPS_UNHANDLED_EXCEPTION(ex);
2642  throw ex;
2643  }
2644  catch(...)
2645  {
2646  _lock.signalAll();
2647  throw;
2648  }
2649  }
2650  _lock.signalAll();
2651  return newCommandId;
2652  }
2653 
// Places a Subscribe command and registers messageHandler_ as the route for
// the subscription id.
//   messageHandler_  handler registered with _routes (and with the
//                    subscription manager for an HA subscribe)
//   topic_           topic set on the command
//   timeout_         passed to syncAckProcessing()
//   filter_          set on the command when non-empty
//   bookmark_        set on the command when non-empty; AMPS_BOOKMARK_RECENT
//                    is replaced by the bookmark store's most recent value
//   options_         set on the command when non-empty
//   subId_           subscription id to use; when empty the generated
//                    command id is used instead
//   isHASubscribe_   only honoured when a subscription manager is set
// Returns the subscription id.
// Throws ConnectionException when subId_ is empty and options_ contains the
// replace option; exceptions from syncAckProcessing() are rethrown after the
// route/subscription has been cleaned up.
2654  std::string subscribe(const MessageHandler& messageHandler_,
2655  const std::string& topic_,
2656  long timeout_,
2657  const std::string& filter_,
2658  const std::string& bookmark_,
2659  const std::string& options_,
2660  const std::string& subId_,
2661  bool isHASubscribe_ = true)
2662  {
// HA behaviour is only possible when a subscription manager exists.
2663  isHASubscribe_ &= (bool)_subscriptionManager;
2664  Lock<Mutex> l(_lock);
2665  _message.reset();
2666  _message.setCommandEnum(Message::Command::Subscribe);
2667  _message.newCommandId();
2668  std::string subId(subId_);
2669  if (subId.empty())
2670  {
// The search uses all but the last character of AMPS_OPTIONS_REPLACE
// (NOTE(review): presumably a trailing separator - confirm).
2671  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos)
2672  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
2673 
2674  subId = _message.getCommandId();
2675  }
2676  _message.setSubscriptionId(subId);
2677  // we need to deep copy this before sending the message; while we are
2678  // waiting for a response, the fields in _message may get blown away for
2679  // other operations.
2680  AMPS::Message::Field subIdField(subId);
2681  unsigned ackTypes = Message::AckType::Processed;
2682 
// A persisted ack is additionally requested for bookmark subscriptions when
// a bookmark store is available.
2683  if (!bookmark_.empty() && _bookmarkStore.isValid())
2684  {
2685  ackTypes |= Message::AckType::Persisted;
2686  }
2687  _message.setTopic(topic_);
2688 
2689  if (filter_.length()) _message.setFilter(filter_);
2690  if (bookmark_.length())
2691  {
2692  if (bookmark_ == AMPS_BOOKMARK_RECENT)
2693  {
2694  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
2695  _message.setBookmark(mostRecent);
2696  }
2697  else
2698  {
2699  _message.setBookmark(bookmark_);
2700  if (_bookmarkStore.isValid())
2701  {
// Explicit bookmarks (anything other than NOW/EPOCH) are logged, discarded
// and marked persisted in the bookmark store before the command is sent.
2702  if (bookmark_ != AMPS_BOOKMARK_NOW &&
2703  bookmark_ != AMPS_BOOKMARK_EPOCH)
2704  {
2705  _bookmarkStore.log(_message);
2706  _bookmarkStore.discard(_message);
2707  _bookmarkStore.persisted(subIdField, _message.getBookmark());
2708  }
2709  }
2710  }
2711  }
2712  if (options_.length()) _message.setOptions(options_);
2713 
2714  Message message = _message;
2715  if (isHASubscribe_)
2716  {
// For HA, hand a deep copy to the subscription manager with _lock released
// for the duration of this scope.
2717  message = _message.deepCopy();
2718  Unlock<Mutex> u(_lock);
2719  _subscriptionManager->subscribe(messageHandler_, message,
2720  Message::AckType::None);
// When _badTimeToHASubscribe is set, return without adding a route or
// sending (NOTE(review): presumably the subscription is placed later by the
// subscription manager - confirm).
2721  if (_badTimeToHASubscribe) return subId;
2722  }
2723  if (!_routes.hasRoute(_message.getSubscriptionId()))
2724  {
2725  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
2726  Message::AckType::None, ackTypes, true);
2727  }
2728  message.setAckTypeEnum(ackTypes);
2729  if (!options_.empty()) message.setOptions(options_);
2730  try
2731  {
2732  syncAckProcessing(timeout_, message, isHASubscribe_);
2733  }
2734  catch (const DisconnectedException&)
2735  {
// Both branches rethrow; they differ only in how the subscription is undone.
2736  if (!isHASubscribe_)
2737  {
2738  _routes.removeRoute(subIdField);
2739  throw;
2740  }
2741  else
2742  {
2743  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2744  throw;
2745  }
2746  }
2747  catch (const TimedOutException&)
2748  {
2749  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2750  throw;
2751  }
2752  catch (...)
2753  {
2754  if (isHASubscribe_)
2755  {
2756  // Have to unlock before calling into sub manager to avoid deadlock
2757  Unlock<Mutex> unlock(_lock);
2758  _subscriptionManager->unsubscribe(subIdField);
2759  }
2760  _routes.removeRoute(subIdField);
2761  throw;
2762  }
2763 
2764  return subId;
2765  }
// Places a DeltaSubscribe command and registers messageHandler_ as the route
// for the subscription id.
//   messageHandler_  handler registered with _routes (and with the
//                    subscription manager for an HA subscribe)
//   topic_           topic set on the command
//   timeout_         passed to syncAckProcessing()
//   filter_          set on the command when non-empty
//   bookmark_        set on the command when non-empty; AMPS_BOOKMARK_RECENT
//                    is replaced by the bookmark store's most recent value
//   options_         set on the command when non-empty
//   subId_           subscription id to use; when empty the generated
//                    command id is used instead
//   isHASubscribe_   only honoured when a subscription manager is set
// Returns the subscription id. For an HA subscribe a DisconnectedException
// from syncAckProcessing() is swallowed and the id is still returned; other
// exceptions are rethrown after cleanup.
2766  std::string deltaSubscribe(const MessageHandler& messageHandler_,
2767  const std::string& topic_,
2768  long timeout_,
2769  const std::string& filter_,
2770  const std::string& bookmark_,
2771  const std::string& options_,
2772  const std::string& subId_ = "",
2773  bool isHASubscribe_ = true)
2774  {
// HA behaviour is only possible when a subscription manager exists.
2775  isHASubscribe_ &= (bool)_subscriptionManager;
2776  Lock<Mutex> l(_lock);
2777  _message.reset();
2778  _message.setCommandEnum(Message::Command::DeltaSubscribe);
2779  _message.newCommandId();
2780  std::string subId(subId_);
2781  if (subId.empty())
2782  {
2783  subId = _message.getCommandId();
2784  }
2785  _message.setSubscriptionId(subId);
2786  // we need to deep copy this before sending the message; while we are
2787  // waiting for a response, the fields in _message may get blown away for
2788  // other operations.
2789  AMPS::Message::Field subIdField(subId);
2790  unsigned ackTypes = Message::AckType::Processed;
2791 
// A persisted ack is additionally requested for bookmark subscriptions when
// a bookmark store is available.
2792  if (!bookmark_.empty() && _bookmarkStore.isValid())
2793  {
2794  ackTypes |= Message::AckType::Persisted;
2795  }
2796  _message.setTopic(topic_);
2797  if (filter_.length()) _message.setFilter(filter_);
2798  if (bookmark_.length())
2799  {
2800  if (bookmark_ == AMPS_BOOKMARK_RECENT)
2801  {
2802  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
2803  _message.setBookmark(mostRecent);
2804  }
2805  else
2806  {
2807  _message.setBookmark(bookmark_);
2808  if (_bookmarkStore.isValid())
2809  {
// Explicit bookmarks (anything other than NOW/EPOCH) are logged, discarded
// and marked persisted in the bookmark store before the command is sent.
2810  if (bookmark_ != AMPS_BOOKMARK_NOW &&
2811  bookmark_ != AMPS_BOOKMARK_EPOCH)
2812  {
2813  _bookmarkStore.log(_message);
2814  _bookmarkStore.discard(_message);
2815  _bookmarkStore.persisted(subIdField, _message.getBookmark());
2816  }
2817  }
2818  }
2819  }
2820  if (options_.length()) _message.setOptions(options_);
2821  Message message = _message;
2822  if (isHASubscribe_)
2823  {
// For HA, hand a deep copy to the subscription manager with _lock released
// for the duration of this scope.
2824  message = _message.deepCopy();
2825  Unlock<Mutex> u(_lock);
2826  _subscriptionManager->subscribe(messageHandler_, message,
2827  Message::AckType::None);
// When _badTimeToHASubscribe is set, return without adding a route or
// sending (NOTE(review): presumably the subscription is placed later by the
// subscription manager - confirm).
2828  if (_badTimeToHASubscribe) return subId;
2829  }
2830  if (!_routes.hasRoute(_message.getSubscriptionId()))
2831  {
2832  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
2833  Message::AckType::None, ackTypes, true);
2834  }
2835  message.setAckTypeEnum(ackTypes);
2836  if (!options_.empty()) message.setOptions(options_);
2837  try
2838  {
2839  syncAckProcessing(timeout_, message, isHASubscribe_);
2840  }
2841  catch (const DisconnectedException&)
2842  {
// Only the non-HA case removes the route and rethrows; for HA the exception
// is swallowed and execution continues to the return below.
2843  if (!isHASubscribe_)
2844  {
2845  _routes.removeRoute(subIdField);
2846  throw;
2847  }
2848  }
2849  catch (const TimedOutException&)
2850  {
2851  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2852  throw;
2853  }
2854  catch (...)
2855  {
2856  if (isHASubscribe_)
2857  {
2858  // Have to unlock before calling into sub manager to avoid deadlock
2859  Unlock<Mutex> unlock(_lock);
2860  _subscriptionManager->unsubscribe(subIdField);
2861  }
2862  _routes.removeRoute(subIdField);
2863  throw;
2864  }
2865  return subId;
2866  }
2867 
2868  void unsubscribe(const std::string& id)
2869  {
2870  Lock<Mutex> l(_lock);
2871  unsubscribeInternal(id);
2872  }
2873 
2874  void unsubscribe(void)
2875  {
2876  if (_subscriptionManager)
2877  {
2878  _subscriptionManager->clear();
2879  }
2880  {
2881  _routes.unsubscribeAll();
2882  Lock<Mutex> l(_lock);
2883  _message.reset();
2884  _message.setCommandEnum(Message::Command::Unsubscribe);
2885  _message.newCommandId();
2886  _message.setSubscriptionId("all");
2887  _sendWithoutRetry(_message);
2888  }
2889  deferredExecution(&amps_noOpFn, NULL);
2890  }
2891 
2892  std::string sow(const MessageHandler& messageHandler_,
2893  const std::string& topic_,
2894  const std::string& filter_ = "",
2895  const std::string& orderBy_ = "",
2896  const std::string& bookmark_ = "",
2897  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2898  int topN_ = AMPS_DEFAULT_TOP_N,
2899  const std::string& options_ = "",
2900  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
2901  {
2902  Lock<Mutex> l(_lock);
2903  _message.reset();
2904  _message.setCommandEnum(Message::Command::SOW);
2905  _message.newCommandId();
2906  // need to keep our own copy of the command ID.
2907  std::string commandId = _message.getCommandId();
2908  _message.setQueryID(_message.getCommandId());
2909  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
2910  _message.setAckTypeEnum(ackTypes);
2911  _message.setTopic(topic_);
2912  if (filter_.length()) _message.setFilter(filter_);
2913  if (orderBy_.length()) _message.setOrderBy(orderBy_);
2914  if (bookmark_.length()) _message.setBookmark(bookmark_);
2915  _message.setBatchSize(AMPS::asString(batchSize_));
2916  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
2917  if (options_.length()) _message.setOptions(options_);
2918 
2919  _routes.addRoute(_message.getQueryID(), messageHandler_,
2920  Message::AckType::None, ackTypes, false);
2921 
2922  try
2923  {
2924  syncAckProcessing(timeout_, _message);
2925  }
2926  catch (...)
2927  {
2928  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
2929  throw;
2930  }
2931 
2932  return commandId;
2933  }
2934 
2935  std::string sow(const MessageHandler& messageHandler_,
2936  const std::string& topic_,
2937  long timeout_,
2938  const std::string& filter_ = "",
2939  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2940  int topN_ = AMPS_DEFAULT_TOP_N)
2941  {
2942  std::string notSet;
2943  return sow(messageHandler_,
2944  topic_,
2945  filter_,
2946  notSet, // orderBy
2947  notSet, // bookmark
2948  batchSize_,
2949  topN_,
2950  notSet,
2951  timeout_);
2952  }
2953 
// Places a SOWAndSubscribe command; the generated command id doubles as the
// query id and the subscription id, and messageHandler_ is routed under it.
//   messageHandler_  handler registered with _routes (and with the
//                    subscription manager for an HA subscribe)
//   topic_           topic set on the command
//   filter_          set on the command when non-empty
//   orderBy_         set on the command when non-empty
//   bookmark_        set on the command when non-empty
//   batchSize_       always set on the command
//   topN_            set only when different from AMPS_DEFAULT_TOP_N
//   options_         set on the command when non-empty
//   timeout_         passed to syncAckProcessing()
//   isHASubscribe_   only honoured when a subscription manager is set
// Returns the subscription id. For an HA subscribe a DisconnectedException
// from syncAckProcessing() is swallowed and the id is still returned; other
// exceptions are rethrown after cleanup.
2954  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
2955  const std::string& topic_,
2956  const std::string& filter_ = "",
2957  const std::string& orderBy_ = "",
2958  const std::string& bookmark_ = "",
2959  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2960  int topN_ = AMPS_DEFAULT_TOP_N,
2961  const std::string& options_ = "",
2962  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
2963  bool isHASubscribe_ = true)
2964  {
// HA behaviour is only possible when a subscription manager exists.
2965  isHASubscribe_ &= (bool)_subscriptionManager;
2966  Lock<Mutex> l(_lock);
2967  _message.reset();
2968  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
2969  _message.newCommandId();
2970  Field cid = _message.getCommandId();
2971  _message.setQueryID(cid);
2972  _message.setSubscriptionId(cid);
// subId is a std::string copy of the id, used for cleanup and as the result.
2973  std::string subId = cid;
2974  _message.setTopic(topic_);
2975  if (filter_.length()) _message.setFilter(filter_);
2976  if (orderBy_.length()) _message.setOrderBy(orderBy_);
2977  if (bookmark_.length()) _message.setBookmark(bookmark_);
2978  _message.setBatchSize(AMPS::asString(batchSize_));
2979  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
2980  if (options_.length()) _message.setOptions(options_);
2981 
2982  Message message = _message;
2983  if (isHASubscribe_)
2984  {
// For HA, hand a deep copy to the subscription manager with _lock released
// for the duration of this scope.
2985  message = _message.deepCopy();
2986  Unlock<Mutex> u(_lock);
2987  _subscriptionManager->subscribe(messageHandler_, message,
2988  Message::AckType::None);
// When _badTimeToHASubscribe is set, return without adding a route or
// sending (NOTE(review): presumably the subscription is placed later by the
// subscription manager - confirm).
2989  if (_badTimeToHASubscribe) return subId;
2990  }
2991  _routes.addRoute(cid, messageHandler_,
2992  Message::AckType::None, Message::AckType::Processed, true);
2993  message.setAckTypeEnum(Message::AckType::Processed);
2994  if (!options_.empty()) message.setOptions(options_);
2995  try
2996  {
2997  syncAckProcessing(timeout_, message, isHASubscribe_);
2998  }
2999  catch (const DisconnectedException&)
3000  {
// Only the non-HA case removes the route and rethrows; for HA the exception
// is swallowed and execution continues to the return below.
3001  if (!isHASubscribe_)
3002  {
3003  _routes.removeRoute(subId);
3004  throw;
3005  }
3006  }
3007  catch (const TimedOutException&)
3008  {
3009  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3010  throw;
3011  }
3012  catch (...)
3013  {
3014  if (isHASubscribe_)
3015  {
3016  // Have to unlock before calling into sub manager to avoid deadlock
3017  Unlock<Mutex> unlock(_lock);
3018  _subscriptionManager->unsubscribe(cid);
3019  }
3020  _routes.removeRoute(subId);
3021  throw;
3022  }
3023  return subId;
3024  }
3025 
3026  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3027  const std::string& topic_,
3028  long timeout_,
3029  const std::string& filter_ = "",
3030  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3031  bool oofEnabled_ = false,
3032  int topN_ = AMPS_DEFAULT_TOP_N,
3033  bool isHASubscribe_ = true)
3034  {
3035  std::string notSet;
3036  return sowAndSubscribe(messageHandler_,
3037  topic_,
3038  filter_,
3039  notSet, // orderBy
3040  notSet, // bookmark
3041  batchSize_,
3042  topN_,
3043  (oofEnabled_ ? "oof" : ""),
3044  timeout_,
3045  isHASubscribe_);
3046  }
3047 
// Places a SOWAndDeltaSubscribe command; the generated command id doubles as
// the query id and the subscription id, and messageHandler_ is routed under
// the query id.
//   messageHandler_  handler registered with _routes (and with the
//                    subscription manager for an HA subscribe)
//   topic_           topic set on the command
//   filter_          set on the command when non-empty
//   orderBy_         set on the command when non-empty
//   batchSize_       always set on the command
//   topN_            set only when different from AMPS_DEFAULT_TOP_N
//   options_         set on the command when non-empty
//   timeout_         passed to syncAckProcessing()
//   isHASubscribe_   only honoured when a subscription manager is set
// Returns the subscription id. For an HA subscribe a DisconnectedException
// from syncAckProcessing() is swallowed and the id is still returned; other
// exceptions are rethrown after cleanup.
3048  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3049  const std::string& topic_,
3050  const std::string& filter_ = "",
3051  const std::string& orderBy_ = "",
3052  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3053  int topN_ = AMPS_DEFAULT_TOP_N,
3054  const std::string& options_ = "",
3055  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3056  bool isHASubscribe_ = true)
3057  {
// HA behaviour is only possible when a subscription manager exists.
3058  isHASubscribe_ &= (bool)_subscriptionManager;
3059  Lock<Mutex> l(_lock);
3060  _message.reset();
3061  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3062  _message.newCommandId();
3063  _message.setQueryID(_message.getCommandId());
3064  _message.setSubscriptionId(_message.getCommandId());
// subId is a std::string copy of the id, used for cleanup and as the result.
3065  std::string subId = _message.getSubscriptionId();
3066  _message.setTopic(topic_);
3067  if (filter_.length()) _message.setFilter(filter_);
3068  if (orderBy_.length()) _message.setOrderBy(orderBy_);
3069  _message.setBatchSize(AMPS::asString(batchSize_));
3070  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
3071  if (options_.length()) _message.setOptions(options_);
3072  Message message = _message;
3073  if (isHASubscribe_)
3074  {
// For HA, hand a deep copy to the subscription manager with _lock released
// for the duration of this scope.
3075  message = _message.deepCopy();
3076  Unlock<Mutex> u(_lock);
3077  _subscriptionManager->subscribe(messageHandler_, message,
3078  Message::AckType::None);
// When _badTimeToHASubscribe is set, return without adding a route or
// sending (NOTE(review): presumably the subscription is placed later by the
// subscription manager - confirm).
3079  if (_badTimeToHASubscribe) return subId;
3080  }
3081  _routes.addRoute(message.getQueryID(), messageHandler_,
3082  Message::AckType::None, Message::AckType::Processed, true);
3083  message.setAckTypeEnum(Message::AckType::Processed);
3084  if (!options_.empty()) message.setOptions(options_);
3085  try
3086  {
3087  syncAckProcessing(timeout_, message, isHASubscribe_);
3088  }
3089  catch (const DisconnectedException&)
3090  {
// Only the non-HA case removes the route and rethrows; for HA the exception
// is swallowed and execution continues to the return below.
3091  if (!isHASubscribe_)
3092  {
3093  _routes.removeRoute(subId);
3094  throw;
3095  }
3096  }
3097  catch (const TimedOutException&)
3098  {
3099  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3100  throw;
3101  }
3102  catch (...)
3103  {
3104  if (isHASubscribe_)
3105  {
3106  // Have to unlock before calling into sub manager to avoid deadlock
3107  Unlock<Mutex> unlock(_lock);
3108  _subscriptionManager->unsubscribe(Field(subId));
3109  }
3110  _routes.removeRoute(subId);
3111  throw;
3112  }
3113  return subId;
3114  }
3115 
3116  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3117  const std::string& topic_,
3118  long timeout_,
3119  const std::string& filter_ = "",
3120  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3121  bool oofEnabled_ = false,
3122  bool sendEmpties_ = false,
3123  int topN_ = AMPS_DEFAULT_TOP_N,
3124  bool isHASubscribe_ = true)
3125  {
3126  std::string notSet;
3127  Message::Options options;
3128  if (oofEnabled_) options.setOOF();
3129  if (sendEmpties_ == false) options.setNoEmpties();
3130  return sowAndDeltaSubscribe(messageHandler_,
3131  topic_,
3132  filter_,
3133  notSet, // orderBy
3134  batchSize_,
3135  topN_,
3136  options,
3137  timeout_,
3138  isHASubscribe_);
3139  }
3140 
// Sends a SOWDelete command that selects records with a filter.
//   messageHandler_  handler routed under the command id with the Stats ack
//                    as the requested ack type
//   topic_           topic assigned to the command
//   filter_          filter assigned to the command
//   timeout_         passed to syncAckProcessing()
//   commandId_       id to use for command id, subscription id and query id;
//                    when empty a new command id is generated
// Returns the command id as a std::string.
// With a valid publish store the command is stored first, a Persisted ack is
// requested as well, and a DisconnectedException is swallowed. Without one
// the shared _message is used and every exception is rethrown after the
// route has been removed.
3141  std::string sowDelete(const MessageHandler& messageHandler_,
3142  const std::string& topic_,
3143  const std::string& filter_,
3144  long timeout_,
3145  Message::Field commandId_ = Message::Field())
3146  {
3147  if (_publishStore.isValid())
3148  {
3149  unsigned ackType = Message::AckType::Processed |
3150  Message::AckType::Stats |
3151  Message::AckType::Persisted;
// publishStoreMessage is created on first use and registered with
// PerThreadMessageTracker for cleanup.
3152  if (!publishStoreMessage)
3153  {
3154  publishStoreMessage = new Message();
3155  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3156  }
3157  publishStoreMessage->reset();
3158  if (commandId_.empty())
3159  {
3160  publishStoreMessage->newCommandId();
3161  commandId_ = publishStoreMessage->getCommandId();
3162  }
3163  else
3164  {
3165  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3166  }
3167  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3168  .assignSubscriptionId(commandId_.data(), commandId_.len())
3169  .assignQueryID(commandId_.data(), commandId_.len())
3170  .setAckTypeEnum(ackType)
3171  .assignTopic(topic_.c_str(), topic_.length())
3172  .assignFilter(filter_.c_str(), filter_.length());
// The store returns the sequence number, which is then rendered into buf and
// assigned to the message before it is sent.
3173  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3174  char buf[AMPS_NUMBER_BUFFER_LEN];
3175  size_t pos = convertToCharArray(buf, haSequenceNumber);
3176  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3177  {
3178  try
3179  {
// _lock is held only inside this try block, so it is already released when a
// handler below runs.
3180  Lock<Mutex> l(_lock);
3181  _routes.addRoute(commandId_, messageHandler_,
3182  Message::AckType::Stats,
3183  Message::AckType::Processed|Message::AckType::Persisted,
3184  false);
3185  syncAckProcessing(timeout_, *publishStoreMessage,
3186  haSequenceNumber);
3187  }
3188  catch (const DisconnectedException&)
3189  { // -V565
3190  // Pass - it will get replayed upon reconnect
3191  }
3192  catch (...)
3193  {
3194  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3195  throw;
3196  }
3197  }
3198  return (std::string)commandId_;
3199  }
3200  else
3201  {
// No publish store: build the command in the shared _message under _lock.
3202  Lock<Mutex> l(_lock);
3203  _message.reset();
3204  if (commandId_.empty())
3205  {
3206  _message.newCommandId();
3207  commandId_ = _message.getCommandId();
3208  }
3209  else
3210  {
3211  _message.setCommandId(commandId_.data(), commandId_.len());
3212  }
3213  _message.setCommandEnum(Message::Command::SOWDelete)
3214  .assignSubscriptionId(commandId_.data(), commandId_.len())
3215  .assignQueryID(commandId_.data(), commandId_.len())
3216  .setAckTypeEnum(Message::AckType::Processed |
3217  Message::AckType::Stats)
3218  .assignTopic(topic_.c_str(), topic_.length())
3219  .assignFilter(filter_.c_str(), filter_.length());
3220  _routes.addRoute(commandId_, messageHandler_,
3221  Message::AckType::Stats,
3222  Message::AckType::Processed,
3223  false);
3224  try
3225  {
3226  syncAckProcessing(timeout_, _message);
3227  }
3228  catch (...)
3229  {
3230  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3231  throw;
3232  }
3233  return (std::string)commandId_;
3234  }
3235  }
3236 
// Sends a SOWDelete command that carries message data instead of a filter.
//   messageHandler_  handler routed under the command id with the Stats ack
//                    as the requested ack type
//   topic_           topic assigned to the command
//   data_            payload assigned to the command
//   timeout_         passed to syncAckProcessing()
//   commandId_       id to use for command id, subscription id and query id;
//                    when empty a new command id is generated
// Returns the command id as a std::string.
// With a valid publish store the command is stored first, a Persisted ack is
// requested as well, and a DisconnectedException is swallowed. Without one
// the shared _message is used and every exception is rethrown after the
// route has been removed.
3237  std::string sowDeleteByData(const MessageHandler& messageHandler_,
3238  const std::string& topic_,
3239  const std::string& data_,
3240  long timeout_,
3241  Message::Field commandId_ = Message::Field())
3242  {
3243  if (_publishStore.isValid())
3244  {
3245  unsigned ackType = Message::AckType::Processed |
3246  Message::AckType::Stats |
3247  Message::AckType::Persisted;
// publishStoreMessage is created on first use and registered with
// PerThreadMessageTracker for cleanup.
3248  if (!publishStoreMessage)
3249  {
3250  publishStoreMessage = new Message();
3251  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3252  }
3253  publishStoreMessage->reset();
3254  if (commandId_.empty())
3255  {
3256  publishStoreMessage->newCommandId();
3257  commandId_ = publishStoreMessage->getCommandId();
3258  }
3259  else
3260  {
3261  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3262  }
3263  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3264  .assignSubscriptionId(commandId_.data(), commandId_.len())
3265  .assignQueryID(commandId_.data(), commandId_.len())
3266  .setAckTypeEnum(ackType)
3267  .assignTopic(topic_.c_str(), topic_.length())
3268  .assignData(data_.c_str(), data_.length());
// The store returns the sequence number, which is then rendered into buf and
// assigned to the message before it is sent.
3269  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3270  char buf[AMPS_NUMBER_BUFFER_LEN];
3271  size_t pos = convertToCharArray(buf, haSequenceNumber);
3272  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3273  {
3274  try
3275  {
// _lock is held only inside this try block, so it is already released when a
// handler below runs.
3276  Lock<Mutex> l(_lock);
3277  _routes.addRoute(commandId_, messageHandler_,
3278  Message::AckType::Stats,
3279  Message::AckType::Processed|Message::AckType::Persisted,
3280  false);
3281  syncAckProcessing(timeout_, *publishStoreMessage,
3282  haSequenceNumber);
3283  }
3284  catch (const DisconnectedException&)
3285  { // -V565
3286  // Pass - it will get replayed upon reconnect
3287  }
3288  catch (...)
3289  {
3290  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3291  throw;
3292  }
3293  }
3294  return (std::string)commandId_;
3295  }
3296  else
3297  {
// No publish store: build the command in the shared _message under _lock.
3298  Lock<Mutex> l(_lock);
3299  _message.reset();
3300  if (commandId_.empty())
3301  {
3302  _message.newCommandId();
3303  commandId_ = _message.getCommandId();
3304  }
3305  else
3306  {
3307  _message.setCommandId(commandId_.data(), commandId_.len());
3308  }
3309  _message.setCommandEnum(Message::Command::SOWDelete)
3310  .assignSubscriptionId(commandId_.data(), commandId_.len())
3311  .assignQueryID(commandId_.data(), commandId_.len())
3312  .setAckTypeEnum(Message::AckType::Processed |
3313  Message::AckType::Stats)
3314  .assignTopic(topic_.c_str(), topic_.length())
3315  .assignData(data_.c_str(), data_.length());
3316  _routes.addRoute(commandId_, messageHandler_,
3317  Message::AckType::Stats,
3318  Message::AckType::Processed,
3319  false);
3320  try
3321  {
3322  syncAckProcessing(timeout_, _message);
3323  }
3324  catch (...)
3325  {
3326  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3327  throw;
3328  }
3329  return (std::string)commandId_;
3330  }
3331  }
3332 
// Sends a SOWDelete command that selects records by SOW keys.
//   messageHandler_  handler routed under the command id with the Stats ack
//                    as the requested ack type
//   topic_           topic assigned to the command
//   keys_            string assigned as the command's SOW keys
//   timeout_         passed to syncAckProcessing()
//   commandId_       id to use for command id, subscription id and query id;
//                    when empty a new command id is generated
// Returns the command id as a std::string.
// With a valid publish store the command is stored first, a Persisted ack is
// requested as well, and a DisconnectedException is swallowed. Without one
// the shared _message is used and every exception is rethrown after the
// route has been removed.
3333  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
3334  const std::string& topic_,
3335  const std::string& keys_,
3336  long timeout_,
3337  Message::Field commandId_ = Message::Field())
3338  {
3339  if (_publishStore.isValid())
3340  {
3341  unsigned ackType = Message::AckType::Processed |
3342  Message::AckType::Stats |
3343  Message::AckType::Persisted;
// publishStoreMessage is created on first use and registered with
// PerThreadMessageTracker for cleanup.
3344  if (!publishStoreMessage)
3345  {
3346  publishStoreMessage = new Message();
3347  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3348  }
3349  publishStoreMessage->reset();
3350  if (commandId_.empty())
3351  {
3352  publishStoreMessage->newCommandId();
3353  commandId_ = publishStoreMessage->getCommandId();
3354  }
3355  else
3356  {
3357  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3358  }
3359  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3360  .assignSubscriptionId(commandId_.data(), commandId_.len())
3361  .assignQueryID(commandId_.data(), commandId_.len())
3362  .setAckTypeEnum(ackType)
3363  .assignTopic(topic_.c_str(), topic_.length())
3364  .assignSowKeys(keys_.c_str(), keys_.length());
// The store returns the sequence number, which is then rendered into buf and
// assigned to the message before it is sent.
3365  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3366  char buf[AMPS_NUMBER_BUFFER_LEN];
3367  size_t pos = convertToCharArray(buf, haSequenceNumber);
3368  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3369  {
3370  try
3371  {
// _lock is held only inside this try block, so it is already released when a
// handler below runs.
3372  Lock<Mutex> l(_lock);
3373  _routes.addRoute(commandId_, messageHandler_,
3374  Message::AckType::Stats,
3375  Message::AckType::Processed|Message::AckType::Persisted,
3376  false);
3377  syncAckProcessing(timeout_, *publishStoreMessage,
3378  haSequenceNumber);
3379  }
3380  catch (const DisconnectedException&)
3381  { // -V565
3382  // Pass - it will get replayed upon reconnect
3383  }
3384  catch (...)
3385  {
3386  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3387  throw;
3388  }
3389  }
3390  return (std::string)commandId_;
3391  }
3392  else
3393  {
// No publish store: build the command in the shared _message under _lock.
3394  Lock<Mutex> l(_lock);
3395  _message.reset();
3396  if (commandId_.empty())
3397  {
3398  _message.newCommandId();
3399  commandId_ = _message.getCommandId();
3400  }
3401  else
3402  {
3403  _message.setCommandId(commandId_.data(), commandId_.len());
3404  }
3405  _message.setCommandEnum(Message::Command::SOWDelete)
3406  .assignSubscriptionId(commandId_.data(), commandId_.len())
3407  .assignQueryID(commandId_.data(), commandId_.len())
3408  .setAckTypeEnum(Message::AckType::Processed |
3409  Message::AckType::Stats)
3410  .assignTopic(topic_.c_str(), topic_.length())
3411  .assignSowKeys(keys_.c_str(), keys_.length());
3412  _routes.addRoute(commandId_, messageHandler_,
3413  Message::AckType::Stats,
3414  Message::AckType::Processed,
3415  false);
3416  try
3417  {
3418  syncAckProcessing(timeout_, _message);
3419  }
3420  catch (...)
3421  {
3422  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3423  throw;
3424  }
3425  return (std::string)commandId_;
3426  }
3427  }
3428 
3429  void startTimer(void)
3430  {
3431  if (_serverVersion >= "5.3.2.0")
3432  {
3433  throw CommandException("The start_timer command is deprecated.");
3434  }
3435  Lock<Mutex> l(_lock);
3436  _message.reset();
3437  _message.setCommandEnum(Message::Command::StartTimer);
3438 
3439  _send(_message);
3440  }
3441 
3442  std::string stopTimer(MessageHandler messageHandler_)
3443  {
3444  if (_serverVersion >= "5.3.2.0")
3445  {
3446  throw CommandException("The stop_timer command is deprecated.");
3447  }
3448  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
3449  }
3450 
3451  amps_handle getHandle(void)
3452  {
3453  return _client;
3454  }
3455 
3463  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
3464  {
3465  _pExceptionListener = pListener_;
3466  _exceptionListener = _pExceptionListener.get();
3467  }
3468 
3469  void setExceptionListener(const ExceptionListener& listener_)
3470  {
3471  _exceptionListener = &listener_;
3472  }
3473 
3474  const ExceptionListener& getExceptionListener(void) const
3475  {
3476  return *_exceptionListener;
3477  }
3478 
3479  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
3480  {
3481  if (readTimeout_ && readTimeout_ < heartbeatInterval_)
3482  {
3483  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
3484  }
3485  Lock<Mutex> l(_lock);
3486  if(_heartbeatInterval != heartbeatInterval_ ||
3487  _readTimeout != readTimeout_)
3488  {
3489  _heartbeatInterval = heartbeatInterval_;
3490  _readTimeout = readTimeout_;
3491  _sendHeartbeat();
3492  }
3493  }
3494 
3495  void _sendHeartbeat(void)
3496  {
3497  if (_connected && _heartbeatInterval != 0)
3498  {
3499  std::ostringstream options;
3500  options << "start," << _heartbeatInterval;
3501  Message startMessage = Message()
3502  .setCommandEnum(Message::Command::Heartbeat)
3503  .setOptions(options.str());
3504 
3505  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3506  _heartbeatTimer.start();
3507  try
3508  {
3509  _sendWithoutRetry(startMessage);
3510  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
3511  }
3512  catch(ConnectionException &ex_)
3513  {
3514  // If we are disconnected when we attempt to send, that's OK;
3515  // we'll send this message after we re-connect (if we do).
3516  AMPS_UNHANDLED_EXCEPTION(ex_);
3517  }
3518  }
3519  amps_result result = AMPS_E_OK;
3520  if(_readTimeout && _connected)
3521  {
3522  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
3523  }
3524  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
3525  {
3526  AMPSException::throwFor(_client, result);
3527  }
3528  }
3529 
3530  void addConnectionStateListener(ConnectionStateListener *listener_)
3531  {
3532  Lock<Mutex> lock(_lock);
3533  _connectionStateListeners.insert(listener_);
3534  }
3535 
3536  void removeConnectionStateListener(ConnectionStateListener *listener_)
3537  {
3538  Lock<Mutex> lock(_lock);
3539  _connectionStateListeners.erase(listener_);
3540  }
3541 
3542  void clearConnectionStateListeners()
3543  {
3544  Lock<Mutex> lock(_lock);
3545  _connectionStateListeners.clear();
3546  }
3547 
// Registers handler_ in _routes under the ids carried by command_'s message:
// the subscription id, the query id, and the command id (cid_).
//   command_          command whose message supplies the ids
//   cid_              command id; may be replaced with a newly generated id
//   handler_          handler to register
//   requestedAcks_    ack types passed to addRoute() as requested
//   systemAddedAcks_  ack types passed to addRoute() as system-added
//   isSubscribe_      passed to addRoute() for the subscription id route and
//                     for a query id route that is the first one added
// Throws UsageException when no id is available and the command is neither a
// Publish nor a DeltaPublish.
// NOTE(review): the Unlock<Mutex> scopes below imply the caller holds _lock
// on entry - confirm against callers.
3548  void _registerHandler(Command& command_, Message::Field& cid_,
3549  MessageHandler& handler_, unsigned requestedAcks_,
3550  unsigned systemAddedAcks_, bool isSubscribe_)
3551  {
3552  Message message = command_.getMessage();
3553  Message::Command::Type commandType = message.getCommandEnum();
3554  Message::Field subid = message.getSubscriptionId();
3555  Message::Field qid = message.getQueryID();
3556  // If we have an id, we're good, even if it's an existing route
3557  bool added = qid.len() || subid.len() || cid_.len();
// addedCount tracks how many routes have been added so far; once it is
// non-zero, further routes get a copy of the handler's user data.
3558  int addedCount = 0;
3559  if (subid.len() > 0)
3560  {
3561  // This can replace a non-subscribe with a matching id
3562  // with a subscription but not another subscription.
3563  addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
3564  systemAddedAcks_, isSubscribe_);
3565  }
3566  if (qid.len() > 0 && qid != subid)
3567  {
// Generate new query ids until one is found that has no route yet; cid_ is
// kept in step when it was equal to the query id.
3568  while (_routes.hasRoute(qid))
3569  {
3570  message.newQueryId();
3571  if (cid_ == qid)
3572  cid_ = message.getQueryId();
3573  qid = message.getQueryId();
3574  }
3575  if (addedCount == 0)
3576  {
3577  _routes.addRoute(qid, handler_, requestedAcks_,
3578  systemAddedAcks_, isSubscribe_);
3579  }
3580  else
3581  {
// The copy function is invoked with _lock released.
3582  void* data = NULL;
3583  {
3584  Unlock<Mutex> u(_lock);
3585  data = amps_invoke_copy_route_function(handler_.userData());
3586  }
// A NULL result means the original handler is registered as-is; otherwise a
// new handler with the same function and the copied data is registered.
3587  if (!data)
3588  {
3589  _routes.addRoute(qid, handler_, requestedAcks_,
3590  systemAddedAcks_, false);
3591  }
3592  else
3593  {
3594  _routes.addRoute(qid,
3595  MessageHandler(handler_.function(),
3596  data),
3597  requestedAcks_,
3598  systemAddedAcks_, false);
3599  }
3600  }
3601  ++addedCount;
3602  }
// A separate command id route is only added when cid_ differs from both
// other ids and acks other than Persisted were requested.
3603  if (cid_.len() > 0 && cid_ != qid && cid_ != subid
3604  && requestedAcks_ & ~Message::AckType::Persisted)
3605  {
// Generate new command ids until one is found that has no route yet.
3606  while (_routes.hasRoute(cid_))
3607  {
3608  cid_ = message.newCommandId().getCommandId();
3609  }
3610  if (addedCount == 0)
3611  {
3612  _routes.addRoute(cid_, handler_, requestedAcks_,
3613  systemAddedAcks_, false);
3614  }
3615  else
3616  {
// The copy function is invoked with _lock released.
3617  void* data = NULL;
3618  {
3619  Unlock<Mutex> u(_lock);
3620  data = amps_invoke_copy_route_function(handler_.userData());
3621  }
3622  if (!data)
3623  {
3624  _routes.addRoute(cid_, handler_, requestedAcks_,
3625  systemAddedAcks_, false);
3626  }
3627  else
3628  {
3629  _routes.addRoute(cid_,
3630  MessageHandler(handler_.function(),
3631  data),
3632  requestedAcks_,
3633  systemAddedAcks_, false);
3634  }
3635  }
3636  }
// Publish and DeltaPublish commands that did not get a command id route
// above are given a newly generated command id and a route under it.
3637  else if (commandType == Message::Command::Publish ||
3638  commandType == Message::Command::DeltaPublish)
3639  {
3640  cid_ = command_.getMessage().newCommandId().getCommandId();
3641  _routes.addRoute(cid_, handler_, requestedAcks_,
3642  systemAddedAcks_, false);
3643  added=true;
3644  }
3645  if (!added)
3646  {
3647  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
3648  }
3649  }
3650 
// Prepares and sends command_'s message, registering handler_ for the
// responses when it is valid.
//
// command_       command to execute; its message is modified in place (ids,
//                ack types, bookmark, sequence).
// handler_       handler for responses; when valid, a command id is generated
//                if the message has none.
// isHASubscribe_ request to record a subscribe with the subscription manager;
//                forced to false when no subscription manager is set.
//
// Returns the subscription id for a subscribe command whose message has a
// non-empty sub id, otherwise the command id.
//
// NOTE(review): the name suggests the caller already holds _lock (executeAsync
// takes it before calling); several sections release it via Unlock<Mutex>.
3651  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
3652  bool isHASubscribe_ = true)
3653  {
3654  isHASubscribe_ &= (bool)_subscriptionManager;
3655  Message& message = command_.getMessage();
// A processed ack is added internally when there is a handler or the command
// already asks for one.
3656  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
3657  Message::AckType::Processed : Message::AckType::None;
3658  unsigned requestedAcks = message.getAckTypeEnum();
3659  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
3660  Message::Command::Type commandType = message.getCommandEnum();
// These command types additionally get a completed ack added.
3661  if (commandType == Message::Command::SOW
3662  || commandType == Message::Command::SOWAndSubscribe
3663  || commandType == Message::Command::SOWAndDeltaSubscribe
3664  || commandType == Message::Command::StopTimer)
3665  systemAddedAcks |= Message::AckType::Completed;
3666  Message::Field cid = message.getCommandId();
3667  if (handler_.isValid() && cid.empty())
3668  {
3669  cid = message.newCommandId().getCommandId();
3670  }
// Bookmark handling applies only to subscribe commands that carry a bookmark.
3671  if (message.getBookmark().len() > 0)
3672  {
3673  if (command_.isSubscribe())
3674  {
3675  Message::Field bookmark = message.getBookmark();
3676  if (_bookmarkStore.isValid())
3677  {
3678  systemAddedAcks |= Message::AckType::Persisted;
// AMPS_BOOKMARK_RECENT is replaced with the store's most recent bookmark
// for this subscription id.
3679  if (bookmark == AMPS_BOOKMARK_RECENT)
3680  {
3681  message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
3682  }
// Any other bookmark except NOW and EPOCH is logged to the store; when it
// is not a range it is also discarded and marked persisted right away.
3683  else if (bookmark != AMPS_BOOKMARK_NOW &&
3684  bookmark != AMPS_BOOKMARK_EPOCH)
3685  {
3686  _bookmarkStore.log(message);
3687  if (!BookmarkRange::isRange(bookmark))
3688  {
3689  _bookmarkStore.discard(message);
3690  _bookmarkStore.persisted(message.getSubscriptionId(),
3691  bookmark);
3692  }
3693  }
3694  }
3695  else if (bookmark == AMPS_BOOKMARK_RECENT)
3696  {
// NOTE(review): this branch is empty in this listing and source line 3697
// is absent - confirm against the original file what belongs here.
3698  }
3699  }
3700  }
3701  if (isPublishStore)
3702  {
3703  systemAddedAcks |= Message::AckType::Persisted;
3704  }
3705  bool isSubscribe = command_.isSubscribe();
// Non-subscribe commands register their handler here; subscribes register
// later, after the subscription manager has been called.
3706  if (handler_.isValid() && !isSubscribe)
3707  {
3708  _registerHandler(command_, cid, handler_,
3709  requestedAcks, systemAddedAcks, isSubscribe);
3710  }
// Send through syncAckProcessing only when there is a command id and the
// command asks for a processed ack; otherwise use _send.
3711  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
// Path 1: the command needs a sequence number and a publish store is set.
3712  if (isPublishStore)
3713  {
3714  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
3715  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
// The message is stored inside an Unlock<Mutex> scope on _lock.
3716  {
3717  Unlock<Mutex> u(_lock);
3718  haSequenceNumber = _publishStore.store(message);
3719  }
3720  message.setSequence(haSequenceNumber);
3721  try
3722  {
3723  if (useSyncSend)
3724  {
3725  syncAckProcessing((long)command_.getTimeout(), message,
3726  haSequenceNumber);
3727  }
3728  else _send(message, haSequenceNumber);
3729  }
3730  catch (const DisconnectedException&)
3731  { // -V565
3732  // Pass - message will get replayed when reconnected
3733  }
// Any other failure removes the command id route and is rethrown.
3734  catch (...)
3735  {
3736  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3737  throw;
3738  }
3739  }
3740  else
3741  {
// Path 2: subscribe commands.
3742  if(isSubscribe)
3743  {
3744  const Message::Field& subId = message.getSubscriptionId();
3745  if (isHASubscribe_)
3746  {
// Hand a deep copy of the message to the subscription manager inside an
// Unlock<Mutex> scope. When _badTimeToHASubscribe is set, return the sub id
// without registering the handler or sending.
3747  Unlock<Mutex> u(_lock);
3748  _subscriptionManager->subscribe(handler_,
3749  message.deepCopy(),
3750  requestedAcks);
3751  if (_badTimeToHASubscribe)
3752  {
3753  message.setAckTypeEnum(requestedAcks);
3754  return std::string(subId.data(), subId.len());
3755  }
3756  }
3757  if (handler_.isValid())
3758  {
3759  _registerHandler(command_, cid, handler_,
3760  requestedAcks, systemAddedAcks, isSubscribe);
3761  }
3762  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3763  try
3764  {
3765  if (useSyncSend)
3766  {
3767  syncAckProcessing((long)command_.getTimeout(), message,
3768  isHASubscribe_);
3769  }
3770  else _send(message);
3771  }
// A disconnect is swallowed for HA subscribes; for non-HA subscribes the
// routes are removed, the ack type is restored and the exception is rethrown.
3772  catch (const DisconnectedException&)
3773  {
3774  if (!isHASubscribe_)
3775  {
3776  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3777  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
3778  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3779  message.setAckTypeEnum(requestedAcks);
3780  throw;
3781  }
3782  }
// On timeout, unsubscribeInternal is called for each id before rethrowing.
3783  catch (const TimedOutException&)
3784  {
3785  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
3786  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3787  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
3788  throw;
3789  }
// Any other failure: remove the subscription from the manager (HA only) and
// remove the routes, then rethrow.
3790  catch (...)
3791  {
3792  if (isHASubscribe_)
3793  {
3794  // Have to unlock before calling into sub manager to avoid deadlock
3795  Unlock<Mutex> unlock(_lock);
3796  _subscriptionManager->unsubscribe(subId);
3797  }
3798  if (message.getQueryID().len() > 0)
3799  _routes.removeRoute(message.getQueryID());
3800  _routes.removeRoute(cid);
3801  _routes.removeRoute(subId);
3802  throw;
3803  }
// A subscribe with a sub id returns that id; one without falls through to
// the common return of the command id at the end.
3804  if (subId.len() > 0)
3805  {
3806  message.setAckTypeEnum(requestedAcks);
3807  return std::string(subId.data(), subId.len());
3808  }
3809  }
3810  else
3811  {
// Path 3: every other command. On any failure the command id and query id
// routes are removed and the message's ack type is restored before rethrowing.
3812  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3813  try
3814  {
3815  if (useSyncSend)
3816  {
3817  syncAckProcessing((long)(command_.getTimeout()), message);
3818  }
3819  else _send(message);
3820  }
3821  catch (const DisconnectedException&)
3822  {
3823  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3824  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3825  message.setAckTypeEnum(requestedAcks);
3826  throw;
3827  }
3828  catch (...)
3829  {
3830  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3831  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3832  message.setAckTypeEnum(requestedAcks);
3833  throw;
3834  }
3835  }
3836  }
// Restore the caller's requested ack types on the message before returning.
3837  message.setAckTypeEnum(requestedAcks);
3838  return cid;
3839  }
3840 
3841  MessageStream getEmptyMessageStream(void);
3842 
3843  std::string executeAsync(Command& command_, MessageHandler& handler_,
3844  bool isHASubscribe_ = true)
3845  {
3846  Lock<Mutex> lock(_lock);
3847  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
3848  }
3849 
3850  // Queue Methods //
3851  void setAutoAck(bool isAutoAckEnabled_)
3852  {
3853  _isAutoAckEnabled = isAutoAckEnabled_;
3854  }
3855  bool getAutoAck(void) const
3856  {
3857  return _isAutoAckEnabled;
3858  }
3859  void setAckBatchSize(const unsigned batchSize_)
3860  {
3861  _ackBatchSize = batchSize_;
3862  if (!_queueAckTimeout)
3863  {
3864  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
3865  amps_client_set_idle_time(_client, _queueAckTimeout);
3866  }
3867  }
3868  unsigned getAckBatchSize(void) const
3869  {
3870  return _ackBatchSize;
3871  }
3872  int getAckTimeout(void) const
3873  {
3874  return _queueAckTimeout;
3875  }
3876  void setAckTimeout(const int ackTimeout_)
3877  {
3878  amps_client_set_idle_time(_client,ackTimeout_);
3879  _queueAckTimeout = ackTimeout_;
3880  }
3881  size_t _ack(QueueBookmarks& queueBookmarks_)
3882  {
3883  if(queueBookmarks_._bookmarkCount)
3884  {
3885  if (!publishStoreMessage)
3886  {
3887  publishStoreMessage = new Message();
3888  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3889  }
3890  publishStoreMessage->reset();
3891  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3892  .setTopic(queueBookmarks_._topic)
3893  .setBookmark(queueBookmarks_._data)
3894  .setCommandId("AMPS-queue-ack");
3895  amps_uint64_t haSequenceNumber = 0;
3896  if (_publishStore.isValid())
3897  {
3898  haSequenceNumber = _publishStore.store(*publishStoreMessage);
3899  publishStoreMessage->setAckType("persisted")
3900  .setSequence(haSequenceNumber);
3901  queueBookmarks_._data.erase();
3902  queueBookmarks_._bookmarkCount = 0;
3903  }
3904  _send(*publishStoreMessage, haSequenceNumber);
3905  if (!_publishStore.isValid())
3906  {
3907  queueBookmarks_._data.erase();
3908  queueBookmarks_._bookmarkCount = 0;
3909  }
3910  return 1;
3911  }
3912  return 0;
3913  }
3914  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
3915  {
3916  if (_isAutoAckEnabled) return;
3917  _ack(topic_, bookmark_, options_);
3918  }
3919  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
3920  {
3921  if (bookmark_.len() == 0) return;
3922  Lock<Mutex> lock(_lock);
3923  if(_ackBatchSize < 2 || options_ != NULL)
3924  {
3925  if (!publishStoreMessage)
3926  {
3927  publishStoreMessage = new Message();
3928  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3929  }
3930  publishStoreMessage->reset();
3931  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3932  .setCommandId("AMPS-queue-ack")
3933  .setTopic(topic_).setBookmark(bookmark_);
3934  if (options_) publishStoreMessage->setOptions(options_);
3935  amps_uint64_t haSequenceNumber = 0;
3936  if (_publishStore.isValid())
3937  {
3938  haSequenceNumber = _publishStore.store(*publishStoreMessage);
3939  publishStoreMessage->setAckType("persisted")
3940  .setSequence(haSequenceNumber);
3941  }
3942  _send(*publishStoreMessage, haSequenceNumber);
3943  return;
3944  }
3945  // have we acked anything for this hash
3946  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(),topic_.len());
3947  TopicHashMap::iterator it = _topicHashMap.find(hash);
3948  if(it == _topicHashMap.end())
3949  {
3950  // add a new one to the map
3951  it = _topicHashMap.insert(TopicHashMap::value_type(hash,QueueBookmarks(topic_))).first;
3952  }
3953  QueueBookmarks &queueBookmarks = it->second;
3954  if(queueBookmarks._data.length())
3955  {
3956  queueBookmarks._data.append(",");
3957  }
3958  else
3959  {
3960  queueBookmarks._oldestTime = amps_now();
3961  }
3962  queueBookmarks._data.append(bookmark_);
3963  if(++queueBookmarks._bookmarkCount >= _ackBatchSize) _ack(queueBookmarks);
3964  }
3965  void flushAcks(void)
3966  {
3967  size_t sendCount = 0;
3968  if(!_connected)
3969  {
3970  return;
3971  }
3972  else
3973  {
3974  Lock<Mutex> lock(_lock);
3975  typedef TopicHashMap::iterator iterator;
3976  for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3977  {
3978  QueueBookmarks& queueBookmarks = it->second;
3979  sendCount += _ack(queueBookmarks);
3980  }
3981  }
3982  if(sendCount && _connected) publishFlush(0, Message::AckType::Processed);
3983  }
3984  // called when there's idle time, to see if we need to flush out any "acks"
3985  void checkQueueAcks(void)
3986  {
3987  if(!_topicHashMap.size()) return;
3988  Lock<Mutex> lock(_lock);
3989  try
3990  {
3991  amps_uint64_t threshold = amps_now() - (amps_uint64_t)_queueAckTimeout;
3992  typedef TopicHashMap::iterator iterator;
3993  for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3994  {
3995  QueueBookmarks& queueBookmarks = it->second;
3996  if(queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold) _ack(queueBookmarks);
3997  }
3998  }
3999  catch(std::exception& ex)
4000  {
4001  AMPS_UNHANDLED_EXCEPTION(ex);
4002  }
4003  }
4004 
4005  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
4006  {
4007  Lock<Mutex> lock(_deferredExecutionLock);
4008  _deferredExecutionList.push_back(
4009  DeferredExecutionRequest(func_,userData_));
4010  }
4011 
4012  inline void processDeferredExecutions(void)
4013  {
4014  if(_deferredExecutionList.size())
4015  {
4016  Lock<Mutex> lock(_deferredExecutionLock);
4017  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4018  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4019  for(; it != end; ++it)
4020  {
4021  try
4022  {
4023  it->_func(it->_userData);
4024  }
4025  catch (...)
4026  { // -V565
4027  // Intentionally ignore errors
4028  }
4029  }
4030  _deferredExecutionList.clear();
4031  _routes.invalidateCache();
4032  _routeCache.invalidateCache();
4033  }
4034  }
4035 
4036  bool getRetryOnDisconnect(void) const
4037  {
4038  return _isRetryOnDisconnect;
4039  }
4040 
4041  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4042  {
4043  _isRetryOnDisconnect = isRetryOnDisconnect_;
4044  }
4045 
4046  void setDefaultMaxDepth(unsigned maxDepth_)
4047  {
4048  _defaultMaxDepth = maxDepth_;
4049  }
4050 
4051  unsigned getDefaultMaxDepth(void) const
4052  {
4053  return _defaultMaxDepth;
4054  }
4055 
4056  void setTransportFilterFunction(amps_transport_filter_function filter_,
4057  void* userData_)
4058  {
4059  amps_client_set_transport_filter_function(_client, filter_, userData_);
4060  }
4061 
4062  void setThreadCreatedCallback(amps_thread_created_callback callback_,
4063  void* userData_)
4064  {
4065  amps_client_set_thread_created_callback(_client, callback_, userData_);
4066  }
4067 }; // class ClientImpl
4142 
4144 {
4145  RefHandle<MessageStreamImpl> _body;
4146  public:
4151  class iterator
4152  {
4153  MessageStream* _pStream;
4154  Message _current;
4155  inline void advance(void);
4156 
4157  public:
4158  iterator() // end
4159  :_pStream(NULL)
4160  {;}
4161  iterator(MessageStream* pStream_)
4162  :_pStream(pStream_)
4163  {
4164  advance();
4165  }
4166 
4167  bool operator==(const iterator& rhs)
4168  {
4169  return _pStream == rhs._pStream;
4170  }
4171  bool operator!=(const iterator& rhs)
4172  {
4173  return _pStream != rhs._pStream;
4174  }
4175  void operator++(void) { advance(); }
4176  Message operator*(void) { return _current; }
4177  Message* operator->(void) { return &_current; }
4178  };
4180  bool isValid() const { return _body.isValid(); }
4181 
4185  {
4186  if(!_body.isValid())
4187  {
4188  throw UsageException("This MessageStream is not valid and cannot be iterated.");
4189  }
4190  return iterator(this);
4191  }
4194  // For non-SOW queries, the end is never reached.
4195  iterator end(void) { return iterator(); }
4196  inline MessageStream(void);
4197 
4203  MessageStream timeout(unsigned timeout_);
4204 
4208  MessageStream conflate(void);
4214  MessageStream maxDepth(unsigned maxDepth_);
4217  unsigned getMaxDepth(void) const;
4220  unsigned getDepth(void) const;
4221 
4222  private:
4223  inline MessageStream(const Client& client_);
4224  inline void setSOWOnly(const std::string& commandId_,
4225  const std::string& queryId_ = "");
4226  inline void setSubscription(const std::string& subId_,
4227  const std::string& commandId_ = "",
4228  const std::string& queryId_ = "");
4229  inline void setStatsOnly(const std::string& commandId_,
4230  const std::string& queryId_ = "");
4231  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
4232 
4233  inline operator MessageHandler(void);
4234 
4235  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
4236 
4237  friend class Client;
4238 
4239 };
4240 
4260 class Client // -V553
4261 {
4262 protected:
4263  BorrowRefHandle<ClientImpl> _body;
4264 public:
4265  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
4266  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
4267  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
4268 
4277  Client(const std::string& clientName = "")
4278  : _body(new ClientImpl(clientName), true)
4279  {;}
4280 
4281  Client(ClientImpl* existingClient)
4282  : _body(existingClient, true)
4283  {;}
4284 
4285  Client(ClientImpl* existingClient, bool isRef)
4286  : _body(existingClient, isRef)
4287  {;}
4288 
4289  Client(const Client& rhs) : _body(rhs._body) {;}
4290  virtual ~Client(void) {;}
4291 
4292  Client& operator=(const Client& rhs)
4293  {
4294  _body = rhs._body;
4295  return *this;
4296  }
4297 
4298  bool isValid()
4299  {
4300  return _body.isValid();
4301  }
4302 
4315  void setName(const std::string& name)
4316  {
4317  _body.get().setName(name);
4318  }
4319 
4322  const std::string& getName() const
4323  {
4324  return _body.get().getName();
4325  }
4326 
4330  const std::string& getNameHash() const
4331  {
4332  return _body.get().getNameHash();
4333  }
4334 
4341  void setLogonCorrelationData(const std::string& logonCorrelationData_)
4342  {
4343  _body.get().setLogonCorrelationData(logonCorrelationData_);
4344  }
4345 
4348  const std::string& getLogonCorrelationData() const
4349  {
4350  return _body.get().getLogonCorrelationData();
4351  }
4352 
4361  size_t getServerVersion() const
4362  {
4363  return _body.get().getServerVersion();
4364  }
4365 
4372  VersionInfo getServerVersionInfo() const
4373  {
4374  return _body.get().getServerVersionInfo();
4375  }
4376 
4386  static size_t convertVersionToNumber(const std::string& version_)
4387  {
4388  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4389  }
4390 
4401  static size_t convertVersionToNumber(const char* data_, size_t len_)
4402  {
4403  return AMPS::convertVersionToNumber(data_, len_);
4404  }
4405 
4408  const std::string& getURI() const
4409  {
4410  return _body.get().getURI();
4411  }
4412 
4419 
4421 
4432  void connect(const std::string& uri)
4433  {
4434  _body.get().connect(uri);
4435  }
4436 
4439  void disconnect()
4440  {
4441  _body.get().disconnect();
4442  }
4443 
4457  void send(const Message& message)
4458  {
4459  _body.get().send(message);
4460  }
4461 
4470  void addMessageHandler(const Field& commandId_,
4471  const AMPS::MessageHandler& messageHandler_,
4472  unsigned requestedAcks_, bool isSubscribe_)
4473  {
4474  _body.get().addMessageHandler(commandId_, messageHandler_,
4475  requestedAcks_, isSubscribe_);
4476  }
4477 
4481  bool removeMessageHandler(const Field& commandId_)
4482  {
4483  return _body.get().removeMessageHandler(commandId_);
4484  }
4485 
4509  std::string send(const MessageHandler& messageHandler, Message& message, int timeout = 0)
4510  {
4511  return _body.get().send(messageHandler, message, timeout);
4512  }
4513 
4523  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
4524  {
4525  _body.get().setDisconnectHandler(disconnectHandler);
4526  }
4527 
4531  DisconnectHandler getDisconnectHandler(void) const
4532  {
4533  return _body.get().getDisconnectHandler();
4534  }
4535 
4540  virtual ConnectionInfo getConnectionInfo() const
4541  {
4542  return _body.get().getConnectionInfo();
4543  }
4544 
4553  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
4554  {
4555  _body.get().setBookmarkStore(bookmarkStore_);
4556  }
4557 
4562  {
4563  return _body.get().getBookmarkStore();
4564  }
4565 
4570  {
4571  return _body.get().getSubscriptionManager();
4572  }
4573 
4581  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
4582  {
4583  _body.get().setSubscriptionManager(subscriptionManager_);
4584  }
4585 
4605  void setPublishStore(const Store& publishStore_)
4606  {
4607  _body.get().setPublishStore(publishStore_);
4608  }
4609 
4614  {
4615  return _body.get().getPublishStore();
4616  }
4617 
4621  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
4622  {
4623  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
4624  duplicateMessageHandler_);
4625  }
4626 
4637  {
4638  return _body.get().getDuplicateMessageHandler();
4639  }
4640 
4651  {
4652  _body.get().setFailedWriteHandler(handler_);
4653  }
4654 
4659  {
4660  return _body.get().getFailedWriteHandler();
4661  }
4662 
4663 
4681  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
4682  {
4683  return _body.get().publish(topic_.c_str(), topic_.length(),
4684  data_.c_str(), data_.length());
4685  }
4686 
4706  amps_uint64_t publish(const char* topic_, size_t topicLength_,
4707  const char* data_, size_t dataLength_)
4708  {
4709  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
4710  }
4711 
4730  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
4731  unsigned long expiration_)
4732  {
4733  return _body.get().publish(topic_.c_str(), topic_.length(),
4734  data_.c_str(), data_.length(), expiration_);
4735  }
4736 
4757  amps_uint64_t publish(const char* topic_, size_t topicLength_,
4758  const char* data_, size_t dataLength_,
4759  unsigned long expiration_)
4760  {
4761  return _body.get().publish(topic_, topicLength_,
4762  data_, dataLength_, expiration_);
4763  }
4764 
4803  void publishFlush(long timeout_ = 0, unsigned ackType_ = Message::AckType::Processed)
4804  {
4805  _body.get().publishFlush(timeout_, ackType_);
4806  }
4807 
4808 
4824  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
4825  {
4826  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4827  data_.c_str(), data_.length());
4828  }
4829 
4847  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
4848  const char* data_, size_t dataLength_)
4849  {
4850  return _body.get().deltaPublish(topic_, topicLength_,
4851  data_, dataLength_);
4852  }
4853 
4870  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
4871  unsigned long expiration_)
4872  {
4873  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4874  data_.c_str(), data_.length(),
4875  expiration_);
4876  }
4877 
4896  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
4897  const char* data_, size_t dataLength_,
4898  unsigned long expiration_)
4899  {
4900  return _body.get().deltaPublish(topic_, topicLength_,
4901  data_, dataLength_, expiration_);
4902  }
4903 
4918  std::string logon(int timeout_ = 0,
4919  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
4920  const char* options_ = NULL)
4921  {
4922  return _body.get().logon(timeout_, authenticator_, options_);
4923  }
4936  std::string logon(const char* options_, int timeout_ = 0)
4937  {
4938  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
4939  options_);
4940  }
4941 
4954  std::string logon(const std::string& options_, int timeout_ = 0)
4955  {
4956  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
4957  options_.c_str());
4958  }
4959 
4979  std::string subscribe(const MessageHandler& messageHandler_,
4980  const std::string& topic_,
4981  long timeout_=0,
4982  const std::string& filter_="",
4983  const std::string& options_ = "",
4984  const std::string& subId_ = "")
4985  {
4986  return _body.get().subscribe(messageHandler_, topic_, timeout_,
4987  filter_, "", options_, subId_);
4988  }
4989 
5005  MessageStream subscribe(const std::string& topic_,
5006  long timeout_=0, const std::string& filter_="",
5007  const std::string& options_ = "",
5008  const std::string& subId_ = "")
5009  {
5010  MessageStream result(*this);
5011  if (_body.get().getDefaultMaxDepth())
5012  result.maxDepth(_body.get().getDefaultMaxDepth());
5013  result.setSubscription(_body.get().subscribe(
5014  result.operator MessageHandler(),
5015  topic_, timeout_, filter_, "",
5016  options_, subId_, false));
5017  return result;
5018  }
5019 
5035  MessageStream subscribe(const char* topic_,
5036  long timeout_ = 0, const std::string& filter_ = "",
5037  const std::string& options_ = "",
5038  const std::string& subId_ = "")
5039  {
5040  MessageStream result(*this);
5041  if (_body.get().getDefaultMaxDepth())
5042  result.maxDepth(_body.get().getDefaultMaxDepth());
5043  result.setSubscription(_body.get().subscribe(
5044  result.operator MessageHandler(),
5045  topic_, timeout_, filter_, "",
5046  options_, subId_, false));
5047  return result;
5048  }
5049 
5062  std::string deltaSubscribe(const MessageHandler& messageHandler_,
5063  const std::string& topic_,
5064  long timeout_,
5065  const std::string& filter_="",
5066  const std::string& options_ = "",
5067  const std::string& subId_ = "")
5068  {
5069  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5070  filter_, "", options_, subId_);
5071  }
5080  MessageStream deltaSubscribe(const std::string& topic_,
5081  long timeout_, const std::string& filter_="",
5082  const std::string& options_ = "",
5083  const std::string& subId_ = "")
5084  {
5085  MessageStream result(*this);
5086  if (_body.get().getDefaultMaxDepth())
5087  result.maxDepth(_body.get().getDefaultMaxDepth());
5088  result.setSubscription(_body.get().deltaSubscribe(
5089  result.operator MessageHandler(),
5090  topic_, timeout_, filter_, "",
5091  options_, subId_, false));
5092  return result;
5093  }
5094 
5096  MessageStream deltaSubscribe(const char* topic_,
5097  long timeout_, const std::string& filter_ = "",
5098  const std::string& options_ = "",
5099  const std::string& subId_ = "")
5100  {
5101  MessageStream result(*this);
5102  if (_body.get().getDefaultMaxDepth())
5103  result.maxDepth(_body.get().getDefaultMaxDepth());
5104  result.setSubscription(_body.get().deltaSubscribe(
5105  result.operator MessageHandler(),
5106  topic_, timeout_, filter_, "",
5107  options_, subId_, false));
5108  return result;
5109  }
5110 
5136  std::string bookmarkSubscribe(const MessageHandler& messageHandler_,
5137  const std::string& topic_,
5138  long timeout_,
5139  const std::string& bookmark_,
5140  const std::string& filter_="",
5141  const std::string& options_ = "",
5142  const std::string& subId_ = "")
5143  {
5144  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5145  filter_, bookmark_, options_, subId_);
5146  }
5164  MessageStream bookmarkSubscribe(const std::string& topic_,
5165  long timeout_,
5166  const std::string& bookmark_,
5167  const std::string& filter_="",
5168  const std::string& options_ = "",
5169  const std::string& subId_ = "")
5170  {
5171  MessageStream result(*this);
5172  if (_body.get().getDefaultMaxDepth())
5173  result.maxDepth(_body.get().getDefaultMaxDepth());
5174  result.setSubscription(_body.get().subscribe(
5175  result.operator MessageHandler(),
5176  topic_, timeout_, filter_,
5177  bookmark_, options_,
5178  subId_, false));
5179  return result;
5180  }
5181 
5183  MessageStream bookmarkSubscribe(const char* topic_,
5184  long timeout_,
5185  const std::string& bookmark_,
5186  const std::string& filter_ = "",
5187  const std::string& options_ = "",
5188  const std::string& subId_ = "")
5189  {
5190  MessageStream result(*this);
5191  if (_body.get().getDefaultMaxDepth())
5192  result.maxDepth(_body.get().getDefaultMaxDepth());
5193  result.setSubscription(_body.get().subscribe(
5194  result.operator MessageHandler(),
5195  topic_, timeout_, filter_,
5196  bookmark_, options_,
5197  subId_, false));
5198  return result;
5199  }
5200 
5209  void unsubscribe(const std::string& commandId)
5210  {
5211  return _body.get().unsubscribe(commandId);
5212  }
5213 
5222  {
5223  return _body.get().unsubscribe();
5224  }
5225 
5226 
5256  std::string sow(const MessageHandler& messageHandler_,
5257  const std::string& topic_,
5258  const std::string& filter_ = "",
5259  const std::string& orderBy_ = "",
5260  const std::string& bookmark_ = "",
5261  int batchSize_ = DEFAULT_BATCH_SIZE,
5262  int topN_ = DEFAULT_TOP_N,
5263  const std::string& options_ = "",
5264  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5265  {
5266  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
5267  bookmark_, batchSize_, topN_, options_,
5268  timeout_);
5269  }
5294  MessageStream sow(const std::string& topic_,
5295  const std::string& filter_ = "",
5296  const std::string& orderBy_ = "",
5297  const std::string& bookmark_ = "",
5298  int batchSize_ = DEFAULT_BATCH_SIZE,
5299  int topN_ = DEFAULT_TOP_N,
5300  const std::string& options_ = "",
5301  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5302  {
5303  MessageStream result(*this);
5304  if (_body.get().getDefaultMaxDepth())
5305  result.maxDepth(_body.get().getDefaultMaxDepth());
5306  result.setSOWOnly(sow(result.operator MessageHandler(),topic_,filter_,orderBy_,bookmark_,batchSize_,topN_,options_,timeout_));
5307  return result;
5308  }
5309 
5311  MessageStream sow(const char* topic_,
5312  const std::string& filter_ = "",
5313  const std::string& orderBy_ = "",
5314  const std::string& bookmark_ = "",
5315  int batchSize_ = DEFAULT_BATCH_SIZE,
5316  int topN_ = DEFAULT_TOP_N,
5317  const std::string& options_ = "",
5318  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5319  {
5320  MessageStream result(*this);
5321  if (_body.get().getDefaultMaxDepth())
5322  result.maxDepth(_body.get().getDefaultMaxDepth());
5323  result.setSOWOnly(sow(result.operator MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5324  return result;
5325  }
5348  std::string sow(const MessageHandler& messageHandler_,
5349  const std::string& topic_,
5350  long timeout_,
5351  const std::string& filter_ = "",
5352  int batchSize_ = DEFAULT_BATCH_SIZE,
5353  int topN_ = DEFAULT_TOP_N)
5354  {
5355  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
5356  batchSize_, topN_);
5357  }
5380  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
5381  const std::string& topic_,
5382  long timeout_,
5383  const std::string& filter_ = "",
5384  int batchSize_ = DEFAULT_BATCH_SIZE,
5385  bool oofEnabled_ = false,
5386  int topN_ = DEFAULT_TOP_N)
5387  {
5388  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5389  filter_, batchSize_, oofEnabled_,
5390  topN_);
5391  }
5392 
5412  MessageStream sowAndSubscribe(const std::string& topic_,
5413  long timeout_,
5414  const std::string& filter_ = "",
5415  int batchSize_ = DEFAULT_BATCH_SIZE,
5416  bool oofEnabled_ = false,
5417  int topN_ = DEFAULT_TOP_N)
5418  {
5419  MessageStream result(*this);
5420  if (_body.get().getDefaultMaxDepth())
5421  result.maxDepth(_body.get().getDefaultMaxDepth());
5422  result.setSubscription(_body.get().sowAndSubscribe(
5423  result.operator MessageHandler(),
5424  topic_, timeout_, filter_,
5425  batchSize_, oofEnabled_,
5426  topN_, false));
5427  return result;
5428  }
5448  MessageStream sowAndSubscribe(const char *topic_,
5449  long timeout_,
5450  const std::string& filter_ = "",
5451  int batchSize_ = DEFAULT_BATCH_SIZE,
5452  bool oofEnabled_ = false,
5453  int topN_ = DEFAULT_TOP_N)
5454  {
5455  MessageStream result(*this);
5456  if (_body.get().getDefaultMaxDepth())
5457  result.maxDepth(_body.get().getDefaultMaxDepth());
5458  result.setSubscription(_body.get().sowAndSubscribe(
5459  result.operator MessageHandler(),
5460  topic_, timeout_, filter_,
5461  batchSize_, oofEnabled_,
5462  topN_, false));
5463  return result;
5464  }
5465 
5466 
5494  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
5495  const std::string& topic_,
5496  const std::string& filter_ = "",
5497  const std::string& orderBy_ = "",
5498  const std::string& bookmark_ = "",
5499  int batchSize_ = DEFAULT_BATCH_SIZE,
5500  int topN_ = DEFAULT_TOP_N,
5501  const std::string& options_ = "",
5502  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5503  {
5504  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
5505  orderBy_, bookmark_, batchSize_,
5506  topN_, options_, timeout_);
5507  }
5508 
5533  MessageStream sowAndSubscribe(const std::string& topic_,
5534  const std::string& filter_ = "",
5535  const std::string& orderBy_ = "",
5536  const std::string& bookmark_ = "",
5537  int batchSize_ = DEFAULT_BATCH_SIZE,
5538  int topN_ = DEFAULT_TOP_N,
5539  const std::string& options_ = "",
5540  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5541  {
5542  MessageStream result(*this);
5543  if (_body.get().getDefaultMaxDepth())
5544  result.maxDepth(_body.get().getDefaultMaxDepth());
5545  result.setSubscription(_body.get().sowAndSubscribe(
5546  result.operator MessageHandler(),
5547  topic_, filter_, orderBy_,
5548  bookmark_, batchSize_, topN_,
5549  options_, timeout_, false));
5550  return result;
5551  }
5552 
5554  MessageStream sowAndSubscribe(const char* topic_,
5555  const std::string& filter_ = "",
5556  const std::string& orderBy_ = "",
5557  const std::string& bookmark_ = "",
5558  int batchSize_ = DEFAULT_BATCH_SIZE,
5559  int topN_ = DEFAULT_TOP_N,
5560  const std::string& options_ = "",
5561  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5562  {
5563  MessageStream result(*this);
5564  if (_body.get().getDefaultMaxDepth())
5565  result.maxDepth(_body.get().getDefaultMaxDepth());
5566  result.setSubscription(_body.get().sowAndSubscribe(
5567  result.operator MessageHandler(),
5568  topic_, filter_, orderBy_,
5569  bookmark_, batchSize_, topN_,
5570  options_, timeout_, false));
5571  return result;
5572  }
5573 
5598  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
5599  const std::string& topic_,
5600  const std::string& filter_ = "",
5601  const std::string& orderBy_ = "",
5602  int batchSize_ = DEFAULT_BATCH_SIZE,
5603  int topN_ = DEFAULT_TOP_N,
5604  const std::string& options_ = "",
5605  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5606  {
5607  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5608  filter_, orderBy_, batchSize_,
5609  topN_, options_, timeout_);
5610  }
5631  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
5632  const std::string& filter_ = "",
5633  const std::string& orderBy_ = "",
5634  int batchSize_ = DEFAULT_BATCH_SIZE,
5635  int topN_ = DEFAULT_TOP_N,
5636  const std::string& options_ = "",
5637  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5638  {
5639  MessageStream result(*this);
5640  if (_body.get().getDefaultMaxDepth())
5641  result.maxDepth(_body.get().getDefaultMaxDepth());
5642  result.setSubscription(sowAndDeltaSubscribe(result.operator MessageHandler(), topic_, filter_, orderBy_, batchSize_, topN_, options_, timeout_));
5643  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5644  result.operator MessageHandler(),
5645  topic_, filter_, orderBy_,
5646  batchSize_, topN_, options_,
5647  timeout_, false));
5648  return result;
5649  }
5650 
5653  const std::string& filter_ = "",
5654  const std::string& orderBy_ = "",
5655  int batchSize_ = DEFAULT_BATCH_SIZE,
5656  int topN_ = DEFAULT_TOP_N,
5657  const std::string& options_ = "",
5658  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5659  {
// NOTE(review): the first line of this signature is not present in this
// listing; the body matches the stream-returning sowAndDeltaSubscribe
// overloads, so this is presumably the C-string-topic variant -- confirm.
// Create a stream on this client and apply the default max depth if nonzero.
5660  MessageStream result(*this);
5661  if (_body.get().getDefaultMaxDepth())
5662  result.maxDepth(_body.get().getDefaultMaxDepth());
// Use the stream as the handler for the command and remember the value the
// implementation returns.
5663  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5664  result.operator MessageHandler(),
5665  topic_, filter_, orderBy_,
5666  batchSize_, topN_, options_,
5667  timeout_, false));
5668  return result;
5669  }
5670 
5695  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
5696  const std::string& topic_,
5697  long timeout_,
5698  const std::string& filter_ = "",
5699  int batchSize_ = DEFAULT_BATCH_SIZE,
5700  bool oofEnabled_ = false,
5701  bool sendEmpties_ = false,
5702  int topN_ = DEFAULT_TOP_N)
5703  {
5704  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5705  timeout_, filter_, batchSize_,
5706  oofEnabled_, sendEmpties_,
5707  topN_);
5708  }
5709 
5731  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
5732  long timeout_,
5733  const std::string& filter_ = "",
5734  int batchSize_ = DEFAULT_BATCH_SIZE,
5735  bool oofEnabled_ = false,
5736  bool sendEmpties_ = false,
5737  int topN_ = DEFAULT_TOP_N)
5738  {
5739  MessageStream result(*this);
5740  if (_body.get().getDefaultMaxDepth())
5741  result.maxDepth(_body.get().getDefaultMaxDepth());
5742  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5743  result.operator MessageHandler(),
5744  topic_, timeout_, filter_,
5745  batchSize_, oofEnabled_,
5746  sendEmpties_, topN_, false));
5747  return result;
5748  }
5771  long timeout_,
5772  const std::string& filter_ = "",
5773  int batchSize_ = DEFAULT_BATCH_SIZE,
5774  bool oofEnabled_ = false,
5775  bool sendEmpties_ = false,
5776  int topN_ = DEFAULT_TOP_N)
5777  {
// NOTE(review): the first line of this signature is not present in this
// listing; the body matches the timeout-first stream overload of
// sowAndDeltaSubscribe, so this is presumably its C-string-topic variant --
// confirm.
// Create a stream on this client and apply the default max depth if nonzero.
5778  MessageStream result(*this);
5779  if (_body.get().getDefaultMaxDepth())
5780  result.maxDepth(_body.get().getDefaultMaxDepth());
// Use the stream as the handler for the command and remember the value the
// implementation returns.
5781  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5782  result.operator MessageHandler(),
5783  topic_, timeout_, filter_,
5784  batchSize_, oofEnabled_,
5785  sendEmpties_, topN_, false));
5786  return result;
5787  }
5807  std::string sowDelete(const MessageHandler& messageHandler,
5808  const std::string& topic,
5809  const std::string& filter,
5810  long timeout)
5811  {
5812  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
5813  }
5830  Message sowDelete(const std::string& topic, const std::string& filter,
5831  long timeout=0)
5832  {
5833  MessageStream stream(*this);
5834  char buf[Message::IdentifierLength+1];
5835  buf[Message::IdentifierLength] = 0;
5836  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5837  Field cid(buf);
5838  try
5839  {
5840  stream.setStatsOnly(cid);
5841  _body.get().sowDelete(stream.operator MessageHandler(),topic,filter,timeout,cid);
5842  return *(stream.begin());
5843  }
5844  catch (const DisconnectedException&)
5845  {
5846  removeMessageHandler(cid);
5847  throw;
5848  }
5849  }
5850 
5855  void startTimer()
5856  {
5857  _body.get().startTimer();
5858  }
5859 
5866  std::string stopTimer(const MessageHandler& messageHandler)
5867  {
5868  return _body.get().stopTimer(messageHandler);
5869  }
5870 
5892  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
5893  const std::string& topic_,
5894  const std::string& keys_,
5895  long timeout_=0)
5896  {
5897  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
5898  }
5919  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
5920  long timeout_ = 0)
5921  {
5922  MessageStream stream(*this);
5923  char buf[Message::IdentifierLength+1];
5924  buf[Message::IdentifierLength] = 0;
5925  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5926  Field cid(buf);
5927  try
5928  {
5929  stream.setStatsOnly(cid);
5930  _body.get().sowDeleteByKeys(stream.operator MessageHandler(),topic_,keys_,timeout_,cid);
5931  return *(stream.begin());
5932  }
5933  catch (const DisconnectedException&)
5934  {
5935  removeMessageHandler(cid);
5936  throw;
5937  }
5938  }
5939 
5954  std::string sowDeleteByData(const MessageHandler& messageHandler_,
5955  const std::string& topic_, const std::string& data_,
5956  long timeout_=0)
5957  {
5958  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
5959  }
5960 
5975  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
5976  long timeout_=0)
5977  {
5978  MessageStream stream(*this);
5979  char buf[Message::IdentifierLength+1];
5980  buf[Message::IdentifierLength] = 0;
5981  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5982  Field cid(buf);
5983  try
5984  {
5985  stream.setStatsOnly(cid);
5986  _body.get().sowDeleteByData(stream.operator MessageHandler(),topic_,data_,timeout_,cid);
5987  return *(stream.begin());
5988  }
5989  catch (const DisconnectedException&)
5990  {
5991  removeMessageHandler(cid);
5992  throw;
5993  }
5994  }
5995 
6000  {
// NOTE(review): the signature line is missing from this listing.
// Returns whatever the implementation's getHandle() returns.
6001  return _body.get().getHandle();
6002  }
6003 
6012  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
6013  {
6014  _body.get().setExceptionListener(pListener_);
6015  }
6016 
6026  {
// NOTE(review): the signature line is missing from this listing.
// Passes listener_ to the implementation's setExceptionListener().
6027  _body.get().setExceptionListener(listener_);
6028  }
6029 
6033  {
// NOTE(review): the signature line is missing from this listing.
// Returns whatever the implementation's getExceptionListener() returns.
6034  return _body.get().getExceptionListener();
6035  }
6036 
6044  // type of message) from the server for the specified interval (plus a grace period),
6058  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
6059  {
6060  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6061  }
6062 
6070  // type of message) from the server for the specified interval (plus a grace period),
6082  void setHeartbeat(unsigned heartbeatTime_)
6083  {
6084  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6085  }
6086 
6089  {
// NOTE(review): the signature line is missing from this listing.
// Delegates to setLastChanceMessageHandler() with the same handler.
6090  setLastChanceMessageHandler(messageHandler);
6091  }
6092 
6096  {
// NOTE(review): the signature line is missing from this listing.
// Registers messageHandler in the implementation's global command-type
// handler table under the LastChance slot.
6097  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6098  messageHandler);
6099  }
6100 
6121  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
6122  {
6123  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6124  }
6125 
6146  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
6147  {
6148  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6149  }
6150 
6156  static const char* BOOKMARK_NOW() { return AMPS_BOOKMARK_NOW; }
6162  static const char* NOW() { return AMPS_BOOKMARK_NOW; }
6163 
6169  static const char* BOOKMARK_EPOCH() { return AMPS_BOOKMARK_EPOCH; }
6170 
6176  static const char* EPOCH() { return AMPS_BOOKMARK_EPOCH; }
6177 
6184  static const char* BOOKMARK_MOST_RECENT() { return AMPS_BOOKMARK_RECENT; }
6185 
6192  static const char* MOST_RECENT() { return AMPS_BOOKMARK_RECENT; }
6193 
6200  static const char* BOOKMARK_RECENT() { return AMPS_BOOKMARK_RECENT; }
6201 
6202 
6209  {
// NOTE(review): the signature line is missing from this listing.
// Passes listener to the implementation's addConnectionStateListener().
6210  _body.get().addConnectionStateListener(listener);
6211  }
6212 
6217  {
// NOTE(review): the signature line is missing from this listing.
// Passes listener to the implementation's removeConnectionStateListener().
6218  _body.get().removeConnectionStateListener(listener);
6219  }
6220 
6224  {
// NOTE(review): the signature line is missing from this listing.
// Forwards to the implementation's clearConnectionStateListeners().
6225  _body.get().clearConnectionStateListeners();
6226  }
6227 
6253  std::string executeAsync(Command& command_, MessageHandler handler_)
6254  {
6255  return _body.get().executeAsync(command_, handler_);
6256  }
6257 
6287  std::string executeAsyncNoResubscribe(Command& command_,
6288  MessageHandler handler_)
6289  {
6290  std::string id;
6291  try
6292  {
6293  if (command_.isSubscribe())
6294  {
6295  Message& message = command_.getMessage();
6296  Field subId = message.getSubscriptionId();
6297  bool useExistingHandler = !subId.empty() && !message.getOptions().empty() && message.getOptions().contains("replace",7);
6298  if(useExistingHandler)
6299  {
6300  MessageHandler existingHandler;
6301  if (_body.get()._routes.getRoute(subId, existingHandler))
6302  {
6303  // we found an existing handler.
6304  _body.get().executeAsync(command_, existingHandler, false);
6305  return id; // empty string indicates existing
6306  }
6307  }
6308  }
6309  id = _body.get().executeAsync(command_, handler_, false);
6310  }
6311  catch (const DisconnectedException&)
6312  {
6313  removeMessageHandler(command_.getMessage().getCommandId());
6314  if (command_.isSubscribe())
6315  {
6316  removeMessageHandler(command_.getMessage().getSubscriptionId());
6317  }
6318  if (command_.isSow())
6319  {
6320  removeMessageHandler(command_.getMessage().getQueryID());
6321  }
6322  throw;
6323  }
6324  return id;
6325  }
6326 
// Declaration only: takes a Command by reference and returns a
// MessageStream. The definition is not in this part of the file.
6339  MessageStream execute(Command& command_);
6340 
6349  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
6350  {
6351  _body.get().ack(topic_,bookmark_,options_);
6352  }
6353 
6361  void ack(Message& message_, const char* options_ = NULL)
6362  {
6363  _body.get().ack(message_.getTopic(),message_.getBookmark(),options_);
6364  }
6373  void ack(const std::string& topic_, const std::string& bookmark_,
6374  const char* options_ = NULL)
6375  {
6376  _body.get().ack(Field(topic_.data(),topic_.length()), Field(bookmark_.data(),bookmark_.length()),options_);
6377  }
6378 
6384  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
6385  {
6386  _body.get()._ack(topic_,bookmark_,options_);
6387  }
6397  void flushAcks(void)
6398  {
6399  _body.get().flushAcks();
6400  }
6401 
6406  bool getAutoAck(void) const
6407  {
6408  return _body.get().getAutoAck();
6409  }
6416  void setAutoAck(bool isAutoAckEnabled_)
6417  {
6418  _body.get().setAutoAck(isAutoAckEnabled_);
6419  }
6424  unsigned getAckBatchSize(void) const
6425  {
6426  return _body.get().getAckBatchSize();
6427  }
6434  void setAckBatchSize(const unsigned ackBatchSize_)
6435  {
6436  _body.get().setAckBatchSize(ackBatchSize_);
6437  }
6438 
6445  int getAckTimeout(void) const
6446  {
6447  return _body.get().getAckTimeout();
6448  }
6455  void setAckTimeout(const int ackTimeout_)
6456  {
6457  _body.get().setAckTimeout(ackTimeout_);
6458  }
6459 
6460 
6469  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
6470  {
6471  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
6472  }
6473 
6478  bool getRetryOnDisconnect(void) const
6479  {
6480  return _body.get().getRetryOnDisconnect();
6481  }
6482 
6487  void setDefaultMaxDepth(unsigned maxDepth_)
6488  {
6489  _body.get().setDefaultMaxDepth(maxDepth_);
6490  }
6491 
6496  unsigned getDefaultMaxDepth(void) const
6497  {
6498  return _body.get().getDefaultMaxDepth();
6499  }
6500 
6508  void* userData_)
6509  {
// NOTE(review): the first line of this signature is missing from this
// listing. Forwards filter_ and userData_ to the implementation's
// setTransportFilterFunction() and returns its result.
6510  return _body.get().setTransportFilterFunction(filter_, userData_);
6511  }
6512 
6522  void* userData_)
6523  {
// NOTE(review): the first line of this signature is missing from this
// listing. Forwards callback_ and userData_ to the implementation's
// setThreadCreatedCallback() and returns its result.
6524  return _body.get().setThreadCreatedCallback(callback_, userData_);
6525  }
6526 
6532  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
6533  {
6534  _body.get().deferredExecution(func_,userData_);
6535  }
6539 };
6540 
6541 inline void
6542 ClientImpl::lastChance(AMPS::Message& message)
6543 {
6544  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
6545 }
6546 
// Handles a persisted ack and returns how many parties it was delivered to.
//
// Three things may happen, each adding to the returned count:
//  1. If the ack looks like a failure and a failed-write handler is set,
//     the failed write is reported (via the publish store when it is valid,
//     otherwise with an otherwise-empty message carrying the sequence).
//  2. If the publish store is valid and the ack has a sequence greater than
//     zero, the store discards everything up to that sequence.
//  3. If nothing has been counted yet and the bookmark store is valid, an
//     ack carrying both a subscription id and a bookmark for a routed
//     subscription is recorded as persisted in the bookmark store.
//
// Any std::exception is reported through AMPS_UNHANDLED_EXCEPTION rather
// than propagated.
6547 inline unsigned
6548 ClientImpl::persistedAck(AMPS::Message& message)
6549 {
6550  unsigned deliveries = 0;
6551  try
6552  {
6553  /*
6554  * Best Practice: If you don't care about the dupe acks that
6555  * occur during failover or rapid disconnect/reconnect, then just
6556  * ignore them. We could discard each duplicate from the
6557  * persisted store, but the storage costs of doing 1 record
6558  * discards is heavy. In most scenarios we'll just quickly blow
6559  * through the duplicates and get back to processing the
6560  * non-dupes.
6561  */
6562  const char* data = NULL;
6563  size_t len = 0;
6564  const char* status = NULL;
6565  size_t statusLen = 0;
6566  amps_handle messageHandle = message.getMessage();
// The failure test below compares only field lengths (plus the first
// status character), not the field text itself.
// NOTE(review): the constant names suggest the reason strings
// "not entitled" (12) and "duplicate" (9) and the status "failure" (7) --
// confirm against the server's ack values.
6567  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
6568  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
6569  amps_message_get_field_value(messageHandle, AMPS_Status,
6570  &status, &statusLen);
6571  if (len == NotEntitled || len == Duplicate ||
6572  (statusLen == Failure && status[0] == 'f'))
6573  {
6574  if (_failedWriteHandler)
6575  {
6576  if (_publishStore.isValid())
6577  {
6578  amps_uint64_t sequence =
6579  amps_message_get_field_uint64(messageHandle,
6580  AMPS_Sequence);
6581  FailedWriteStoreReplayer replayer(this, data, len);
6582  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
6583  replayer, sequence));
6584  }
6585  else // Call the handler with what little we have
6586  {
// Function-local static: one Message object is reused across calls.
6587  static Message emptyMessage;
6588  emptyMessage.setSequence(message.getSequence());
6589  AMPS_CALL_EXCEPTION_WRAPPER(
6590  _failedWriteHandler->failedWrite(emptyMessage,
6591  data, len));
6592  }
6593  ++deliveries;
6594  }
6595  }
6596  if (_publishStore.isValid())
6597  {
6598  // Ack for publisher will have sequence while
6599  // ack for bookmark subscribe won't
6600  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
6601  AMPS_Sequence);
6602  if (seq > 0)
6603  {
6604  ++deliveries;
6605  _publishStore.discardUpTo(seq);
6606  }
6607  }
6608 
// Only consult the bookmark store when nothing above claimed the ack.
// Note that data/len are reused here for the subscription id.
6609  if (!deliveries && _bookmarkStore.isValid())
6610  {
6611  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
6612  &data, &len);
6613  if (len > 0)
6614  {
6615  Message::Field subId(data, len);
6616  const char* bookmarkData = NULL;
6617  size_t bookmarkLen = 0;
6618  amps_message_get_field_value(messageHandle, AMPS_Bookmark,
6619  &bookmarkData, &bookmarkLen);
6620  // Everything is there and not unsubscribed AC-912
6621  if (bookmarkLen > 0 && _routes.hasRoute(subId))
6622  {
6623  ++deliveries;
6624  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
6625  }
6626  }
6627  }
6628  }
6629  catch (std::exception& ex)
6630  {
6631  AMPS_UNHANDLED_EXCEPTION(ex);
6632  }
6633  return deliveries;
6634 }
6635 
// Handles a processed ack: if the ack's command id has an entry in _ackMap,
// that entry is removed, filled in from the ack's fields, marked as
// responded, and all waiters on _lock are signalled.
//
// Returns 1 when a matching entry was found, otherwise 0.
//
// Locking: _lock is held for the whole function body after the command id
// is read; _ackMapLock is additionally held only while _ackMap is searched
// and modified.
6636 inline unsigned
6637 ClientImpl::processedAck(Message &message)
6638 {
6639  unsigned deliveries = 0;
6640  AckResponse ack;
6641  const char* data = NULL;
6642  size_t len = 0;
6643  amps_handle messageHandle = message.getMessage();
6644  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
6645  Lock<Mutex> l(_lock);
6646  if (data && len)
6647  {
6648  Lock<Mutex> guard(_ackMapLock);
6649  AckMap::iterator i = _ackMap.find(std::string(data, len));
6650  if (i != _ackMap.end())
6651  {
6652  ++deliveries;
6653  ack = i->second;
6654  _ackMap.erase(i);
6655  }
6656  }
6657  if (deliveries)
6658  {
// Copy each ack field of interest into the response; data/len are reused
// for every field in turn.
6659  amps_message_get_field_value(messageHandle, AMPS_Status, &data,
6660  &len);
6661  ack.setStatus(data, len);
6662  amps_message_get_field_value(messageHandle, AMPS_Reason, &data,
6663  &len);
6664  ack.setReason(data, len);
6665  amps_message_get_field_value(messageHandle, AMPS_UserId, &data,
6666  &len);
6667  ack.setUsername(data, len);
6668  amps_message_get_field_value(messageHandle, AMPS_Password, &data,
6669  &len);
6670  ack.setPassword(data, len);
6671  amps_message_get_field_value(messageHandle, AMPS_Sequence, &data,
6672  &len);
6673  ack.setSequenceNo(data, len);
6674  amps_message_get_field_value(messageHandle, AMPS_Version, &data,
6675  &len);
6676  ack.setServerVersion(data, len);
6677  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
6678  ack.setOptions(data,len);
6679  amps_message_get_field_value(messageHandle, AMPS_Bookmark, &data, &len);
6680  ack.setBookmark(data,len);
// Mark the response complete, then wake everything waiting on _lock.
6681  ack.setResponded(true);
6682  _lock.signalAll();
6683  }
6684  return deliveries;
6685 }
6686 
6687 inline void
6688 ClientImpl::checkAndSendHeartbeat(bool force)
6689 {
6690  if (force || _heartbeatTimer.check())
6691  {
6692  _heartbeatTimer.start();
6693  try
6694  {
6695  sendWithoutRetry(_beatMessage);
6696  }
6697  catch (const AMPSException&)
6698  {
6699  ;
6700  }
6701  }
6702 }
6703 
6704 inline ConnectionInfo ClientImpl::getConnectionInfo() const
6705 {
6706  ConnectionInfo info;
6707  std::ostringstream writer;
6708 
6709  info["client.uri"] = _lastUri;
6710  info["client.name"] = _name;
6711  info["client.username"] = _username;
6712  if(_publishStore.isValid())
6713  {
6714  writer << _publishStore.unpersistedCount();
6715  info["publishStore.unpersistedCount"] = writer.str();
6716  writer.clear();
6717  writer.str("");
6718  }
6719 
6720  return info;
6721 }
6722 
// Static message callback: userData_ is the ClientImpl that owns the
// connection, and messageHandle_ is the received message (or null).
//
// After running any deferred executions, the message is dispatched by
// command type:
//  - SOW / GroupBegin / GroupEnd: delivered to the route for its query id.
//  - OOF / Publish / DeltaPublish: delivered to each routed subscription id
//    listed in the message, with bookmark-store duplicate detection and
//    queue auto-ack applied as described inline.
//  - Ack: passed to the global Ack handler, then to persistedAck() or
//    processedAck() by ack type, then to the ack routes.
//  - Heartbeat: passed to the global Heartbeat handler; a heartbeat is sent
//    back when a heartbeat timeout is configured.
//  - Anything else with a command id: delivered to the route for that id.
// Messages nobody accepts go to lastChance().
//
// Always returns AMPS_E_OK.
6723 inline amps_result
6724 ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
6725 {
6726  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
6727  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
6728  ClientImpl* me = (ClientImpl*) userData_;
6729  AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->processDeferredExecutions());
// A null handle carries no message; the only work is checking queue acks
// when a queue ack timeout is set.
6730  if(!messageHandle_)
6731  {
6732  if(me->_queueAckTimeout) me->checkQueueAcks();
6733  return AMPS_E_OK;
6734  }
6735 
// Point the client's reusable read message at the incoming handle.
6736  me->_readMessage.replace(messageHandle_);
6737  Message& message = me->_readMessage;
6738  Message::Command::Type commandType = message.getCommandEnum();
6739  if (commandType & SOWMask)
6740  {
6741 #if 0 // Not currently implemented, to avoid an extra branch in delivery
6742  // A small cheat here to get the right handler, using knowledge of the
6743  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
6744  // and their GlobalCommandTypeHandlers values 1, 2, 3.
6745  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6746  me->_globalCommandTypeHandlers[1+(commandType/8192)].invoke(message));
6747 #endif
6748  AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->_routes.deliverData(message,
6749  message.getQueryID()));
6750  }
6751  else if (commandType & PublishMask)
6752  {
6753 #if 0 // Not currently implemented, to avoid an extra branch in delivery
6754  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6755  me->_globalCommandTypeHandlers[(commandType==Message::Command::Publish ?
6756  GlobalCommandTypeHandlers::Publish :
6757  GlobalCommandTypeHandlers::OOF)].invoke(message));
6758 #endif
6759  const char* subIds = NULL;
6760  size_t subIdsLen = 0;
6761  // Publish command, send to subscriptions
6762  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds, &subIds, &subIdsLen);
6763  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
6764  for(size_t i=0; i<subIdCount; ++i)
6765  {
6766  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
6767  MessageHandler& handler = lookupResult.handler;
6768  if (handler.isValid())
6769  {
// Rewrite the message's single subscription id to the one this handler is
// routed under, so the handler sees its own id.
6770  amps_message_set_field_value(messageHandle_, AMPS_SubscriptionId,
6771  subIds + lookupResult.idOffset, lookupResult.idLength);
6772  Message::Field bookmark = message.getBookmark();
// A non-empty lease period is what marks the message as a queue message.
6773  bool isMessageQueue = message.getLeasePeriod().len() != 0;
6774  bool isAutoAck = me->_isAutoAckEnabled;
6775 
// Bookmarked, non-queue messages go through the bookmark store: messages
// the store reports as discarded go to the duplicate handler (if one is
// set), everything else is logged in the store and delivered.
6776  if (!isMessageQueue && !bookmark.empty() &&
6777  me->_bookmarkStore.isValid())
6778  {
6779  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
6780  {
6781  //Call duplicate message handler in handlers map
6782  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
6783  {
6784  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
6785  }
6786  }
6787  else
6788  {
6789  me->_bookmarkStore.log(me->_readMessage);
6790  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6791  handler.invoke(message));
6792  }
6793  }
6794  else
6795  {
// Queue message with auto-ack on: ack after the handler returns, or ack
// with "cancel" if the handler throws -- unless the message asks for
// auto-ack to be ignored.
6796  if(isMessageQueue && isAutoAck)
6797  {
6798  try
6799  {
6800  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
6801  if (!message.getIgnoreAutoAck())
6802  {
6803  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6804  me->_ack(message.getTopic(),message.getBookmark()));
6805  }
6806  }
6807  catch(std::exception& ex)
6808  {
6809  if (!message.getIgnoreAutoAck())
6810  {
6811  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6812  me->_ack(message.getTopic(),message.getBookmark(),"cancel"));
6813  }
6814  AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6815  }
6816  }
6817  else
6818  {
6819  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6820  handler.invoke(message));
6821  }
6822  }
6823  }
6824  else me->lastChance(message);
6825  } // for (subidsEnd)
6826  }
6827  else if (commandType == Message::Command::Ack)
6828  {
6829  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6830  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
6831  unsigned ackType = message.getAckTypeEnum();
6832  unsigned deliveries = 0U;
6833  switch (ackType)
6834  {
6835  case Message::AckType::Persisted:
6836  deliveries += me->persistedAck(message);
6837  break;
6838  case Message::AckType::Processed: // processed
6839  deliveries += me->processedAck(message);
6840  break;
6841  }
6842  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
6843  if (deliveries == 0)
6844  {
6845  me->lastChance(message);
6846  }
6847  }
6848  else if (commandType == Message::Command::Heartbeat)
6849  {
6850  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6851  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
6852  if(me->_heartbeatTimer.getTimeout() != 0.0) // -V550
6853  {
6854  me->checkAndSendHeartbeat(true);
6855  }
6856  else
6857  {
6858  me->lastChance(message);
6859  }
// Returns here, so the unconditional heartbeat check at the end of the
// function is skipped for heartbeat messages.
6860  return AMPS_E_OK;
6861  }
6862  else if (!message.getCommandId().empty())
6863  {
6864  unsigned deliveries = 0U;
6865  try
6866  {
6867  while(me->_connected) // Keep sending heartbeats when stream is full
6868  {
6869  try
6870  {
6871  deliveries = me->_routes.deliverData(message, message.getCommandId());
6872  break;
6873  }
6874 #ifdef _WIN32
6875  catch(MessageStreamFullException&)
6876 #else
6877  catch(MessageStreamFullException& ex_)
6878 #endif
6879  {
6880  me->checkAndSendHeartbeat(false);
6881  }
6882  }
6883  }
6884  catch (std::exception& ex_)
6885  {
// Report to the exception listener; anything the listener itself throws
// is swallowed.
6886  try
6887  {
6888  me->_exceptionListener->exceptionThrown(ex_);
6889  }
6890  catch(...)
6891  {
6892  ;
6893  }
6894  }
6895  if (deliveries == 0)
6896  me->lastChance(message);
6897  }
6898  me->checkAndSendHeartbeat();
6899  return AMPS_E_OK;
6900 }
6901 
6902 inline void
6903 ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
6904 {
6905  ClientImpl* me = (ClientImpl*) userData;
6906  //Client wrapper(me);
6907  // Go ahead and signal any waiters if they are around...
6908  me->clearAcks(failedConnectionVersion);
6909 }
6910 
// Static disconnect callback: userData is the owning ClientImpl.
//
// Holds me->_lock except where noted. Announces Disconnected to connection
// state listeners if the client was connected, then loops:
//  1. marks the client disconnected and invokes the disconnect handler
//     (with the lock released);
//  2. if the client is still not connected afterwards, announces Shutdown,
//     reports a DisconnectedException, and returns AMPS_E_DISCONNECTED;
//  3. otherwise resubscribes through the subscription manager (if any, and
//     with the lock released), announces Resubscribed, and returns
//     AMPS_E_OK.
// An AMPSException during resubscribe is reported and the loop runs again;
// any other exception there returns AMPS_E_RETRY.
//
// NOTE(review): step 2 implies the disconnect handler is what sets
// _connected back to true on a successful reconnect -- confirm.
6911 inline amps_result
6912 ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
6913 {
6914  ClientImpl* me = (ClientImpl*) userData;
6915  Lock<Mutex> l(me->_lock);
6916  Client wrapper(me,false);
6917  if (me->_connected)
6918  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
6919  while(true)
6920  {
// Scoped flag objects on _badTimeToHASubscribe (whole iteration) and
// _badTimeToHAPublish (disconnect-handler phase only).
// NOTE(review): presumably these set the flag for the scope and restore it
// on exit -- AtomicFlagFlip is defined elsewhere.
6921  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
6922  try
6923  {
6924  AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
6925  me->_connected = false;
6926  {
6927  // Have to release the lock here or receive thread can't
6928  // invoke the message handler.
6929  Unlock<Mutex> unlock(me->_lock);
6930  me->_disconnectHandler.invoke(wrapper);
6931  }
6932  }
6933  catch(const std::exception& ex)
6934  {
6935  AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6936  }
6937  me->_lock.signalAll();
6938 
6939  if (!me->_connected)
6940  {
6941  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
6942  AMPS_UNHANDLED_EXCEPTION_2(me,DisconnectedException("Reconnect failed."));
6943  return AMPS_E_DISCONNECTED;
6944  }
6945  try
6946  {
6947  // Resubscribe
6948  if (me->_subscriptionManager)
6949  {
6950  {
6951  // Have to release the lock here or receive thread can't
6952  // invoke the message handler.
6953  Unlock<Mutex> unlock(me->_lock);
6954  me->_subscriptionManager->resubscribe(wrapper);
6955  }
6956  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
6957  }
6958  return AMPS_E_OK;
6959  }
// No return in this handler: after reporting, control falls out of the
// try/catch and the while loop starts another iteration.
6960  catch(const AMPSException& subEx)
6961  {
6962  AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6963  }
6964  catch(const std::exception& subEx)
6965  {
6966  AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6967  return AMPS_E_RETRY;
6968  }
6969  catch(...)
6970  {
6971  return AMPS_E_RETRY;
6972  }
6973  }
// The loop above has no break, so it is only left through a return; this
// statement is not reached.
6974  return AMPS_E_RETRY;
6975 }
6976 
6977 class FIX
6978 {
6979  const char* _data;
6980  size_t _len;
6981  char _fieldSep;
6982 public:
6983  class iterator
6984  {
6985  const char* _data;
6986  size_t _len;
6987  size_t _pos;
6988  char _fieldSep;
6989  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
6990  : _data(data_), _len(len_),_pos(pos_), _fieldSep(fieldSep_)
6991  {
6992  while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6993  }
6994  public:
6995  typedef void* difference_type;
6996  typedef std::forward_iterator_tag iterator_category;
6997  typedef std::pair<Message::Field, Message::Field> value_type;
6998  typedef value_type* pointer;
6999  typedef value_type& reference;
7000  bool operator==(const iterator& rhs) const
7001  {
7002  return _pos == rhs._pos;
7003  }
7004  bool operator!=(const iterator& rhs) const
7005  {
7006  return _pos != rhs._pos;
7007  }
7008  iterator& operator++()
7009  {
7010  // Skip through the data
7011  while(_pos != _len && _data[_pos] != _fieldSep) ++_pos;
7012  // Skip through any field separators
7013  while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
7014  return *this;
7015  }
7016 
7017  value_type operator*() const
7018  {
7019  value_type result;
7020  size_t i = _pos, keyLength =0, valueStart = 0, valueLength = 0;
7021  for(; i < _len && _data[i] != '='; ++i) ++keyLength;
7022 
7023  result.first.assign(_data+_pos, keyLength);
7024 
7025  if (i < _len && _data[i] == '=')
7026  {
7027  ++i;
7028  valueStart = i;
7029  for(; i < _len && _data[i] != _fieldSep; ++i)
7030  {
7031  valueLength++;
7032  }
7033  }
7034  result.second.assign(_data+valueStart, valueLength);
7035  return result;
7036  }
7037 
7038  friend class FIX;
7039  };
7040  class reverse_iterator
7041  {
7042  const char* _data;
7043  size_t _len;
7044  const char* _pos;
7045  char _fieldSep;
7046  public:
7047  typedef std::pair<Message::Field, Message::Field> value_type;
7048  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
7049  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7050  {
7051  if (_pos)
7052  {
7053  // skip past meaningless trailing fieldseps
7054  while(_pos >=_data && *_pos == _fieldSep) --_pos;
7055  while(_pos > _data && *_pos != _fieldSep) --_pos;
7056  // if we stopped before the 0th character, it's because
7057  // it's a field sep. advance one to point to the first character
7058  // of a key.
7059  if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
7060  if (_pos < _data) _pos = 0;
7061  }
7062  }
7063  bool operator==(const reverse_iterator& rhs) const
7064  {
7065  return _pos == rhs._pos;
7066  }
7067  bool operator!=(const reverse_iterator& rhs) const
7068  {
7069  return _pos != rhs._pos;
7070  }
7071  reverse_iterator& operator++()
7072  {
7073  if (_pos == _data)
7074  {
7075  _pos = 0;
7076  }
7077  else
7078  {
7079  // back up 1 to a field separator
7080  --_pos;
7081  // keep backing up through field separators
7082  while(_pos >=_data && *_pos == _fieldSep) --_pos;
7083  // now back up to the beginning of this field
7084  while(_pos >_data && *_pos != _fieldSep) --_pos;
7085  if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
7086  if (_pos < _data) _pos = 0;
7087  }
7088  return *this;
7089  }
7090  value_type operator*() const
7091  {
7092  value_type result;
7093  size_t keyLength = 0, valueStart = 0, valueLength = 0;
7094  size_t i = (size_t)(_pos - _data);
7095  for(; i < _len && _data[i] != '='; ++i) ++keyLength;
7096  result.first.assign(_pos, keyLength);
7097  if (i<_len && _data[i] == '=')
7098  {
7099  ++i;
7100  valueStart = i;
7101  for(; i<_len && _data[i] != _fieldSep; ++i)
7102  {
7103  valueLength++;
7104  }
7105  }
7106  result.second.assign(_data+valueStart, valueLength);
7107  return result;
7108  }
7109  };
7110  FIX(const Message::Field& data, char fieldSeparator=1)
7111  : _data(data.data()), _len(data.len()),
7112  _fieldSep(fieldSeparator)
7113  {
7114  }
7115 
7116  FIX(const char* data, size_t len, char fieldSeparator=1)
7117  : _data(data), _len(len), _fieldSep(fieldSeparator)
7118  {
7119  }
7120 
7121  iterator begin() const
7122  {
7123  return iterator(_data, _len, 0, _fieldSep);
7124  }
7125  iterator end() const
7126  {
7127  return iterator(_data, _len, _len, _fieldSep);
7128  }
7129 
7130 
7131  reverse_iterator rbegin() const
7132  {
7133  return reverse_iterator(_data, _len, _data+(_len-1), _fieldSep);
7134  }
7135 
7136  reverse_iterator rend() const
7137  {
7138  return reverse_iterator(_data, _len, 0, _fieldSep);
7139  }
7140 };
7141 
7142 
7155 
/// Builds a FIX-style message as a sequence of "tag=value" fields, each
/// terminated by a field separator character.
/// \tparam T The type used for tags; it must be insertable into a
/// std::ostream with operator<<.
template <class T>
class _FIXBuilder
{
  std::stringstream _data;
  char _fs;
public:
  /// Constructs a builder that terminates every field with fieldSep_.
  /// \param fieldSep_ The field separator character (default 1).
  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {}

  /// Appends "tag=" followed by length bytes of value starting at offset,
  /// then the field separator. The bytes are written verbatim, so the
  /// value may contain embedded NUL characters.
  /// \param tag The tag to write before the '='.
  /// \param value Pointer to the buffer holding the value.
  /// \param offset Offset into value at which the value bytes start.
  /// \param length Number of bytes to write.
  void append(const T& tag, const char* value, size_t offset, size_t length)
  {
    _data << tag << '=';
    _data.write(value + offset, (std::streamsize)length);
    _data << _fs;
  }

  /// Appends "tag=value" followed by the field separator.
  /// \param tag The tag to write before the '='.
  /// \param value The value to write after the '='.
  void append(const T& tag, const std::string& value)
  {
    _data << tag << '=' << value << _fs;
  }

  /// Returns the current contents of this builder as a string.
  std::string getString() const
  {
    return _data.str();
  }

  /// Conversion to std::string; equivalent to getString().
  operator std::string() const
  {
    return _data.str();
  }

  /// Discards everything appended so far so the builder can be reused.
  void reset()
  {
    _data.str(std::string());
    // str() does not reset error flags; clear them so that a previous
    // failed insertion does not silently disable later appends.
    _data.clear();
  }
};
7209 
7213 
7215 
7219 
7221 
7222 
7230 
7232 {
7233  char _fs;
7234 public:
7239  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
7240 
7243  typedef std::map<Message::Field, Message::Field> map_type;
7244 
7250  map_type toMap(const Message::Field& data)
7251  {
7252  FIX fix(data, _fs);
7253  map_type retval;
7254  for(FIX::iterator a = fix.begin(); a!= fix.end(); ++a)
7255  {
7256  retval.insert(*a);
7257  }
7258 
7259  return retval;
7260  }
7261 };
7262 
7263 class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
7264 {
7265  Mutex _lock;
7266  std::deque<Message> _q;
7267  std::string _commandId;
7268  std::string _subId;
7269  std::string _queryId;
7270  Client _client;
7271  unsigned _timeout;
7272  unsigned _maxDepth;
7273  unsigned _requestedAcks;
7274  Message::Field _previousTopic;
7275  Message::Field _previousBookmark;
7276  volatile enum { Unset=0x0, Running=0x10, Subscribe=0x11, SOWOnly=0x12, AcksOnly=0x13, Conflate=0x14, Closed=0x1, Disconnected=0x2 } _state;
7277  typedef std::map<std::string, Message*> SOWKeyMap;
7278  SOWKeyMap _sowKeyMap;
7279  public:
7280  MessageStreamImpl(const Client& client_)
7281  : _client(client_),
7282  _timeout(0),
7283  _maxDepth((unsigned)~0),
7284  _requestedAcks(0),
7285  _state(Unset)
7286  {
7287  if (_client.isValid())
7288  {
7289  _client.addConnectionStateListener(this);
7290  }
7291  }
7292 
7293  MessageStreamImpl(ClientImpl* client_)
7294  : _client(client_),
7295  _timeout(0),
7296  _maxDepth((unsigned)~0),
7297  _requestedAcks(0),
7298  _state(Unset)
7299  {
7300  if (_client.isValid())
7301  {
7302  _client.addConnectionStateListener(this);
7303  }
7304  }
7305 
7306  ~MessageStreamImpl()
7307  {
7308  }
7309 
7310  virtual void destroy()
7311  {
7312  try
7313  {
7314  close();
7315  }
7316  catch(std::exception &e)
7317  {
7318  try
7319  {
7320  if (_client.isValid())
7321  {
7322  _client.getExceptionListener().exceptionThrown(e);
7323  }
7324  } catch (...) {/*Ignore exception listener exceptions*/} // -V565
7325  }
7326  if (_client.isValid())
7327  {
7328  _client.removeConnectionStateListener(this);
7329  Client c = _client;
7330  _client = Client((ClientImpl*)NULL);
7331  c.deferredExecution(MessageStreamImpl::destroyer, this);
7332  }
7333  else
7334  {
7335  delete this;
7336  }
7337  }
7338 
7339  static void destroyer(void* vpMessageStreamImpl_)
7340  {
7341  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
7342  }
7343 
7344  void setSubscription(const std::string& subId_,
7345  const std::string& commandId_ = "",
7346  const std::string& queryId_ = "")
7347  {
7348  Lock<Mutex> lock(_lock);
7349  _subId = subId_;
7350  if (!commandId_.empty() && commandId_ != subId_)
7351  _commandId = commandId_;
7352  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
7353  _queryId = queryId_;
7354  // It's possible to disconnect between creation/registration and here.
7355  if (Disconnected == _state) return;
7356  assert(Unset==_state);
7357  _state = Subscribe;
7358  }
7359 
7360  void setSOWOnly(const std::string& commandId_,
7361  const std::string& queryId_ = "")
7362  {
7363  Lock<Mutex> lock(_lock);
7364  _commandId = commandId_;
7365  if (!queryId_.empty() && queryId_ != commandId_)
7366  _queryId = queryId_;
7367  // It's possible to disconnect between creation/registration and here.
7368  if (Disconnected == _state) return;
7369  assert(Unset==_state);
7370  _state = SOWOnly;
7371  }
7372 
7373  void setStatsOnly(const std::string& commandId_,
7374  const std::string& queryId_ = "")
7375  {
7376  Lock<Mutex> lock(_lock);
7377  _commandId = commandId_;
7378  if (!queryId_.empty() && queryId_ != commandId_)
7379  _queryId = queryId_;
7380  // It's possible to disconnect between creation/registration and here.
7381  if (Disconnected == _state) return;
7382  assert(Unset==_state);
7383  _state = AcksOnly;
7384  _requestedAcks = Message::AckType::Stats;
7385  }
7386 
7387  void setAcksOnly(const std::string& commandId_, unsigned acks_)
7388  {
7389  Lock<Mutex> lock(_lock);
7390  _commandId = commandId_;
7391  // It's possible to disconnect between creation/registration and here.
7392  if (Disconnected == _state) return;
7393  assert(Unset==_state);
7394  _state = AcksOnly;
7395  _requestedAcks = acks_;
7396  }
7397 
7398  void connectionStateChanged(ConnectionStateListener::State state_)
7399  {
7400  Lock<Mutex> lock(_lock);
7401  if(state_ == AMPS::ConnectionStateListener::Disconnected)
7402  {
7403  _state = Disconnected;
7404  close();
7405  }
7406  _lock.signalAll();
7407  }
7408 
7409  void timeout(unsigned timeout_)
7410  {
7411  _timeout = timeout_;
7412  }
7413  void conflate(void)
7414  {
7415  if(_state == Subscribe) _state = Conflate;
7416  }
7417  void maxDepth(unsigned maxDepth_)
7418  {
7419  if(maxDepth_) _maxDepth = maxDepth_;
7420  else _maxDepth = (unsigned)~0;
7421  }
7422  unsigned getMaxDepth(void) const
7423  {
7424  return _maxDepth;
7425  }
7426  unsigned getDepth(void) const
7427  {
7428  return (unsigned)(_q.size());
7429  }
7430 
7431  bool next(Message& current_)
7432  {
7433  Lock<Mutex> lock(_lock);
7434  if (!_previousTopic.empty() && !_previousBookmark.empty())
7435  {
7436  try
7437  {
7438  if (_client.isValid())
7439  {
7440  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
7441  }
7442  }
7443 #ifdef _WIN32
7444  catch (AMPSException&)
7445 #else
7446  catch (AMPSException& e)
7447 #endif
7448  {
7449  current_.invalidate();
7450  _previousTopic.clear();
7451  _previousBookmark.clear();
7452  return false;
7453  }
7454  _previousTopic.clear();
7455  _previousBookmark.clear();
7456  }
7457  double minWaitTime = (double)((_timeout && _timeout > 1000)
7458  ? _timeout : 1000);
7459  Timer timer(minWaitTime);
7460  timer.start();
7461  while(_q.empty() && _state & Running)
7462  {
7463  // Using timeout so python can interrupt
7464  _lock.wait((long)minWaitTime);
7465  {
7466  Unlock<Mutex> unlck(_lock);
7467  amps_invoke_waiting_function();
7468  }
7469  if (_timeout)
7470  {
7471  // In case we woke up early, see how much longer to wait
7472  if(timer.checkAndGetRemaining(&minWaitTime))
7473  {
7474  break;
7475  }
7476  }
7477  }
7478  if(!_q.empty())
7479  {
7480  current_ = _q.front();
7481  if(_q.size() == _maxDepth) _lock.signalAll();
7482  _q.pop_front();
7483  if(_state == Conflate)
7484  {
7485  std::string sowKey = current_.getSowKey();
7486  if(sowKey.length()) _sowKeyMap.erase(sowKey);
7487  }
7488  else if(_state == AcksOnly)
7489  {
7490  _requestedAcks &= ~(current_.getAckTypeEnum());
7491  }
7492  if((_state == AcksOnly && _requestedAcks == 0) ||
7493  (_state == SOWOnly && current_.getCommand()=="group_end"))
7494  {
7495  _state = Closed;
7496  }
7497  else if (current_.getCommandEnum() == Message::Command::Publish &&
7498  _client.isValid() && _client.getAutoAck() &&
7499  !current_.getLeasePeriod().empty() &&
7500  !current_.getBookmark().empty())
7501  {
7502  _previousTopic = current_.getTopic().deepCopy();
7503  _previousBookmark = current_.getBookmark().deepCopy();
7504  }
7505  return true;
7506  }
7507  if(_state == Disconnected)
7508  {
7509  throw DisconnectedException("Connection closed.");
7510  }
7511  current_.invalidate();
7512  if(_state == Closed)
7513  {
7514  return false;
7515  }
7516  return _timeout != 0;
7517  }
7518  void close(void)
7519  {
7520  if (_client.isValid())
7521  {
7522  if (_state == SOWOnly || _state == Subscribe) //not delete
7523  {
7524  if (!_commandId.empty()) _client.unsubscribe(_commandId);
7525  if (!_subId.empty()) _client.unsubscribe(_subId);
7526  if (!_queryId.empty()) _client.unsubscribe(_queryId);
7527  }
7528  else
7529  {
7530  if (!_commandId.empty()) _client.removeMessageHandler(_commandId);
7531  if (!_subId.empty()) _client.removeMessageHandler(_subId);
7532  if (!_queryId.empty()) _client.removeMessageHandler(_queryId);
7533  }
7534  }
7535  if(_state==SOWOnly || _state==Subscribe || _state==Unset)
7536  {
7537  _state = Closed;
7538  }
7539  }
7540  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
7541  {
7542  Lock<Mutex> lock(this_->_lock);
7543  if(this_->_state != Conflate)
7544  {
7545  AMPS_TESTING_SLOW_MESSAGE_STREAM
7546  if(this_->_q.size() >= this_->_maxDepth)
7547  {
7548  // We throw here so that heartbeats can be sent. The exception
7549  // will be handled internally only, and the same Message will
7550  // come back to try again. Make sure to signal.
7551  this_->_lock.signalAll();
7552  throw MessageStreamFullException("Stream is currently full.");
7553  }
7554  this_->_q.push_back(message_.deepCopy());
7555  if (message_.getCommandEnum() == Message::Command::Publish &&
7556  this_->_client.isValid() && this_->_client.getAutoAck() &&
7557  !message_.getLeasePeriod().empty() &&
7558  !message_.getBookmark().empty())
7559  {
7560  message_.setIgnoreAutoAck();
7561  }
7562  }
7563  else
7564  {
7565  std::string sowKey = message_.getSowKey();
7566  if(sowKey.length())
7567  {
7568  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
7569  if(it != this_->_sowKeyMap.end())
7570  {
7571  *(it->second) = message_.deepCopy();
7572  }
7573  else
7574  {
7575  if(this_->_q.size() >= this_->_maxDepth)
7576  {
7577  // We throw here so that heartbeats can be sent. The
7578  // exception will be handled internally only, and the
7579  // same Message will come back to try again. Make sure
7580  // to signal.
7581  this_->_lock.signalAll();
7582  throw MessageStreamFullException("Stream is currently full.");
7583  }
7584  this_->_q.push_back(message_.deepCopy());
7585  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
7586  }
7587  }
7588  else
7589  {
7590  while(this_->_q.size() >= this_->_maxDepth) // -V712
7591  {
7592  this_->_lock.wait(1);
7593  }
7594  this_->_q.push_back(message_.deepCopy());
7595  }
7596  }
7597  this_->_lock.signalAll();
7598  }
7599 };
7600 inline MessageStream::MessageStream(void)
7601 {
7602 }
7603 inline MessageStream::MessageStream(const Client& client_)
7604  :_body(new MessageStreamImpl(client_))
7605 {
7606 }
7607 inline void MessageStream::iterator::advance(void)
7608 {
7609  _pStream = _pStream->_body->next(_current) ? _pStream:NULL;
7610 }
7611 inline MessageStream::operator MessageHandler(void)
7612 {
7613  return MessageHandler((void(*)(const Message&,void*))MessageStreamImpl::_messageHandler, &_body.get());
7614 }
7615 inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
7616 {
7617  MessageStream result;
7618  if(handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
7619  {
7620  result._body = (MessageStreamImpl*)(handler_._userData);
7621  }
7622  return result;
7623 }
7624 
7625 inline void MessageStream::setSOWOnly(const std::string& commandId_,
7626  const std::string& queryId_)
7627 {
7628  _body->setSOWOnly(commandId_, queryId_);
7629 }
7630 inline void MessageStream::setSubscription(const std::string& subId_,
7631  const std::string& commandId_,
7632  const std::string& queryId_)
7633 {
7634  _body->setSubscription(subId_, commandId_, queryId_);
7635 }
7636 inline void MessageStream::setStatsOnly(const std::string& commandId_,
7637  const std::string& queryId_)
7638 {
7639  _body->setStatsOnly(commandId_, queryId_);
7640 }
7641 inline void MessageStream::setAcksOnly(const std::string& commandId_,
7642  unsigned acks_)
7643 {
7644  _body->setAcksOnly(commandId_, acks_);
7645 }
7646 inline MessageStream MessageStream::timeout(unsigned timeout_)
7647 {
7648  _body->timeout(timeout_);
7649  return *this;
7650 }
7652 {
7653  _body->conflate();
7654  return *this;
7655 }
7656 inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
7657 {
7658  _body->maxDepth(maxDepth_);
7659  return *this;
7660 }
7661 inline unsigned MessageStream::getMaxDepth(void) const
7662 {
7663  return _body->getMaxDepth();
7664 }
7665 inline unsigned MessageStream::getDepth(void) const
7666 {
7667  return _body->getDepth();
7668 }
7669 
7670 inline MessageStream ClientImpl::getEmptyMessageStream(void)
7671 {
7672  return *(_pEmptyMessageStream.get());
7673 }
7674 
7676 {
7677  // If the command is sow and has a sub_id, OR
7678  // if the command has a replace option, return the existing
7679  // messagestream, don't create a new one.
7680  ClientImpl& body = _body.get();
7681  Message& message = command_.getMessage();
7682  Field subId = message.getSubscriptionId();
7683  unsigned ackTypes = message.getAckTypeEnum();
7684  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace",7)) || message.getCommandEnum() == Message::Command::SOW);
7685  if(useExistingHandler)
7686  {
7687  // Try to find the existing message handler.
7688  if(!subId.empty())
7689  {
7690  MessageHandler existingHandler;
7691  if (body._routes.getRoute(subId, existingHandler))
7692  {
7693  // we found an existing handler. It might not be a message stream, but that's okay.
7694  body.executeAsync(command_, existingHandler, false);
7695  return MessageStream::fromExistingHandler(existingHandler);
7696  }
7697  }
7698  // fall through; we'll a new handler altogether.
7699  }
7700  // Make sure something will be returned to the stream or use the empty one
7701  Message::Command::Type command = message.getCommandEnum();
7702  if ((command & Message::Command::NoDataCommands)
7703  && (ackTypes == Message::AckType::Persisted
7704  || ackTypes == Message::AckType::None))
7705  {
7706  executeAsync(command_, MessageHandler());
7707  if (!body._pEmptyMessageStream)
7708  {
7709  body._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
7710  body._pEmptyMessageStream.get()->_body->close();
7711  }
7712  return body.getEmptyMessageStream();
7713  }
7714  MessageStream stream(*this);
7715  if (body.getDefaultMaxDepth())
7716  stream.maxDepth(body.getDefaultMaxDepth());
7717  MessageHandler handler = stream.operator MessageHandler();
7718  std::string commandID = body.executeAsync(command_, handler, false);
7719  if (command_.hasStatsAck())
7720  {
7721  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
7722  }
7723  else if (command_.isSow())
7724  {
7725  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
7726  }
7727  else if (command_.isSubscribe())
7728  {
7729  stream.setSubscription(commandID,
7730  command_.getMessage().getCommandId(),
7731  command_.getMessage().getQueryId());
7732  }
7733  else
7734  {
7735  // Persisted acks for writes don't come back with command id
7736  if (command == Message::Command::Publish ||
7737  command == Message::Command::DeltaPublish ||
7738  command == Message::Command::SOWDelete)
7739  {
7740  stream.setAcksOnly(commandID,
7741  ackTypes & (unsigned)~Message::AckType::Persisted);
7742  }
7743  else
7744  {
7745  stream.setAcksOnly(commandID, ackTypes);
7746  }
7747  }
7748  return stream;
7749 }
7750 
7751 // This is here because it uses api from Client.
7752 inline void Message::ack(const char* options_) const
7753 {
7754  ClientImpl* pClient = _body.get().clientImpl();
7755  Message::Field bookmark = getBookmark();
7756  if(pClient && bookmark.len() &&
7757  !pClient->getAutoAck())
7758  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
7759  pClient->ack(getTopic(),bookmark,options_);
7760 }
7761 }// end namespace AMPS
7762 #endif
Command & setCorrelationId(const std::string &v_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:559
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:179
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1284
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:4277
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1261
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5892
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:5866
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:512
std::string getAckType() const
Definition: ampsplusplus.hpp:638
AMPSDLL void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:4481
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:7157
void startTimer()
Definition: ampsplusplus.hpp:5855
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5412
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:761
Command & addAckType(const std::string &v_)
Definition: ampsplusplus.hpp:610
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:7651
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1230
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4509
AMPSDLL amps_result amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:486
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1068
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:6146
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:404
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:6487
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
AMPSDLL amps_result amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4386
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:5209
Command & setQueryId(const std::string &v_)
Definition: ampsplusplus.hpp:546
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1223
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:733
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:384
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:6496
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6361
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5164
AMPSDLL void amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Command & setFilter(const std::string &v_)
Definition: ampsplusplus.hpp:540
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:278
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:4605
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4870
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4401
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1142
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:724
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4621
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:538
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6216
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:713
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:6445
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4372
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5919
AMPSDLL amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
AMPSDLL amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1023
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:951
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:7194
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:6507
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4757
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4184
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1145
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6162
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6169
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4457
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1232
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5136
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:813
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6469
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7661
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4432
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1045
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5598
Success.
Definition: amps.h:197
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1128
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1152
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:707
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4847
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:7239
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:5256
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:187
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:553
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:4470
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4180
amps_result
Return values from amps_xxx functions.
Definition: amps.h:192
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:4658
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:973
AMPSDLL amps_handle amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1035
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:7675
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:609
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:4613
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:6521
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:643
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4361
Command & setExpiration(unsigned v_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:608
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1079
Command & setData(const char *v_, size_t length_)
Sets the data for this command.
Definition: ampsplusplus.hpp:580
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:4918
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including index_ can be discarded.
Definition: ampsplusplus.hpp:884
const std::string & getNameHash() const
Returns the name hash of this client as generated by the server and returned when the client logged o...
Definition: ampsplusplus.hpp:4330
Command & setAckType(unsigned v_)
Definition: ampsplusplus.hpp:628
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5554
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4260
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6121
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6156
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1138
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:6208
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:501
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6223
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1230
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4706
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
AMPSDLL amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1257
AMPSDLL void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:4569
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4979
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:493
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1148
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1138
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6032
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:724
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1075
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
Command & setTopic(const std::string &v_)
Definition: ampsplusplus.hpp:538
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5954
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1225
Command & setBatchSize(unsigned v_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:597
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
Command & setOrderBy(const std::string &v_)
Definition: ampsplusplus.hpp:542
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:905
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:719
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5080
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6176
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:225
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:4936
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Command & setTimeout(unsigned v_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:590
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1139
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1232
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:701
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:4341
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:7167
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:6373
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:714
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:755
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5448
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:838
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7176
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:965
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4561
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:922
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:6434
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1224
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1259
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:993
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5830
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1233
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:4553
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:136
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
Command & setCommandId(const std::string &v_)
Definition: ampsplusplus.hpp:536
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6025
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:509
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:4581
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6088
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7187
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:6424
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4824
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5533
AMPSDLL void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
Command & setSequence(const std::string &v_)
Definition: ampsplusplus.hpp:564
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4151
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:265
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5770
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:914
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:4803
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6253
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5731
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:4954
Command & setSequence(const amps_uint64_t v_)
Definition: ampsplusplus.hpp:566
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:5221
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:6416
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5096
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:875
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5695
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6012
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:979
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4636
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:4531
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:7231
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:4540
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1172
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6349
AMPSDLL amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:7243
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:4523
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:859
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:7250
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:202
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5631
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6082
Command & setSubId(const std::string &v_)
Definition: ampsplusplus.hpp:544
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5035
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4143
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6192
AMPSDLL amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1138
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6200
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:635
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:943
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7656
Command & setTopN(unsigned v_)
Definition: ampsplusplus.hpp:592
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1260
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:4439
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5294
Command & setAckType(const std::string &v_)
Definition: ampsplusplus.hpp:618
Command & setData(const std::string &v_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:576
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:7665
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
The operation has not succeeded, but ought to be retried.
Definition: amps.h:221
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:4348
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6058
AMPSDLL amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4408
Command & setOptions(const std::string &v_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:562
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:893
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6184
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1034
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1003
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:434
Command & setBookmark(const std::string &v_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:552
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:5999
AMPSDLL amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
AMPSDLL void amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:7204
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:4650
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5652
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:6455
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:534
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1224
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:6406
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4681
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:4315
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:670
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:935
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1021
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:817
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5311
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:4730
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6095
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:301
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4896
The client and server are disconnected.
Definition: amps.h:225
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5975
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5062
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:7646
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5348
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:4322
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:407
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5494
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1033
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5005
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5183
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:521
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6287
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5807
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4195
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6478
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:6397
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5380