AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.1
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps.h"
28 #include "ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <set>
39 #include <deque>
40 #include <vector>
41 #include <assert.h>
42 #ifndef _WIN32
43 #include <inttypes.h>
44 #endif
45 #if defined(sun)
46 #include <sys/atomic.h>
47 #endif
48 #include "BookmarkStore.hpp"
49 #include "MessageRouter.hpp"
50 #include "util.hpp"
51 #include "ampscrc.hpp"
52 
53 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
54 #define AMPS_TESTING_SLOW_MESSAGE_STREAM
55 #endif
56 
61 
62 
69 
80 
81 // For StoreBuffer implementations
82 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
83 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
84 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
85 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
86 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
87 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
88 #define AMPS_DEFAULT_TOP_N -1
89 #define AMPS_DEFAULT_BATCH_SIZE 10
90 #define AMPS_NUMBER_BUFFER_LEN 20
91 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
92 
93 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
94 #define AMPS_X64 1
95 #endif
96 
97 #ifdef _WIN32
98 static __declspec ( thread ) AMPS::Message* publishStoreMessage = 0;
99 #else
100 static __thread AMPS::Message* publishStoreMessage = 0;
101 #endif
102 
103 namespace AMPS
104 {
105 
106 typedef std::map<std::string, std::string> ConnectionInfo;
107 
108 class PerThreadMessageTracker {
109 std::vector<AMPS::Message*> _messages;
110 public:
111  PerThreadMessageTracker() {}
112  ~PerThreadMessageTracker()
113  {
114  for (size_t i=0; i<_messages.size(); ++i)
115  {
116  delete _messages[i];
117  }
118  }
119  void addMessage(AMPS::Message* message)
120  {
121  _messages.push_back(message);
122  }
123  static void addMessageToCleanupList(AMPS::Message* message)
124  {
125  static AMPS::Mutex _lock;
126  AMPS::Lock<Mutex> l(_lock);
127  _addMessageToCleanupList(message);
128  }
129  static void _addMessageToCleanupList(AMPS::Message* message)
130  {
131  static PerThreadMessageTracker tracker;
132  tracker.addMessage(message);
133  }
134 };
135 
136 template<class Type>
137 inline std::string asString(Type x_)
138 {
139  std::ostringstream os;
140  os << x_;
141  return os.str();
142 }
143 
144 inline
145 size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
146 {
147  size_t pos = AMPS_NUMBER_BUFFER_LEN;
148  for(int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
149  {
150  if (seqNo_ > 0)
151  {
152  buf_[--pos] = (char)(seqNo_ % 10 + '0');
153  seqNo_ /= 10;
154  }
155  }
156  return pos;
157 }
158 
159 #ifdef _WIN32
160 inline
161 size_t convertToCharArray(char* buf_, unsigned long seqNo_)
162 {
163  size_t pos = AMPS_NUMBER_BUFFER_LEN;
164  for(int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
165  {
166  if (seqNo_ > 0)
167  {
168  buf_[--pos] = (char)(seqNo_ % 10 + '0');
169  seqNo_ /= 10;
170  }
171  }
172  return pos;
173 }
174 #endif
175 
179 class Reason
180 {
181  public:
182  static const char* duplicate() { return "duplicate";}
183  static const char* badFilter() { return "bad filter";}
184  static const char* badRegexTopic() { return "bad regex topic";}
185  static const char* subscriptionAlreadyExists() { return "subscription already exists";}
186  static const char* nameInUse() { return "name in use";}
187  static const char* authFailure() { return "auth failure";}
188  static const char* notEntitled() { return "not entitled";}
189  static const char* authDisabled() { return "authentication disabled";}
190  static const char* subidInUse() { return "subid in use";}
191  static const char* noTopic() { return "no topic";}
192 };
193 
203 {
204 public:
205  virtual ~ExceptionListener() {;}
206  virtual void exceptionThrown(const std::exception&) const {;}
207 };
208 
210 
211 
212 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
213  try\
214  {\
215  x;\
216  }\
217  catch (std::exception& ex_)\
218  {\
219  try\
220  {\
221  _exceptionListener->exceptionThrown(ex_);\
222  }\
223  catch(...)\
224  {\
225  ;\
226  }\
227  }
228  /*
229  * Note : we don't attempt to trap non std::exception exceptions
230  * here because doing so interferes with pthread_exit on some OSes.
231  catch (...)\
232  {\
233  try\
234  {\
235  _exceptionListener->exceptionThrown(AMPS::AMPSException(\
236  "An unhandled exception of unknown type was thrown by "\
237  "the registered handler.", AMPS_E_USAGE));\
238  }\
239  catch(...)\
240  {\
241  ;\
242  }\
243  }
244  */
245 #ifdef _WIN32
246 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
247  try\
248  {\
249  while(me->_connected)\
250  {\
251  try\
252  {\
253  x;\
254  break;\
255  }\
256  catch(MessageStreamFullException&)\
257  {\
258  me->checkAndSendHeartbeat(false);\
259  }\
260  }\
261  }\
262  catch (std::exception& ex_)\
263  {\
264  try\
265  {\
266  me->_exceptionListener->exceptionThrown(ex_);\
267  }\
268  catch(...)\
269  {\
270  ;\
271  }\
272  }
273  /*
274  * Note : we don't attempt to trap non std::exception exceptions
275  * here because doing so interferes with pthread_exit on some OSes.
276  catch (...)\
277  {\
278  try\
279  {\
280  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
281  "An unhandled exception of unknown type was thrown by "\
282  "the registered handler.", AMPS_E_USAGE));\
283  }\
284  catch(...)\
285  {\
286  ;\
287  }\
288  }*/
289 
290 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
291  while(me->_connected)\
292  {\
293  try\
294  {\
295  x;\
296  break;\
297  }\
298  catch(MessageStreamFullException&)\
299  {\
300  me->checkAndSendHeartbeat(false);\
301  }\
302  }
303 #else
304 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
305  try\
306  {\
307  while(me->_connected)\
308  {\
309  try\
310  {\
311  x;\
312  break;\
313  }\
314  catch(MessageStreamFullException& ex_)\
315  {\
316  me->checkAndSendHeartbeat(false);\
317  }\
318  }\
319  }\
320  catch (std::exception& ex_)\
321  {\
322  try\
323  {\
324  me->_exceptionListener->exceptionThrown(ex_);\
325  }\
326  catch(...)\
327  {\
328  ;\
329  }\
330  }
331  /*
332  * Note : we don't attempt to trap non std::exception exceptions
333  * here because doing so interferes with pthread_exit on some OSes.
334  catch (...)\
335  {\
336  try\
337  {\
338  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
339  "An unhandled exception of unknown type was thrown by "\
340  "the registered handler.", AMPS_E_USAGE));\
341  }\
342  catch(...)\
343  {\
344  ;\
345  }\
346  }*/
347 
348 #define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, x)\
349  while(me->_connected)\
350  {\
351  try\
352  {\
353  x;\
354  break;\
355  }\
356  catch(MessageStreamFullException& ex_)\
357  {\
358  me->checkAndSendHeartbeat(false);\
359  }\
360  }
361 #endif
362 
363 #define AMPS_UNHANDLED_EXCEPTION(ex) \
364  try\
365  {\
366  _exceptionListener->exceptionThrown(ex);\
367  }\
368  catch(...)\
369  {;}
370 
371 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
372  try\
373  {\
374  me->_exceptionListener->exceptionThrown(ex);\
375  }\
376  catch(...)\
377  {;}
378 
379 
380 class Client;
381 
406 
407 class Command
408 {
409  Message _message;
410  unsigned _timeout;
411  unsigned _batchSize;
412  unsigned _flags;
413  static const unsigned Subscribe = 1;
414  static const unsigned SOW = 2;
415  static const unsigned NeedsSequenceNumber = 4;
416  static const unsigned ProcessedAck = 8;
417  static const unsigned StatsAck = 16;
418  void init(Message::Command::Type command_)
419  {
420  _timeout = 0;
421  _batchSize = 0;
422  _flags = 0;
423  _message.reset();
424  _message.setCommandEnum(command_);
425  _setIds();
426  }
427  void init(const std::string& command_)
428  {
429  _timeout = 0;
430  _batchSize = 0;
431  _flags = 0;
432  _message.reset();
433  _message.setCommand(command_);
434  _setIds();
435  }
436  void _setIds(void)
437  {
438  Message::Command::Type command = _message.getCommandEnum();
439  if (!(command & Message::Command::NoDataCommands))
440  {
441  _message.newCommandId();
442  if (command == Message::Command::Subscribe ||
443  command == Message::Command::SOWAndSubscribe ||
444  command == Message::Command::DeltaSubscribe ||
445  command == Message::Command::SOWAndDeltaSubscribe)
446  {
447  _message.setSubscriptionId(_message.getCommandId());
448  _flags |= Subscribe;
449  }
450  if (command == Message::Command::SOW
451  || command == Message::Command::SOWAndSubscribe
452  || command == Message::Command::SOWAndDeltaSubscribe)
453  {
454  _message.setQueryID(_message.getCommandId());
455  if (_batchSize == 0)
456  {
457  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
458  }
459  if (command == Message::Command::SOW)
460  {
461  _flags |= SOW;
462  }
463  }
464  _flags |= ProcessedAck;
465  }
466  else if (command == Message::Command::SOWDelete)
467  {
468  _message.newCommandId();
469  _flags |= ProcessedAck;
470  _flags |= NeedsSequenceNumber;
471  }
472  else if (command == Message::Command::Publish
473  || command == Message::Command::DeltaPublish)
474  {
475  _flags |= NeedsSequenceNumber;
476  }
477  else if (command == Message::Command::StopTimer)
478  {
479  _message.newCommandId();
480  }
481  }
482 public:
486  Command(const std::string& command_)
487  {
488  init(command_);
489  }
493  Command(Message::Command::Type command_)
494  {
495  init(command_);
496  }
497 
501  Command& reset(const std::string& command_)
502  {
503  init(command_);
504  return *this;
505  }
509  Command& reset(Message::Command::Type command_)
510  {
511  init(command_);
512  return *this;
513  }
521  Command& setSowKey(const std::string& sowKey_) { _message.setSowKey(sowKey_); return *this; }
534  Command& setSowKeys(const std::string& sowKeys_) { _message.setSowKeys(sowKeys_); return *this; }
536  Command& setCommandId(const std::string& v_) { _message.setCommandId(v_); return *this; }
538  Command& setTopic(const std::string& v_) { _message.setTopic(v_); return *this; }
540  Command& setFilter(const std::string& v_) { _message.setFilter(v_); return *this; }
542  Command& setOrderBy(const std::string& v_) { _message.setOrderBy(v_); return *this; }
544  Command& setSubId(const std::string& v_) { _message.setSubscriptionId(v_); return *this; }
546  Command& setQueryId(const std::string& v_) { _message.setQueryId(v_); return *this; }
550  Command& setBookmark(const std::string& v_) { _message.setBookmark(v_); return *this; }
557  Command& setCorrelationId(const std::string& v_) { _message.setCorrelationId(v_); return *this; }
560  Command& setOptions(const std::string& v_) { _message.setOptions(v_); return *this; }
562  Command& setSequence(const std::string& v_) { _message.setSequence(v_); return *this; }
564  Command& setSequence(const amps_uint64_t v_)
565  {
566  std::ostringstream os;
567  os << v_;
568  _message.setSequence(os.str());
569  return *this;
570  }
571  amps_uint64_t getSequence() const { return amps_message_get_field_uint64(_message.getMessage(),AMPS_Sequence); }
574  Command& setData(const std::string& v_) { _message.setData(v_); return *this; }
578  Command& setData(const char* v_, size_t length_) { _message.setData(v_, length_); return *this; }
588  Command& setTimeout(unsigned v_) { _timeout = v_; return *this; }
590  Command& setTopN(unsigned v_) { _message.setTopNRecordsReturned(v_); return *this; }
595  Command& setBatchSize(unsigned v_) { _message.setBatchSize(v_); _batchSize = v_; return *this; }
606  Command& setExpiration(unsigned v_) { _message.setExpiration(v_); return *this; }
608  Command& addAckType(const std::string& v_)
609  {
610  _message.setAckType(_message.getAckType() + "," + v_);
611  if (v_ == "processed") _flags |= ProcessedAck;
612  else if (v_ == "stats") _flags |= StatsAck;
613  return *this;
614  }
616  Command& setAckType(const std::string& v_)
617  {
618  _message.setAckType(v_);
619  if (v_.find("processed") != std::string::npos) _flags |= ProcessedAck;
620  else _flags &= ~ProcessedAck;
621  if (v_.find("stats") != std::string::npos) _flags |= StatsAck;
622  else _flags &= ~StatsAck;
623  return *this;
624  }
626  Command& setAckType(unsigned v_)
627  {
628  _message.setAckTypeEnum(v_);
629  if (v_ & Message::AckType::Processed) _flags |= ProcessedAck;
630  else _flags &= ~ProcessedAck;
631  if (v_ & Message::AckType::Stats) _flags |= StatsAck;
632  else _flags &= ~StatsAck;
633  return *this;
634  }
636  std::string getAckType() const
637  {
638  return (std::string)(_message.getAckType());
639  }
641  unsigned getAckTypeEnum() const
642  {
643  return _message.getAckTypeEnum();
644  }
645 
646  Message& getMessage(void) { return _message; }
647  unsigned getTimeout(void) const { return _timeout; }
648  unsigned getBatchSize(void) const { return _batchSize; }
649  bool isSubscribe(void) const
650  {
651  return _flags & Subscribe;
652  }
653  bool isSow(void) const { return (_flags & SOW) != 0; }
654  bool hasProcessedAck(void) const { return (_flags & ProcessedAck) != 0; }
655  bool hasStatsAck(void) const { return (_flags & StatsAck) != 0; }
656  bool needsSequenceNumber(void) const { return (_flags & NeedsSequenceNumber) != 0; }
657 };
658 
661 typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
662 
663 class Message;
665 
669 {
670 public:
671  virtual ~Authenticator() {;}
672 
678  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;
686  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;
693  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
694 };
695 
700 {
701 public:
702  virtual ~DefaultAuthenticator() {;}
705  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
706  {
707  return password_;
708  }
709 
712  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
713  {
714  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
715  }
716 
717  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
718 
723  {
724  static DefaultAuthenticator d;
725  return d;
726  }
727 };
728 
732 {
733 public:
734 
738  virtual void execute(Message& message_) = 0;
739 
740  virtual ~StoreReplayer() {;}
741 };
742 
743 class Store;
744 
753 typedef bool (*PublishStoreResizeHandler)(Store store_,
754  size_t size_,
755  void* userData_);
756 
759 class StoreImpl : public RefBody
760 {
761 public:
762  StoreImpl() : _resizeHandler(NULL), _resizeHandlerData(NULL) {;}
763 
768  virtual amps_uint64_t store(const Message& message_) = 0;
769 
774  virtual void discardUpTo(amps_uint64_t index_) = 0;
775 
780  virtual void replay(StoreReplayer& replayer_) = 0;
781 
789  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
790 
795  virtual size_t unpersistedCount() const = 0;
796 
797  virtual ~StoreImpl() {;}
798 
807  virtual void flush(long timeout_) = 0;
808 
811  static inline size_t getUnsetPosition() { return AMPS_UNSET_INDEX; }
812 
815  static inline amps_uint64_t getUnsetSequence() { return AMPS_UNSET_SEQUENCE; }
816 
820  virtual amps_uint64_t getLowestUnpersisted() const = 0;
821 
825  virtual amps_uint64_t getLastPersisted() = 0;
826 
836  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
837  void* userData_)
838  {
839  _resizeHandler = handler_;
840  _resizeHandlerData = userData_;
841  }
842 
843  inline virtual PublishStoreResizeHandler getResizeHandler() const
844  {
845  return _resizeHandler;
846  }
847 
848  bool callResizeHandler(size_t newSize_);
849 
850 private:
851  PublishStoreResizeHandler _resizeHandler;
852  void* _resizeHandlerData;
853 };
854 
857 class Store
858 {
859  RefHandle<StoreImpl> _body;
860 public:
861  Store() {;}
862  Store(StoreImpl* body_) : _body(body_) {;}
863  Store(const Store& rhs) : _body(rhs._body) {;}
864  Store& operator=(const Store& rhs)
865  {
866  _body = rhs._body;
867  return *this;
868  }
869 
873  amps_uint64_t store(const Message& message_)
874  {
875  return _body.get().store(message_);
876  }
877 
882  void discardUpTo(amps_uint64_t index_)
883  {
884  _body.get().discardUpTo(index_);
885  }
886 
891  void replay(StoreReplayer& replayer_)
892  {
893  _body.get().replay(replayer_);
894  }
895 
903  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
904  {
905  return _body.get().replaySingle(replayer_, index_);
906  }
907 
912  size_t unpersistedCount() const
913  {
914  return _body.get().unpersistedCount();
915  }
916 
920  bool isValid() const
921  {
922  return _body.isValid();
923  }
924 
933  void flush(long timeout_ = 0)
934  {
935  return _body.get().flush(timeout_);
936  }
937 
941  amps_uint64_t getLowestUnpersisted()
942  {
943  return _body.get().getLowestUnpersisted();
944  }
945 
949  amps_uint64_t getLastPersisted()
950  {
951  return _body.get().getLastPersisted();
952  }
953 
963  void setResizeHandler(PublishStoreResizeHandler handler_,
964  void* userData_)
965  {
966  _body.get().setResizeHandler(handler_, userData_);
967  }
968 
969  PublishStoreResizeHandler getResizeHandler()
970  {
971  return _body.get().getResizeHandler();
972  }
973 
977  StoreImpl* get()
978  {
979  if (_body.isValid())
980  return &_body.get();
981  else
982  return NULL;
983  }
984 
985 };
986 
992 {
993 public:
994  virtual ~FailedWriteHandler() {;}
1001  virtual void failedWrite(const Message& message_,
1002  const char* reason_, size_t reasonLength_) = 0;
1003 };
1004 
1005 
1006 inline bool StoreImpl::callResizeHandler(size_t newSize_)
1007 {
1008  if(_resizeHandler)
1009  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
1010  return true;
1011 }
1012 
1019 inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
1020  void* data_)
1021 {
1022  long* timeoutp = (long*)data_;
1023  size_t count = store_.unpersistedCount();
1024  if (count == 0) return false;
1025  try
1026  {
1027  store_.flush(*timeoutp);
1028  }
1029 #ifdef _WIN32
1030  catch (const TimedOutException&)
1031 #else
1032  catch (const TimedOutException& e)
1033 #endif
1034  {
1035  return true;
1036  }
1037  return (count == store_.unpersistedCount());
1038 }
1039 
1044 {
1045 public:
1046  virtual ~SubscriptionManager() {;}
1054  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1055  unsigned requestedAckTypes_) = 0;
1059  virtual void unsubscribe(const Message::Field& subId_) = 0;
1062  virtual void clear() = 0;
1066  virtual void resubscribe(Client& client_) = 0;
1067 };
1068 
1072 
1074 {
1075 public:
1077  typedef enum { Disconnected = 0,
1078  Shutdown = 1,
1079  Connected = 2,
1080  LoggedOn = 4,
1081  PublishReplayed = 8,
1082  HeartbeatInitiated = 16,
1083  Resubscribed = 32,
1084  UNKNOWN = 16384
1085  } State;
1086 
1096  virtual void connectionStateChanged(State newState_) = 0;
1097  virtual ~ConnectionStateListener() {;};
1098 };
1099 
1100 
1101 class MessageStreamImpl;
1102 class MessageStream;
1103 
1104 typedef void(*DeferredExecutionFunc)(void*);
1105 
1106 class ClientImpl : public RefBody // -V553
1107 {
1108  friend class Client;
1109 protected:
1110  amps_handle _client;
1111  DisconnectHandler _disconnectHandler;
1112  enum GlobalCommandTypeHandlers : size_t
1113  {
1114  Publish = 0,
1115  SOW = 1,
1116  GroupBegin = 2,
1117  GroupEnd = 3,
1118  Heartbeat = 4,
1119  OOF = 5,
1120  Ack = 6,
1121  LastChance = 7,
1122  DuplicateMessage = 8,
1123  COUNT = 9
1124  };
1125  std::vector<MessageHandler> _globalCommandTypeHandlers;
1126  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1127  MessageRouter _routes;
1128  MessageRouter::RouteCache _routeCache;
1129  mutable Mutex _lock;
1130  std::string _name, _nameHash, _lastUri, _logonCorrelationData;
1131  BookmarkStore _bookmarkStore;
1132  Store _publishStore;
1133  bool _isRetryOnDisconnect;
1134  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1135  volatile amps_uint64_t _lastSentHaSequenceNumber;
1136  ATOMIC_TYPE_8 _badTimeToHAPublish;
1137  ATOMIC_TYPE_8 _badTimeToHASubscribe;
1138  VersionInfo _serverVersion;
1139  Timer _heartbeatTimer;
1140  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1141 
1142  // queue data
1143  int _queueAckTimeout;
1144  bool _isAutoAckEnabled;
1145  unsigned _ackBatchSize;
1146  unsigned _queuedAckCount;
1147  unsigned _defaultMaxDepth;
1148  struct QueueBookmarks
1149  {
1150  QueueBookmarks(const std::string& topic_)
1151  :_topic(topic_)
1152  ,_oldestTime(0)
1153  ,_bookmarkCount(0)
1154  {;}
1155  std::string _topic;
1156  std::string _data;
1157  amps_uint64_t _oldestTime;
1158  unsigned _bookmarkCount;
1159  };
1160  typedef amps_uint64_t topic_hash;
1161  typedef std::map<topic_hash,QueueBookmarks> TopicHashMap;
1162  TopicHashMap _topicHashMap;
1163 
1164  class ClientStoreReplayer : public StoreReplayer
1165  {
1166  ClientImpl* _client;
1167  public:
1168  unsigned _version;
1169  amps_result _res;
1170 
1171  ClientStoreReplayer()
1172  : _client(NULL) , _version(0), _res(AMPS_E_OK)
1173  {}
1174 
1175  ClientStoreReplayer(ClientImpl* client_)
1176  : _client(client_) , _version(0), _res(AMPS_E_OK)
1177  {}
1178 
1179  void setClient(ClientImpl* client_) { _client = client_; }
1180 
1181  void execute(Message& message_)
1182  {
1183  if (!_client) throw CommandException("Can't replay without a client.");
1184  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1185  AMPS_Sequence);
1186  if (index > _client->_lastSentHaSequenceNumber)
1187  _client->_lastSentHaSequenceNumber = index;
1188 
1189  _res = AMPS_E_OK;
1190  // Don't replay a queue cancel message after a reconnect.
1191  // Currently, the only messages that will have anything in options
1192  // are cancel messages.
1193  if (!message_.getCommand().empty() &&
1194  (!_client->_badTimeToHAPublish ||
1195  message_.getOptions().len() < 6))
1196  {
1197  _res= amps_client_send_with_version(_client->_client,
1198  message_.getMessage(),
1199  &_version);
1200  if (_res != AMPS_E_OK)
1201  {
1202  throw DisconnectedException("AMPS Server disconnected during replay");
1203  }
1204  }
1205  }
1206 
1207  };
1208  ClientStoreReplayer _replayer;
1209 
1210  class FailedWriteStoreReplayer : public StoreReplayer
1211  {
1212  ClientImpl* _parent;
1213  const char* _reason;
1214  size_t _reasonLength;
1215  size_t _replayCount;
1216  public:
1217  FailedWriteStoreReplayer(ClientImpl* parent,const char* reason_, size_t reasonLength_)
1218  : _parent(parent),
1219  _reason(reason_),
1220  _reasonLength(reasonLength_),
1221  _replayCount(0)
1222  {;}
1223  void execute(Message& message_)
1224  {
1225  if (_parent->_failedWriteHandler)
1226  {
1227  ++_replayCount;
1228  _parent->_failedWriteHandler->failedWrite(message_,
1229  _reason, _reasonLength);
1230  }
1231  }
1232  size_t replayCount(void) const { return _replayCount; }
1233  };
1234 
1235  struct AckResponseImpl : public RefBody
1236  {
1237  std::string username, password, reason, status, bookmark, options;
1238  amps_uint64_t sequenceNo;
1239  VersionInfo serverVersion;
1240  volatile bool responded, abandoned;
1241  unsigned connectionVersion;
1242  AckResponseImpl() :
1243  RefBody(),
1244  sequenceNo((amps_uint64_t)0),
1245  serverVersion(),
1246  responded(false),
1247  abandoned(false),
1248  connectionVersion(0)
1249  {
1250  }
1251  };
1252 
1253  class AckResponse
1254  {
1255  RefHandle<AckResponseImpl> _body;
1256  public:
1257  AckResponse() : _body(NULL) {;}
1258  AckResponse(const AckResponse& rhs) : _body(rhs._body) {;}
1259  static AckResponse create()
1260  {
1261  AckResponse r;
1262  r._body = new AckResponseImpl();
1263  return r;
1264  }
1265 
1266  const std::string& username()
1267  {
1268  return _body.get().username;
1269  }
1270  void setUsername(const char* data_, size_t len_)
1271  {
1272  if (data_) _body.get().username.assign(data_, len_);
1273  else _body.get().username.clear();
1274  }
1275  const std::string& password()
1276  {
1277  return _body.get().password;
1278  }
1279  void setPassword(const char* data_, size_t len_)
1280  {
1281  if (data_) _body.get().password.assign(data_, len_);
1282  else _body.get().password.clear();
1283  }
1284  const std::string& reason()
1285  {
1286  return _body.get().reason;
1287  }
1288  void setReason(const char* data_, size_t len_)
1289  {
1290  if (data_) _body.get().reason.assign(data_, len_);
1291  else _body.get().reason.clear();
1292  }
1293  const std::string& status()
1294  {
1295  return _body.get().status;
1296  }
1297  void setStatus(const char* data_, size_t len_)
1298  {
1299  if (data_) _body.get().status.assign(data_, len_);
1300  else _body.get().status.clear();
1301  }
1302  const std::string& bookmark()
1303  {
1304  return _body.get().bookmark;
1305  }
1306  void setBookmark(const char* data_, size_t len_)
1307  {
1308  if (data_) _body.get().bookmark.assign(data_, len_);
1309  else _body.get().bookmark.clear();
1310  }
1311  amps_uint64_t sequenceNo() const
1312  {
1313  return _body.get().sequenceNo;
1314  }
1315  void setSequenceNo(const char* data_, size_t len_)
1316  {
1317  amps_uint64_t result = (amps_uint64_t)0;
1318  if (data_)
1319  {
1320  for(size_t i=0; i<len_; ++i)
1321  {
1322  result *= (amps_uint64_t)10;
1323  result += (amps_uint64_t)(data_[i] - '0');
1324  }
1325  }
1326  _body.get().sequenceNo = result;
1327  }
1328  VersionInfo serverVersion() const
1329  {
1330  return _body.get().serverVersion;
1331  }
1332  void setServerVersion(const char* data_, size_t len_)
1333  {
1334  if (data_)
1335  _body.get().serverVersion.setVersion(std::string(data_, len_));
1336  }
1337  bool responded()
1338  {
1339  return _body.get().responded;
1340  }
1341  void setResponded(bool responded_)
1342  {
1343  _body.get().responded = responded_;
1344  }
1345  bool abandoned()
1346  {
1347  return _body.get().abandoned;
1348  }
1349  void setAbandoned(bool abandoned_)
1350  {
1351  if (_body.isValid())
1352  _body.get().abandoned = abandoned_;
1353  }
1354 
1355  void setConnectionVersion(unsigned connectionVersion)
1356  {
1357  _body.get().connectionVersion = connectionVersion;
1358  }
1359 
1360  unsigned getConnectionVersion()
1361  {
1362  return _body.get().connectionVersion;
1363  }
1364  void setOptions(const char* data_, size_t len_)
1365  {
1366  if (data_) _body.get().options.assign(data_,len_);
1367  else _body.get().options.clear();
1368  }
1369 
1370  const std::string& options()
1371  {
1372  return _body.get().options;
1373  }
1374 
1375  AckResponse& operator=(const AckResponse& rhs)
1376  {
1377  _body = rhs._body;
1378  return *this;
1379  }
1380  };
1381 
1382 
1383  typedef std::map<std::string, AckResponse> AckMap;
1384  AckMap _ackMap;
1385  Mutex _ackMapLock;
1386  DefaultExceptionListener _defaultExceptionListener;
1387 protected:
1388 
1389  struct DeferredExecutionRequest
1390  {
1391  DeferredExecutionRequest(DeferredExecutionFunc func_,
1392  void* userData_)
1393  : _func(func_),
1394  _userData(userData_)
1395  {;}
1396 
1397  DeferredExecutionFunc _func;
1398  void* _userData;
1399  };
1400  const ExceptionListener* _exceptionListener;
1401  std::shared_ptr<const ExceptionListener> _pExceptionListener;
1402  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1403  bool _connected;
1404  std::string _username;
1405  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1406  ConnectionStateListeners _connectionStateListeners;
1407  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1408  Mutex _deferredExecutionLock;
1409  DeferredExecutionList _deferredExecutionList;
1410  unsigned _heartbeatInterval;
1411  unsigned _readTimeout;
1412 
1413  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1414  {
1415  // If we disconnectd before we got to notification, don't notify.
1416  // This should only be able to happen for Resubscribed, since the lock
1417  // is released to let the subscription manager run resubscribe so a
1418  // disconnect could be called before the change is broadcast.
1419  if (!_connected && newState_ > ConnectionStateListener::Connected)
1420  {
1421  return;
1422  }
1423  for(ConnectionStateListeners::iterator it= _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1424  {
1425  AMPS_CALL_EXCEPTION_WRAPPER(
1426  (*it)->connectionStateChanged(newState_));
1427  }
1428  }
1429  unsigned processedAck(Message& message);
1430  unsigned persistedAck(Message& meesage);
1431  void lastChance(Message& message);
1432  void checkAndSendHeartbeat(bool force=false);
1433  virtual ConnectionInfo getConnectionInfo() const;
1434  static amps_result
1435  ClientImplMessageHandler(amps_handle message, void* userData);
1436  static void
1437  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1438  static amps_result
1439  ClientImplDisconnectHandler(amps_handle client, void* userData);
1440 
1441  void unsubscribeInternal(const std::string& id)
1442  {
1443  if (id.empty()) return;
1444  // remove the handler first to avoid any more message delivery
1445  Message::Field subId;
1446  subId.assign(id.data(), id.length());
1447  _routes.removeRoute(subId);
1448  // Lock is already acquired
1449  if (_subscriptionManager)
1450  {
1451  // Have to unlock before calling into sub manager to avoid deadlock
1452  Unlock<Mutex> unlock(_lock);
1453  _subscriptionManager->unsubscribe(subId);
1454  }
1455  _message.reset();
1456  _message.setCommandEnum(Message::Command::Unsubscribe);
1457  _message.newCommandId();
1458  _message.setSubscriptionId(id);
1459  _sendWithoutRetry(_message);
1460  deferredExecution(&amps_noOpFn, NULL);
1461  }
1462 
1463  AckResponse syncAckProcessing(long timeout_, Message& message_,
1464  bool isHASubscribe_)
1465  {
1466  return syncAckProcessing(timeout_, message_,
1467  (amps_uint64_t)0, isHASubscribe_);
1468  }
1469 
1470  AckResponse syncAckProcessing(long timeout_, Message& message_,
1471  amps_uint64_t haSeq = (amps_uint64_t)0,
1472  bool isHASubscribe_ = false)
1473  {
1474  // inv: we already have _lock locked up.
1475  AckResponse ack = AckResponse::create();
1476  if (1)
1477  {
1478  Lock<Mutex> guard(_ackMapLock);
1479  _ackMap[message_.getCommandId()] = ack;
1480  }
1481  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
1482  if (ack.getConnectionVersion() == 0)
1483  {
1484  // Send failed
1485  throw DisconnectedException("Connection closed while waiting for response.");
1486  }
1487  bool timedOut = false;
1488  AMPS_START_TIMER(timeout_)
1489  while(!timedOut && !ack.responded() && !ack.abandoned())
1490  {
1491  if (timeout_)
1492  {
1493  timedOut = !_lock.wait(timeout_);
1494  // May have woken up early, check real time
1495  if (timedOut) { AMPS_RESET_TIMER(timedOut, timeout_); }
1496  }
1497  else
1498  {
1499  // Using a timeout version to ensure python can interrupt
1500  _lock.wait(1000);
1501  Unlock<Mutex> unlck(_lock);
1502  amps_invoke_waiting_function();
1503  }
1504  }
1505  if (ack.responded())
1506  {
1507  if (ack.status() != "failure")
1508  {
1509  if (message_.getCommand() == "logon")
1510  {
1511  amps_uint64_t ackSequence = ack.sequenceNo();
1512  if (_lastSentHaSequenceNumber < ackSequence)
1513  {
1514  _lastSentHaSequenceNumber = ackSequence;
1515  }
1516  if (_publishStore.isValid())
1517  {
1518  _publishStore.discardUpTo(ackSequence);
1519  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
1520  {
1521  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
1522  }
1523  }
1524  _nameHash = ack.bookmark().substr(0, ack.bookmark().find('|'));
1525  _serverVersion = ack.serverVersion();
1526  if (_bookmarkStore.isValid())
1527  _bookmarkStore.setServerVersion(_serverVersion);
1528  }
1529  if(_ackBatchSize)
1530  {
1531  const std::string& options = ack.options();
1532  size_t index = options.find_first_of("max_backlog=");
1533  if(index != std::string::npos)
1534  {
1535  unsigned data =0;
1536  const char* c = options.c_str()+index+12;
1537  while(*c && *c!=',')
1538  {
1539  data = (data*10) + (unsigned)(*c++-48);
1540  }
1541  if(_ackBatchSize > data) _ackBatchSize = data;
1542  }
1543  }
1544  return ack;
1545  }
1546  const size_t NotEntitled = 12;
1547  std::string ackReason = ack.reason();
1548  if (ackReason.length() == 0) return ack; // none
1549  if (ackReason.length() == NotEntitled &&
1550  ackReason[0] == 'n' &&
1551  message_.getUserId().len() == 0)
1552  {
1553  message_.assignUserId(_username);
1554  }
1555  message_.throwFor(_client, ackReason);
1556  }
1557  else // !ack.responded()
1558  {
1559  if (!ack.abandoned())
1560  {
1561  throw TimedOutException("timed out waiting for operation.");
1562  }
1563  else
1564  {
1565  throw DisconnectedException("Connection closed while waiting for response.");
1566  }
1567  }
1568  return ack;
1569  }
1570 
1571  void _cleanup(void)
1572  {
1573  if (!_client) return;
1575  NULL,
1576  0L);
1578  NULL,
1579  0L);
1580  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1581  _pEmptyMessageStream.reset(NULL);
1582  amps_client_destroy(_client);
1583  _client = NULL;
1584  }
1585 
1586 public:
1587 
1588  ClientImpl(const std::string& clientName)
1589  : _client(NULL), _name(clientName)
1590  , _isRetryOnDisconnect(true)
1591  , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1592  , _badTimeToHASubscribe(0), _serverVersion()
1593  , _queueAckTimeout(AMPS_DEFAULT_QUEUE_ACK_TIMEOUT)
1594  , _isAutoAckEnabled(false)
1595  , _ackBatchSize(0)
1596  , _queuedAckCount(0)
1597  , _defaultMaxDepth(0)
1598  , _connected(false)
1599  , _heartbeatInterval(0)
1600  , _readTimeout(0)
1601  {
1602  _replayer.setClient(this);
1603  _client = amps_client_create(clientName.c_str());
1604  amps_client_set_message_handler(_client, (amps_handler)ClientImpl::ClientImplMessageHandler, this);
1605  amps_client_set_predisconnect_handler(_client, (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler, this);
1606  amps_client_set_disconnect_handler(_client, (amps_handler)ClientImpl::ClientImplDisconnectHandler, this);
1607  _exceptionListener = &_defaultExceptionListener;
1608  for (size_t i=0; i<GlobalCommandTypeHandlers::COUNT; ++i)
1609  {
1610  _globalCommandTypeHandlers.push_back(MessageHandler());
1611  }
1612  }
1613 
1614  virtual ~ClientImpl()
1615  {
1616  _cleanup();
1617  }
1618 
1619  const std::string& getName() const
1620  {
1621  return _name;
1622  }
1623 
1624  const std::string& getNameHash() const
1625  {
1626  return _nameHash;
1627  }
1628 
1629  void setName(const std::string& name)
1630  {
1631  // This operation will fail if the client's
1632  // name is already set.
1634  _client, name.c_str());
1635  if (result != AMPS_E_OK)
1636  {
1637  AMPSException::throwFor(_client, result);
1638  }
1639  _name = name;
1640  }
1641 
1642  const std::string& getLogonCorrelationData() const
1643  {
1644  return _logonCorrelationData;
1645  }
1646 
1647  void setLogonCorrelationData(const std::string& logonCorrelationData_)
1648  {
1649  _logonCorrelationData = logonCorrelationData_;
1650  }
1651 
1652  size_t getServerVersion() const
1653  {
1654  return _serverVersion.getOldStyleVersion();
1655  }
1656 
1657  VersionInfo getServerVersionInfo() const
1658  {
1659  return _serverVersion;
1660  }
1661 
1662  const std::string& getURI() const
1663  {
1664  return _lastUri;
1665  }
1666 
1667  virtual void connect(const std::string& uri)
1668  {
1669  Lock<Mutex> l(_lock);
1670  _connect(uri);
1671  }
1672 
1673  virtual void _connect(const std::string& uri)
1674  {
1675  _lastUri = uri;
1677  _client, uri.c_str());
1678  if (result != AMPS_E_OK)
1679  {
1680  AMPSException::throwFor(_client, result);
1681  }
1682  _message.reset();
1683  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
1684  _publishMessage.setCommandEnum(Message::Command::Publish);
1685  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
1686  _beatMessage.setOptions("beat");
1687  _readMessage.setClientImpl(this);
1688  if(_queueAckTimeout)
1689  {
1690  amps_client_set_idle_time(_client,_queueAckTimeout);
1691  }
1692  _connected = true;
1693  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
1694  }
1695 
1696  void setDisconnected()
1697  {
1698  {
1699  Lock<Mutex> l(_lock);
1700  if (_connected)
1701  {
1702  AMPS_CALL_EXCEPTION_WRAPPER(broadcastConnectionStateChanged(ConnectionStateListener::Disconnected));
1703  }
1704  _connected = false;
1705  _routes.clear();
1706  _heartbeatTimer.setTimeout(0.0);
1707  }
1708  clearAcks(INT_MAX);
1709  amps_client_disconnect(_client);
1710  }
1711 
1712  virtual void disconnect()
1713  {
1714  {
1715  Lock<Mutex> l(_lock);
1716  _message.reset();
1717  _message.setCommandEnum(Message::Command::Unsubscribe);
1718  _message.newCommandId();
1719  _message.setSubscriptionId("all");
1720  AMPS_CALL_EXCEPTION_WRAPPER(_sendWithoutRetry(_message));
1721  }
1722  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
1723  setDisconnected();
1724  AMPS_CALL_EXCEPTION_WRAPPER(processDeferredExecutions());
1725  Lock<Mutex> l(_lock);
1726  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
1727  }
1728 
1729  void clearAcks(unsigned failedVersion)
1730  {
1731  // Have to lock to prevent race conditions
1732  Lock<Mutex> guard(_ackMapLock);
1733  {
1734  // Go ahead and signal any waiters if they are around...
1735  std::vector<std::string> worklist;
1736  for(AckMap::iterator i = _ackMap.begin(), e = _ackMap.end(); i != e; ++i)
1737  {
1738  if (i->second.getConnectionVersion() <= failedVersion)
1739  {
1740  i->second.setAbandoned(true);
1741  worklist.push_back(i->first);
1742  }
1743  }
1744 
1745  for(std::vector<std::string>::iterator j = worklist.begin(), e = worklist.end(); j != e; ++j)
1746  {
1747  _ackMap.erase(*j);
1748  }
1749  }
1750 
1751  _lock.signalAll();
1752  }
1753 
1754  int send(const Message& message)
1755  {
1756  Lock<Mutex> l(_lock);
1757  return _send(message);
1758  }
1759 
1760  void sendWithoutRetry(const Message& message_)
1761  {
1762  Lock<Mutex> l(_lock);
1763  _sendWithoutRetry(message_);
1764  }
1765 
1766  void _sendWithoutRetry(const Message& message_)
1767  {
1768  amps_result result = amps_client_send(_client, message_.getMessage());
1769  if(result != AMPS_E_OK)
1770  {
1771  AMPSException::throwFor(_client,result);
1772  }
1773  }
1774 
1775  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
1776  bool isHASubscribe_ = false)
1777  {
1778  // Lock is already acquired
1779  amps_result result = AMPS_E_RETRY;
1780 
1781  // Create a local reference to this message, as we'll need to hold on
1782  // to a reference to it in case reconnect occurs.
1783  Message localMessage = message;
1784  unsigned version = 0;
1785 
1786  while(result == AMPS_E_RETRY)
1787  {
1788  if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
1789  {
1790  // If retrySend is disabled, do not wait for the reconnect
1791  // to finish, just throw.
1792  if(!_isRetryOnDisconnect)
1793  {
1794  AMPSException::throwFor(_client,AMPS_E_RETRY);
1795  }
1796  Unlock<Mutex> l(_lock);
1797 #ifdef _WIN32
1798  Sleep(0);
1799 #elif defined(sun)
1800  sched_yield();
1801 #else
1802  pthread_yield();
1803 #endif
1804  }
1805  else
1806  {
1807  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
1808  (isHASubscribe_ && _badTimeToHASubscribe != 0))
1809  {
1810  return (int)version;
1811  }
1812  // It's possible to get here out of order, but this way we'll
1813  // always send in order.
1814  if (haSeq > _lastSentHaSequenceNumber)
1815  {
1816  while (haSeq > _lastSentHaSequenceNumber + 1)
1817  {
1818  try
1819  {
1820  // Replayer updates _lastSentHaSsequenceNumber
1821  if (!_publishStore.replaySingle(_replayer,
1822  _lastSentHaSequenceNumber+1))
1823  {
1824  //++_lastSentHaSequenceNumber;
1825  continue;
1826  }
1827  result = AMPS_E_OK;
1828  version = _replayer._version;
1829  }
1830  #ifdef _WIN32
1831  catch(const DisconnectedException&)
1832  #else
1833  catch(const DisconnectedException& e)
1834  #endif
1835  {
1836  result = _replayer._res;
1837  break;
1838  }
1839  }
1840  result = amps_client_send_with_version(_client,
1841  localMessage.getMessage(),
1842  &version);
1843  ++_lastSentHaSequenceNumber;
1844  }
1845  else
1846  result = amps_client_send_with_version(_client,
1847  localMessage.getMessage(),
1848  &version);
1849  if (result != AMPS_E_OK)
1850  {
1851  if (!isHASubscribe_ && !haSeq &&
1852  localMessage.getMessage() == message.getMessage())
1853  {
1854  localMessage = message.deepCopy();
1855  }
1856  if(_isRetryOnDisconnect)
1857  {
1858  Unlock<Mutex> u(_lock);
1859  result = amps_client_attempt_reconnect(_client, version);
1860  // If this is an HA publish or subscrbie command, it was
1861  // stored first and will have already been replayed by the
1862  // store or sub manager after reconnect, so just return.
1863  if ((isHASubscribe_ || haSeq) &&
1864  result == AMPS_E_RETRY)
1865  {
1866  return (int)version;
1867  }
1868  }
1869  else
1870  {
1871  // retrySend is disabled so throw the error
1872  // from the send as an exception, do not retry.
1873  AMPSException::throwFor(_client, result);
1874  }
1875  }
1876  }
1877  if (result == AMPS_E_RETRY)
1878  {
1879  amps_invoke_waiting_function();
1880  }
1881  }
1882 
1883  if (result != AMPS_E_OK) AMPSException::throwFor(_client, result);
1884  return (int)version;
1885  }
1886 
1887  void addMessageHandler(const Field& commandId_,
1888  const AMPS::MessageHandler& messageHandler_,
1889  unsigned requestedAcks_, bool isSubscribe_)
1890  {
1891  Lock<Mutex> lock(_lock);
1892  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
1893  0, isSubscribe_);
1894  }
1895 
1896  bool removeMessageHandler(const Field& commandId_)
1897  {
1898  Lock<Mutex> lock(_lock);
1899  return _routes.removeRoute(commandId_);
1900  }
1901 
1902  std::string send(const MessageHandler& messageHandler_, Message& message_, int timeout_ = 0)
1903  {
1904  Field id = message_.getCommandId();
1905  Field subId = message_.getSubscriptionId();
1906  Field qid = message_.getQueryId();
1907  bool isSubscribe = false;
1908  bool isSubscribeOnly = false;
1909  bool replace = false;
1910  unsigned requestedAcks = message_.getAckTypeEnum();
1911  unsigned systemAddedAcks = Message::AckType::None;
1912 
1913  switch(message_.getCommandEnum())
1914  {
1915  case Message::Command::Subscribe:
1916  case Message::Command::DeltaSubscribe:
1917  replace = message_.getOptions().operator std::string().find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos;
1918  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
1919  {
1920  systemAddedAcks |= Message::AckType::Persisted;
1921  }
1922  isSubscribeOnly = true;
1923  // fall through
1924  case Message::Command::SOWAndSubscribe:
1925  case Message::Command::SOWAndDeltaSubscribe:
1926  if (id.empty())
1927  {
1928  id = message_.newCommandId().getCommandId();
1929  }
1930  else
1931  {
1932  while (!replace && id != subId && _routes.hasRoute(id))
1933  {
1934  id = message_.newCommandId().getCommandId();
1935  }
1936  }
1937  if (subId.empty())
1938  {
1939  message_.setSubscriptionId(id);
1940  subId = id;
1941  }
1942  isSubscribe = true;
1943  // fall through
1944  case Message::Command::SOW:
1945  if(id.empty())
1946  {
1947  id = message_.newCommandId().getCommandId();
1948  }
1949  else
1950  {
1951  while (!replace && id != subId && _routes.hasRoute(id))
1952  {
1953  message_.newCommandId();
1954  if (qid == id)
1955  {
1956  qid = message_.getCommandId();
1957  message_.setQueryId(qid);
1958  }
1959  id = message_.getCommandId();
1960  }
1961  }
1962  if (!isSubscribeOnly)
1963  {
1964  if (qid.empty())
1965  {
1966  message_.setQueryID(id);
1967  qid = id;
1968  }
1969  else
1970  {
1971  while (!replace && qid != subId && qid != id
1972  && _routes.hasRoute(qid))
1973  {
1974  qid = message_.newQueryId().getQueryId();
1975  }
1976  }
1977  }
1978  systemAddedAcks |= Message::AckType::Processed;
1979  // for SOW only, we get a completed ack so we know when to remove the handler.
1980  if (!isSubscribeOnly) systemAddedAcks |= Message::AckType::Completed;
1981  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
1982  {
1983  int routesAdded = 0;
1984  Lock<Mutex> l(_lock);
1985  if (!subId.empty() && messageHandler_.isValid())
1986  {
1987  if (!_routes.hasRoute(subId))
1988  {
1989  ++routesAdded;
1990  }
1991  // This can replace a non-subscribe with a matching id
1992  // with a subscription but not another subscription.
1993  _routes.addRoute(subId, messageHandler_, requestedAcks,
1994  systemAddedAcks, isSubscribe);
1995  }
1996  if (!isSubscribeOnly && !qid.empty()
1997  && messageHandler_.isValid() && qid != subId)
1998  {
1999  if (routesAdded == 0)
2000  {
2001  _routes.addRoute(qid, messageHandler_,
2002  requestedAcks, systemAddedAcks, false);
2003  }
2004  else
2005  {
2006  void* data = NULL;
2007  {
2008  Unlock<Mutex> u(_lock);
2009  data = amps_invoke_copy_route_function(
2010  messageHandler_.userData());
2011  }
2012  if (!data)
2013  {
2014  _routes.addRoute(qid, messageHandler_, requestedAcks,
2015  systemAddedAcks, false);
2016  }
2017  else
2018  {
2019  _routes.addRoute(qid,
2020  MessageHandler(messageHandler_.function(),
2021  data),
2022  requestedAcks, systemAddedAcks, false);
2023  }
2024  }
2025  ++routesAdded;
2026  }
2027  if (!id.empty() && messageHandler_.isValid()
2028  && requestedAcks & ~Message::AckType::Persisted
2029  && id != subId && id != qid)
2030  {
2031  if (routesAdded == 0)
2032  {
2033  _routes.addRoute(id, messageHandler_, requestedAcks,
2034  systemAddedAcks, false);
2035  }
2036  else
2037  {
2038  void* data = NULL;
2039  {
2040  Unlock<Mutex> u(_lock);
2041  data = amps_invoke_copy_route_function(
2042  messageHandler_.userData());
2043  }
2044  if (!data)
2045  {
2046  _routes.addRoute(id, messageHandler_, requestedAcks,
2047  systemAddedAcks, false);
2048  }
2049  else
2050  {
2051  _routes.addRoute(id,
2052  MessageHandler(messageHandler_.function(),
2053  data),
2054  requestedAcks,
2055  systemAddedAcks, false);
2056  }
2057  }
2058  ++routesAdded;
2059  }
2060  try
2061  {
2062  // We aren't adding to subscription manager, so this isn't
2063  // an HA subscribe.
2064  syncAckProcessing(timeout_, message_, 0, false);
2065  message_.setAckTypeEnum(requestedAcks);
2066  }
2067  catch (...)
2068  {
2069  _routes.removeRoute(message_.getQueryID());
2070  _routes.removeRoute(message_.getSubscriptionId());
2071  _routes.removeRoute(id);
2072  message_.setAckTypeEnum(requestedAcks);
2073  throw;
2074  }
2075  }
2076  break;
2077  // These are valid commands that are used as-is
2078  case Message::Command::Unsubscribe:
2079  case Message::Command::Heartbeat:
2080  case Message::Command::Logon:
2081  case Message::Command::StartTimer:
2082  case Message::Command::StopTimer:
2083  case Message::Command::DeltaPublish:
2084  case Message::Command::Publish:
2085  case Message::Command::SOWDelete:
2086  {
2087  Lock<Mutex> l(_lock);
2088  // if an ack is requested, it'll need a command ID.
2089  if (message_.getAckTypeEnum() != Message::AckType::None)
2090  {
2091  if (id.empty())
2092  {
2093  message_.newCommandId();
2094  id = message_.getCommandId();
2095  }
2096  if (messageHandler_.isValid())
2097  {
2098  _routes.addRoute(id, messageHandler_, requestedAcks,
2099  Message::AckType::None, false);
2100  }
2101  }
2102  _send(message_);
2103  }
2104  break;
2105  // These are things that shouldn't be sent (not meaningful)
2106  case Message::Command::GroupBegin:
2107  case Message::Command::GroupEnd:
2108  case Message::Command::OOF:
2109  case Message::Command::Ack:
2110  case Message::Command::Unknown:
2111  default:
2112  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
2113  }
2114  message_.setAckTypeEnum(requestedAcks);
2115  return id;
2116  }
2117 
2118  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
2119  {
2120  Lock<Mutex> l(_lock);
2121  _disconnectHandler = disconnectHandler;
2122  }
2123 
2124  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
2125  {
2126  switch (command_[0])
2127  {
2128 #if 0 // Not currently implemented to avoid an extra branch in delivery
2129  case 'p':
2130  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2131  break;
2132  case 's':
2133  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2134  break;
2135 #endif
2136  case 'h':
2137  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2138  break;
2139 #if 0 // Not currently implemented to avoid an extra branch in delivery
2140  case 'g':
2141  if (command_[6] == 'b')
2142  {
2143  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2144  }
2145  else if (command_[6] == 'e')
2146  {
2147  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2148  }
2149  else
2150  {
2151  std::ostringstream os;
2152  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2153  throw CommandException(os.str());
2154  }
2155  break;
2156  case 'o':
2157  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2158  break;
2159 #endif
2160  case 'a':
2161  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2162  break;
2163  case 'l':
2164  case 'L':
2165  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
2166  break;
2167  case 'd':
2168  case 'D':
2169  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
2170  break;
2171  default:
2172  std::ostringstream os;
2173  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
2174  throw CommandException(os.str());
2175  break;
2176  }
2177  }
2178 
2179  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
2180  {
2181  switch (command_)
2182  {
2183 #if 0 // Not currently implemented to avoid an extra branch in delivery
2184  case Message::Command::Publish:
2185  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2186  break;
2187  case Message::Command::SOW:
2188  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2189  break;
2190 #endif
2191  case Message::Command::Heartbeat:
2192  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2193  break;
2194 #if 0 // Not currently implemented to avoid an extra branch in delivery
2195  case Message::Command::GroupBegin:
2196  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2197  break;
2198  case Message::Command::GroupEnd:
2199  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2200  break;
2201  case Message::Command::OOF:
2202  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2203  break;
2204 #endif
2205  case Message::Command::Ack:
2206  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2207  break;
2208  default:
2209  unsigned bits = 0;
2210  unsigned command = command_;
2211  while (command > 0) { ++bits; command >>= 1; }
2212  char errBuf[128];
2213  AMPS_snprintf(errBuf, sizeof(errBuf),
2214  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2215  CommandConstants<0>::Lengths[bits],
2216  CommandConstants<0>::Values[bits]);
2217  throw CommandException(errBuf);
2218  break;
2219  }
2220  }
2221 
2222  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2223  {
2224  _globalCommandTypeHandlers[handlerType_] = handler_;
2225  }
2226 
2227  void setFailedWriteHandler(FailedWriteHandler* handler_)
2228  {
2229  Lock<Mutex> l(_lock);
2230  _failedWriteHandler.reset(handler_);
2231  }
2232 
2233  void setPublishStore(const Store& publishStore_)
2234  {
2235  Lock<Mutex> l(_lock);
2236  if (_connected) throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2237  _publishStore = publishStore_;
2238  }
2239 
2240  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2241  {
2242  Lock<Mutex> l(_lock);
2243  if (_connected) throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2244  _bookmarkStore = bookmarkStore_;
2245  }
2246 
2247  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2248  {
2249  Lock<Mutex> l(_lock);
2250  _subscriptionManager.reset(subscriptionManager_);
2251  }
2252 
2253  SubscriptionManager* getSubscriptionManager() const
2254  {
2255  return const_cast<SubscriptionManager*>(_subscriptionManager.get());
2256  }
2257 
2258  DisconnectHandler getDisconnectHandler() const
2259  {
2260  return _disconnectHandler;
2261  }
2262 
2263  MessageHandler getDuplicateMessageHandler() const
2264  {
2265  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2266  }
2267 
2268  FailedWriteHandler* getFailedWriteHandler() const
2269  {
2270  return const_cast<FailedWriteHandler*>(_failedWriteHandler.get());
2271  }
2272 
2273  Store getPublishStore() const
2274  {
2275  return _publishStore;
2276  }
2277 
2278  BookmarkStore getBookmarkStore() const
2279  {
2280  return _bookmarkStore;
2281  }
2282 
2283  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,size_t dataLen_)
2284  {
2285  if (!_publishStore.isValid())
2286  {
2287  Lock<Mutex> l(_lock);
2288  _publishMessage.assignTopic(topic_, topicLen_);
2289  _publishMessage.assignData(data_, dataLen_);
2290  _send(_publishMessage);
2291  return 0;
2292  }
2293  else
2294  {
2295  if (!publishStoreMessage)
2296  {
2297  publishStoreMessage = new Message();
2298  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2299  }
2300  publishStoreMessage->reset();
2301  publishStoreMessage->setCommandEnum(Message::Command::Publish);
2302  return _publish(topic_, topicLen_, data_, dataLen_);
2303  }
2304  }
2305 
2306  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2307  size_t dataLen_, unsigned long expiration_)
2308  {
2309  if (!_publishStore.isValid())
2310  {
2311  Lock<Mutex> l(_lock);
2312  _publishMessage.assignTopic(topic_, topicLen_);
2313  _publishMessage.assignData(data_, dataLen_);
2314  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2315  size_t pos = convertToCharArray(exprBuf, expiration_);
2316  _publishMessage.assignExpiration(exprBuf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2317  _send(_publishMessage);
2318  _publishMessage.assignExpiration(NULL, 0);
2319  return 0;
2320  }
2321  else
2322  {
2323  if (!publishStoreMessage)
2324  {
2325  publishStoreMessage = new Message();
2326  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2327  }
2328  publishStoreMessage->reset();
2329  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2330  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2331  publishStoreMessage->setCommandEnum(Message::Command::Publish)
2332  .assignExpiration(exprBuf+exprPos,
2333  AMPS_NUMBER_BUFFER_LEN-exprPos);
2334  return _publish(topic_, topicLen_, data_, dataLen_);
2335  }
2336  }
2337 
2338  class FlushAckHandler : ConnectionStateListener
2339  {
2340  private:
2341  ClientImpl* _pClient;
2342  Field _cmdId;
2343  volatile bool _acked;
2344  volatile bool _disconnected;
2345  public:
2346  FlushAckHandler(ClientImpl* pClient_)
2347  : _pClient(pClient_), _cmdId(), _acked(false), _disconnected(false)
2348  {
2349  pClient_->addConnectionStateListener(this);
2350  }
2351  ~FlushAckHandler()
2352  {
2353  _pClient->removeConnectionStateListener(this);
2354  _pClient->removeMessageHandler(_cmdId);
2355  _cmdId.clear();
2356  }
2357  void setCommandId(const Field& cmdId_)
2358  {
2359  _cmdId.deepCopy(cmdId_);
2360  }
2361  void invoke(const Message&)
2362  {
2363  _acked = true;
2364  }
2365  void connectionStateChanged(State state_)
2366  {
2367  if (state_ <= Shutdown)
2368  {
2369  _disconnected = true;
2370  }
2371  }
2372  bool acked()
2373  {
2374  return _acked;
2375  }
2376  bool done()
2377  {
2378  return _acked || _disconnected;
2379  }
2380  };
2381 
2382  void publishFlush(long timeout_, unsigned ackType_)
2383  {
2384  static const char* processed = "processed";
2385  static const size_t processedLen = strlen(processed);
2386  static const char* persisted = "persisted";
2387  static const size_t persistedLen = strlen(persisted);
2388  static const char* flush = "flush";
2389  static const size_t flushLen = strlen(flush);
2390  static VersionInfo minPersisted("5.3.3.0");
2391  static VersionInfo minFlush("4");
2392  if (ackType_ != Message::AckType::Processed
2393  && ackType_ != Message::AckType::Persisted)
2394  {
2395  throw new CommandException("Flush can only be used with processed or persisted acks.");
2396  }
2397  FlushAckHandler flushHandler(this);
2398  if (_serverVersion >= minFlush)
2399  {
2400  Lock<Mutex> l(_lock);
2401  if (!_connected)
2402  throw DisconnectedException("Not cconnected trying to flush");
2403  _message.reset();
2404  _message.newCommandId();
2405  _message.assignCommand(flush, flushLen);
2406  if (_serverVersion < minPersisted
2407  || ackType_ == Message::AckType::Processed)
2408  {
2409  _message.assignAckType(processed, processedLen);
2410  }
2411  else
2412  {
2413  _message.assignAckType(persisted, persistedLen);
2414  }
2415  flushHandler.setCommandId(_message.getCommandId());
2416  addMessageHandler(_message.getCommandId(),
2417  std::bind(&FlushAckHandler::invoke,
2418  std::ref(flushHandler),
2419  std::placeholders::_1),
2420  ackType_, false);
2421  if (_send(_message) == -1)
2422  throw DisconnectedException("Disconnected trying to flush");
2423  }
2424  if (_publishStore.isValid())
2425  {
2426  try
2427  {
2428  _publishStore.flush(timeout_);
2429  }
2430  catch (const AMPSException& ex)
2431  {
2432  AMPS_UNHANDLED_EXCEPTION(ex);
2433  throw;
2434  }
2435  }
2436  else if (_serverVersion < minFlush)
2437  {
2438  if (timeout_ > 0) { AMPS_USLEEP(timeout_ * 1000); }
2439  else { AMPS_USLEEP(1000 * 1000); }
2440  return;
2441  }
2442  if (timeout_)
2443  {
2444  Timer timer((double)timeout_);
2445  timer.start();
2446  while (!timer.check() && !flushHandler.done())
2447  {
2448  AMPS_USLEEP(10000);
2449  amps_invoke_waiting_function();
2450  }
2451  }
2452  else
2453  {
2454  while (!flushHandler.done())
2455  {
2456  AMPS_USLEEP(10000);
2457  amps_invoke_waiting_function();
2458  }
2459  }
2460  // No response or disconnect in timeout interval
2461  if (!flushHandler.done())
2462  throw TimedOutException("Timed out waiting for flush");
2463  // We got disconnected and there is no publish store
2464  if (!flushHandler.acked() && !_publishStore.isValid())
2465  throw DisconnectedException("Disconnected waiting for flush");
2466  }
2467 
2468  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2469  const char* data_, size_t dataLength_)
2470  {
2471  if (!_publishStore.isValid())
2472  {
2473  Lock<Mutex> l(_lock);
2474  _deltaMessage.assignTopic(topic_, topicLength_);
2475  _deltaMessage.assignData(data_, dataLength_);
2476  _send(_deltaMessage);
2477  return 0;
2478  }
2479  else
2480  {
2481  if (!publishStoreMessage)
2482  {
2483  publishStoreMessage = new Message();
2484  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2485  }
2486  publishStoreMessage->reset();
2487  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2488  return _publish(topic_, topicLength_, data_, dataLength_);
2489  }
2490  }
2491 
2492  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2493  const char* data_, size_t dataLength_,
2494  unsigned long expiration_)
2495  {
2496  if (!_publishStore.isValid())
2497  {
2498  Lock<Mutex> l(_lock);
2499  _deltaMessage.assignTopic(topic_, topicLength_);
2500  _deltaMessage.assignData(data_, dataLength_);
2501  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2502  size_t pos = convertToCharArray(exprBuf, expiration_);
2503  _deltaMessage.assignExpiration(exprBuf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2504  _send(_deltaMessage);
2505  _deltaMessage.assignExpiration(NULL, 0);
2506  return 0;
2507  }
2508  else
2509  {
2510  if (!publishStoreMessage)
2511  {
2512  publishStoreMessage = new Message();
2513  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2514  }
2515  publishStoreMessage->reset();
2516  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2517  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2518  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2519  .assignExpiration(exprBuf+exprPos,
2520  AMPS_NUMBER_BUFFER_LEN-exprPos);
2521  return _publish(topic_, topicLength_, data_, dataLength_);
2522  }
2523  }
2524 
2525  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
2526  const char* data_, size_t dataLength_)
2527  {
2528  publishStoreMessage->assignTopic(topic_, topicLength_)
2529  .setAckTypeEnum(Message::AckType::Persisted)
2530  .assignData(data_, dataLength_);
2531  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
2532  char buf[AMPS_NUMBER_BUFFER_LEN];
2533  size_t pos = convertToCharArray(buf, haSequenceNumber);
2534  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2535  {
2536  Lock<Mutex> l(_lock);
2537  _send(*publishStoreMessage, haSequenceNumber);
2538  }
2539  return haSequenceNumber;
2540  }
2541 
2542  virtual std::string logon(long timeout_, Authenticator& authenticator_,
2543  const char* options_ = NULL)
2544  {
2545  Lock<Mutex> l(_lock);
2546  return _logon(timeout_, authenticator_, options_);
2547  }
2548 
2549  virtual std::string _logon(long timeout_, Authenticator& authenticator_,
2550  const char* options_ = NULL)
2551  {
2552  AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2553  _message.reset();
2554  _message.setCommandEnum(Message::Command::Logon);
2555  _message.newCommandId();
2556  std::string newCommandId = _message.getCommandId();
2557  _message.setClientName(_name);
2558 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
2559  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
2560  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
2561 #endif
2562  URI uri(_lastUri);
2563  if(uri.user().size()) _message.setUserId(uri.user());
2564  if(uri.password().size()) _message.setPassword(uri.password());
2565  if(uri.protocol() == "amps" && uri.messageType().size())
2566  {
2567  _message.setMessageType(uri.messageType());
2568  }
2569  if(uri.isTrue("pretty"))
2570  {
2571  _message.setOptions("pretty");
2572  }
2573 
2574  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
2575  if (!_logonCorrelationData.empty())
2576  {
2577  _message.assignCorrelationId(_logonCorrelationData);
2578  }
2579  if (options_)
2580  {
2581  _message.setOptions(options_);
2582  }
2583  _username = _message.getUserId();
2584  try
2585  {
2586  while(true)
2587  {
2588  _message.setAckTypeEnum(Message::AckType::Processed);
2589  AckResponse ack = syncAckProcessing(timeout_, _message);
2590  if (ack.status() == "retry")
2591  {
2592  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
2593  _username = ack.username();
2594  _message.setUserId(_username);
2595  }
2596  else
2597  {
2598  authenticator_.completed(ack.username(), ack.password(), ack.reason());
2599  break;
2600  }
2601  }
2602  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2603 
2604  // Now re-send the heartbeat command if configured
2605  _sendHeartbeat();
2606  }
2607  catch(const AMPSException& ex)
2608  {
2609  AMPS_UNHANDLED_EXCEPTION(ex);
2610  throw;
2611  }
2612  catch(...)
2613  {
2614  throw;
2615  }
2616 
2617  if (_publishStore.isValid())
2618  {
2619  try
2620  {
2621  _publishStore.replay(_replayer);
2622  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
2623  }
2624  catch(const StoreException& ex)
2625  {
2626  std::ostringstream os;
2627  os << "A local store exception occurred while logging on."
2628  << ex.toString();
2629  throw ConnectionException(os.str());
2630  }
2631  catch(const AMPSException& ex)
2632  {
2633  AMPS_UNHANDLED_EXCEPTION(ex);
2634  throw ex;
2635  }
2636  catch(const std::exception& ex)
2637  {
2638  AMPS_UNHANDLED_EXCEPTION(ex);
2639  throw ex;
2640  }
2641  catch(...)
2642  {
2643  throw;
2644  }
2645  }
2646  return newCommandId;
2647  }
2648 
2649  std::string subscribe(const MessageHandler& messageHandler_,
2650  const std::string& topic_,
2651  long timeout_,
2652  const std::string& filter_,
2653  const std::string& bookmark_,
2654  const std::string& options_,
2655  const std::string& subId_,
2656  bool isHASubscribe_ = true)
2657  {
2658  isHASubscribe_ &= (bool)_subscriptionManager;
2659  Lock<Mutex> l(_lock);
2660  _message.reset();
2661  _message.setCommandEnum(Message::Command::Subscribe);
2662  _message.newCommandId();
2663  std::string subId(subId_);
2664  if (subId.empty())
2665  {
2666  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos)
2667  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
2668 
2669  subId = _message.getCommandId();
2670  }
2671  _message.setSubscriptionId(subId);
2672  // we need to deep copy this before sending the message; while we are
2673  // waiting for a response, the fields in _message may get blown away for
2674  // other operations.
2675  AMPS::Message::Field subIdField(subId);
2676  unsigned ackTypes = Message::AckType::Processed;
2677 
2678  if (!bookmark_.empty() && _bookmarkStore.isValid())
2679  {
2680  ackTypes |= Message::AckType::Persisted;
2681  }
2682  _message.setTopic(topic_);
2683 
2684  if (filter_.length()) _message.setFilter(filter_);
2685  if (bookmark_.length())
2686  {
2687  if (bookmark_ == AMPS_BOOKMARK_RECENT)
2688  {
2689  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
2690  _message.setBookmark(mostRecent);
2691  }
2692  else
2693  {
2694  _message.setBookmark(bookmark_);
2695  if (_bookmarkStore.isValid())
2696  {
2697  if (bookmark_ != AMPS_BOOKMARK_NOW &&
2698  bookmark_ != AMPS_BOOKMARK_EPOCH)
2699  {
2700  _bookmarkStore.log(_message);
2701  _bookmarkStore.discard(_message);
2702  _bookmarkStore.persisted(subIdField, _message.getBookmark());
2703  }
2704  }
2705  }
2706  }
2707  if (options_.length()) _message.setOptions(options_);
2708 
2709  Message message = _message;
2710  if (isHASubscribe_)
2711  {
2712  message = _message.deepCopy();
2713  Unlock<Mutex> u(_lock);
2714  _subscriptionManager->subscribe(messageHandler_, message,
2715  Message::AckType::None);
2716  if (_badTimeToHASubscribe) return subId;
2717  }
2718  if (!_routes.hasRoute(_message.getSubscriptionId()))
2719  {
2720  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
2721  Message::AckType::None, ackTypes, true);
2722  }
2723  message.setAckTypeEnum(ackTypes);
2724  if (!options_.empty()) message.setOptions(options_);
2725  try
2726  {
2727  syncAckProcessing(timeout_, message, isHASubscribe_);
2728  }
2729  catch (const DisconnectedException&)
2730  {
2731  if (!isHASubscribe_)
2732  {
2733  _routes.removeRoute(subIdField);
2734  throw;
2735  }
2736  else
2737  {
2738  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2739  throw;
2740  }
2741  }
2742  catch (const TimedOutException&)
2743  {
2744  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2745  throw;
2746  }
2747  catch (...)
2748  {
2749  if (isHASubscribe_)
2750  {
2751  // Have to unlock before calling into sub manager to avoid deadlock
2752  Unlock<Mutex> unlock(_lock);
2753  _subscriptionManager->unsubscribe(subIdField);
2754  }
2755  _routes.removeRoute(subIdField);
2756  throw;
2757  }
2758 
2759  return subId;
2760  }
2761  std::string deltaSubscribe(const MessageHandler& messageHandler_,
2762  const std::string& topic_,
2763  long timeout_,
2764  const std::string& filter_,
2765  const std::string& bookmark_,
2766  const std::string& options_,
2767  const std::string& subId_ = "",
2768  bool isHASubscribe_ = true)
2769  {
2770  isHASubscribe_ &= (bool)_subscriptionManager;
2771  Lock<Mutex> l(_lock);
2772  _message.reset();
2773  _message.setCommandEnum(Message::Command::DeltaSubscribe);
2774  _message.newCommandId();
2775  std::string subId(subId_);
2776  if (subId.empty())
2777  {
2778  subId = _message.getCommandId();
2779  }
2780  _message.setSubscriptionId(subId);
2781  // we need to deep copy this before sending the message; while we are
2782  // waiting for a response, the fields in _message may get blown away for
2783  // other operations.
2784  AMPS::Message::Field subIdField(subId);
2785  unsigned ackTypes = Message::AckType::Processed;
2786 
2787  if (!bookmark_.empty() && _bookmarkStore.isValid())
2788  {
2789  ackTypes |= Message::AckType::Persisted;
2790  }
2791  _message.setTopic(topic_);
2792  if (filter_.length()) _message.setFilter(filter_);
2793  if (bookmark_.length())
2794  {
2795  if (bookmark_ == AMPS_BOOKMARK_RECENT)
2796  {
2797  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
2798  _message.setBookmark(mostRecent);
2799  }
2800  else
2801  {
2802  _message.setBookmark(bookmark_);
2803  if (_bookmarkStore.isValid())
2804  {
2805  if (bookmark_ != AMPS_BOOKMARK_NOW &&
2806  bookmark_ != AMPS_BOOKMARK_EPOCH)
2807  {
2808  _bookmarkStore.log(_message);
2809  _bookmarkStore.discard(_message);
2810  _bookmarkStore.persisted(subIdField, _message.getBookmark());
2811  }
2812  }
2813  }
2814  }
2815  if (options_.length()) _message.setOptions(options_);
2816  Message message = _message;
2817  if (isHASubscribe_)
2818  {
2819  message = _message.deepCopy();
2820  Unlock<Mutex> u(_lock);
2821  _subscriptionManager->subscribe(messageHandler_, message,
2822  Message::AckType::None);
2823  if (_badTimeToHASubscribe) return subId;
2824  }
2825  if (!_routes.hasRoute(_message.getSubscriptionId()))
2826  {
2827  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
2828  Message::AckType::None, ackTypes, true);
2829  }
2830  message.setAckTypeEnum(ackTypes);
2831  if (!options_.empty()) message.setOptions(options_);
2832  try
2833  {
2834  syncAckProcessing(timeout_, message, isHASubscribe_);
2835  }
2836  catch (const DisconnectedException&)
2837  {
2838  if (!isHASubscribe_)
2839  {
2840  _routes.removeRoute(subIdField);
2841  throw;
2842  }
2843  }
2844  catch (const TimedOutException&)
2845  {
2846  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2847  throw;
2848  }
2849  catch (...)
2850  {
2851  if (isHASubscribe_)
2852  {
2853  // Have to unlock before calling into sub manager to avoid deadlock
2854  Unlock<Mutex> unlock(_lock);
2855  _subscriptionManager->unsubscribe(subIdField);
2856  }
2857  _routes.removeRoute(subIdField);
2858  throw;
2859  }
2860  return subId;
2861  }
2862 
2863  void unsubscribe(const std::string& id)
2864  {
2865  Lock<Mutex> l(_lock);
2866  unsubscribeInternal(id);
2867  }
2868 
2869  void unsubscribe(void)
2870  {
2871  if (_subscriptionManager)
2872  {
2873  _subscriptionManager->clear();
2874  }
2875  {
2876  _routes.unsubscribeAll();
2877  Lock<Mutex> l(_lock);
2878  _message.reset();
2879  _message.setCommandEnum(Message::Command::Unsubscribe);
2880  _message.newCommandId();
2881  _message.setSubscriptionId("all");
2882  _sendWithoutRetry(_message);
2883  }
2884  deferredExecution(&amps_noOpFn, NULL);
2885  }
2886 
2887  std::string sow(const MessageHandler& messageHandler_,
2888  const std::string& topic_,
2889  const std::string& filter_ = "",
2890  const std::string& orderBy_ = "",
2891  const std::string& bookmark_ = "",
2892  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2893  int topN_ = AMPS_DEFAULT_TOP_N,
2894  const std::string& options_ = "",
2895  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
2896  {
2897  Lock<Mutex> l(_lock);
2898  _message.reset();
2899  _message.setCommandEnum(Message::Command::SOW);
2900  _message.newCommandId();
2901  // need to keep our own copy of the command ID.
2902  std::string commandId = _message.getCommandId();
2903  _message.setQueryID(_message.getCommandId());
2904  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
2905  _message.setAckTypeEnum(ackTypes);
2906  _message.setTopic(topic_);
2907  if (filter_.length()) _message.setFilter(filter_);
2908  if (orderBy_.length()) _message.setOrderBy(orderBy_);
2909  if (bookmark_.length()) _message.setBookmark(bookmark_);
2910  _message.setBatchSize(AMPS::asString(batchSize_));
2911  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
2912  if (options_.length()) _message.setOptions(options_);
2913 
2914  _routes.addRoute(_message.getQueryID(), messageHandler_,
2915  Message::AckType::None, ackTypes, false);
2916 
2917  try
2918  {
2919  syncAckProcessing(timeout_, _message);
2920  }
2921  catch (...)
2922  {
2923  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
2924  throw;
2925  }
2926 
2927  return commandId;
2928  }
2929 
2930  std::string sow(const MessageHandler& messageHandler_,
2931  const std::string& topic_,
2932  long timeout_,
2933  const std::string& filter_ = "",
2934  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2935  int topN_ = AMPS_DEFAULT_TOP_N)
2936  {
2937  std::string notSet;
2938  return sow(messageHandler_,
2939  topic_,
2940  filter_,
2941  notSet, // orderBy
2942  notSet, // bookmark
2943  batchSize_,
2944  topN_,
2945  notSet,
2946  timeout_);
2947  }
2948 
2949  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
2950  const std::string& topic_,
2951  const std::string& filter_ = "",
2952  const std::string& orderBy_ = "",
2953  const std::string& bookmark_ = "",
2954  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2955  int topN_ = AMPS_DEFAULT_TOP_N,
2956  const std::string& options_ = "",
2957  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
2958  bool isHASubscribe_ = true)
2959  {
2960  isHASubscribe_ &= (bool)_subscriptionManager;
2961  Lock<Mutex> l(_lock);
2962  _message.reset();
2963  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
2964  _message.newCommandId();
2965  Field cid = _message.getCommandId();
2966  _message.setQueryID(cid);
2967  _message.setSubscriptionId(cid);
2968  std::string subId = cid;
2969  _message.setTopic(topic_);
2970  if (filter_.length()) _message.setFilter(filter_);
2971  if (orderBy_.length()) _message.setOrderBy(orderBy_);
2972  if (bookmark_.length()) _message.setBookmark(bookmark_);
2973  _message.setBatchSize(AMPS::asString(batchSize_));
2974  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
2975  if (options_.length()) _message.setOptions(options_);
2976 
2977  Message message = _message;
2978  if (isHASubscribe_)
2979  {
2980  message = _message.deepCopy();
2981  Unlock<Mutex> u(_lock);
2982  _subscriptionManager->subscribe(messageHandler_, message,
2983  Message::AckType::None);
2984  if (_badTimeToHASubscribe) return subId;
2985  }
2986  _routes.addRoute(cid, messageHandler_,
2987  Message::AckType::None, Message::AckType::Processed, true);
2988  message.setAckTypeEnum(Message::AckType::Processed);
2989  if (!options_.empty()) message.setOptions(options_);
2990  try
2991  {
2992  syncAckProcessing(timeout_, message, isHASubscribe_);
2993  }
2994  catch (const DisconnectedException&)
2995  {
2996  if (!isHASubscribe_)
2997  {
2998  _routes.removeRoute(subId);
2999  throw;
3000  }
3001  }
3002  catch (const TimedOutException&)
3003  {
3004  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3005  throw;
3006  }
3007  catch (...)
3008  {
3009  if (isHASubscribe_)
3010  {
3011  // Have to unlock before calling into sub manager to avoid deadlock
3012  Unlock<Mutex> unlock(_lock);
3013  _subscriptionManager->unsubscribe(cid);
3014  }
3015  _routes.removeRoute(subId);
3016  throw;
3017  }
3018  return subId;
3019  }
3020 
3021  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
3022  const std::string& topic_,
3023  long timeout_,
3024  const std::string& filter_ = "",
3025  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3026  bool oofEnabled_ = false,
3027  int topN_ = AMPS_DEFAULT_TOP_N,
3028  bool isHASubscribe_ = true)
3029  {
3030  std::string notSet;
3031  return sowAndSubscribe(messageHandler_,
3032  topic_,
3033  filter_,
3034  notSet, // orderBy
3035  notSet, // bookmark
3036  batchSize_,
3037  topN_,
3038  (oofEnabled_ ? "oof" : ""),
3039  timeout_,
3040  isHASubscribe_);
3041  }
3042 
3043  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3044  const std::string& topic_,
3045  const std::string& filter_ = "",
3046  const std::string& orderBy_ = "",
3047  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3048  int topN_ = AMPS_DEFAULT_TOP_N,
3049  const std::string& options_ = "",
3050  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
3051  bool isHASubscribe_ = true)
3052  {
3053  isHASubscribe_ &= (bool)_subscriptionManager;
3054  Lock<Mutex> l(_lock);
3055  _message.reset();
3056  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
3057  _message.newCommandId();
3058  _message.setQueryID(_message.getCommandId());
3059  _message.setSubscriptionId(_message.getCommandId());
3060  std::string subId = _message.getSubscriptionId();
3061  _message.setTopic(topic_);
3062  if (filter_.length()) _message.setFilter(filter_);
3063  if (orderBy_.length()) _message.setOrderBy(orderBy_);
3064  _message.setBatchSize(AMPS::asString(batchSize_));
3065  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
3066  if (options_.length()) _message.setOptions(options_);
3067  Message message = _message;
3068  if (isHASubscribe_)
3069  {
3070  message = _message.deepCopy();
3071  Unlock<Mutex> u(_lock);
3072  _subscriptionManager->subscribe(messageHandler_, message,
3073  Message::AckType::None);
3074  if (_badTimeToHASubscribe) return subId;
3075  }
3076  _routes.addRoute(message.getQueryID(), messageHandler_,
3077  Message::AckType::None, Message::AckType::Processed, true);
3078  message.setAckTypeEnum(Message::AckType::Processed);
3079  if (!options_.empty()) message.setOptions(options_);
3080  try
3081  {
3082  syncAckProcessing(timeout_, message, isHASubscribe_);
3083  }
3084  catch (const DisconnectedException&)
3085  {
3086  if (!isHASubscribe_)
3087  {
3088  _routes.removeRoute(subId);
3089  throw;
3090  }
3091  }
3092  catch (const TimedOutException&)
3093  {
3094  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3095  throw;
3096  }
3097  catch (...)
3098  {
3099  if (isHASubscribe_)
3100  {
3101  // Have to unlock before calling into sub manager to avoid deadlock
3102  Unlock<Mutex> unlock(_lock);
3103  _subscriptionManager->unsubscribe(Field(subId));
3104  }
3105  _routes.removeRoute(subId);
3106  throw;
3107  }
3108  return subId;
3109  }
3110 
3111  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
3112  const std::string& topic_,
3113  long timeout_,
3114  const std::string& filter_ = "",
3115  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
3116  bool oofEnabled_ = false,
3117  bool sendEmpties_ = false,
3118  int topN_ = AMPS_DEFAULT_TOP_N,
3119  bool isHASubscribe_ = true)
3120  {
3121  std::string notSet;
3122  Message::Options options;
3123  if (oofEnabled_) options.setOOF();
3124  if (sendEmpties_ == false) options.setNoEmpties();
3125  return sowAndDeltaSubscribe(messageHandler_,
3126  topic_,
3127  filter_,
3128  notSet, // orderBy
3129  batchSize_,
3130  topN_,
3131  options,
3132  timeout_,
3133  isHASubscribe_);
3134  }
3135 
3136  std::string sowDelete(const MessageHandler& messageHandler_,
3137  const std::string& topic_,
3138  const std::string& filter_,
3139  long timeout_,
3140  Message::Field commandId_ = Message::Field())
3141  {
3142  if (_publishStore.isValid())
3143  {
3144  unsigned ackType = Message::AckType::Processed |
3145  Message::AckType::Stats |
3146  Message::AckType::Persisted;
3147  if (!publishStoreMessage)
3148  {
3149  publishStoreMessage = new Message();
3150  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3151  }
3152  publishStoreMessage->reset();
3153  if (commandId_.empty())
3154  {
3155  publishStoreMessage->newCommandId();
3156  commandId_ = publishStoreMessage->getCommandId();
3157  }
3158  else
3159  {
3160  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3161  }
3162  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3163  .assignSubscriptionId(commandId_.data(), commandId_.len())
3164  .assignQueryID(commandId_.data(), commandId_.len())
3165  .setAckTypeEnum(ackType)
3166  .assignTopic(topic_.c_str(), topic_.length())
3167  .assignFilter(filter_.c_str(), filter_.length());
3168  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3169  char buf[AMPS_NUMBER_BUFFER_LEN];
3170  size_t pos = convertToCharArray(buf, haSequenceNumber);
3171  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3172  {
3173  try
3174  {
3175  Lock<Mutex> l(_lock);
3176  _routes.addRoute(commandId_, messageHandler_,
3177  Message::AckType::Stats,
3178  Message::AckType::Processed|Message::AckType::Persisted,
3179  false);
3180  syncAckProcessing(timeout_, *publishStoreMessage,
3181  haSequenceNumber);
3182  }
3183  catch (const DisconnectedException&)
3184  { // -V565
3185  // Pass - it will get replayed upon reconnect
3186  }
3187  catch (...)
3188  {
3189  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3190  throw;
3191  }
3192  }
3193  return (std::string)commandId_;
3194  }
3195  else
3196  {
3197  Lock<Mutex> l(_lock);
3198  _message.reset();
3199  if (commandId_.empty())
3200  {
3201  _message.newCommandId();
3202  commandId_ = _message.getCommandId();
3203  }
3204  else
3205  {
3206  _message.setCommandId(commandId_.data(), commandId_.len());
3207  }
3208  _message.setCommandEnum(Message::Command::SOWDelete)
3209  .assignSubscriptionId(commandId_.data(), commandId_.len())
3210  .assignQueryID(commandId_.data(), commandId_.len())
3211  .setAckTypeEnum(Message::AckType::Processed |
3212  Message::AckType::Stats)
3213  .assignTopic(topic_.c_str(), topic_.length())
3214  .assignFilter(filter_.c_str(), filter_.length());
3215  _routes.addRoute(commandId_, messageHandler_,
3216  Message::AckType::Stats,
3217  Message::AckType::Processed,
3218  false);
3219  try
3220  {
3221  syncAckProcessing(timeout_, _message);
3222  }
3223  catch (...)
3224  {
3225  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3226  throw;
3227  }
3228  return (std::string)commandId_;
3229  }
3230  }
3231 
3232  std::string sowDeleteByData(const MessageHandler& messageHandler_,
3233  const std::string& topic_,
3234  const std::string& data_,
3235  long timeout_,
3236  Message::Field commandId_ = Message::Field())
3237  {
3238  if (_publishStore.isValid())
3239  {
3240  unsigned ackType = Message::AckType::Processed |
3241  Message::AckType::Stats |
3242  Message::AckType::Persisted;
3243  if (!publishStoreMessage)
3244  {
3245  publishStoreMessage = new Message();
3246  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3247  }
3248  publishStoreMessage->reset();
3249  if (commandId_.empty())
3250  {
3251  publishStoreMessage->newCommandId();
3252  commandId_ = publishStoreMessage->getCommandId();
3253  }
3254  else
3255  {
3256  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3257  }
3258  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3259  .assignSubscriptionId(commandId_.data(), commandId_.len())
3260  .assignQueryID(commandId_.data(), commandId_.len())
3261  .setAckTypeEnum(ackType)
3262  .assignTopic(topic_.c_str(), topic_.length())
3263  .assignData(data_.c_str(), data_.length());
3264  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3265  char buf[AMPS_NUMBER_BUFFER_LEN];
3266  size_t pos = convertToCharArray(buf, haSequenceNumber);
3267  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3268  {
3269  try
3270  {
3271  Lock<Mutex> l(_lock);
3272  _routes.addRoute(commandId_, messageHandler_,
3273  Message::AckType::Stats,
3274  Message::AckType::Processed|Message::AckType::Persisted,
3275  false);
3276  syncAckProcessing(timeout_, *publishStoreMessage,
3277  haSequenceNumber);
3278  }
3279  catch (const DisconnectedException&)
3280  { // -V565
3281  // Pass - it will get replayed upon reconnect
3282  }
3283  catch (...)
3284  {
3285  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3286  throw;
3287  }
3288  }
3289  return (std::string)commandId_;
3290  }
3291  else
3292  {
3293  Lock<Mutex> l(_lock);
3294  _message.reset();
3295  if (commandId_.empty())
3296  {
3297  _message.newCommandId();
3298  commandId_ = _message.getCommandId();
3299  }
3300  else
3301  {
3302  _message.setCommandId(commandId_.data(), commandId_.len());
3303  }
3304  _message.setCommandEnum(Message::Command::SOWDelete)
3305  .assignSubscriptionId(commandId_.data(), commandId_.len())
3306  .assignQueryID(commandId_.data(), commandId_.len())
3307  .setAckTypeEnum(Message::AckType::Processed |
3308  Message::AckType::Stats)
3309  .assignTopic(topic_.c_str(), topic_.length())
3310  .assignData(data_.c_str(), data_.length());
3311  _routes.addRoute(commandId_, messageHandler_,
3312  Message::AckType::Stats,
3313  Message::AckType::Processed,
3314  false);
3315  try
3316  {
3317  syncAckProcessing(timeout_, _message);
3318  }
3319  catch (...)
3320  {
3321  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3322  throw;
3323  }
3324  return (std::string)commandId_;
3325  }
3326  }
3327 
3328  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
3329  const std::string& topic_,
3330  const std::string& keys_,
3331  long timeout_,
3332  Message::Field commandId_ = Message::Field())
3333  {
3334  if (_publishStore.isValid())
3335  {
3336  unsigned ackType = Message::AckType::Processed |
3337  Message::AckType::Stats |
3338  Message::AckType::Persisted;
3339  if (!publishStoreMessage)
3340  {
3341  publishStoreMessage = new Message();
3342  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3343  }
3344  publishStoreMessage->reset();
3345  if (commandId_.empty())
3346  {
3347  publishStoreMessage->newCommandId();
3348  commandId_ = publishStoreMessage->getCommandId();
3349  }
3350  else
3351  {
3352  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3353  }
3354  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3355  .assignSubscriptionId(commandId_.data(), commandId_.len())
3356  .assignQueryID(commandId_.data(), commandId_.len())
3357  .setAckTypeEnum(ackType)
3358  .assignTopic(topic_.c_str(), topic_.length())
3359  .assignSowKeys(keys_.c_str(), keys_.length());
3360  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3361  char buf[AMPS_NUMBER_BUFFER_LEN];
3362  size_t pos = convertToCharArray(buf, haSequenceNumber);
3363  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3364  {
3365  try
3366  {
3367  Lock<Mutex> l(_lock);
3368  _routes.addRoute(commandId_, messageHandler_,
3369  Message::AckType::Stats,
3370  Message::AckType::Processed|Message::AckType::Persisted,
3371  false);
3372  syncAckProcessing(timeout_, *publishStoreMessage,
3373  haSequenceNumber);
3374  }
3375  catch (const DisconnectedException&)
3376  { // -V565
3377  // Pass - it will get replayed upon reconnect
3378  }
3379  catch (...)
3380  {
3381  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3382  throw;
3383  }
3384  }
3385  return (std::string)commandId_;
3386  }
3387  else
3388  {
3389  Lock<Mutex> l(_lock);
3390  _message.reset();
3391  if (commandId_.empty())
3392  {
3393  _message.newCommandId();
3394  commandId_ = _message.getCommandId();
3395  }
3396  else
3397  {
3398  _message.setCommandId(commandId_.data(), commandId_.len());
3399  }
3400  _message.setCommandEnum(Message::Command::SOWDelete)
3401  .assignSubscriptionId(commandId_.data(), commandId_.len())
3402  .assignQueryID(commandId_.data(), commandId_.len())
3403  .setAckTypeEnum(Message::AckType::Processed |
3404  Message::AckType::Stats)
3405  .assignTopic(topic_.c_str(), topic_.length())
3406  .assignSowKeys(keys_.c_str(), keys_.length());
3407  _routes.addRoute(commandId_, messageHandler_,
3408  Message::AckType::Stats,
3409  Message::AckType::Processed,
3410  false);
3411  try
3412  {
3413  syncAckProcessing(timeout_, _message);
3414  }
3415  catch (...)
3416  {
3417  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3418  throw;
3419  }
3420  return (std::string)commandId_;
3421  }
3422  }
3423 
3424  void startTimer(void)
3425  {
3426  if (_serverVersion >= "5.3.2.0")
3427  {
3428  throw CommandException("The start_timer command is deprecated.");
3429  }
3430  Lock<Mutex> l(_lock);
3431  _message.reset();
3432  _message.setCommandEnum(Message::Command::StartTimer);
3433 
3434  _send(_message);
3435  }
3436 
3437  std::string stopTimer(MessageHandler messageHandler_)
3438  {
3439  if (_serverVersion >= "5.3.2.0")
3440  {
3441  throw CommandException("The stop_timer command is deprecated.");
3442  }
3443  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
3444  }
3445 
3446  amps_handle getHandle(void)
3447  {
3448  return _client;
3449  }
3450 
3458  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
3459  {
3460  _pExceptionListener = pListener_;
3461  _exceptionListener = _pExceptionListener.get();
3462  }
3463 
3464  void setExceptionListener(const ExceptionListener& listener_)
3465  {
3466  _exceptionListener = &listener_;
3467  }
3468 
3469  const ExceptionListener& getExceptionListener(void) const
3470  {
3471  return *_exceptionListener;
3472  }
3473 
3474  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
3475  {
3476  if (readTimeout_ && readTimeout_ < heartbeatInterval_)
3477  {
3478  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
3479  }
3480  Lock<Mutex> l(_lock);
3481  if(_heartbeatInterval != heartbeatInterval_ ||
3482  _readTimeout != readTimeout_)
3483  {
3484  _heartbeatInterval = heartbeatInterval_;
3485  _readTimeout = readTimeout_;
3486  _sendHeartbeat();
3487  }
3488  }
3489 
3490  void _sendHeartbeat(void)
3491  {
3492  if (_connected && _heartbeatInterval != 0)
3493  {
3494  std::ostringstream options;
3495  options << "start," << _heartbeatInterval;
3496  Message startMessage = Message()
3497  .setCommandEnum(Message::Command::Heartbeat)
3498  .setOptions(options.str());
3499 
3500  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3501  _heartbeatTimer.start();
3502  try
3503  {
3504  _sendWithoutRetry(startMessage);
3505  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
3506  }
3507  catch(ConnectionException &ex_)
3508  {
3509  // If we are disconnected when we attempt to send, that's OK;
3510  // we'll send this message after we re-connect (if we do).
3511  AMPS_UNHANDLED_EXCEPTION(ex_);
3512  }
3513  }
3514  amps_result result = AMPS_E_OK;
3515  if(_readTimeout && _connected)
3516  {
3517  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
3518  }
3519  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
3520  {
3521  AMPSException::throwFor(_client, result);
3522  }
3523  }
3524 
3525  void addConnectionStateListener(ConnectionStateListener *listener_)
3526  {
3527  Lock<Mutex> lock(_lock);
3528  _connectionStateListeners.insert(listener_);
3529  }
3530 
3531  void removeConnectionStateListener(ConnectionStateListener *listener_)
3532  {
3533  Lock<Mutex> lock(_lock);
3534  _connectionStateListeners.erase(listener_);
3535  }
3536 
3537  void _registerHandler(Command& command_, Message::Field& cid_,
3538  MessageHandler& handler_, unsigned requestedAcks_,
3539  unsigned systemAddedAcks_, bool isSubscribe_)
3540  {
3541  Message message = command_.getMessage();
3542  Message::Command::Type commandType = message.getCommandEnum();
3543  Message::Field subid = message.getSubscriptionId();
3544  Message::Field qid = message.getQueryID();
3545  // If we have an id, we're good, even if it's an existing route
3546  bool added = qid.len() || subid.len() || cid_.len();
3547  int addedCount = 0;
3548  if (subid.len() > 0)
3549  {
3550  // This can replace a non-subscribe with a matching id
3551  // with a subscription but not another subscription.
3552  addedCount += _routes.addRoute(subid, handler_, requestedAcks_,
3553  systemAddedAcks_, isSubscribe_);
3554  }
3555  if (qid.len() > 0 && qid != subid)
3556  {
3557  while (_routes.hasRoute(qid))
3558  {
3559  message.newQueryId();
3560  if (cid_ == qid)
3561  cid_ = message.getQueryId();
3562  qid = message.getQueryId();
3563  }
3564  if (addedCount == 0)
3565  {
3566  _routes.addRoute(qid, handler_, requestedAcks_,
3567  systemAddedAcks_, isSubscribe_);
3568  }
3569  else
3570  {
3571  void* data = NULL;
3572  {
3573  Unlock<Mutex> u(_lock);
3574  data = amps_invoke_copy_route_function(handler_.userData());
3575  }
3576  if (!data)
3577  {
3578  _routes.addRoute(qid, handler_, requestedAcks_,
3579  systemAddedAcks_, false);
3580  }
3581  else
3582  {
3583  _routes.addRoute(qid,
3584  MessageHandler(handler_.function(),
3585  data),
3586  requestedAcks_,
3587  systemAddedAcks_, false);
3588  }
3589  }
3590  ++addedCount;
3591  }
3592  if (cid_.len() > 0 && cid_ != qid && cid_ != subid
3593  && requestedAcks_ & ~Message::AckType::Persisted)
3594  {
3595  while (_routes.hasRoute(cid_))
3596  {
3597  cid_ = message.newCommandId().getCommandId();
3598  }
3599  if (addedCount == 0)
3600  {
3601  _routes.addRoute(cid_, handler_, requestedAcks_,
3602  systemAddedAcks_, false);
3603  }
3604  else
3605  {
3606  void* data = NULL;
3607  {
3608  Unlock<Mutex> u(_lock);
3609  data = amps_invoke_copy_route_function(handler_.userData());
3610  }
3611  if (!data)
3612  {
3613  _routes.addRoute(cid_, handler_, requestedAcks_,
3614  systemAddedAcks_, false);
3615  }
3616  else
3617  {
3618  _routes.addRoute(cid_,
3619  MessageHandler(handler_.function(),
3620  data),
3621  requestedAcks_,
3622  systemAddedAcks_, false);
3623  }
3624  }
3625  }
3626  else if (commandType == Message::Command::Publish ||
3627  commandType == Message::Command::DeltaPublish)
3628  {
3629  cid_ = command_.getMessage().newCommandId().getCommandId();
3630  _routes.addRoute(cid_, handler_, requestedAcks_,
3631  systemAddedAcks_, false);
3632  added=true;
3633  }
3634  if (!added)
3635  {
3636  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
3637  }
3638  }
3639 
3640  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
3641  bool isHASubscribe_ = true)
3642  {
3643  isHASubscribe_ &= (bool)_subscriptionManager;
3644  Message& message = command_.getMessage();
3645  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
3646  Message::AckType::Processed : Message::AckType::None;
3647  unsigned requestedAcks = message.getAckTypeEnum();
3648  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
3649  Message::Command::Type commandType = message.getCommandEnum();
3650  if (commandType == Message::Command::SOW
3651  || commandType == Message::Command::SOWAndSubscribe
3652  || commandType == Message::Command::SOWAndDeltaSubscribe
3653  || commandType == Message::Command::StopTimer)
3654  systemAddedAcks |= Message::AckType::Completed;
3655  Message::Field cid = message.getCommandId();
3656  if (handler_.isValid() && cid.empty())
3657  {
3658  cid = message.newCommandId().getCommandId();
3659  }
3660  if (message.getBookmark().len() > 0)
3661  {
3662  if (command_.isSubscribe())
3663  {
3664  Message::Field bookmark = message.getBookmark();
3665  if (_bookmarkStore.isValid())
3666  {
3667  systemAddedAcks |= Message::AckType::Persisted;
3668  if (bookmark == AMPS_BOOKMARK_RECENT)
3669  {
3670  message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
3671  }
3672  else if (bookmark != AMPS_BOOKMARK_NOW &&
3673  bookmark != AMPS_BOOKMARK_EPOCH)
3674  {
3675  _bookmarkStore.log(message);
3676  if (!BookmarkRange::isRange(bookmark))
3677  {
3678  _bookmarkStore.discard(message);
3679  _bookmarkStore.persisted(message.getSubscriptionId(),
3680  bookmark);
3681  }
3682  }
3683  }
3684  else if (bookmark == AMPS_BOOKMARK_RECENT)
3685  {
3687  }
3688  }
3689  }
3690  if (isPublishStore)
3691  {
3692  systemAddedAcks |= Message::AckType::Persisted;
3693  }
3694  bool isSubscribe = command_.isSubscribe();
3695  if (handler_.isValid() && !isSubscribe)
3696  {
3697  _registerHandler(command_, cid, handler_,
3698  requestedAcks, systemAddedAcks, isSubscribe);
3699  }
3700  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
3701  if (isPublishStore)
3702  {
3703  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
3704  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3705  {
3706  Unlock<Mutex> u(_lock);
3707  haSequenceNumber = _publishStore.store(message);
3708  }
3709  message.setSequence(haSequenceNumber);
3710  if (useSyncSend)
3711  {
3712  try
3713  {
3714  syncAckProcessing((long)command_.getTimeout(), message,
3715  haSequenceNumber);
3716  }
3717  catch (const DisconnectedException&)
3718  { // -V565
3719  // Pass - message will get replayed when reconnected
3720  }
3721  catch (...)
3722  {
3723  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3724  throw;
3725  }
3726  }
3727  else _send(message, haSequenceNumber);
3728  }
3729  else
3730  {
3731  if(command_.isSubscribe())
3732  {
3733  const Message::Field& subId = message.getSubscriptionId();
3734  if (isHASubscribe_)
3735  {
3736  Unlock<Mutex> u(_lock);
3737  _subscriptionManager->subscribe(handler_,
3738  message.deepCopy(),
3739  requestedAcks);
3740  if (_badTimeToHASubscribe)
3741  {
3742  message.setAckTypeEnum(requestedAcks);
3743  return std::string(subId.data(), subId.len());
3744  }
3745  }
3746  if (handler_.isValid())
3747  {
3748  _registerHandler(command_, cid, handler_,
3749  requestedAcks, systemAddedAcks, isSubscribe);
3750  }
3751  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3752  if (useSyncSend)
3753  {
3754  try
3755  {
3756  syncAckProcessing((long)command_.getTimeout(), message,
3757  isHASubscribe_);
3758  }
3759  catch (const DisconnectedException&)
3760  {
3761  if (!isHASubscribe_)
3762  {
3763  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3764  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
3765  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3766  message.setAckTypeEnum(requestedAcks);
3767  throw;
3768  }
3769  }
3770  catch (const TimedOutException&)
3771  {
3772  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
3773  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3774  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
3775  throw;
3776  }
3777  catch (...)
3778  {
3779  if (isHASubscribe_)
3780  {
3781  // Have to unlock before calling into sub manager to avoid deadlock
3782  Unlock<Mutex> unlock(_lock);
3783  _subscriptionManager->unsubscribe(subId);
3784  }
3785  if (message.getQueryID().len() > 0)
3786  _routes.removeRoute(message.getQueryID());
3787  _routes.removeRoute(cid);
3788  _routes.removeRoute(subId);
3789  throw;
3790  }
3791  }
3792  else _send(message);
3793  if (subId.len() > 0)
3794  {
3795  message.setAckTypeEnum(requestedAcks);
3796  return std::string(subId.data(), subId.len());
3797  }
3798  }
3799  else
3800  {
3801  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3802  if (useSyncSend)
3803  {
3804  try
3805  {
3806  syncAckProcessing((long)(command_.getTimeout()), message);
3807  }
3808  catch (const DisconnectedException&)
3809  {
3810  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3811  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3812  message.setAckTypeEnum(requestedAcks);
3813  throw;
3814  }
3815  catch (...)
3816  {
3817  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3818  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3819  message.setAckTypeEnum(requestedAcks);
3820  throw;
3821  }
3822  }
3823  else _send(message);
3824  }
3825  }
3826  message.setAckTypeEnum(requestedAcks);
3827  return cid;
3828  }
3829 
3830  MessageStream getEmptyMessageStream(void);
3831 
3832  std::string executeAsync(Command& command_, MessageHandler& handler_,
3833  bool isHASubscribe_ = true)
3834  {
3835  Lock<Mutex> lock(_lock);
3836  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
3837  }
3838 
3839  // Queue Methods //
3840  void setAutoAck(bool isAutoAckEnabled_)
3841  {
3842  _isAutoAckEnabled = isAutoAckEnabled_;
3843  }
3844  bool getAutoAck(void) const
3845  {
3846  return _isAutoAckEnabled;
3847  }
3848  void setAckBatchSize(const unsigned batchSize_)
3849  {
3850  _ackBatchSize = batchSize_;
3851  if (!_queueAckTimeout)
3852  {
3853  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
3854  amps_client_set_idle_time(_client, _queueAckTimeout);
3855  }
3856  }
3857  unsigned getAckBatchSize(void) const
3858  {
3859  return _ackBatchSize;
3860  }
3861  int getAckTimeout(void) const
3862  {
3863  return _queueAckTimeout;
3864  }
3865  void setAckTimeout(const int ackTimeout_)
3866  {
3867  amps_client_set_idle_time(_client,ackTimeout_);
3868  _queueAckTimeout = ackTimeout_;
3869  }
3870  size_t _ack(QueueBookmarks& queueBookmarks_)
3871  {
3872  if(queueBookmarks_._bookmarkCount)
3873  {
3874  if (!publishStoreMessage)
3875  {
3876  publishStoreMessage = new Message();
3877  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3878  }
3879  publishStoreMessage->reset();
3880  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3881  .setTopic(queueBookmarks_._topic)
3882  .setBookmark(queueBookmarks_._data)
3883  .setCommandId("AMPS-queue-ack");
3884  amps_uint64_t haSequenceNumber = 0;
3885  if (_publishStore.isValid())
3886  {
3887  haSequenceNumber = _publishStore.store(*publishStoreMessage);
3888  publishStoreMessage->setAckType("persisted")
3889  .setSequence(haSequenceNumber);
3890  queueBookmarks_._data.erase();
3891  queueBookmarks_._bookmarkCount = 0;
3892  }
3893  _send(*publishStoreMessage, haSequenceNumber);
3894  if (!_publishStore.isValid())
3895  {
3896  queueBookmarks_._data.erase();
3897  queueBookmarks_._bookmarkCount = 0;
3898  }
3899  return 1;
3900  }
3901  return 0;
3902  }
3903  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
3904  {
3905  if (_isAutoAckEnabled) return;
3906  _ack(topic_, bookmark_, options_);
3907  }
3908  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
3909  {
3910  if (bookmark_.len() == 0) return;
3911  Lock<Mutex> lock(_lock);
3912  if(_ackBatchSize < 2 || options_ != NULL)
3913  {
3914  if (!publishStoreMessage)
3915  {
3916  publishStoreMessage = new Message();
3917  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3918  }
3919  publishStoreMessage->reset();
3920  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3921  .setCommandId("AMPS-queue-ack")
3922  .setTopic(topic_).setBookmark(bookmark_);
3923  if (options_) publishStoreMessage->setOptions(options_);
3924  amps_uint64_t haSequenceNumber = 0;
3925  if (_publishStore.isValid())
3926  {
3927  haSequenceNumber = _publishStore.store(*publishStoreMessage);
3928  publishStoreMessage->setAckType("persisted")
3929  .setSequence(haSequenceNumber);
3930  }
3931  _send(*publishStoreMessage, haSequenceNumber);
3932  return;
3933  }
3934  // have we acked anything for this hash
3935  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(),topic_.len());
3936  TopicHashMap::iterator it = _topicHashMap.find(hash);
3937  if(it == _topicHashMap.end())
3938  {
3939  // add a new one to the map
3940  it = _topicHashMap.insert(TopicHashMap::value_type(hash,QueueBookmarks(topic_))).first;
3941  }
3942  QueueBookmarks &queueBookmarks = it->second;
3943  if(queueBookmarks._data.length())
3944  {
3945  queueBookmarks._data.append(",");
3946  }
3947  else
3948  {
3949  queueBookmarks._oldestTime = amps_now();
3950  }
3951  queueBookmarks._data.append(bookmark_);
3952  if(++queueBookmarks._bookmarkCount >= _ackBatchSize) _ack(queueBookmarks);
3953  }
3954  void flushAcks(void)
3955  {
3956  size_t sendCount = 0;
3957  if(!_connected)
3958  {
3959  return;
3960  }
3961  else
3962  {
3963  Lock<Mutex> lock(_lock);
3964  typedef TopicHashMap::iterator iterator;
3965  for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3966  {
3967  QueueBookmarks& queueBookmarks = it->second;
3968  sendCount += _ack(queueBookmarks);
3969  }
3970  }
3971  if(sendCount && _connected) publishFlush(0, Message::AckType::Processed);
3972  }
3973  // called when there's idle time, to see if we need to flush out any "acks"
3974  void checkQueueAcks(void)
3975  {
3976  if(!_topicHashMap.size()) return;
3977  Lock<Mutex> lock(_lock);
3978  try
3979  {
3980  amps_uint64_t threshold = amps_now() - (amps_uint64_t)_queueAckTimeout;
3981  typedef TopicHashMap::iterator iterator;
3982  for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3983  {
3984  QueueBookmarks& queueBookmarks = it->second;
3985  if(queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold) _ack(queueBookmarks);
3986  }
3987  }
3988  catch(std::exception& ex)
3989  {
3990  AMPS_UNHANDLED_EXCEPTION(ex);
3991  }
3992  }
3993 
3994  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
3995  {
3996  Lock<Mutex> lock(_deferredExecutionLock);
3997  _deferredExecutionList.push_back(
3998  DeferredExecutionRequest(func_,userData_));
3999  }
4000 
4001  inline void processDeferredExecutions(void)
4002  {
4003  if(_deferredExecutionList.size())
4004  {
4005  Lock<Mutex> lock(_deferredExecutionLock);
4006  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
4007  DeferredExecutionList::iterator end = _deferredExecutionList.end();
4008  for(; it != end; ++it)
4009  {
4010  try
4011  {
4012  it->_func(it->_userData);
4013  }
4014  catch (...)
4015  { // -V565
4016  // Intentionally ignore errors
4017  }
4018  }
4019  _deferredExecutionList.clear();
4020  _routes.invalidateCache();
4021  _routeCache.invalidateCache();
4022  }
4023  }
4024 
4025  bool getRetryOnDisconnect(void) const
4026  {
4027  return _isRetryOnDisconnect;
4028  }
4029 
4030  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
4031  {
4032  _isRetryOnDisconnect = isRetryOnDisconnect_;
4033  }
4034 
4035  void setDefaultMaxDepth(unsigned maxDepth_)
4036  {
4037  _defaultMaxDepth = maxDepth_;
4038  }
4039 
4040  unsigned getDefaultMaxDepth(void) const
4041  {
4042  return _defaultMaxDepth;
4043  }
4044 
4045  void setTransportFilterFunction(amps_transport_filter_function filter_,
4046  void* userData_)
4047  {
4048  amps_client_set_transport_filter_function(_client, filter_, userData_);
4049  }
4050 
4051  void setThreadCreatedCallback(amps_thread_created_callback callback_,
4052  void* userData_)
4053  {
4054  amps_client_set_thread_created_callback(_client, callback_, userData_);
4055  }
4056 }; // class ClientImpl
4131 
4133 {
4134  RefHandle<MessageStreamImpl> _body;
4135  public:
4140  class iterator
4141  {
4142  MessageStream* _pStream;
4143  Message _current;
4144  inline void advance(void);
4145 
4146  public:
4147  iterator() // end
4148  :_pStream(NULL)
4149  {;}
4150  iterator(MessageStream* pStream_)
4151  :_pStream(pStream_)
4152  {
4153  advance();
4154  }
4155 
4156  bool operator==(const iterator& rhs)
4157  {
4158  return _pStream == rhs._pStream;
4159  }
4160  bool operator!=(const iterator& rhs)
4161  {
4162  return _pStream != rhs._pStream;
4163  }
4164  void operator++(void) { advance(); }
4165  Message operator*(void) { return _current; }
4166  Message* operator->(void) { return &_current; }
4167  };
4169  bool isValid() const { return _body.isValid(); }
4170 
4174  {
4175  if(!_body.isValid())
4176  {
4177  throw UsageException("This MessageStream is not valid and cannot be iterated.");
4178  }
4179  return iterator(this);
4180  }
4183  // For non-SOW queries, the end is never reached.
4184  iterator end(void) { return iterator(); }
4185  inline MessageStream(void);
4186 
4192  MessageStream timeout(unsigned timeout_);
4193 
4197  MessageStream conflate(void);
4203  MessageStream maxDepth(unsigned maxDepth_);
4206  unsigned getMaxDepth(void) const;
4209  unsigned getDepth(void) const;
4210 
4211  private:
4212  inline MessageStream(const Client& client_);
4213  inline void setSOWOnly(const std::string& commandId_,
4214  const std::string& queryId_ = "");
4215  inline void setSubscription(const std::string& subId_,
4216  const std::string& commandId_ = "",
4217  const std::string& queryId_ = "");
4218  inline void setStatsOnly(const std::string& commandId_,
4219  const std::string& queryId_ = "");
4220  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
4221 
4222  inline operator MessageHandler(void);
4223 
4224  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
4225 
4226  friend class Client;
4227 
4228 };
4229 
4249 class Client // -V553
4250 {
4251 protected:
4252  BorrowRefHandle<ClientImpl> _body;
4253 public:
4254  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
4255  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
4256  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
4257 
4266  Client(const std::string& clientName = "")
4267  : _body(new ClientImpl(clientName), true)
4268  {;}
4269 
4270  Client(ClientImpl* existingClient)
4271  : _body(existingClient, true)
4272  {;}
4273 
4274  Client(ClientImpl* existingClient, bool isRef)
4275  : _body(existingClient, isRef)
4276  {;}
4277 
4278  Client(const Client& rhs) : _body(rhs._body) {;}
4279  virtual ~Client(void) {;}
4280 
4281  Client& operator=(const Client& rhs)
4282  {
4283  _body = rhs._body;
4284  return *this;
4285  }
4286 
4287  bool isValid()
4288  {
4289  return _body.isValid();
4290  }
4291 
4304  void setName(const std::string& name)
4305  {
4306  _body.get().setName(name);
4307  }
4308 
4311  const std::string& getName() const
4312  {
4313  return _body.get().getName();
4314  }
4315 
4319  const std::string& getNameHash() const
4320  {
4321  return _body.get().getNameHash();
4322  }
4323 
4330  void setLogonCorrelationData(const std::string& logonCorrelationData_)
4331  {
4332  _body.get().setLogonCorrelationData(logonCorrelationData_);
4333  }
4334 
4337  const std::string& getLogonCorrelationData() const
4338  {
4339  return _body.get().getLogonCorrelationData();
4340  }
4341 
4350  size_t getServerVersion() const
4351  {
4352  return _body.get().getServerVersion();
4353  }
4354 
4361  VersionInfo getServerVersionInfo() const
4362  {
4363  return _body.get().getServerVersionInfo();
4364  }
4365 
4375  static size_t convertVersionToNumber(const std::string& version_)
4376  {
4377  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4378  }
4379 
4390  static size_t convertVersionToNumber(const char* data_, size_t len_)
4391  {
4392  return AMPS::convertVersionToNumber(data_, len_);
4393  }
4394 
4397  const std::string& getURI() const
4398  {
4399  return _body.get().getURI();
4400  }
4401 
4408 
4410 
4421  void connect(const std::string& uri)
4422  {
4423  _body.get().connect(uri);
4424  }
4425 
4428  void disconnect()
4429  {
4430  _body.get().disconnect();
4431  }
4432 
4446  void send(const Message& message)
4447  {
4448  _body.get().send(message);
4449  }
4450 
4459  void addMessageHandler(const Field& commandId_,
4460  const AMPS::MessageHandler& messageHandler_,
4461  unsigned requestedAcks_, bool isSubscribe_)
4462  {
4463  _body.get().addMessageHandler(commandId_, messageHandler_,
4464  requestedAcks_, isSubscribe_);
4465  }
4466 
4470  bool removeMessageHandler(const Field& commandId_)
4471  {
4472  return _body.get().removeMessageHandler(commandId_);
4473  }
4474 
4498  std::string send(const MessageHandler& messageHandler, Message& message, int timeout = 0)
4499  {
4500  return _body.get().send(messageHandler, message, timeout);
4501  }
4502 
4512  void setDisconnectHandler(const DisconnectHandler& disconnectHandler)
4513  {
4514  _body.get().setDisconnectHandler(disconnectHandler);
4515  }
4516 
4520  DisconnectHandler getDisconnectHandler(void) const
4521  {
4522  return _body.get().getDisconnectHandler();
4523  }
4524 
4529  virtual ConnectionInfo getConnectionInfo() const
4530  {
4531  return _body.get().getConnectionInfo();
4532  }
4533 
4542  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
4543  {
4544  _body.get().setBookmarkStore(bookmarkStore_);
4545  }
4546 
4551  {
4552  return _body.get().getBookmarkStore();
4553  }
4554 
4559  {
4560  return _body.get().getSubscriptionManager();
4561  }
4562 
4570  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
4571  {
4572  _body.get().setSubscriptionManager(subscriptionManager_);
4573  }
4574 
4594  void setPublishStore(const Store& publishStore_)
4595  {
4596  _body.get().setPublishStore(publishStore_);
4597  }
4598 
4603  {
4604  return _body.get().getPublishStore();
4605  }
4606 
4610  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
4611  {
4612  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
4613  duplicateMessageHandler_);
4614  }
4615 
4626  {
4627  return _body.get().getDuplicateMessageHandler();
4628  }
4629 
4640  {
4641  _body.get().setFailedWriteHandler(handler_);
4642  }
4643 
4648  {
4649  return _body.get().getFailedWriteHandler();
4650  }
4651 
4652 
4670  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
4671  {
4672  return _body.get().publish(topic_.c_str(), topic_.length(),
4673  data_.c_str(), data_.length());
4674  }
4675 
4695  amps_uint64_t publish(const char* topic_, size_t topicLength_,
4696  const char* data_, size_t dataLength_)
4697  {
4698  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
4699  }
4700 
4719  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
4720  unsigned long expiration_)
4721  {
4722  return _body.get().publish(topic_.c_str(), topic_.length(),
4723  data_.c_str(), data_.length(), expiration_);
4724  }
4725 
4746  amps_uint64_t publish(const char* topic_, size_t topicLength_,
4747  const char* data_, size_t dataLength_,
4748  unsigned long expiration_)
4749  {
4750  return _body.get().publish(topic_, topicLength_,
4751  data_, dataLength_, expiration_);
4752  }
4753 
4792  void publishFlush(long timeout_ = 0, unsigned ackType_ = Message::AckType::Processed)
4793  {
4794  _body.get().publishFlush(timeout_, ackType_);
4795  }
4796 
4797 
4813  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
4814  {
4815  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4816  data_.c_str(), data_.length());
4817  }
4818 
4836  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
4837  const char* data_, size_t dataLength_)
4838  {
4839  return _body.get().deltaPublish(topic_, topicLength_,
4840  data_, dataLength_);
4841  }
4842 
4859  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
4860  unsigned long expiration_)
4861  {
4862  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4863  data_.c_str(), data_.length(),
4864  expiration_);
4865  }
4866 
4885  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
4886  const char* data_, size_t dataLength_,
4887  unsigned long expiration_)
4888  {
4889  return _body.get().deltaPublish(topic_, topicLength_,
4890  data_, dataLength_, expiration_);
4891  }
4892 
4907  std::string logon(int timeout_ = 0,
4908  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
4909  const char* options_ = NULL)
4910  {
4911  return _body.get().logon(timeout_, authenticator_, options_);
4912  }
4925  std::string logon(const char* options_, int timeout_ = 0)
4926  {
4927  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
4928  options_);
4929  }
4930 
4943  std::string logon(const std::string& options_, int timeout_ = 0)
4944  {
4945  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
4946  options_.c_str());
4947  }
4948 
4968  std::string subscribe(const MessageHandler& messageHandler_,
4969  const std::string& topic_,
4970  long timeout_=0,
4971  const std::string& filter_="",
4972  const std::string& options_ = "",
4973  const std::string& subId_ = "")
4974  {
4975  return _body.get().subscribe(messageHandler_, topic_, timeout_,
4976  filter_, "", options_, subId_);
4977  }
4978 
4994  MessageStream subscribe(const std::string& topic_,
4995  long timeout_=0, const std::string& filter_="",
4996  const std::string& options_ = "",
4997  const std::string& subId_ = "")
4998  {
4999  MessageStream result(*this);
5000  if (_body.get().getDefaultMaxDepth())
5001  result.maxDepth(_body.get().getDefaultMaxDepth());
5002  result.setSubscription(_body.get().subscribe(
5003  result.operator MessageHandler(),
5004  topic_, timeout_, filter_, "",
5005  options_, subId_, false));
5006  return result;
5007  }
5008 
5024  MessageStream subscribe(const char* topic_,
5025  long timeout_ = 0, const std::string& filter_ = "",
5026  const std::string& options_ = "",
5027  const std::string& subId_ = "")
5028  {
5029  MessageStream result(*this);
5030  if (_body.get().getDefaultMaxDepth())
5031  result.maxDepth(_body.get().getDefaultMaxDepth());
5032  result.setSubscription(_body.get().subscribe(
5033  result.operator MessageHandler(),
5034  topic_, timeout_, filter_, "",
5035  options_, subId_, false));
5036  return result;
5037  }
5038 
5051  std::string deltaSubscribe(const MessageHandler& messageHandler_,
5052  const std::string& topic_,
5053  long timeout_,
5054  const std::string& filter_="",
5055  const std::string& options_ = "",
5056  const std::string& subId_ = "")
5057  {
5058  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
5059  filter_, "", options_, subId_);
5060  }
5069  MessageStream deltaSubscribe(const std::string& topic_,
5070  long timeout_, const std::string& filter_="",
5071  const std::string& options_ = "",
5072  const std::string& subId_ = "")
5073  {
5074  MessageStream result(*this);
5075  if (_body.get().getDefaultMaxDepth())
5076  result.maxDepth(_body.get().getDefaultMaxDepth());
5077  result.setSubscription(_body.get().deltaSubscribe(
5078  result.operator MessageHandler(),
5079  topic_, timeout_, filter_, "",
5080  options_, subId_, false));
5081  return result;
5082  }
5083 
5085  MessageStream deltaSubscribe(const char* topic_,
5086  long timeout_, const std::string& filter_ = "",
5087  const std::string& options_ = "",
5088  const std::string& subId_ = "")
5089  {
5090  MessageStream result(*this);
5091  if (_body.get().getDefaultMaxDepth())
5092  result.maxDepth(_body.get().getDefaultMaxDepth());
5093  result.setSubscription(_body.get().deltaSubscribe(
5094  result.operator MessageHandler(),
5095  topic_, timeout_, filter_, "",
5096  options_, subId_, false));
5097  return result;
5098  }
5099 
5125  std::string bookmarkSubscribe(const MessageHandler& messageHandler_,
5126  const std::string& topic_,
5127  long timeout_,
5128  const std::string& bookmark_,
5129  const std::string& filter_="",
5130  const std::string& options_ = "",
5131  const std::string& subId_ = "")
5132  {
5133  return _body.get().subscribe(messageHandler_, topic_, timeout_,
5134  filter_, bookmark_, options_, subId_);
5135  }
5153  MessageStream bookmarkSubscribe(const std::string& topic_,
5154  long timeout_,
5155  const std::string& bookmark_,
5156  const std::string& filter_="",
5157  const std::string& options_ = "",
5158  const std::string& subId_ = "")
5159  {
5160  MessageStream result(*this);
5161  if (_body.get().getDefaultMaxDepth())
5162  result.maxDepth(_body.get().getDefaultMaxDepth());
5163  result.setSubscription(_body.get().subscribe(
5164  result.operator MessageHandler(),
5165  topic_, timeout_, filter_,
5166  bookmark_, options_,
5167  subId_, false));
5168  return result;
5169  }
5170 
5172  MessageStream bookmarkSubscribe(const char* topic_,
5173  long timeout_,
5174  const std::string& bookmark_,
5175  const std::string& filter_ = "",
5176  const std::string& options_ = "",
5177  const std::string& subId_ = "")
5178  {
5179  MessageStream result(*this);
5180  if (_body.get().getDefaultMaxDepth())
5181  result.maxDepth(_body.get().getDefaultMaxDepth());
5182  result.setSubscription(_body.get().subscribe(
5183  result.operator MessageHandler(),
5184  topic_, timeout_, filter_,
5185  bookmark_, options_,
5186  subId_, false));
5187  return result;
5188  }
5189 
5198  void unsubscribe(const std::string& commandId)
5199  {
5200  return _body.get().unsubscribe(commandId);
5201  }
5202 
5211  {
5212  return _body.get().unsubscribe();
5213  }
5214 
5215 
5245  std::string sow(const MessageHandler& messageHandler_,
5246  const std::string& topic_,
5247  const std::string& filter_ = "",
5248  const std::string& orderBy_ = "",
5249  const std::string& bookmark_ = "",
5250  int batchSize_ = DEFAULT_BATCH_SIZE,
5251  int topN_ = DEFAULT_TOP_N,
5252  const std::string& options_ = "",
5253  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5254  {
5255  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
5256  bookmark_, batchSize_, topN_, options_,
5257  timeout_);
5258  }
5283  MessageStream sow(const std::string& topic_,
5284  const std::string& filter_ = "",
5285  const std::string& orderBy_ = "",
5286  const std::string& bookmark_ = "",
5287  int batchSize_ = DEFAULT_BATCH_SIZE,
5288  int topN_ = DEFAULT_TOP_N,
5289  const std::string& options_ = "",
5290  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5291  {
5292  MessageStream result(*this);
5293  if (_body.get().getDefaultMaxDepth())
5294  result.maxDepth(_body.get().getDefaultMaxDepth());
5295  result.setSOWOnly(sow(result.operator MessageHandler(),topic_,filter_,orderBy_,bookmark_,batchSize_,topN_,options_,timeout_));
5296  return result;
5297  }
5298 
5300  MessageStream sow(const char* topic_,
5301  const std::string& filter_ = "",
5302  const std::string& orderBy_ = "",
5303  const std::string& bookmark_ = "",
5304  int batchSize_ = DEFAULT_BATCH_SIZE,
5305  int topN_ = DEFAULT_TOP_N,
5306  const std::string& options_ = "",
5307  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5308  {
5309  MessageStream result(*this);
5310  if (_body.get().getDefaultMaxDepth())
5311  result.maxDepth(_body.get().getDefaultMaxDepth());
5312  result.setSOWOnly(sow(result.operator MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
5313  return result;
5314  }
5337  std::string sow(const MessageHandler& messageHandler_,
5338  const std::string& topic_,
5339  long timeout_,
5340  const std::string& filter_ = "",
5341  int batchSize_ = DEFAULT_BATCH_SIZE,
5342  int topN_ = DEFAULT_TOP_N)
5343  {
5344  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
5345  batchSize_, topN_);
5346  }
5369  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
5370  const std::string& topic_,
5371  long timeout_,
5372  const std::string& filter_ = "",
5373  int batchSize_ = DEFAULT_BATCH_SIZE,
5374  bool oofEnabled_ = false,
5375  int topN_ = DEFAULT_TOP_N)
5376  {
5377  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5378  filter_, batchSize_, oofEnabled_,
5379  topN_);
5380  }
5381 
5401  MessageStream sowAndSubscribe(const std::string& topic_,
5402  long timeout_,
5403  const std::string& filter_ = "",
5404  int batchSize_ = DEFAULT_BATCH_SIZE,
5405  bool oofEnabled_ = false,
5406  int topN_ = DEFAULT_TOP_N)
5407  {
5408  MessageStream result(*this);
5409  if (_body.get().getDefaultMaxDepth())
5410  result.maxDepth(_body.get().getDefaultMaxDepth());
5411  result.setSubscription(_body.get().sowAndSubscribe(
5412  result.operator MessageHandler(),
5413  topic_, timeout_, filter_,
5414  batchSize_, oofEnabled_,
5415  topN_, false));
5416  return result;
5417  }
5437  MessageStream sowAndSubscribe(const char *topic_,
5438  long timeout_,
5439  const std::string& filter_ = "",
5440  int batchSize_ = DEFAULT_BATCH_SIZE,
5441  bool oofEnabled_ = false,
5442  int topN_ = DEFAULT_TOP_N)
5443  {
5444  MessageStream result(*this);
5445  if (_body.get().getDefaultMaxDepth())
5446  result.maxDepth(_body.get().getDefaultMaxDepth());
5447  result.setSubscription(_body.get().sowAndSubscribe(
5448  result.operator MessageHandler(),
5449  topic_, timeout_, filter_,
5450  batchSize_, oofEnabled_,
5451  topN_, false));
5452  return result;
5453  }
5454 
5455 
5483  std::string sowAndSubscribe(const MessageHandler& messageHandler_,
5484  const std::string& topic_,
5485  const std::string& filter_ = "",
5486  const std::string& orderBy_ = "",
5487  const std::string& bookmark_ = "",
5488  int batchSize_ = DEFAULT_BATCH_SIZE,
5489  int topN_ = DEFAULT_TOP_N,
5490  const std::string& options_ = "",
5491  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5492  {
5493  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
5494  orderBy_, bookmark_, batchSize_,
5495  topN_, options_, timeout_);
5496  }
5497 
5522  MessageStream sowAndSubscribe(const std::string& topic_,
5523  const std::string& filter_ = "",
5524  const std::string& orderBy_ = "",
5525  const std::string& bookmark_ = "",
5526  int batchSize_ = DEFAULT_BATCH_SIZE,
5527  int topN_ = DEFAULT_TOP_N,
5528  const std::string& options_ = "",
5529  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5530  {
5531  MessageStream result(*this);
5532  if (_body.get().getDefaultMaxDepth())
5533  result.maxDepth(_body.get().getDefaultMaxDepth());
5534  result.setSubscription(_body.get().sowAndSubscribe(
5535  result.operator MessageHandler(),
5536  topic_, filter_, orderBy_,
5537  bookmark_, batchSize_, topN_,
5538  options_, timeout_, false));
5539  return result;
5540  }
5541 
5543  MessageStream sowAndSubscribe(const char* topic_,
5544  const std::string& filter_ = "",
5545  const std::string& orderBy_ = "",
5546  const std::string& bookmark_ = "",
5547  int batchSize_ = DEFAULT_BATCH_SIZE,
5548  int topN_ = DEFAULT_TOP_N,
5549  const std::string& options_ = "",
5550  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5551  {
5552  MessageStream result(*this);
5553  if (_body.get().getDefaultMaxDepth())
5554  result.maxDepth(_body.get().getDefaultMaxDepth());
5555  result.setSubscription(_body.get().sowAndSubscribe(
5556  result.operator MessageHandler(),
5557  topic_, filter_, orderBy_,
5558  bookmark_, batchSize_, topN_,
5559  options_, timeout_, false));
5560  return result;
5561  }
5562 
5587  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
5588  const std::string& topic_,
5589  const std::string& filter_ = "",
5590  const std::string& orderBy_ = "",
5591  int batchSize_ = DEFAULT_BATCH_SIZE,
5592  int topN_ = DEFAULT_TOP_N,
5593  const std::string& options_ = "",
5594  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5595  {
5596  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5597  filter_, orderBy_, batchSize_,
5598  topN_, options_, timeout_);
5599  }
5620  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
5621  const std::string& filter_ = "",
5622  const std::string& orderBy_ = "",
5623  int batchSize_ = DEFAULT_BATCH_SIZE,
5624  int topN_ = DEFAULT_TOP_N,
5625  const std::string& options_ = "",
5626  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5627  {
5628  MessageStream result(*this);
5629  if (_body.get().getDefaultMaxDepth())
5630  result.maxDepth(_body.get().getDefaultMaxDepth());
5631  result.setSubscription(sowAndDeltaSubscribe(result.operator MessageHandler(), topic_, filter_, orderBy_, batchSize_, topN_, options_, timeout_));
5632  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5633  result.operator MessageHandler(),
5634  topic_, filter_, orderBy_,
5635  batchSize_, topN_, options_,
5636  timeout_, false));
5637  return result;
5638  }
5639 
5642  const std::string& filter_ = "",
5643  const std::string& orderBy_ = "",
5644  int batchSize_ = DEFAULT_BATCH_SIZE,
5645  int topN_ = DEFAULT_TOP_N,
5646  const std::string& options_ = "",
5647  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5648  {
5649  MessageStream result(*this);
5650  if (_body.get().getDefaultMaxDepth())
5651  result.maxDepth(_body.get().getDefaultMaxDepth());
5652  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5653  result.operator MessageHandler(),
5654  topic_, filter_, orderBy_,
5655  batchSize_, topN_, options_,
5656  timeout_, false));
5657  return result;
5658  }
5659 
5684  std::string sowAndDeltaSubscribe(const MessageHandler& messageHandler_,
5685  const std::string& topic_,
5686  long timeout_,
5687  const std::string& filter_ = "",
5688  int batchSize_ = DEFAULT_BATCH_SIZE,
5689  bool oofEnabled_ = false,
5690  bool sendEmpties_ = false,
5691  int topN_ = DEFAULT_TOP_N)
5692  {
5693  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5694  timeout_, filter_, batchSize_,
5695  oofEnabled_, sendEmpties_,
5696  topN_);
5697  }
5698 
5720  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
5721  long timeout_,
5722  const std::string& filter_ = "",
5723  int batchSize_ = DEFAULT_BATCH_SIZE,
5724  bool oofEnabled_ = false,
5725  bool sendEmpties_ = false,
5726  int topN_ = DEFAULT_TOP_N)
5727  {
5728  MessageStream result(*this);
5729  if (_body.get().getDefaultMaxDepth())
5730  result.maxDepth(_body.get().getDefaultMaxDepth());
5731  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5732  result.operator MessageHandler(),
5733  topic_, timeout_, filter_,
5734  batchSize_, oofEnabled_,
5735  sendEmpties_, topN_, false));
5736  return result;
5737  }
5760  long timeout_,
5761  const std::string& filter_ = "",
5762  int batchSize_ = DEFAULT_BATCH_SIZE,
5763  bool oofEnabled_ = false,
5764  bool sendEmpties_ = false,
5765  int topN_ = DEFAULT_TOP_N)
5766  {
5767  MessageStream result(*this);
5768  if (_body.get().getDefaultMaxDepth())
5769  result.maxDepth(_body.get().getDefaultMaxDepth());
5770  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5771  result.operator MessageHandler(),
5772  topic_, timeout_, filter_,
5773  batchSize_, oofEnabled_,
5774  sendEmpties_, topN_, false));
5775  return result;
5776  }
5796  std::string sowDelete(const MessageHandler& messageHandler,
5797  const std::string& topic,
5798  const std::string& filter,
5799  long timeout)
5800  {
5801  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
5802  }
5819  Message sowDelete(const std::string& topic, const std::string& filter,
5820  long timeout=0)
5821  {
5822  MessageStream stream(*this);
5823  char buf[Message::IdentifierLength+1];
5824  buf[Message::IdentifierLength] = 0;
5825  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5826  Field cid(buf);
5827  try
5828  {
5829  stream.setStatsOnly(cid);
5830  _body.get().sowDelete(stream.operator MessageHandler(),topic,filter,timeout,cid);
5831  return *(stream.begin());
5832  }
5833  catch (const DisconnectedException&)
5834  {
5835  removeMessageHandler(cid);
5836  throw;
5837  }
5838  }
5839 
5844  void startTimer()
5845  {
5846  _body.get().startTimer();
5847  }
5848 
5855  std::string stopTimer(const MessageHandler& messageHandler)
5856  {
5857  return _body.get().stopTimer(messageHandler);
5858  }
5859 
5881  std::string sowDeleteByKeys(const MessageHandler& messageHandler_,
5882  const std::string& topic_,
5883  const std::string& keys_,
5884  long timeout_=0)
5885  {
5886  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
5887  }
5908  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
5909  long timeout_ = 0)
5910  {
5911  MessageStream stream(*this);
5912  char buf[Message::IdentifierLength+1];
5913  buf[Message::IdentifierLength] = 0;
5914  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5915  Field cid(buf);
5916  try
5917  {
5918  stream.setStatsOnly(cid);
5919  _body.get().sowDeleteByKeys(stream.operator MessageHandler(),topic_,keys_,timeout_,cid);
5920  return *(stream.begin());
5921  }
5922  catch (const DisconnectedException&)
5923  {
5924  removeMessageHandler(cid);
5925  throw;
5926  }
5927  }
5928 
5943  std::string sowDeleteByData(const MessageHandler& messageHandler_,
5944  const std::string& topic_, const std::string& data_,
5945  long timeout_=0)
5946  {
5947  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
5948  }
5949 
5964  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
5965  long timeout_=0)
5966  {
5967  MessageStream stream(*this);
5968  char buf[Message::IdentifierLength+1];
5969  buf[Message::IdentifierLength] = 0;
5970  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5971  Field cid(buf);
5972  try
5973  {
5974  stream.setStatsOnly(cid);
5975  _body.get().sowDeleteByData(stream.operator MessageHandler(),topic_,data_,timeout_,cid);
5976  return *(stream.begin());
5977  }
5978  catch (const DisconnectedException&)
5979  {
5980  removeMessageHandler(cid);
5981  throw;
5982  }
5983  }
5984 
5989  {
5990  return _body.get().getHandle();
5991  }
5992 
6001  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
6002  {
6003  _body.get().setExceptionListener(pListener_);
6004  }
6005 
6015  {
6016  _body.get().setExceptionListener(listener_);
6017  }
6018 
6022  {
6023  return _body.get().getExceptionListener();
6024  }
6025 
6033  // type of message) from the server for the specified interval (plus a grace period),
6047  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
6048  {
6049  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
6050  }
6051 
6059  // type of message) from the server for the specified interval (plus a grace period),
6071  void setHeartbeat(unsigned heartbeatTime_)
6072  {
6073  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
6074  }
6075 
6078  {
6079  setLastChanceMessageHandler(messageHandler);
6080  }
6081 
6085  {
6086  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
6087  messageHandler);
6088  }
6089 
6110  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
6111  {
6112  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6113  }
6114 
6135  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
6136  {
6137  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
6138  }
6139 
6140  static const char* BOOKMARK_NOW() { return AMPS_BOOKMARK_NOW; }
6141  static const char* BOOKMARK_EPOCH() { return AMPS_BOOKMARK_EPOCH; }
6142  static const char* BOOKMARK_RECENT() { return AMPS_BOOKMARK_RECENT; }
6143  static const char* BOOKMARK_MOST_RECENT() { return AMPS_BOOKMARK_RECENT; }
6144  static const char* NOW() { return AMPS_BOOKMARK_NOW; }
6145  static const char* EPOCH() { return AMPS_BOOKMARK_EPOCH; }
6146  static const char* MOST_RECENT() { return AMPS_BOOKMARK_RECENT; }
6147 
6148 
6155  {
6156  _body.get().addConnectionStateListener(listener);
6157  }
6158 
6163  {
6164  _body.get().removeConnectionStateListener(listener);
6165  }
6166 
6192  std::string executeAsync(Command& command_, MessageHandler handler_)
6193  {
6194  return _body.get().executeAsync(command_, handler_);
6195  }
6196 
6226  std::string executeAsyncNoResubscribe(Command& command_,
6227  MessageHandler handler_)
6228  {
6229  std::string id;
6230  try
6231  {
6232  if (command_.isSubscribe())
6233  {
6234  Message& message = command_.getMessage();
6235  Field subId = message.getSubscriptionId();
6236  bool useExistingHandler = !subId.empty() && !message.getOptions().empty() && message.getOptions().contains("replace",7);
6237  if(useExistingHandler)
6238  {
6239  MessageHandler existingHandler;
6240  if (_body.get()._routes.getRoute(subId, existingHandler))
6241  {
6242  // we found an existing handler.
6243  _body.get().executeAsync(command_, existingHandler, false);
6244  return id; // empty string indicates existing
6245  }
6246  }
6247  }
6248  id = _body.get().executeAsync(command_, handler_, false);
6249  }
6250  catch (const DisconnectedException&)
6251  {
6252  removeMessageHandler(command_.getMessage().getCommandId());
6253  if (command_.isSubscribe())
6254  {
6255  removeMessageHandler(command_.getMessage().getSubscriptionId());
6256  }
6257  if (command_.isSow())
6258  {
6259  removeMessageHandler(command_.getMessage().getQueryID());
6260  }
6261  throw;
6262  }
6263  return id;
6264  }
6265 
6278  MessageStream execute(Command& command_);
6279 
6288  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
6289  {
6290  _body.get().ack(topic_,bookmark_,options_);
6291  }
6292 
6300  void ack(Message& message_, const char* options_ = NULL)
6301  {
6302  _body.get().ack(message_.getTopic(),message_.getBookmark(),options_);
6303  }
6312  void ack(const std::string& topic_, const std::string& bookmark_,
6313  const char* options_ = NULL)
6314  {
6315  _body.get().ack(Field(topic_.data(),topic_.length()), Field(bookmark_.data(),bookmark_.length()),options_);
6316  }
6317 
6323  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
6324  {
6325  _body.get()._ack(topic_,bookmark_,options_);
6326  }
6336  void flushAcks(void)
6337  {
6338  _body.get().flushAcks();
6339  }
6340 
6345  bool getAutoAck(void) const
6346  {
6347  return _body.get().getAutoAck();
6348  }
6355  void setAutoAck(bool isAutoAckEnabled_)
6356  {
6357  _body.get().setAutoAck(isAutoAckEnabled_);
6358  }
6363  unsigned getAckBatchSize(void) const
6364  {
6365  return _body.get().getAckBatchSize();
6366  }
6373  void setAckBatchSize(const unsigned ackBatchSize_)
6374  {
6375  _body.get().setAckBatchSize(ackBatchSize_);
6376  }
6377 
6384  int getAckTimeout(void) const
6385  {
6386  return _body.get().getAckTimeout();
6387  }
6394  void setAckTimeout(const int ackTimeout_)
6395  {
6396  _body.get().setAckTimeout(ackTimeout_);
6397  }
6398 
6399 
6408  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
6409  {
6410  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
6411  }
6412 
6417  bool getRetryOnDisconnect(void) const
6418  {
6419  return _body.get().getRetryOnDisconnect();
6420  }
6421 
6426  void setDefaultMaxDepth(unsigned maxDepth_)
6427  {
6428  _body.get().setDefaultMaxDepth(maxDepth_);
6429  }
6430 
6435  unsigned getDefaultMaxDepth(void) const
6436  {
6437  return _body.get().getDefaultMaxDepth();
6438  }
6439 
6447  void* userData_)
6448  {
6449  return _body.get().setTransportFilterFunction(filter_, userData_);
6450  }
6451 
6461  void* userData_)
6462  {
6463  return _body.get().setThreadCreatedCallback(callback_, userData_);
6464  }
6465 
6471  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
6472  {
6473  _body.get().deferredExecution(func_,userData_);
6474  }
6478 };
6479 
6480 inline void
6481 ClientImpl::lastChance(AMPS::Message& message)
6482 {
6483  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
6484 }
6485 
6486 inline unsigned
6487 ClientImpl::persistedAck(AMPS::Message& message)
6488 {
6489  unsigned deliveries = 0;
6490  try
6491  {
6492  /*
6493  * Best Practice: If you don't care about the dupe acks that
6494  * occur during failover or rapid disconnect/reconnect, then just
6495  * ignore them. We could discard each duplicate from the
6496  * persisted store, but the storage costs of doing 1 record
6497  * discards is heavy. In most scenarios we'll just quickly blow
6498  * through the duplicates and get back to processing the
6499  * non-dupes.
6500  */
6501  const char* data = NULL;
6502  size_t len = 0;
6503  const char* status = NULL;
6504  size_t statusLen = 0;
6505  amps_handle messageHandle = message.getMessage();
6506  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
6507  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
6508  amps_message_get_field_value(messageHandle, AMPS_Status,
6509  &status, &statusLen);
6510  if (len == NotEntitled || len == Duplicate ||
6511  (statusLen == Failure && status[0] == 'f'))
6512  {
6513  if (_failedWriteHandler)
6514  {
6515  if (_publishStore.isValid())
6516  {
6517  amps_uint64_t sequence =
6518  amps_message_get_field_uint64(messageHandle,
6519  AMPS_Sequence);
6520  FailedWriteStoreReplayer replayer(this, data, len);
6521  AMPS_CALL_EXCEPTION_WRAPPER(_publishStore.replaySingle(
6522  replayer, sequence));
6523  }
6524  else // Call the handler with what little we have
6525  {
6526  static Message emptyMessage;
6527  emptyMessage.setSequence(message.getSequence());
6528  AMPS_CALL_EXCEPTION_WRAPPER(
6529  _failedWriteHandler->failedWrite(emptyMessage,
6530  data, len));
6531  }
6532  ++deliveries;
6533  }
6534  }
6535  if (_publishStore.isValid())
6536  {
6537  // Ack for publisher will have sequence while
6538  // ack for bookmark subscribe won't
6539  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
6540  AMPS_Sequence);
6541  if (seq > 0)
6542  {
6543  ++deliveries;
6544  _publishStore.discardUpTo(seq);
6545  }
6546  }
6547 
6548  if (!deliveries && _bookmarkStore.isValid())
6549  {
6550  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
6551  &data, &len);
6552  if (len > 0)
6553  {
6554  Message::Field subId(data, len);
6555  const char* bookmarkData = NULL;
6556  size_t bookmarkLen = 0;
6557  amps_message_get_field_value(messageHandle, AMPS_Bookmark,
6558  &bookmarkData, &bookmarkLen);
6559  // Everything is there and not unsubscribed AC-912
6560  if (bookmarkLen > 0 && _routes.hasRoute(subId))
6561  {
6562  ++deliveries;
6563  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
6564  }
6565  }
6566  }
6567  }
6568  catch (std::exception& ex)
6569  {
6570  AMPS_UNHANDLED_EXCEPTION(ex);
6571  }
6572  return deliveries;
6573 }
6574 
6575 inline unsigned
6576 ClientImpl::processedAck(Message &message)
6577 {
6578  unsigned deliveries = 0;
6579  AckResponse ack;
6580  const char* data = NULL;
6581  size_t len = 0;
6582  amps_handle messageHandle = message.getMessage();
6583  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
6584  Lock<Mutex> l(_lock);
6585  if (data && len)
6586  {
6587  Lock<Mutex> guard(_ackMapLock);
6588  AckMap::iterator i = _ackMap.find(std::string(data, len));
6589  if (i != _ackMap.end())
6590  {
6591  ++deliveries;
6592  ack = i->second;
6593  _ackMap.erase(i);
6594  }
6595  }
6596  if (deliveries)
6597  {
6598  amps_message_get_field_value(messageHandle, AMPS_Status, &data,
6599  &len);
6600  ack.setStatus(data, len);
6601  amps_message_get_field_value(messageHandle, AMPS_Reason, &data,
6602  &len);
6603  ack.setReason(data, len);
6604  amps_message_get_field_value(messageHandle, AMPS_UserId, &data,
6605  &len);
6606  ack.setUsername(data, len);
6607  amps_message_get_field_value(messageHandle, AMPS_Password, &data,
6608  &len);
6609  ack.setPassword(data, len);
6610  amps_message_get_field_value(messageHandle, AMPS_Sequence, &data,
6611  &len);
6612  ack.setSequenceNo(data, len);
6613  amps_message_get_field_value(messageHandle, AMPS_Version, &data,
6614  &len);
6615  ack.setServerVersion(data, len);
6616  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
6617  ack.setOptions(data,len);
6618  amps_message_get_field_value(messageHandle, AMPS_Bookmark, &data, &len);
6619  ack.setBookmark(data,len);
6620  ack.setResponded(true);
6621  _lock.signalAll();
6622  }
6623  return deliveries;
6624 }
6625 
6626 inline void
6627 ClientImpl::checkAndSendHeartbeat(bool force)
6628 {
6629  if (force || _heartbeatTimer.check())
6630  {
6631  _heartbeatTimer.start();
6632  try
6633  {
6634  sendWithoutRetry(_beatMessage);
6635  }
6636  catch (const AMPSException&)
6637  {
6638  ;
6639  }
6640  }
6641 }
6642 
6643 inline ConnectionInfo ClientImpl::getConnectionInfo() const
6644 {
6645  ConnectionInfo info;
6646  std::ostringstream writer;
6647 
6648  info["client.uri"] = _lastUri;
6649  info["client.name"] = _name;
6650  info["client.username"] = _username;
6651  if(_publishStore.isValid())
6652  {
6653  writer << _publishStore.unpersistedCount();
6654  info["publishStore.unpersistedCount"] = writer.str();
6655  writer.clear();
6656  writer.str("");
6657  }
6658 
6659  return info;
6660 }
6661 
6662 inline amps_result
6663 ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
6664 {
6665  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
6666  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
6667  ClientImpl* me = (ClientImpl*) userData_;
6668  AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->processDeferredExecutions());
6669  if(!messageHandle_)
6670  {
6671  if(me->_queueAckTimeout) me->checkQueueAcks();
6672  return AMPS_E_OK;
6673  }
6674 
6675  me->_readMessage.replace(messageHandle_);
6676  Message& message = me->_readMessage;
6677  Message::Command::Type commandType = message.getCommandEnum();
6678  if (commandType & SOWMask)
6679  {
6680 #if 0 // Not currently implemented, to avoid an extra branch in delivery
6681  // A small cheat here to get the right handler, using knowledge of the
6682  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
6683  // and their GlobalCommandTypeHandlers values 1, 2, 3.
6684  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6685  me->_globalCommandTypeHandlers[1+(commandType/8192)].invoke(message));
6686 #endif
6687  AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->_routes.deliverData(message,
6688  message.getQueryID()));
6689  }
6690  else if (commandType & PublishMask)
6691  {
6692 #if 0 // Not currently implemented, to avoid an extra branch in delivery
6693  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6694  me->_globalCommandTypeHandlers[(commandType==Message::Command::Publish ?
6695  GlobalCommandTypeHandlers::Publish :
6696  GlobalCommandTypeHandlers::OOF)].invoke(message));
6697 #endif
6698  const char* subIds = NULL;
6699  size_t subIdsLen = 0;
6700  // Publish command, send to subscriptions
6701  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds, &subIds, &subIdsLen);
6702  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
6703  for(size_t i=0; i<subIdCount; ++i)
6704  {
6705  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
6706  MessageHandler& handler = lookupResult.handler;
6707  if (handler.isValid())
6708  {
6709  amps_message_set_field_value(messageHandle_, AMPS_SubscriptionId,
6710  subIds + lookupResult.idOffset, lookupResult.idLength);
6711  Message::Field bookmark = message.getBookmark();
6712  bool isMessageQueue = message.getLeasePeriod().len() != 0;
6713  bool isAutoAck = me->_isAutoAckEnabled;
6714 
6715  if (!isMessageQueue && !bookmark.empty() &&
6716  me->_bookmarkStore.isValid())
6717  {
6718  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
6719  {
6720  //Call duplicate message handler in handlers map
6721  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
6722  {
6723  AMPS_CALL_EXCEPTION_WRAPPER_2(me, me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message));
6724  }
6725  }
6726  else
6727  {
6728  me->_bookmarkStore.log(me->_readMessage);
6729  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6730  handler.invoke(message));
6731  }
6732  }
6733  else
6734  {
6735  if(isMessageQueue && isAutoAck)
6736  {
6737  try
6738  {
6739  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
6740  if (!message.getIgnoreAutoAck())
6741  {
6742  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6743  me->_ack(message.getTopic(),message.getBookmark()));
6744  }
6745  }
6746  catch(std::exception& ex)
6747  {
6748  if (!message.getIgnoreAutoAck())
6749  {
6750  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6751  me->_ack(message.getTopic(),message.getBookmark(),"cancel"));
6752  }
6753  AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6754  }
6755  }
6756  else
6757  {
6758  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6759  handler.invoke(message));
6760  }
6761  }
6762  }
6763  else me->lastChance(message);
6764  } // for (subidsEnd)
6765  }
6766  else if (commandType == Message::Command::Ack)
6767  {
6768  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6769  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
6770  unsigned ackType = message.getAckTypeEnum();
6771  unsigned deliveries = 0U;
6772  switch (ackType)
6773  {
6774  case Message::AckType::Persisted:
6775  deliveries += me->persistedAck(message);
6776  break;
6777  case Message::AckType::Processed: // processed
6778  deliveries += me->processedAck(message);
6779  break;
6780  }
6781  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
6782  if (deliveries == 0)
6783  {
6784  me->lastChance(message);
6785  }
6786  }
6787  else if (commandType == Message::Command::Heartbeat)
6788  {
6789  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6790  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
6791  if(me->_heartbeatTimer.getTimeout() != 0.0) // -V550
6792  {
6793  me->checkAndSendHeartbeat(true);
6794  }
6795  else
6796  {
6797  me->lastChance(message);
6798  }
6799  return AMPS_E_OK;
6800  }
6801  else if (!message.getCommandId().empty())
6802  {
6803  unsigned deliveries = 0U;
6804  try
6805  {
6806  while(me->_connected) // Keep sending heartbeats when stream is full
6807  {
6808  try
6809  {
6810  deliveries = me->_routes.deliverData(message, message.getCommandId());
6811  break;
6812  }
6813 #ifdef _WIN32
6814  catch(MessageStreamFullException&)
6815 #else
6816  catch(MessageStreamFullException& ex_)
6817 #endif
6818  {
6819  me->checkAndSendHeartbeat(false);
6820  }
6821  }
6822  }
6823  catch (std::exception& ex_)
6824  {
6825  try
6826  {
6827  me->_exceptionListener->exceptionThrown(ex_);
6828  }
6829  catch(...)
6830  {
6831  ;
6832  }
6833  }
6834  if (deliveries == 0)
6835  me->lastChance(message);
6836  }
6837  me->checkAndSendHeartbeat();
6838  return AMPS_E_OK;
6839 }
6840 
6841 inline void
6842 ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
6843 {
6844  ClientImpl* me = (ClientImpl*) userData;
6845  //Client wrapper(me);
6846  // Go ahead and signal any waiters if they are around...
6847  me->clearAcks(failedConnectionVersion);
6848 }
6849 
6850 inline amps_result
6851 ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
6852 {
6853  ClientImpl* me = (ClientImpl*) userData;
6854  Lock<Mutex> l(me->_lock);
6855  Client wrapper(me,false);
6856  if (me->_connected)
6857  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
6858  while(true)
6859  {
6860  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
6861  try
6862  {
6863  AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
6864  me->_connected = false;
6865  {
6866  // Have to release the lock here or receive thread can't
6867  // invoke the message handler.
6868  Unlock<Mutex> unlock(me->_lock);
6869  me->_disconnectHandler.invoke(wrapper);
6870  }
6871  }
6872  catch(const std::exception& ex)
6873  {
6874  AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6875  }
6876 
6877  if (!me->_connected)
6878  {
6879  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
6880  AMPS_UNHANDLED_EXCEPTION_2(me,DisconnectedException("Reconnect failed."));
6881  return AMPS_E_DISCONNECTED;
6882  }
6883  try
6884  {
6885  // Resubscribe
6886  if (me->_subscriptionManager)
6887  {
6888  {
6889  // Have to release the lock here or receive thread can't
6890  // invoke the message handler.
6891  Unlock<Mutex> unlock(me->_lock);
6892  me->_subscriptionManager->resubscribe(wrapper);
6893  }
6894  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
6895  }
6896  return AMPS_E_OK;
6897  }
6898  catch(const AMPSException& subEx)
6899  {
6900  AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6901  }
6902  catch(const std::exception& subEx)
6903  {
6904  AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6905  return AMPS_E_RETRY;
6906  }
6907  catch(...)
6908  {
6909  return AMPS_E_RETRY;
6910  }
6911  }
6912  return AMPS_E_RETRY;
6913 }
6914 
6915 class FIX
6916 {
6917  const char* _data;
6918  size_t _len;
6919  char _fieldSep;
6920 public:
6921  class iterator
6922  {
6923  const char* _data;
6924  size_t _len;
6925  size_t _pos;
6926  char _fieldSep;
6927  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
6928  : _data(data_), _len(len_),_pos(pos_), _fieldSep(fieldSep_)
6929  {
6930  while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6931  }
6932  public:
6933  typedef void* difference_type;
6934  typedef std::forward_iterator_tag iterator_category;
6935  typedef std::pair<Message::Field, Message::Field> value_type;
6936  typedef value_type* pointer;
6937  typedef value_type& reference;
6938  bool operator==(const iterator& rhs) const
6939  {
6940  return _pos == rhs._pos;
6941  }
6942  bool operator!=(const iterator& rhs) const
6943  {
6944  return _pos != rhs._pos;
6945  }
6946  iterator& operator++()
6947  {
6948  // Skip through the data
6949  while(_pos != _len && _data[_pos] != _fieldSep) ++_pos;
6950  // Skip through any field separators
6951  while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6952  return *this;
6953  }
6954 
6955  value_type operator*() const
6956  {
6957  value_type result;
6958  size_t i = _pos, keyLength =0, valueStart = 0, valueLength = 0;
6959  for(; i < _len && _data[i] != '='; ++i) ++keyLength;
6960 
6961  result.first.assign(_data+_pos, keyLength);
6962 
6963  if (i < _len && _data[i] == '=')
6964  {
6965  ++i;
6966  valueStart = i;
6967  for(; i < _len && _data[i] != _fieldSep; ++i)
6968  {
6969  valueLength++;
6970  }
6971  }
6972  result.second.assign(_data+valueStart, valueLength);
6973  return result;
6974  }
6975 
6976  friend class FIX;
6977  };
6978  class reverse_iterator
6979  {
6980  const char* _data;
6981  size_t _len;
6982  const char* _pos;
6983  char _fieldSep;
6984  public:
6985  typedef std::pair<Message::Field, Message::Field> value_type;
6986  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
6987  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
6988  {
6989  if (_pos)
6990  {
6991  // skip past meaningless trailing fieldseps
6992  while(_pos >=_data && *_pos == _fieldSep) --_pos;
6993  while(_pos > _data && *_pos != _fieldSep) --_pos;
6994  // if we stopped before the 0th character, it's because
6995  // it's a field sep. advance one to point to the first character
6996  // of a key.
6997  if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
6998  if (_pos < _data) _pos = 0;
6999  }
7000  }
7001  bool operator==(const reverse_iterator& rhs) const
7002  {
7003  return _pos == rhs._pos;
7004  }
7005  bool operator!=(const reverse_iterator& rhs) const
7006  {
7007  return _pos != rhs._pos;
7008  }
7009  reverse_iterator& operator++()
7010  {
7011  if (_pos == _data)
7012  {
7013  _pos = 0;
7014  }
7015  else
7016  {
7017  // back up 1 to a field separator
7018  --_pos;
7019  // keep backing up through field separators
7020  while(_pos >=_data && *_pos == _fieldSep) --_pos;
7021  // now back up to the beginning of this field
7022  while(_pos >_data && *_pos != _fieldSep) --_pos;
7023  if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
7024  if (_pos < _data) _pos = 0;
7025  }
7026  return *this;
7027  }
7028  value_type operator*() const
7029  {
7030  value_type result;
7031  size_t keyLength = 0, valueStart = 0, valueLength = 0;
7032  size_t i = (size_t)(_pos - _data);
7033  for(; i < _len && _data[i] != '='; ++i) ++keyLength;
7034  result.first.assign(_pos, keyLength);
7035  if (i<_len && _data[i] == '=')
7036  {
7037  ++i;
7038  valueStart = i;
7039  for(; i<_len && _data[i] != _fieldSep; ++i)
7040  {
7041  valueLength++;
7042  }
7043  }
7044  result.second.assign(_data+valueStart, valueLength);
7045  return result;
7046  }
7047  };
7048  FIX(const Message::Field& data, char fieldSeparator=1)
7049  : _data(data.data()), _len(data.len()),
7050  _fieldSep(fieldSeparator)
7051  {
7052  }
7053 
7054  FIX(const char* data, size_t len, char fieldSeparator=1)
7055  : _data(data), _len(len), _fieldSep(fieldSeparator)
7056  {
7057  }
7058 
7059  iterator begin() const
7060  {
7061  return iterator(_data, _len, 0, _fieldSep);
7062  }
7063  iterator end() const
7064  {
7065  return iterator(_data, _len, _len, _fieldSep);
7066  }
7067 
7068 
7069  reverse_iterator rbegin() const
7070  {
7071  return reverse_iterator(_data, _len, _data+(_len-1), _fieldSep);
7072  }
7073 
7074  reverse_iterator rend() const
7075  {
7076  return reverse_iterator(_data, _len, 0, _fieldSep);
7077  }
7078 };
7079 
7080 
7093 
7094 template <class T>
7096 {
7097  std::stringstream _data;
7098  char _fs;
7099 public:
7105  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
7106 
7114  void append(const T& tag, const char* value, size_t offset, size_t length)
7115  {
7116  _data << tag<<'=';
7117  _data.write(value+offset, (std::streamsize)length);
7118  _data << _fs;
7119  }
7125  void append(const T& tag, const std::string& value)
7126  {
7127  _data << tag << '=' << value << _fs;
7128  }
7129 
7132  std::string getString() const
7133  {
7134  return _data.str();
7135  }
7136  operator std::string() const
7137  {
7138  return _data.str();
7139  }
7140 
7142  void reset()
7143  {
7144  _data.str(std::string());
7145  }
7146 };
7147 
7151 
7153 
7157 
7159 
7160 
7168 
7170 {
7171  char _fs;
7172 public:
7177  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
7178 
7181  typedef std::map<Message::Field, Message::Field> map_type;
7182 
7188  map_type toMap(const Message::Field& data)
7189  {
7190  FIX fix(data, _fs);
7191  map_type retval;
7192  for(FIX::iterator a = fix.begin(); a!= fix.end(); ++a)
7193  {
7194  retval.insert(*a);
7195  }
7196 
7197  return retval;
7198  }
7199 };
7200 
7201 class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
7202 {
7203  Mutex _lock;
7204  std::deque<Message> _q;
7205  std::string _commandId;
7206  std::string _subId;
7207  std::string _queryId;
7208  Client _client;
7209  unsigned _timeout;
7210  unsigned _maxDepth;
7211  unsigned _requestedAcks;
7212  Message::Field _previousTopic;
7213  Message::Field _previousBookmark;
7214  volatile enum { Unset=0x0, Running=0x10, Subscribe=0x11, SOWOnly=0x12, AcksOnly=0x13, Conflate=0x14, Closed=0x1, Disconnected=0x2 } _state;
7215  typedef std::map<std::string, Message*> SOWKeyMap;
7216  SOWKeyMap _sowKeyMap;
7217  public:
7218  MessageStreamImpl(const Client& client_)
7219  : _client(client_),
7220  _timeout(0),
7221  _maxDepth((unsigned)~0),
7222  _requestedAcks(0),
7223  _state(Unset)
7224  {
7225  if (_client.isValid())
7226  {
7227  _client.addConnectionStateListener(this);
7228  }
7229  }
7230 
7231  MessageStreamImpl(ClientImpl* client_)
7232  : _client(client_),
7233  _timeout(0),
7234  _maxDepth((unsigned)~0),
7235  _requestedAcks(0),
7236  _state(Unset)
7237  {
7238  if (_client.isValid())
7239  {
7240  _client.addConnectionStateListener(this);
7241  }
7242  }
7243 
7244  ~MessageStreamImpl()
7245  {
7246  }
7247 
7248  virtual void destroy()
7249  {
7250  try
7251  {
7252  close();
7253  }
7254  catch(std::exception &e)
7255  {
7256  try
7257  {
7258  if (_client.isValid())
7259  {
7260  _client.getExceptionListener().exceptionThrown(e);
7261  }
7262  } catch (...) {/*Ignore exception listener exceptions*/} // -V565
7263  }
7264  if (_client.isValid())
7265  {
7266  _client.removeConnectionStateListener(this);
7267  Client c = _client;
7268  _client = Client((ClientImpl*)NULL);
7269  c.deferredExecution(MessageStreamImpl::destroyer, this);
7270  }
7271  else
7272  {
7273  delete this;
7274  }
7275  }
7276 
7277  static void destroyer(void* vpMessageStreamImpl_)
7278  {
7279  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
7280  }
7281 
7282  void setSubscription(const std::string& subId_,
7283  const std::string& commandId_ = "",
7284  const std::string& queryId_ = "")
7285  {
7286  Lock<Mutex> lock(_lock);
7287  _subId = subId_;
7288  if (!commandId_.empty() && commandId_ != subId_)
7289  _commandId = commandId_;
7290  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
7291  _queryId = queryId_;
7292  // It's possible to disconnect between creation/registration and here.
7293  if (Disconnected == _state) return;
7294  assert(Unset==_state);
7295  _state = Subscribe;
7296  }
7297 
7298  void setSOWOnly(const std::string& commandId_,
7299  const std::string& queryId_ = "")
7300  {
7301  Lock<Mutex> lock(_lock);
7302  _commandId = commandId_;
7303  if (!queryId_.empty() && queryId_ != commandId_)
7304  _queryId = queryId_;
7305  // It's possible to disconnect between creation/registration and here.
7306  if (Disconnected == _state) return;
7307  assert(Unset==_state);
7308  _state = SOWOnly;
7309  }
7310 
7311  void setStatsOnly(const std::string& commandId_,
7312  const std::string& queryId_ = "")
7313  {
7314  Lock<Mutex> lock(_lock);
7315  _commandId = commandId_;
7316  if (!queryId_.empty() && queryId_ != commandId_)
7317  _queryId = queryId_;
7318  // It's possible to disconnect between creation/registration and here.
7319  if (Disconnected == _state) return;
7320  assert(Unset==_state);
7321  _state = AcksOnly;
7322  _requestedAcks = Message::AckType::Stats;
7323  }
7324 
7325  void setAcksOnly(const std::string& commandId_, unsigned acks_)
7326  {
7327  Lock<Mutex> lock(_lock);
7328  _commandId = commandId_;
7329  // It's possible to disconnect between creation/registration and here.
7330  if (Disconnected == _state) return;
7331  assert(Unset==_state);
7332  _state = AcksOnly;
7333  _requestedAcks = acks_;
7334  }
7335 
7336  void connectionStateChanged(ConnectionStateListener::State state_)
7337  {
7338  Lock<Mutex> lock(_lock);
7339  if(state_ == AMPS::ConnectionStateListener::Disconnected)
7340  {
7341  _state = Disconnected;
7342  close();
7343  }
7344  _lock.signalAll();
7345  }
7346 
7347  void timeout(unsigned timeout_)
7348  {
7349  _timeout = timeout_;
7350  }
7351  void conflate(void)
7352  {
7353  if(_state == Subscribe) _state = Conflate;
7354  }
7355  void maxDepth(unsigned maxDepth_)
7356  {
7357  if(maxDepth_) _maxDepth = maxDepth_;
7358  else _maxDepth = (unsigned)~0;
7359  }
7360  unsigned getMaxDepth(void) const
7361  {
7362  return _maxDepth;
7363  }
7364  unsigned getDepth(void) const
7365  {
7366  return (unsigned)(_q.size());
7367  }
7368 
7369  bool next(Message& current_)
7370  {
7371  Lock<Mutex> lock(_lock);
7372  if (!_previousTopic.empty() && !_previousBookmark.empty())
7373  {
7374  try
7375  {
7376  if (_client.isValid())
7377  {
7378  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
7379  }
7380  }
7381 #ifdef _WIN32
7382  catch (AMPSException&)
7383 #else
7384  catch (AMPSException& e)
7385 #endif
7386  {
7387  current_.invalidate();
7388  _previousTopic.clear();
7389  _previousBookmark.clear();
7390  return false;
7391  }
7392  _previousTopic.clear();
7393  _previousBookmark.clear();
7394  }
7395  double minWaitTime = (double)((_timeout && _timeout > 1000)
7396  ? _timeout : 1000);
7397  Timer timer(minWaitTime);
7398  timer.start();
7399  while(_q.empty() && _state & Running)
7400  {
7401  // Using timeout so python can interrupt
7402  _lock.wait((long)minWaitTime);
7403  {
7404  Unlock<Mutex> unlck(_lock);
7405  amps_invoke_waiting_function();
7406  }
7407  if (_timeout)
7408  {
7409  // In case we woke up early, see how much longer to wait
7410  if(timer.checkAndGetRemaining(&minWaitTime))
7411  {
7412  break;
7413  }
7414  }
7415  }
7416  if(!_q.empty())
7417  {
7418  current_ = _q.front();
7419  if(_q.size() == _maxDepth) _lock.signalAll();
7420  _q.pop_front();
7421  if(_state == Conflate)
7422  {
7423  std::string sowKey = current_.getSowKey();
7424  if(sowKey.length()) _sowKeyMap.erase(sowKey);
7425  }
7426  else if(_state == AcksOnly)
7427  {
7428  _requestedAcks &= ~(current_.getAckTypeEnum());
7429  }
7430  if((_state == AcksOnly && _requestedAcks == 0) ||
7431  (_state == SOWOnly && current_.getCommand()=="group_end"))
7432  {
7433  _state = Closed;
7434  }
7435  else if (current_.getCommandEnum() == Message::Command::Publish &&
7436  _client.isValid() && _client.getAutoAck() &&
7437  !current_.getLeasePeriod().empty() &&
7438  !current_.getBookmark().empty())
7439  {
7440  _previousTopic = current_.getTopic().deepCopy();
7441  _previousBookmark = current_.getBookmark().deepCopy();
7442  }
7443  return true;
7444  }
7445  if(_state == Disconnected)
7446  {
7447  throw DisconnectedException("Connection closed.");
7448  }
7449  current_.invalidate();
7450  if(_state == Closed)
7451  {
7452  return false;
7453  }
7454  return _timeout != 0;
7455  }
7456  void close(void)
7457  {
7458  if (_client.isValid())
7459  {
7460  if (_state == SOWOnly || _state == Subscribe) //not delete
7461  {
7462  if (!_commandId.empty()) _client.unsubscribe(_commandId);
7463  if (!_subId.empty()) _client.unsubscribe(_subId);
7464  if (!_queryId.empty()) _client.unsubscribe(_queryId);
7465  }
7466  else
7467  {
7468  if (!_commandId.empty()) _client.removeMessageHandler(_commandId);
7469  if (!_subId.empty()) _client.removeMessageHandler(_subId);
7470  if (!_queryId.empty()) _client.removeMessageHandler(_queryId);
7471  }
7472  }
7473  if(_state==SOWOnly || _state==Subscribe || _state==Unset)
7474  {
7475  _state = Closed;
7476  }
7477  }
7478  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
7479  {
7480  Lock<Mutex> lock(this_->_lock);
7481  if(this_->_state != Conflate)
7482  {
7483  AMPS_TESTING_SLOW_MESSAGE_STREAM
7484  if(this_->_q.size() >= this_->_maxDepth)
7485  {
7486  // We throw here so that heartbeats can be sent. The exception
7487  // will be handled internally only, and the same Message will
7488  // come back to try again. Make sure to signal.
7489  this_->_lock.signalAll();
7490  throw MessageStreamFullException("Stream is currently full.");
7491  }
7492  this_->_q.push_back(message_.deepCopy());
7493  if (message_.getCommandEnum() == Message::Command::Publish &&
7494  this_->_client.isValid() && this_->_client.getAutoAck() &&
7495  !message_.getLeasePeriod().empty() &&
7496  !message_.getBookmark().empty())
7497  {
7498  message_.setIgnoreAutoAck();
7499  }
7500  }
7501  else
7502  {
7503  std::string sowKey = message_.getSowKey();
7504  if(sowKey.length())
7505  {
7506  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
7507  if(it != this_->_sowKeyMap.end())
7508  {
7509  *(it->second) = message_.deepCopy();
7510  }
7511  else
7512  {
7513  if(this_->_q.size() >= this_->_maxDepth)
7514  {
7515  // We throw here so that heartbeats can be sent. The
7516  // exception will be handled internally only, and the
7517  // same Message will come back to try again. Make sure
7518  // to signal.
7519  this_->_lock.signalAll();
7520  throw MessageStreamFullException("Stream is currently full.");
7521  }
7522  this_->_q.push_back(message_.deepCopy());
7523  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
7524  }
7525  }
7526  else
7527  {
7528  while(this_->_q.size() >= this_->_maxDepth) // -V712
7529  {
7530  this_->_lock.wait(1);
7531  }
7532  this_->_q.push_back(message_.deepCopy());
7533  }
7534  }
7535  this_->_lock.signalAll();
7536  }
7537 };
7538 inline MessageStream::MessageStream(void)
7539 {
7540 }
7541 inline MessageStream::MessageStream(const Client& client_)
7542  :_body(new MessageStreamImpl(client_))
7543 {
7544 }
7545 inline void MessageStream::iterator::advance(void)
7546 {
7547  _pStream = _pStream->_body->next(_current) ? _pStream:NULL;
7548 }
7549 inline MessageStream::operator MessageHandler(void)
7550 {
7551  return MessageHandler((void(*)(const Message&,void*))MessageStreamImpl::_messageHandler, &_body.get());
7552 }
7553 inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
7554 {
7555  MessageStream result;
7556  if(handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
7557  {
7558  result._body = (MessageStreamImpl*)(handler_._userData);
7559  }
7560  return result;
7561 }
7562 
7563 inline void MessageStream::setSOWOnly(const std::string& commandId_,
7564  const std::string& queryId_)
7565 {
7566  _body->setSOWOnly(commandId_, queryId_);
7567 }
7568 inline void MessageStream::setSubscription(const std::string& subId_,
7569  const std::string& commandId_,
7570  const std::string& queryId_)
7571 {
7572  _body->setSubscription(subId_, commandId_, queryId_);
7573 }
7574 inline void MessageStream::setStatsOnly(const std::string& commandId_,
7575  const std::string& queryId_)
7576 {
7577  _body->setStatsOnly(commandId_, queryId_);
7578 }
7579 inline void MessageStream::setAcksOnly(const std::string& commandId_,
7580  unsigned acks_)
7581 {
7582  _body->setAcksOnly(commandId_, acks_);
7583 }
7584 inline MessageStream MessageStream::timeout(unsigned timeout_)
7585 {
7586  _body->timeout(timeout_);
7587  return *this;
7588 }
7590 {
7591  _body->conflate();
7592  return *this;
7593 }
7594 inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
7595 {
7596  _body->maxDepth(maxDepth_);
7597  return *this;
7598 }
7599 inline unsigned MessageStream::getMaxDepth(void) const
7600 {
7601  return _body->getMaxDepth();
7602 }
7603 inline unsigned MessageStream::getDepth(void) const
7604 {
7605  return _body->getDepth();
7606 }
7607 
7608 inline MessageStream ClientImpl::getEmptyMessageStream(void)
7609 {
7610  return *(_pEmptyMessageStream.get());
7611 }
7612 
7614 {
7615  // If the command is sow and has a sub_id, OR
7616  // if the command has a replace option, return the existing
7617  // messagestream, don't create a new one.
7618  ClientImpl& body = _body.get();
7619  Message& message = command_.getMessage();
7620  Field subId = message.getSubscriptionId();
7621  unsigned ackTypes = message.getAckTypeEnum();
7622  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace",7)) || message.getCommandEnum() == Message::Command::SOW);
7623  if(useExistingHandler)
7624  {
7625  // Try to find the existing message handler.
7626  if(!subId.empty())
7627  {
7628  MessageHandler existingHandler;
7629  if (body._routes.getRoute(subId, existingHandler))
7630  {
7631  // we found an existing handler. It might not be a message stream, but that's okay.
7632  body.executeAsync(command_, existingHandler, false);
7633  return MessageStream::fromExistingHandler(existingHandler);
7634  }
7635  }
7636  // fall through; we'll a new handler altogether.
7637  }
7638  // Make sure something will be returned to the stream or use the empty one
7639  Message::Command::Type command = message.getCommandEnum();
7640  if ((command & Message::Command::NoDataCommands)
7641  && (ackTypes == Message::AckType::Persisted
7642  || ackTypes == Message::AckType::None))
7643  {
7644  executeAsync(command_, MessageHandler());
7645  if (!body._pEmptyMessageStream)
7646  {
7647  body._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
7648  body._pEmptyMessageStream.get()->_body->close();
7649  }
7650  return body.getEmptyMessageStream();
7651  }
7652  MessageStream stream(*this);
7653  if (body.getDefaultMaxDepth())
7654  stream.maxDepth(body.getDefaultMaxDepth());
7655  MessageHandler handler = stream.operator MessageHandler();
7656  std::string commandID = body.executeAsync(command_, handler, false);
7657  if (command_.hasStatsAck())
7658  {
7659  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
7660  }
7661  else if (command_.isSow())
7662  {
7663  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
7664  }
7665  else if (command_.isSubscribe())
7666  {
7667  stream.setSubscription(commandID,
7668  command_.getMessage().getCommandId(),
7669  command_.getMessage().getQueryId());
7670  }
7671  else
7672  {
7673  // Persisted acks for writes don't come back with command id
7674  if (command == Message::Command::Publish ||
7675  command == Message::Command::DeltaPublish ||
7676  command == Message::Command::SOWDelete)
7677  {
7678  stream.setAcksOnly(commandID,
7679  ackTypes & (unsigned)~Message::AckType::Persisted);
7680  }
7681  else
7682  {
7683  stream.setAcksOnly(commandID, ackTypes);
7684  }
7685  }
7686  return stream;
7687 }
7688 
7689 // This is here because it uses api from Client.
7690 inline void Message::ack(const char* options_) const
7691 {
7692  ClientImpl* pClient = _body.get().clientImpl();
7693  Message::Field bookmark = getBookmark();
7694  if(pClient && bookmark.len() &&
7695  !pClient->getAutoAck())
7696  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
7697  pClient->ack(getTopic(),bookmark,options_);
7698 }
7699 }// end namespace AMPS
7700 #endif
Command & setCorrelationId(const std::string &v_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:557
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:179
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1284
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:4266
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1261
std::string sowDeleteByKeys(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:5881
std::string stopTimer(const MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:5855
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:512
std::string getAckType() const
Definition: ampsplusplus.hpp:636
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given ComandId from self.
Definition: ampsplusplus.hpp:4470
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:7095
void startTimer()
Definition: ampsplusplus.hpp:5844
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5401
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:759
Command & addAckType(const std::string &v_)
Definition: ampsplusplus.hpp:608
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:7589
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1230
std::string send(const MessageHandler &messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4498
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:486
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:1068
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:6135
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:404
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:6426
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4375
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:5198
Command & setQueryId(const std::string &v_)
Definition: ampsplusplus.hpp:546
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1223
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:731
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:384
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:6435
void ack(Message &message_, const char *options_=NULL)
Acknoweldge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6300
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5153
Command & setFilter(const std::string &v_)
Definition: ampsplusplus.hpp:540
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:278
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:4594
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4859
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4390
void AMPSDLL amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1142
amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:724
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4610
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:538
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self&#39;s set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:6162
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:713
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:6384
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4361
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:5908
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
Message & setAckTypeEnum(unsigned ackType_)
Encode self&#39;s "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1023
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:949
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:7132
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:6446
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4746
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:4173
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1145
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4446
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1232
std::string bookmarkSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5125
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:811
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6408
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7599
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4421
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:1043
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5587
Success.
Definition: amps.h:189
Message & setCommandEnum(Command::Type command_)
Set self&#39;s "command" field from one of the values in Command.
Definition: Message.hpp:1128
amps_handle AMPSDLL amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1152
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:705
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4836
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:7177
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:5245
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:179
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:553
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:4459
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:4169
amps_result
Return values from amps_xxx functions.
Definition: amps.h:184
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:4647
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:973
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1035
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:7613
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:601
amps_result AMPSDLL amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1257
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:4602
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:6460
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:641
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4350
Command & setExpiration(unsigned v_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:606
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1077
Command & setData(const char *v_, size_t length_)
Sets the data for this command.
Definition: ampsplusplus.hpp:578
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:4907
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including.
Definition: ampsplusplus.hpp:882
const std::string & getNameHash() const
Returns the name hash of this client as generated by the server and returned when the client logged o...
Definition: ampsplusplus.hpp:4319
Command & setAckType(unsigned v_)
Definition: ampsplusplus.hpp:626
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5543
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:4249
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:6110
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1138
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self&#39;s set of listeners.
Definition: ampsplusplus.hpp:6154
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:501
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1230
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4695
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1261
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1257
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:4558
std::string subscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4968
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:493
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1148
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1138
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:6021
Field getQueryId() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:722
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1073
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
Command & setTopic(const std::string &v_)
Definition: ampsplusplus.hpp:538
amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
std::string sowDeleteByData(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5943
amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to set when a new thread is created...
Message & newQueryId()
Creates and sets a new sequential value for the QueryID header for this Message.
Definition: Message.hpp:1225
Command & setBatchSize(unsigned v_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:595
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
Command & setOrderBy(const std::string &v_)
Definition: ampsplusplus.hpp:542
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:126
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:903
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:717
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5069
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:225
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:4925
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Command & setTimeout(unsigned v_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:588
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1139
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1232
void AMPSDLL amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:699
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:4330
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:7105
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:6312
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:712
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:753
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5437
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:836
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7114
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1035
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:963
amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4550
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:920
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:6373
void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1224
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1259
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:991
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:5819
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1233
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:4542
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:136
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1141
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
Command & setCommandId(const std::string &v_)
Definition: ampsplusplus.hpp:536
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:6014
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:509
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:4570
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:6077
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:7125
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:6363
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4813
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5522
Command & setSequence(const std::string &v_)
Definition: ampsplusplus.hpp:562
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:4140
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:265
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5759
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:912
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
void publishFlush(long timeout_=0, unsigned ackType_=Message::AckType::Processed)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:4792
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6192
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5720
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:4943
Command & setSequence(const amps_uint64_t v_)
Definition: ampsplusplus.hpp:564
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:5210
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:6355
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5085
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1140
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:873
std::string sowAndDeltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5684
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:6001
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:977
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4625
amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
amps_result AMPSDLL amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
DisconnectHandler getDisconnectHandler(void) const
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:4520
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:7169
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:973
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:4529
Message & setOptions(const std::string &v)
Sets the value of the Options header for this Message.
Definition: Message.hpp:1172
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:6288
std::map< Message::Field, Message::Field > map_type
Convenience defintion for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:7181
amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
void setDisconnectHandler(const DisconnectHandler &disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:4512
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:857
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:7188
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:202
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5620
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6071
Command & setSubId(const std::string &v_)
Definition: ampsplusplus.hpp:544
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:5024
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:4132
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1138
Message & setQueryId(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1225
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:627
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:941
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7594
void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
Command & setTopN(unsigned v_)
Definition: ampsplusplus.hpp:590
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1260
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:4428
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5283
Command & setAckType(const std::string &v_)
Definition: ampsplusplus.hpp:616
Command & setData(const std::string &v_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:574
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:7603
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1225
The operation has not succeeded, but ought to be retried.
Definition: amps.h:213
const std::string & getLogonCorrelationData() const
Returns the currently set logoon correlation data for the client.
Definition: ampsplusplus.hpp:4337
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:6047
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4397
Command & setOptions(const std::string &v_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:560
void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:891
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1034
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1003
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:434
Command & setBookmark(const std::string &v_)
Set the bookmark to be used this command.
Definition: ampsplusplus.hpp:550
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:5988
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:7142
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:4639
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5641
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:6394
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:534
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1224
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:6345
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4670
Definition: ampsplusplus.hpp:103
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:4304
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:668
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:933
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:1019
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:815
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5300
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:4719
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:6084
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:301
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4885
The client and server are disconnected.
Definition: amps.h:217
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5964
std::string deltaSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:5051
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:7584
std::string sow(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:5337
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:4311
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:407
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5483
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1033
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4994
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:5172
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:521
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:6226
amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
std::string sowDelete(const MessageHandler &messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic&#39;s SOW cache.
Definition: ampsplusplus.hpp:5796
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:4184
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6417
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:6336
std::string sowAndSubscribe(const MessageHandler &messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5369