AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.2
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps.h"
28 #include "ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <set>
39 #include <deque>
40 #include <vector>
41 #include <assert.h>
42 #ifndef _WIN32
43 #include <inttypes.h>
44 #endif
45 #if defined(sun)
46 #include <sys/atomic.h>
47 #endif
48 #include "BookmarkStore.hpp"
49 #include "MessageRouter.hpp"
50 #include "util.hpp"
51 #include "ampscrc.hpp"
52 
53 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
54 #define AMPS_TESTING_SLOW_MESSAGE_STREAM
55 #endif
56 
61 
62 
69 
80 
// Default tuning values used by the client classes in this header.
// NOTE(review): the *_TIMEOUT and *_RECONNECT values look like milliseconds,
// but the consuming code is not visible here - confirm before relying on it.
81 // For StoreBuffer implementations
82 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
83 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
84 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
85 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
86 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
87 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
// NOTE(review): -1 presumably means "no limit" - TODO confirm with the caller.
88 #define AMPS_DEFAULT_TOP_N -1
// Batch size that Command applies to SOW-style commands when none was set.
89 #define AMPS_DEFAULT_BATCH_SIZE 10
// Length of the character buffer that convertToCharArray() fills from the end.
90 #define AMPS_NUMBER_BUFFER_LEN 20
91 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
92 
93 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
94 #define AMPS_X64 1
95 #endif
96 
// Thread-local Message pointer, initially null. Each branch uses the
// compiler-specific thread-local storage keyword for its platform.
// Because the variable is `static` in a header, every translation unit that
// includes this file gets its own copy.
// NOTE(review): AMPS::Message is named here before `namespace AMPS` opens in
// this file, so it must already be declared by an earlier include - confirm.
97 #ifdef _WIN32
98 static __declspec ( thread ) AMPS::Message* publishStoreMessage = 0;
99 #else
100 static __thread AMPS::Message* publishStoreMessage = 0;
101 #endif
102 
103 namespace AMPS
104 {
105 
106 typedef std::map<std::string, std::string> ConnectionInfo;
107 
108 class PerThreadMessageTracker {
109 std::vector<AMPS::Message*> _messages;
110 public:
111  PerThreadMessageTracker() {}
112  ~PerThreadMessageTracker()
113  {
114  for (size_t i=0; i<_messages.size(); ++i)
115  {
116  delete _messages[i];
117  }
118  }
119  void addMessage(AMPS::Message* message)
120  {
121  _messages.push_back(message);
122  }
123  static void addMessageToCleanupList(AMPS::Message* message)
124  {
125  static AMPS::Mutex _lock;
126  AMPS::Lock<Mutex> l(_lock);
127  _addMessageToCleanupList(message);
128  }
129  static void _addMessageToCleanupList(AMPS::Message* message)
130  {
131  static PerThreadMessageTracker tracker;
132  tracker.addMessage(message);
133  }
134 };
135 
/// Formats a value as text by writing it to a string stream.
/// @param x_ any value that has a stream insertion operator.
/// @return the characters the stream produced for x_.
template<class Type>
inline std::string asString(Type x_)
{
  std::ostringstream formatter;
  formatter << x_;
  return formatter.str();
}
143 
144 inline
145 size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
146 {
147  size_t pos = AMPS_NUMBER_BUFFER_LEN;
148  for(int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
149  {
150  if (seqNo_ > 0)
151  {
152  buf_[--pos] = (char)(seqNo_ % 10 + '0');
153  seqNo_ /= 10;
154  }
155  }
156  return pos;
157 }
158 
#ifdef _WIN32
/// Windows-only overload taking unsigned long. Writes the decimal digits of
/// seqNo_ right-aligned into buf_.
/// @param buf_ buffer of at least AMPS_NUMBER_BUFFER_LEN characters.
/// @param seqNo_ the value to format.
/// @return index of the first digit written. For a value of 0 nothing is
///         written and AMPS_NUMBER_BUFFER_LEN is returned.
inline
size_t convertToCharArray(char* buf_, unsigned long seqNo_)
{
  size_t pos = AMPS_NUMBER_BUFFER_LEN;
  // Stop as soon as the value is exhausted; the pos check keeps the write
  // inside the buffer.
  while (seqNo_ > 0 && pos > 0)
  {
    buf_[--pos] = (char)(seqNo_ % 10 + '0');
    seqNo_ /= 10;
  }
  return pos;
}
#endif
175 
/// Named accessors for fixed reason strings, so callers can compare against
/// a reason without repeating the literal text.
class Reason
{
public:
  static const char* duplicate()                 { return "duplicate"; }
  static const char* badFilter()                 { return "bad filter"; }
  static const char* badRegexTopic()             { return "bad regex topic"; }
  static const char* subscriptionAlreadyExists() { return "subscription already exists"; }
  static const char* nameInUse()                 { return "name in use"; }
  static const char* authFailure()               { return "auth failure"; }
  static const char* notEntitled()               { return "not entitled"; }
  static const char* authDisabled()              { return "authentication disabled"; }
  static const char* subidInUse()                { return "subid in use"; }
  static const char* noTopic()                   { return "no topic"; }
};
193 
/// Receives exceptions reported through the AMPS_CALL_EXCEPTION_WRAPPER and
/// AMPS_UNHANDLED_EXCEPTION macros. The base implementation ignores them;
/// derive and override exceptionThrown() to observe them.
class ExceptionListener
{
public:
  virtual ~ExceptionListener() {;}
  /// Called with the exception that was caught. Does nothing by default.
  virtual void exceptionThrown(const std::exception&) const {;}
};
208 
210 
211 
// Runs statement x. A std::exception thrown by x is passed to
// _exceptionListener->exceptionThrown(); anything the listener itself throws
// is discarded. Exceptions not derived from std::exception are not caught.
// The expansion site must have a pointer named _exceptionListener in scope.
212 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
213  try\
214  {\
215  x;\
216  }\
217  catch (std::exception& ex_)\
218  {\
219  try\
220  {\
221  _exceptionListener->exceptionThrown(ex_);\
222  }\
223  catch(...)\
224  {\
225  ;\
226  }\
227  }
228  /*
229  * Note : we don't attempt to trap non std::exception exceptions
230  * here because doing so interferes with pthread_exit on some OSes.
231  catch (...)\
232  {\
233  try\
234  {\
235  _exceptionListener->exceptionThrown(AMPS::AMPSException(\
236  "An unhandled exception of unknown type was thrown by "\
237  "the registered handler.", AMPS_E_USAGE));\
238  }\
239  catch(...)\
240  {\
241  ;\
242  }\
243  }
244  */
245 #ifdef _WIN32
// _WIN32 variant. Retries statement x while me->_connected is true: each
// MessageStreamFullException triggers me->checkAndSendHeartbeat(false) and
// another attempt; success breaks out of the loop. Any other std::exception
// is passed to me->_exceptionListener, and whatever the listener throws is
// discarded. The MessageStreamFullException catch parameter is unnamed here.
246 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
247  try\
248  {\
249  while(me->_connected)\
250  {\
251  try\
252  {\
253  x;\
254  break;\
255  }\
256  catch(MessageStreamFullException&)\
257  {\
258  me->checkAndSendHeartbeat(false);\
259  }\
260  }\
261  }\
262  catch (std::exception& ex_)\
263  {\
264  try\
265  {\
266  me->_exceptionListener->exceptionThrown(ex_);\
267  }\
268  catch(...)\
269  {\
270  ;\
271  }\
272  }
273  /*
274  * Note : we don't attempt to trap non std::exception exceptions
275  * here because doing so interferes with pthread_exit on some OSes.
276  catch (...)\
277  {\
278  try\
279  {\
280  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
281  "An unhandled exception of unknown type was thrown by "\
282  "the registered handler.", AMPS_E_USAGE));\
283  }\
284  catch(...)\
285  {\
286  ;\
287  }\
288  }*/
289 
// _WIN32 variant. Retries statement x while me->_connected is true, calling
// me->checkAndSendHeartbeat(false) after each MessageStreamFullException.
// Every other exception propagates to the caller.
290 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
291  while(me->_connected)\
292  {\
293  try\
294  {\
295  x;\
296  break;\
297  }\
298  catch(MessageStreamFullException&)\
299  {\
300  me->checkAndSendHeartbeat(false);\
301  }\
302  }
303 #else
// Non-Windows variant. Retries statement x while me->_connected is true:
// each MessageStreamFullException triggers me->checkAndSendHeartbeat(false)
// and another attempt; success breaks out of the loop. Any other
// std::exception is passed to me->_exceptionListener, and whatever the
// listener throws is discarded. Differs from the _WIN32 variant only in
// naming the MessageStreamFullException catch parameter.
304 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
305  try\
306  {\
307  while(me->_connected)\
308  {\
309  try\
310  {\
311  x;\
312  break;\
313  }\
314  catch(MessageStreamFullException& ex_)\
315  {\
316  me->checkAndSendHeartbeat(false);\
317  }\
318  }\
319  }\
320  catch (std::exception& ex_)\
321  {\
322  try\
323  {\
324  me->_exceptionListener->exceptionThrown(ex_);\
325  }\
326  catch(...)\
327  {\
328  ;\
329  }\
330  }
331  /*
332  * Note : we don't attempt to trap non std::exception exceptions
333  * here because doing so interferes with pthread_exit on some OSes.
334  catch (...)\
335  {\
336  try\
337  {\
338  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
339  "An unhandled exception of unknown type was thrown by "\
340  "the registered handler.", AMPS_E_USAGE));\
341  }\
342  catch(...)\
343  {\
344  ;\
345  }\
346  }*/
347 
// Non-Windows variant. Retries statement x while me->_connected is true,
// calling me->checkAndSendHeartbeat(false) after each
// MessageStreamFullException. Every other exception propagates to the caller.
348 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
349  while(me->_connected)\
350  {\
351  try\
352  {\
353  x;\
354  break;\
355  }\
356  catch(MessageStreamFullException& ex_)\
357  {\
358  me->checkAndSendHeartbeat(false);\
359  }\
360  }
361 #endif
362 
// Reports exception object ex to _exceptionListener and discards anything
// the listener throws. The expansion site must have a pointer named
// _exceptionListener in scope.
363 #define AMPS_UNHANDLED_EXCEPTION(ex) \
364  try\
365  {\
366  _exceptionListener->exceptionThrown(ex);\
367  }\
368  catch(...)\
369  {;}
370 
// Same as AMPS_UNHANDLED_EXCEPTION, but reaches the listener through the
// object pointer `me` (me->_exceptionListener).
371 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
372  try\
373  {\
374  me->_exceptionListener->exceptionThrown(ex);\
375  }\
376  catch(...)\
377  {;}
378 
379 
380 class Client;
381 
406 
407 class Command
408 {
409  Message _message;
410  unsigned _timeout;
411  unsigned _batchSize;
412  unsigned _flags;
413  static const unsigned Subscribe = 1;
414  static const unsigned SOW = 2;
415  static const unsigned NeedsSequenceNumber = 4;
416  static const unsigned ProcessedAck = 8;
417  static const unsigned StatsAck = 16;
418  void init(Message::Command::Type command_)
419  {
420  _timeout = 0;
421  _batchSize = 0;
422  _flags = 0;
423  _message.reset();
424  _message.setCommandEnum(command_);
425  _setIds();
426  }
427  void init(const std::string& command_)
428  {
429  _timeout = 0;
430  _batchSize = 0;
431  _flags = 0;
432  _message.reset();
433  _message.setCommand(command_);
434  _setIds();
435  }
436  void _setIds(void)
437  {
438  Message::Command::Type command = _message.getCommandEnum();
439  if (!(command & Message::Command::NoDataCommands))
440  {
441  _message.newCommandId();
442  if (command == Message::Command::Subscribe ||
443  command == Message::Command::SOWAndSubscribe ||
444  command == Message::Command::DeltaSubscribe ||
445  command == Message::Command::SOWAndDeltaSubscribe)
446  {
447  _message.setSubscriptionId(_message.getCommandId());
448  _flags |= Subscribe;
449  }
450  if (command == Message::Command::SOW
451  || command == Message::Command::SOWAndSubscribe
452  || command == Message::Command::SOWAndDeltaSubscribe)
453  {
454  _message.setQueryID(_message.getCommandId());
455  if (_batchSize == 0)
456  {
457  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
458  }
459  if (command == Message::Command::SOW)
460  {
461  _flags |= SOW;
462  }
463  }
464  _flags |= ProcessedAck;
465  }
466  else if (command == Message::Command::SOWDelete)
467  {
468  _message.newCommandId();
469  _flags |= ProcessedAck;
470  _flags |= NeedsSequenceNumber;
471  }
472  else if (command == Message::Command::Publish
473  || command == Message::Command::DeltaPublish)
474  {
475  _flags |= NeedsSequenceNumber;
476  }
477  else if (command == Message::Command::StopTimer)
478  {
479  _message.newCommandId();
480  }
481  }
482 public:
486  Command(const std::string& command_)
487  {
488  init(command_);
489  }
493  Command(Message::Command::Type command_)
494  {
495  init(command_);
496  }
497 
501  Command& reset(const std::string& command_)
502  {
503  init(command_);
504  return *this;
505  }
509  Command& reset(Message::Command::Type command_)
510  {
511  init(command_);
512  return *this;
513  }
521  Command& setSowKey(const std::string& sowKey_) { _message.setSowKey(sowKey_); return *this; }
534  Command& setSowKeys(const std::string& sowKeys_) { _message.setSowKeys(sowKeys_); return *this; }
536  Command& setCommandId(const std::string& v_) { _message.setCommandId(v_); return *this; }
538  Command& setTopic(const std::string& v_) { _message.setTopic(v_); return *this; }
540  Command& setFilter(const std::string& v_) { _message.setFilter(v_); return *this; }
542  Command& setOrderBy(const std::string& v_) { _message.setOrderBy(v_); return *this; }
544  Command& setSubId(const std::string& v_) { _message.setSubscriptionId(v_); return *this; }
546  Command& setQueryId(const std::string& v_) { _message.setQueryId(v_); return *this; }
552  Command& setBookmark(const std::string& v_) { _message.setBookmark(v_); return *this; }
559  Command& setCorrelationId(const std::string& v_) { _message.setCorrelationId(v_); return *this; }
562  Command& setOptions(const std::string& v_) { _message.setOptions(v_); return *this; }
564  Command& setSequence(const std::string& v_) { _message.setSequence(v_); return *this; }
566  Command& setSequence(const amps_uint64_t v_)
567  {
568  std::ostringstream os;
569  os << v_;
570  _message.setSequence(os.str());
571  return *this;
572  }
573  amps_uint64_t getSequence() const { return amps_message_get_field_uint64(_message.getMessage(),AMPS_Sequence); }
576  Command& setData(const std::string& v_) { _message.setData(v_); return *this; }
580  Command& setData(const char* v_, size_t length_) { _message.setData(v_, length_); return *this; }
590  Command& setTimeout(unsigned v_) { _timeout = v_; return *this; }
592  Command& setTopN(unsigned v_) { _message.setTopNRecordsReturned(v_); return *this; }
597  Command& setBatchSize(unsigned v_) { _message.setBatchSize(v_); _batchSize = v_; return *this; }
608  Command& setExpiration(unsigned v_) { _message.setExpiration(v_); return *this; }
610  Command& addAckType(const std::string& v_)
611  {
612  _message.setAckType(_message.getAckType() + "," + v_);
613  if (v_ == "processed") _flags |= ProcessedAck;
614  else if (v_ == "stats") _flags |= StatsAck;
615  return *this;
616  }
618  Command& setAckType(const std::string& v_)
619  {
620  _message.setAckType(v_);
621  if (v_.find("processed") != std::string::npos) _flags |= ProcessedAck;
622  else _flags &= ~ProcessedAck;
623  if (v_.find("stats") != std::string::npos) _flags |= StatsAck;
624  else _flags &= ~StatsAck;
625  return *this;
626  }
628  Command& setAckType(unsigned v_)
629  {
630  _message.setAckTypeEnum(v_);
631  if (v_ & Message::AckType::Processed) _flags |= ProcessedAck;
632  else _flags &= ~ProcessedAck;
633  if (v_ & Message::AckType::Stats) _flags |= StatsAck;
634  else _flags &= ~StatsAck;
635  return *this;
636  }
638  std::string getAckType() const
639  {
640  return (std::string)(_message.getAckType());
641  }
643  unsigned getAckTypeEnum() const
644  {
645  return _message.getAckTypeEnum();
646  }
647 
648  Message& getMessage(void) { return _message; }
649  unsigned getTimeout(void) const { return _timeout; }
650  unsigned getBatchSize(void) const { return _batchSize; }
651  bool isSubscribe(void) const
652  {
653  return _flags & Subscribe;
654  }
655  bool isSow(void) const { return (_flags & SOW) != 0; }
656  bool hasProcessedAck(void) const { return (_flags & ProcessedAck) != 0; }
657  bool hasStatsAck(void) const { return (_flags & StatsAck) != 0; }
658  bool needsSequenceNumber(void) const { return (_flags & NeedsSequenceNumber) != 0; }
659 };
660 
663 typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
664 
665 class Message;
667 
/// Interface for objects that take part in authentication. All three
/// operations are pure virtual and must be supplied by the implementation.
class Authenticator
{
public:
  virtual ~Authenticator() {;}

  /// Called with the user name and password; returns a string.
  /// NOTE(review): presumably the returned value is what gets sent as the
  /// password at logon - confirm with the caller.
  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;

  /// Called with the user name and password; returns a string.
  /// NOTE(review): presumably invoked when the server asks for a retry -
  /// confirm with the caller.
  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;

  /// Called with the user name, password and a reason string.
  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
};
697 
702 {
703 public:
704  virtual ~DefaultAuthenticator() {;}
707  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
708  {
709  return password_;
710  }
711 
714  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
715  {
716  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
717  }
718 
719  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
720 
725  {
726  static DefaultAuthenticator d;
727  return d;
728  }
729 };
730 
734 {
735 public:
736 
740  virtual void execute(Message& message_) = 0;
741 
742  virtual ~StoreReplayer() {;}
743 };
744 
745 class Store;
746 
755 typedef bool (*PublishStoreResizeHandler)(Store store_,
756  size_t size_,
757  void* userData_);
758 
761 class StoreImpl : public RefBody
762 {
763 public:
764  StoreImpl() : _resizeHandler(NULL), _resizeHandlerData(NULL) {;}
765 
770  virtual amps_uint64_t store(const Message& message_) = 0;
771 
776  virtual void discardUpTo(amps_uint64_t index_) = 0;
777 
782  virtual void replay(StoreReplayer& replayer_) = 0;
783 
791  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
792 
797  virtual size_t unpersistedCount() const = 0;
798 
799  virtual ~StoreImpl() {;}
800 
809  virtual void flush(long timeout_) = 0;
810 
813  static inline size_t getUnsetPosition() { return AMPS_UNSET_INDEX; }
814 
817  static inline amps_uint64_t getUnsetSequence() { return AMPS_UNSET_SEQUENCE; }
818 
822  virtual amps_uint64_t getLowestUnpersisted() const = 0;
823 
827  virtual amps_uint64_t getLastPersisted() = 0;
828 
838  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
839  void* userData_)
840  {
841  _resizeHandler = handler_;
842  _resizeHandlerData = userData_;
843  }
844 
845  inline virtual PublishStoreResizeHandler getResizeHandler() const
846  {
847  return _resizeHandler;
848  }
849 
850  bool callResizeHandler(size_t newSize_);
851 
852 private:
853  PublishStoreResizeHandler _resizeHandler;
854  void* _resizeHandlerData;
855 };
856 
859 class Store
860 {
861  RefHandle<StoreImpl> _body;
862 public:
863  Store() {;}
864  Store(StoreImpl* body_) : _body(body_) {;}
865  Store(const Store& rhs) : _body(rhs._body) {;}
866  Store& operator=(const Store& rhs)
867  {
868  _body = rhs._body;
869  return *this;
870  }
871 
875  amps_uint64_t store(const Message& message_)
876  {
877  return _body.get().store(message_);
878  }
879 
884  void discardUpTo(amps_uint64_t index_)
885  {
886  _body.get().discardUpTo(index_);
887  }
888 
893  void replay(StoreReplayer& replayer_)
894  {
895  _body.get().replay(replayer_);
896  }
897 
905  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
906  {
907  return _body.get().replaySingle(replayer_, index_);
908  }
909 
914  size_t unpersistedCount() const
915  {
916  return _body.get().unpersistedCount();
917  }
918 
922  bool isValid() const
923  {
924  return _body.isValid();
925  }
926 
935  void flush(long timeout_ = 0)
936  {
937  return _body.get().flush(timeout_);
938  }
939 
943  amps_uint64_t getLowestUnpersisted()
944  {
945  return _body.get().getLowestUnpersisted();
946  }
947 
951  amps_uint64_t getLastPersisted()
952  {
953  return _body.get().getLastPersisted();
954  }
955 
965  void setResizeHandler(PublishStoreResizeHandler handler_,
966  void* userData_)
967  {
968  _body.get().setResizeHandler(handler_, userData_);
969  }
970 
971  PublishStoreResizeHandler getResizeHandler()
972  {
973  return _body.get().getResizeHandler();
974  }
975 
979  StoreImpl* get()
980  {
981  if (_body.isValid())
982  return &_body.get();
983  else
984  return NULL;
985  }
986 
987 };
988 
994 {
995 public:
996  virtual ~FailedWriteHandler() {;}
1003  virtual void failedWrite(const Message& message_,
1004  const char* reason_, size_t reasonLength_) = 0;
1005 };
1006 
1007 
1008 inline bool StoreImpl::callResizeHandler(size_t newSize_)
1009 {
1010  if(_resizeHandler)
1011  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1012  return true;
1013 }
1014 
1021 inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1022  void* data_)
1023 {
1024  long* timeoutp = (long*)data_;
1025  size_t count = store_.unpersistedCount();
1026  if (count == 0) return false;
1027  try
1028  {
1029  store_.flush(*timeoutp);
1030  }
1031 #ifdef _WIN32
1032  catch (const TimedOutException&)
1033 #else
1034  catch (const TimedOutException& e)
1035 #endif
1036  {
1037  return true;
1038  }
1039  return (count == store_.unpersistedCount());
1040 }
1041 
1046 {
1047 public:
1048  virtual ~SubscriptionManager() {;}
1056  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1057  unsigned requestedAckTypes_) = 0;
1061  virtual void unsubscribe(const Message::Field& subId_) = 0;
1064  virtual void clear() = 0;
1068  virtual void resubscribe(Client& client_) = 0;
1069 };
1070 
1074 
/// Interface for objects that want to be told when a client's connection
/// state changes.
class ConnectionStateListener
{
public:
  /// States reported to connectionStateChanged().
  typedef enum { Disconnected = 0,
                 Shutdown = 1,
                 Connected = 2,
                 LoggedOn = 4,
                 PublishReplayed = 8,
                 HeartbeatInitiated = 16,
                 Resubscribed = 32,
                 UNKNOWN = 16384
               } State;

  /// Called with the state the connection has just entered.
  virtual void connectionStateChanged(State newState_) = 0;
  virtual ~ConnectionStateListener() {;}
};
1101 
1102 
1103 class MessageStreamImpl;
1104 class MessageStream;
1105 
1106 typedef void(*DeferredExecutionFunc)(void*);
1107 
1108 class ClientImpl : public RefBody // -V553
1109 {
1110  friend class Client;
1111 protected:
1112  amps_handle _client;
1113  DisconnectHandler _disconnectHandler;
/// Slot indices for the _globalCommandTypeHandlers vector; COUNT is the
/// number of slots.
enum GlobalCommandTypeHandlers : size_t
{
  Publish          = 0,
  SOW              = 1,
  GroupBegin       = 2,
  GroupEnd         = 3,
  Heartbeat        = 4,
  OOF              = 5,
  Ack              = 6,
  LastChance       = 7,
  DuplicateMessage = 8,
  COUNT            = 9
};
1127  std::vector<MessageHandler> _globalCommandTypeHandlers;
1128  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1129  MessageRouter _routes;
1130  MessageRouter::RouteCache _routeCache;
1131  mutable Mutex _lock;
1132  std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1133  BookmarkStore _bookmarkStore;
1134  Store _publishStore;
1135  bool _isRetryOnDisconnect;
1136  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1137  volatile amps_uint64_t _lastSentHaSequenceNumber;
1138  ATOMIC_TYPE_8 _badTimeToHAPublish;
1139  ATOMIC_TYPE_8 _badTimeToHASubscribe;
1140  VersionInfo _serverVersion;
1141  Timer _heartbeatTimer;
1142  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1143 
1144  // queue data
1145  int _queueAckTimeout;
1146  bool _isAutoAckEnabled;
1147  unsigned _ackBatchSize;
1148  unsigned _queuedAckCount;
1149  unsigned _defaultMaxDepth;
1150  struct QueueBookmarks
1151  {
1152  QueueBookmarks(const std::string& topic_)
1153  :_topic(topic_)
1154  ,_oldestTime(0)
1155  ,_bookmarkCount(0)
1156  {;}
1157  std::string _topic;
1158  std::string _data;
1159  amps_uint64_t _oldestTime;
1160  unsigned _bookmarkCount;
1161  };
1162  typedef amps_uint64_t topic_hash;
1163  typedef std::map<topic_hash,QueueBookmarks> TopicHashMap;
1164  TopicHashMap _topicHashMap;
1165 
1166  class ClientStoreReplayer : public StoreReplayer
1167  {
1168  ClientImpl* _client;
1169  public:
1170  unsigned _version;
1171  amps_result _res;
1172 
1173  ClientStoreReplayer()
1174  : _client(NULL) , _version(0), _res(AMPS_E_OK)
1175  {}
1176 
1177  ClientStoreReplayer(ClientImpl* client_)
1178  : _client(client_) , _version(0), _res(AMPS_E_OK)
1179  {}
1180 
1181  void setClient(ClientImpl* client_) { _client = client_; }
1182 
1183  void execute(Message& message_)
1184  {
1185  if (!_client) throw CommandException("Can't replay without a client.");
1186  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1187  AMPS_Sequence);
1188  if (index > _client->_lastSentHaSequenceNumber)
1189  _client->_lastSentHaSequenceNumber = index;
1190 
1191  _res = AMPS_E_OK;
1192  // Don't replay a queue cancel message after a reconnect.
1193  // Currently, the only messages that will have anything in options
1194  // are cancel messages.
1195  if (!message_.getCommand().empty() &&
1196  (!_client->_badTimeToHAPublish ||
1197  message_.getOptions().len() < 6))
1198  {
1199  _res= amps_client_send_with_version(_client->_client,
1200  message_.getMessage(),
1201  &_version);
1202  if (_res != AMPS_E_OK)
1203  {
1204  throw DisconnectedException("AMPS Server disconnected during replay");
1205  }
1206  }
1207  }
1208 
1209  };
1210  ClientStoreReplayer _replayer;
1211 
1212  class FailedWriteStoreReplayer : public StoreReplayer
1213  {
1214  ClientImpl* _parent;
1215  const char* _reason;
1216  size_t _reasonLength;
1217  size_t _replayCount;
1218  public:
1219  FailedWriteStoreReplayer(ClientImpl* parent,const char* reason_, size_t reasonLength_)
1220  : _parent(parent),
1221  _reason(reason_),
1222  _reasonLength(reasonLength_),
1223  _replayCount(0)
1224  {;}
1225  void execute(Message& message_)
1226  {
1227  if (_parent->_failedWriteHandler)
1228  {
1229  ++_replayCount;
1230  _parent->_failedWriteHandler->failedWrite(message_,
1231  _reason, _reasonLength);
1232  }
1233  }
1234  size_t replayCount(void) const { return _replayCount; }
1235  };
1236 
1237  struct AckResponseImpl : public RefBody
1238  {
1239  std::string username, password, reason, status, bookmark, options;
1240  amps_uint64_t sequenceNo;
1241  VersionInfo serverVersion;
1242  volatile bool responded, abandoned;
1243  unsigned connectionVersion;
1244  AckResponseImpl() :
1245  RefBody(),
1246  sequenceNo((amps_uint64_t)0),
1247  serverVersion(),
1248  responded(false),
1249  abandoned(false),
1250  connectionVersion(0)
1251  {
1252  }
1253  };
1254 
1255  class AckResponse
1256  {
1257  RefHandle<AckResponseImpl> _body;
1258  public:
1259  AckResponse() : _body(NULL) {;}
1260  AckResponse(const AckResponse& rhs) : _body(rhs._body) {;}
1261  static AckResponse create()
1262  {
1263  AckResponse r;
1264  r._body = new AckResponseImpl();
1265  return r;
1266  }
1267 
1268  const std::string& username()
1269  {
1270  return _body.get().username;
1271  }
1272  void setUsername(const char* data_, size_t len_)
1273  {
1274  if (data_) _body.get().username.assign(data_, len_);
1275  else _body.get().username.clear();
1276  }
1277  const std::string& password()
1278  {
1279  return _body.get().password;
1280  }
1281  void setPassword(const char* data_, size_t len_)
1282  {
1283  if (data_) _body.get().password.assign(data_, len_);
1284  else _body.get().password.clear();
1285  }
1286  const std::string& reason()
1287  {
1288  return _body.get().reason;
1289  }
1290  void setReason(const char* data_, size_t len_)
1291  {
1292  if (data_) _body.get().reason.assign(data_, len_);
1293  else _body.get().reason.clear();
1294  }
1295  const std::string& status()
1296  {
1297  return _body.get().status;
1298  }
1299  void setStatus(const char* data_, size_t len_)
1300  {
1301  if (data_) _body.get().status.assign(data_, len_);
1302  else _body.get().status.clear();
1303  }
1304  const std::string& bookmark()
1305  {
1306  return _body.get().bookmark;
1307  }
1308  void setBookmark(const char* data_, size_t len_)
1309  {
1310  if (data_) _body.get().bookmark.assign(data_, len_);
1311  else _body.get().bookmark.clear();
1312  }
1313  amps_uint64_t sequenceNo() const
1314  {
1315  return _body.get().sequenceNo;
1316  }
1317  void setSequenceNo(const char* data_, size_t len_)
1318  {
1319  amps_uint64_t result = (amps_uint64_t)0;
1320  if (data_)
1321  {
1322  for(size_t i=0; i<len_; ++i)
1323  {
1324  result *= (amps_uint64_t)10;
1325  result += (amps_uint64_t)(data_[i] - '0');
1326  }
1327  }
1328  _body.get().sequenceNo = result;
1329  }
1330  VersionInfo serverVersion() const
1331  {
1332  return _body.get().serverVersion;
1333  }
1334  void setServerVersion(const char* data_, size_t len_)
1335  {
1336  if (data_)
1337  _body.get().serverVersion.setVersion(std::string(data_, len_));
1338  }
1339  bool responded()
1340  {
1341  return _body.get().responded;
1342  }
1343  void setResponded(bool responded_)
1344  {
1345  _body.get().responded = responded_;
1346  }
1347  bool abandoned()
1348  {
1349  return _body.get().abandoned;
1350  }
1351  void setAbandoned(bool abandoned_)
1352  {
1353  if (_body.isValid())
1354  _body.get().abandoned = abandoned_;
1355  }
1356 
1357  void setConnectionVersion(unsigned connectionVersion)
1358  {
1359  _body.get().connectionVersion = connectionVersion;
1360  }
1361 
1362  unsigned getConnectionVersion()
1363  {
1364  return _body.get().connectionVersion;
1365  }
1366  void setOptions(const char* data_, size_t len_)
1367  {
1368  if (data_) _body.get().options.assign(data_,len_);
1369  else _body.get().options.clear();
1370  }
1371 
1372  const std::string& options()
1373  {
1374  return _body.get().options;
1375  }
1376 
1377  AckResponse& operator=(const AckResponse& rhs)
1378  {
1379  _body = rhs._body;
1380  return *this;
1381  }
1382  };
1383 
1384 
1385  typedef std::map<std::string, AckResponse> AckMap;
1386  AckMap _ackMap;
1387  Mutex _ackMapLock;
1388  DefaultExceptionListener _defaultExceptionListener;
1389 protected:
1390 
1391  struct DeferredExecutionRequest
1392  {
1393  DeferredExecutionRequest(DeferredExecutionFunc func_,
1394  void* userData_)
1395  : _func(func_),
1396  _userData(userData_)
1397  {;}
1398 
1399  DeferredExecutionFunc _func;
1400  void* _userData;
1401  };
1402  const ExceptionListener* _exceptionListener;
1403  std::shared_ptr<const ExceptionListener> _pExceptionListener;
1404  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1405  bool _connected;
1406  std::string _username;
1407  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1408  ConnectionStateListeners _connectionStateListeners;
1409  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1410  Mutex _deferredExecutionLock;
1411  DeferredExecutionList _deferredExecutionList;
1412  unsigned _heartbeatInterval;
1413  unsigned _readTimeout;
1414 
1415  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1416  {
1417  // If we disconnectd before we got to notification, don't notify.
1418  // This should only be able to happen for Resubscribed, since the lock
1419  // is released to let the subscription manager run resubscribe so a
1420  // disconnect could be called before the change is broadcast.
1421  if (!_connected && newState_ > ConnectionStateListener::Connected)
1422  {
1423  return;
1424  }
1425  for(ConnectionStateListeners::iterator it= _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1426  {
1427  AMPS_CALL_EXCEPTION_WRAPPER(
1428  (*it)->connectionStateChanged(newState_));
1429  }
1430  }
1431  unsigned processedAck(Message& message);
1432  unsigned persistedAck(Message& meesage);
1433  void lastChance(Message& message);
1434  void checkAndSendHeartbeat(bool force=false);
1435  virtual ConnectionInfo getConnectionInfo() const;
1436  static amps_result
1437  ClientImplMessageHandler(amps_handle message, void* userData);
1438  static void
1439  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1440  static amps_result
1441  ClientImplDisconnectHandler(amps_handle client, void* userData);
1442 
1443  void unsubscribeInternal(const std::string& id)
1444  {
1445  if (id.empty()) return;
1446  // remove the handler first to avoid any more message delivery
1447  Message::Field subId;
1448  subId.assign(id.data(), id.length());
1449  _routes.removeRoute(subId);
1450  // Lock is already acquired
1451  if (_subscriptionManager)
1452  {
1453  // Have to unlock before calling into sub manager to avoid deadlock
1454  Unlock<Mutex> unlock(_lock);
1455  _subscriptionManager->unsubscribe(subId);
1456  }
1457  _message.reset();
1458  _message.setCommandEnum(Message::Command::Unsubscribe);
1459  _message.newCommandId();
1460  _message.setSubscriptionId(id);
1461  _sendWithoutRetry(_message);
1462  deferredExecution(&amps_noOpFn, NULL);
1463  }
1464 
1465  AckResponse syncAckProcessing(long timeout_, Message& message_,
1466  bool isHASubscribe_)
1467  {
1468  return syncAckProcessing(timeout_, message_,
1469  (amps_uint64_t)0, isHASubscribe_);
1470  }
1471 
1472  AckResponse syncAckProcessing(long timeout_, Message& message_,
1473  amps_uint64_t haSeq = (amps_uint64_t)0,
1474  bool isHASubscribe_ = false)
1475  {
1476  // inv: we already have _lock locked up.
1477  AckResponse ack = AckResponse::create();
1478  if (1)
1479  {
1480  Lock<Mutex> guard(_ackMapLock);
1481  _ackMap[message_.getCommandId()] = ack;
1482  }
1483  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
1484  if (ack.getConnectionVersion() == 0)
1485  {
1486  // Send failed
1487  throw DisconnectedException("Connection closed while waiting for response.");
1488  }
1489  bool timedOut = false;
1490  AMPS_START_TIMER(timeout_)
1491  while(!timedOut && !ack.responded() && !ack.abandoned() && _connected)
1492  {
1493  if (timeout_)
1494  {
1495  timedOut = !_lock.wait(timeout_);
1496  // May have woken up early, check real time
1497  if (timedOut) { AMPS_RESET_TIMER(timedOut, timeout_); }
1498  }
1499  else
1500  {
1501  // Using a timeout version to ensure python can interrupt
1502  _lock.wait(1000);
1503  Unlock<Mutex> unlck(_lock);
1504  amps_invoke_waiting_function();
1505  }
1506  }
1507  if (ack.responded())
1508  {
1509  if (ack.status() != "failure")
1510  {
1511  if (message_.getCommand() == "logon")
1512  {
1513  amps_uint64_t ackSequence = ack.sequenceNo();
1514  if (_lastSentHaSequenceNumber < ackSequence)
1515  {
1516  _lastSentHaSequenceNumber = ackSequence;
1517  }
1518  if (_publishStore.isValid())
1519  {
1520  _publishStore.discardUpTo(ackSequence);
1521  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
1522  {
1523  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
1524  }
1525  }
1526  _nameHash = ack.bookmark().substr(0, ack.bookmark().find('|'));
1527  _serverVersion = ack.serverVersion();
1528  if (_bookmarkStore.isValid())
1529  _bookmarkStore.setServerVersion(_serverVersion);
1530  }
1531  if(_ackBatchSize)
1532  {
1533  const std::string& options = ack.options();
1534  size_t index = options.find_first_of("max_backlog=");
1535  if(index != std::string::npos)
1536  {
1537  unsigned data =0;
1538  const char* c = options.c_str()+index+12;
1539  while(*c && *c!=',')
1540  {
1541  data = (data*10) + (unsigned)(*c++-48);
1542  }
1543  if(_ackBatchSize > data) _ackBatchSize = data;
1544  }
1545  }
1546  return ack;
1547  }
1548  const size_t NotEntitled = 12;
1549  std::string ackReason = ack.reason();
1550  if (ackReason.length() == 0) return ack; // none
1551  if (ackReason.length() == NotEntitled &&
1552  ackReason[0] == 'n' &&
1553  message_.getUserId().len() == 0)
1554  {
1555  message_.assignUserId(_username);
1556  }
1557  message_.throwFor(_client, ackReason);
1558  }
1559  else // !ack.responded()
1560  {
1561  if (!ack.abandoned())
1562  {
1563  throw TimedOutException("timed out waiting for operation.");
1564  }
1565  else
1566  {
1567  throw DisconnectedException("Connection closed while waiting for response.");
1568  }
1569  }
1570  return ack;
1571  }
1572 
1573  void _cleanup(void)
1574  {
1575  if (!_client) return;
1577  NULL,
1578  0L);
1580  NULL,
1581  0L);
1582  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1583  _pEmptyMessageStream.reset(NULL);
1584  amps_client_destroy(_client);
1585  _client = NULL;
1586  }
1587 
1588 public:
1589 
1590  ClientImpl(const std::string& clientName)
1591  : _client(NULL), _name(clientName)
1592  , _isRetryOnDisconnect(true)
1593  , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1594  , _badTimeToHASubscribe(0), _serverVersion()
1595  , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
1596  , _isAutoAckEnabled(false)
1597  , _ackBatchSize(0)
1598  , _queuedAckCount(0)
1599  , _defaultMaxDepth(0)
1600  , _connected(false)
1601  , _heartbeatInterval(0)
1602  , _readTimeout(0)
1603  {
1604  _replayer.setClient(this);
1605  _client = amps_client_create(clientName.c_str());
1606  amps_client_set_message_handler(_client, (amps_handler)ClientImpl::ClientImplMessageHandler, this);
1607  amps_client_set_predisconnect_handler(_client, (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler, this);
1608  amps_client_set_disconnect_handler(_client, (amps_handler)ClientImpl::ClientImplDisconnectHandler, this);
1609  _exceptionListener = &_defaultExceptionListener;
1610  for (size_t i=0; i<GlobalCommandTypeHandlers::COUNT; ++i)
1611  {
1612  _globalCommandTypeHandlers.push_back(MessageHandler());
1613  }
1614  }
1615 
1616  virtual ~ClientImpl()
1617  {
1618  _cleanup();
1619  }
1620 
1621  const std::string& getName() const
1622  {
1623  return _name;
1624  }
1625 
1626  const std::string& getNameHash() const
1627  {
1628  return _nameHash;
1629  }
1630 
1631  void setName(const std::string& name)
1632  {
1633  // This operation will fail if the client's
1634  // name is already set.
1636  _client, name.c_str());
1637  if (result != AMPS_E_OK)
1638  {
1639  AMPSException::throwFor(_client, result);
1640  }
1641  _name = name;
1642  }
1643 
1644  const std::string& getLogonCorrelationData() const
1645  {
1646  return _logonCorrelationData;
1647  }
1648 
1649  void setLogonCorrelationData(const std::string& logonCorrelationData_)
1650  {
1651  _logonCorrelationData = logonCorrelationData_;
1652  }
1653 
1654  size_t getServerVersion() const
1655  {
1656  return _serverVersion.getOldStyleVersion();
1657  }
1658 
1659  VersionInfo getServerVersionInfo() const
1660  {
1661  return _serverVersion;
1662  }
1663 
1664  const std::string& getURI() const
1665  {
1666  return _lastUri;
1667  }
1668 
1669  virtual void connect(const std::string& uri)
1670  {
1671  Lock<Mutex> l(_lock);
1672  _connect(uri);
1673  }
1674 
1675  virtual void _connect(const std::string& uri)
1676  {
1677  _lastUri = uri;
1679  _client, uri.c_str());
1680  if (result != AMPS_E_OK)
1681  {
1682  AMPSException::throwFor(_client, result);
1683  }
1684  _message.reset();
1685  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
1686  _publishMessage.setCommandEnum(Message::Command::Publish);
1687  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
1688  _beatMessage.setOptions("beat");
1689  _readMessage.setClientImpl(this);
1690  if(_queueAckTimeout)
1691  {
1692  amps_client_set_idle_time(_client,_queueAckTimeout);
1693  }
1694  _connected = true;
1695  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
1696  }
1697 
1698  void setDisconnected()
1699  {
1700  {
1701  Lock<Mutex> l(_lock);
1702  if (_connected)
1703  {
1704  AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
1705  }
1706  _connected = false;
1707  _routes.clear();
1708  _heartbeatTimer.setTimeout(0.0);
1709  }
1710  clearAcks(INT_MAX);
1711  amps_client_disconnect(_client);
1712  }
1713 
1714  virtual void disconnect()
1715  {
1716  {
1717  Lock<Mutex> l(_lock);
1718  _message.reset();
1719  _message.setCommandEnum(Message::Command::Unsubscribe);
1720  _message.newCommandId();
1721  _message.setSubscriptionId("all");
1722  AMPS_CALL_EXCEPTION_WRAPPER(_sendWithoutRetry(_message));
1723  }
1724  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
1725  setDisconnected();
1726  AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
1727  Lock<Mutex> l(_lock);
1728  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
1729  }
1730 
1731  void clearAcks(unsigned failedVersion)
1732  {
1733  // Have to lock to prevent race conditions
1734  Lock<Mutex> guard(_ackMapLock);
1735  {
1736  // Go ahead and signal any waiters if they are around...
1737  std::vector<std::string> worklist;
1738  for(AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
1739  {
1740  if (i->second.getConnectionVersion() <= failedVersion)
1741  {
1742  i->second.setAbandoned(true);
1743  worklist.push_back(i->first);
1744  }
1745  }
1746 
1747  for(std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
1748  {
1749  _ackMap.erase(*j);
1750  }
1751  }
1752 
1753  _lock.signalAll();
1754  }
1755 
1756  int send(const Message& message)
1757  {
1758  Lock<Mutex> l(_lock);
1759  return _send(message);
1760  }
1761 
1762  void sendWithoutRetry(const Message& message_)
1763  {
1764  Lock<Mutex> l(_lock);
1765  _sendWithoutRetry(message_);
1766  }
1767 
1768  void _sendWithoutRetry(const Message& message_)
1769  {
1770  amps_result result = amps_client_send(_client, message_.getMessage());
1771  if(result != AMPS_E_OK)
1772  {
1773  AMPSException::throwFor(_client,result);
1774  }
1775  }
1776 
1777  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
1778  bool isHASubscribe_ = false)
1779  {
1780  // Lock is already acquired
1781  amps_result result = AMPS_E_RETRY;
1782 
1783  // Create a local reference to this message, as we'll need to hold on
1784  // to a reference to it in case reconnect occurs.
1785  Message localMessage = message;
1786  unsigned version = 0;
1787 
1788  while(result == AMPS_E_RETRY)
1789  {
1790  if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
1791  {
1792  // If retrySend is disabled, do not wait for the reconnect
1793  // to finish, just throw.
1794  if(!_isRetryOnDisconnect)
1795  {
1796  AMPSException::throwFor(_client,AMPS_E_RETRY);
1797  }
1798  Unlock<Mutex> l(_lock);
1799 #ifdef _WIN32
1800  Sleep(0);
1801 #elif defined(sun)
1802  sched_yield();
1803 #else
1804  pthread_yield();
1805 #endif
1806  }
1807  else
1808  {
1809  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
1810  (isHASubscribe_ && _badTimeToHASubscribe != 0))
1811  {
1812  return (int)version;
1813  }
1814  // It's possible to get here out of order, but this way we'll
1815  // always send in order.
1816  if (haSeq > _lastSentHaSequenceNumber)
1817  {
1818  while (haSeq > _lastSentHaSequenceNumber + 1)
1819  {
1820  try
1821  {
1822  // Replayer updates _lastSentHaSsequenceNumber
1823  if (!_publishStore.replaySingle(_replayer,
1824  _lastSentHaSequenceNumber+1))
1825  {
1826  //++_lastSentHaSequenceNumber;
1827  continue;
1828  }
1829  result = AMPS_E_OK;
1830  version = _replayer._version;
1831  }
1832  #ifdef _WIN32
1833  catch(const DisconnectedException&)
1834  #else
1835  catch(const DisconnectedException& e)
1836  #endif
1837  {
1838  result = _replayer._res;
1839  break;
1840  }
1841  }
1842  result = amps_client_send_with_version(_client,
1843  localMessage.getMessage(),
1844  &version);
1845  ++_lastSentHaSequenceNumber;
1846  }
1847  else
1848  result = amps_client_send_with_version(_client,
1849  localMessage.getMessage(),
1850  &version);
1851  if (result != AMPS_E_OK)
1852  {
1853  if (!isHASubscribe_ && !haSeq &&
1854  localMessage.getMessage() == message.getMessage())
1855  {
1856  localMessage = message.deepCopy();
1857  }
1858  if(_isRetryOnDisconnect)
1859  {
1860  Unlock<Mutex> u(_lock);
1861  result = amps_client_attempt_reconnect(_client, version);
1862  // If this is an HA publish or subscrbie command, it was
1863  // stored first and will have already been replayed by the
1864  // store or sub manager after reconnect, so just return.
1865  if ((isHASubscribe_ || haSeq) &&
1866  result == AMPS_E_RETRY)
1867  {
1868  return (int)version;
1869  }
1870  }
1871  else
1872  {
1873  // retrySend is disabled so throw the error
1874  // from the send as an exception, do not retry.
1875  AMPSException::throwFor(_client, result);
1876  }
1877  }
1878  }
1879  if (result == AMPS_E_RETRY)
1880  {
1881  amps_invoke_waiting_function();
1882  }
1883  }
1884 
1885  if (result != AMPS_E_OK) AMPSException::throwFor(_client, result);
1886  return (int)version;
1887  }
1888 
1889  void addMessageHandler(const Field& commandId_,
1890  const AMPS::MessageHandler& messageHandler_,
1891  unsigned requestedAcks_, bool isSubscribe_)
1892  {
1893  Lock<Mutex> lock(_lock);
1894  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
1895  0, isSubscribe_);
1896  }
1897 
1898  bool removeMessageHandler(const Field& commandId_)
1899  {
1900  Lock<Mutex> lock(_lock);
1901  return _routes.removeRoute(commandId_);
1902  }
1903 
  // Sends message_ to the server and, when messageHandler_ is valid, routes
  // the resulting messages/acks to it.
  //
  // Subscribe and SOW style commands get command/subscription/query ids
  // assigned as needed, have a Processed ack (plus Completed for commands
  // that include a SOW query) added to the requested acks, and wait for the
  // processed ack via syncAckProcessing(); if that throws, the routes added
  // here are removed and the exception is rethrown. Publish, logon,
  // unsubscribe and similar commands are sent as-is through _send().
  // Commands that cannot be sent directly throw CommandException.
  //
  // Before returning (or rethrowing from the subscribe/SOW path) the
  // message's ack type is restored to what the caller requested.
  // Returns the command id used for the message.
1904  std::string send(const MessageHandler& messageHandler_, Message& message_, int timeout_ = 0)
1905  {
1906  Field id = message_.getCommandId();
1907  Field subId = message_.getSubscriptionId();
1908  Field qid = message_.getQueryId();
1909  bool isSubscribe = false;
1910  bool isSubscribeOnly = false;
1911  bool replace = false;
1912  unsigned requestedAcks = message_.getAckTypeEnum();
1913  unsigned systemAddedAcks = Message::AckType::None;
1914 
  // The first three case groups fall through on purpose: each adds the id
  // handling needed by its command family and then shares the code below it.
1915  switch(message_.getCommandEnum())
1916  {
1917  case Message::Command::Subscribe:
1918  case Message::Command::DeltaSubscribe:
  // Searches the options for AMPS_OPTIONS_REPLACE minus its last character.
1919  replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos;
  // Bookmark subscriptions with a bookmark store also request a Persisted ack.
1920  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
1921  {
1922  systemAddedAcks |= Message::AckType::Persisted;
1923  }
1924  isSubscribeOnly = true;
1925  // fall through
1926  case Message::Command::SOWAndSubscribe:
1927  case Message::Command::SOWAndDeltaSubscribe:
1928  if (id.empty())
1929  {
1930  id = message_.newCommandId().getCommandId();
1931  }
1932  else
1933  {
  // Pick a fresh command id while the current one collides with an existing route.
1934  while (!replace && id != subId && _routes.hasRoute(id))
1935  {
1936  id = message_.newCommandId().getCommandId();
1937  }
1938  }
  // With no explicit subscription id, the command id doubles as the sub id.
1939  if (subId.empty())
1940  {
1941  message_.setSubscriptionId(id);
1942  subId = id;
1943  }
1944  isSubscribe = true;
1945  // fall through
1946  case Message::Command::SOW:
1947  if(id.empty())
1948  {
1949  id = message_.newCommandId().getCommandId();
1950  }
1951  else
1952  {
1953  while (!replace && id != subId && _routes.hasRoute(id))
1954  {
1955  message_.newCommandId();
  // Keep the query id in step when it was the same as the command id.
1956  if (qid == id)
1957  {
1958  qid = message_.getCommandId();
1959  message_.setQueryId(qid);
1960  }
1961  id = message_.getCommandId();
1962  }
1963  }
  // Commands that include a SOW query also need a query id.
1964  if (!isSubscribeOnly)
1965  {
1966  if (qid.empty())
1967  {
1968  message_.setQueryID(id);
1969  qid = id;
1970  }
1971  else
1972  {
1973  while (!replace && qid != subId && qid != id
1974  && _routes.hasRoute(qid))
1975  {
1976  qid = message_.newQueryId().getQueryId();
1977  }
1978  }
1979  }
1980  systemAddedAcks |= Message::AckType::Processed;
1981  // for SOW only, we get a completed ack so we know when to remove the handler.
1982  if (!isSubscribeOnly) systemAddedAcks |= Message::AckType::Completed;
1983  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
1984  {
  // routesAdded counts routes registered so far; once it is non-zero, later
  // routes get their own copy of the handler's user data when
  // amps_invoke_copy_route_function() returns one.
1985  int routesAdded = 0;
1986  Lock<Mutex> l(_lock);
1987  if (!subId.empty() && messageHandler_.isValid())
1988  {
1989  if (!_routes.hasRoute(subId))
1990  {
1991  ++routesAdded;
1992  }
1993  // This can replace a non-subscribe with a matching id
1994  // with a subscription but not another subscription.
1995  _routes.addRoute(subId, messageHandler_, requestedAcks,
1996  systemAddedAcks, isSubscribe);
1997  }
1998  if (!isSubscribeOnly && !qid.empty()
1999  && messageHandler_.isValid() && qid != subId)
2000  {
2001  if (routesAdded == 0)
2002  {
2003  _routes.addRoute(qid, messageHandler_,
2004  requestedAcks, systemAddedAcks, false);
2005  }
2006  else
2007  {
2008  void* data = NULL;
2009  {
  // _lock is released while the copy-route function runs.
2010  Unlock<Mutex> u(_lock);
2011  data = amps_invoke_copy_route_function(
2012  messageHandler_.userData());
2013  }
2014  if (!data)
2015  {
2016  _routes.addRoute(qid, messageHandler_, requestedAcks,
2017  systemAddedAcks, false);
2018  }
2019  else
2020  {
2021  _routes.addRoute(qid,
2022  MessageHandler(messageHandler_.function(),
2023  data),
2024  requestedAcks, systemAddedAcks, false);
2025  }
2026  }
2027  ++routesAdded;
2028  }
  // A route on the command id itself is only needed when the caller asked
  // for acks other than Persisted and the id is distinct from sub/query id.
2029  if (!id.empty() && messageHandler_.isValid()
2030  && requestedAcks & ~Message::AckType::Persisted
2031  && id != subId && id != qid)
2032  {
2033  if (routesAdded == 0)
2034  {
2035  _routes.addRoute(id, messageHandler_, requestedAcks,
2036  systemAddedAcks, false);
2037  }
2038  else
2039  {
2040  void* data = NULL;
2041  {
2042  Unlock<Mutex> u(_lock);
2043  data = amps_invoke_copy_route_function(
2044  messageHandler_.userData());
2045  }
2046  if (!data)
2047  {
2048  _routes.addRoute(id, messageHandler_, requestedAcks,
2049  systemAddedAcks, false);
2050  }
2051  else
2052  {
2053  _routes.addRoute(id,
2054  MessageHandler(messageHandler_.function(),
2055  data),
2056  requestedAcks,
2057  systemAddedAcks, false);
2058  }
2059  }
2060  ++routesAdded;
2061  }
2062  try
2063  {
2064  // We aren't adding to subscription manager, so this isn't
2065  // an HA subscribe.
2066  syncAckProcessing(timeout_, message_, 0, false);
2067  message_.setAckTypeEnum(requestedAcks);
2068  }
2069  catch (...)
2070  {
  // Undo the routes registered above, restore the caller's ack type, rethrow.
2071  _routes.removeRoute(message_.getQueryID());
2072  _routes.removeRoute(message_.getSubscriptionId());
2073  _routes.removeRoute(id);
2074  message_.setAckTypeEnum(requestedAcks);
2075  throw;
2076  }
2077  }
2078  break;
2079  // These are valid commands that are used as-is
2080  case Message::Command::Unsubscribe:
2081  case Message::Command::Heartbeat:
2082  case Message::Command::Logon:
2083  case Message::Command::StartTimer:
2084  case Message::Command::StopTimer:
2085  case Message::Command::DeltaPublish:
2086  case Message::Command::Publish:
2087  case Message::Command::SOWDelete:
2088  {
2089  Lock<Mutex> l(_lock);
2090  // if an ack is requested, it'll need a command ID.
2091  if (message_.getAckTypeEnum() != Message::AckType::None)
2092  {
2093  if (id.empty())
2094  {
2095  message_.newCommandId();
2096  id = message_.getCommandId();
2097  }
2098  if (messageHandler_.isValid())
2099  {
2100  _routes.addRoute(id, messageHandler_, requestedAcks,
2101  Message::AckType::None, false);
2102  }
2103  }
2104  _send(message_);
2105  }
2106  break;
2107  // These are things that shouldn't be sent (not meaningful)
2108  case Message::Command::GroupBegin:
2109  case Message::Command::GroupEnd:
2110  case Message::Command::OOF:
2111  case Message::Command::Ack:
2112  case Message::Command::Unknown:
2113  default:
2114  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2115  }
2116  message_.setAckTypeEnum(requestedAcks);
2117  return id;
2118  }
2119 
2120  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
2121  {
2122  Lock<Mutex> l(_lock);
2123  _disconnectHandler = disconnectHandler;
2124  }
2125 
2126  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2127  {
2128  switch (command_[0])
2129  {
2130 #if 0 // Not currently implemented to avoid an extra branch in delivery
2131  case 'p':
2132  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2133  break;
2134  case 's':
2135  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2136  break;
2137 #endif
2138  case 'h':
2139  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2140  break;
2141 #if 0 // Not currently implemented to avoid an extra branch in delivery
2142  case 'g':
2143  if (command_[6] == 'b')
2144  {
2145  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2146  }
2147  else if (command_[6] == 'e')
2148  {
2149  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2150  }
2151  else
2152  {
2153  std::ostringstream os;
2154  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2155  throw CommandException(os.str());
2156  }
2157  break;
2158  case 'o':
2159  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2160  break;
2161 #endif
2162  case 'a':
2163  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2164  break;
2165  case 'l':
2166  case 'L':
2167  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2168  break;
2169  case 'd':
2170  case 'D':
2171  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2172  break;
2173  default:
2174  std::ostringstream os;
2175  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2176  throw CommandException(os.str());
2177  break;
2178  }
2179  }
2180 
2181  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2182  {
2183  switch (command_)
2184  {
2185 #if 0 // Not currently implemented to avoid an extra branch in delivery
2186  case Message::Command::Publish:
2187  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2188  break;
2189  case Message::Command::SOW:
2190  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2191  break;
2192 #endif
2193  case Message::Command::Heartbeat:
2194  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2195  break;
2196 #if 0 // Not currently implemented to avoid an extra branch in delivery
2197  case Message::Command::GroupBegin:
2198  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2199  break;
2200  case Message::Command::GroupEnd:
2201  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2202  break;
2203  case Message::Command::OOF:
2204  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2205  break;
2206 #endif
2207  case Message::Command::Ack:
2208  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2209  break;
2210  default:
2211  unsigned bits = 0;
2212  unsigned command = command_;
2213  while (command > 0) { ++bits; command >>= 1; }
2214  char errBuf[128];
2215  AMPS_snprintf(errBuf, sizeof(errBuf),
2216  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2217  CommandConstants<0>::Lengths[bits],
2218  CommandConstants<0>::Values[bits]);
2219  throw CommandException(errBuf);
2220  break;
2221  }
2222  }
2223 
2224  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2225  {
2226  _globalCommandTypeHandlers[handlerType_] = handler_;
2227  }
2228 
2229  void setFailedWriteHandler(FailedWriteHandler* handler_)
2230  {
2231  Lock<Mutex> l(_lock);
2232  _failedWriteHandler.reset(handler_);
2233  }
2234 
2235  void setPublishStore(const Store& publishStore_)
2236  {
2237  Lock<Mutex> l(_lock);
2238  if (_connected) throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2239  _publishStore = publishStore_;
2240  }
2241 
2242  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2243  {
2244  Lock<Mutex> l(_lock);
2245  if (_connected) throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2246  _bookmarkStore = bookmarkStore_;
2247  }
2248 
2249  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2250  {
2251  Lock<Mutex> l(_lock);
2252  _subscriptionManager.reset(subscriptionManager_);
2253  }
2254 
2255  SubscriptionManager* getSubscriptionManager() const
2256  {
2257  return const_cast<SubscriptionManager*>(_subscriptionManager.get());
2258  }
2259 
2260  DisconnectHandler getDisconnectHandler() const
2261  {
2262  return _disconnectHandler;
2263  }
2264 
2265  MessageHandler getDuplicateMessageHandler() const
2266  {
2267  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2268  }
2269 
2270  FailedWriteHandler* getFailedWriteHandler() const
2271  {
2272  return const_cast<FailedWriteHandler*>(_failedWriteHandler.get());
2273  }
2274 
2275  Store getPublishStore() const
2276  {
2277  return _publishStore;
2278  }
2279 
2280  BookmarkStore getBookmarkStore() const
2281  {
2282  return _bookmarkStore;
2283  }
2284 
2285  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,size_t dataLen_)
2286  {
2287  if (!_publishStore.isValid())
2288  {
2289  Lock<Mutex> l(_lock);
2290  _publishMessage.assignTopic(topic_, topicLen_);
2291  _publishMessage.assignData(data_, dataLen_);
2292  _send(_publishMessage);
2293  return 0;
2294  }
2295  else
2296  {
2297  if (!publishStoreMessage)
2298  {
2299  publishStoreMessage = new Message();
2300  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2301  }
2302  publishStoreMessage->reset();
2303  publishStoreMessage->setCommandEnum(Message::Command::Publish);
2304  return _publish(topic_, topicLen_, data_, dataLen_);
2305  }
2306  }
2307 
2308  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2309  size_t dataLen_, unsigned long expiration_)
2310  {
2311  if (!_publishStore.isValid())
2312  {
2313  Lock<Mutex> l(_lock);
2314  _publishMessage.assignTopic(topic_, topicLen_);
2315  _publishMessage.assignData(data_, dataLen_);
2316  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2317  size_t pos = convertToCharArray(exprBuf, expiration_);
2318  _publishMessage.assignExpiration(exprBuf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2319  _send(_publishMessage);
2320  _publishMessage.assignExpiration(NULL, 0);
2321  return 0;
2322  }
2323  else
2324  {
2325  if (!publishStoreMessage)
2326  {
2327  publishStoreMessage = new Message();
2328  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2329  }
2330  publishStoreMessage->reset();
2331  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2332  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2333  publishStoreMessage->setCommandEnum(Message::Command::Publish)
2334  .assignExpiration(exprBuf+exprPos,
2335  AMPS_NUMBER_BUFFER_LEN-exprPos);
2336  return _publish(topic_, topicLen_, data_, dataLen_);
2337  }
2338  }
2339 
2340  class FlushAckHandler : ConnectionStateListener
2341  {
2342  private:
2343  ClientImpl* _pClient;
2344  Field _cmdId;
2345  volatile bool _acked;
2346  volatile bool _disconnected;
2347  public:
2348  FlushAckHandler(ClientImpl* pClient_)
2349  : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2350  {
2351  pClient_->addConnectionStateListener(this);
2352  }
2353  ~FlushAckHandler()
2354  {
2355  _pClient->removeConnectionStateListener(this);
2356  _pClient->removeMessageHandler(_cmdId);
2357  _cmdId.clear();
2358  }
2359  void setCommandId(const Field& cmdId_)
2360  {
2361  _cmdId.deepCopy(cmdId_);
2362  }
2363  void invoke(const Message&)
2364  {
2365  _acked = true;
2366  }
2367  void connectionStateChanged(State state_)
2368  {
2369  if (state_ <= Shutdown)
2370  {
2371  _disconnected = true;
2372  }
2373  }
2374  bool acked()
2375  {
2376  return _acked;
2377  }
2378  bool done()
2379  {
2380  return _acked || _disconnected;
2381  }
2382  };
2383 
2384  void publishFlush(long timeout_, unsigned ackType_)
2385  {
2386  static const char* processed = "processed";
2387  static const size_t processedLen = strlen(processed);
2388  static const char* persisted = "persisted";
2389  static const size_t persistedLen = strlen(persisted);
2390  static const char* flush = "flush";
2391  static const size_t flushLen = strlen(flush);
2392  static VersionInfo minPersisted("5.3.3.0");
2393  static VersionInfo minFlush("4");
2394  if (ackType_ != Message::AckType::Processed
2395  && ackType_ != Message::AckType::Persisted)
2396  {
2397  throw new CommandException("Flush can only be used with processed or persisted acks.");
2398  }
2399  FlushAckHandler flushHandler(this);
2400  if (_serverVersion >= minFlush)
2401  {
2402  Lock<Mutex> l(_lock);
2403  if (!_connected)
2404  throw DisconnectedException("Not cconnected trying to flush");
2405  _message.reset();
2406  _message.newCommandId();
2407  _message.assignCommand(flush, flushLen);
2408  if (_serverVersion < minPersisted
2409  || ackType_ == Message::AckType::Processed)
2410  {
2411  _message.assignAckType(processed, processedLen);
2412  }
2413  else
2414  {
2415  _message.assignAckType(persisted, persistedLen);
2416  }
2417  flushHandler.setCommandId(_message.getCommandId());
2418  addMessageHandler(_message.getCommandId(),
2419  std::bind(&FlushAckHandler::invoke,
2420  std::ref(flushHandler),
2421  std::placeholders::_1),
2422  ackType_, false);
2423  if (_send(_message) == -1)
2424  throw DisconnectedException("Disconnected trying to flush");
2425  }
2426  if (_publishStore.isValid())
2427  {
2428  try
2429  {
2430  _publishStore.flush(timeout_);
2431  }
2432  catch (const AMPSException& ex)
2433  {
2434  AMPS_UNHANDLED_EXCEPTION(ex);
2435  throw;
2436  }
2437  }
2438  else if (_serverVersion < minFlush)
2439  {
2440  if (timeout_ > 0) { AMPS_USLEEP(timeout_ * 1000); }
2441  else { AMPS_USLEEP(1000 * 1000); }
2442  return;
2443  }
2444  if (timeout_)
2445  {
2446  Timer timer((double)timeout_);
2447  timer.start();
2448  while (!timer.check() && !flushHandler.done())
2449  {
2450  AMPS_USLEEP(10000);
2451  amps_invoke_waiting_function();
2452  }
2453  }
2454  else
2455  {
2456  while (!flushHandler.done())
2457  {
2458  AMPS_USLEEP(10000);
2459  amps_invoke_waiting_function();
2460  }
2461  }
2462  // No response or disconnect in timeout interval
2463  if (!flushHandler.done())
2464  throw TimedOutException("Timed out waiting for flush");
2465  // We got disconnected and there is no publish store
2466  if (!flushHandler.acked() && !_publishStore.isValid())
2467  throw DisconnectedException("Disconnected waiting for flush");
2468  }
2469 
2470  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2471  const char* data_, size_t dataLength_)
2472  {
2473  if (!_publishStore.isValid())
2474  {
2475  Lock<Mutex> l(_lock);
2476  _deltaMessage.assignTopic(topic_, topicLength_);
2477  _deltaMessage.assignData(data_, dataLength_);
2478  _send(_deltaMessage);
2479  return 0;
2480  }
2481  else
2482  {
2483  if (!publishStoreMessage)
2484  {
2485  publishStoreMessage = new Message();
2486  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2487  }
2488  publishStoreMessage->reset();
2489  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2490  return _publish(topic_, topicLength_, data_, dataLength_);
2491  }
2492  }
2493 
2494  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2495  const char* data_, size_t dataLength_,
2496  unsigned long expiration_)
2497  {
2498  if (!_publishStore.isValid())
2499  {
2500  Lock<Mutex> l(_lock);
2501  _deltaMessage.assignTopic(topic_, topicLength_);
2502  _deltaMessage.assignData(data_, dataLength_);
2503  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2504  size_t pos = convertToCharArray(exprBuf, expiration_);
2505  _deltaMessage.assignExpiration(exprBuf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2506  _send(_deltaMessage);
2507  _deltaMessage.assignExpiration(NULL, 0);
2508  return 0;
2509  }
2510  else
2511  {
2512  if (!publishStoreMessage)
2513  {
2514  publishStoreMessage = new Message();
2515  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2516  }
2517  publishStoreMessage->reset();
2518  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2519  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2520  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2521  .assignExpiration(exprBuf+exprPos,
2522  AMPS_NUMBER_BUFFER_LEN-exprPos);
2523  return _publish(topic_, topicLength_, data_, dataLength_);
2524  }
2525  }
2526 
2527  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
2528  const char* data_, size_t dataLength_)
2529  {
2530  publishStoreMessage->assignTopic(topic_, topicLength_)
2531  .setAckTypeEnum(Message::AckType::Persisted)
2532  .assignData(data_, dataLength_);
2533  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
2534  char buf[AMPS_NUMBER_BUFFER_LEN];
2535  size_t pos = convertToCharArray(buf, haSequenceNumber);
2536  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2537  {
2538  Lock<Mutex> l(_lock);
2539  _send(*publishStoreMessage, haSequenceNumber);
2540  }
2541  return haSequenceNumber;
2542  }
2543 
2544  virtual std::string logon(long timeout_, Authenticator& authenticator_,
2545  const char* options_ = NULL)
2546  {
2547  Lock<Mutex> l(_lock);
2548  return _logon(timeout_, authenticator_, options_);
2549  }
2550 
2551  virtual std::string _logon(long timeout_, Authenticator& authenticator_,
2552  const char* options_ = NULL)
2553  {
2554  AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2555  _message.reset();
2556  _message.setCommandEnum(Message::Command::Logon);
2557  _message.newCommandId();
2558  std::string newCommandId = _message.getCommandId();
2559  _message.setClientName(_name);
2560 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
2561  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
2562  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
2563 #endif
2564  URI uri(_lastUri);
2565  if(uri.user().size()) _message.setUserId(uri.user());
2566  if(uri.password().size()) _message.setPassword(uri.password());
2567  if(uri.protocol() == "amps" && uri.messageType().size())
2568  {
2569  _message.setMessageType(uri.messageType());
2570  }
2571  if(uri.isTrue("pretty"))
2572  {
2573  _message.setOptions("pretty");
2574  }
2575 
2576  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
2577  if (!_logonCorrelationData.empty())
2578  {
2579  _message.assignCorrelationId(_logonCorrelationData);
2580  }
2581  if (options_)
2582  {
2583  _message.setOptions(options_);
2584  }
2585  _username = _message.getUserId();
2586  try
2587  {
2588  while(true)
2589  {
2590  _message.setAckTypeEnum(Message::AckType::Processed);
2591  AckResponse ack = syncAckProcessing(timeout_, _message);
2592  if (ack.status() == "retry")
2593  {
2594  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
2595  _username = ack.username();
2596  _message.setUserId(_username);
2597  }
2598  else
2599  {
2600  authenticator_.completed(ack.username(), ack.password(), ack.reason());
2601  break;
2602  }
2603  }
2604  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2605 
2606  // Now re-send the heartbeat command if configured
2607  _sendHeartbeat();
2608  }
2609  catch(const AMPSException& ex)
2610  {
2611  AMPS_UNHANDLED_EXCEPTION(ex);
2612  throw;
2613  }
2614  catch(...)
2615  {
2616  throw;
2617  }
2618 
2619  if (_publishStore.isValid())
2620  {
2621  try
2622  {
2623  _publishStore.replay(_replayer);
2624  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
2625  }
2626  catch(const StoreException& ex)
2627  {
2628  std::ostringstream os;
2629  os << "A local store exception occurred while logging on."
2630  << ex.toString();
2631  throw ConnectionException(os.str());
2632  }
2633  catch(const AMPSException& ex)
2634  {
2635  AMPS_UNHANDLED_EXCEPTION(ex);
2636  throw ex;
2637  }
2638  catch(const std::exception& ex)
2639  {
2640  AMPS_UNHANDLED_EXCEPTION(ex);
2641  throw ex;
2642  }
2643  catch(...)
2644  {
2645  throw;
2646  }
2647  }
2648  return newCommandId;
2649  }
2650 
2651  std::string subscribe(const MessageHandler& messageHandler_,
2652  const std::string& topic_,
2653  long timeout_,
2654  const std::string& filter_,
2655  const std::string& bookmark_,
2656  const std::string& options_,
2657  const std::string& subId_,
2658  bool isHASubscribe_ = true)
2659  {
2660  isHASubscribe_ &= (bool)_subscriptionManager;
2661  Lock<Mutex> l(_lock);
2662  _message.reset();
2663  _message.setCommandEnum(Message::Command::Subscribe);
2664  _message.newCommandId();
2665  std::string subId(subId_);
2666  if (subId.empty())
2667  {
2668  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos)
2669  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
2670 
2671  subId = _message.getCommandId();
2672  }
2673  _message.setSubscriptionId(subId);
2674  // we need to deep copy this before sending the message; while we are
2675  // waiting for a response, the fields in _message may get blown away for
2676  // other operations.
2677  AMPS::Message::Field subIdField(subId);
2678  unsigned ackTypes = Message::AckType::Processed;
2679 
2680  if (!bookmark_.empty() && _bookmarkStore.isValid())
2681  {
2682  ackTypes |= Message::AckType::Persisted;
2683  }
2684  _message.setTopic(topic_);
2685 
2686  if (filter_.length()) _message.setFilter(filter_);
2687  if (bookmark_.length())
2688  {
2689  if (bookmark_ == AMPS_BOOKMARK_RECENT)
2690  {
2691  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
2692  _message.setBookmark(mostRecent);
2693  }
2694  else
2695  {
2696  _message.setBookmark(bookmark_);
2697  if (_bookmarkStore.isValid())
2698  {
2699  if (bookmark_ != AMPS_BOOKMARK_NOW &&
2700  bookmark_ != AMPS_BOOKMARK_EPOCH)
2701  {
2702  _bookmarkStore.log(_message);
2703  _bookmarkStore.discard(_message);
2704  _bookmarkStore.persisted(subIdField, _message.getBookmark());
2705  }
2706  }
2707  }
2708  }
2709  if (options_.length()) _message.setOptions(options_);
2710 
2711  Message message = _message;
2712  if (isHASubscribe_)
2713  {
2714  message = _message.deepCopy();
2715  Unlock<Mutex> u(_lock);
2716  _subscriptionManager->subscribe(messageHandler_, message,
2717  Message::AckType::None);
2718  if (_badTimeToHASubscribe) return subId;
2719  }
2720  if (!_routes.hasRoute(_message.getSubscriptionId()))
2721  {
2722  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
2723  Message::AckType::None, ackTypes, true);
2724  }
2725  message.setAckTypeEnum(ackTypes);
2726  if (!options_.empty()) message.setOptions(options_);
2727  try
2728  {
2729  syncAckProcessing(timeout_, message, isHASubscribe_);
2730  }
2731  catch (const DisconnectedException&)
2732  {
2733  if (!isHASubscribe_)
2734  {
2735  _routes.removeRoute(subIdField);
2736  throw;
2737  }
2738  else
2739  {
2740  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2741  throw;
2742  }
2743  }
2744  catch (const TimedOutException&)
2745  {
2746  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2747  throw;
2748  }
2749  catch (...)
2750  {
2751  if (isHASubscribe_)
2752  {
2753  // Have to unlock before calling into sub manager to avoid deadlock
2754  Unlock<Mutex> unlock(_lock);
2755  _subscriptionManager->unsubscribe(subIdField);
2756  }
2757  _routes.removeRoute(subIdField);
2758  throw;
2759  }
2760 
2761  return subId;
2762  }
2763  std::string deltaSubscribe(const MessageHandler& messageHandler_,
2764  const std::string& topic_,
2765  long timeout_,
2766  const std::string& filter_,
2767  const std::string& bookmark_,
2768  const std::string& options_,
2769  const std::string& subId_ = "",
2770  bool isHASubscribe_ = true)
2771  {
2772  isHASubscribe_ &= (bool)_subscriptionManager;
2773  Lock<Mutex> l(_lock);
2774  _message.reset();
2775  _message.setCommandEnum(Message::Command::DeltaSubscribe);
2776  _message.newCommandId();
2777  std::string subId(subId_);
2778  if (subId.empty())
2779  {
2780  subId = _message.getCommandId();
2781  }
2782  _message.setSubscriptionId(subId);
2783  // we need to deep copy this before sending the message; while we are
2784  // waiting for a response, the fields in _message may get blown away for
2785  // other operations.
2786  AMPS::Message::Field subIdField(subId);
2787  unsigned ackTypes = Message::AckType::Processed;
2788 
2789  if (!bookmark_.empty() && _bookmarkStore.isValid())
2790  {
2791  ackTypes |= Message::AckType::Persisted;
2792  }
2793  _message.setTopic(topic_);
2794  if (filter_.length()) _message.setFilter(filter_);
2795  if (bookmark_.length())
2796  {
2797  if (bookmark_ == AMPS_BOOKMARK_RECENT)
2798  {
2799  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
2800  _message.setBookmark(mostRecent);
2801  }
2802  else
2803  {
2804  _message.setBookmark(bookmark_);
2805  if (_bookmarkStore.isValid())
2806  {
2807  if (bookmark_ != AMPS_BOOKMARK_NOW &&
2808  bookmark_ != AMPS_BOOKMARK_EPOCH)
2809  {
2810  _bookmarkStore.log(_message);
2811  _bookmarkStore.discard(_message);
2812  _bookmarkStore.persisted(subIdField, _message.getBookmark());
2813  }
2814  }
2815  }
2816  }
2817  if (options_.length()) _message.setOptions(options_);
2818  Message message = _message;
2819  if (isHASubscribe_)
2820  {
2821  message = _message.deepCopy();
2822  Unlock<Mutex> u(_lock);
2823  _subscriptionManager->subscribe(messageHandler_, message,
2824  Message::AckType::None);
2825  if (_badTimeToHASubscribe) return subId;
2826  }
2827  if (!_routes.hasRoute(_message.getSubscriptionId()))
2828  {
2829  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
2830  Message::AckType::None, ackTypes, true);
2831  }
2832  message.setAckTypeEnum(ackTypes);
2833  if (!options_.empty()) message.setOptions(options_);
2834  try
2835  {
2836  syncAckProcessing(timeout_, message, isHASubscribe_);
2837  }
2838  catch (const DisconnectedException&)
2839  {
2840  if (!isHASubscribe_)
2841  {
2842  _routes.removeRoute(subIdField);
2843  throw;
2844  }
2845  }
2846  catch (const TimedOutException&)
2847  {
2848  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2849  throw;
2850  }
2851  catch (...)
2852  {
2853  if (isHASubscribe_)
2854  {
2855  // Have to unlock before calling into sub manager to avoid deadlock
2856  Unlock<Mutex> unlock(_lock);
2857  _subscriptionManager->unsubscribe(subIdField);
2858  }
2859  _routes.removeRoute(subIdField);
2860  throw;
2861  }
2862  return subId;
2863  }
2864 
2865  void unsubscribe(const std::string& id)
2866  {
2867  Lock<Mutex> l(_lock);
2868  unsubscribeInternal(id);
2869  }
2870 
2871  void unsubscribe(void)
2872  {
2873  if (_subscriptionManager)
2874  {
2875  _subscriptionManager->clear();
2876  }
2877  {
2878  _routes.unsubscribeAll();
2879  Lock<Mutex> l(_lock);
2880  _message.reset();
2881  _message.setCommandEnum(Message::Command::Unsubscribe);
2882  _message.newCommandId();
2883  _message.setSubscriptionId("all");
2884  _sendWithoutRetry(_message);
2885  }
2886  deferredExecution(&amps_noOpFn, NULL);
2887  }
2888 
2889  std::string sow(const MessageHandler& messageHandler_,
2890  const std::string& topic_,
2891  const std::string& filter_ = "",
2892  const std::string& orderBy_ = "",
2893  const std::string& bookmark_ = "",
2894  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2895  int topN_ = AMPS_DEFAULT_TOP_N,
2896  const std::string& options_ = "",
2897  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
2898  {
2899  Lock<Mutex> l(_lock);
2900  _message.reset();
2901  _message.setCommandEnum(Message::Command::SOW);
2902  _message.newCommandId();
2903  // need to keep our own copy of the command ID.
2904  std::string commandId = _message.getCommandId();
2905  _message.setQueryID(_message.getCommandId());
2906  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
2907  _message.setAckTypeEnum(ackTypes);
2908  _message.setTopic(topic_);
2909  if (filter_.length()) _message.setFilter(filter_);
2910  if (orderBy_.length()) _message.setOrderBy(orderBy_);
2911  if (bookmark_.length()) _message.setBookmark(bookmark_);
2912  _message.setBatchSize(AMPS::asString(batchSize_));
2913  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
2914  if (options_.length()) _message.setOptions(options_);
2915 
2916  _routes.addRoute(_message.getQueryID(), messageHandler_,
2917  Message::AckType::None, ackTypes, false);
2918 
2919  try
2920  {
2921  syncAckProcessing(timeout_, _message);
2922  }
2923  catch (...)
2924  {
2925  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
2926  throw;
2927  }
2928 
2929  return commandId;
2930  }
2931 
2932  std::string sow(const MessageHandler& messageHandler_,
2933  const std::string& topic_,
2934  long timeout_,
2935  const std::string& filter_ = "",
2936  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2937  int topN_ = AMPS_DEFAULT_TOP_N)
2938  {
2939  std::string notSet;
2940  return sow(messageHandler_,
2941  topic_,
2942  filter_,
2943  notSet, // orderBy
2944  notSet, // bookmark
2945  batchSize_,
2946  topN_,
2947  notSet,
2948  timeout_);
2949  }
2950 
2951  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
2952  const std::string& topic_,
2953  const std::string& filter_ = "",
2954  const std::string& orderBy_ = "",
2955  const std::string& bookmark_ = "",
2956  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2957  int topN_ = AMPS_DEFAULT_TOP_N,
2958  const std::string& options_ = "",
2959  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
2960  bool isHASubscribe_ = true)
2961  {
2962  isHASubscribe_ &= (bool)_subscriptionManager;
2963  Lock<Mutex> l(_lock);
2964  _message.reset();
2965  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
2966  _message.newCommandId();
2967  Field cid = _message.getCommandId();
2968  _message.setQueryID(cid);
2969  _message.setSubscriptionId(cid);
2970  std::string subId = cid;
2971  _message.setTopic(topic_);
2972  if (filter_.length()) _message.setFilter(filter_);
2973  if (orderBy_.length()) _message.setOrderBy(orderBy_);
2974  if (bookmark_.length()) _message.setBookmark(bookmark_);
2975  _message.setBatchSize(AMPS::asString(batchSize_));
2976  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
2977  if (options_.length()) _message.setOptions(options_);
2978 
2979  Message message = _message;
2980  if (isHASubscribe_)
2981  {
2982  message = _message.deepCopy();
2983  Unlock<Mutex> u(_lock);
2984  _subscriptionManager->subscribe(messageHandler_, message,
2985  Message::AckType::None);
2986  if (_badTimeToHASubscribe) return subId;
2987  }
2988  _routes.addRoute(cid, messageHandler_,
2989  Message::AckType::None, Message::AckType::Processed, true);
2990  message.setAckTypeEnum(Message::AckType::Processed);
2991  if (!options_.empty()) message.setOptions(options_);
2992  try
2993  {
2994  syncAckProcessing(timeout_, message, isHASubscribe_);
2995  }
2996  catch (const DisconnectedException&)
2997  {
2998  if (!isHASubscribe_)
2999  {
3000  _routes.removeRoute(subId);
3001  throw;
3002  }
3003  }
3004  catch (const TimedOutException&)
3005  {
3006  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3007  throw;
3008  }
3009  catch (...)
3010  {
3011  if (isHASubscribe_)
3012  {
3013  // Have to unlock before calling into sub manager to avoid deadlock
3014  Unlock<Mutex> unlock(_lock);
3015  _subscriptionManager->unsubscribe(cid);
3016  }
3017  _routes.removeRoute(subId);
3018  throw;
3019  }
3020  return subId;
3021  }
3022 
3023  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3024  const std::string& topic_,
3025  long timeout_,
3026  const std::string& filter_ = "",
3027  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3028  bool oofEnabled_ = false,
3029  int topN_ = AMPS_DEFAULT_TOP_N,
3030  bool isHASubscribe_ = true)
3031  {
3032  std::string notSet;
3033  return sowAndSubscribe(messageHandler_,
3034  topic_,
3035  filter_,
3036  notSet, // orderBy
3037  notSet, // bookmark
3038  batchSize_,
3039  topN_,
3040  (oofEnabled_ ? "oof" : ""),
3041  timeout_,
3042  isHASubscribe_);
3043  }
3044 
3045  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3046  const std::string& topic_,
3047  const std::string& filter_ = "",
3048  const std::string& orderBy_ = "",
3049  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3050  int topN_ = AMPS_DEFAULT_TOP_N,
3051  const std::string& options_ = "",
3052  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3053  bool isHASubscribe_ = true)
3054  {
3055  isHASubscribe_ &= (bool)_subscriptionManager;
3056  Lock<Mutex> l(_lock);
3057  _message.reset();
3058  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3059  _message.newCommandId();
3060  _message.setQueryID(_message.getCommandId());
3061  _message.setSubscriptionId(_message.getCommandId());
3062  std::string subId = _message.getSubscriptionId();
3063  _message.setTopic(topic_);
3064  if (filter_.length()) _message.setFilter(filter_);
3065  if (orderBy_.length()) _message.setOrderBy(orderBy_);
3066  _message.setBatchSize(AMPS::asString(batchSize_));
3067  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
3068  if (options_.length()) _message.setOptions(options_);
3069  Message message = _message;
3070  if (isHASubscribe_)
3071  {
3072  message = _message.deepCopy();
3073  Unlock<Mutex> u(_lock);
3074  _subscriptionManager->subscribe(messageHandler_, message,
3075  Message::AckType::None);
3076  if (_badTimeToHASubscribe) return subId;
3077  }
3078  _routes.addRoute(message.getQueryID(), messageHandler_,
3079  Message::AckType::None, Message::AckType::Processed, true);
3080  message.setAckTypeEnum(Message::AckType::Processed);
3081  if (!options_.empty()) message.setOptions(options_);
3082  try
3083  {
3084  syncAckProcessing(timeout_, message, isHASubscribe_);
3085  }
3086  catch (const DisconnectedException&)
3087  {
3088  if (!isHASubscribe_)
3089  {
3090  _routes.removeRoute(subId);
3091  throw;
3092  }
3093  }
3094  catch (const TimedOutException&)
3095  {
3096  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3097  throw;
3098  }
3099  catch (...)
3100  {
3101  if (isHASubscribe_)
3102  {
3103  // Have to unlock before calling into sub manager to avoid deadlock
3104  Unlock<Mutex> unlock(_lock);
3105  _subscriptionManager->unsubscribe(Field(subId));
3106  }
3107  _routes.removeRoute(subId);
3108  throw;
3109  }
3110  return subId;
3111  }
3112 
3113  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3114  const std::string& topic_,
3115  long timeout_,
3116  const std::string& filter_ = "",
3117  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3118  bool oofEnabled_ = false,
3119  bool sendEmpties_ = false,
3120  int topN_ = AMPS_DEFAULT_TOP_N,
3121  bool isHASubscribe_ = true)
3122  {
3123  std::string notSet;
3124  Message::Options options;
3125  if (oofEnabled_) options.setOOF();
3126  if (sendEmpties_ == false) options.setNoEmpties();
3127  return sowAndDeltaSubscribe(messageHandler_,
3128  topic_,
3129  filter_,
3130  notSet, // orderBy
3131  batchSize_,
3132  topN_,
3133  options,
3134  timeout_,
3135  isHASubscribe_);
3136  }
3137 
3138  std::string sowDelete(const MessageHandler& messageHandler_,
3139  const std::string& topic_,
3140  const std::string& filter_,
3141  long timeout_,
3142  Message::Field commandId_ = Message::Field())
3143  {
3144  if (_publishStore.isValid())
3145  {
3146  unsigned ackType = Message::AckType::Processed |
3147  Message::AckType::Stats |
3148  Message::AckType::Persisted;
3149  if (!publishStoreMessage)
3150  {
3151  publishStoreMessage = new Message();
3152  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3153  }
3154  publishStoreMessage->reset();
3155  if (commandId_.empty())
3156  {
3157  publishStoreMessage->newCommandId();
3158  commandId_ = publishStoreMessage->getCommandId();
3159  }
3160  else
3161  {
3162  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3163  }
3164  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3165  .assignSubscriptionId(commandId_.data(), commandId_.len())
3166  .assignQueryID(commandId_.data(), commandId_.len())
3167  .setAckTypeEnum(ackType)
3168  .assignTopic(topic_.c_str(), topic_.length())
3169  .assignFilter(filter_.c_str(), filter_.length());
3170  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3171  char buf[AMPS_NUMBER_BUFFER_LEN];
3172  size_t pos = convertToCharArray(buf, haSequenceNumber);
3173  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3174  {
3175  try
3176  {
3177  Lock<Mutex> l(_lock);
3178  _routes.addRoute(commandId_, messageHandler_,
3179  Message::AckType::Stats,
3180  Message::AckType::Processed|Message::AckType::Persisted,
3181  false);
3182  syncAckProcessing(timeout_, *publishStoreMessage,
3183  haSequenceNumber);
3184  }
3185  catch (const DisconnectedException&)
3186  { // -V565
3187  // Pass - it will get replayed upon reconnect
3188  }
3189  catch (...)
3190  {
3191  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3192  throw;
3193  }
3194  }
3195  return (std::string)commandId_;
3196  }
3197  else
3198  {
3199  Lock<Mutex> l(_lock);
3200  _message.reset();
3201  if (commandId_.empty())
3202  {
3203  _message.newCommandId();
3204  commandId_ = _message.getCommandId();
3205  }
3206  else
3207  {
3208  _message.setCommandId(commandId_.data(), commandId_.len());
3209  }
3210  _message.setCommandEnum(Message::Command::SOWDelete)
3211  .assignSubscriptionId(commandId_.data(), commandId_.len())
3212  .assignQueryID(commandId_.data(), commandId_.len())
3213  .setAckTypeEnum(Message::AckType::Processed |
3214  Message::AckType::Stats)
3215  .assignTopic(topic_.c_str(), topic_.length())
3216  .assignFilter(filter_.c_str(), filter_.length());
3217  _routes.addRoute(commandId_, messageHandler_,
3218  Message::AckType::Stats,
3219  Message::AckType::Processed,
3220  false);
3221  try
3222  {
3223  syncAckProcessing(timeout_, _message);
3224  }
3225  catch (...)
3226  {
3227  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3228  throw;
3229  }
3230  return (std::string)commandId_;
3231  }
3232  }
3233 
3234  std::string sowDeleteByData(const MessageHandler& messageHandler_,
3235  const std::string& topic_,
3236  const std::string& data_,
3237  long timeout_,
3238  Message::Field commandId_ = Message::Field())
3239  {
3240  if (_publishStore.isValid())
3241  {
3242  unsigned ackType = Message::AckType::Processed |
3243  Message::AckType::Stats |
3244  Message::AckType::Persisted;
3245  if (!publishStoreMessage)
3246  {
3247  publishStoreMessage = new Message();
3248  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3249  }
3250  publishStoreMessage->reset();
3251  if (commandId_.empty())
3252  {
3253  publishStoreMessage->newCommandId();
3254  commandId_ = publishStoreMessage->getCommandId();
3255  }
3256  else
3257  {
3258  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3259  }
3260  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3261  .assignSubscriptionId(commandId_.data(), commandId_.len())
3262  .assignQueryID(commandId_.data(), commandId_.len())
3263  .setAckTypeEnum(ackType)
3264  .assignTopic(topic_.c_str(), topic_.length())
3265  .assignData(data_.c_str(), data_.length());
3266  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3267  char buf[AMPS_NUMBER_BUFFER_LEN];
3268  size_t pos = convertToCharArray(buf, haSequenceNumber);
3269  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3270  {
3271  try
3272  {
3273  Lock<Mutex> l(_lock);
3274  _routes.addRoute(commandId_, messageHandler_,
3275  Message::AckType::Stats,
3276  Message::AckType::Processed|Message::AckType::Persisted,
3277  false);
3278  syncAckProcessing(timeout_, *publishStoreMessage,
3279  haSequenceNumber);
3280  }
3281  catch (const DisconnectedException&)
3282  { // -V565
3283  // Pass - it will get replayed upon reconnect
3284  }
3285  catch (...)
3286  {
3287  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3288  throw;
3289  }
3290  }
3291  return (std::string)commandId_;
3292  }
3293  else
3294  {
3295  Lock<Mutex> l(_lock);
3296  _message.reset();
3297  if (commandId_.empty())
3298  {
3299  _message.newCommandId();
3300  commandId_ = _message.getCommandId();
3301  }
3302  else
3303  {
3304  _message.setCommandId(commandId_.data(), commandId_.len());
3305  }
3306  _message.setCommandEnum(Message::Command::SOWDelete)
3307  .assignSubscriptionId(commandId_.data(), commandId_.len())
3308  .assignQueryID(commandId_.data(), commandId_.len())
3309  .setAckTypeEnum(Message::AckType::Processed |
3310  Message::AckType::Stats)
3311  .assignTopic(topic_.c_str(), topic_.length())
3312  .assignData(data_.c_str(), data_.length());
3313  _routes.addRoute(commandId_, messageHandler_,
3314  Message::AckType::Stats,
3315  Message::AckType::Processed,
3316  false);
3317  try
3318  {
3319  syncAckProcessing(timeout_, _message);
3320  }
3321  catch (...)
3322  {
3323  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3324  throw;
3325  }
3326  return (std::string)commandId_;
3327  }
3328  }
3329 
3330  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
3331  const std::string& topic_,
3332  const std::string& keys_,
3333  long timeout_,
3334  Message::Field commandId_ = Message::Field())
3335  {
3336  if (_publishStore.isValid())
3337  {
3338  unsigned ackType = Message::AckType::Processed |
3339  Message::AckType::Stats |
3340  Message::AckType::Persisted;
3341  if (!publishStoreMessage)
3342  {
3343  publishStoreMessage = new Message();
3344  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3345  }
3346  publishStoreMessage->reset();
3347  if (commandId_.empty())
3348  {
3349  publishStoreMessage->newCommandId();
3350  commandId_ = publishStoreMessage->getCommandId();
3351  }
3352  else
3353  {
3354  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3355  }
3356  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3357  .assignSubscriptionId(commandId_.data(), commandId_.len())
3358  .assignQueryID(commandId_.data(), commandId_.len())
3359  .setAckTypeEnum(ackType)
3360  .assignTopic(topic_.c_str(), topic_.length())
3361  .assignSowKeys(keys_.c_str(), keys_.length());
3362  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3363  char buf[AMPS_NUMBER_BUFFER_LEN];
3364  size_t pos = convertToCharArray(buf, haSequenceNumber);
3365  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3366  {
3367  try
3368  {
3369  Lock<Mutex> l(_lock);
3370  _routes.addRoute(commandId_, messageHandler_,
3371  Message::AckType::Stats,
3372  Message::AckType::Processed|Message::AckType::Persisted,
3373  false);
3374  syncAckProcessing(timeout_, *publishStoreMessage,
3375  haSequenceNumber);
3376  }
3377  catch (const DisconnectedException&)
3378  { // -V565
3379  // Pass - it will get replayed upon reconnect
3380  }
3381  catch (...)
3382  {
3383  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3384  throw;
3385  }
3386  }
3387  return (std::string)commandId_;
3388  }
3389  else
3390  {
3391  Lock<Mutex> l(_lock);
3392  _message.reset();
3393  if (commandId_.empty())
3394  {
3395  _message.newCommandId();
3396  commandId_ = _message.getCommandId();
3397  }
3398  else
3399  {
3400  _message.setCommandId(commandId_.data(), commandId_.len());
3401  }
3402  _message.setCommandEnum(Message::Command::SOWDelete)
3403  .assignSubscriptionId(commandId_.data(), commandId_.len())
3404  .assignQueryID(commandId_.data(), commandId_.len())
3405  .setAckTypeEnum(Message::AckType::Processed |
3406  Message::AckType::Stats)
3407  .assignTopic(topic_.c_str(), topic_.length())
3408  .assignSowKeys(keys_.c_str(), keys_.length());
3409  _routes.addRoute(commandId_, messageHandler_,
3410  Message::AckType::Stats,
3411  Message::AckType::Processed,
3412  false);
3413  try
3414  {
3415  syncAckProcessing(timeout_, _message);
3416  }
3417  catch (...)
3418  {
3419  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3420  throw;
3421  }
3422  return (std::string)commandId_;
3423  }
3424  }
3425 
3426  void startTimer(void)
3427  {
3428  if (_serverVersion >= "5.3.2.0")
3429  {
3430  throw CommandException("The start_timer command is deprecated.");
3431  }
3432  Lock<Mutex> l(_lock);
3433  _message.reset();
3434  _message.setCommandEnum(Message::Command::StartTimer);
3435 
3436  _send(_message);
3437  }
3438 
3439  std::string stopTimer(MessageHandler messageHandler_)
3440  {
3441  if (_serverVersion >= "5.3.2.0")
3442  {
3443  throw CommandException("The stop_timer command is deprecated.");
3444  }
3445  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
3446  }
3447 
3448  amps_handle getHandle(void)
3449  {
3450  return _client;
3451  }
3452 
3460  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
3461  {
3462  _pExceptionListener = pListener_;
3463  _exceptionListener = _pExceptionListener.get();
3464  }
3465 
3466  void setExceptionListener(const ExceptionListener& listener_)
3467  {
3468  _exceptionListener = &listener_;
3469  }
3470 
3471  const ExceptionListener& getExceptionListener(void) const
3472  {
3473  return *_exceptionListener;
3474  }
3475 
3476  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
3477  {
3478  if (readTimeout_ && readTimeout_ < heartbeatInterval_)
3479  {
3480  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
3481  }
3482  Lock<Mutex> l(_lock);
3483  if(_heartbeatInterval != heartbeatInterval_ ||
3484  _readTimeout != readTimeout_)
3485  {
3486  _heartbeatInterval = heartbeatInterval_;
3487  _readTimeout = readTimeout_;
3488  _sendHeartbeat();
3489  }
3490  }
3491 
3492  void _sendHeartbeat(void)
3493  {
3494  if (_connected && _heartbeatInterval != 0)
3495  {
3496  std::ostringstream options;
3497  options << "start," << _heartbeatInterval;
3498  Message startMessage = Message()
3499  .setCommandEnum(Message::Command::Heartbeat)
3500  .setOptions(options.str());
3501 
3502  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3503  _heartbeatTimer.start();
3504  try
3505  {
3506  _sendWithoutRetry(startMessage);
3507  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
3508  }
3509  catch(ConnectionException &ex_)
3510  {
3511  // If we are disconnected when we attempt to send, that's OK;
3512  // we'll send this message after we re-connect (if we do).
3513  AMPS_UNHANDLED_EXCEPTION(ex_);
3514  }
3515  }
3516  amps_result result = AMPS_E_OK;
3517  if(_readTimeout && _connected)
3518  {
3519  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
3520  }
3521  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
3522  {
3523  AMPSException::throwFor(_client, result);
3524  }
3525  }
3526 
3527  void addConnectionStateListener(ConnectionStateListener *listener_)
3528  {
3529  Lock<Mutex> lock(_lock);
3530  _connectionStateListeners.insert(listener_);
3531  }
3532 
3533  void removeConnectionStateListener(ConnectionStateListener *listener_)
3534  {
3535  Lock<Mutex> lock(_lock);
3536  _connectionStateListeners.erase(listener_);
3537  }
3538 
3539  void clearConnectionStateListeners()
3540  {
3541  Lock<Mutex> lock(_lock);
3542  _connectionStateListeners.clear();
3543  }
3544 
// Registers handler_ in _routes under the ids carried by command_'s
// message: the subscription id, the query id and/or the command id.
//
// command_         Command whose message supplies the command type and ids.
// cid_             Command id (in/out); may be replaced by a newly generated
//                  id when the current one is already routed, or for
//                  publish/delta publish commands.
// handler_         Handler to register.
// requestedAcks_   Forwarded to addRoute; also decides whether a command id
//                  route is needed.
// systemAddedAcks_ Forwarded to addRoute.
// isSubscribe_     Forwarded to addRoute for the subscription id route, and
//                  for the query id route when that is the first route added.
//
// Throws UsageException when the command carries no id at all and is not a
// publish or delta publish.
3545  void _registerHandler(Command& command_, Message::Field& cid_,
3546  MessageHandler& handler_, unsigned requestedAcks_,
3547  unsigned systemAddedAcks_, bool isSubscribe_)
3548  {
3549  Message message = command_.getMessage();
3550  Message::Command::Type commandType = message.getCommandEnum();
3551  Message::Field subid = message.getSubscriptionId();
3552  Message::Field qid = message.getQueryID();
3553  // If we have an id, we're good, even if it's an existing route
3554  bool added = qid.len() || subid.len() || cid_.len();
// Counts routes added so far; once non-zero, later routes use a copy of the
// handler's user data where one can be made.
3555  int addedCount = 0;
3556  if (subid.len() > 0)
3557  {
3558  // This can replace a non-subscribe with a matching id
3559  // with a subscription but not another subscription.
3560  addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
3561  systemAddedAcks_, isSubscribe_);
3562  }
// Query id route: only needed when the query id differs from the sub id.
3563  if (qid.len() > 0 && qid != subid)
3564  {
// Generate new query ids until one is not already routed, keeping cid_ in
// step when it carried the same value as the query id.
3565  while (_routes.hasRoute(qid))
3566  {
3567  message.newQueryId();
3568  if (cid_ == qid)
3569  cid_ = message.getQueryId();
3570  qid = message.getQueryId();
3571  }
3572  if (addedCount == 0)
3573  {
3574  _routes.addRoute(qid, handler_, requestedAcks_,
3575  systemAddedAcks_, isSubscribe_);
3576  }
3577  else
3578  {
3579  void* data = NULL;
3580  {
// _lock is released only for the duration of the user-data copy call.
// NOTE(review): implies callers hold _lock on entry -- confirm.
3581  Unlock<Mutex> u(_lock);
3582  data = amps_invoke_copy_route_function(handler_.userData());
3583  }
// No copy was produced: register the original handler. Otherwise register a
// handler built from the same function and the copied user data.
3584  if (!data)
3585  {
3586  _routes.addRoute(qid, handler_, requestedAcks_,
3587  systemAddedAcks_, false);
3588  }
3589  else
3590  {
3591  _routes.addRoute(qid,
3592  MessageHandler(handler_.function(),
3593  data),
3594  requestedAcks_,
3595  systemAddedAcks_, false);
3596  }
3597  }
3598  ++addedCount;
3599  }
// Command id route: only when the command id differs from both other ids and
// some ack other than persisted was requested.
3600  if (cid_.len() > 0 && cid_ != qid && cid_ != subid
3601  && requestedAcks_ & ~Message::AckType::Persisted)
3602  {
// Generate new command ids until one is not already routed.
3603  while (_routes.hasRoute(cid_))
3604  {
3605  cid_ = message.newCommandId().getCommandId();
3606  }
3607  if (addedCount == 0)
3608  {
3609  _routes.addRoute(cid_, handler_, requestedAcks_,
3610  systemAddedAcks_, false);
3611  }
3612  else
3613  {
3614  void* data = NULL;
3615  {
// Same pattern as for the query id: unlock only around the user-data copy.
3616  Unlock<Mutex> u(_lock);
3617  data = amps_invoke_copy_route_function(handler_.userData());
3618  }
3619  if (!data)
3620  {
3621  _routes.addRoute(cid_, handler_, requestedAcks_,
3622  systemAddedAcks_, false);
3623  }
3624  else
3625  {
3626  _routes.addRoute(cid_,
3627  MessageHandler(handler_.function(),
3628  data),
3629  requestedAcks_,
3630  systemAddedAcks_, false);
3631  }
3632  }
3633  }
// Otherwise, for publish and delta publish commands, always generate a new
// command id and route the handler under it.
3634  else if (commandType == Message::Command::Publish ||
3635  commandType == Message::Command::DeltaPublish)
3636  {
3637  cid_ = command_.getMessage().newCommandId().getCommandId();
3638  _routes.addRoute(cid_, handler_, requestedAcks_,
3639  systemAddedAcks_, false);
3640  added=true;
3641  }
// Reached with added == false only when the command had no id of any kind
// and was not a publish.
3642  if (!added)
3643  {
3644  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
3645  }
3646  }
3647 
// Sends command_ without acquiring _lock itself. When handler_ is valid it is
// registered in _routes before the send, and the routes are removed again if
// the send fails. The acks the client adds on top of the requested ones are
// computed here; bookmark subscriptions are recorded in the bookmark store,
// and commands that need a sequence number are stored in the publish store.
// Returns the subscription id for subscribe commands that have one,
// otherwise the command id.
// NOTE(review): the Unlock<Mutex> scopes imply the caller holds _lock, as
// executeAsync() in this class does.
3648  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
3649  bool isHASubscribe_ = true)
3650  {
// HA subscription handling is only possible when a subscription manager is set.
3651  isHASubscribe_ &= (bool)_subscriptionManager;
3652  Message& message = command_.getMessage();
3653  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
3654  Message::AckType::Processed : Message::AckType::None;
3655  unsigned requestedAcks = message.getAckTypeEnum();
3656  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
3657  Message::Command::Type commandType = message.getCommandEnum();
// SOW-style commands and StopTimer additionally get a Completed ack.
3658  if (commandType == Message::Command::SOW
3659  || commandType == Message::Command::SOWAndSubscribe
3660  || commandType == Message::Command::SOWAndDeltaSubscribe
3661  || commandType == Message::Command::StopTimer)
3662  systemAddedAcks |= Message::AckType::Completed;
3663  Message::Field cid = message.getCommandId();
// A handler needs a command id to be routed; generate one if none was given.
3664  if (handler_.isValid() && cid.empty())
3665  {
3666  cid = message.newCommandId().getCommandId();
3667  }
3668  if (message.getBookmark().len() > 0)
3669  {
3670  if (command_.isSubscribe())
3671  {
3672  Message::Field bookmark = message.getBookmark();
3673  if (_bookmarkStore.isValid())
3674  {
3675  systemAddedAcks |= Message::AckType::Persisted;
3676  if (bookmark == AMPS_BOOKMARK_RECENT)
3677  {
3678  message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
3679  }
3680  else if (bookmark != AMPS_BOOKMARK_NOW &&
3681  bookmark != AMPS_BOOKMARK_EPOCH)
3682  {
3683  _bookmarkStore.log(message);
3684  if (!BookmarkRange::isRange(bookmark))
3685  {
3686  _bookmarkStore.discard(message);
3687  _bookmarkStore.persisted(message.getSubscriptionId(),
3688  bookmark);
3689  }
3690  }
3691  }
3692  else if (bookmark == AMPS_BOOKMARK_RECENT)
3693  {
// NOTE(review): listing line 3694 is absent from this view, so the body of
// this branch is not visible here.
3695  }
3696  }
3697  }
3698  if (isPublishStore)
3699  {
3700  systemAddedAcks |= Message::AckType::Persisted;
3701  }
3702  bool isSubscribe = command_.isSubscribe();
// Non-subscribe commands register their handler now; subscribes register it
// further down, after the subscription manager has been informed.
3703  if (handler_.isValid() && !isSubscribe)
3704  {
3705  _registerHandler(command_, cid, handler_,
3706  requestedAcks, systemAddedAcks, isSubscribe);
3707  }
3708  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
3709  if (isPublishStore)
3710  {
3711  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
3712  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3713  {
// _lock is released while the message is written to the publish store.
3714  Unlock<Mutex> u(_lock);
3715  haSequenceNumber = _publishStore.store(message);
3716  }
3717  message.setSequence(haSequenceNumber);
3718  try
3719  {
3720  if (useSyncSend)
3721  {
3722  syncAckProcessing((long)command_.getTimeout(), message,
3723  haSequenceNumber);
3724  }
3725  else _send(message, haSequenceNumber);
3726  }
3727  catch (const DisconnectedException&)
3728  { // -V565
3729  // Pass - message will get replayed when reconnected
3730  }
3731  catch (...)
3732  {
3733  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3734  throw;
3735  }
3736  }
3737  else
3738  {
3739  if(isSubscribe)
3740  {
3741  const Message::Field& subId = message.getSubscriptionId();
3742  if (isHASubscribe_)
3743  {
3744  Unlock<Mutex> u(_lock);
3745  _subscriptionManager->subscribe(handler_,
3746  message.deepCopy(),
3747  requestedAcks);
// When _badTimeToHASubscribe is set the command is not sent here; only the
// subscription manager has recorded it.
3748  if (_badTimeToHASubscribe)
3749  {
3750  message.setAckTypeEnum(requestedAcks);
3751  return std::string(subId.data(), subId.len());
3752  }
3753  }
3754  if (handler_.isValid())
3755  {
3756  _registerHandler(command_, cid, handler_,
3757  requestedAcks, systemAddedAcks, isSubscribe);
3758  }
3759  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3760  try
3761  {
3762  if (useSyncSend)
3763  {
3764  syncAckProcessing((long)command_.getTimeout(), message,
3765  isHASubscribe_);
3766  }
3767  else _send(message);
3768  }
// A disconnect is swallowed for HA subscriptions; otherwise the routes are
// removed and the exception propagates.
3769  catch (const DisconnectedException&)
3770  {
3771  if (!isHASubscribe_)
3772  {
3773  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3774  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
3775  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3776  message.setAckTypeEnum(requestedAcks);
3777  throw;
3778  }
3779  }
3780  catch (const TimedOutException&)
3781  {
3782  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
3783  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3784  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
3785  throw;
3786  }
3787  catch (...)
3788  {
3789  if (isHASubscribe_)
3790  {
3791  // Have to unlock before calling into sub manager to avoid deadlock
3792  Unlock<Mutex> unlock(_lock);
3793  _subscriptionManager->unsubscribe(subId);
3794  }
3795  if (message.getQueryID().len() > 0)
3796  _routes.removeRoute(message.getQueryID());
3797  _routes.removeRoute(cid);
3798  _routes.removeRoute(subId);
3799  throw;
3800  }
// Subscribes with a subscription id return it; the ack type is first
// restored to what the caller requested.
3801  if (subId.len() > 0)
3802  {
3803  message.setAckTypeEnum(requestedAcks);
3804  return std::string(subId.data(), subId.len());
3805  }
3806  }
3807  else
3808  {
3809  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3810  try
3811  {
3812  if (useSyncSend)
3813  {
3814  syncAckProcessing((long)(command_.getTimeout()), message);
3815  }
3816  else _send(message);
3817  }
3818  catch (const DisconnectedException&)
3819  {
3820  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3821  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3822  message.setAckTypeEnum(requestedAcks);
3823  throw;
3824  }
3825  catch (...)
3826  {
3827  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3828  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3829  message.setAckTypeEnum(requestedAcks);
3830  throw;
3831  }
3832  }
3833  }
3834  message.setAckTypeEnum(requestedAcks);
3835  return cid;
3836  }
3837 
3838  MessageStream getEmptyMessageStream(void);
3839 
// Locking entry point: constructs a Lock<Mutex> on _lock and then delegates
// to executeAsyncNoLock() with the same arguments, returning its result.
3840  std::string executeAsync(Command& command_, MessageHandler& handler_,
3841  bool isHASubscribe_ = true)
3842  {
3843  Lock<Mutex> lock(_lock);
3844  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
3845  }
3846 
3847  // Queue Methods //
// Stores the auto-ack flag; ack() below returns without sending while it is set.
3848  void setAutoAck(bool isAutoAckEnabled_)
3849  {
3850  _isAutoAckEnabled = isAutoAckEnabled_;
3851  }
// Returns the stored auto-ack flag.
3852  bool getAutoAck(void) const
3853  {
3854  return _isAutoAckEnabled;
3855  }
// Stores the number of acks to batch. If no ack timeout is configured yet
// (_queueAckTimeout is zero), AMPS_DEFAULT_QUEUE_ACK_TIMEOUT is adopted and
// passed to amps_client_set_idle_time for this client.
3856  void setAckBatchSize(const unsigned batchSize_)
3857  {
3858  _ackBatchSize = batchSize_;
3859  if (!_queueAckTimeout)
3860  {
3861  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
3862  amps_client_set_idle_time(_client, _queueAckTimeout);
3863  }
3864  }
// Returns the stored ack batch size.
3865  unsigned getAckBatchSize(void) const
3866  {
3867  return _ackBatchSize;
3868  }
// Returns the stored queue ack timeout.
3869  int getAckTimeout(void) const
3870  {
3871  return _queueAckTimeout;
3872  }
// Passes ackTimeout_ to amps_client_set_idle_time for this client and then
// stores it as the queue ack timeout.
3873  void setAckTimeout(const int ackTimeout_)
3874  {
3875  amps_client_set_idle_time(_client,ackTimeout_);
3876  _queueAckTimeout = ackTimeout_;
3877  }
3878  size_t _ack(QueueBookmarks& queueBookmarks_)
3879  {
3880  if(queueBookmarks_._bookmarkCount)
3881  {
3882  if (!publishStoreMessage)
3883  {
3884  publishStoreMessage = new Message();
3885  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3886  }
3887  publishStoreMessage->reset();
3888  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3889  .setTopic(queueBookmarks_._topic)
3890  .setBookmark(queueBookmarks_._data)
3891  .setCommandId("AMPS-queue-ack");
3892  amps_uint64_t haSequenceNumber = 0;
3893  if (_publishStore.isValid())
3894  {
3895  haSequenceNumber = _publishStore.store(*publishStoreMessage);
3896  publishStoreMessage->setAckType("persisted")
3897  .setSequence(haSequenceNumber);
3898  queueBookmarks_._data.erase();
3899  queueBookmarks_._bookmarkCount = 0;
3900  }
3901  _send(*publishStoreMessage, haSequenceNumber);
3902  if (!_publishStore.isValid())
3903  {
3904  queueBookmarks_._data.erase();
3905  queueBookmarks_._bookmarkCount = 0;
3906  }
3907  return 1;
3908  }
3909  return 0;
3910  }
// Acknowledges bookmark_ on topic_ via _ack(); does nothing while auto-ack
// is enabled.
3911  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
3912  {
3913  if (_isAutoAckEnabled) return;
3914  _ack(topic_, bookmark_, options_);
3915  }
3916  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
3917  {
3918  if (bookmark_.len() == 0) return;
3919  Lock<Mutex> lock(_lock);
3920  if(_ackBatchSize < 2 || options_ != NULL)
3921  {
3922  if (!publishStoreMessage)
3923  {
3924  publishStoreMessage = new Message();
3925  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3926  }
3927  publishStoreMessage->reset();
3928  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3929  .setCommandId("AMPS-queue-ack")
3930  .setTopic(topic_).setBookmark(bookmark_);
3931  if (options_) publishStoreMessage->setOptions(options_);
3932  amps_uint64_t haSequenceNumber = 0;
3933  if (_publishStore.isValid())
3934  {
3935  haSequenceNumber = _publishStore.store(*publishStoreMessage);
3936  publishStoreMessage->setAckType("persisted")
3937  .setSequence(haSequenceNumber);
3938  }
3939  _send(*publishStoreMessage, haSequenceNumber);
3940  return;
3941  }
3942  // have we acked anything for this hash
3943  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(),topic_.len());
3944  TopicHashMap::iterator it = _topicHashMap.find(hash);
3945  if(it == _topicHashMap.end())
3946  {
3947  // add a new one to the map
3948  it = _topicHashMap.insert(TopicHashMap::value_type(hash,QueueBookmarks(topic_))).first;
3949  }
3950  QueueBookmarks &queueBookmarks = it->second;
3951  if(queueBookmarks._data.length())
3952  {
3953  queueBookmarks._data.append(",");
3954  }
3955  else
3956  {
3957  queueBookmarks._oldestTime = amps_now();
3958  }
3959  queueBookmarks._data.append(bookmark_);
3960  if(++queueBookmarks._bookmarkCount >= _ackBatchSize) _ack(queueBookmarks);
3961  }
3962  void flushAcks(void)
3963  {
3964  size_t sendCount = 0;
3965  if(!_connected)
3966  {
3967  return;
3968  }
3969  else
3970  {
3971  Lock<Mutex> lock(_lock);
3972  typedef TopicHashMap::iterator iterator;
3973  for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3974  {
3975  QueueBookmarks& queueBookmarks = it->second;
3976  sendCount += _ack(queueBookmarks);
3977  }
3978  }
3979  if(sendCount && _connected) publishFlush(0, Message::AckType::Processed);
3980  }
3981  // called when there's idle time, to see if we need to flush out any "acks"
3982  void checkQueueAcks(void)
3983  {
3984  if(!_topicHashMap.size()) return;
3985  Lock<Mutex> lock(_lock);
3986  try
3987  {
3988  amps_uint64_t threshold = amps_now() - (amps_uint64_t)_queueAckTimeout;
3989  typedef TopicHashMap::iterator iterator;
3990  for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3991  {
3992  QueueBookmarks& queueBookmarks = it->second;
3993  if(queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold) _ack(queueBookmarks);
3994  }
3995  }
3996  catch(std::exception& ex)
3997  {
3998  AMPS_UNHANDLED_EXCEPTION(ex);
3999  }
4000  }
4001 
// Appends a (func_, userData_) request to _deferredExecutionList with a
// Lock<Mutex> on _deferredExecutionLock in scope; the request is run later
// by processDeferredExecutions().
4002  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
4003  {
4004  Lock<Mutex> lock(_deferredExecutionLock);
4005  _deferredExecutionList.push_back(
4006  DeferredExecutionRequest(func_,userData_));
4007  }
4008 
4009  inline void processDeferredExecutions(void)
4010  {
4011  if(_deferredExecutionList.size())
4012  {
4013  Lock<Mutex> lock(_deferredExecutionLock);
4014  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4015  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4016  for(; it != end; ++it)
4017  {
4018  try
4019  {
4020  it->_func(it->_userData);
4021  }
4022  catch (...)
4023  { // -V565
4024  // Intentionally ignore errors
4025  }
4026  }
4027  _deferredExecutionList.clear();
4028  _routes.invalidateCache();
4029  _routeCache.invalidateCache();
4030  }
4031  }
4032 
// Returns the stored retry-on-disconnect flag.
4033  bool getRetryOnDisconnect(void) const
4034  {
4035  return _isRetryOnDisconnect;
4036  }
4037 
// Stores the retry-on-disconnect flag.
4038  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4039  {
4040  _isRetryOnDisconnect = isRetryOnDisconnect_;
4041  }
4042 
// Stores the default maximum depth; Client applies a non-zero value to the
// MessageStream objects it creates.
4043  void setDefaultMaxDepth(unsigned maxDepth_)
4044  {
4045  _defaultMaxDepth = maxDepth_;
4046  }
4047 
// Returns the stored default maximum depth.
4048  unsigned getDefaultMaxDepth(void) const
4049  {
4050  return _defaultMaxDepth;
4051  }
4052 
// Passes filter_ and userData_ to amps_client_set_transport_filter_function
// for the underlying C client handle.
4053  void setTransportFilterFunction(amps_transport_filter_function filter_,
4054  void* userData_)
4055  {
4056  amps_client_set_transport_filter_function(_client, filter_, userData_);
4057  }
4058 
// Passes callback_ and userData_ to amps_client_set_thread_created_callback
// for the underlying C client handle.
4059  void setThreadCreatedCallback(amps_thread_created_callback callback_,
4060  void* userData_)
4061  {
4062  amps_client_set_thread_created_callback(_client, callback_, userData_);
4063  }
4064 }; // class ClientImpl
4139 
4141 {
4142  RefHandle<MessageStreamImpl> _body;
4143  public:
4148  class iterator
4149  {
4150  MessageStream* _pStream;
4151  Message _current;
4152  inline void advance(void);
4153 
4154  public:
4155  iterator() // end
4156  :_pStream(NULL)
4157  {;}
4158  iterator(MessageStream* pStream_)
4159  :_pStream(pStream_)
4160  {
4161  advance();
4162  }
4163 
4164  bool operator==(const iterator& rhs)
4165  {
4166  return _pStream == rhs._pStream;
4167  }
4168  bool operator!=(const iterator& rhs)
4169  {
4170  return _pStream != rhs._pStream;
4171  }
4172  void operator++(void) { advance(); }
4173  Message operator*(void) { return _current; }
4174  Message* operator->(void) { return &_current; }
4175  };
/// Returns whether _body is valid; begin() throws UsageException when it is not.
4177  bool isValid() const { return _body.isValid(); }
4178 
4182  {
4183  if(!_body.isValid())
4184  {
4185  throw UsageException("This MessageStream is not valid and cannot be iterated.");
4186  }
4187  return iterator(this);
4188  }
/// Returns a default-constructed iterator, which is the end marker.
4191  // For non-SOW queries, the end is never reached.
4192  iterator end(void) { return iterator(); }
4193  inline MessageStream(void);
4194 
4200  MessageStream timeout(unsigned timeout_);
4201 
4205  MessageStream conflate(void);
4211  MessageStream maxDepth(unsigned maxDepth_);
4214  unsigned getMaxDepth(void) const;
4217  unsigned getDepth(void) const;
4218 
4219  private:
4220  inline MessageStream(const Client& client_);
4221  inline void setSOWOnly(const std::string& commandId_,
4222  const std::string& queryId_ = "");
4223  inline void setSubscription(const std::string& subId_,
4224  const std::string& commandId_ = "",
4225  const std::string& queryId_ = "");
4226  inline void setStatsOnly(const std::string& commandId_,
4227  const std::string& queryId_ = "");
4228  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
4229 
4230  inline operator MessageHandler(void);
4231 
4232  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
4233 
4234  friend class Client;
4235 
4236 };
4237 
4257 class Client // -V553
4258 {
4259 protected:
4260  BorrowRefHandle<ClientImpl> _body;
4261 public:
4262  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
4263  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
4264  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
4265 
/// Creates a client backed by a newly allocated ClientImpl named clientName.
4274  Client(const std::string& clientName = "")
4275  : _body(new ClientImpl(clientName), true)
4276  {;}
4277 
/// Wraps existingClient, passing true as the handle's second argument
/// (the argument the next constructor calls isRef).
4278  Client(ClientImpl* existingClient)
4279  : _body(existingClient, true)
4280  {;}
4281 
/// Wraps existingClient, forwarding isRef to the handle.
/// NOTE(review): isRef presumably selects whether the handle takes a counted
/// reference — confirm against BorrowRefHandle.
4282  Client(ClientImpl* existingClient, bool isRef)
4283  : _body(existingClient, isRef)
4284  {;}
4285 
/// Copies the handle, so both Client objects refer to the same ClientImpl.
4286  Client(const Client& rhs) : _body(rhs._body) {;}
/// Virtual so that Client can be used as a base class; the body is empty.
4287  virtual ~Client(void) {;}
4288 
/// Assigns rhs's handle to this client and returns *this.
4289  Client& operator=(const Client& rhs)
4290  {
4291  _body = rhs._body;
4292  return *this;
4293  }
4294 
/// Returns whether the handle held in _body is valid.
4295  bool isValid()
4296  {
4297  return _body.isValid();
4298  }
4299 
/// Forwards name to setName on the implementation held in _body.
4312  void setName(const std::string& name)
4313  {
4314  _body.get().setName(name);
4315  }
4316 
/// Returns the reference produced by getName on the implementation.
4319  const std::string& getName() const
4320  {
4321  return _body.get().getName();
4322  }
4323 
/// Returns the reference produced by getNameHash on the implementation.
4327  const std::string& getNameHash() const
4328  {
4329  return _body.get().getNameHash();
4330  }
4331 
/// Forwards logonCorrelationData_ to setLogonCorrelationData on the
/// implementation.
4338  void setLogonCorrelationData(const std::string& logonCorrelationData_)
4339  {
4340  _body.get().setLogonCorrelationData(logonCorrelationData_);
4341  }
4342 
/// Returns the reference produced by getLogonCorrelationData on the
/// implementation.
4345  const std::string& getLogonCorrelationData() const
4346  {
4347  return _body.get().getLogonCorrelationData();
4348  }
4349 
/// Returns the numeric server version reported by the implementation.
4358  size_t getServerVersion() const
4359  {
4360  return _body.get().getServerVersion();
4361  }
4362 
/// Returns the VersionInfo reported by the implementation.
4369  VersionInfo getServerVersionInfo() const
4370  {
4371  return _body.get().getServerVersionInfo();
4372  }
4373 
/// Converts a version string to a number by passing its characters and
/// length to AMPS::convertVersionToNumber.
4383  static size_t convertVersionToNumber(const std::string& version_)
4384  {
4385  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4386  }
4387 
/// Converts the len_ characters at data_ to a number via
/// AMPS::convertVersionToNumber.
4398  static size_t convertVersionToNumber(const char* data_, size_t len_)
4399  {
4400  return AMPS::convertVersionToNumber(data_, len_);
4401  }
4402 
/// Returns the reference produced by getURI on the implementation.
4405  const std::string& getURI() const
4406  {
4407  return _body.get().getURI();
4408  }
4409 
4416 
4418 
/// Forwards uri to connect on the implementation.
4429  void connect(const std::string& uri)
4430  {
4431  _body.get().connect(uri);
4432  }
4433 
/// Calls disconnect on the implementation.
4436  void disconnect()
4437  {
4438  _body.get().disconnect();
4439  }
4440 
/// Forwards message to the single-argument send on the implementation.
4454  void send(const Message& message)
4455  {
4456  _body.get().send(message);
4457  }
4458 
/// Forwards all four arguments unchanged to addMessageHandler on the
/// implementation.
4467  void addMessageHandler(const Field& commandId_,
4468  const AMPS::MessageHandler& messageHandler_,
4469  unsigned requestedAcks_, bool isSubscribe_)
4470  {
4471  _body.get().addMessageHandler(commandId_, messageHandler_,
4472  requestedAcks_, isSubscribe_);
4473  }
4474 
/// Returns the result of removeMessageHandler(commandId_) on the
/// implementation.
4478  bool removeMessageHandler(const Field& commandId_)
4479  {
4480  return _body.get().removeMessageHandler(commandId_);
4481  }
4482 
/// Forwards the handler, message and timeout to the three-argument send on
/// the implementation and returns the string it produces.
4506  std::string send(const MessageHandler& messageHandler, Message& message, int timeout = 0)
4507  {
4508  return _body.get().send(messageHandler, message, timeout);
4509  }
4510 
/// Forwards disconnectHandler to setDisconnectHandler on the implementation.
4520  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
4521  {
4522  _body.get().setDisconnectHandler(disconnectHandler);
4523  }
4524 
/// Returns the DisconnectHandler reported by the implementation.
4528  DisconnectHandler getDisconnectHandler(void) const
4529  {
4530  return _body.get().getDisconnectHandler();
4531  }
4532 
/// Returns the ConnectionInfo reported by the implementation. Declared
/// virtual, so derived clients may override it.
4537  virtual ConnectionInfo getConnectionInfo() const
4538  {
4539  return _body.get().getConnectionInfo();
4540  }
4541 
/// Forwards bookmarkStore_ to setBookmarkStore on the implementation.
4550  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
4551  {
4552  _body.get().setBookmarkStore(bookmarkStore_);
4553  }
4554 
4559  {
4560  return _body.get().getBookmarkStore();
4561  }
4562 
4567  {
4568  return _body.get().getSubscriptionManager();
4569  }
4570 
/// Forwards the subscriptionManager_ pointer to setSubscriptionManager on
/// the implementation.
4578  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
4579  {
4580  _body.get().setSubscriptionManager(subscriptionManager_);
4581  }
4582 
/// Forwards publishStore_ to setPublishStore on the implementation.
4602  void setPublishStore(const Store& publishStore_)
4603  {
4604  _body.get().setPublishStore(publishStore_);
4605  }
4606 
4611  {
4612  return _body.get().getPublishStore();
4613  }
4614 
/// Installs duplicateMessageHandler_ as the implementation's global handler
/// for the DuplicateMessage command type.
4618  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
4619  {
4620  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
4621  duplicateMessageHandler_);
4622  }
4623 
4634  {
4635  return _body.get().getDuplicateMessageHandler();
4636  }
4637 
4648  {
4649  _body.get().setFailedWriteHandler(handler_);
4650  }
4651 
4656  {
4657  return _body.get().getFailedWriteHandler();
4658  }
4659 
4660 
/// Publishes data_ to topic_ by passing the strings' characters and lengths
/// to publish on the implementation; returns the value it produces.
4678  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
4679  {
4680  return _body.get().publish(topic_.c_str(), topic_.length(),
4681  data_.c_str(), data_.length());
4682  }
4683 
/// Pointer-and-length form: forwards all four arguments unchanged.
4703  amps_uint64_t publish(const char* topic_, size_t topicLength_,
4704  const char* data_, size_t dataLength_)
4705  {
4706  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
4707  }
4708 
/// As the string form above, additionally forwarding expiration_.
4727  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
4728  unsigned long expiration_)
4729  {
4730  return _body.get().publish(topic_.c_str(), topic_.length(),
4731  data_.c_str(), data_.length(), expiration_);
4732  }
4733 
/// As the pointer-and-length form above, additionally forwarding expiration_.
4754  amps_uint64_t publish(const char* topic_, size_t topicLength_,
4755  const char* data_, size_t dataLength_,
4756  unsigned long expiration_)
4757  {
4758  return _body.get().publish(topic_, topicLength_,
4759  data_, dataLength_, expiration_);
4760  }
4761 
/// Forwards timeout_ and ackType_ to publishFlush on the implementation;
/// ackType_ defaults to Message::AckType::Processed.
4800  void publishFlush(long timeout_ = 0, unsigned ackType_ = Message::AckType::Processed)
4801  {
4802  _body.get().publishFlush(timeout_, ackType_);
4803  }
4804 
4805 
/// Delta-publishes data_ to topic_ by passing the strings' characters and
/// lengths to deltaPublish on the implementation; returns its result.
4821  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
4822  {
4823  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4824  data_.c_str(), data_.length());
4825  }
4826 
/// Pointer-and-length form: forwards all four arguments unchanged.
4844  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
4845  const char* data_, size_t dataLength_)
4846  {
4847  return _body.get().deltaPublish(topic_, topicLength_,
4848  data_, dataLength_);
4849  }
4850 
/// As the string form above, additionally forwarding expiration_.
4867  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
4868  unsigned long expiration_)
4869  {
4870  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4871  data_.c_str(), data_.length(),
4872  expiration_);
4873  }
4874 
/// As the pointer-and-length form above, additionally forwarding expiration_.
4893  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
4894  const char* data_, size_t dataLength_,
4895  unsigned long expiration_)
4896  {
4897  return _body.get().deltaPublish(topic_, topicLength_,
4898  data_, dataLength_, expiration_);
4899  }
4900 
/// Logs on through the implementation with the given timeout, authenticator
/// and options; the authenticator defaults to DefaultAuthenticator::instance().
/// Returns the string produced by the implementation's logon.
4915  std::string logon(int timeout_ = 0,
4916  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
4917  const char* options_ = NULL)
4918  {
4919  return _body.get().logon(timeout_, authenticator_, options_);
4920  }
/// Options-first form: always uses DefaultAuthenticator::instance().
4933  std::string logon(const char* options_, int timeout_ = 0)
4934  {
4935  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
4936  options_);
4937  }
4938 
/// std::string form of the options-first overload; passes options_.c_str().
4951  std::string logon(const std::string& options_, int timeout_ = 0)
4952  {
4953  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
4954  options_.c_str());
4955  }
4956 
/// Subscribes through the implementation with messageHandler_ as the
/// handler. The implementation's bookmark argument is passed as an empty
/// string. Returns the string produced by the implementation's subscribe.
4976  std::string subscribe(const MessageHandler& messageHandler_,
4977  const std::string& topic_,
4978  long timeout_=0,
4979  const std::string& filter_="",
4980  const std::string& options_ = "",
4981  const std::string& subId_ = "")
4982  {
4983  return _body.get().subscribe(messageHandler_, topic_, timeout_,
4984  filter_, "", options_, subId_);
4985  }
4986 
5002  MessageStream subscribe(const std::string& topic_,
5003  long timeout_=0, const std::string& filter_="",
5004  const std::string& options_ = "",
5005  const std::string& subId_ = "")
5006  {
5007  MessageStream result(*this);
5008  if (_body.get().getDefaultMaxDepth())
5009  result.maxDepth(_body.get().getDefaultMaxDepth());
5010  result.setSubscription(_body.get().subscribe(
5011  result.operator MessageHandler(),
5012  topic_, timeout_, filter_, "",
5013  options_, subId_, false));
5014  return result;
5015  }
5016 
5032  MessageStream subscribe(const char* topic_,
5033  long timeout_ = 0, const std::string& filter_ = "",
5034  const std::string& options_ = "",
5035  const std::string& subId_ = "")
5036  {
5037  MessageStream result(*this);
5038  if (_body.get().getDefaultMaxDepth())
5039  result.maxDepth(_body.get().getDefaultMaxDepth());
5040  result.setSubscription(_body.get().subscribe(
5041  result.operator MessageHandler(),
5042  topic_, timeout_, filter_, "",
5043  options_, subId_, false));
5044  return result;
5045  }
5046 
/// Delta-subscribes through the implementation with messageHandler_ as the
/// handler; the implementation's bookmark argument is passed as an empty
/// string. Returns the string produced by the implementation.
5059  std::string deltaSubscribe(const MessageHandler& messageHandler_,
5060  const std::string& topic_,
5061  long timeout_,
5062  const std::string& filter_="",
5063  const std::string& options_ = "",
5064  const std::string& subId_ = "")
5065  {
5066  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5067  filter_, "", options_, subId_);
5068  }
5077  MessageStream deltaSubscribe(const std::string& topic_,
5078  long timeout_, const std::string& filter_="",
5079  const std::string& options_ = "",
5080  const std::string& subId_ = "")
5081  {
5082  MessageStream result(*this);
5083  if (_body.get().getDefaultMaxDepth())
5084  result.maxDepth(_body.get().getDefaultMaxDepth());
5085  result.setSubscription(_body.get().deltaSubscribe(
5086  result.operator MessageHandler(),
5087  topic_, timeout_, filter_, "",
5088  options_, subId_, false));
5089  return result;
5090  }
5091 
5093  MessageStream deltaSubscribe(const char* topic_,
5094  long timeout_, const std::string& filter_ = "",
5095  const std::string& options_ = "",
5096  const std::string& subId_ = "")
5097  {
5098  MessageStream result(*this);
5099  if (_body.get().getDefaultMaxDepth())
5100  result.maxDepth(_body.get().getDefaultMaxDepth());
5101  result.setSubscription(_body.get().deltaSubscribe(
5102  result.operator MessageHandler(),
5103  topic_, timeout_, filter_, "",
5104  options_, subId_, false));
5105  return result;
5106  }
5107 
/// Bookmark subscription with an explicit handler: calls the
/// implementation's subscribe with bookmark_ in the bookmark position and
/// returns the string it produces.
5133  std::string bookmarkSubscribe(const MessageHandler& messageHandler_,
5134  const std::string& topic_,
5135  long timeout_,
5136  const std::string& bookmark_,
5137  const std::string& filter_="",
5138  const std::string& options_ = "",
5139  const std::string& subId_ = "")
5140  {
5141  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5142  filter_, bookmark_, options_, subId_);
5143  }
5161  MessageStream bookmarkSubscribe(const std::string& topic_,
5162  long timeout_,
5163  const std::string& bookmark_,
5164  const std::string& filter_="",
5165  const std::string& options_ = "",
5166  const std::string& subId_ = "")
5167  {
5168  MessageStream result(*this);
5169  if (_body.get().getDefaultMaxDepth())
5170  result.maxDepth(_body.get().getDefaultMaxDepth());
5171  result.setSubscription(_body.get().subscribe(
5172  result.operator MessageHandler(),
5173  topic_, timeout_, filter_,
5174  bookmark_, options_,
5175  subId_, false));
5176  return result;
5177  }
5178 
5180  MessageStream bookmarkSubscribe(const char* topic_,
5181  long timeout_,
5182  const std::string& bookmark_,
5183  const std::string& filter_ = "",
5184  const std::string& options_ = "",
5185  const std::string& subId_ = "")
5186  {
5187  MessageStream result(*this);
5188  if (_body.get().getDefaultMaxDepth())
5189  result.maxDepth(_body.get().getDefaultMaxDepth());
5190  result.setSubscription(_body.get().subscribe(
5191  result.operator MessageHandler(),
5192  topic_, timeout_, filter_,
5193  bookmark_, options_,
5194  subId_, false));
5195  return result;
5196  }
5197 
/// Forwards commandId to unsubscribe on the implementation. The `return` of
/// a void expression is valid C++ and yields no value.
5206  void unsubscribe(const std::string& commandId)
5207  {
5208  return _body.get().unsubscribe(commandId);
5209  }
5210 
5219  {
5220  return _body.get().unsubscribe();
5221  }
5222 
5223 
/// Runs a SOW query through the implementation with messageHandler_ as the
/// handler, forwarding every argument in the order declared here. Defaults:
/// DEFAULT_BATCH_SIZE, DEFAULT_TOP_N and DEFAULT_COMMAND_TIMEOUT.
/// Returns the string produced by the implementation's sow.
5253  std::string sow(const MessageHandler& messageHandler_,
5254  const std::string& topic_,
5255  const std::string& filter_ = "",
5256  const std::string& orderBy_ = "",
5257  const std::string& bookmark_ = "",
5258  int batchSize_ = DEFAULT_BATCH_SIZE,
5259  int topN_ = DEFAULT_TOP_N,
5260  const std::string& options_ = "",
5261  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5262  {
5263  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
5264  bookmark_, batchSize_, topN_, options_,
5265  timeout_);
5266  }
5291  MessageStream sow(const std::string& topic_,
5292  const std::string& filter_ = "",
5293  const std::string& orderBy_ = "",
5294  const std::string& bookmark_ = "",
5295  int batchSize_ = DEFAULT_BATCH_SIZE,
5296  int topN_ = DEFAULT_TOP_N,
5297  const std::string& options_ = "",
5298  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5299  {
5300  MessageStream result(*this);
5301  if (_body.get().getDefaultMaxDepth())
5302  result.maxDepth(_body.get().getDefaultMaxDepth());
5303  result.setSOWOnly(sow(result.operator MessageHandler(),topic_,filter_,orderBy_,bookmark_,batchSize_,topN_,options_,timeout_));
5304  return result;
5305  }
5306 
5308  MessageStream sow(const char* topic_,
5309  const std::string& filter_ = "",
5310  const std::string& orderBy_ = "",
5311  const std::string& bookmark_ = "",
5312  int batchSize_ = DEFAULT_BATCH_SIZE,
5313  int topN_ = DEFAULT_TOP_N,
5314  const std::string& options_ = "",
5315  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5316  {
5317  MessageStream result(*this);
5318  if (_body.get().getDefaultMaxDepth())
5319  result.maxDepth(_body.get().getDefaultMaxDepth());
5320  result.setSOWOnly(sow(result.operator MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5321  return result;
5322  }
/// Timeout-first SOW query with an explicit handler: forwards the handler,
/// topic, timeout, filter, batch size and top-N to the matching sow overload
/// on the implementation and returns the string it produces.
5345  std::string sow(const MessageHandler& messageHandler_,
5346  const std::string& topic_,
5347  long timeout_,
5348  const std::string& filter_ = "",
5349  int batchSize_ = DEFAULT_BATCH_SIZE,
5350  int topN_ = DEFAULT_TOP_N)
5351  {
5352  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
5353  batchSize_, topN_);
5354  }
/// Timeout-first SOW-and-subscribe with an explicit handler: forwards every
/// argument, including oofEnabled_, to the matching sowAndSubscribe overload
/// on the implementation and returns the string it produces.
5377  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
5378  const std::string& topic_,
5379  long timeout_,
5380  const std::string& filter_ = "",
5381  int batchSize_ = DEFAULT_BATCH_SIZE,
5382  bool oofEnabled_ = false,
5383  int topN_ = DEFAULT_TOP_N)
5384  {
5385  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5386  filter_, batchSize_, oofEnabled_,
5387  topN_);
5388  }
5389 
5409  MessageStream sowAndSubscribe(const std::string& topic_,
5410  long timeout_,
5411  const std::string& filter_ = "",
5412  int batchSize_ = DEFAULT_BATCH_SIZE,
5413  bool oofEnabled_ = false,
5414  int topN_ = DEFAULT_TOP_N)
5415  {
5416  MessageStream result(*this);
5417  if (_body.get().getDefaultMaxDepth())
5418  result.maxDepth(_body.get().getDefaultMaxDepth());
5419  result.setSubscription(_body.get().sowAndSubscribe(
5420  result.operator MessageHandler(),
5421  topic_, timeout_, filter_,
5422  batchSize_, oofEnabled_,
5423  topN_, false));
5424  return result;
5425  }
5445  MessageStream sowAndSubscribe(const char *topic_,
5446  long timeout_,
5447  const std::string& filter_ = "",
5448  int batchSize_ = DEFAULT_BATCH_SIZE,
5449  bool oofEnabled_ = false,
5450  int topN_ = DEFAULT_TOP_N)
5451  {
5452  MessageStream result(*this);
5453  if (_body.get().getDefaultMaxDepth())
5454  result.maxDepth(_body.get().getDefaultMaxDepth());
5455  result.setSubscription(_body.get().sowAndSubscribe(
5456  result.operator MessageHandler(),
5457  topic_, timeout_, filter_,
5458  batchSize_, oofEnabled_,
5459  topN_, false));
5460  return result;
5461  }
5462 
5463 
/// SOW-and-subscribe with an explicit handler, order-by, bookmark and
/// options: forwards every argument in the order declared here to the
/// matching sowAndSubscribe overload on the implementation and returns the
/// string it produces.
5491  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
5492  const std::string& topic_,
5493  const std::string& filter_ = "",
5494  const std::string& orderBy_ = "",
5495  const std::string& bookmark_ = "",
5496  int batchSize_ = DEFAULT_BATCH_SIZE,
5497  int topN_ = DEFAULT_TOP_N,
5498  const std::string& options_ = "",
5499  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5500  {
5501  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
5502  orderBy_, bookmark_, batchSize_,
5503  topN_, options_, timeout_);
5504  }
5505 
5530  MessageStream sowAndSubscribe(const std::string& topic_,
5531  const std::string& filter_ = "",
5532  const std::string& orderBy_ = "",
5533  const std::string& bookmark_ = "",
5534  int batchSize_ = DEFAULT_BATCH_SIZE,
5535  int topN_ = DEFAULT_TOP_N,
5536  const std::string& options_ = "",
5537  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5538  {
5539  MessageStream result(*this);
5540  if (_body.get().getDefaultMaxDepth())
5541  result.maxDepth(_body.get().getDefaultMaxDepth());
5542  result.setSubscription(_body.get().sowAndSubscribe(
5543  result.operator MessageHandler(),
5544  topic_, filter_, orderBy_,
5545  bookmark_, batchSize_, topN_,
5546  options_, timeout_, false));
5547  return result;
5548  }
5549 
5551  MessageStream sowAndSubscribe(const char* topic_,
5552  const std::string& filter_ = "",
5553  const std::string& orderBy_ = "",
5554  const std::string& bookmark_ = "",
5555  int batchSize_ = DEFAULT_BATCH_SIZE,
5556  int topN_ = DEFAULT_TOP_N,
5557  const std::string& options_ = "",
5558  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5559  {
5560  MessageStream result(*this);
5561  if (_body.get().getDefaultMaxDepth())
5562  result.maxDepth(_body.get().getDefaultMaxDepth());
5563  result.setSubscription(_body.get().sowAndSubscribe(
5564  result.operator MessageHandler(),
5565  topic_, filter_, orderBy_,
5566  bookmark_, batchSize_, topN_,
5567  options_, timeout_, false));
5568  return result;
5569  }
5570 
5595  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
5596  const std::string& topic_,
5597  const std::string& filter_ = "",
5598  const std::string& orderBy_ = "",
5599  int batchSize_ = DEFAULT_BATCH_SIZE,
5600  int topN_ = DEFAULT_TOP_N,
5601  const std::string& options_ = "",
5602  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5603  {
5604  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5605  filter_, orderBy_, batchSize_,
5606  topN_, options_, timeout_);
5607  }
5628  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
5629  const std::string& filter_ = "",
5630  const std::string& orderBy_ = "",
5631  int batchSize_ = DEFAULT_BATCH_SIZE,
5632  int topN_ = DEFAULT_TOP_N,
5633  const std::string& options_ = "",
5634  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5635  {
5636  MessageStream result(*this);
5637  if (_body.get().getDefaultMaxDepth())
5638  result.maxDepth(_body.get().getDefaultMaxDepth());
5639  result.setSubscription(sowAndDeltaSubscribe(result.operator MessageHandler(), topic_, filter_, orderBy_, batchSize_, topN_, options_, timeout_));
5640  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5641  result.operator MessageHandler(),
5642  topic_, filter_, orderBy_,
5643  batchSize_, topN_, options_,
5644  timeout_, false));
5645  return result;
5646  }
5647 
5650  const std::string& filter_ = "",
5651  const std::string& orderBy_ = "",
5652  int batchSize_ = DEFAULT_BATCH_SIZE,
5653  int topN_ = DEFAULT_TOP_N,
5654  const std::string& options_ = "",
5655  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5656  {
5657  MessageStream result(*this);
5658  if (_body.get().getDefaultMaxDepth())
5659  result.maxDepth(_body.get().getDefaultMaxDepth());
5660  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5661  result.operator MessageHandler(),
5662  topic_, filter_, orderBy_,
5663  batchSize_, topN_, options_,
5664  timeout_, false));
5665  return result;
5666  }
5667 
5692  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
5693  const std::string& topic_,
5694  long timeout_,
5695  const std::string& filter_ = "",
5696  int batchSize_ = DEFAULT_BATCH_SIZE,
5697  bool oofEnabled_ = false,
5698  bool sendEmpties_ = false,
5699  int topN_ = DEFAULT_TOP_N)
5700  {
5701  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5702  timeout_, filter_, batchSize_,
5703  oofEnabled_, sendEmpties_,
5704  topN_);
5705  }
5706 
5728  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
5729  long timeout_,
5730  const std::string& filter_ = "",
5731  int batchSize_ = DEFAULT_BATCH_SIZE,
5732  bool oofEnabled_ = false,
5733  bool sendEmpties_ = false,
5734  int topN_ = DEFAULT_TOP_N)
5735  {
5736  MessageStream result(*this);
5737  if (_body.get().getDefaultMaxDepth())
5738  result.maxDepth(_body.get().getDefaultMaxDepth());
5739  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5740  result.operator MessageHandler(),
5741  topic_, timeout_, filter_,
5742  batchSize_, oofEnabled_,
5743  sendEmpties_, topN_, false));
5744  return result;
5745  }
5768  long timeout_,
5769  const std::string& filter_ = "",
5770  int batchSize_ = DEFAULT_BATCH_SIZE,
5771  bool oofEnabled_ = false,
5772  bool sendEmpties_ = false,
5773  int topN_ = DEFAULT_TOP_N)
5774  {
5775  MessageStream result(*this);
5776  if (_body.get().getDefaultMaxDepth())
5777  result.maxDepth(_body.get().getDefaultMaxDepth());
5778  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5779  result.operator MessageHandler(),
5780  topic_, timeout_, filter_,
5781  batchSize_, oofEnabled_,
5782  sendEmpties_, topN_, false));
5783  return result;
5784  }
5804  std::string sowDelete(const MessageHandler& messageHandler,
5805  const std::string& topic,
5806  const std::string& filter,
5807  long timeout)
5808  {
5809  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
5810  }
5827  Message sowDelete(const std::string& topic, const std::string& filter,
5828  long timeout=0)
5829  {
5830  MessageStream stream(*this);
5831  char buf[Message::IdentifierLength+1];
5832  buf[Message::IdentifierLength] = 0;
5833  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5834  Field cid(buf);
5835  try
5836  {
5837  stream.setStatsOnly(cid);
5838  _body.get().sowDelete(stream.operator MessageHandler(),topic,filter,timeout,cid);
5839  return *(stream.begin());
5840  }
5841  catch (const DisconnectedException&)
5842  {
5843  removeMessageHandler(cid);
5844  throw;
5845  }
5846  }
5847 
5852  void startTimer()
5853  {
5854  _body.get().startTimer();
5855  }
5856 
5863  std::string stopTimer(const MessageHandler& messageHandler)
5864  {
5865  return _body.get().stopTimer(messageHandler);
5866  }
5867 
5889  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
5890  const std::string& topic_,
5891  const std::string& keys_,
5892  long timeout_=0)
5893  {
5894  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
5895  }
5916  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
5917  long timeout_ = 0)
5918  {
5919  MessageStream stream(*this);
5920  char buf[Message::IdentifierLength+1];
5921  buf[Message::IdentifierLength] = 0;
5922  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5923  Field cid(buf);
5924  try
5925  {
5926  stream.setStatsOnly(cid);
5927  _body.get().sowDeleteByKeys(stream.operator MessageHandler(),topic_,keys_,timeout_,cid);
5928  return *(stream.begin());
5929  }
5930  catch (const DisconnectedException&)
5931  {
5932  removeMessageHandler(cid);
5933  throw;
5934  }
5935  }
5936 
5951  std::string sowDeleteByData(const MessageHandler& messageHandler_,
5952  const std::string& topic_, const std::string& data_,
5953  long timeout_=0)
5954  {
5955  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
5956  }
5957 
5972  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
5973  long timeout_=0)
5974  {
5975  MessageStream stream(*this);
5976  char buf[Message::IdentifierLength+1];
5977  buf[Message::IdentifierLength] = 0;
5978  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5979  Field cid(buf);
5980  try
5981  {
5982  stream.setStatsOnly(cid);
5983  _body.get().sowDeleteByData(stream.operator MessageHandler(),topic_,data_,timeout_,cid);
5984  return *(stream.begin());
5985  }
5986  catch (const DisconnectedException&)
5987  {
5988  removeMessageHandler(cid);
5989  throw;
5990  }
5991  }
5992 
5997  {
5998  return _body.get().getHandle();
5999  }
6000 
6009  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
6010  {
6011  _body.get().setExceptionListener(pListener_);
6012  }
6013 
6023  {
6024  _body.get().setExceptionListener(listener_);
6025  }
6026 
6030  {
6031  return _body.get().getExceptionListener();
6032  }
6033 
6041  // type of message) from the server for the specified interval (plus a grace period),
6055  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
6056  {
6057  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6058  }
6059 
6067  // type of message) from the server for the specified interval (plus a grace period),
6079  void setHeartbeat(unsigned heartbeatTime_)
6080  {
6081  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6082  }
6083 
6086  {
6087  setLastChanceMessageHandler(messageHandler);
6088  }
6089 
6093  {
6094  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6095  messageHandler);
6096  }
6097 
6118  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
6119  {
6120  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6121  }
6122 
6143  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
6144  {
6145  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6146  }
6147 
6153  static const char* BOOKMARK_NOW() { return AMPS_BOOKMARK_NOW; }
6159  static const char* NOW() { return AMPS_BOOKMARK_NOW; }
6160 
6166  static const char* BOOKMARK_EPOCH() { return AMPS_BOOKMARK_EPOCH; }
6167 
6173  static const char* EPOCH() { return AMPS_BOOKMARK_EPOCH; }
6174 
6181  static const char* BOOKMARK_MOST_RECENT() { return AMPS_BOOKMARK_RECENT; }
6182 
6189  static const char* MOST_RECENT() { return AMPS_BOOKMARK_RECENT; }
6190 
6197  static const char* BOOKMARK_RECENT() { return AMPS_BOOKMARK_RECENT; }
6198 
6199 
6206  {
6207  _body.get().addConnectionStateListener(listener);
6208  }
6209 
6214  {
6215  _body.get().removeConnectionStateListener(listener);
6216  }
6217 
6221  {
6222  _body.get().clearConnectionStateListeners();
6223  }
6224 
6250  std::string executeAsync(Command& command_, MessageHandler handler_)
6251  {
6252  return _body.get().executeAsync(command_, handler_);
6253  }
6254 
6284  std::string executeAsyncNoResubscribe(Command& command_,
6285  MessageHandler handler_)
6286  {
6287  std::string id;
6288  try
6289  {
6290  if (command_.isSubscribe())
6291  {
6292  Message& message = command_.getMessage();
6293  Field subId = message.getSubscriptionId();
6294  bool useExistingHandler = !subId.empty() && !message.getOptions().empty() && message.getOptions().contains("replace",7);
6295  if(useExistingHandler)
6296  {
6297  MessageHandler existingHandler;
6298  if (_body.get()._routes.getRoute(subId, existingHandler))
6299  {
6300  // we found an existing handler.
6301  _body.get().executeAsync(command_, existingHandler, false);
6302  return id; // empty string indicates existing
6303  }
6304  }
6305  }
6306  id = _body.get().executeAsync(command_, handler_, false);
6307  }
6308  catch (const DisconnectedException&)
6309  {
6310  removeMessageHandler(command_.getMessage().getCommandId());
6311  if (command_.isSubscribe())
6312  {
6313  removeMessageHandler(command_.getMessage().getSubscriptionId());
6314  }
6315  if (command_.isSow())
6316  {
6317  removeMessageHandler(command_.getMessage().getQueryID());
6318  }
6319  throw;
6320  }
6321  return id;
6322  }
6323 
/// Executes \c command_ and returns a MessageStream.
/// Declaration only; the definition is not in this part of the file.
6336  MessageStream execute(Command& command_);
6337 
6346  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
6347  {
6348  _body.get().ack(topic_,bookmark_,options_);
6349  }
6350 
6358  void ack(Message& message_, const char* options_ = NULL)
6359  {
6360  _body.get().ack(message_.getTopic(),message_.getBookmark(),options_);
6361  }
6370  void ack(const std::string& topic_, const std::string& bookmark_,
6371  const char* options_ = NULL)
6372  {
6373  _body.get().ack(Field(topic_.data(),topic_.length()), Field(bookmark_.data(),bookmark_.length()),options_);
6374  }
6375 
6381  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
6382  {
6383  _body.get()._ack(topic_,bookmark_,options_);
6384  }
6394  void flushAcks(void)
6395  {
6396  _body.get().flushAcks();
6397  }
6398 
6403  bool getAutoAck(void) const
6404  {
6405  return _body.get().getAutoAck();
6406  }
6413  void setAutoAck(bool isAutoAckEnabled_)
6414  {
6415  _body.get().setAutoAck(isAutoAckEnabled_);
6416  }
6421  unsigned getAckBatchSize(void) const
6422  {
6423  return _body.get().getAckBatchSize();
6424  }
6431  void setAckBatchSize(const unsigned ackBatchSize_)
6432  {
6433  _body.get().setAckBatchSize(ackBatchSize_);
6434  }
6435 
6442  int getAckTimeout(void) const
6443  {
6444  return _body.get().getAckTimeout();
6445  }
6452  void setAckTimeout(const int ackTimeout_)
6453  {
6454  _body.get().setAckTimeout(ackTimeout_);
6455  }
6456 
6457 
6466  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
6467  {
6468  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
6469  }
6470 
6475  bool getRetryOnDisconnect(void) const
6476  {
6477  return _body.get().getRetryOnDisconnect();
6478  }
6479 
6484  void setDefaultMaxDepth(unsigned maxDepth_)
6485  {
6486  _body.get().setDefaultMaxDepth(maxDepth_);
6487  }
6488 
6493  unsigned getDefaultMaxDepth(void) const
6494  {
6495  return _body.get().getDefaultMaxDepth();
6496  }
6497 
6505  void* userData_)
6506  {
6507  return _body.get().setTransportFilterFunction(filter_, userData_);
6508  }
6509 
6519  void* userData_)
6520  {
6521  return _body.get().setThreadCreatedCallback(callback_, userData_);
6522  }
6523 
6529  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
6530  {
6531  _body.get().deferredExecution(func_,userData_);
6532  }
6536 };
6537 
6538 inline void
6539 ClientImpl::lastChance(AMPS::Message& message)
6540 {
6541  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
6542 }
6543 
/// Processes a persisted ack.
///
/// Three things may happen, each counted in the returned delivery count:
///  - a failed write (detected by reason length or a 7-character status
///    starting with 'f') is reported through the failed-write handler;
///  - a nonzero sequence number causes the publish store to discard entries
///    up to that sequence;
///  - if nothing was delivered so far, a subscription id plus bookmark for a
///    still-routed subscription is recorded in the bookmark store.
///
/// \param message The ack message.
/// \return Number of deliveries counted. Exceptions derived from
///         std::exception are passed to AMPS_UNHANDLED_EXCEPTION.
6544 inline unsigned
6545 ClientImpl::persistedAck(AMPS::Message& message)
6546 {
6547  unsigned deliveries = 0;
6548  try
6549  {
6550  /*
6551  * Best Practice: If you don't care about the dupe acks that
6552  * occur during failover or rapid disconnect/reconnect, then just
6553  * ignore them. We could discard each duplicate from the
6554  * persisted store, but the storage costs of doing 1 record
6555  * discards is heavy. In most scenarios we'll just quickly blow
6556  * through the duplicates and get back to processing the
6557  * non-dupes.
6558  */
6559  const char* data = NULL;
6560  size_t len = 0;
6561  const char* status = NULL;
6562  size_t statusLen = 0;
6563  amps_handle messageHandle = message.getMessage();
// The reason and status are classified by length only (plus the first
// status character), not by comparing the full text.
6564  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
6565  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
6566  amps_message_get_field_value(messageHandle, AMPS_Status,
6567  &status, &statusLen);
6568  if (len == NotEntitled || len == Duplicate ||
6569  (statusLen == Failure && status[0] == 'f'))
6570  {
6571  if (_failedWriteHandler)
6572  {
6573  if (_publishStore.isValid())
6574  {
// With a publish store, replay the single stored message for this
// sequence through the failed-write replayer.
6575  amps_uint64_t sequence =
6576  amps_message_get_field_uint64(messageHandle,
6577  AMPS_Sequence);
6578  FailedWriteStoreReplayer replayer(this, data, len);
6579  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
6580  replayer, sequence));
6581  }
6582  else // Call the handler with what little we have
6583  {
// NOTE(review): function-local static shared by every call; confirm this
// is only reached from a single thread.
6584  static Message emptyMessage;
6585  emptyMessage.setSequence(message.getSequence());
6586  AMPS_CALL_EXCEPTION_WRAPPER(
6587  _failedWriteHandler->failedWrite(emptyMessage,
6588  data, len));
6589  }
6590  ++deliveries;
6591  }
6592  }
6593  if (_publishStore.isValid())
6594  {
6595  // Ack for publisher will have sequence while
6596  // ack for bookmark subscribe won't
6597  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
6598  AMPS_Sequence);
6599  if (seq > 0)
6600  {
6601  ++deliveries;
6602  _publishStore.discardUpTo(seq);
6603  }
6604  }
6605 
// Only treat this as a bookmark-subscription ack when nothing above
// consumed it.
6606  if (!deliveries && _bookmarkStore.isValid())
6607  {
6608  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
6609  &data, &len);
6610  if (len > 0)
6611  {
6612  Message::Field subId(data, len);
6613  const char* bookmarkData = NULL;
6614  size_t bookmarkLen = 0;
6615  amps_message_get_field_value(messageHandle, AMPS_Bookmark,
6616  &bookmarkData, &bookmarkLen);
6617  // Everything is there and not unsubscribed AC-912
6618  if (bookmarkLen > 0 && _routes.hasRoute(subId))
6619  {
6620  ++deliveries;
6621  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
6622  }
6623  }
6624  }
6625  }
6626  catch (std::exception& ex)
6627  {
6628  AMPS_UNHANDLED_EXCEPTION(ex);
6629  }
6630  return deliveries;
6631 }
6632 
6633 inline unsigned
6634 ClientImpl::processedAck(Message &message)
6635 {
6636  unsigned deliveries = 0;
6637  AckResponse ack;
6638  const char* data = NULL;
6639  size_t len = 0;
6640  amps_handle messageHandle = message.getMessage();
6641  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
6642  Lock<Mutex> l(_lock);
6643  if (data && len)
6644  {
6645  Lock<Mutex> guard(_ackMapLock);
6646  AckMap::iterator i = _ackMap.find(std::string(data, len));
6647  if (i != _ackMap.end())
6648  {
6649  ++deliveries;
6650  ack = i->second;
6651  _ackMap.erase(i);
6652  }
6653  }
6654  if (deliveries)
6655  {
6656  amps_message_get_field_value(messageHandle, AMPS_Status, &data,
6657  &len);
6658  ack.setStatus(data, len);
6659  amps_message_get_field_value(messageHandle, AMPS_Reason, &data,
6660  &len);
6661  ack.setReason(data, len);
6662  amps_message_get_field_value(messageHandle, AMPS_UserId, &data,
6663  &len);
6664  ack.setUsername(data, len);
6665  amps_message_get_field_value(messageHandle, AMPS_Password, &data,
6666  &len);
6667  ack.setPassword(data, len);
6668  amps_message_get_field_value(messageHandle, AMPS_Sequence, &data,
6669  &len);
6670  ack.setSequenceNo(data, len);
6671  amps_message_get_field_value(messageHandle, AMPS_Version, &data,
6672  &len);
6673  ack.setServerVersion(data, len);
6674  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
6675  ack.setOptions(data,len);
6676  amps_message_get_field_value(messageHandle, AMPS_Bookmark, &data, &len);
6677  ack.setBookmark(data,len);
6678  ack.setResponded(true);
6679  _lock.signalAll();
6680  }
6681  return deliveries;
6682 }
6683 
6684 inline void
6685 ClientImpl::checkAndSendHeartbeat(bool force)
6686 {
6687  if (force || _heartbeatTimer.check())
6688  {
6689  _heartbeatTimer.start();
6690  try
6691  {
6692  sendWithoutRetry(_beatMessage);
6693  }
6694  catch (const AMPSException&)
6695  {
6696  ;
6697  }
6698  }
6699 }
6700 
6701 inline ConnectionInfo ClientImpl::getConnectionInfo() const
6702 {
6703  ConnectionInfo info;
6704  std::ostringstream writer;
6705 
6706  info["client.uri"] = _lastUri;
6707  info["client.name"] = _name;
6708  info["client.username"] = _username;
6709  if(_publishStore.isValid())
6710  {
6711  writer << _publishStore.unpersistedCount();
6712  info["publishStore.unpersistedCount"] = writer.str();
6713  writer.clear();
6714  writer.str("");
6715  }
6716 
6717  return info;
6718 }
6719 
/// Static receive callback that dispatches one incoming message.
///
/// \c userData_ is cast to the owning ClientImpl. Deferred executions are
/// processed first. A null \c messageHandle_ carries no message: queue acks
/// are checked (when a queue ack timeout is set) and the call returns.
/// Otherwise the message is routed by command type:
///  - SOW / group begin / group end: delivered by query id;
///  - publish / delta publish / OOF: delivered to each subscription id listed
///    in the message;
///  - ack: global ack handler, then persisted/processed handling and route
///    delivery, falling back to the last-chance handler;
///  - heartbeat: global heartbeat handler, then a forced heartbeat reply when
///    the heartbeat timer has a nonzero timeout;
///  - anything else with a command id: delivered by command id.
///
/// \param messageHandle_ Handle of the received message, or null.
/// \param userData_ Pointer to the ClientImpl that registered the callback.
/// \return Always AMPS_E_OK.
6720 inline amps_result
6721 ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
6722 {
6723  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
6724  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
6725  ClientImpl* me = (ClientImpl*) userData_;
6726  AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->processDeferredExecutions());
6727  if(!messageHandle_)
6728  {
6729  if(me->_queueAckTimeout) me->checkQueueAcks();
6730  return AMPS_E_OK;
6731  }
6732 
// Rebind the reusable read message to the incoming handle.
6733  me->_readMessage.replace(messageHandle_);
6734  Message& message = me->_readMessage;
6735  Message::Command::Type commandType = message.getCommandEnum();
6736  if (commandType & SOWMask)
6737  {
6738 #if 0 // Not currently implemented, to avoid an extra branch in delivery
6739  // A small cheat here to get the right handler, using knowledge of the
6740  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
6741  // and their GlobalCommandTypeHandlers values 1, 2, 3.
6742  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6743  me->_globalCommandTypeHandlers[1+(commandType/8192)].invoke(message));
6744 #endif
6745  AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->_routes.deliverData(message,
6746  message.getQueryID()));
6747  }
6748  else if (commandType & PublishMask)
6749  {
6750 #if 0 // Not currently implemented, to avoid an extra branch in delivery
6751  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6752  me->_globalCommandTypeHandlers[(commandType==Message::Command::Publish ?
6753  GlobalCommandTypeHandlers::Publish :
6754  GlobalCommandTypeHandlers::OOF)].invoke(message));
6755 #endif
6756  const char* subIds = NULL;
6757  size_t subIdsLen = 0;
6758  // Publish command, send to subscriptions
6759  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds, &subIds, &subIdsLen);
6760  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
6761  for(size_t i=0; i<subIdCount; ++i)
6762  {
6763  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
6764  MessageHandler& handler = lookupResult.handler;
6765  if (handler.isValid())
6766  {
// Present the message to the handler with this single subscription id set.
6767  amps_message_set_field_value(messageHandle_, AMPS_SubscriptionId,
6768  subIds + lookupResult.idOffset, lookupResult.idLength);
6769  Message::Field bookmark = message.getBookmark();
// A non-empty lease period is what marks the message as a queue message.
6770  bool isMessageQueue = message.getLeasePeriod().len() != 0;
6771  bool isAutoAck = me->_isAutoAckEnabled;
6772 
6773  if (!isMessageQueue && !bookmark.empty() &&
6774  me->_bookmarkStore.isValid())
6775  {
// Bookmarked, non-queue message: messages the store reports as discarded
// go to the duplicate handler (if any); others are logged, then delivered.
6776  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
6777  {
6778  //Call duplicate message handler in handlers map
6779  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
6780  {
6781  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
6782  }
6783  }
6784  else
6785  {
6786  me->_bookmarkStore.log(me->_readMessage);
6787  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6788  handler.invoke(message));
6789  }
6790  }
6791  else
6792  {
6793  if(isMessageQueue && isAutoAck)
6794  {
// Auto-ack: ack after the handler returns, or ack with "cancel" if an
// exception reaches the catch below, unless the message opts out of
// auto-ack.
6795  try
6796  {
6797  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
6798  if (!message.getIgnoreAutoAck())
6799  {
6800  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6801  me->_ack(message.getTopic(),message.getBookmark()));
6802  }
6803  }
6804  catch(std::exception& ex)
6805  {
6806  if (!message.getIgnoreAutoAck())
6807  {
6808  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6809  me->_ack(message.getTopic(),message.getBookmark(),"cancel"));
6810  }
6811  AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6812  }
6813  }
6814  else
6815  {
6816  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6817  handler.invoke(message));
6818  }
6819  }
6820  }
6821  else me->lastChance(message);
6822  } // for (subidsEnd)
6823  }
6824  else if (commandType == Message::Command::Ack)
6825  {
6826  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6827  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
6828  unsigned ackType = message.getAckTypeEnum();
6829  unsigned deliveries = 0U;
6830  switch (ackType)
6831  {
6832  case Message::AckType::Persisted:
6833  deliveries += me->persistedAck(message);
6834  break;
6835  case Message::AckType::Processed: // processed
6836  deliveries += me->processedAck(message);
6837  break;
6838  }
6839  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
// No internal consumer and no route took the ack.
6840  if (deliveries == 0)
6841  {
6842  me->lastChance(message);
6843  }
6844  }
6845  else if (commandType == Message::Command::Heartbeat)
6846  {
6847  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6848  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
6849  if(me->_heartbeatTimer.getTimeout() != 0.0) // -V550
6850  {
6851  me->checkAndSendHeartbeat(true);
6852  }
6853  else
6854  {
6855  me->lastChance(message);
6856  }
// Heartbeats return here, skipping the trailing heartbeat check.
6857  return AMPS_E_OK;
6858  }
6859  else if (!message.getCommandId().empty())
6860  {
6861  unsigned deliveries = 0U;
6862  try
6863  {
6864  while(me->_connected) // Keep sending heartbeats when stream is full
6865  {
6866  try
6867  {
6868  deliveries = me->_routes.deliverData(message, message.getCommandId());
6869  break;
6870  }
6871 #ifdef _WIN32
6872  catch(MessageStreamFullException&)
6873 #else
6874  catch(MessageStreamFullException& ex_)
6875 #endif
6876  {
6877  me->checkAndSendHeartbeat(false);
6878  }
6879  }
6880  }
6881  catch (std::exception& ex_)
6882  {
// Report to the exception listener; anything the listener itself throws
// is swallowed.
6883  try
6884  {
6885  me->_exceptionListener->exceptionThrown(ex_);
6886  }
6887  catch(...)
6888  {
6889  ;
6890  }
6891  }
6892  if (deliveries == 0)
6893  me->lastChance(message);
6894  }
6895  me->checkAndSendHeartbeat();
6896  return AMPS_E_OK;
6897 }
6898 
6899 inline void
6900 ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
6901 {
6902  ClientImpl* me = (ClientImpl*) userData;
6903  //Client wrapper(me);
6904  // Go ahead and signal any waiters if they are around...
6905  me->clearAcks(failedConnectionVersion);
6906 }
6907 
/// Static disconnect callback: runs the user's disconnect handler and then
/// asks the subscription manager to resubscribe.
///
/// The client lock is taken for the whole call and released only around the
/// disconnect-handler and resubscribe invocations. If the client is still
/// not connected after the disconnect handler returns, listeners are told
/// the client is shut down and AMPS_E_DISCONNECTED is returned. An
/// AMPSException during resubscribe is reported and the loop runs again;
/// any other exception returns AMPS_E_RETRY.
///
/// \param userData Pointer to the ClientImpl that registered the callback.
/// \return AMPS_E_OK on success, AMPS_E_DISCONNECTED when the reconnect
///         failed, or AMPS_E_RETRY as described above.
6908 inline amps_result
6909 ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
6910 {
6911  ClientImpl* me = (ClientImpl*) userData;
6912  Lock<Mutex> l(me->_lock);
6913  Client wrapper(me,false);
6914  if (me->_connected)
6915  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
6916  while(true)
6917  {
// NOTE(review): the two AtomicFlagFlip objects appear to hold the
// "bad time to HA subscribe/publish" flags for their scope; confirm
// against AtomicFlagFlip.
6918  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
6919  try
6920  {
6921  AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
6922  me->_connected = false;
6923  {
6924  // Have to release the lock here or receive thread can't
6925  // invoke the message handler.
6926  Unlock<Mutex> unlock(me->_lock);
6927  me->_disconnectHandler.invoke(wrapper);
6928  }
6929  }
6930  catch(const std::exception& ex)
6931  {
6932  AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6933  }
6934 
// _connected was cleared above; it is still false here unless something
// set it again while the disconnect handler ran.
6935  if (!me->_connected)
6936  {
6937  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
6938  AMPS_UNHANDLED_EXCEPTION_2(me,DisconnectedException("Reconnect failed."));
6939  return AMPS_E_DISCONNECTED;
6940  }
6941  try
6942  {
6943  // Resubscribe
6944  if (me->_subscriptionManager)
6945  {
6946  {
6947  // Have to release the lock here or receive thread can't
6948  // invoke the message handler.
6949  Unlock<Mutex> unlock(me->_lock);
6950  me->_subscriptionManager->resubscribe(wrapper);
6951  }
6952  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
6953  }
6954  return AMPS_E_OK;
6955  }
// An AMPSException is reported and falls through to the next loop
// iteration, which invokes the disconnect handler again.
6956  catch(const AMPSException& subEx)
6957  {
6958  AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6959  }
6960  catch(const std::exception& subEx)
6961  {
6962  AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6963  return AMPS_E_RETRY;
6964  }
6965  catch(...)
6966  {
6967  return AMPS_E_RETRY;
6968  }
6969  }
// Not reached: the loop above only exits by returning.
6970  return AMPS_E_RETRY;
6971 }
6972 
6973 class FIX
6974 {
6975  const char* _data;
6976  size_t _len;
6977  char _fieldSep;
6978 public:
6979  class iterator
6980  {
6981  const char* _data;
6982  size_t _len;
6983  size_t _pos;
6984  char _fieldSep;
6985  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
6986  : _data(data_), _len(len_),_pos(pos_), _fieldSep(fieldSep_)
6987  {
6988  while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6989  }
6990  public:
6991  typedef void* difference_type;
6992  typedef std::forward_iterator_tag iterator_category;
6993  typedef std::pair<Message::Field, Message::Field> value_type;
6994  typedef value_type* pointer;
6995  typedef value_type& reference;
6996  bool operator==(const iterator& rhs) const
6997  {
6998  return _pos == rhs._pos;
6999  }
7000  bool operator!=(const iterator& rhs) const
7001  {
7002  return _pos != rhs._pos;
7003  }
7004  iterator& operator++()
7005  {
7006  // Skip through the data
7007  while(_pos != _len && _data[_pos] != _fieldSep) ++_pos;
7008  // Skip through any field separators
7009  while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
7010  return *this;
7011  }
7012 
7013  value_type operator*() const
7014  {
7015  value_type result;
7016  size_t i = _pos, keyLength =0, valueStart = 0, valueLength = 0;
7017  for(; i < _len && _data[i] != '='; ++i) ++keyLength;
7018 
7019  result.first.assign(_data+_pos, keyLength);
7020 
7021  if (i < _len && _data[i] == '=')
7022  {
7023  ++i;
7024  valueStart = i;
7025  for(; i < _len && _data[i] != _fieldSep; ++i)
7026  {
7027  valueLength++;
7028  }
7029  }
7030  result.second.assign(_data+valueStart, valueLength);
7031  return result;
7032  }
7033 
7034  friend class FIX;
7035  };
7036  class reverse_iterator
7037  {
7038  const char* _data;
7039  size_t _len;
7040  const char* _pos;
7041  char _fieldSep;
7042  public:
7043  typedef std::pair<Message::Field, Message::Field> value_type;
7044  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
7045  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
7046  {
7047  if (_pos)
7048  {
7049  // skip past meaningless trailing fieldseps
7050  while(_pos >=_data && *_pos == _fieldSep) --_pos;
7051  while(_pos > _data && *_pos != _fieldSep) --_pos;
7052  // if we stopped before the 0th character, it's because
7053  // it's a field sep. advance one to point to the first character
7054  // of a key.
7055  if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
7056  if (_pos < _data) _pos = 0;
7057  }
7058  }
7059  bool operator==(const reverse_iterator& rhs) const
7060  {
7061  return _pos == rhs._pos;
7062  }
7063  bool operator!=(const reverse_iterator& rhs) const
7064  {
7065  return _pos != rhs._pos;
7066  }
7067  reverse_iterator& operator++()
7068  {
7069  if (_pos == _data)
7070  {
7071  _pos = 0;
7072  }
7073  else
7074  {
7075  // back up 1 to a field separator
7076  --_pos;
7077  // keep backing up through field separators
7078  while(_pos >=_data && *_pos == _fieldSep) --_pos;
7079  // now back up to the beginning of this field
7080  while(_pos >_data && *_pos != _fieldSep) --_pos;
7081  if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
7082  if (_pos < _data) _pos = 0;
7083  }
7084  return *this;
7085  }
7086  value_type operator*() const
7087  {
7088  value_type result;
7089  size_t keyLength = 0, valueStart = 0, valueLength = 0;
7090  size_t i = (size_t)(_pos - _data);
7091  for(; i < _len && _data[i] != '='; ++i) ++keyLength;
7092  result.first.assign(_pos, keyLength);
7093  if (i<_len && _data[i] == '=')
7094  {
7095  ++i;
7096  valueStart = i;
7097  for(; i<_len && _data[i] != _fieldSep; ++i)
7098  {
7099  valueLength++;
7100  }
7101  }
7102  result.second.assign(_data+valueStart, valueLength);
7103  return result;
7104  }
7105  };
7106  FIX(const Message::Field& data, char fieldSeparator=1)
7107  : _data(data.data()), _len(data.len()),
7108  _fieldSep(fieldSeparator)
7109  {
7110  }
7111 
7112  FIX(const char* data, size_t len, char fieldSeparator=1)
7113  : _data(data), _len(len), _fieldSep(fieldSeparator)
7114  {
7115  }
7116 
7117  iterator begin() const
7118  {
7119  return iterator(_data, _len, 0, _fieldSep);
7120  }
7121  iterator end() const
7122  {
7123  return iterator(_data, _len, _len, _fieldSep);
7124  }
7125 
7126 
7127  reverse_iterator rbegin() const
7128  {
7129  return reverse_iterator(_data, _len, _data+(_len-1), _fieldSep);
7130  }
7131 
7132  reverse_iterator rend() const
7133  {
7134  return reverse_iterator(_data, _len, 0, _fieldSep);
7135  }
7136 };
7137 
7138 
7151 
/// Builds a message in FIX-style "tag=value<sep>" format.
///
/// Each append() writes the tag (streamed with operator<<), an '=', the
/// value, and then the field separator, so the built string ends with a
/// separator after the last field.
/// \tparam T The type used for tags; it must be streamable to a
///           std::ostream.
template <class T>
class _FIXBuilder
{
  std::stringstream _data; // accumulated message text
  char _fs;                // field separator written after every value
public:
  /// Constructs a builder.
  /// \param fieldSep_ The separator written after each field (defaults to
  ///        the byte value 1).
  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}

  /// Appends a tag and a value taken from a sub-range of a buffer.
  /// \param tag The tag to write.
  /// \param value Pointer to the buffer containing the value.
  /// \param offset Offset into value at which the value starts.
  /// \param length Number of bytes of the value to write.
  void append(const T& tag, const char* value, size_t offset, size_t length)
  {
    _data << tag << '=';
    _data.write(value+offset, (std::streamsize)length);
    _data << _fs;
  }

  /// Appends a tag and a string value.
  /// \param tag The tag to write.
  /// \param value The value to write.
  void append(const T& tag, const std::string& value)
  {
    _data << tag << '=' << value << _fs;
  }

  /// Returns the current contents of this builder as a string.
  std::string getString() const
  {
    return _data.str();
  }

  /// Converts to the current contents of this builder.
  operator std::string() const
  {
    return _data.str();
  }

  /// Clears the contents so the builder can be reused.
  void reset()
  {
    _data.str(std::string());
    // Also drop any error flags left by an earlier failed write; with a
    // flag still set, later append() calls would silently write nothing.
    _data.clear();
  }
};
7205 
7209 
7211 
7215 
7217 
7218 
7226 
7228 {
7229  char _fs;
7230 public:
7235  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
7236 
7239  typedef std::map<Message::Field, Message::Field> map_type;
7240 
7246  map_type toMap(const Message::Field& data)
7247  {
7248  FIX fix(data, _fs);
7249  map_type retval;
7250  for(FIX::iterator a = fix.begin(); a!= fix.end(); ++a)
7251  {
7252  retval.insert(*a);
7253  }
7254 
7255  return retval;
7256  }
7257 };
7258 
7259 class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
7260 {
7261  Mutex _lock;
7262  std::deque<Message> _q;
7263  std::string _commandId;
7264  std::string _subId;
7265  std::string _queryId;
7266  Client _client;
7267  unsigned _timeout;
7268  unsigned _maxDepth;
7269  unsigned _requestedAcks;
7270  Message::Field _previousTopic;
7271  Message::Field _previousBookmark;
7272  volatile enum { Unset=0x0, Running=0x10, Subscribe=0x11, SOWOnly=0x12, AcksOnly=0x13, Conflate=0x14, Closed=0x1, Disconnected=0x2 } _state;
7273  typedef std::map<std::string, Message*> SOWKeyMap;
7274  SOWKeyMap _sowKeyMap;
7275  public:
7276  MessageStreamImpl(const Client& client_)
7277  : _client(client_),
7278  _timeout(0),
7279  _maxDepth((unsigned)~0),
7280  _requestedAcks(0),
7281  _state(Unset)
7282  {
7283  if (_client.isValid())
7284  {
7285  _client.addConnectionStateListener(this);
7286  }
7287  }
7288 
7289  MessageStreamImpl(ClientImpl* client_)
7290  : _client(client_),
7291  _timeout(0),
7292  _maxDepth((unsigned)~0),
7293  _requestedAcks(0),
7294  _state(Unset)
7295  {
7296  if (_client.isValid())
7297  {
7298  _client.addConnectionStateListener(this);
7299  }
7300  }
7301 
7302  ~MessageStreamImpl()
7303  {
7304  }
7305 
7306  virtual void destroy()
7307  {
7308  try
7309  {
7310  close();
7311  }
7312  catch(std::exception &e)
7313  {
7314  try
7315  {
7316  if (_client.isValid())
7317  {
7318  _client.getExceptionListener().exceptionThrown(e);
7319  }
7320  } catch (...) {/*Ignore exception listener exceptions*/} // -V565
7321  }
7322  if (_client.isValid())
7323  {
7324  _client.removeConnectionStateListener(this);
7325  Client c = _client;
7326  _client = Client((ClientImpl*)NULL);
7327  c.deferredExecution(MessageStreamImpl::destroyer, this);
7328  }
7329  else
7330  {
7331  delete this;
7332  }
7333  }
7334 
7335  static void destroyer(void* vpMessageStreamImpl_)
7336  {
7337  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
7338  }
7339 
7340  void setSubscription(const std::string& subId_,
7341  const std::string& commandId_ = "",
7342  const std::string& queryId_ = "")
7343  {
7344  Lock<Mutex> lock(_lock);
7345  _subId = subId_;
7346  if (!commandId_.empty() && commandId_ != subId_)
7347  _commandId = commandId_;
7348  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
7349  _queryId = queryId_;
7350  // It's possible to disconnect between creation/registration and here.
7351  if (Disconnected == _state) return;
7352  assert(Unset==_state);
7353  _state = Subscribe;
7354  }
7355 
7356  void setSOWOnly(const std::string& commandId_,
7357  const std::string& queryId_ = "")
7358  {
7359  Lock<Mutex> lock(_lock);
7360  _commandId = commandId_;
7361  if (!queryId_.empty() && queryId_ != commandId_)
7362  _queryId = queryId_;
7363  // It's possible to disconnect between creation/registration and here.
7364  if (Disconnected == _state) return;
7365  assert(Unset==_state);
7366  _state = SOWOnly;
7367  }
7368 
7369  void setStatsOnly(const std::string& commandId_,
7370  const std::string& queryId_ = "")
7371  {
7372  Lock<Mutex> lock(_lock);
7373  _commandId = commandId_;
7374  if (!queryId_.empty() && queryId_ != commandId_)
7375  _queryId = queryId_;
7376  // It's possible to disconnect between creation/registration and here.
7377  if (Disconnected == _state) return;
7378  assert(Unset==_state);
7379  _state = AcksOnly;
7380  _requestedAcks = Message::AckType::Stats;
7381  }
7382 
7383  void setAcksOnly(const std::string& commandId_, unsigned acks_)
7384  {
7385  Lock<Mutex> lock(_lock);
7386  _commandId = commandId_;
7387  // It's possible to disconnect between creation/registration and here.
7388  if (Disconnected == _state) return;
7389  assert(Unset==_state);
7390  _state = AcksOnly;
7391  _requestedAcks = acks_;
7392  }
7393 
7394  void connectionStateChanged(ConnectionStateListener::State state_)
7395  {
7396  Lock<Mutex> lock(_lock);
7397  if(state_ == AMPS::ConnectionStateListener::Disconnected)
7398  {
7399  _state = Disconnected;
7400  close();
7401  }
7402  _lock.signalAll();
7403  }
7404 
7405  void timeout(unsigned timeout_)
7406  {
7407  _timeout = timeout_;
7408  }
7409  void conflate(void)
7410  {
7411  if(_state == Subscribe) _state = Conflate;
7412  }
7413  void maxDepth(unsigned maxDepth_)
7414  {
7415  if(maxDepth_) _maxDepth = maxDepth_;
7416  else _maxDepth = (unsigned)~0;
7417  }
7418  unsigned getMaxDepth(void) const
7419  {
7420  return _maxDepth;
7421  }
7422  unsigned getDepth(void) const
7423  {
7424  return (unsigned)(_q.size());
7425  }
7426 
7427  bool next(Message& current_)
7428  {
7429  Lock<Mutex> lock(_lock);
7430  if (!_previousTopic.empty() && !_previousBookmark.empty())
7431  {
7432  try
7433  {
7434  if (_client.isValid())
7435  {
7436  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
7437  }
7438  }
7439 #ifdef _WIN32
7440  catch (AMPSException&)
7441 #else
7442  catch (AMPSException& e)
7443 #endif
7444  {
7445  current_.invalidate();
7446  _previousTopic.clear();
7447  _previousBookmark.clear();
7448  return false;
7449  }
7450  _previousTopic.clear();
7451  _previousBookmark.clear();
7452  }
7453  double minWaitTime = (double)((_timeout && _timeout > 1000)
7454  ? _timeout : 1000);
7455  Timer timer(minWaitTime);
7456  timer.start();
7457  while(_q.empty() && _state & Running)
7458  {
7459  // Using timeout so python can interrupt
7460  _lock.wait((long)minWaitTime);
7461  {
7462  Unlock<Mutex> unlck(_lock);
7463  amps_invoke_waiting_function();
7464  }
7465  if (_timeout)
7466  {
7467  // In case we woke up early, see how much longer to wait
7468  if(timer.checkAndGetRemaining(&minWaitTime))
7469  {
7470  break;
7471  }
7472  }
7473  }
7474  if(!_q.empty())
7475  {
7476  current_ = _q.front();
7477  if(_q.size() == _maxDepth) _lock.signalAll();
7478  _q.pop_front();
7479  if(_state == Conflate)
7480  {
7481  std::string sowKey = current_.getSowKey();
7482  if(sowKey.length()) _sowKeyMap.erase(sowKey);
7483  }
7484  else if(_state == AcksOnly)
7485  {
7486  _requestedAcks &= ~(current_.getAckTypeEnum());
7487  }
7488  if((_state == AcksOnly && _requestedAcks == 0) ||
7489  (_state == SOWOnly && current_.getCommand()=="group_end"))
7490  {
7491  _state = Closed;
7492  }
7493  else if (current_.getCommandEnum() == Message::Command::Publish &&
7494  _client.isValid() && _client.getAutoAck() &&
7495  !current_.getLeasePeriod().empty() &&
7496  !current_.getBookmark().empty())
7497  {
7498  _previousTopic = current_.getTopic().deepCopy();
7499  _previousBookmark = current_.getBookmark().deepCopy();
7500  }
7501  return true;
7502  }
7503  if(_state == Disconnected)
7504  {
7505  throw DisconnectedException("Connection closed.");
7506  }
7507  current_.invalidate();
7508  if(_state == Closed)
7509  {
7510  return false;
7511  }
7512  return _timeout != 0;
7513  }
7514  void close(void)
7515  {
7516  if (_client.isValid())
7517  {
7518  if (_state == SOWOnly || _state == Subscribe) //not delete
7519  {
7520  if (!_commandId.empty()) _client.unsubscribe(_commandId);
7521  if (!_subId.empty()) _client.unsubscribe(_subId);
7522  if (!_queryId.empty()) _client.unsubscribe(_queryId);
7523  }
7524  else
7525  {
7526  if (!_commandId.empty()) _client.removeMessageHandler(_commandId);
7527  if (!_subId.empty()) _client.removeMessageHandler(_subId);
7528  if (!_queryId.empty()) _client.removeMessageHandler(_queryId);
7529  }
7530  }
7531  if(_state==SOWOnly || _state==Subscribe || _state==Unset)
7532  {
7533  _state = Closed;
7534  }
7535  }
7536  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
7537  {
7538  Lock<Mutex> lock(this_->_lock);
7539  if(this_->_state != Conflate)
7540  {
7541  AMPS_TESTING_SLOW_MESSAGE_STREAM
7542  if(this_->_q.size() >= this_->_maxDepth)
7543  {
7544  // We throw here so that heartbeats can be sent. The exception
7545  // will be handled internally only, and the same Message will
7546  // come back to try again. Make sure to signal.
7547  this_->_lock.signalAll();
7548  throw MessageStreamFullException("Stream is currently full.");
7549  }
7550  this_->_q.push_back(message_.deepCopy());
7551  if (message_.getCommandEnum() == Message::Command::Publish &&
7552  this_->_client.isValid() && this_->_client.getAutoAck() &&
7553  !message_.getLeasePeriod().empty() &&
7554  !message_.getBookmark().empty())
7555  {
7556  message_.setIgnoreAutoAck();
7557  }
7558  }
7559  else
7560  {
7561  std::string sowKey = message_.getSowKey();
7562  if(sowKey.length())
7563  {
7564  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
7565  if(it != this_->_sowKeyMap.end())
7566  {
7567  *(it->second) = message_.deepCopy();
7568  }
7569  else
7570  {
7571  if(this_->_q.size() >= this_->_maxDepth)
7572  {
7573  // We throw here so that heartbeats can be sent. The
7574  // exception will be handled internally only, and the
7575  // same Message will come back to try again. Make sure
7576  // to signal.
7577  this_->_lock.signalAll();
7578  throw MessageStreamFullException("Stream is currently full.");
7579  }
7580  this_->_q.push_back(message_.deepCopy());
7581  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
7582  }
7583  }
7584  else
7585  {
7586  while(this_->_q.size() >= this_->_maxDepth) // -V712
7587  {
7588  this_->_lock.wait(1);
7589  }
7590  this_->_q.push_back(message_.deepCopy());
7591  }
7592  }
7593  this_->_lock.signalAll();
7594  }
7595 };
7596 inline MessageStream::MessageStream(void)
7597 {
7598 }
7599 inline MessageStream::MessageStream(const Client& client_)
7600  :_body(new MessageStreamImpl(client_))
7601 {
7602 }
7603 inline void MessageStream::iterator::advance(void)
7604 {
7605  _pStream = _pStream->_body->next(_current) ? _pStream:NULL;
7606 }
7607 inline MessageStream::operator MessageHandler(void)
7608 {
7609  return MessageHandler((void(*)(const Message&,void*))MessageStreamImpl::_messageHandler, &_body.get());
7610 }
7611 inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
7612 {
7613  MessageStream result;
7614  if(handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
7615  {
7616  result._body = (MessageStreamImpl*)(handler_._userData);
7617  }
7618  return result;
7619 }
7620 
7621 inline void MessageStream::setSOWOnly(const std::string& commandId_,
7622  const std::string& queryId_)
7623 {
7624  _body->setSOWOnly(commandId_, queryId_);
7625 }
7626 inline void MessageStream::setSubscription(const std::string& subId_,
7627  const std::string& commandId_,
7628  const std::string& queryId_)
7629 {
7630  _body->setSubscription(subId_, commandId_, queryId_);
7631 }
7632 inline void MessageStream::setStatsOnly(const std::string& commandId_,
7633  const std::string& queryId_)
7634 {
7635  _body->setStatsOnly(commandId_, queryId_);
7636 }
7637 inline void MessageStream::setAcksOnly(const std::string& commandId_,
7638  unsigned acks_)
7639 {
7640  _body->setAcksOnly(commandId_, acks_);
7641 }
7642 inline MessageStream MessageStream::timeout(unsigned timeout_)
7643 {
7644  _body->timeout(timeout_);
7645  return *this;
7646 }
7648 {
7649  _body->conflate();
7650  return *this;
7651 }
7652 inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
7653 {
7654  _body->maxDepth(maxDepth_);
7655  return *this;
7656 }
7657 inline unsigned MessageStream::getMaxDepth(void) const
7658 {
7659  return _body->getMaxDepth();
7660 }
7661 inline unsigned MessageStream::getDepth(void) const
7662 {
7663  return _body->getDepth();
7664 }
7665 
7666 inline MessageStream ClientImpl::getEmptyMessageStream(void)
7667 {
7668  return *(_pEmptyMessageStream.get());
7669 }
7670 
7672 {
7673  // If the command is sow and has a sub_id, OR
7674  // if the command has a replace option, return the existing
7675  // messagestream, don't create a new one.
7676  ClientImpl& body = _body.get();
7677  Message& message = command_.getMessage();
7678  Field subId = message.getSubscriptionId();
7679  unsigned ackTypes = message.getAckTypeEnum();
7680  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace",7)) || message.getCommandEnum() == Message::Command::SOW);
7681  if(useExistingHandler)
7682  {
7683  // Try to find the existing message handler.
7684  if(!subId.empty())
7685  {
7686  MessageHandler existingHandler;
7687  if (body._routes.getRoute(subId, existingHandler))
7688  {
7689  // we found an existing handler. It might not be a message stream, but that's okay.
7690  body.executeAsync(command_, existingHandler, false);
7691  return MessageStream::fromExistingHandler(existingHandler);
7692  }
7693  }
7694  // fall through; we'll a new handler altogether.
7695  }
7696  // Make sure something will be returned to the stream or use the empty one
7697  Message::Command::Type command = message.getCommandEnum();
7698  if ((command & Message::Command::NoDataCommands)
7699  && (ackTypes == Message::AckType::Persisted
7700  || ackTypes == Message::AckType::None))
7701  {
7702  executeAsync(command_, MessageHandler());
7703  if (!body._pEmptyMessageStream)
7704  {
7705  body._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
7706  body._pEmptyMessageStream.get()->_body->close();
7707  }
7708  return body.getEmptyMessageStream();
7709  }
7710  MessageStream stream(*this);
7711  if (body.getDefaultMaxDepth())
7712  stream.maxDepth(body.getDefaultMaxDepth());
7713  MessageHandler handler = stream.operator MessageHandler();
7714  std::string commandID = body.executeAsync(command_, handler, false);
7715  if (command_.hasStatsAck())
7716  {
7717  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
7718  }
7719  else if (command_.isSow())
7720  {
7721  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
7722  }
7723  else if (command_.isSubscribe())
7724  {
7725  stream.setSubscription(commandID,
7726  command_.getMessage().getCommandId(),
7727  command_.getMessage().getQueryId());
7728  }
7729  else
7730  {
7731  // Persisted acks for writes don't come back with command id
7732  if (command == Message::Command::Publish ||
7733  command == Message::Command::DeltaPublish ||
7734  command == Message::Command::SOWDelete)
7735  {
7736  stream.setAcksOnly(commandID,
7737  ackTypes & (unsigned)~Message::AckType::Persisted);
7738  }
7739  else
7740  {
7741  stream.setAcksOnly(commandID, ackTypes);
7742  }
7743  }
7744  return stream;
7745 }
7746 
7747 // This is here because it uses api from Client.
7748 inline void Message::ack(const char* options_) const
7749 {
7750  ClientImpl* pClient = _body.get().clientImpl();
7751  Message::Field bookmark = getBookmark();
7752  if(pClient && bookmark.len() &&
7753  !pClient->getAutoAck())
7754  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
7755  pClient->ack(getTopic(),bookmark,options_);
7756 }
7757 }// end namespace AMPS
7758 #endif
Command & setCorrelationId(const std::string &v_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:559
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:179
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1284
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:4274
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1261
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5889
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:5863
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:512
std::string getAckType() const
Definition: ampsplusplus.hpp:638
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:4478
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:7153
void startTimer()
Definition: ampsplusplus.hpp:5852
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5409
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:761
Command & addAckType(const std::string &v_)
Definition: ampsplusplus.hpp:610
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:7647
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1230
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4506
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:486
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1068
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:6143
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:404
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:6484
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4383
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:5206
Command & setQueryId(const std::string &v_)
Definition: ampsplusplus.hpp:546
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1223
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:733
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:384
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:6493
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6358
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5161
Command & setFilter(const std::string &v_)
Definition: ampsplusplus.hpp:540
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:278
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:4602
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4867
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4398
void AMPSDLL amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1142
amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:724
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4618
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:538
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6213
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:713
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:6442
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4369
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5916
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1023
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:951
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:7190
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:6504
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4754
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4181
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1145
static const char * NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6159
static const char * BOOKMARK_EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6166
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4454
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1232
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5133
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:813
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6466
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7657
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4429
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1045
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5595
Success.
Definition: amps.h:189
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1128
amps_handle AMPSDLL amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1152
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:707
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4844
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:7235
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:5253
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:179
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:553
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:4467
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4177
amps_result
Return values from amps_xxx functions.
Definition: amps.h:184
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:4655
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:973
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1035
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:7671
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:601
amps_result AMPSDLL amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:4610
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:6518
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:643
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4358
Command & setExpiration(unsigned v_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:608
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1079
Command & setData(const char *v_, size_t length_)
Sets the data for this command.
Definition: ampsplusplus.hpp:580
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:4915
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:884
const std::string & getNameHash() const
Returns the name hash of this client as generated by the server and returned when the client logged o...
Definition: ampsplusplus.hpp:4327
Command & setAckType(unsigned v_)
Definition: ampsplusplus.hpp:628
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5551
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4257
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6118
static const char * BOOKMARK_NOW()
Convenience method for returning the special value to start a subscription at the end of the transact...
Definition: ampsplusplus.hpp:6153
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1138
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:6205
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:501
void clearConnectionStateListeners()
Clear all listeners from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6220
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1230
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4703
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1257
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:4566
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4976
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:493
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1148
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1138
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6029
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:724
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1075
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
Command & setTopic(const std::string &v_)
Definition: ampsplusplus.hpp:538
amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5951
amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1225
Command & setBatchSize(unsigned v_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:597
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
Command & setOrderBy(const std::string &v_)
Definition: ampsplusplus.hpp:542
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:905
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:719
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5077
static const char * EPOCH()
Convenience method for returning the special value to start a subscription at the beginning of the tr...
Definition: ampsplusplus.hpp:6173
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:225
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:4933
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Command & setTimeout(unsigned v_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:590
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1139
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1232
void AMPSDLL amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:701
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:4338
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:7163
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:6370
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:714
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:755
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5445
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:838
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7172
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:965
amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4558
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:922
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:6431
void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1224
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1259
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:993
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5827
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1233
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:4550
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:136
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
Command & setCommandId(const std::string &v_)
Definition: ampsplusplus.hpp:536
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6022
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:509
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:4578
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6085
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7183
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:6421
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4821
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5530
Command & setSequence(const std::string &v_)
Definition: ampsplusplus.hpp:564
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4148
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:265
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5767
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:914
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:4800
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6250
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5728
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:4951
Command & setSequence(const amps_uint64_t v_)
Definition: ampsplusplus.hpp:566
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:5218
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:6413
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5093
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:875
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5692
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6009
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:979
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4633
amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
amps_result AMPSDLL amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:4528
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:7227
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:4537
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1172
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6346
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:7239
amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:4520
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:859
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:7246
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:202
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5628
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6079
Command & setSubId(const std::string &v_)
Definition: ampsplusplus.hpp:544
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5032
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4140
static const char * MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6189
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1138
static const char * BOOKMARK_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6197
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:627
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:943
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7652
void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
Command & setTopN(unsigned v_)
Definition: ampsplusplus.hpp:592
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1260
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:4436
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5291
Command & setAckType(const std::string &v_)
Definition: ampsplusplus.hpp:618
Command & setData(const std::string &v_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:576
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:7661
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
The operation has not succeeded, but ought to be retried.
Definition: amps.h:213
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:4345
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6055
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4405
Command & setOptions(const std::string &v_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:562
void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:893
static const char * BOOKMARK_MOST_RECENT()
Convenience method for returning the special value to start a subscription at a recovery point based ...
Definition: ampsplusplus.hpp:6181
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1034
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1003
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:434
Command & setBookmark(const std::string &v_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:552
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:5996
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:7200
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:4647
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new delta subscription on it.
Definition: ampsplusplus.hpp:5649
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:6452
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:534
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1224
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:6403
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4678
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:4312
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:670
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:935
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1021
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:817
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5308
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:4727
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6092
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:301
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4893
The client and server are disconnected.
Definition: amps.h:217
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5972
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5059
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:7642
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5345
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:4319
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:407
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5491
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1033
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5002
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5180
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:521
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6284
amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5804
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4192
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6475
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:6394
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiate a new subscription on it.
Definition: ampsplusplus.hpp:5377