AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.2.0
ampsplusplus.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef _AMPSPLUSPLUS_H_
26 #define _AMPSPLUSPLUS_H_
27 #include "amps.h"
28 #include "ampsver.h"
29 #include <string>
30 #include <map>
31 #include <sstream>
32 #include <iostream>
33 #include <memory>
34 #include <stdexcept>
35 #include <limits.h>
36 #include <list>
37 #include <memory>
38 #include <set>
39 #include <deque>
40 #include <vector>
41 #include <assert.h>
42 #ifndef _WIN32
43 #include <inttypes.h>
44 #endif
45 #if defined(sun)
46 #include <sys/atomic.h>
47 #endif
48 #include "BookmarkStore.hpp"
49 #include "MessageRouter.hpp"
50 #include "util.hpp"
51 #include "ampscrc.hpp"
52 
53 #ifndef AMPS_TESTING_SLOW_MESSAGE_STREAM
54 #define AMPS_TESTING_SLOW_MESSAGE_STREAM
55 #endif
56 
61 
62 
69 
80 
81 // For StoreBuffer implementations
// Compile-time defaults shared by the client classes. Only the uses noted
// below are visible in this part of the header; the remaining meanings are
// inferred from the macro names -- TODO confirm against the code that reads them.
82 #define AMPS_MEMORYBUFFER_DEFAULT_BUFFERS 10
83 #define AMPS_MEMORYBUFFER_DEFAULT_LENGTH 40960
84 #define AMPS_SUBSCRIPTION_MANAGER_DEFAULT_TIMEOUT 0
85 #define AMPS_HACLIENT_TIMEOUT_DEFAULT 10000
86 #define AMPS_HACLIENT_RECONNECT_DEFAULT 200
87 #define AMPS_FLUSH_MIN_VERSION 4000000
88 #define AMPS_MIN_SOW_KEY_PUBLISH_VERSION 4030100
89 #define AMPS_DEFAULT_COMMAND_TIMEOUT 5000
90 #define AMPS_DEFAULT_TOP_N -1
// Batch size that Command::_setIds() applies to SOW-style commands when no
// batch size has been set.
91 #define AMPS_DEFAULT_BATCH_SIZE 10
// Size of the character buffer convertToCharArray() fills from the right.
92 #define AMPS_NUMBER_BUFFER_LEN 20
93 #define AMPS_DEFAULT_QUEUE_ACK_TIMEOUT 1000
94 
// AMPS_X64 is defined only when one of the compiler's 64-bit x86 / Win64
// predefined macros is present.
95 #if defined(_M_X64) || defined(__x86_64) || defined(_WIN64)
96 #define AMPS_X64 1
97 #endif
98 
// Thread-local Message pointer, initialised to null. MSVC spells thread
// storage as __declspec(thread); other compilers use the __thread extension.
// Because the variable is declared static in a header, each translation unit
// that includes this file gets its own copy.
// NOTE(review): the code that assigns and reads this pointer is not visible
// in this section -- presumably a per-thread scratch Message used by the
// publish stores; confirm.
99 #ifdef _WIN32
100 static __declspec ( thread ) AMPS::Message* publishStoreMessage = 0;
101 #else
102 static __thread AMPS::Message* publishStoreMessage = 0;
103 #endif
104 
105 namespace AMPS
106 {
107 
108 typedef std::map<std::string, std::string> ConnectionInfo;
109 
110 class PerThreadMessageTracker {
111 std::vector<AMPS::Message*> _messages;
112 public:
113  PerThreadMessageTracker() {}
114  ~PerThreadMessageTracker()
115  {
116  for (size_t i=0; i<_messages.size(); ++i)
117  {
118  delete _messages[i];
119  }
120  }
121  void addMessage(AMPS::Message* message)
122  {
123  _messages.push_back(message);
124  }
125  static void addMessageToCleanupList(AMPS::Message* message)
126  {
127  static AMPS::Mutex _lock;
128  AMPS::Lock<Mutex> l(_lock);
129  _addMessageToCleanupList(message);
130  }
131  static void _addMessageToCleanupList(AMPS::Message* message)
132  {
133  static PerThreadMessageTracker tracker;
134  tracker.addMessage(message);
135  }
136 };
137 
/// Renders x_ as text by streaming it through operator<< into a string
/// stream and returning the accumulated characters.
template<class Type>
inline std::string asString(Type x_)
{
  std::ostringstream formatter;
  formatter << x_;
  return formatter.str();
}
145 
146 inline
147 size_t convertToCharArray(char* buf_, amps_uint64_t seqNo_)
148 {
149  size_t pos = AMPS_NUMBER_BUFFER_LEN;
150  for(int i = 0; i<AMPS_NUMBER_BUFFER_LEN; ++i)
151  {
152  if (seqNo_ > 0)
153  {
154  buf_[--pos] = (char)(seqNo_ % 10 + '0');
155  seqNo_ /= 10;
156  }
157  }
158  return pos;
159 }
160 
#ifdef _WIN32
/// Windows-only overload for unsigned long: writes the decimal digits of
/// seqNo_ right-aligned into buf_, which must hold at least
/// AMPS_NUMBER_BUFFER_LEN characters. No terminator is written.
///
/// \return index of the first digit written; AMPS_NUMBER_BUFFER_LEN (nothing
///         written) when seqNo_ is 0.
inline
size_t convertToCharArray(char* buf_, unsigned long seqNo_)
{
  size_t firstDigit = AMPS_NUMBER_BUFFER_LEN;
  // At most AMPS_NUMBER_BUFFER_LEN digits are emitted, least significant first.
  for (int emitted = 0; emitted < AMPS_NUMBER_BUFFER_LEN && seqNo_ > 0; ++emitted)
  {
    --firstDigit;
    buf_[firstDigit] = (char)(seqNo_ % 10 + '0');
    seqNo_ /= 10;
  }
  return firstDigit;
}
#endif
177 
/// Named accessors for fixed reason strings. Each function returns a pointer
/// to a string literal.
class Reason
{
public:
  static const char* duplicate()
  {
    return "duplicate";
  }
  static const char* badFilter()
  {
    return "bad filter";
  }
  static const char* badRegexTopic()
  {
    return "bad regex topic";
  }
  static const char* subscriptionAlreadyExists()
  {
    return "subscription already exists";
  }
  static const char* nameInUse()
  {
    return "name in use";
  }
  static const char* authFailure()
  {
    return "auth failure";
  }
  static const char* notEntitled()
  {
    return "not entitled";
  }
  static const char* authDisabled()
  {
    return "authentication disabled";
  }
  static const char* subidInUse()
  {
    return "subid in use";
  }
  static const char* noTopic()
  {
    return "no topic";
  }
};
195 
/// Callback interface that receives exceptions caught by the
/// AMPS_CALL_EXCEPTION_WRAPPER / AMPS_UNHANDLED_EXCEPTION macros in this
/// header. The default implementation ignores the exception; derive and
/// override exceptionThrown() to log or otherwise react.
class ExceptionListener
{
public:
  virtual ~ExceptionListener() {;}

  /// Called with the exception that was caught. Does nothing by default.
  virtual void exceptionThrown(const std::exception&) const {;}
};
210 
212 
213 
// Executes statement x. If x throws a std::exception, the exception is
// forwarded to _exceptionListener->exceptionThrown(); anything thrown by the
// listener itself is swallowed. The expansion site must have a pointer-like
// _exceptionListener in scope. Exceptions not derived from std::exception are
// not caught (see the note after the macro).
214 #define AMPS_CALL_EXCEPTION_WRAPPER(x) \
215  try\
216  {\
217  x;\
218  }\
219  catch (std::exception& ex_)\
220  {\
221  try\
222  {\
223  _exceptionListener->exceptionThrown(ex_);\
224  }\
225  catch(...)\
226  {\
227  ;\
228  }\
229  }
230  /*
231  * Note : we don't attempt to trap non std::exception exceptions
232  * here because doing so interferes with pthread_exit on some OSes.
233  catch (...)\
234  {\
235  try\
236  {\
237  _exceptionListener->exceptionThrown(AMPS::AMPSException(\
238  "An unhandled exception of unknown type was thrown by "\
239  "the registered handler.", AMPS_E_USAGE));\
240  }\
241  catch(...)\
242  {\
243  ;\
244  }\
245  }
246  */
// Repeats statement x for as long as me->_connected is true. The loop ends as
// soon as x completes without throwing; each time x throws
// MessageStreamFullException, me->checkAndSendHeartbeat(false) is called and
// x is attempted again. Other exceptions propagate to the caller.
// The caught exception is left unnamed so the same definition serves every
// platform without an unused-variable warning.
#define AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x) \
  while(me->_connected)\
  {\
    try\
    {\
      x;\
      break;\
    }\
    catch(MessageStreamFullException&)\
    {\
      me->checkAndSendHeartbeat(false);\
    }\
  }
276 
// Variant of the exception wrapper for use where the client is reached
// through a pointer-like `me`: x is run through
// AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x), and any std::exception
// that escapes is forwarded to me->_exceptionListener->exceptionThrown().
// Anything thrown by the listener itself is swallowed.
277 #define AMPS_CALL_EXCEPTION_WRAPPER_2(me,x) \
278  try\
279  {\
280  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me,x) \
281  }\
282  catch (std::exception& ex_)\
283  {\
284  try\
285  {\
286  me->_exceptionListener->exceptionThrown(ex_);\
287  }\
288  catch(...)\
289  {\
290  ;\
291  }\
292  }
293  /*
294  * Note : we don't attempt to trap non std::exception exceptions
295  * here because doing so interferes with pthread_exit on some OSes.
296  catch (...)\
297  {\
298  try\
299  {\
300  me->_exceptionListener->exceptionThrown(AMPS::AMPSException(\
301  "An unhandled exception of unknown type was thrown by "\
302  "the registered handler.", AMPS_E_USAGE));\
303  }\
304  catch(...)\
305  {\
306  ;\
307  }\
308  }*/
309 
// Reports an already-caught exception object `ex` to
// _exceptionListener->exceptionThrown(), swallowing anything the listener
// throws. Requires a pointer-like _exceptionListener in scope.
310 #define AMPS_UNHANDLED_EXCEPTION(ex) \
311  try\
312  {\
313  _exceptionListener->exceptionThrown(ex);\
314  }\
315  catch(...)\
316  {;}
317 
// Same as AMPS_UNHANDLED_EXCEPTION, but the listener is reached through
// me->_exceptionListener.
318 #define AMPS_UNHANDLED_EXCEPTION_2(me,ex) \
319  try\
320  {\
321  me->_exceptionListener->exceptionThrown(ex);\
322  }\
323  catch(...)\
324  {;}
325 
326 
327 class Client;
328 
353 
354 class Command
355 {
356  Message _message;
357  unsigned _timeout;
358  unsigned _batchSize;
359  unsigned _flags;
360  static const unsigned Subscribe = 1;
361  static const unsigned SOW = 2;
362  static const unsigned NeedsSequenceNumber = 4;
363  static const unsigned ProcessedAck = 8;
364  static const unsigned StatsAck = 16;
365  void init(Message::Command::Type command_)
366  {
367  _timeout = 0;
368  _batchSize = 0;
369  _flags = 0;
370  _message.reset();
371  _message.setCommandEnum(command_);
372  _setIds();
373  }
374  void init(const std::string& command_)
375  {
376  _timeout = 0;
377  _batchSize = 0;
378  _flags = 0;
379  _message.reset();
380  _message.setCommand(command_);
381  _setIds();
382  }
383  void _setIds(void)
384  {
385  Message::Command::Type command = _message.getCommandEnum();
386  if (!(command & Message::Command::NoDataCommands))
387  {
388  _message.newCommandId();
389  if (command == Message::Command::Subscribe ||
390  command == Message::Command::SOWAndSubscribe ||
391  command == Message::Command::DeltaSubscribe ||
392  command == Message::Command::SOWAndDeltaSubscribe)
393  {
394  _message.setSubscriptionId(_message.getCommandId());
395  _flags |= Subscribe;
396  }
397  if (command == Message::Command::SOW
398  || command == Message::Command::SOWAndSubscribe
399  || command == Message::Command::SOWAndDeltaSubscribe)
400  {
401  _message.setQueryID(_message.getCommandId());
402  if (_batchSize == 0)
403  {
404  setBatchSize(AMPS_DEFAULT_BATCH_SIZE);
405  }
406  if (command == Message::Command::SOW)
407  {
408  _flags |= SOW;
409  }
410  }
411  _flags |= ProcessedAck;
412  }
413  else if (command == Message::Command::SOWDelete)
414  {
415  _message.newCommandId();
416  _flags |= ProcessedAck;
417  _flags |= NeedsSequenceNumber;
418  }
419  else if (command == Message::Command::Publish
420  || command == Message::Command::DeltaPublish)
421  {
422  _flags |= NeedsSequenceNumber;
423  }
424  else if (command == Message::Command::StopTimer)
425  {
426  _message.newCommandId();
427  }
428  }
429 public:
433  Command(const std::string& command_)
434  {
435  init(command_);
436  }
440  Command(Message::Command::Type command_)
441  {
442  init(command_);
443  }
444 
448  Command& reset(const std::string& command_)
449  {
450  init(command_);
451  return *this;
452  }
456  Command& reset(Message::Command::Type command_)
457  {
458  init(command_);
459  return *this;
460  }
468  Command& setSowKey(const std::string& sowKey_) { _message.setSowKey(sowKey_); return *this; }
481  Command& setSowKeys(const std::string& sowKeys_) { _message.setSowKeys(sowKeys_); return *this; }
483  Command& setCommandId(const std::string& v_) { _message.setCommandId(v_); return *this; }
485  Command& setTopic(const std::string& v_) { _message.setTopic(v_); return *this; }
487  Command& setFilter(const std::string& v_) { _message.setFilter(v_); return *this; }
489  Command& setOrderBy(const std::string& v_) { _message.setOrderBy(v_); return *this; }
491  Command& setSubId(const std::string& v_) { _message.setSubscriptionId(v_); return *this; }
493  Command& setQueryId(const std::string& v_) { _message.setQueryId(v_); return *this; }
497  Command& setBookmark(const std::string& v_) { _message.setBookmark(v_); return *this; }
504  Command& setCorrelationId(const std::string& v_) { _message.setCorrelationId(v_); return *this; }
507  Command& setOptions(const std::string& v_) { _message.setOptions(v_); return *this; }
509  Command& setSequence(const std::string& v_) { _message.setSequence(v_); return *this; }
511  Command& setSequence(const amps_uint64_t v_)
512  {
513  std::ostringstream os;
514  os << v_;
515  _message.setSequence(os.str());
516  return *this;
517  }
518  amps_uint64_t getSequence() const { return amps_message_get_field_uint64(_message.getMessage(),AMPS_Sequence); }
521  Command& setData(const std::string& v_) { _message.setData(v_); return *this; }
525  Command& setData(const char* v_, size_t length_) { _message.setData(v_, length_); return *this; }
535  Command& setTimeout(unsigned v_) { _timeout = v_; return *this; }
537  Command& setTopN(unsigned v_) { _message.setTopNRecordsReturned(v_); return *this; }
542  Command& setBatchSize(unsigned v_) { _message.setBatchSize(v_); _batchSize = v_; return *this; }
553  Command& setExpiration(unsigned v_) { _message.setExpiration(v_); return *this; }
555  Command& addAckType(const std::string& v_)
556  {
557  _message.setAckType(_message.getAckType() + "," + v_);
558  if (v_ == "processed") _flags |= ProcessedAck;
559  else if (v_ == "stats") _flags |= StatsAck;
560  return *this;
561  }
563  Command& setAckType(const std::string& v_)
564  {
565  _message.setAckType(v_);
566  if (v_.find("processed") != std::string::npos) _flags |= ProcessedAck;
567  else _flags &= ~ProcessedAck;
568  if (v_.find("stats") != std::string::npos) _flags |= StatsAck;
569  else _flags &= ~StatsAck;
570  return *this;
571  }
573  Command& setAckType(unsigned v_)
574  {
575  _message.setAckTypeEnum(v_);
576  if (v_ & Message::AckType::Processed) _flags |= ProcessedAck;
577  else _flags &= ~ProcessedAck;
578  if (v_ & Message::AckType::Stats) _flags |= StatsAck;
579  else _flags &= ~StatsAck;
580  return *this;
581  }
583  std::string getAckType() const
584  {
585  return (std::string)(_message.getAckType());
586  }
588  unsigned getAckTypeEnum() const
589  {
590  return _message.getAckTypeEnum();
591  }
592 
593  Message& getMessage(void) { return _message; }
594  unsigned getTimeout(void) const { return _timeout; }
595  unsigned getBatchSize(void) const { return _batchSize; }
596  bool isSubscribe(void) const
597  {
598  return _flags & Subscribe;
599  }
600  bool isSow(void) const { return (_flags & SOW) != 0; }
601  bool hasProcessedAck(void) const { return (_flags & ProcessedAck) != 0; }
602  bool hasStatsAck(void) const { return (_flags & StatsAck) != 0; }
603  bool needsSequenceNumber(void) const { return (_flags & NeedsSequenceNumber) != 0; }
604 };
605 
608 typedef void(*DisconnectHandlerFunc)(Client&, void* userData);
609 
610 class Message;
612 
/// Interface for pluggable authentication. Each operation receives the user
/// name and password as strings; authenticate() and retry() return a string.
/// NOTE(review): the returned string is presumably the value sent to the
/// server as the password -- confirm against the logon code.
class Authenticator
{
public:
  virtual ~Authenticator() {;}

  /// Produces the string to use for the given credentials.
  virtual std::string authenticate(const std::string& userName_, const std::string& password_) = 0;

  /// Produces a replacement string for the given credentials.
  /// NOTE(review): presumably invoked when the server asks for a retry --
  /// confirm against the caller.
  virtual std::string retry(const std::string& userName_, const std::string& password_) = 0;

  /// Notification carrying the credentials and a reason string.
  virtual void completed(const std::string& userName_, const std::string& password_, const std::string& reason_) = 0;
};
642 
647 {
648 public:
649  virtual ~DefaultAuthenticator() {;}
652  std::string authenticate(const std::string& /*userName_*/, const std::string& password_)
653  {
654  return password_;
655  }
656 
659  std::string retry(const std::string& /*userName_*/, const std::string& /*password_*/)
660  {
661  throw AuthenticationException("retry not implemented by DefaultAuthenticator.");
662  }
663 
664  void completed(const std::string& /*userName_*/, const std::string& /* password_ */, const std::string& /* reason */) {;}
665 
670  {
671  static DefaultAuthenticator d;
672  return d;
673  }
674 };
675 
679 {
680 public:
681 
685  virtual void execute(Message& message_) = 0;
686 
687  virtual ~StoreReplayer() {;}
688 };
689 
690 class Store;
691 
// Callback signature used by StoreImpl::callResizeHandler(), which passes a
// Store wrapping the implementation, the new size, and the user data that was
// registered through setResizeHandler(). The handler's bool result is
// returned to the caller of callResizeHandler().
// NOTE(review): presumably `true` allows the resize and `false` vetoes it --
// confirm against the store implementations.
700 typedef bool (*PublishStoreResizeHandler)(Store store_,
701  size_t size_,
702  void* userData_);
703 
706 class StoreImpl : public RefBody
707 {
708 public:
709  StoreImpl() : _resizeHandler(NULL), _resizeHandlerData(NULL) {;}
710 
715  virtual amps_uint64_t store(const Message& message_) = 0;
716 
721  virtual void discardUpTo(amps_uint64_t index_) = 0;
722 
727  virtual void replay(StoreReplayer& replayer_) = 0;
728 
736  virtual bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_) = 0;
737 
742  virtual size_t unpersistedCount() const = 0;
743 
744  virtual ~StoreImpl() {;}
745 
754  virtual void flush(long timeout_) = 0;
755 
758  static inline size_t getUnsetPosition() { return AMPS_UNSET_INDEX; }
759 
762  static inline amps_uint64_t getUnsetSequence() { return AMPS_UNSET_SEQUENCE; }
763 
767  virtual amps_uint64_t getLowestUnpersisted() const = 0;
768 
772  virtual amps_uint64_t getLastPersisted() = 0;
773 
783  inline virtual void setResizeHandler(PublishStoreResizeHandler handler_,
784  void* userData_)
785  {
786  _resizeHandler = handler_;
787  _resizeHandlerData = userData_;
788  }
789 
790  inline virtual PublishStoreResizeHandler getResizeHandler() const
791  {
792  return _resizeHandler;
793  }
794 
795  bool callResizeHandler(size_t newSize_);
796 
797 private:
798  PublishStoreResizeHandler _resizeHandler;
799  void* _resizeHandlerData;
800 };
801 
804 class Store
805 {
806  RefHandle<StoreImpl> _body;
807 public:
808  Store() {;}
809  Store(StoreImpl* body_) : _body(body_) {;}
810  Store& operator=(const Store& rhs)
811  {
812  _body = rhs._body;
813  return *this;
814  }
815 
819  amps_uint64_t store(const Message& message_)
820  {
821  return _body.get().store(message_);
822  }
823 
828  void discardUpTo(amps_uint64_t index_)
829  {
830  _body.get().discardUpTo(index_);
831  }
832 
837  void replay(StoreReplayer& replayer_)
838  {
839  _body.get().replay(replayer_);
840  }
841 
849  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
850  {
851  return _body.get().replaySingle(replayer_, index_);
852  }
853 
858  size_t unpersistedCount() const
859  {
860  return _body.get().unpersistedCount();
861  }
862 
866  bool isValid() const
867  {
868  return _body.isValid();
869  }
870 
879  void flush(long timeout_ = 0)
880  {
881  return _body.get().flush(timeout_);
882  }
883 
887  amps_uint64_t getLowestUnpersisted()
888  {
889  return _body.get().getLowestUnpersisted();
890  }
891 
895  amps_uint64_t getLastPersisted()
896  {
897  return _body.get().getLastPersisted();
898  }
899 
909  void setResizeHandler(PublishStoreResizeHandler handler_,
910  void* userData_)
911  {
912  _body.get().setResizeHandler(handler_, userData_);
913  }
914 
915  PublishStoreResizeHandler getResizeHandler()
916  {
917  return _body.get().getResizeHandler();
918  }
919 
923  StoreImpl* get()
924  {
925  if (_body.isValid())
926  return &_body.get();
927  else
928  return NULL;
929  }
930 
931 };
932 
938 {
939 public:
940  virtual ~FailedWriteHandler() {;}
947  virtual void failedWrite(const Message& message_,
948  const char* reason_, size_t reasonLength_) = 0;
949 };
950 
951 
952 inline bool StoreImpl::callResizeHandler(size_t newSize_)
953 {
954  if(_resizeHandler)
955  return _resizeHandler(Store(this), newSize_, _resizeHandlerData);
956  return true;
957 }
958 
965 inline bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t /*size_*/,
966  void* data_)
967 {
968  long* timeoutp = (long*)data_;
969  size_t count = store_.unpersistedCount();
970  if (count == 0) return false;
971  try
972  {
973  store_.flush(*timeoutp);
974  }
975 #ifdef _WIN32
976  catch (const TimedOutException&)
977 #else
978  catch (const TimedOutException& e)
979 #endif
980  {
981  return true;
982  }
983  return (count == store_.unpersistedCount());
984 }
985 
990 {
991 public:
992  virtual ~SubscriptionManager() {;}
1000  virtual void subscribe(MessageHandler messageHandler_, const Message& message_,
1001  unsigned requestedAckTypes_) = 0;
1005  virtual void unsubscribe(const Message::Field& subId_) = 0;
1008  virtual void clear() = 0;
1012  virtual void resubscribe(Client& client_) = 0;
1013 };
1014 
1018 
/// Interface for objects that want to be told when a client's connection
/// state changes.
class ConnectionStateListener
{
public:
  /// States reported to connectionStateChanged(). The numeric values are
  /// part of the interface; the client compares states with `>` when
  /// deciding whether to broadcast.
  typedef enum { Disconnected = 0,
                 Shutdown = 1,
                 Connected = 2,
                 LoggedOn = 4,
                 PublishReplayed = 8,
                 HeartbeatInitiated = 16,
                 Resubscribed = 32,
                 UNKNOWN = 16384
               } State;

  /// Called with the state the connection has just entered.
  virtual void connectionStateChanged(State newState_) = 0;
  virtual ~ConnectionStateListener() {;}
};
1045 
1046 
1047 class MessageStreamImpl;
1048 class MessageStream;
1049 
1050 typedef void(*DeferredExecutionFunc)(void*);
1051 
1052 class ClientImpl : public RefBody
1053 {
1054  friend class Client;
1055 protected:
1056  amps_handle _client;
1057  DisconnectHandler _disconnectHandler;
// Named slots, one per kind of globally handled message, with COUNT giving
// the number of slots. The underlying type is size_t.
// NOTE(review): presumably used as indices into _globalCommandTypeHandlers,
// which is declared right after this enum -- confirm where that vector is
// sized and indexed.
1058  enum GlobalCommandTypeHandlers : size_t
1059  {
1060  Publish = 0,
1061  SOW = 1,
1062  GroupBegin = 2,
1063  GroupEnd = 3,
1064  Heartbeat = 4,
1065  OOF = 5,
1066  Ack = 6,
1067  LastChance = 7,
1068  DuplicateMessage = 8,
1069  COUNT = 9
1070  };
1071  std::vector<MessageHandler> _globalCommandTypeHandlers;
1072  Message _message, _readMessage, _publishMessage, _deltaMessage, _beatMessage;
1073  MessageRouter _routes;
1074  MessageRouter::RouteCache _routeCache;
1075  mutable Mutex _lock;
1076  std::string _name, _lastUri, _logonCorrelationData;
1077  BookmarkStore _bookmarkStore;
1078  Store _publishStore;
1079  bool _isRetryOnDisconnect;
1080  amps_unique_ptr<FailedWriteHandler> _failedWriteHandler;
1081  volatile amps_uint64_t _lastSentHaSequenceNumber;
1082  ATOMIC_TYPE_8 _badTimeToHAPublish;
1083  ATOMIC_TYPE_8 _badTimeToHASubscribe;
1084  VersionInfo _serverVersion;
1085  Timer _heartbeatTimer;
1086  amps_unique_ptr<MessageStream> _pEmptyMessageStream;
1087 
1088  // queue data
1089  int _queueAckTimeout;
1090  bool _isAutoAckEnabled;
1091  unsigned _ackBatchSize;
1092  unsigned _queuedAckCount;
1093  unsigned _defaultMaxDepth;
1094  struct QueueBookmarks
1095  {
1096  QueueBookmarks(const std::string topic_)
1097  :_topic(topic_)
1098  ,_oldestTime(0)
1099  ,_bookmarkCount(0)
1100  {;}
1101  std::string _topic;
1102  std::string _data;
1103  amps_uint64_t _oldestTime;
1104  unsigned _bookmarkCount;
1105  };
1106  typedef amps_uint64_t topic_hash;
1107  typedef std::map<topic_hash,QueueBookmarks> TopicHashMap;
1108  TopicHashMap _topicHashMap;
1109 
1110  class ClientStoreReplayer : public StoreReplayer
1111  {
1112  ClientImpl* _client;
1113  public:
1114  unsigned _version;
1115  amps_result _res;
1116 
1117  ClientStoreReplayer()
1118  : _client(NULL) , _version(0), _res(AMPS_E_OK)
1119  {}
1120 
1121  ClientStoreReplayer(ClientImpl* client_)
1122  : _client(client_) , _version(0), _res(AMPS_E_OK)
1123  {}
1124 
1125  void setClient(ClientImpl* client_) { _client = client_; }
1126 
1127  void execute(Message& message_)
1128  {
1129  if (!_client) throw CommandException("Can't replay without a client.");
1130  amps_uint64_t index = amps_message_get_field_uint64(message_.getMessage(),
1131  AMPS_Sequence);
1132  if (index > _client->_lastSentHaSequenceNumber)
1133  _client->_lastSentHaSequenceNumber = index;
1134 
1135  _res = AMPS_E_OK;
1136  // Don't replay a queue cancel message after a reconnect.
1137  // Currently, the only messages that will have anything in options
1138  // are cancel messages.
1139  if (!message_.getCommand().empty() &&
1140  (!_client->_badTimeToHAPublish ||
1141  message_.getOptions().len() < 6))
1142  {
1143  _res= amps_client_send_with_version(_client->_client,
1144  message_.getMessage(),
1145  &_version);
1146  if (_res != AMPS_E_OK)
1147  {
1148  throw DisconnectedException("AMPS Server disconnected during replay");
1149  }
1150  }
1151  }
1152 
1153  };
1154  ClientStoreReplayer _replayer;
1155 
1156  class FailedWriteStoreReplayer : public StoreReplayer
1157  {
1158  ClientImpl* _parent;
1159  const char* _reason;
1160  size_t _reasonLength;
1161  size_t _replayCount;
1162  public:
1163  FailedWriteStoreReplayer(ClientImpl* parent,const char* reason_, size_t reasonLength_)
1164  : _parent(parent),
1165  _reason(reason_),
1166  _reasonLength(reasonLength_),
1167  _replayCount(0)
1168  {;}
1169  void execute(Message& message_)
1170  {
1171  if (_parent->_failedWriteHandler)
1172  {
1173  ++_replayCount;
1174  _parent->_failedWriteHandler->failedWrite(message_,
1175  _reason, _reasonLength);
1176  }
1177  }
1178  size_t replayCount(void) const { return _replayCount; }
1179  };
1180 
1181  struct AckResponseImpl : public RefBody
1182  {
1183  std::string username, password, reason, status, bookmark, options;
1184  amps_uint64_t sequenceNo;
1185  VersionInfo serverVersion;
1186  volatile bool responded, abandoned;
1187  unsigned connectionVersion;
1188  AckResponseImpl() :
1189  RefBody(),
1190  sequenceNo((amps_uint64_t)0),
1191  serverVersion(),
1192  responded(false),
1193  abandoned(false),
1194  connectionVersion(0)
1195  {
1196  }
1197  };
1198 
1199  class AckResponse
1200  {
1201  RefHandle<AckResponseImpl> _body;
1202  public:
1203  AckResponse() : _body(NULL) {;}
1204  static AckResponse create()
1205  {
1206  AckResponse r;
1207  r._body = new AckResponseImpl();
1208  return r;
1209  }
1210 
1211  const std::string& username()
1212  {
1213  return _body.get().username;
1214  }
1215  void setUsername(const char* data_, size_t len_)
1216  {
1217  if (data_) _body.get().username.assign(data_, len_);
1218  else _body.get().username.clear();
1219  }
1220  const std::string& password()
1221  {
1222  return _body.get().password;
1223  }
1224  void setPassword(const char* data_, size_t len_)
1225  {
1226  if (data_) _body.get().password.assign(data_, len_);
1227  else _body.get().password.clear();
1228  }
1229  const std::string& reason()
1230  {
1231  return _body.get().reason;
1232  }
1233  void setReason(const char* data_, size_t len_)
1234  {
1235  if (data_) _body.get().reason.assign(data_, len_);
1236  else _body.get().reason.clear();
1237  }
1238  const std::string& status()
1239  {
1240  return _body.get().status;
1241  }
1242  void setStatus(const char* data_, size_t len_)
1243  {
1244  if (data_) _body.get().status.assign(data_, len_);
1245  else _body.get().status.clear();
1246  }
1247  const std::string& bookmark()
1248  {
1249  return _body.get().bookmark;
1250  }
1251  void setBookmark(const char* data_, size_t len_)
1252  {
1253  if (data_) _body.get().bookmark.assign(data_, len_);
1254  else _body.get().bookmark.clear();
1255  }
1256  amps_uint64_t sequenceNo() const
1257  {
1258  return _body.get().sequenceNo;
1259  }
1260  void setSequenceNo(const char* data_, size_t len_)
1261  {
1262  amps_uint64_t result = (amps_uint64_t)0;
1263  if (data_)
1264  {
1265  for(size_t i=0; i<len_; ++i)
1266  {
1267  result *= (amps_uint64_t)10;
1268  result += (amps_uint64_t)(data_[i] - '0');
1269  }
1270  }
1271  _body.get().sequenceNo = result;
1272  }
1273  VersionInfo serverVersion() const
1274  {
1275  return _body.get().serverVersion;
1276  }
1277  void setServerVersion(const char* data_, size_t len_)
1278  {
1279  if (data_)
1280  _body.get().serverVersion.setVersion(std::string(data_, len_));
1281  }
1282  bool responded()
1283  {
1284  return _body.get().responded;
1285  }
1286  void setResponded(bool responded_)
1287  {
1288  _body.get().responded = responded_;
1289  }
1290  bool abandoned()
1291  {
1292  return _body.get().abandoned;
1293  }
1294  void setAbandoned(bool abandoned_)
1295  {
1296  if (_body.isValid())
1297  _body.get().abandoned = abandoned_;
1298  }
1299 
1300  void setConnectionVersion(unsigned connectionVersion)
1301  {
1302  _body.get().connectionVersion = connectionVersion;
1303  }
1304 
1305  unsigned getConnectionVersion()
1306  {
1307  return _body.get().connectionVersion;
1308  }
1309  void setOptions(const char* data_, size_t len_)
1310  {
1311  if (data_) _body.get().options.assign(data_,len_);
1312  else _body.get().options.clear();
1313  }
1314 
1315  const std::string& options()
1316  {
1317  return _body.get().options;
1318  }
1319 
1320  AckResponse& operator=(const AckResponse& rhs)
1321  {
1322  _body = rhs._body;
1323  return *this;
1324  }
1325  };
1326 
1327 
1328  typedef std::map<std::string, AckResponse> AckMap;
1329  AckMap _acks;
1330  DefaultExceptionListener _defaultExceptionListener;
1331 protected:
1332 
1333  struct DeferredExecutionRequest
1334  {
1335  DeferredExecutionRequest(DeferredExecutionFunc func_,
1336  void* userData_)
1337  : _func(func_),
1338  _userData(userData_)
1339  {;}
1340 
1341  DeferredExecutionFunc _func;
1342  void* _userData;
1343  };
1344  const ExceptionListener* _exceptionListener;
1345  std::shared_ptr<const ExceptionListener> _pExceptionListener;
1346  amps_unique_ptr<SubscriptionManager> _subscriptionManager;
1347  bool _connected;
1348  std::string _username;
1349  typedef std::set<ConnectionStateListener*> ConnectionStateListeners;
1350  ConnectionStateListeners _connectionStateListeners;
1351  typedef std::vector<DeferredExecutionRequest> DeferredExecutionList;
1352  DeferredExecutionList _deferredExecutionList;
1353  unsigned _heartbeatInterval;
1354  unsigned _readTimeout;
1355 
1356  void broadcastConnectionStateChanged(ConnectionStateListener::State newState_)
1357  {
1358  // If we disconnectd before we got to notification, don't notify.
1359  // This should only be able to happen for Resubscribed, since the lock
1360  // is released to let the subscription manager run resubscribe so a
1361  // disconnect could be called before the change is broadcast.
1362  if (!_connected && newState_ > ConnectionStateListener::Connected)
1363  {
1364  return;
1365  }
1366  for(ConnectionStateListeners::iterator it= _connectionStateListeners.begin(); it != _connectionStateListeners.end(); ++it)
1367  {
1368  AMPS_CALL_EXCEPTION_WRAPPER(
1369  (*it)->connectionStateChanged(newState_));
1370  }
1371  }
1372  unsigned processedAck(Message& message);
1373  unsigned persistedAck(Message& meesage);
1374  void lastChance(Message& message);
1375  void checkAndSendHeartbeat(bool force=false);
1376  virtual ConnectionInfo getConnectionInfo() const;
1377  static amps_result
1378  ClientImplMessageHandler(amps_handle message, void* userData);
1379  static void
1380  ClientImplPreDisconnectHandler(amps_handle client, unsigned failedConnectionVersion, void* userData);
1381  static amps_result
1382  ClientImplDisconnectHandler(amps_handle client, void* userData);
1383 
1384  void unsubscribeInternal(const std::string& id)
1385  {
1386  if (id.empty()) return;
1387  // remove the handler first to avoid any more message delivery
1388  Message::Field subId;
1389  subId.assign(id.data(), id.length());
1390  _routes.removeRoute(subId);
1391  // Lock is already acquired
1392  if (_subscriptionManager)
1393  {
1394  // Have to unlock before calling into sub manager to avoid deadlock
1395  Unlock<Mutex> unlock(_lock);
1396  _subscriptionManager->unsubscribe(subId);
1397  }
1398  _message.reset();
1399  _message.setCommandEnum(Message::Command::Unsubscribe);
1400  _message.newCommandId();
1401  _message.setSubscriptionId(id);
1402  _sendWithoutRetry(_message);
1403  }
1404 
1405  AckResponse syncAckProcessing(long timeout_, Message& message_,
1406  bool isHASubscribe_)
1407  {
1408  return syncAckProcessing(timeout_, message_,
1409  (amps_uint64_t)0, isHASubscribe_);
1410  }
1411 
1412  AckResponse syncAckProcessing(long timeout_, Message& message_,
1413  amps_uint64_t haSeq = (amps_uint64_t)0,
1414  bool isHASubscribe_ = false)
1415  {
1416  // inv: we already have _lock locked up.
1417  AckResponse ack = AckResponse::create();
1418  _acks[message_.getCommandId()] = ack;
1419  ack.setConnectionVersion((unsigned)_send(message_, haSeq, isHASubscribe_));
1420  if (ack.getConnectionVersion() == 0)
1421  {
1422  // Send failed
1423  throw DisconnectedException("Connection closed while waiting for response.");
1424  }
1425  bool timedOut = false;
1426  AMPS_START_TIMER(timeout_)
1427  while(!timedOut && !ack.responded() && !ack.abandoned())
1428  {
1429  if (timeout_)
1430  {
1431  timedOut = !_lock.wait(timeout_);
1432  // May have woken up early, check real time
1433  if (timedOut) { AMPS_RESET_TIMER(timedOut, timeout_); }
1434  }
1435  else
1436  {
1437  // Using a timeout version to ensure python can interrupt
1438  _lock.wait(1000);
1439  amps_invoke_waiting_function();
1440  }
1441  }
1442  if (ack.responded())
1443  {
1444  if (ack.status() != "failure")
1445  {
1446  if (message_.getCommand() == "logon")
1447  {
1448  amps_uint64_t ackSequence = ack.sequenceNo();
1449  if (_lastSentHaSequenceNumber < ackSequence)
1450  {
1451  _lastSentHaSequenceNumber = ackSequence;
1452  }
1453  if (_publishStore.isValid())
1454  {
1455  _publishStore.discardUpTo(ackSequence);
1456  if (_lastSentHaSequenceNumber < _publishStore.getLastPersisted())
1457  {
1458  _lastSentHaSequenceNumber = _publishStore.getLastPersisted();
1459  }
1460  }
1461  _serverVersion = ack.serverVersion();
1462  if (_bookmarkStore.isValid())
1463  _bookmarkStore.setServerVersion(_serverVersion);
1464  }
1465  if(_ackBatchSize)
1466  {
1467  const std::string& options = ack.options();
1468  size_t index = options.find_first_of("max_backlog=");
1469  if(index != std::string::npos)
1470  {
1471  unsigned data =0;
1472  const char* c = options.c_str()+index+12;
1473  while(*c && *c!=',')
1474  {
1475  data = (data*10) + (unsigned)(*c++-48);
1476  }
1477  if(_ackBatchSize > data) _ackBatchSize = data;
1478  }
1479  }
1480  return ack;
1481  }
1482  const size_t NotEntitled = 12;
1483  std::string ackReason = ack.reason();
1484  if (ackReason.length() == 0) return ack; // none
1485  if (ackReason.length() == NotEntitled &&
1486  ackReason[0] == 'n' &&
1487  message_.getUserId().len() == 0)
1488  {
1489  message_.assignUserId(_username);
1490  }
1491  message_.throwFor(_client, ackReason);
1492  }
1493  else // !ack.responded()
1494  {
1495  if (!ack.abandoned())
1496  {
1497  throw TimedOutException("timed out waiting for operation.");
1498  }
1499  else
1500  {
1501  throw DisconnectedException("Connection closed while waiting for response.");
1502  }
1503  }
1504  return ack;
1505  }
1506 
1507  void _cleanup(void)
1508  {
1509  if (!_client) return;
1511  NULL,
1512  0L);
1513  AMPS_CALL_EXCEPTION_WRAPPER(ClientImpl::disconnect());
1514  _pEmptyMessageStream.reset(NULL);
1515  amps_client_destroy(_client);
1516  processDeferredExecutions();
1517  _client = NULL;
1518  }
1519 
1520 public:
1521 
1522  ClientImpl(const std::string& clientName)
1523  : _client(NULL), _name(clientName)
1524  , _isRetryOnDisconnect(true)
1525  , _lastSentHaSequenceNumber((amps_uint64_t)0), _badTimeToHAPublish(0)
1526  , _badTimeToHASubscribe(0), _serverVersion()
1527  , _queueAckTimeout(0)
1528  , _isAutoAckEnabled(false)
1529  , _ackBatchSize(0)
1530  , _queuedAckCount(0)
1531  , _defaultMaxDepth(0)
1532  , _connected(false)
1533  , _heartbeatInterval(0)
1534  , _readTimeout(0)
1535  {
1536  _replayer.setClient(this);
1537  _client = amps_client_create(clientName.c_str());
1538  amps_client_set_message_handler(_client, (amps_handler)ClientImpl::ClientImplMessageHandler, this);
1539  amps_client_set_predisconnect_handler(_client, (amps_predisconnect_handler)ClientImpl::ClientImplPreDisconnectHandler, this);
1540  amps_client_set_disconnect_handler(_client, (amps_handler)ClientImpl::ClientImplDisconnectHandler, this);
1541  _exceptionListener = &_defaultExceptionListener;
1542  for (size_t i=0; i<GlobalCommandTypeHandlers::COUNT; ++i)
1543  {
1544  _globalCommandTypeHandlers.push_back(MessageHandler());
1545  }
1546  }
1547 
1548 
1549  virtual ~ClientImpl()
1550  {
1551  _cleanup();
1552  }
1553 
1554 
1555  const std::string& getName() const
1556  {
1557  return _name;
1558  }
1559 
1560  void setName(const std::string& name)
1561  {
1562  // This operation will fail if the client's
1563  // name is already set.
1565  _client, name.c_str());
1566  if (result != AMPS_E_OK)
1567  {
1568  AMPSException::throwFor(_client, result);
1569  }
1570  _name = name;
1571  }
1572 
1573  const std::string& getLogonCorrelationData() const
1574  {
1575  return _logonCorrelationData;
1576  }
1577 
1578  void setLogonCorrelationData(const std::string& logonCorrelationData_)
1579  {
1580  _logonCorrelationData = logonCorrelationData_;
1581  }
1582 
1583  size_t getServerVersion() const
1584  {
1585  return _serverVersion.getOldStyleVersion();
1586  }
1587 
1588  VersionInfo getServerVersionInfo() const
1589  {
1590  return _serverVersion;
1591  }
1592 
1593  const std::string& getURI() const
1594  {
1595  Lock<Mutex> l(_lock);
1596  return _lastUri;
1597  }
1598 
1599  virtual void connect(const std::string& uri)
1600  {
1601  Lock<Mutex> l(_lock);
1602  _lastUri = uri;
1604  _client, uri.c_str());
1605  if (result != AMPS_E_OK)
1606  {
1607  AMPSException::throwFor(_client, result);
1608  }
1609  _message.reset();
1610  _deltaMessage.setCommandEnum(Message::Command::DeltaPublish);
1611  _publishMessage.setCommandEnum(Message::Command::Publish);
1612  _beatMessage.setCommandEnum(Message::Command::Heartbeat);
1613  _beatMessage.setOptions("beat");
1614  _readMessage.setClientImpl(this);
1615  if(_queueAckTimeout)
1616  {
1617  amps_client_set_idle_time(_client,_queueAckTimeout);
1618  }
1619  _connected = true;
1620  broadcastConnectionStateChanged(ConnectionStateListener::Connected);
1621  }
1622 
1623  void setDisconnected()
1624  {
1625  {
1626  Lock<Mutex> l(_lock);
1627  if (_connected)
1628  broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
1629  _connected = false;
1630  _routes.clear();
1631  _heartbeatTimer.setTimeout(0.0);
1632  }
1633  clearAcks(INT_MAX);
1634  }
1635 
1636  virtual void disconnect()
1637  {
1638  {
1639  Lock<Mutex> l(_lock);
1640  _message.reset();
1641  _message.setCommandEnum(Message::Command::Unsubscribe);
1642  _message.newCommandId();
1643  _message.setSubscriptionId("all");
1644  AMPS_CALL_EXCEPTION_WRAPPER(_sendWithoutRetry(_message));
1645  }
1646  AMPS_CALL_EXCEPTION_WRAPPER(flushAcks());
1647  setDisconnected();
1648  amps_client_disconnect(_client);
1649  Lock<Mutex> l(_lock);
1650  broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
1651  }
1652 
1653  void clearAcks(unsigned failedVersion)
1654  {
1655  // Have to lock to prevent race conditions
1656  Lock<Mutex> l(_lock);
1657  {
1658  // Go ahead and signal any waiters if they are around...
1659  std::list<std::string> worklist;
1660  for(AckMap::iterator i = _acks.begin(); i != _acks.end(); i++)
1661  {
1662  if (i->second.getConnectionVersion() <= failedVersion)
1663  {
1664  i->second.setAbandoned(true);
1665  worklist.push_back(i->first);
1666  }
1667  }
1668 
1669  for(std::list<std::string>::iterator j = worklist.begin(); j != worklist.end(); j++)
1670  {
1671  _acks.erase(*j);
1672  }
1673  }
1674 
1675  _lock.signalAll();
1676  }
1677 
1678  int send(const Message& message)
1679  {
1680  Lock<Mutex> l(_lock);
1681  return _send(message);
1682  }
1683 
1684  void sendWithoutRetry(const Message& message_)
1685  {
1686  Lock<Mutex> l(_lock);
1687  _sendWithoutRetry(message_);
1688  }
1689 
1690  void _sendWithoutRetry(const Message& message_)
1691  {
1692  amps_result result = amps_client_send(_client, message_.getMessage());
1693  if(result != AMPS_E_OK)
1694  {
1695  AMPSException::throwFor(_client,result);
1696  }
1697  }
1698 
// Sends message on _client, looping while the result is AMPS_E_RETRY.
//
// message         the command to send.
// haSeq           sequence number of a stored publish; 0 when the message
//                 has none. When it is ahead of _lastSentHaSequenceNumber
//                 by more than one, the missing entries are replayed from
//                 _publishStore before this message is sent.
// isHASubscribe_  flags an HA subscribe; such a send is skipped while
//                 _badTimeToHASubscribe is non-zero.
//
// Returns the connection version written by amps_client_send_with_version()
// (or taken from _replayer); it is still 0 if the function returns before
// anything has been sent.
// Errors are raised through AMPSException::throwFor(): when the loop ends
// with a result other than AMPS_E_OK, or straight away on a failed send (or
// on a pending HA publish) when _isRetryOnDisconnect is false.
//
// inv: the caller holds _lock. It is released temporarily while yielding
// and while attempting a reconnect.
1699  int _send(const Message& message, amps_uint64_t haSeq = (amps_uint64_t)0,
1700  bool isHASubscribe_ = false)
1701  {
1702  // Lock is already acquired
1703  amps_result result = AMPS_E_RETRY;
1704 
1705  // Create a local reference to this message, as we'll need to hold on
1706  // to a reference to it in case reconnect occurs.
1707  Message localMessage = message;
1708  unsigned version = 0;
1709 
1710  while(result == AMPS_E_RETRY)
1711  {
// An HA publish arriving while _badTimeToHAPublish is set: give up the lock
// and yield, then go round the loop again.
1712  if (haSeq != (amps_uint64_t)0 && _badTimeToHAPublish > 0)
1713  {
1714  // If retrySend is disabled, do not wait for the reconnect
1715  // to finish, just throw.
1716  if(!_isRetryOnDisconnect)
1717  {
1718  AMPSException::throwFor(_client,AMPS_E_RETRY);
1719  }
1720  Unlock<Mutex> l(_lock);
1721 #ifdef _WIN32
1722  Sleep(0);
1723 #elif defined(sun)
1724  sched_yield();
1725 #else
// NOTE(review): pthread_yield() is a non-standard call; the sun branch
// above already uses sched_yield(). Consider unifying.
1726  pthread_yield();
1727 #endif
1728  }
1729  else
1730  {
// Nothing to send: this sequence number has already gone out, or an HA
// subscribe is being held back.
1731  if ((haSeq && haSeq <= _lastSentHaSequenceNumber) ||
1732  (isHASubscribe_ && _badTimeToHASubscribe != 0))
1733  {
1734  return (int)version;
1735  }
1736  // It's possible to get here out of order, but this way we'll
1737  // always send in order.
1738  if (haSeq > _lastSentHaSequenceNumber)
1739  {
1740  while (haSeq > _lastSentHaSequenceNumber + 1)
1741  {
1742  try
1743  {
1744  // Replayer updates _lastSentHaSequenceNumber
1745  if (!_publishStore.replaySingle(_replayer,
1746  _lastSentHaSequenceNumber+1))
1747  {
// NOTE(review): with the increment below commented out, this path relies on
// something else advancing _lastSentHaSequenceNumber for the loop to end --
// confirm.
1748  //++_lastSentHaSequenceNumber;
1749  continue;
1750  }
1751  result = AMPS_E_OK;
1752  version = _replayer._version;
1753  }
1754  #ifdef _WIN32
1755  catch(const DisconnectedException&)
1756  #else
1757  catch(const DisconnectedException& e)
1758  #endif
1759  {
1760  result = _replayer._res;
1761  break;
1762  }
1763  }
// The send below runs, and the sequence counter advances, whether or not
// the replay loop above ended through the catch handler.
1764  result = amps_client_send_with_version(_client,
1765  localMessage.getMessage(),
1766  &version);
1767  ++_lastSentHaSequenceNumber;
1768  }
1769  else
1770  result = amps_client_send_with_version(_client,
1771  localMessage.getMessage(),
1772  &version);
1773  if (result != AMPS_E_OK)
1774  {
// For a plain (non-HA) message that still shares its underlying message
// with the caller's object, take a deep copy before retrying.
1775  if (!isHASubscribe_ && !haSeq &&
1776  localMessage.getMessage() == message.getMessage())
1777  {
1778  localMessage = message.deepCopy();
1779  }
1780  if(_isRetryOnDisconnect)
1781  {
1782  Unlock<Mutex> u(_lock);
1783  result = amps_client_attempt_reconnect(_client, version);
1784  // If this is an HA publish or subscribe command, it was
1785  // stored first and will have already been replayed by the
1786  // store or sub manager after reconnect, so just return.
1787  if ((isHASubscribe_ || haSeq) &&
1788  result == AMPS_E_RETRY)
1789  {
1790  return (int)version;
1791  }
1792  }
1793  else
1794  {
1795  // retrySend is disabled so throw the error
1796  // from the send as an exception, do not retry.
1797  AMPSException::throwFor(_client, result);
1798  }
1799  }
1800  }
1801  if (result == AMPS_E_RETRY)
1802  amps_invoke_waiting_function();
1803  }
1804 
1805  if (result != AMPS_E_OK) AMPSException::throwFor(_client, result);
1806  return (int)version;
1807  }
1808 
1809  void addMessageHandler(const Field& commandId_,
1810  const AMPS::MessageHandler& messageHandler_,
1811  unsigned requestedAcks_, bool isSubscribe_)
1812  {
1813  Lock<Mutex> lock(_lock);
1814  _routes.addRoute(commandId_, messageHandler_, requestedAcks_,
1815  0, isSubscribe_);
1816  }
1817 
1818  bool removeMessageHandler(const Field& commandId_)
1819  {
1820  Lock<Mutex> lock(_lock);
1821  return _routes.removeRoute(commandId_);
1822  }
1823 
// Sends message_ and routes its responses to messageHandler_.
//
// What happens depends on the command type:
//  - Subscribe family and SOW: missing command, subscription and query ids
//    are filled in, the Processed ack (plus Persisted for bookmark
//    subscribes with a valid bookmark store, plus Completed for plain SOW)
//    is added, routes are registered, and the call waits for the ack via
//    syncAckProcessing(). On any exception the routes are removed again.
//  - Unsubscribe, Heartbeat, Logon, timers, publishes, SOWDelete: sent
//    as-is through _send(); a route is registered only when an ack was
//    requested and the handler is valid.
//  - GroupBegin, GroupEnd, OOF, Ack, Unknown and anything else: rejected
//    with CommandException.
//
// messageHandler_  receives the messages routed for this command.
// message_         command to send; ids may be assigned on it. Its ack type
//                  is set back to the caller's requested value before this
//                  function returns or rethrows.
// timeout_         handed to syncAckProcessing() for the first group.
//
// Returns the command id, which may have been generated here.
1824  std::string send(MessageHandler messageHandler_, Message& message_, int timeout_ = 0)
1825  {
1826  Field id = message_.getCommandId();
1827  bool isSubscribe = false;
1828  unsigned requestedAcks = message_.getAckTypeEnum();
1829  unsigned systemAddedAcks = Message::AckType::None;
1830  switch(message_.getCommandEnum())
1831  {
1832  case Message::Command::Subscribe:
1833  case Message::Command::DeltaSubscribe:
// Bookmark subscriptions tracked by a bookmark store also need the
// persisted ack.
1834  if (!message_.getBookmark().empty() && _bookmarkStore.isValid())
1835  {
1836  systemAddedAcks |= Message::AckType::Persisted;
1837  }
1838  // fall through
1839  case Message::Command::SOWAndSubscribe:
1840  case Message::Command::SOWAndDeltaSubscribe:
1841  if (id.empty())
1842  {
1843  message_.newCommandId();
1844  id = message_.getCommandId();
1845  }
// The subscription id defaults to the command id.
1846  if (message_.getSubscriptionId().empty())
1847  {
1848  message_.setSubscriptionId(id);
1849  }
1850  isSubscribe = true;
1851  // fall through
1852  case Message::Command::SOW:
1853  if(id.empty())
1854  {
1855  message_.newCommandId();
1856  id = message_.getCommandId();
1857  }
// The query id defaults to the command id.
1858  if (message_.getQueryID().empty())
1859  {
1860  message_.setQueryID(id);
1861  }
1862  systemAddedAcks |= Message::AckType::Processed;
1863  // for SOW only, we get a completed ack so we know when to remove the handler.
1864  if (!isSubscribe) systemAddedAcks |= Message::AckType::Completed;
1865  message_.setAckTypeEnum(requestedAcks | systemAddedAcks);
1866  {
1867  Lock<Mutex> l(_lock);
1868  _routes.addRoute(message_.getQueryID(), messageHandler_,
1869  requestedAcks, systemAddedAcks, isSubscribe);
// Also route by subscription id when it differs from the query id and has
// no route yet.
1870  if (!message_.getSubscriptionId().empty() &&
1871  message_.getQueryID() != message_.getSubscriptionId() &&
1872  messageHandler_.isValid() &&
1873  !_routes.hasRoute(message_.getSubscriptionId()))
1874  {
1875  _routes.addRoute(message_.getSubscriptionId(),
1876  messageHandler_, requestedAcks,
1877  systemAddedAcks, true);
1878  }
1879  try
1880  {
1881  // We aren't adding to subscription manager, so this isn't
1882  // an HA subscribe.
1883  syncAckProcessing(timeout_, message_, 0, false);
1884  message_.setAckTypeEnum(requestedAcks);
1885  }
1886  catch (...)
1887  {
// Undo the route registrations and restore the caller's ack type before
// passing the exception on.
1888  _routes.removeRoute(message_.getQueryID());
1889  _routes.removeRoute(message_.getSubscriptionId());
1890  _routes.removeRoute(id);
1891  message_.setAckTypeEnum(requestedAcks);
1892  throw;
1893  }
1894  }
1895  break;
1896  // These are valid commands that are used as-is
1897  case Message::Command::Unsubscribe:
1898  case Message::Command::Heartbeat:
1899  case Message::Command::Logon:
1900  case Message::Command::StartTimer:
1901  case Message::Command::StopTimer:
1902  case Message::Command::DeltaPublish:
1903  case Message::Command::Publish:
1904  case Message::Command::SOWDelete:
1905  {
1906  Lock<Mutex> l(_lock);
1907  // if an ack is requested, it'll need a command ID.
1908  if (message_.getAckTypeEnum() != Message::AckType::None)
1909  {
1910  if (id.empty())
1911  {
1912  message_.newCommandId();
1913  id = message_.getCommandId();
1914  }
1915  if (messageHandler_.isValid())
1916  {
1917  _routes.addRoute(id, messageHandler_, requestedAcks,
1918  Message::AckType::None, false);
1919  }
1920  }
1921  _send(message_);
1922  }
1923  break;
1924  // These are things that shouldn't be sent (not meaningful)
1925  case Message::Command::GroupBegin:
1926  case Message::Command::GroupEnd:
1927  case Message::Command::OOF:
1928  case Message::Command::Ack:
1929  case Message::Command::Unknown:
1930  default:
1931  throw CommandException("Command type " + message_.getCommand() + " can not be sent directly to AMPS");
1932  }
1933  message_.setAckTypeEnum(requestedAcks);
1934  return id;
1935  }
1936 
1937  void setDisconnectHandler(DisconnectHandler disconnectHandler)
1938  {
1939  Lock<Mutex> l(_lock);
1940  _disconnectHandler = disconnectHandler;
1941  }
1942 
1943  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
1944  {
1945  switch (command_[0])
1946  {
1947 #if 0 // Not currently implemented to avoid an extra branch in delivery
1948  case 'p':
1949  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
1950  break;
1951  case 's':
1952  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
1953  break;
1954 #endif
1955  case 'h':
1956  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
1957  break;
1958 #if 0 // Not currently implemented to avoid an extra branch in delivery
1959  case 'g':
1960  if (command_[6] == 'b')
1961  {
1962  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
1963  }
1964  else if (command_[6] == 'e')
1965  {
1966  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
1967  }
1968  else
1969  {
1970  std::ostringstream os;
1971  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
1972  throw CommandException(os.str());
1973  }
1974  break;
1975  case 'o':
1976  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
1977  break;
1978 #endif
1979  case 'a':
1980  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
1981  break;
1982  case 'l':
1983  case 'L':
1984  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance] = handler_;
1985  break;
1986  case 'd':
1987  case 'D':
1988  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage] = handler_;
1989  break;
1990  default:
1991  std::ostringstream os;
1992  os << "Invalid command '" << command_ << "' passed to setGlobalCommandTypeHandler";
1993  throw CommandException(os.str());
1994  break;
1995  }
1996  }
1997 
1998  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
1999  {
2000  switch (command_)
2001  {
2002 #if 0 // Not currently implemented to avoid an extra branch in delivery
2003  case Message::Command::Publish:
2004  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Publish] = handler_;
2005  break;
2006  case Message::Command::SOW:
2007  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::SOW] = handler_;
2008  break;
2009 #endif
2010  case Message::Command::Heartbeat:
2011  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat] = handler_;
2012  break;
2013 #if 0 // Not currently implemented to avoid an extra branch in delivery
2014  case Message::Command::GroupBegin:
2015  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupBegin] = handler_;
2016  break;
2017  case Message::Command::GroupEnd:
2018  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::GroupEnd] = handler_;
2019  break;
2020  case Message::Command::OOF:
2021  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::OOF] = handler_;
2022  break;
2023 #endif
2024  case Message::Command::Ack:
2025  _globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack] = handler_;
2026  break;
2027  default:
2028  unsigned bits = 0;
2029  unsigned command = command_;
2030  while (command > 0) { ++bits; command >>= 1; }
2031  char errBuf[128];
2032  AMPS_snprintf(errBuf, sizeof(errBuf),
2033  "Invalid command '%.*s' passed to setGlobalCommandTypeHandler",
2034  CommandConstants<0>::Lengths[bits],
2035  CommandConstants<0>::Values[bits]);
2036  throw CommandException(errBuf);
2037  break;
2038  }
2039  }
2040 
2041  void setGlobalCommandTypeMessageHandler(const GlobalCommandTypeHandlers handlerType_, const MessageHandler& handler_)
2042  {
2043  _globalCommandTypeHandlers[handlerType_] = handler_;
2044  }
2045 
2046  void setFailedWriteHandler(FailedWriteHandler* handler_)
2047  {
2048  Lock<Mutex> l(_lock);
2049  _failedWriteHandler.reset(handler_);
2050  }
2051 
2052  void setPublishStore(const Store& publishStore_)
2053  {
2054  Lock<Mutex> l(_lock);
2055  if (_connected) throw AlreadyConnectedException("Setting a publish store on a connected client is undefined behavior");
2056  _publishStore = publishStore_;
2057  }
2058 
2059  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
2060  {
2061  Lock<Mutex> l(_lock);
2062  if (_connected) throw AlreadyConnectedException("Setting a bookmark store on a connected client is undefined behavior");
2063  _bookmarkStore = bookmarkStore_;
2064  }
2065 
2066  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
2067  {
2068  Lock<Mutex> l(_lock);
2069  _subscriptionManager.reset(subscriptionManager_);
2070  }
2071 
2072  SubscriptionManager* getSubscriptionManager()
2073  {
2074  Lock<Mutex> l(_lock);
2075  return _subscriptionManager.get();
2076  }
2077 
2078  DisconnectHandler getDisconnectHandler()
2079  {
2080  Lock<Mutex> l(_lock);
2081  return _disconnectHandler;
2082  }
2083 
2084  MessageHandler getDuplicateMessageHandler()
2085  {
2086  return _globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage];
2087  }
2088 
2089  FailedWriteHandler* getFailedWriteHandler()
2090  {
2091  Lock<Mutex> l(_lock);
2092  return _failedWriteHandler.get();
2093  }
2094 
2095  Store getPublishStore()
2096  {
2097  Lock<Mutex> l(_lock);
2098  return _publishStore;
2099  }
2100 
2101  BookmarkStore getBookmarkStore()
2102  {
2103  Lock<Mutex> l(_lock);
2104  return _bookmarkStore;
2105  }
2106 
2107  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,size_t dataLen_)
2108  {
2109  if (!_publishStore.isValid())
2110  {
2111  Lock<Mutex> l(_lock);
2112  _publishMessage.assignTopic(topic_, topicLen_);
2113  _publishMessage.assignData(data_, dataLen_);
2114  _send(_publishMessage);
2115  return 0;
2116  }
2117  else
2118  {
2119  if (!publishStoreMessage)
2120  {
2121  publishStoreMessage = new Message();
2122  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2123  }
2124  publishStoreMessage->reset();
2125  publishStoreMessage->setCommandEnum(Message::Command::Publish);
2126  return _publish(topic_, topicLen_, data_, dataLen_);
2127  }
2128  }
2129 
2130  amps_uint64_t publish(const char* topic_, size_t topicLen_, const char* data_,
2131  size_t dataLen_, unsigned long expiration_)
2132  {
2133  if (!_publishStore.isValid())
2134  {
2135  Lock<Mutex> l(_lock);
2136  _publishMessage.assignTopic(topic_, topicLen_);
2137  _publishMessage.assignData(data_, dataLen_);
2138  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2139  size_t pos = convertToCharArray(exprBuf, expiration_);
2140  _publishMessage.assignExpiration(exprBuf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2141  _send(_publishMessage);
2142  _publishMessage.assignExpiration(NULL, 0);
2143  return 0;
2144  }
2145  else
2146  {
2147  if (!publishStoreMessage)
2148  {
2149  publishStoreMessage = new Message();
2150  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2151  }
2152  publishStoreMessage->reset();
2153  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2154  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2155  publishStoreMessage->setCommandEnum(Message::Command::Publish)
2156  .assignExpiration(exprBuf+exprPos,
2157  AMPS_NUMBER_BUFFER_LEN-exprPos);
2158  return _publish(topic_, topicLen_, data_, dataLen_);
2159  }
2160  }
2161 
2162  void publishFlush(long timeout_)
2163  {
2164  static const char* processed = "processed";
2165  static const size_t processedLen = strlen(processed);
2166  if (!_publishStore.isValid())
2167  {
2168  if (_serverVersion.getOldStyleVersion() >= AMPS_FLUSH_MIN_VERSION)
2169  {
2170  Lock<Mutex> l(_lock);
2171  _message.reset();
2172  _message.newCommandId();
2173  _message.assignAckType(processed, processedLen);
2174  static const char* flush = "flush";
2175  static const size_t flushLen = strlen(flush);
2176  _message.assignCommand(flush, flushLen);
2177  try
2178  {
2179  syncAckProcessing(timeout_, _message);
2180  }
2181  catch(const AMPSException& ex)
2182  {
2183  AMPS_UNHANDLED_EXCEPTION(ex);
2184  throw;
2185  }
2186  }
2187  else
2188  {
2189  if (timeout_ > 0) { AMPS_USLEEP(timeout_ * 1000); }
2190  else { AMPS_USLEEP(1000 * 1000); }
2191  return;
2192  }
2193  }
2194  else
2195  {
2196  try
2197  {
2198  _publishStore.flush(timeout_);
2199  }
2200  catch (const AMPSException& ex)
2201  {
2202  AMPS_UNHANDLED_EXCEPTION(ex);
2203  throw;
2204  }
2205  }
2206  }
2207 
2208  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2209  const char* data_, size_t dataLength_)
2210  {
2211  if (!_publishStore.isValid())
2212  {
2213  Lock<Mutex> l(_lock);
2214  _deltaMessage.assignTopic(topic_, topicLength_);
2215  _deltaMessage.assignData(data_, dataLength_);
2216  _send(_deltaMessage);
2217  return 0;
2218  }
2219  else
2220  {
2221  if (!publishStoreMessage)
2222  {
2223  publishStoreMessage = new Message();
2224  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2225  }
2226  publishStoreMessage->reset();
2227  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish);
2228  return _publish(topic_, topicLength_, data_, dataLength_);
2229  }
2230  }
2231 
2232  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
2233  const char* data_, size_t dataLength_,
2234  unsigned long expiration_)
2235  {
2236  if (!_publishStore.isValid())
2237  {
2238  Lock<Mutex> l(_lock);
2239  _deltaMessage.assignTopic(topic_, topicLength_);
2240  _deltaMessage.assignData(data_, dataLength_);
2241  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2242  size_t pos = convertToCharArray(exprBuf, expiration_);
2243  _deltaMessage.assignExpiration(exprBuf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2244  _send(_deltaMessage);
2245  _deltaMessage.assignExpiration(NULL, 0);
2246  return 0;
2247  }
2248  else
2249  {
2250  if (!publishStoreMessage)
2251  {
2252  publishStoreMessage = new Message();
2253  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2254  }
2255  publishStoreMessage->reset();
2256  char exprBuf[AMPS_NUMBER_BUFFER_LEN];
2257  size_t exprPos = convertToCharArray(exprBuf, expiration_);
2258  publishStoreMessage->setCommandEnum(Message::Command::DeltaPublish)
2259  .assignExpiration(exprBuf+exprPos,
2260  AMPS_NUMBER_BUFFER_LEN-exprPos);
2261  return _publish(topic_, topicLength_, data_, dataLength_);
2262  }
2263  }
2264 
2265  amps_uint64_t _publish(const char* topic_, size_t topicLength_,
2266  const char* data_, size_t dataLength_)
2267  {
2268  publishStoreMessage->assignTopic(topic_, topicLength_)
2269  .setAckTypeEnum(Message::AckType::Persisted)
2270  .assignData(data_, dataLength_);
2271  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
2272  char buf[AMPS_NUMBER_BUFFER_LEN];
2273  size_t pos = convertToCharArray(buf, haSequenceNumber);
2274  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2275  {
2276  Lock<Mutex> l(_lock);
2277  _send(*publishStoreMessage, haSequenceNumber);
2278  }
2279  return haSequenceNumber;
2280  }
2281 
2282  virtual std::string logon(long timeout_, Authenticator& authenticator_,
2283  const char* options_ = NULL)
2284  {
2285  Lock<Mutex> l(_lock);
2286  AtomicFlagFlip pubFlip(&_badTimeToHAPublish);
2287  _message.reset();
2288  _message.setCommandEnum(Message::Command::Logon);
2289  _message.newCommandId();
2290  std::string newCommandId = _message.getCommandId();
2291  _message.setClientName(_name);
2292 #ifdef AMPS_CLIENT_VERSION_WITH_LANGUAGE
2293  _message.assignVersion(AMPS_CLIENT_VERSION_WITH_LANGUAGE,
2294  strlen(AMPS_CLIENT_VERSION_WITH_LANGUAGE));
2295 #endif
2296  URI uri(_lastUri);
2297  if(uri.user().size()) _message.setUserId(uri.user());
2298  if(uri.password().size()) _message.setPassword(uri.password());
2299  if(uri.protocol() == "amps" && uri.messageType().size())
2300  {
2301  _message.setMessageType(uri.messageType());
2302  }
2303  if(uri.isTrue("pretty"))
2304  {
2305  _message.setOptions("pretty");
2306  }
2307 
2308  _message.setPassword(authenticator_.authenticate(_message.getUserId(), _message.getPassword()));
2309  if (!_logonCorrelationData.empty())
2310  {
2311  _message.assignCorrelationId(_logonCorrelationData);
2312  }
2313  if (options_)
2314  {
2315  _message.setOptions(options_);
2316  }
2317  _username = _message.getUserId();
2318  try
2319  {
2320  while(true)
2321  {
2322  _message.setAckTypeEnum(Message::AckType::Processed);
2323  AckResponse ack = syncAckProcessing(timeout_, _message);
2324  if (ack.status() == "retry")
2325  {
2326  _message.setPassword(authenticator_.retry(ack.username(), ack.password()));
2327  _username = ack.username();
2328  _message.setUserId(_username);
2329  }
2330  else
2331  {
2332  authenticator_.completed(ack.username(), ack.password(), ack.reason());
2333  break;
2334  }
2335  }
2336  broadcastConnectionStateChanged(ConnectionStateListener::LoggedOn);
2337 
2338  // Now re-send the heartbeat command if configured
2339  _sendHeartbeat();
2340  }
2341  catch(const AMPSException& ex)
2342  {
2343  AMPS_UNHANDLED_EXCEPTION(ex);
2344  throw;
2345  }
2346  catch(...)
2347  {
2348  throw;
2349  }
2350 
2351  if (_publishStore.isValid())
2352  {
2353  try
2354  {
2355  _publishStore.replay(_replayer);
2356  broadcastConnectionStateChanged(ConnectionStateListener::PublishReplayed);
2357  }
2358  catch(const StoreException& ex)
2359  {
2360  std::ostringstream os;
2361  os << "A local store exception occurred while logging on."
2362  << ex.toString();
2363  throw ConnectionException(os.str());
2364  }
2365  catch(const AMPSException& ex)
2366  {
2367  AMPS_UNHANDLED_EXCEPTION(ex);
2368  throw ex;
2369  }
2370  catch(const std::exception& ex)
2371  {
2372  AMPS_UNHANDLED_EXCEPTION(ex);
2373  throw ex;
2374  }
2375  catch(...)
2376  {
2377  throw;
2378  }
2379  }
2380  return newCommandId;
2381  }
2382 
// Places a subscription on topic_ and waits for the server's ack.
//
// messageHandler_  receives the messages routed for the subscription.
// topic_           topic set on the subscribe command.
// timeout_         handed to syncAckProcessing().
// filter_          set on the command when non-empty.
// bookmark_        when non-empty, makes this a bookmark subscription;
//                  AMPS_BOOKMARK_RECENT is resolved through the bookmark
//                  store's most-recent bookmark for this subscription id.
// options_         set on the command when non-empty.
// subId_           subscription id to use; when empty the command id is
//                  used (and a "replace" option is then rejected).
// isHASubscribe_   honoured only when a subscription manager is set; the
//                  subscription is then also registered with the manager.
//
// Returns the subscription id.
// Throws ConnectionException for a replace without a subscription id, and
// rethrows whatever syncAckProcessing() raises after undoing the route (and
// the manager registration or server-side subscription where applicable).
2383  std::string subscribe(MessageHandler messageHandler_,
2384  const std::string& topic_,
2385  long timeout_,
2386  const std::string& filter_,
2387  const std::string& bookmark_,
2388  const std::string& options_,
2389  const std::string& subId_,
2390  bool isHASubscribe_ = true)
2391  {
// NOTE(review): _subscriptionManager is read here before _lock is taken.
2392  isHASubscribe_ &= (bool)_subscriptionManager;
2393  Lock<Mutex> l(_lock);
2394  _message.reset();
2395  _message.setCommandEnum(Message::Command::Subscribe);
2396  _message.newCommandId();
2397  std::string subId(subId_);
2398  if (subId.empty())
2399  {
// The search pattern is AMPS_OPTIONS_REPLACE minus its last character.
2400  if (options_.find(AMPS_OPTIONS_REPLACE, 0, strlen(AMPS_OPTIONS_REPLACE)-1) != std::string::npos)
2401  throw ConnectionException("Cannot issue a replacement subscription; a valid subscription id is required.");
2402 
2403  subId = _message.getCommandId();
2404  }
2405  _message.setSubscriptionId(subId);
2406  // we need to deep copy this before sending the message; while we are
2407  // waiting for a response, the fields in _message may get blown away for
2408  // other operations.
2409  AMPS::Message::Field subIdField(subId);
2410  unsigned ackTypes = Message::AckType::Processed;
2411 
// Bookmark subscriptions tracked by a bookmark store also request the
// persisted ack.
2412  if (!bookmark_.empty() && _bookmarkStore.isValid())
2413  {
2414  ackTypes |= Message::AckType::Persisted;
2415  }
2416  _message.setTopic(topic_);
2417 
2418  if (filter_.length()) _message.setFilter(filter_);
2419  if (bookmark_.length())
2420  {
2421  if (bookmark_ == AMPS_BOOKMARK_RECENT)
2422  {
// NOTE(review): _bookmarkStore.isValid() is not checked on this path.
2423  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
2424  _message.setBookmark(mostRecent);
2425  }
2426  else
2427  {
2428  _message.setBookmark(bookmark_);
2429  if (_bookmarkStore.isValid())
2430  {
// An explicit bookmark (neither NOW nor EPOCH) is logged, discarded and
// marked persisted in the store before subscribing.
2431  if (bookmark_ != AMPS_BOOKMARK_NOW &&
2432  bookmark_ != AMPS_BOOKMARK_EPOCH)
2433  {
2434  _bookmarkStore.log(_message);
2435  _bookmarkStore.discard(_message);
2436  _bookmarkStore.persisted(subIdField, _message.getBookmark());
2437  }
2438  }
2439  }
2440  }
2441  if (options_.length()) _message.setOptions(options_);
2442 
2443  Message message = _message;
2444  if (isHASubscribe_)
2445  {
// Register a deep copy with the subscription manager; _lock is released
// for the rest of this scope.
2446  message = _message.deepCopy();
2447  Unlock<Mutex> u(_lock);
2448  _subscriptionManager->subscribe(messageHandler_, message,
2449  Message::AckType::None);
// While HA subscribes are being held back, return without sending here.
2450  if (_badTimeToHASubscribe) return subId;
2451  }
2452  if (!_routes.hasRoute(_message.getSubscriptionId()))
2453  {
2454  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
2455  Message::AckType::None, ackTypes, true);
2456  }
2457  message.setAckTypeEnum(ackTypes);
2458  if (!options_.empty()) message.setOptions(options_);
2459  try
2460  {
2461  syncAckProcessing(timeout_, message, isHASubscribe_);
2462  }
2463  catch (const DisconnectedException&)
2464  {
// Both branches rethrow; they differ only in how the subscription is
// undone first.
2465  if (!isHASubscribe_)
2466  {
2467  _routes.removeRoute(subIdField);
2468  throw;
2469  }
2470  else
2471  {
2472  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2473  throw;
2474  }
2475  }
2476  catch (const TimedOutException&)
2477  {
2478  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2479  throw;
2480  }
2481  catch (...)
2482  {
2483  if (isHASubscribe_)
2484  {
2485  // Have to unlock before calling into sub manager to avoid deadlock
2486  Unlock<Mutex> unlock(_lock);
2487  _subscriptionManager->unsubscribe(subIdField);
2488  }
2489  _routes.removeRoute(subIdField);
2490  throw;
2491  }
2492 
2493  return subId;
2494  }
2495  std::string deltaSubscribe(MessageHandler messageHandler_,
2496  const std::string& topic_,
2497  long timeout_,
2498  const std::string& filter_,
2499  const std::string& bookmark_,
2500  const std::string& options_,
2501  const std::string& subId_ = "",
2502  bool isHASubscribe_ = true)
2503  {
2504  isHASubscribe_ &= (bool)_subscriptionManager;
2505  Lock<Mutex> l(_lock);
2506  _message.reset();
2507  _message.setCommandEnum(Message::Command::DeltaSubscribe);
2508  _message.newCommandId();
2509  std::string subId(subId_);
2510  if (subId.empty())
2511  {
2512  subId = _message.getCommandId();
2513  }
2514  _message.setSubscriptionId(subId);
2515  // we need to deep copy this before sending the message; while we are
2516  // waiting for a response, the fields in _message may get blown away for
2517  // other operations.
2518  AMPS::Message::Field subIdField(subId);
2519  unsigned ackTypes = Message::AckType::Processed;
2520 
2521  if (!bookmark_.empty() && _bookmarkStore.isValid())
2522  {
2523  ackTypes |= Message::AckType::Persisted;
2524  }
2525  _message.setTopic(topic_);
2526  if (filter_.length()) _message.setFilter(filter_);
2527  if (bookmark_.length())
2528  {
2529  if (bookmark_ == AMPS_BOOKMARK_RECENT)
2530  {
2531  Message::Field mostRecent = _bookmarkStore.getMostRecent(subIdField);
2532  _message.setBookmark(mostRecent);
2533  }
2534  else
2535  {
2536  _message.setBookmark(bookmark_);
2537  if (_bookmarkStore.isValid())
2538  {
2539  if (bookmark_ != AMPS_BOOKMARK_NOW &&
2540  bookmark_ != AMPS_BOOKMARK_EPOCH)
2541  {
2542  _bookmarkStore.log(_message);
2543  _bookmarkStore.discard(_message);
2544  _bookmarkStore.persisted(subIdField, _message.getBookmark());
2545  }
2546  }
2547  }
2548  }
2549  if (options_.length()) _message.setOptions(options_);
2550  Message message = _message;
2551  if (isHASubscribe_)
2552  {
2553  message = _message.deepCopy();
2554  Unlock<Mutex> u(_lock);
2555  _subscriptionManager->subscribe(messageHandler_, message,
2556  Message::AckType::None);
2557  if (_badTimeToHASubscribe) return subId;
2558  }
2559  if (!_routes.hasRoute(_message.getSubscriptionId()))
2560  {
2561  _routes.addRoute(_message.getSubscriptionId(), messageHandler_,
2562  Message::AckType::None, ackTypes, true);
2563  }
2564  message.setAckTypeEnum(ackTypes);
2565  if (!options_.empty()) message.setOptions(options_);
2566  try
2567  {
2568  syncAckProcessing(timeout_, message, isHASubscribe_);
2569  }
2570  catch (const DisconnectedException&)
2571  {
2572  if (!isHASubscribe_)
2573  {
2574  _routes.removeRoute(subIdField);
2575  throw;
2576  }
2577  }
2578  catch (const TimedOutException&)
2579  {
2580  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subIdField));
2581  throw;
2582  }
2583  catch (...)
2584  {
2585  if (isHASubscribe_)
2586  {
2587  // Have to unlock before calling into sub manager to avoid deadlock
2588  Unlock<Mutex> unlock(_lock);
2589  _subscriptionManager->unsubscribe(subIdField);
2590  }
2591  _routes.removeRoute(subIdField);
2592  throw;
2593  }
2594  return subId;
2595  }
2596 
2597  void unsubscribe(const std::string& id)
2598  {
2599  Lock<Mutex> l(_lock);
2600  unsubscribeInternal(id);
2601  }
2602 
2603  void unsubscribe(void)
2604  {
2605  if (_subscriptionManager)
2606  {
2607  _subscriptionManager->clear();
2608  }
2609  {
2610  _routes.unsubscribeAll();
2611  Lock<Mutex> l(_lock);
2612  _message.reset();
2613  _message.setCommandEnum(Message::Command::Unsubscribe);
2614  _message.newCommandId();
2615  _message.setSubscriptionId("all");
2616  _sendWithoutRetry(_message);
2617  }
2618  }
2619 
2620  std::string sow(MessageHandler messageHandler_,
2621  const std::string& topic_,
2622  const std::string& filter_ = "",
2623  const std::string& orderBy_ = "",
2624  const std::string& bookmark_ = "",
2625  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2626  int topN_ = AMPS_DEFAULT_TOP_N,
2627  const std::string& options_ = "",
2628  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT)
2629  {
2630  Lock<Mutex> l(_lock);
2631  _message.reset();
2632  _message.setCommandEnum(Message::Command::SOW);
2633  _message.newCommandId();
2634  // need to keep our own copy of the command ID.
2635  std::string commandId = _message.getCommandId();
2636  _message.setQueryID(_message.getCommandId());
2637  unsigned ackTypes = Message::AckType::Processed | Message::AckType::Completed;
2638  _message.setAckTypeEnum(ackTypes);
2639  _message.setTopic(topic_);
2640  if (filter_.length()) _message.setFilter(filter_);
2641  if (orderBy_.length()) _message.setOrderBy(orderBy_);
2642  if (bookmark_.length()) _message.setBookmark(bookmark_);
2643  _message.setBatchSize(AMPS::asString(batchSize_));
2644  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
2645  if (options_.length()) _message.setOptions(options_);
2646 
2647  _routes.addRoute(_message.getQueryID(), messageHandler_,
2648  Message::AckType::None, ackTypes, false);
2649 
2650  try
2651  {
2652  syncAckProcessing(timeout_, _message);
2653  }
2654  catch (...)
2655  {
2656  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId));
2657  throw;
2658  }
2659 
2660  return commandId;
2661  }
2662 
2663  std::string sow(MessageHandler messageHandler_,
2664  const std::string& topic_,
2665  long timeout_,
2666  const std::string& filter_ = "",
2667  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2668  int topN_ = AMPS_DEFAULT_TOP_N)
2669  {
2670  std::string notSet;
2671  return sow(messageHandler_,
2672  topic_,
2673  filter_,
2674  notSet, // orderBy
2675  notSet, // bookmark
2676  batchSize_,
2677  topN_,
2678  notSet,
2679  timeout_);
2680  }
2681 
2682  std::string sowAndSubscribe(MessageHandler messageHandler_,
2683  const std::string& topic_,
2684  const std::string& filter_ = "",
2685  const std::string& orderBy_ = "",
2686  const std::string& bookmark_ = "",
2687  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2688  int topN_ = AMPS_DEFAULT_TOP_N,
2689  const std::string& options_ = "",
2690  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
2691  bool isHASubscribe_ = true)
2692  {
2693  isHASubscribe_ &= (bool)_subscriptionManager;
2694  Lock<Mutex> l(_lock);
2695  _message.reset();
2696  _message.setCommandEnum(Message::Command::SOWAndSubscribe);
2697  _message.newCommandId();
2698  Field cid = _message.getCommandId();
2699  _message.setQueryID(cid);
2700  _message.setSubscriptionId(cid);
2701  std::string subId = cid;
2702  _message.setTopic(topic_);
2703  if (filter_.length()) _message.setFilter(filter_);
2704  if (orderBy_.length()) _message.setOrderBy(orderBy_);
2705  if (bookmark_.length()) _message.setBookmark(bookmark_);
2706  _message.setBatchSize(AMPS::asString(batchSize_));
2707  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
2708  if (options_.length()) _message.setOptions(options_);
2709 
2710  Message message = _message;
2711  if (isHASubscribe_)
2712  {
2713  message = _message.deepCopy();
2714  Unlock<Mutex> u(_lock);
2715  _subscriptionManager->subscribe(messageHandler_, message,
2716  Message::AckType::None);
2717  if (_badTimeToHASubscribe) return subId;
2718  }
2719  if (!_routes.hasRoute(cid))
2720  {
2721  _routes.addRoute(cid, messageHandler_,
2722  Message::AckType::None, Message::AckType::Processed, true);
2723  }
2724  message.setAckTypeEnum(Message::AckType::Processed);
2725  if (!options_.empty()) message.setOptions(options_);
2726  try
2727  {
2728  syncAckProcessing(timeout_, message, isHASubscribe_);
2729  }
2730  catch (const DisconnectedException&)
2731  {
2732  if (!isHASubscribe_)
2733  {
2734  _routes.removeRoute(subId);
2735  throw;
2736  }
2737  }
2738  catch (const TimedOutException&)
2739  {
2740  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
2741  throw;
2742  }
2743  catch (...)
2744  {
2745  if (isHASubscribe_)
2746  {
2747  // Have to unlock before calling into sub manager to avoid deadlock
2748  Unlock<Mutex> unlock(_lock);
2749  _subscriptionManager->unsubscribe(cid);
2750  }
2751  _routes.removeRoute(subId);
2752  throw;
2753  }
2754  return subId;
2755  }
2756 
2757  std::string sowAndSubscribe(MessageHandler messageHandler_,
2758  const std::string& topic_,
2759  long timeout_,
2760  const std::string& filter_ = "",
2761  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2762  bool oofEnabled_ = false,
2763  int topN_ = AMPS_DEFAULT_TOP_N,
2764  bool isHASubscribe_ = true)
2765  {
2766  std::string notSet;
2767  return sowAndSubscribe(messageHandler_,
2768  topic_,
2769  filter_,
2770  notSet, // orderBy
2771  notSet, // bookmark
2772  batchSize_,
2773  topN_,
2774  (oofEnabled_ ? "oof" : ""),
2775  timeout_,
2776  isHASubscribe_);
2777  }
2778 
2779  std::string sowAndDeltaSubscribe(MessageHandler messageHandler_,
2780  const std::string& topic_,
2781  const std::string& filter_ = "",
2782  const std::string& orderBy_ = "",
2783  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2784  int topN_ = AMPS_DEFAULT_TOP_N,
2785  const std::string& options_ = "",
2786  long timeout_ = AMPS_DEFAULT_COMMAND_TIMEOUT,
2787  bool isHASubscribe_ = true)
2788  {
2789  isHASubscribe_ &= (bool)_subscriptionManager;
2790  Lock<Mutex> l(_lock);
2791  _message.reset();
2792  _message.setCommandEnum(Message::Command::SOWAndDeltaSubscribe);
2793  _message.newCommandId();
2794  _message.setQueryID(_message.getCommandId());
2795  _message.setSubscriptionId(_message.getCommandId());
2796  std::string subId = _message.getSubscriptionId();
2797  _message.setTopic(topic_);
2798  if (filter_.length()) _message.setFilter(filter_);
2799  if (orderBy_.length()) _message.setOrderBy(orderBy_);
2800  _message.setBatchSize(AMPS::asString(batchSize_));
2801  if (topN_ != AMPS_DEFAULT_TOP_N) _message.setTopNRecordsReturned(AMPS::asString(topN_));
2802  if (options_.length()) _message.setOptions(options_);
2803  Message message = _message;
2804  if (isHASubscribe_)
2805  {
2806  message = _message.deepCopy();
2807  Unlock<Mutex> u(_lock);
2808  _subscriptionManager->subscribe(messageHandler_, message,
2809  Message::AckType::None);
2810  if (_badTimeToHASubscribe) return subId;
2811  }
2812  if (!_routes.hasRoute(message.getSubscriptionId()))
2813  {
2814  _routes.addRoute(message.getQueryID(), messageHandler_,
2815  Message::AckType::None, Message::AckType::Processed, true);
2816  }
2817  message.setAckTypeEnum(Message::AckType::Processed);
2818  if (!options_.empty()) message.setOptions(options_);
2819  try
2820  {
2821  syncAckProcessing(timeout_, message, isHASubscribe_);
2822  }
2823  catch (const DisconnectedException&)
2824  {
2825  if (!isHASubscribe_)
2826  {
2827  _routes.removeRoute(subId);
2828  throw;
2829  }
2830  }
2831  catch (const TimedOutException&)
2832  {
2833  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
2834  throw;
2835  }
2836  catch (...)
2837  {
2838  if (isHASubscribe_)
2839  {
2840  // Have to unlock before calling into sub manager to avoid deadlock
2841  Unlock<Mutex> unlock(_lock);
2842  _subscriptionManager->unsubscribe(Field(subId));
2843  }
2844  _routes.removeRoute(subId);
2845  throw;
2846  }
2847  return subId;
2848  }
2849 
2850  std::string sowAndDeltaSubscribe(MessageHandler messageHandler_,
2851  const std::string& topic_,
2852  long timeout_,
2853  const std::string& filter_ = "",
2854  int batchSize_ = AMPS_DEFAULT_BATCH_SIZE,
2855  bool oofEnabled_ = false,
2856  bool sendEmpties_ = false,
2857  int topN_ = AMPS_DEFAULT_TOP_N,
2858  bool isHASubscribe_ = true)
2859  {
2860  std::string notSet;
2861  Message::Options options;
2862  if (oofEnabled_) options.setOOF();
2863  if (sendEmpties_ == false) options.setNoEmpties();
2864  return sowAndDeltaSubscribe(messageHandler_,
2865  topic_,
2866  filter_,
2867  notSet, // orderBy
2868  batchSize_,
2869  topN_,
2870  options,
2871  timeout_,
2872  isHASubscribe_);
2873  }
2874 
2875  std::string sowDelete(MessageHandler messageHandler_,
2876  const std::string& topic_,
2877  const std::string& filter_,
2878  long timeout_,
2879  Message::Field commandId_ = Message::Field())
2880  {
2881  if (_publishStore.isValid())
2882  {
2883  unsigned ackType = Message::AckType::Processed |
2884  Message::AckType::Stats |
2885  Message::AckType::Persisted;
2886  if (!publishStoreMessage)
2887  {
2888  publishStoreMessage = new Message();
2889  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2890  }
2891  publishStoreMessage->reset();
2892  if (commandId_.empty())
2893  {
2894  publishStoreMessage->newCommandId();
2895  commandId_ = publishStoreMessage->getCommandId();
2896  }
2897  else
2898  {
2899  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
2900  }
2901  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
2902  .assignSubscriptionId(commandId_.data(), commandId_.len())
2903  .assignQueryID(commandId_.data(), commandId_.len())
2904  .setAckTypeEnum(ackType)
2905  .assignTopic(topic_.c_str(), topic_.length())
2906  .assignFilter(filter_.c_str(), filter_.length());
2907  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
2908  char buf[AMPS_NUMBER_BUFFER_LEN];
2909  size_t pos = convertToCharArray(buf, haSequenceNumber);
2910  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
2911  {
2912  try
2913  {
2914  Lock<Mutex> l(_lock);
2915  _routes.addRoute(commandId_, messageHandler_,
2916  Message::AckType::Stats,
2917  Message::AckType::Processed|Message::AckType::Persisted,
2918  false);
2919  syncAckProcessing(timeout_, *publishStoreMessage,
2920  haSequenceNumber);
2921  }
2922  catch (const DisconnectedException&)
2923  {
2924  // Pass - it will get replayed upon reconnect
2925  }
2926  catch (...)
2927  {
2928  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
2929  throw;
2930  }
2931  }
2932  return (std::string)commandId_;
2933  }
2934  else
2935  {
2936  Lock<Mutex> l(_lock);
2937  _message.reset();
2938  if (commandId_.empty())
2939  {
2940  _message.newCommandId();
2941  commandId_ = _message.getCommandId();
2942  }
2943  else
2944  {
2945  _message.setCommandId(commandId_.data(), commandId_.len());
2946  }
2947  _message.setCommandEnum(Message::Command::SOWDelete)
2948  .assignSubscriptionId(commandId_.data(), commandId_.len())
2949  .assignQueryID(commandId_.data(), commandId_.len())
2950  .setAckTypeEnum(Message::AckType::Processed |
2951  Message::AckType::Stats)
2952  .assignTopic(topic_.c_str(), topic_.length())
2953  .assignFilter(filter_.c_str(), filter_.length());
2954  _routes.addRoute(commandId_, messageHandler_,
2955  Message::AckType::Stats,
2956  Message::AckType::Processed,
2957  false);
2958  try
2959  {
2960  syncAckProcessing(timeout_, _message);
2961  }
2962  catch (...)
2963  {
2964  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
2965  throw;
2966  }
2967  return (std::string)commandId_;
2968  }
2969  }
2970 
2971  std::string sowDeleteByData(MessageHandler messageHandler_,
2972  const std::string& topic_,
2973  const std::string& data_,
2974  long timeout_,
2975  Message::Field commandId_ = Message::Field())
2976  {
2977  if (_publishStore.isValid())
2978  {
2979  unsigned ackType = Message::AckType::Processed |
2980  Message::AckType::Stats |
2981  Message::AckType::Persisted;
2982  if (!publishStoreMessage)
2983  {
2984  publishStoreMessage = new Message();
2985  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
2986  }
2987  publishStoreMessage->reset();
2988  if (commandId_.empty())
2989  {
2990  publishStoreMessage->newCommandId();
2991  commandId_ = publishStoreMessage->getCommandId();
2992  }
2993  else
2994  {
2995  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
2996  }
2997  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
2998  .assignSubscriptionId(commandId_.data(), commandId_.len())
2999  .assignQueryID(commandId_.data(), commandId_.len())
3000  .setAckTypeEnum(ackType)
3001  .assignTopic(topic_.c_str(), topic_.length())
3002  .assignData(data_.c_str(), data_.length());
3003  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3004  char buf[AMPS_NUMBER_BUFFER_LEN];
3005  size_t pos = convertToCharArray(buf, haSequenceNumber);
3006  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3007  {
3008  try
3009  {
3010  Lock<Mutex> l(_lock);
3011  _routes.addRoute(commandId_, messageHandler_,
3012  Message::AckType::Stats,
3013  Message::AckType::Processed|Message::AckType::Persisted,
3014  false);
3015  syncAckProcessing(timeout_, *publishStoreMessage,
3016  haSequenceNumber);
3017  }
3018  catch (const DisconnectedException&)
3019  {
3020  // Pass - it will get replayed upon reconnect
3021  }
3022  catch (...)
3023  {
3024  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3025  throw;
3026  }
3027  }
3028  return (std::string)commandId_;
3029  }
3030  else
3031  {
3032  Lock<Mutex> l(_lock);
3033  _message.reset();
3034  if (commandId_.empty())
3035  {
3036  _message.newCommandId();
3037  commandId_ = _message.getCommandId();
3038  }
3039  else
3040  {
3041  _message.setCommandId(commandId_.data(), commandId_.len());
3042  }
3043  _message.setCommandEnum(Message::Command::SOWDelete)
3044  .assignSubscriptionId(commandId_.data(), commandId_.len())
3045  .assignQueryID(commandId_.data(), commandId_.len())
3046  .setAckTypeEnum(Message::AckType::Processed |
3047  Message::AckType::Stats)
3048  .assignTopic(topic_.c_str(), topic_.length())
3049  .assignData(data_.c_str(), data_.length());
3050  _routes.addRoute(commandId_, messageHandler_,
3051  Message::AckType::Stats,
3052  Message::AckType::Processed,
3053  false);
3054  try
3055  {
3056  syncAckProcessing(timeout_, _message);
3057  }
3058  catch (...)
3059  {
3060  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3061  throw;
3062  }
3063  return (std::string)commandId_;
3064  }
3065  }
3066 
3067  std::string sowDeleteByKeys(MessageHandler messageHandler_,
3068  const std::string& topic_,
3069  const std::string& keys_,
3070  long timeout_,
3071  Message::Field commandId_ = Message::Field())
3072  {
3073  if (_publishStore.isValid())
3074  {
3075  unsigned ackType = Message::AckType::Processed |
3076  Message::AckType::Stats |
3077  Message::AckType::Persisted;
3078  if (!publishStoreMessage)
3079  {
3080  publishStoreMessage = new Message();
3081  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3082  }
3083  publishStoreMessage->reset();
3084  if (commandId_.empty())
3085  {
3086  publishStoreMessage->newCommandId();
3087  commandId_ = publishStoreMessage->getCommandId();
3088  }
3089  else
3090  {
3091  publishStoreMessage->setCommandId(commandId_.data(), commandId_.len());
3092  }
3093  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3094  .assignSubscriptionId(commandId_.data(), commandId_.len())
3095  .assignQueryID(commandId_.data(), commandId_.len())
3096  .setAckTypeEnum(ackType)
3097  .assignTopic(topic_.c_str(), topic_.length())
3098  .assignSowKeys(keys_.c_str(), keys_.length());
3099  amps_uint64_t haSequenceNumber = _publishStore.store(*publishStoreMessage);
3100  char buf[AMPS_NUMBER_BUFFER_LEN];
3101  size_t pos = convertToCharArray(buf, haSequenceNumber);
3102  publishStoreMessage->assignSequence(buf+pos, AMPS_NUMBER_BUFFER_LEN-pos);
3103  {
3104  try
3105  {
3106  Lock<Mutex> l(_lock);
3107  _routes.addRoute(commandId_, messageHandler_,
3108  Message::AckType::Stats,
3109  Message::AckType::Processed|Message::AckType::Persisted,
3110  false);
3111  syncAckProcessing(timeout_, *publishStoreMessage,
3112  haSequenceNumber);
3113  }
3114  catch (const DisconnectedException&)
3115  {
3116  // Pass - it will get replayed upon reconnect
3117  }
3118  catch (...)
3119  {
3120  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3121  throw;
3122  }
3123  }
3124  return (std::string)commandId_;
3125  }
3126  else
3127  {
3128  Lock<Mutex> l(_lock);
3129  _message.reset();
3130  if (commandId_.empty())
3131  {
3132  _message.newCommandId();
3133  commandId_ = _message.getCommandId();
3134  }
3135  else
3136  {
3137  _message.setCommandId(commandId_.data(), commandId_.len());
3138  }
3139  _message.setCommandEnum(Message::Command::SOWDelete)
3140  .assignSubscriptionId(commandId_.data(), commandId_.len())
3141  .assignQueryID(commandId_.data(), commandId_.len())
3142  .setAckTypeEnum(Message::AckType::Processed |
3143  Message::AckType::Stats)
3144  .assignTopic(topic_.c_str(), topic_.length())
3145  .assignSowKeys(keys_.c_str(), keys_.length());
3146  _routes.addRoute(commandId_, messageHandler_,
3147  Message::AckType::Stats,
3148  Message::AckType::Processed,
3149  false);
3150  try
3151  {
3152  syncAckProcessing(timeout_, _message);
3153  }
3154  catch (...)
3155  {
3156  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(commandId_));
3157  throw;
3158  }
3159  return (std::string)commandId_;
3160  }
3161  }
3162 
3163  void startTimer(void)
3164  {
3165  if (_serverVersion >= "5.3.2.0")
3166  {
3167  throw CommandException("The start_timer command is deprecated.");
3168  }
3169  Lock<Mutex> l(_lock);
3170  _message.reset();
3171  _message.setCommandEnum(Message::Command::StartTimer);
3172 
3173  _send(_message);
3174  }
3175 
3176  std::string stopTimer(MessageHandler messageHandler_)
3177  {
3178  if (_serverVersion >= "5.3.2.0")
3179  {
3180  throw CommandException("The stop_timer command is deprecated.");
3181  }
3182  return executeAsync(Command("stop_timer").addAckType("completed"), messageHandler_);
3183  }
3184 
3185  amps_handle getHandle(void)
3186  {
3187  return _client;
3188  }
3189 
3197  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
3198  {
3199  _pExceptionListener = pListener_;
3200  _exceptionListener = _pExceptionListener.get();
3201  }
3202 
3203  void setExceptionListener(const ExceptionListener& listener_)
3204  {
3205  _exceptionListener = &listener_;
3206  }
3207 
3208  const ExceptionListener& getExceptionListener(void) const
3209  {
3210  return *_exceptionListener;
3211  }
3212 
3213  void setHeartbeat(unsigned heartbeatInterval_, unsigned readTimeout_)
3214  {
3215  if (readTimeout_ && readTimeout_ < heartbeatInterval_)
3216  {
3217  throw UsageException("The socket read timeout must be >= the heartbeat interval.");
3218  }
3219  Lock<Mutex> l(_lock);
3220  if(_heartbeatInterval != heartbeatInterval_ ||
3221  _readTimeout != readTimeout_)
3222  {
3223  _heartbeatInterval = heartbeatInterval_;
3224  _readTimeout = readTimeout_;
3225  _sendHeartbeat();
3226  }
3227  }
3228 
3229  void _sendHeartbeat(void)
3230  {
3231  if (_connected && _heartbeatInterval != 0)
3232  {
3233  std::ostringstream options;
3234  options << "start," << _heartbeatInterval;
3235  Message startMessage = Message()
3236  .setCommandEnum(Message::Command::Heartbeat)
3237  .setOptions(options.str());
3238 
3239  _heartbeatTimer.setTimeout(_heartbeatInterval * 1000.0);
3240  _heartbeatTimer.start();
3241  try
3242  {
3243  _sendWithoutRetry(startMessage);
3244  broadcastConnectionStateChanged(ConnectionStateListener::HeartbeatInitiated);
3245  }
3246  catch(ConnectionException &ex_)
3247  {
3248  // If we are disconnected when we attempt to send, that's OK;
3249  // we'll send this message after we re-connect (if we do).
3250  AMPS_UNHANDLED_EXCEPTION(ex_);
3251  }
3252  }
3253  amps_result result = AMPS_E_OK;
3254  if(_readTimeout && _connected)
3255  {
3256  result = amps_client_set_read_timeout(_client, (int)_readTimeout);
3257  }
3258  if (result != AMPS_E_OK && result != AMPS_E_DISCONNECTED)
3259  {
3260  AMPSException::throwFor(_client, result);
3261  }
3262  }
3263 
3264  void addConnectionStateListener(ConnectionStateListener *listener_)
3265  {
3266  Lock<Mutex> lock(_lock);
3267  _connectionStateListeners.insert(listener_);
3268  }
3269 
3270  void removeConnectionStateListener(ConnectionStateListener *listener_)
3271  {
3272  Lock<Mutex> lock(_lock);
3273  _connectionStateListeners.erase(listener_);
3274  }
3275 
3276  void _registerHandler(Command& command_, Message::Field& cid_,
3277  MessageHandler& handler_, unsigned requestedAcks_,
3278  unsigned systemAddedAcks_, bool isSubscribe_)
3279  {
3280  Message message = command_.getMessage();
3281  Message::Command::Type commandType = message.getCommandEnum();
3282  Message::Field subid = message.getSubscriptionId();
3283  Message::Field qid = message.getQueryID();
3284  // If we have an id, we're good, even if it's an existing route
3285  bool added = qid.len() || subid.len() || cid_.len();
3286  if (qid.len() > 0)
3287  {
3288  if (!_routes.hasRoute(qid))
3289  {
3290  _routes.addRoute(qid, handler_, requestedAcks_,
3291  systemAddedAcks_, isSubscribe_);
3292  }
3293  }
3294  if (subid.len() > 0)
3295  {
3296  if (!_routes.hasRoute(subid))
3297  {
3298  _routes.addRoute(subid, handler_, requestedAcks_,
3299  systemAddedAcks_, isSubscribe_);
3300  }
3301  }
3302  if (cid_.len() > 0)
3303  {
3304  if (!_routes.hasRoute(cid_))
3305  {
3306  _routes.addRoute(cid_, handler_, requestedAcks_,
3307  systemAddedAcks_, isSubscribe_);
3308  }
3309  }
3310  else if (commandType == Message::Command::Publish ||
3311  commandType == Message::Command::DeltaPublish)
3312  {
3313  cid_ = command_.getMessage().newCommandId().getCommandId();
3314  _routes.addRoute(cid_, handler_, requestedAcks_,
3315  systemAddedAcks_, isSubscribe_);
3316  added=true;
3317  }
3318  if (!added)
3319  {
3320  throw UsageException("To use a messagehandler, you must also supply a command or subscription ID.");
3321  }
3322  }
3323 
// Executes command_ and routes its results to handler_ (when valid).
// This function does not take _lock itself; it temporarily releases _lock
// (via Unlock) around calls into the publish store and the subscription
// manager, so the caller is expected to hold it (executeAsync does).
//
// Acks: the ack types already on the message (requestedAcks) are combined
// with acks this function adds (systemAddedAcks) for sending, and the
// message's ack type is set back to requestedAcks before returning or
// rethrowing on most paths.
//
// Returns the subscription id for subscribe commands that have one,
// otherwise the command id (which is empty when none was set or generated).
3324  std::string executeAsyncNoLock(Command& command_, MessageHandler& handler_,
3325  bool isHASubscribe_ = true)
3326  {
      // HA subscribe handling only applies when a subscription manager is set.
3327  isHASubscribe_ &= (bool)_subscriptionManager;
3328  Message& message = command_.getMessage();
      // A processed ack is added when there is a handler or the command asks for one.
3329  unsigned systemAddedAcks = (handler_.isValid() || command_.hasProcessedAck()) ?
3330  Message::AckType::Processed : Message::AckType::None;
3331  unsigned requestedAcks = message.getAckTypeEnum();
3332  bool isPublishStore = _publishStore.isValid() && command_.needsSequenceNumber();
3333  Message::Command::Type commandType = message.getCommandEnum();
      // SOW and stop_timer commands also get a completed ack.
3334  if (commandType == Message::Command::SOW ||
3335  commandType == Message::Command::StopTimer)
3336  systemAddedAcks |= Message::AckType::Completed;
3337  Message::Field cid = message.getCommandId();
      // A handler needs an id to be routed under; generate one if missing.
3338  if (handler_.isValid() && cid.empty())
3339  {
3340  cid = message.newCommandId().getCommandId();
3341  }
      // Bookmark handling for subscribe commands that carry a bookmark.
3342  if (message.getBookmark().len() > 0)
3343  {
3344  if (command_.isSubscribe())
3345  {
3346  Message::Field bookmark = message.getBookmark();
3347  if (_bookmarkStore.isValid())
3348  {
3349  systemAddedAcks |= Message::AckType::Persisted;
3350  if (bookmark == AMPS_BOOKMARK_RECENT)
3351  {
      // Replace the "recent" bookmark with the store's most recent bookmark
      // for this subscription.
3352  message.setBookmark(_bookmarkStore.getMostRecent(message.getSubscriptionId()));
3353  }
3354  else if (bookmark != AMPS_BOOKMARK_NOW &&
3355  bookmark != AMPS_BOOKMARK_EPOCH)
3356  {
3357  _bookmarkStore.log(message);
      // A single (non-range) bookmark is also discarded and marked persisted.
3358  if (!BookmarkRange::isRange(bookmark))
3359  {
3360  _bookmarkStore.discard(message);
3361  _bookmarkStore.persisted(message.getSubscriptionId(),
3362  bookmark);
3363  }
3364  }
3365  }
      // NOTE(review): the body of this branch (listing line 3368) is not
      // visible in this listing - confirm against the original header.
3366  else if (bookmark == AMPS_BOOKMARK_RECENT)
3367  {
3369  }
3370  }
3371  }
3372  if (isPublishStore)
3373  {
3374  systemAddedAcks |= Message::AckType::Persisted;
3375  }
3376  bool isSubscribe = command_.isSubscribe();
      // Non-subscribe commands register the handler now; subscribe commands
      // register it further below.
3377  if (handler_.isValid() && !isSubscribe)
3378  {
3379  _registerHandler(command_, cid, handler_,
3380  requestedAcks, systemAddedAcks, isSubscribe);
3381  }
      // Wait for the processed ack only when there is an id to match it against.
3382  bool useSyncSend = cid.len() > 0 && command_.hasProcessedAck();
3383  if (isPublishStore)
3384  {
3385  amps_uint64_t haSequenceNumber = (amps_uint64_t)0;
3386  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3387  {
      // _lock is released while the message is stored in the publish store.
3388  Unlock<Mutex> u(_lock);
3389  haSequenceNumber = _publishStore.store(message);
3390  }
3391  message.setSequence(haSequenceNumber);
3392  if (useSyncSend)
3393  {
3394  try
3395  {
3396  syncAckProcessing((long)command_.getTimeout(), message,
3397  haSequenceNumber);
3398  }
3399  catch (const DisconnectedException&)
3400  {
3401  // Pass - message will get replayed when reconnected
3402  }
3403  catch (...)
3404  {
3405  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3406  throw;
3407  }
3408  }
3409  else _send(message, haSequenceNumber);
3410  }
3411  else
3412  {
3413  if(command_.isSubscribe())
3414  {
3415  const Message::Field& subId = message.getSubscriptionId();
3416  if (isHASubscribe_)
3417  {
      // _lock is released while calling into the subscription manager, which
      // is given a deep copy of the message.
3418  Unlock<Mutex> u(_lock);
3419  _subscriptionManager->subscribe(handler_,
3420  message.deepCopy(),
3421  requestedAcks);
3422  if (_badTimeToHASubscribe)
3423  {
      // Nothing is sent here; restore the caller's acks and return the sub id.
3424  message.setAckTypeEnum(requestedAcks);
3425  return std::string(subId.data(), subId.len());
3426  }
3427  }
3428  if (handler_.isValid())
3429  {
3430  _registerHandler(command_, cid, handler_,
3431  requestedAcks, systemAddedAcks, isSubscribe);
3432  }
3433  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3434  if (useSyncSend)
3435  {
3436  try
3437  {
3438  syncAckProcessing((long)command_.getTimeout(), message,
3439  isHASubscribe_);
3440  }
3441  catch (const DisconnectedException&)
3442  {
      // For an HA subscribe the disconnect is not propagated to the caller.
3443  if (!isHASubscribe_)
3444  {
3445  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3446  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(subId));
3447  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3448  message.setAckTypeEnum(requestedAcks);
3449  throw;
3450  }
3451  }
3452  catch (const TimedOutException&)
3453  {
      // On timeout, unsubscribe under each id the subscription may be known by.
3454  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(cid));
3455  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(subId));
3456  AMPS_CALL_EXCEPTION_WRAPPER(unsubscribeInternal(message.getQueryId()));
3457  throw;
3458  }
3459  catch (...)
3460  {
3461  if (isHASubscribe_)
3462  {
3463  // Have to unlock before calling into sub manager to avoid deadlock
3464  Unlock<Mutex> unlock(_lock);
3465  _subscriptionManager->unsubscribe(subId);
3466  }
3467  if (message.getQueryID().len() > 0)
3468  _routes.removeRoute(message.getQueryID());
3469  _routes.removeRoute(cid);
3470  _routes.removeRoute(subId);
3471  throw;
3472  }
3473  }
3474  else _send(message);
      // Subscribe commands with a subscription id return that id; otherwise
      // control falls through to the common return below.
3475  if (subId.len() > 0)
3476  {
3477  message.setAckTypeEnum(requestedAcks);
3478  return std::string(subId.data(), subId.len());
3479  }
3480  }
3481  else
3482  {
      // Non-subscribe command without a publish store.
3483  message.setAckTypeEnum(requestedAcks|systemAddedAcks);
3484  if (useSyncSend)
3485  {
3486  try
3487  {
3488  syncAckProcessing((long)(command_.getTimeout()), message);
3489  }
3490  catch (const DisconnectedException&)
3491  {
3492  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3493  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3494  message.setAckTypeEnum(requestedAcks);
3495  throw;
3496  }
3497  catch (...)
3498  {
3499  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(cid));
3500  AMPS_CALL_EXCEPTION_WRAPPER(_routes.removeRoute(message.getQueryId()));
3501  message.setAckTypeEnum(requestedAcks);
3502  throw;
3503  }
3504  }
3505  else _send(message);
3506  }
3507  }
      // Restore the caller's ack types on the command's message.
3508  message.setAckTypeEnum(requestedAcks);
3509  return cid;
3510  }
3511 
// Declaration only; the definition is not in this part of the file.
// NOTE(review): presumably returns a MessageStream with no messages - confirm
// against the definition.
3512  MessageStream getEmptyMessageStream(void);
3513 
3514  std::string executeAsync(Command& command_, MessageHandler& handler_,
3515  bool isHASubscribe_ = true)
3516  {
3517  Lock<Mutex> lock(_lock);
3518  return executeAsyncNoLock(command_, handler_, isHASubscribe_);
3519  }
3520 
3521  // Queue Methods //
3522  void setAutoAck(bool isAutoAckEnabled_)
3523  {
3524  _isAutoAckEnabled = isAutoAckEnabled_;
3525  }
3526  bool getAutoAck(void) const
3527  {
3528  return _isAutoAckEnabled;
3529  }
3530  void setAckBatchSize(const unsigned batchSize_)
3531  {
3532  _ackBatchSize = batchSize_;
3533  if (!_queueAckTimeout)
3534  {
3535  _queueAckTimeout = AMPS_DEFAULT_QUEUE_ACK_TIMEOUT;
3536  amps_client_set_idle_time(_client, _queueAckTimeout);
3537  }
3538  }
3539  unsigned getAckBatchSize(void) const
3540  {
3541  return _ackBatchSize;
3542  }
3543  int getAckTimeout(void) const
3544  {
3545  return _queueAckTimeout;
3546  }
3547  void setAckTimeout(const int ackTimeout_)
3548  {
3549  amps_client_set_idle_time(_client,ackTimeout_);
3550  _queueAckTimeout = ackTimeout_;
3551  }
/// Sends one SOWDelete message (command id "AMPS-queue-ack") that carries all
/// bookmarks accumulated in queueBookmarks_, then empties the batch.
/// Returns 1 when a message was sent and 0 when the batch was empty.
/// This function takes no lock; the callers visible in this class
/// (_ack(topic, bookmark), flushAcks, checkQueueAcks) hold _lock around it.
3552  size_t _ack(QueueBookmarks& queueBookmarks_)
3553  {
3554  if(queueBookmarks_._bookmarkCount)
3555  {
      // The shared publishStoreMessage is created on first use and registered
      // with PerThreadMessageTracker for cleanup.
3556  if (!publishStoreMessage)
3557  {
3558  publishStoreMessage = new Message();
3559  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3560  }
3561  publishStoreMessage->reset();
3562  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3563  .setTopic(queueBookmarks_._topic)
3564  .setBookmark(queueBookmarks_._data)
3565  .setCommandId("AMPS-queue-ack");
3566  amps_uint64_t haSequenceNumber = 0;
      // With a publish store: store the ack, request a "persisted" ack, and
      // clear the batch before sending.
3567  if (_publishStore.isValid())
3568  {
3569  haSequenceNumber = _publishStore.store(*publishStoreMessage);
3570  publishStoreMessage->setAckType("persisted")
3571  .setSequence(haSequenceNumber);
3572  queueBookmarks_._data.erase();
3573  queueBookmarks_._bookmarkCount = 0;
3574  }
3575  _send(*publishStoreMessage, haSequenceNumber);
      // Without a publish store the batch is cleared only after _send() has
      // returned, so it is kept if _send() throws.
3576  if (!_publishStore.isValid())
3577  {
3578  queueBookmarks_._data.erase();
3579  queueBookmarks_._bookmarkCount = 0;
3580  }
3581  return 1;
3582  }
3583  return 0;
3584  }
/// Acknowledges the message identified by topic_ and bookmark_, optionally
/// with options_. The call is ignored when auto-ack is enabled; otherwise it
/// delegates to _ack().
3585  void ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
3586  {
3587  if (_isAutoAckEnabled) return;
3588  _ack(topic_, bookmark_, options_);
3589  }
/// Acknowledges bookmark_ on topic_ under _lock, either immediately or by
/// adding it to a per-topic batch. An empty bookmark is ignored.
/// The ack is sent on its own when _ackBatchSize < 2 or when options_ is
/// supplied. Otherwise the bookmark is appended, comma separated, to the
/// batch stored under the CRC hash of the topic, and the batch is sent via
/// _ack(QueueBookmarks&) once its count reaches _ackBatchSize.
3590  void _ack(const Field& topic_, const Field& bookmark_, const char* options_ = NULL)
3591  {
3592  if (bookmark_.len() == 0) return;
3593  Lock<Mutex> lock(_lock);
3594  if(_ackBatchSize < 2 || options_ != NULL)
3595  {
3596  if (!publishStoreMessage)
3597  {
3598  publishStoreMessage = new Message();
3599  PerThreadMessageTracker::addMessageToCleanupList(publishStoreMessage);
3600  }
3601  publishStoreMessage->reset();
3602  publishStoreMessage->setCommandEnum(Message::Command::SOWDelete)
3603  .setCommandId("AMPS-queue-ack")
3604  .setTopic(topic_).setBookmark(bookmark_);
3605  if (options_) publishStoreMessage->setOptions(options_);
3606  amps_uint64_t haSequenceNumber = 0;
3607  if (_publishStore.isValid())
3608  {
3609  haSequenceNumber = _publishStore.store(*publishStoreMessage);
3610  publishStoreMessage->setAckType("persisted")
3611  .setSequence(haSequenceNumber);
3612  }
3613  _send(*publishStoreMessage, haSequenceNumber);
3614  return;
3615  }
3616  // have we acked anything for this hash
3617  topic_hash hash = CRC<0>::crcNoSSE(topic_.data(),topic_.len());
3618  TopicHashMap::iterator it = _topicHashMap.find(hash);
3619  if(it == _topicHashMap.end())
3620  {
3621  // add a new one to the map
3622  it = _topicHashMap.insert(TopicHashMap::value_type(hash,QueueBookmarks(topic_))).first;
3623  }
3624  QueueBookmarks &queueBookmarks = it->second;
3625  if(queueBookmarks._data.length())
3626  {
3627  queueBookmarks._data.append(",");
3628  }
3629  else
3630  {
      // First bookmark of a new batch: record when the batch started, which
      // checkQueueAcks() compares against its timeout threshold.
3631  queueBookmarks._oldestTime = amps_now();
3632  }
3633  queueBookmarks._data.append(bookmark_);
3634  if(++queueBookmarks._bookmarkCount >= _ackBatchSize) _ack(queueBookmarks);
3635  }
/// Sends every pending ack batch in _topicHashMap while holding _lock and,
/// if at least one batch was sent, calls publishFlush(0) afterwards.
3636  void flushAcks(void)
3637  {
3638  size_t sendCount = 0;
      // if(1) only opens a scope so that the lock is released before
      // publishFlush() is called.
3639  if(1)
3640  {
3641  Lock<Mutex> lock(_lock);
3642  typedef TopicHashMap::iterator iterator;
3643  for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3644  {
3645  QueueBookmarks& queueBookmarks = it->second;
3646  sendCount += _ack(queueBookmarks);
3647  }
3648  }
3649  if(sendCount) publishFlush(0);
3650  }
3651  // called when there's idle time, to see if we need to flush out any "acks"
/// Sends each non-empty batch whose _oldestTime is earlier than
/// amps_now() minus _queueAckTimeout. Any std::exception raised while doing
/// so is handed to AMPS_UNHANDLED_EXCEPTION instead of propagating.
/// Note that the initial size() check is made before _lock is taken.
3652  void checkQueueAcks(void)
3653  {
3654  if(!_topicHashMap.size()) return;
3655  Lock<Mutex> lock(_lock);
3656  try
3657  {
3658  amps_uint64_t threshold = amps_now() - (amps_uint64_t)_queueAckTimeout;
3659  typedef TopicHashMap::iterator iterator;
3660  for(iterator it = _topicHashMap.begin(), end = _topicHashMap.end(); it!=end; ++it)
3661  {
3662  QueueBookmarks& queueBookmarks = it->second;
3663  if(queueBookmarks._bookmarkCount && queueBookmarks._oldestTime < threshold) _ack(queueBookmarks);
3664  }
3665  }
3666  catch(std::exception& ex)
3667  {
3668  AMPS_UNHANDLED_EXCEPTION(ex);
3669  }
3670  }
3671 
/// Queues func_ with userData_ for later execution: under _lock, a
/// DeferredExecutionRequest is appended to _deferredExecutionList. The queued
/// requests are run by processDeferredExecutions().
3672  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
3673  {
3674  Lock<Mutex> lock(_lock);
3675  _deferredExecutionList.push_back(
3676  DeferredExecutionRequest(func_,userData_));
3677  }
3678 
/// Runs all queued deferred requests. If the list is non-empty, takes _lock,
/// invokes each request's _func with its _userData in list order, clears the
/// list, and then invalidates the caches of _routes and _routeCache.
/// Note that the size() check is made before _lock is taken.
3679  inline void processDeferredExecutions(void)
3680  {
3681  if(_deferredExecutionList.size())
3682  {
3683  Lock<Mutex> lock(_lock);
3684  DeferredExecutionList::iterator it = _deferredExecutionList.begin();
3685  DeferredExecutionList::iterator end = _deferredExecutionList.end();
3686  for(; it != end; ++it)
3687  {
3688  it->_func(it->_userData);
3689  }
3690  _deferredExecutionList.clear();
3691  _routes.invalidateCache();
3692  _routeCache.invalidateCache();
3693  }
3694  }
3695 
/// Returns the retry-on-disconnect flag.
3696  bool getRetryOnDisconnect(void) const
3697  {
3698  return _isRetryOnDisconnect;
3699  }
3700 
/// Stores the retry-on-disconnect flag.
3701  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
3702  {
3703  _isRetryOnDisconnect = isRetryOnDisconnect_;
3704  }
3705 
/// Stores the default maximum depth. The Client methods that return a
/// MessageStream apply this value to the new stream when it is non-zero.
3706  void setDefaultMaxDepth(unsigned maxDepth_)
3707  {
3708  _defaultMaxDepth = maxDepth_;
3709  }
3710 
/// Returns the stored default maximum depth.
3711  unsigned getDefaultMaxDepth(void) const
3712  {
3713  return _defaultMaxDepth;
3714  }
3715 
/// Forwards filter_ and userData_ to
/// amps_client_set_transport_filter_function() for the underlying _client.
3716  void setTransportFilterFunction(amps_transport_filter_function filter_,
3717  void* userData_)
3718  {
3719  amps_client_set_transport_filter_function(_client, filter_, userData_);
3720  }
3721 
/// Forwards callback_ and userData_ to
/// amps_client_set_thread_created_callback() for the underlying _client.
3722  void setThreadCreatedCallback(amps_thread_created_callback callback_,
3723  void* userData_)
3724  {
3725  amps_client_set_thread_created_callback(_client, callback_, userData_);
3726  }
3727 }; // class ClientImpl
3802 
3804 {
3805  RefHandle<MessageStreamImpl> _body;
3806  public:
/// Iterator over the messages delivered to a MessageStream.
/// A default-constructed iterator holds a null stream pointer and serves as
/// the end marker. Constructing one from a stream immediately calls advance()
/// to load the first message into _current.
3811  class iterator
3812  {
3813  MessageStream* _pStream;
3814  Message _current;
3815  inline void advance(void);
3816 
3817  public:
3818  iterator() // end
3819  :_pStream(NULL)
3820  {;}
3821  iterator(MessageStream* pStream_)
3822  :_pStream(pStream_)
3823  {
3824  advance();
3825  }
3826 
      // Equality compares only the stream pointers. The comparison operators
      // are const-qualified so that const iterators can be compared and so
      // that C++20 reversed-operand rewriting stays unambiguous.
3827  bool operator==(const iterator& rhs) const
3828  {
3829  return _pStream == rhs._pStream;
3830  }
3831  bool operator!=(const iterator& rhs) const
3832  {
3833  return _pStream != rhs._pStream;
3834  }
      // Pre-increment moves to the next message via advance().
3835  void operator++(void) { advance(); }
      // Dereference returns a copy of the current Message object.
3836  Message operator*(void) { return _current; }
3837  Message* operator->(void) { return &_current; }
3838  };
/// Returns whether this stream's _body handle is valid.
3840  bool isValid() const { return _body.isValid(); }
3841 
3845  {
3846  if(!_body.isValid())
3847  {
3848  throw UsageException("This MessageStream is not valid and cannot be iterated.");
3849  }
3850  return iterator(this);
3851  }
/// Returns the end marker: a default-constructed iterator.
3854  // For non-SOW queries, the end is never reached.
3855  iterator end(void) { return iterator(); }
3856  inline MessageStream(void);
3857 
3863  MessageStream timeout(unsigned timeout_);
3864 
3868  MessageStream conflate(void);
3874  MessageStream maxDepth(unsigned maxDepth_);
3877  unsigned getMaxDepth(void) const;
3880  unsigned getDepth(void) const;
3881 
3882  private:
3883  inline MessageStream(const Client& client_);
3884  inline void setSOWOnly(const std::string& commandId_,
3885  const std::string& queryId_ = "");
3886  inline void setSubscription(const std::string& subId_,
3887  const std::string& commandId_ = "",
3888  const std::string& queryId_ = "");
3889  inline void setStatsOnly(const std::string& commandId_,
3890  const std::string& queryId_ = "");
3891  inline void setAcksOnly(const std::string& commandId_, unsigned acks_);
3892 
3893  inline operator MessageHandler(void);
3894 
3895  inline static MessageStream fromExistingHandler(const MessageHandler& handler);
3896 
3897  friend class Client;
3898 
3899 };
3900 
3920 class Client
3921 {
3922 protected:
3923  BorrowRefHandle<ClientImpl> _body;
3924 public:
3925  static const int DEFAULT_COMMAND_TIMEOUT = AMPS_DEFAULT_COMMAND_TIMEOUT;
3926  static const int DEFAULT_BATCH_SIZE = AMPS_DEFAULT_BATCH_SIZE;
3927  static const int DEFAULT_TOP_N = AMPS_DEFAULT_TOP_N;
3928 
/// Constructs a Client backed by a newly allocated ClientImpl created with
/// clientName (empty by default). The handle is built with `true` as its
/// second argument, the position the two-argument constructor calls isRef.
3937  Client(const std::string& clientName = "")
3938  : _body(new ClientImpl(clientName), true)
3939  {;}
3940 
/// Constructs a Client around an existing ClientImpl, passing `true` as the
/// handle's second (isRef) argument.
3941  Client(ClientImpl* existingClient)
3942  : _body(existingClient, true)
3943  {;}
3944 
/// Constructs a Client around an existing ClientImpl, letting the caller
/// choose the isRef value handed to the BorrowRefHandle.
/// NOTE(review): isRef presumably controls whether the handle takes a
/// counted reference or merely borrows the pointer — confirm.
3945  Client(ClientImpl* existingClient, bool isRef)
3946  : _body(existingClient, isRef)
3947  {;}
3948 
/// Copy constructor: the new Client copies rhs's _body handle.
3949  Client(const Client& rhs) : _body(rhs._body) {;}
/// Virtual destructor with an empty body.
3950  virtual ~Client(void) {;}
3951 
/// Assigns rhs's _body handle to this Client and returns *this.
3952  Client& operator=(const Client& rhs)
3953  {
3954  _body = rhs._body;
3955  return *this;
3956  }
3957 
/// Returns whether this Client's _body handle is valid.
3958  bool isValid()
3959  {
3960  return _body.isValid();
3961  }
3962 
/// Sets the client name by forwarding to ClientImpl::setName().
3975  void setName(const std::string& name)
3976  {
3977  _body.get().setName(name);
3978  }
3979 
/// Returns the client name as reported by ClientImpl::getName().
3982  const std::string& getName() const
3983  {
3984  return _body.get().getName();
3985  }
3986 
/// Forwards logonCorrelationData_ to ClientImpl::setLogonCorrelationData().
3993  void setLogonCorrelationData(const std::string& logonCorrelationData_)
3994  {
3995  _body.get().setLogonCorrelationData(logonCorrelationData_);
3996  }
3997 
/// Returns the logon correlation data held by the ClientImpl.
4000  const std::string& getLogonCorrelationData() const
4001  {
4002  return _body.get().getLogonCorrelationData();
4003  }
4004 
/// Returns the server version as a size_t, as reported by
/// ClientImpl::getServerVersion().
4013  size_t getServerVersion() const
4014  {
4015  return _body.get().getServerVersion();
4016  }
4017 
/// Returns the server version as a VersionInfo, as reported by
/// ClientImpl::getServerVersionInfo().
4024  VersionInfo getServerVersionInfo() const
4025  {
4026  return _body.get().getServerVersionInfo();
4027  }
4028 
/// Converts a version string to a number by delegating to
/// AMPS::convertVersionToNumber() with the string's c_str() and length().
4038  static size_t convertVersionToNumber(const std::string& version_)
4039  {
4040  return AMPS::convertVersionToNumber(version_.c_str(), version_.length());
4041  }
4042 
/// Converts the len_ characters at data_ to a version number by delegating to
/// AMPS::convertVersionToNumber().
4053  static size_t convertVersionToNumber(const char* data_, size_t len_)
4054  {
4055  return AMPS::convertVersionToNumber(data_, len_);
4056  }
4057 
/// Returns the URI held by the ClientImpl.
4060  const std::string& getURI() const
4061  {
4062  return _body.get().getURI();
4063  }
4064 
4071 
4073 
/// Connects using the given URI by forwarding to ClientImpl::connect().
4084  void connect(const std::string& uri)
4085  {
4086  _body.get().connect(uri);
4087  }
4088 
/// Disconnects by forwarding to ClientImpl::disconnect().
4091  void disconnect()
4092  {
4093  _body.get().disconnect();
4094  }
4095 
/// Sends message by forwarding it to ClientImpl::send().
4109  void send(const Message& message)
4110  {
4111  _body.get().send(message);
4112  }
4113 
/// Registers messageHandler_ for commandId_ by forwarding all four arguments
/// unchanged to ClientImpl::addMessageHandler().
4122  void addMessageHandler(const Field& commandId_,
4123  const AMPS::MessageHandler& messageHandler_,
4124  unsigned requestedAcks_, bool isSubscribe_)
4125  {
4126  _body.get().addMessageHandler(commandId_, messageHandler_,
4127  requestedAcks_, isSubscribe_);
4128  }
4129 
/// Removes the handler registered for commandId_ and returns the bool
/// reported by ClientImpl::removeMessageHandler().
4133  bool removeMessageHandler(const Field& commandId_)
4134  {
4135  return _body.get().removeMessageHandler(commandId_);
4136  }
4137 
/// Sends message with messageHandler attached and returns the string
/// produced by ClientImpl::send(). timeout defaults to 0.
4161  std::string send(MessageHandler messageHandler, Message& message, int timeout = 0)
4162  {
4163  return _body.get().send(messageHandler, message, timeout);
4164  }
4165 
/// Installs disconnectHandler by forwarding to
/// ClientImpl::setDisconnectHandler().
4175  void setDisconnectHandler(DisconnectHandler disconnectHandler)
4176  {
4177  _body.get().setDisconnectHandler(disconnectHandler);
4178  }
4179 
/// Returns the disconnect handler held by the ClientImpl.
4183  DisconnectHandler getDisconnectHandler(void)
4184  {
4185  return _body.get().getDisconnectHandler();
4186  }
4187 
/// Returns the ConnectionInfo reported by ClientImpl::getConnectionInfo().
/// Declared virtual, so derived clients may override it.
4192  virtual ConnectionInfo getConnectionInfo() const
4193  {
4194  return _body.get().getConnectionInfo();
4195  }
4196 
/// Installs bookmarkStore_ by forwarding to ClientImpl::setBookmarkStore().
4205  void setBookmarkStore(const BookmarkStore& bookmarkStore_)
4206  {
4207  _body.get().setBookmarkStore(bookmarkStore_);
4208  }
4209 
4214  {
4215  return _body.get().getBookmarkStore();
4216  }
4217 
4222  {
4223  return _body.get().getSubscriptionManager();
4224  }
4225 
/// Installs subscriptionManager_ by forwarding the pointer to
/// ClientImpl::setSubscriptionManager().
/// NOTE(review): ownership of the pointer is not visible here — confirm.
4233  void setSubscriptionManager(SubscriptionManager* subscriptionManager_)
4234  {
4235  _body.get().setSubscriptionManager(subscriptionManager_);
4236  }
4237 
/// Installs publishStore_ by forwarding to ClientImpl::setPublishStore().
4257  void setPublishStore(const Store& publishStore_)
4258  {
4259  _body.get().setPublishStore(publishStore_);
4260  }
4261 
4266  {
4267  return _body.get().getPublishStore();
4268  }
4269 
/// Registers duplicateMessageHandler_ as the global command-type handler for
/// ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage.
4273  void setDuplicateMessageHandler(const MessageHandler& duplicateMessageHandler_)
4274  {
4275  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::DuplicateMessage,
4276  duplicateMessageHandler_);
4277  }
4278 
4289  {
4290  return _body.get().getDuplicateMessageHandler();
4291  }
4292 
4303  {
4304  _body.get().setFailedWriteHandler(handler_);
4305  }
4306 
4311  {
4312  return _body.get().getFailedWriteHandler();
4313  }
4314 
4315 
/// Publishes data_ to topic_. The strings are passed on as pointer/length
/// pairs and the amps_uint64_t from ClientImpl::publish() is returned.
4333  amps_uint64_t publish(const std::string& topic_, const std::string& data_)
4334  {
4335  return _body.get().publish(topic_.c_str(), topic_.length(),
4336  data_.c_str(), data_.length());
4337  }
4338 
/// Publishes dataLength_ bytes at data_ to the topic given by topic_ and
/// topicLength_; returns the value from ClientImpl::publish().
4358  amps_uint64_t publish(const char* topic_, size_t topicLength_,
4359  const char* data_, size_t dataLength_)
4360  {
4361  return _body.get().publish(topic_, topicLength_, data_, dataLength_);
4362  }
4363 
/// Publishes data_ to topic_ with an expiration. The strings are passed on as
/// pointer/length pairs together with expiration_.
4382  amps_uint64_t publish(const std::string& topic_, const std::string& data_,
4383  unsigned long expiration_)
4384  {
4385  return _body.get().publish(topic_.c_str(), topic_.length(),
4386  data_.c_str(), data_.length(), expiration_);
4387  }
4388 
/// Pointer/length form of publish with an expiration; all arguments are
/// forwarded unchanged to ClientImpl::publish().
4409  amps_uint64_t publish(const char* topic_, size_t topicLength_,
4410  const char* data_, size_t dataLength_,
4411  unsigned long expiration_)
4412  {
4413  return _body.get().publish(topic_, topicLength_,
4414  data_, dataLength_, expiration_);
4415  }
4416 
/// Forwards to ClientImpl::publishFlush() with timeout_ (default 0).
4450  void publishFlush(long timeout_ = 0)
4451  {
4452  _body.get().publishFlush(timeout_);
4453  }
4454 
4455 
/// Delta-publishes data_ to topic_. The strings are passed on as
/// pointer/length pairs to ClientImpl::deltaPublish().
4469  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_)
4470  {
4471  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4472  data_.c_str(), data_.length());
4473  }
4474 
/// Pointer/length form of deltaPublish; arguments are forwarded unchanged to
/// ClientImpl::deltaPublish().
4490  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
4491  const char* data_, size_t dataLength_)
4492  {
4493  return _body.get().deltaPublish(topic_, topicLength_,
4494  data_, dataLength_);
4495  }
4496 
/// Delta-publishes data_ to topic_ with an expiration. The strings are passed
/// on as pointer/length pairs together with expiration_.
4511  amps_uint64_t deltaPublish(const std::string& topic_, const std::string& data_,
4512  unsigned long expiration_)
4513  {
4514  return _body.get().deltaPublish(topic_.c_str(), topic_.length(),
4515  data_.c_str(), data_.length(),
4516  expiration_);
4517  }
4518 
/// Pointer/length form of deltaPublish with an expiration; all arguments are
/// forwarded unchanged to ClientImpl::deltaPublish().
4535  amps_uint64_t deltaPublish(const char* topic_, size_t topicLength_,
4536  const char* data_, size_t dataLength_,
4537  unsigned long expiration_)
4538  {
4539  return _body.get().deltaPublish(topic_, topicLength_,
4540  data_, dataLength_, expiration_);
4541  }
4542 
/// Logs on by forwarding to ClientImpl::logon() and returns its string
/// result. Defaults: timeout_ 0, the DefaultAuthenticator instance, and no
/// options.
4557  std::string logon(int timeout_ = 0,
4558  Authenticator& authenticator_ = DefaultAuthenticator::instance(),
4559  const char* options_ = NULL)
4560  {
4561  return _body.get().logon(timeout_, authenticator_, options_);
4562  }
/// Logs on with the given options string using the DefaultAuthenticator
/// instance; timeout_ defaults to 0.
4575  std::string logon(const char* options_, int timeout_ = 0)
4576  {
4577  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
4578  options_);
4579  }
4580 
/// std::string form of logon with options: passes options_.c_str() and the
/// DefaultAuthenticator instance to ClientImpl::logon().
4593  std::string logon(const std::string& options_, int timeout_ = 0)
4594  {
4595  return _body.get().logon(timeout_, DefaultAuthenticator::instance(),
4596  options_.c_str());
4597  }
4598 
/// Subscribes to topic_ with messageHandler_ and returns the string produced
/// by ClientImpl::subscribe(). An empty string is passed in the argument
/// position that bookmarkSubscribe() uses for its bookmark.
4618  std::string subscribe(MessageHandler messageHandler_,
4619  const std::string& topic_,
4620  long timeout_=0,
4621  const std::string& filter_="",
4622  const std::string& options_ = "",
4623  const std::string& subId_ = "")
4624  {
4625  return _body.get().subscribe(messageHandler_, topic_, timeout_,
4626  filter_, "", options_, subId_);
4627  }
4628 
/// Stream form of subscribe(). Builds a MessageStream on this client, applies
/// the client's default max depth when it is non-zero, subscribes with the
/// stream acting as the handler, and records the returned id on the stream
/// with setSubscription().
/// NOTE(review): the trailing `false` is presumably the HA-resubscribe flag
/// of ClientImpl::subscribe() — confirm.
4644  MessageStream subscribe(const std::string& topic_,
4645  long timeout_=0, const std::string& filter_="",
4646  const std::string& options_ = "",
4647  const std::string& subId_ = "")
4648  {
4649  MessageStream result(*this);
4650  if (_body.get().getDefaultMaxDepth())
4651  result.maxDepth(_body.get().getDefaultMaxDepth());
4652  result.setSubscription(_body.get().subscribe(
4653  result.operator MessageHandler(),
4654  topic_, timeout_, filter_, "",
4655  options_, subId_, false));
4656  return result;
4657  }
4658 
/// const char* topic overload of the stream form of subscribe(); the body is
/// the same as the std::string overload: create the stream, apply the default
/// max depth if non-zero, subscribe with the stream as handler, and record
/// the returned id via setSubscription().
4674  MessageStream subscribe(const char* topic_,
4675  long timeout_ = 0, const std::string& filter_ = "",
4676  const std::string& options_ = "",
4677  const std::string& subId_ = "")
4678  {
4679  MessageStream result(*this);
4680  if (_body.get().getDefaultMaxDepth())
4681  result.maxDepth(_body.get().getDefaultMaxDepth());
4682  result.setSubscription(_body.get().subscribe(
4683  result.operator MessageHandler(),
4684  topic_, timeout_, filter_, "",
4685  options_, subId_, false));
4686  return result;
4687  }
4688 
/// Delta-subscribes to topic_ with messageHandler_ and returns the string
/// produced by ClientImpl::deltaSubscribe(). timeout_ has no default here,
/// and an empty string is passed as the fifth argument.
4701  std::string deltaSubscribe(MessageHandler messageHandler_,
4702  const std::string& topic_,
4703  long timeout_,
4704  const std::string& filter_="",
4705  const std::string& options_ = "",
4706  const std::string& subId_ = "")
4707  {
4708  return _body.get().deltaSubscribe(messageHandler_, topic_, timeout_,
4709  filter_, "", options_, subId_);
4710  }
/// Stream form of deltaSubscribe(). Builds a MessageStream on this client,
/// applies the default max depth when non-zero, delta-subscribes with the
/// stream as handler (passing `false` as the final argument), and records the
/// returned id via setSubscription().
4719  MessageStream deltaSubscribe(const std::string& topic_,
4720  long timeout_, const std::string& filter_="",
4721  const std::string& options_ = "",
4722  const std::string& subId_ = "")
4723  {
4724  MessageStream result(*this);
4725  if (_body.get().getDefaultMaxDepth())
4726  result.maxDepth(_body.get().getDefaultMaxDepth());
4727  result.setSubscription(_body.get().deltaSubscribe(
4728  result.operator MessageHandler(),
4729  topic_, timeout_, filter_, "",
4730  options_, subId_, false));
4731  return result;
4732  }
4733 
/// const char* topic overload of the stream form of deltaSubscribe(); the
/// body is the same as the std::string overload.
4735  MessageStream deltaSubscribe(const char* topic_,
4736  long timeout_, const std::string& filter_ = "",
4737  const std::string& options_ = "",
4738  const std::string& subId_ = "")
4739  {
4740  MessageStream result(*this);
4741  if (_body.get().getDefaultMaxDepth())
4742  result.maxDepth(_body.get().getDefaultMaxDepth());
4743  result.setSubscription(_body.get().deltaSubscribe(
4744  result.operator MessageHandler(),
4745  topic_, timeout_, filter_, "",
4746  options_, subId_, false));
4747  return result;
4748  }
4749 
/// Subscribes to topic_ starting from bookmark_. Implemented through
/// ClientImpl::subscribe(), with bookmark_ in the position where the plain
/// subscribe() passes an empty string.
4775  std::string bookmarkSubscribe(MessageHandler messageHandler_,
4776  const std::string& topic_,
4777  long timeout_,
4778  const std::string& bookmark_,
4779  const std::string& filter_="",
4780  const std::string& options_ = "",
4781  const std::string& subId_ = "")
4782  {
4783  return _body.get().subscribe(messageHandler_, topic_, timeout_,
4784  filter_, bookmark_, options_, subId_);
4785  }
/// Stream form of bookmarkSubscribe(). Builds a MessageStream on this client,
/// applies the default max depth when non-zero, subscribes with bookmark_
/// using the stream as handler (passing `false` as the final argument), and
/// records the returned id via setSubscription().
4803  MessageStream bookmarkSubscribe(const std::string& topic_,
4804  long timeout_,
4805  const std::string& bookmark_,
4806  const std::string& filter_="",
4807  const std::string& options_ = "",
4808  const std::string& subId_ = "")
4809  {
4810  MessageStream result(*this);
4811  if (_body.get().getDefaultMaxDepth())
4812  result.maxDepth(_body.get().getDefaultMaxDepth());
4813  result.setSubscription(_body.get().subscribe(
4814  result.operator MessageHandler(),
4815  topic_, timeout_, filter_,
4816  bookmark_, options_,
4817  subId_, false));
4818  return result;
4819  }
4820 
/// const char* topic overload of the stream form of bookmarkSubscribe(); the
/// body is the same as the std::string overload.
4822  MessageStream bookmarkSubscribe(const char* topic_,
4823  long timeout_,
4824  const std::string& bookmark_,
4825  const std::string& filter_ = "",
4826  const std::string& options_ = "",
4827  const std::string& subId_ = "")
4828  {
4829  MessageStream result(*this);
4830  if (_body.get().getDefaultMaxDepth())
4831  result.maxDepth(_body.get().getDefaultMaxDepth());
4832  result.setSubscription(_body.get().subscribe(
4833  result.operator MessageHandler(),
4834  topic_, timeout_, filter_,
4835  bookmark_, options_,
4836  subId_, false));
4837  return result;
4838  }
4839 
/// Unsubscribes the subscription identified by commandId by forwarding to
/// ClientImpl::unsubscribe().
4848  void unsubscribe(const std::string& commandId)
4849  {
4850  return _body.get().unsubscribe(commandId);
4851  }
4852 
4861  {
4862  return _body.get().unsubscribe();
4863  }
4864 
4865 
/// Issues a SOW query on topic_ with messageHandler_ receiving the results;
/// every argument is forwarded unchanged to ClientImpl::sow() and its string
/// result is returned. Defaults come from DEFAULT_BATCH_SIZE, DEFAULT_TOP_N
/// and DEFAULT_COMMAND_TIMEOUT.
4895  std::string sow(MessageHandler messageHandler_,
4896  const std::string& topic_,
4897  const std::string& filter_ = "",
4898  const std::string& orderBy_ = "",
4899  const std::string& bookmark_ = "",
4900  int batchSize_ = DEFAULT_BATCH_SIZE,
4901  int topN_ = DEFAULT_TOP_N,
4902  const std::string& options_ = "",
4903  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
4904  {
4905  return _body.get().sow(messageHandler_, topic_, filter_, orderBy_,
4906  bookmark_, batchSize_, topN_, options_,
4907  timeout_);
4908  }
/// Stream form of sow(). Builds a MessageStream on this client, applies the
/// default max depth when non-zero, runs the handler form of sow() with the
/// stream as handler, and registers the returned id on the stream with
/// setSOWOnly().
4933  MessageStream sow(const std::string& topic_,
4934  const std::string& filter_ = "",
4935  const std::string& orderBy_ = "",
4936  const std::string& bookmark_ = "",
4937  int batchSize_ = DEFAULT_BATCH_SIZE,
4938  int topN_ = DEFAULT_TOP_N,
4939  const std::string& options_ = "",
4940  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
4941  {
4942  MessageStream result(*this);
4943  if (_body.get().getDefaultMaxDepth())
4944  result.maxDepth(_body.get().getDefaultMaxDepth());
4945  result.setSOWOnly(sow(result.operator MessageHandler(),topic_,filter_,orderBy_,bookmark_,batchSize_,topN_,options_,timeout_));
4946  return result;
4947  }
4948 
/// const char* topic overload of the stream form of sow(); the body is the
/// same as the std::string overload.
4950  MessageStream sow(const char* topic_,
4951  const std::string& filter_ = "",
4952  const std::string& orderBy_ = "",
4953  const std::string& bookmark_ = "",
4954  int batchSize_ = DEFAULT_BATCH_SIZE,
4955  int topN_ = DEFAULT_TOP_N,
4956  const std::string& options_ = "",
4957  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
4958  {
4959  MessageStream result(*this);
4960  if (_body.get().getDefaultMaxDepth())
4961  result.maxDepth(_body.get().getDefaultMaxDepth());
4962  result.setSOWOnly(sow(result.operator MessageHandler(), topic_, filter_, orderBy_, bookmark_, batchSize_, topN_, options_, timeout_));
4963  return result;
4964  }
/// Timeout-first overload of sow(): forwards messageHandler_, topic_,
/// timeout_, filter_, batchSize_ and topN_ to the matching
/// ClientImpl::sow() overload.
4987  std::string sow(MessageHandler messageHandler_,
4988  const std::string& topic_,
4989  long timeout_,
4990  const std::string& filter_ = "",
4991  int batchSize_ = DEFAULT_BATCH_SIZE,
4992  int topN_ = DEFAULT_TOP_N)
4993  {
4994  return _body.get().sow(messageHandler_, topic_, timeout_, filter_,
4995  batchSize_, topN_);
4996  }
/// Timeout-first overload of sowAndSubscribe() with an oofEnabled_ flag; all
/// arguments are forwarded unchanged to ClientImpl::sowAndSubscribe().
5019  std::string sowAndSubscribe(MessageHandler messageHandler_,
5020  const std::string& topic_,
5021  long timeout_,
5022  const std::string& filter_ = "",
5023  int batchSize_ = DEFAULT_BATCH_SIZE,
5024  bool oofEnabled_ = false,
5025  int topN_ = DEFAULT_TOP_N)
5026  {
5027  return _body.get().sowAndSubscribe(messageHandler_, topic_, timeout_,
5028  filter_, batchSize_, oofEnabled_,
5029  topN_);
5030  }
5031 
/// Stream form of the timeout-first sowAndSubscribe(). Builds a MessageStream
/// on this client, applies the default max depth when non-zero, issues the
/// command with the stream as handler (passing `false` as the final
/// argument), and records the returned id via setSubscription().
5051  MessageStream sowAndSubscribe(const std::string& topic_,
5052  long timeout_,
5053  const std::string& filter_ = "",
5054  int batchSize_ = DEFAULT_BATCH_SIZE,
5055  bool oofEnabled_ = false,
5056  int topN_ = DEFAULT_TOP_N)
5057  {
5058  MessageStream result(*this);
5059  if (_body.get().getDefaultMaxDepth())
5060  result.maxDepth(_body.get().getDefaultMaxDepth());
5061  result.setSubscription(_body.get().sowAndSubscribe(
5062  result.operator MessageHandler(),
5063  topic_, timeout_, filter_,
5064  batchSize_, oofEnabled_,
5065  topN_, false));
5066  return result;
5067  }
/// const char* topic overload of the stream form of the timeout-first
/// sowAndSubscribe(); the body is the same as the std::string overload.
5087  MessageStream sowAndSubscribe(const char *topic_,
5088  long timeout_,
5089  const std::string& filter_ = "",
5090  int batchSize_ = DEFAULT_BATCH_SIZE,
5091  bool oofEnabled_ = false,
5092  int topN_ = DEFAULT_TOP_N)
5093  {
5094  MessageStream result(*this);
5095  if (_body.get().getDefaultMaxDepth())
5096  result.maxDepth(_body.get().getDefaultMaxDepth());
5097  result.setSubscription(_body.get().sowAndSubscribe(
5098  result.operator MessageHandler(),
5099  topic_, timeout_, filter_,
5100  batchSize_, oofEnabled_,
5101  topN_, false));
5102  return result;
5103  }
5104 
5105 
/// Options-based overload of sowAndSubscribe() (filter, order-by, bookmark,
/// batch size, top-N, options, timeout); all arguments are forwarded
/// unchanged to ClientImpl::sowAndSubscribe().
5133  std::string sowAndSubscribe(MessageHandler messageHandler_,
5134  const std::string& topic_,
5135  const std::string& filter_ = "",
5136  const std::string& orderBy_ = "",
5137  const std::string& bookmark_ = "",
5138  int batchSize_ = DEFAULT_BATCH_SIZE,
5139  int topN_ = DEFAULT_TOP_N,
5140  const std::string& options_ = "",
5141  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5142  {
5143  return _body.get().sowAndSubscribe(messageHandler_, topic_, filter_,
5144  orderBy_, bookmark_, batchSize_,
5145  topN_, options_, timeout_);
5146  }
5147 
/// Stream form of the options-based sowAndSubscribe(). Builds a MessageStream
/// on this client, applies the default max depth when non-zero, issues the
/// command with the stream as handler (passing `false` as the final
/// argument), and records the returned id via setSubscription().
5172  MessageStream sowAndSubscribe(const std::string& topic_,
5173  const std::string& filter_ = "",
5174  const std::string& orderBy_ = "",
5175  const std::string& bookmark_ = "",
5176  int batchSize_ = DEFAULT_BATCH_SIZE,
5177  int topN_ = DEFAULT_TOP_N,
5178  const std::string& options_ = "",
5179  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5180  {
5181  MessageStream result(*this);
5182  if (_body.get().getDefaultMaxDepth())
5183  result.maxDepth(_body.get().getDefaultMaxDepth());
5184  result.setSubscription(_body.get().sowAndSubscribe(
5185  result.operator MessageHandler(),
5186  topic_, filter_, orderBy_,
5187  bookmark_, batchSize_, topN_,
5188  options_, timeout_, false));
5189  return result;
5190  }
5191 
/// const char* topic overload of the stream form of the options-based
/// sowAndSubscribe(); the body is the same as the std::string overload.
5193  MessageStream sowAndSubscribe(const char* topic_,
5194  const std::string& filter_ = "",
5195  const std::string& orderBy_ = "",
5196  const std::string& bookmark_ = "",
5197  int batchSize_ = DEFAULT_BATCH_SIZE,
5198  int topN_ = DEFAULT_TOP_N,
5199  const std::string& options_ = "",
5200  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5201  {
5202  MessageStream result(*this);
5203  if (_body.get().getDefaultMaxDepth())
5204  result.maxDepth(_body.get().getDefaultMaxDepth());
5205  result.setSubscription(_body.get().sowAndSubscribe(
5206  result.operator MessageHandler(),
5207  topic_, filter_, orderBy_,
5208  bookmark_, batchSize_, topN_,
5209  options_, timeout_, false));
5210  return result;
5211  }
5212 
/// Options-based overload of sowAndDeltaSubscribe(); all arguments are
/// forwarded unchanged to ClientImpl::sowAndDeltaSubscribe() and its string
/// result is returned.
5237  std::string sowAndDeltaSubscribe(MessageHandler messageHandler_,
5238  const std::string& topic_,
5239  const std::string& filter_ = "",
5240  const std::string& orderBy_ = "",
5241  int batchSize_ = DEFAULT_BATCH_SIZE,
5242  int topN_ = DEFAULT_TOP_N,
5243  const std::string& options_ = "",
5244  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5245  {
5246  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5247  filter_, orderBy_, batchSize_,
5248  topN_, options_, timeout_);
5249  }
/// Stream form of the options-based sowAndDeltaSubscribe(). Builds a
/// MessageStream on this client, applies the default max depth when non-zero,
/// issues the command with the stream as handler, and records the returned id
/// via setSubscription().
5270  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
5271  const std::string& filter_ = "",
5272  const std::string& orderBy_ = "",
5273  int batchSize_ = DEFAULT_BATCH_SIZE,
5274  int topN_ = DEFAULT_TOP_N,
5275  const std::string& options_ = "",
5276  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5277  {
5278  MessageStream result(*this);
5279  if (_body.get().getDefaultMaxDepth())
5280  result.maxDepth(_body.get().getDefaultMaxDepth());
      // Subscribe exactly once, through _body with `false` as the final
      // argument, as the other MessageStream overloads do. An earlier extra
      // call through the handler overload placed a second subscription on the
      // same stream and had its id overwritten by this one.
5282  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5283  result.operator MessageHandler(),
5284  topic_, filter_, orderBy_,
5285  batchSize_, topN_, options_,
5286  timeout_, false));
5287  return result;
5288  }
5289 
5292  const std::string& filter_ = "",
5293  const std::string& orderBy_ = "",
5294  int batchSize_ = DEFAULT_BATCH_SIZE,
5295  int topN_ = DEFAULT_TOP_N,
5296  const std::string& options_ = "",
5297  long timeout_ = DEFAULT_COMMAND_TIMEOUT)
5298  {
5299  MessageStream result(*this);
5300  if (_body.get().getDefaultMaxDepth())
5301  result.maxDepth(_body.get().getDefaultMaxDepth());
5302  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5303  result.operator MessageHandler(),
5304  topic_, filter_, orderBy_,
5305  batchSize_, topN_, options_,
5306  timeout_, false));
5307  return result;
5308  }
5309 
/// Timeout-first overload of sowAndDeltaSubscribe() with oofEnabled_ and
/// sendEmpties_ flags; all arguments are forwarded unchanged to
/// ClientImpl::sowAndDeltaSubscribe().
5334  std::string sowAndDeltaSubscribe(MessageHandler messageHandler_,
5335  const std::string& topic_,
5336  long timeout_,
5337  const std::string& filter_ = "",
5338  int batchSize_ = DEFAULT_BATCH_SIZE,
5339  bool oofEnabled_ = false,
5340  bool sendEmpties_ = false,
5341  int topN_ = DEFAULT_TOP_N)
5342  {
5343  return _body.get().sowAndDeltaSubscribe(messageHandler_, topic_,
5344  timeout_, filter_, batchSize_,
5345  oofEnabled_, sendEmpties_,
5346  topN_);
5347  }
5348 
/// Stream form of the timeout-first sowAndDeltaSubscribe(). Builds a
/// MessageStream on this client, applies the default max depth when non-zero,
/// issues the command with the stream as handler (passing `false` as the
/// final argument), and records the returned id via setSubscription().
5370  MessageStream sowAndDeltaSubscribe(const std::string& topic_,
5371  long timeout_,
5372  const std::string& filter_ = "",
5373  int batchSize_ = DEFAULT_BATCH_SIZE,
5374  bool oofEnabled_ = false,
5375  bool sendEmpties_ = false,
5376  int topN_ = DEFAULT_TOP_N)
5377  {
5378  MessageStream result(*this);
5379  if (_body.get().getDefaultMaxDepth())
5380  result.maxDepth(_body.get().getDefaultMaxDepth());
5381  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5382  result.operator MessageHandler(),
5383  topic_, timeout_, filter_,
5384  batchSize_, oofEnabled_,
5385  sendEmpties_, topN_, false));
5386  return result;
5387  }
5410  long timeout_,
5411  const std::string& filter_ = "",
5412  int batchSize_ = DEFAULT_BATCH_SIZE,
5413  bool oofEnabled_ = false,
5414  bool sendEmpties_ = false,
5415  int topN_ = DEFAULT_TOP_N)
5416  {
5417  MessageStream result(*this);
5418  if (_body.get().getDefaultMaxDepth())
5419  result.maxDepth(_body.get().getDefaultMaxDepth());
5420  result.setSubscription(_body.get().sowAndDeltaSubscribe(
5421  result.operator MessageHandler(),
5422  topic_, timeout_, filter_,
5423  batchSize_, oofEnabled_,
5424  sendEmpties_, topN_, false));
5425  return result;
5426  }
/// Issues a SOW delete on topic for records matching filter, with
/// messageHandler receiving the response; forwards to
/// ClientImpl::sowDelete() and returns its string result.
5446  std::string sowDelete(MessageHandler messageHandler,
5447  const std::string& topic,
5448  const std::string& filter,
5449  long timeout)
5450  {
5451  return _body.get().sowDelete(messageHandler, topic, filter, timeout);
5452  }
/// Synchronous form of sowDelete(): returns the first Message delivered to
/// an internal MessageStream.
/// A command id is generated from MessageImpl::newId() (formatted as hex),
/// the stream is set to stats-only for that id, and the delete is issued
/// with the stream as handler. If a DisconnectedException is thrown, the
/// handler registered for the id is removed before the exception is rethrown.
5469  Message sowDelete(const std::string& topic, const std::string& filter,
5470  long timeout=0)
5471  {
5472  MessageStream stream(*this);
5473  char buf[Message::IdentifierLength+1];
5474  buf[Message::IdentifierLength] = 0;
5475  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5476  Field cid(buf);
5477  try
5478  {
5479  stream.setStatsOnly(cid);
5480  _body.get().sowDelete(stream.operator MessageHandler(),topic,filter,timeout,cid);
5481  return *(stream.begin());
5482  }
5483  catch (const DisconnectedException&)
5484  {
5485  removeMessageHandler(cid);
5486  throw;
5487  }
5488  }
5489 
/// Forwards to ClientImpl::startTimer().
5494  void startTimer()
5495  {
5496  _body.get().startTimer();
5497  }
5498 
/// Forwards messageHandler to ClientImpl::stopTimer() and returns its
/// string result.
5505  std::string stopTimer(MessageHandler messageHandler)
5506  {
5507  return _body.get().stopTimer(messageHandler);
5508  }
5509 
/// Issues a SOW delete on topic_ for the keys in keys_, with messageHandler_
/// receiving the response; forwards to ClientImpl::sowDeleteByKeys().
/// timeout_ defaults to 0.
5531  std::string sowDeleteByKeys(MessageHandler messageHandler_,
5532  const std::string& topic_,
5533  const std::string& keys_,
5534  long timeout_=0)
5535  {
5536  return _body.get().sowDeleteByKeys(messageHandler_, topic_, keys_, timeout_);
5537  }
/// Synchronous form of sowDeleteByKeys(): returns the first Message delivered
/// to an internal MessageStream.
/// A command id is generated from MessageImpl::newId() (formatted as hex),
/// the stream is set to stats-only for that id, and the delete is issued
/// with the stream as handler. If a DisconnectedException is thrown, the
/// handler registered for the id is removed before the exception is rethrown.
5558  Message sowDeleteByKeys(const std::string& topic_, const std::string& keys_,
5559  long timeout_ = 0)
5560  {
5561  MessageStream stream(*this);
5562  char buf[Message::IdentifierLength+1];
5563  buf[Message::IdentifierLength] = 0;
5564  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5565  Field cid(buf);
5566  try
5567  {
5568  stream.setStatsOnly(cid);
5569  _body.get().sowDeleteByKeys(stream.operator MessageHandler(),topic_,keys_,timeout_,cid);
5570  return *(stream.begin());
5571  }
5572  catch (const DisconnectedException&)
5573  {
5574  removeMessageHandler(cid);
5575  throw;
5576  }
5577  }
5578 
/// Issues a SOW delete on topic_ driven by data_, with messageHandler_
/// receiving the response; forwards to ClientImpl::sowDeleteByData().
/// timeout_ defaults to 0.
5593  std::string sowDeleteByData(MessageHandler messageHandler_,
5594  const std::string& topic_, const std::string& data_,
5595  long timeout_=0)
5596  {
5597  return _body.get().sowDeleteByData(messageHandler_, topic_, data_, timeout_);
5598  }
5599 
/// Synchronous form of sowDeleteByData(): returns the first Message delivered
/// to an internal MessageStream.
/// A command id is generated from MessageImpl::newId() (formatted as hex),
/// the stream is set to stats-only for that id, and the delete is issued
/// with the stream as handler. If a DisconnectedException is thrown, the
/// handler registered for the id is removed before the exception is rethrown.
5614  Message sowDeleteByData(const std::string& topic_, const std::string& data_,
5615  long timeout_=0)
5616  {
5617  MessageStream stream(*this);
5618  char buf[Message::IdentifierLength+1];
5619  buf[Message::IdentifierLength] = 0;
5620  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , MessageImpl::newId());
5621  Field cid(buf);
5622  try
5623  {
5624  stream.setStatsOnly(cid);
5625  _body.get().sowDeleteByData(stream.operator MessageHandler(),topic_,data_,timeout_,cid);
5626  return *(stream.begin());
5627  }
5628  catch (const DisconnectedException&)
5629  {
5630  removeMessageHandler(cid);
5631  throw;
5632  }
5633  }
5634 
5639  {
5640  return _body.get().getHandle();
5641  }
5642 
/// Installs an exception listener held by shared_ptr by forwarding
/// pListener_ to ClientImpl::setExceptionListener().
5651  void setExceptionListener(const std::shared_ptr<const ExceptionListener>& pListener_)
5652  {
5653  _body.get().setExceptionListener(pListener_);
5654  }
5655 
5665  {
5666  _body.get().setExceptionListener(listener_);
5667  }
5668 
5672  {
5673  return _body.get().getExceptionListener();
5674  }
5675 
5683  // type of message) from the server for the specified interval (plus a grace period),
/// Configures heartbeating by forwarding heartbeatTime_ and readTimeout_ to
/// ClientImpl::setHeartbeat().
5697  void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
5698  {
5699  _body.get().setHeartbeat(heartbeatTime_, readTimeout_);
5700  }
5701 
5709  // type of message) from the server for the specified interval (plus a grace period),
/// Configures heartbeating with a read timeout of twice heartbeatTime_.
5721  void setHeartbeat(unsigned heartbeatTime_)
5722  {
5723  _body.get().setHeartbeat(heartbeatTime_, 2 * heartbeatTime_);
5724  }
5725 
5728  {
5729  setLastChanceMessageHandler(messageHandler);
5730  }
5731 
5735  {
5736  _body.get().setGlobalCommandTypeMessageHandler(ClientImpl::GlobalCommandTypeHandlers::LastChance,
5737  messageHandler);
5738  }
5739 
// Registers handler_ for the command type named by the string command_;
// forwards both arguments to the implementation object.
5760  void setGlobalCommandTypeMessageHandler(const std::string& command_, const MessageHandler& handler_)
5761  {
5762  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
5763  }
5764 
// Registers handler_ for the command type given as a Message::Command::Type
// value; forwards both arguments to the implementation object.
5785  void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler& handler_)
5786  {
5787  _body.get().setGlobalCommandTypeMessageHandler(command_, handler_);
5788  }
5789 
// Static accessors for the bookmark constants defined by the AMPS_BOOKMARK_*
// macros. BOOKMARK_RECENT, BOOKMARK_MOST_RECENT and MOST_RECENT all return
// AMPS_BOOKMARK_RECENT; NOW/EPOCH are short aliases for BOOKMARK_NOW/BOOKMARK_EPOCH.
5790  static const char* BOOKMARK_NOW() { return AMPS_BOOKMARK_NOW; }
5791  static const char* BOOKMARK_EPOCH() { return AMPS_BOOKMARK_EPOCH; }
5792  static const char* BOOKMARK_RECENT() { return AMPS_BOOKMARK_RECENT; }
5793  static const char* BOOKMARK_MOST_RECENT() { return AMPS_BOOKMARK_RECENT; }
5794  static const char* NOW() { return AMPS_BOOKMARK_NOW; }
5795  static const char* EPOCH() { return AMPS_BOOKMARK_EPOCH; }
5796  static const char* MOST_RECENT() { return AMPS_BOOKMARK_RECENT; }
5797 
5798 
5805  {
5806  _body.get().addConnectionStateListener(listener);
5807  }
5808 
5813  {
5814  _body.get().removeConnectionStateListener(listener);
5815  }
5816 
// Executes command_ with handler_ by forwarding both to the implementation
// object; returns the string produced by that call.
5842  std::string executeAsync(Command& command_, MessageHandler handler_)
5843  {
5844  return _body.get().executeAsync(command_, handler_);
5845  }
5846 
5876  std::string executeAsyncNoResubscribe(Command& command_,
5877  MessageHandler handler_)
5878  {
5879  std::string id;
5880  try
5881  {
5882  id = _body.get().executeAsync(command_, handler_, false);
5883  }
5884  catch (const DisconnectedException&)
5885  {
5886  removeMessageHandler(command_.getMessage().getCommandId());
5887  if (command_.isSubscribe())
5888  {
5889  removeMessageHandler(command_.getMessage().getSubscriptionId());
5890  }
5891  if (command_.isSow())
5892  {
5893  removeMessageHandler(command_.getMessage().getQueryID());
5894  }
5895  throw;
5896  }
5897  return id;
5898  }
5899 
5912  MessageStream execute(Command& command_);
5913 
// Acknowledges the message identified by topic_ and bookmark_, with optional
// options_ text (NULL by default); forwards to the implementation object.
5922  void ack(Field& topic_, Field& bookmark_, const char* options_ = NULL)
5923  {
5924  _body.get().ack(topic_,bookmark_,options_);
5925  }
5926 
// Acknowledges message_ using the topic and bookmark read from the message
// itself; options_ is passed through unchanged.
5934  void ack(Message& message_, const char* options_ = NULL)
5935  {
5936  _body.get().ack(message_.getTopic(),message_.getBookmark(),options_);
5937  }
// String overload of ack: wraps the character data of topic_ and bookmark_
// in Field objects (pointer + length, no copy made here) and forwards them
// with options_ to the implementation object.
5946  void ack(const std::string& topic_, const std::string& bookmark_,
5947  const char* options_ = NULL)
5948  {
5949  _body.get().ack(Field(topic_.data(),topic_.length()), Field(bookmark_.data(),bookmark_.length()),options_);
5950  }
5951 
// Calls the implementation's _ack (rather than ack) with the given topic,
// bookmark and options. Used by MessageStreamImpl::next() to acknowledge the
// previously returned message.
5957  void ackDeferredAutoAck(Field& topic_, Field& bookmark_, const char* options_ = NULL)
5958  {
5959  _body.get()._ack(topic_,bookmark_,options_);
5960  }
// Forwards to the implementation object's flushAcks().
5970  void flushAcks(void)
5971  {
5972  _body.get().flushAcks();
5973  }
5974 
// Returns the auto-ack flag reported by the implementation object.
5979  bool getAutoAck(void) const
5980  {
5981  return _body.get().getAutoAck();
5982  }
// Sets the auto-ack flag on the implementation object.
5989  void setAutoAck(bool isAutoAckEnabled_)
5990  {
5991  _body.get().setAutoAck(isAutoAckEnabled_);
5992  }
// Returns the ack batch size reported by the implementation object.
5997  unsigned getAckBatchSize(void) const
5998  {
5999  return _body.get().getAckBatchSize();
6000  }
// Sets the ack batch size on the implementation object.
6007  void setAckBatchSize(const unsigned ackBatchSize_)
6008  {
6009  _body.get().setAckBatchSize(ackBatchSize_);
6010  }
6011 
// Returns the ack timeout reported by the implementation object.
6018  int getAckTimeout(void) const
6019  {
6020  return _body.get().getAckTimeout();
6021  }
// Sets the ack timeout on the implementation object.
6028  void setAckTimeout(const int ackTimeout_)
6029  {
6030  _body.get().setAckTimeout(ackTimeout_);
6031  }
6032 
6033 
// Sets the retry-on-disconnect flag on the implementation object.
6042  void setRetryOnDisconnect(bool isRetryOnDisconnect_)
6043  {
6044  _body.get().setRetryOnDisconnect(isRetryOnDisconnect_);
6045  }
6046 
// Returns the retry-on-disconnect flag reported by the implementation object.
6051  bool getRetryOnDisconnect(void) const
6052  {
6053  return _body.get().getRetryOnDisconnect();
6054  }
6055 
// Sets the default maximum depth on the implementation object.
// NOTE(review): presumably the default depth for new MessageStreams - confirm
// against ClientImpl.
6060  void setDefaultMaxDepth(unsigned maxDepth_)
6061  {
6062  _body.get().setDefaultMaxDepth(maxDepth_);
6063  }
6064 
// Returns the default maximum depth reported by the implementation object.
6069  unsigned getDefaultMaxDepth(void) const
6070  {
6071  return _body.get().getDefaultMaxDepth();
6072  }
6073 
6081  void* userData_)
6082  {
6083  return _body.get().setTransportFilterFunction(filter_, userData_);
6084  }
6085 
6093  void* userData_)
6094  {
6095  return _body.get().setThreadCreatedCallback(callback_, userData_);
6096  }
6097 
// Hands func_ and its userData_ to the implementation object's
// deferredExecution. ClientImplMessageHandler calls
// processDeferredExecutions() before handling each received message.
6103  void deferredExecution(DeferredExecutionFunc func_, void* userData_)
6104  {
6105  _body.get().deferredExecution(func_,userData_);
6106  }
6110 };
6111 
// Invokes the globally registered "LastChance" handler with message, inside
// AMPS_CALL_EXCEPTION_WRAPPER. Callers in this file use it when no other
// handler accepted a message.
6112 inline void
6113 ClientImpl::lastChance(AMPS::Message& message)
6114 {
6115  AMPS_CALL_EXCEPTION_WRAPPER(_globalCommandTypeHandlers[GlobalCommandTypeHandlers::LastChance].invoke(message));
6116 }
6117 
// Processes an ack whose ack type is Persisted and returns how many times the
// ack was "delivered" (counted) by this function.
//
// Three independent steps, in order:
//   1. Failed-write notification: when the reason/status lengths match the
//      checks below and a failed-write handler is installed, the handler is
//      told about the failure (via the publish store replay when a store is
//      valid, otherwise with a nearly empty message).
//   2. Publish store: when a publish store is valid and the ack carries a
//      non-zero sequence, everything up to that sequence is discarded.
//   3. Bookmark store: only if nothing was counted above, a valid bookmark
//      store is told the bookmark is persisted for the ack's subscription id,
//      provided that id still has a route.
// Any std::exception is reported through AMPS_UNHANDLED_EXCEPTION and the
// count accumulated so far is returned.
6118 inline unsigned
6119 ClientImpl::persistedAck(AMPS::Message& message)
6120 {
6121  unsigned deliveries = 0;
6122  try
6123  {
6124  /*
6125  * Best Practice: If you don't care about the dupe acks that
6126  * occur during failover or rapid disconnect/reconnect, then just
6127  * ignore them. We could discard each duplicate from the
6128  * persisted store, but the storage costs of doing 1 record
6129  * discards is heavy. In most scenarios we'll just quickly blow
6130  * through the duplicates and get back to processing the
6131  * non-dupes.
6132  */
6133  const char* data = NULL;
6134  size_t len = 0;
6135  const char* status = NULL;
6136  size_t statusLen = 0;
6137  amps_handle messageHandle = message.getMessage();
 // Only the LENGTHS of the reason/status text are compared (plus the first
 // status character). NOTE(review): 12/9/7 presumably correspond to
 // "not entitled", "duplicate" and "failure" - confirm against the server's
 // reason/status strings.
6138  const size_t NotEntitled = 12, Duplicate = 9, Failure = 7;
6139  amps_message_get_field_value(messageHandle, AMPS_Reason, &data, &len);
6140  amps_message_get_field_value(messageHandle, AMPS_Status,
6141  &status, &statusLen);
6142  if (len == NotEntitled || len == Duplicate ||
6143  (statusLen == Failure && status[0] == 'f'))
6144  {
6145  if (_failedWriteHandler)
6146  {
6147  if (_publishStore.isValid())
6148  {
 // Replay just the failed sequence from the store through a
 // FailedWriteStoreReplayer built with the reason text.
6149  amps_uint64_t sequence =
6150  amps_message_get_field_uint64(messageHandle,
6151  AMPS_Sequence);
6152  FailedWriteStoreReplayer replayer(this, data, len);
6153  _publishStore.replaySingle(replayer, sequence);
6154  }
6155  else // Call the handler with what little we have
6156  {
 // NOTE(review): emptyMessage is a function-local static, so one
 // Message object is shared by every call (and every ClientImpl)
 // that reaches this branch - confirm this is safe when several
 // clients receive failure acks concurrently.
6157  static Message emptyMessage;
6158  emptyMessage.setSequence(message.getSequence());
6159  _failedWriteHandler->failedWrite(emptyMessage, data, len);
6160  }
6161  ++deliveries;
6162  }
6163  }
6164  if (_publishStore.isValid())
6165  {
6166  // Ack for publisher will have sequence while
6167  // ack for bookmark subscribe won't
6168  amps_uint64_t seq = amps_message_get_field_uint64(messageHandle,
6169  AMPS_Sequence);
6170  if (seq > 0)
6171  {
6172  ++deliveries;
6173  _publishStore.discardUpTo(seq);
6174  }
6175  }
6176 
 // Bookmark-subscription path: reached only when neither step above counted
 // a delivery. data/len are reused for the subscription id here.
6177  if (!deliveries && _bookmarkStore.isValid())
6178  {
6179  amps_message_get_field_value(messageHandle, AMPS_SubscriptionId,
6180  &data, &len);
6181  if (len > 0)
6182  {
6183  Message::Field subId(data, len);
6184  const char* bookmarkData = NULL;
6185  size_t bookmarkLen = 0;
6186  amps_message_get_field_value(messageHandle, AMPS_Bookmark,
6187  &bookmarkData, &bookmarkLen);
6188  // Everything is there and not unsubscribed AC-912
6189  if (bookmarkLen > 0 && _routes.hasRoute(subId))
6190  {
6191  ++deliveries;
6192  _bookmarkStore.persisted(subId, Message::Field(bookmarkData, bookmarkLen));
6193  }
6194  }
6195  }
6196  }
6197  catch (std::exception& ex)
6198  {
6199  AMPS_UNHANDLED_EXCEPTION(ex);
6200  }
6201  return deliveries;
6202 }
6203 
6204 inline unsigned
6205 ClientImpl::processedAck(Message &message)
6206 {
6207  unsigned deliveries = 0;
6208  AckResponse ack;
6209  const char* data = NULL;
6210  size_t len = 0;
6211  amps_handle messageHandle = message.getMessage();
6212  amps_message_get_field_value(messageHandle, AMPS_CommandId, &data, &len);
6213  Lock<Mutex> l(_lock);
6214  if (data && len)
6215  {
6216  AckMap::iterator i = _acks.find(std::string(data, len));
6217  if (i != _acks.end())
6218  {
6219  ++deliveries;
6220  ack = i->second;
6221  _acks.erase(i);
6222  }
6223  }
6224  if (deliveries)
6225  {
6226  amps_message_get_field_value(messageHandle, AMPS_Status, &data,
6227  &len);
6228  ack.setStatus(data, len);
6229  amps_message_get_field_value(messageHandle, AMPS_Reason, &data,
6230  &len);
6231  ack.setReason(data, len);
6232  amps_message_get_field_value(messageHandle, AMPS_UserId, &data,
6233  &len);
6234  ack.setUsername(data, len);
6235  amps_message_get_field_value(messageHandle, AMPS_Password, &data,
6236  &len);
6237  ack.setPassword(data, len);
6238  amps_message_get_field_value(messageHandle, AMPS_Sequence, &data,
6239  &len);
6240  ack.setSequenceNo(data, len);
6241  amps_message_get_field_value(messageHandle, AMPS_Version, &data,
6242  &len);
6243  ack.setServerVersion(data, len);
6244  amps_message_get_field_value(messageHandle, AMPS_Options, &data, &len);
6245  ack.setOptions(data,len);
6246  ack.setResponded(true);
6247  _lock.signalAll();
6248  }
6249  return deliveries;
6250 }
6251 
6252 inline void
6253 ClientImpl::checkAndSendHeartbeat(bool force)
6254 {
6255  if (force || _heartbeatTimer.check())
6256  {
6257  _heartbeatTimer.start();
6258  try
6259  {
6260  sendWithoutRetry(_beatMessage);
6261  }
6262  catch (const AMPSException&)
6263  {
6264  ;
6265  }
6266  }
6267 }
6268 
6269 inline ConnectionInfo ClientImpl::getConnectionInfo() const
6270 {
6271  ConnectionInfo info;
6272  std::ostringstream writer;
6273 
6274  info["client.uri"] = _lastUri;
6275  info["client.name"] = _name;
6276  info["client.username"] = _username;
6277  if(_publishStore.isValid())
6278  {
6279  writer << _publishStore.unpersistedCount();
6280  info["publishStore.unpersistedCount"] = writer.str();
6281  writer.clear();
6282  writer.str("");
6283  }
6284 
6285  return info;
6286 }
6287 
// Central receive callback: routes one incoming message (messageHandle_) for
// the ClientImpl passed as userData_. Returns AMPS_E_OK on every path shown.
//
// Dispatch is by command type:
//   - NULL handle: no message; only checks queue acks when an ack timeout is
//     configured.
//   - SOW / GroupBegin / GroupEnd: delivered by query id.
//   - OOF / Publish / DeltaPublish: delivered to each subscription id listed
//     on the message, with bookmark-store and queue auto-ack handling.
//   - Ack: global Ack handler, then persisted/processed ack processing and
//     ack routing; lastChance if nothing accepted it.
//   - Heartbeat: global Heartbeat handler, then an immediate heartbeat reply
//     when heartbeating is configured (returns early).
//   - Anything else with a command id: delivered by command id, retrying
//     while the target stream is full.
// Except for the NULL-handle and heartbeat paths, checkAndSendHeartbeat() is
// called before returning.
6288 inline amps_result
6289 ClientImpl::ClientImplMessageHandler(amps_handle messageHandle_, void* userData_)
6290 {
6291  const unsigned SOWMask = Message::Command::SOW | Message::Command::GroupBegin | Message::Command::GroupEnd;
6292  const unsigned PublishMask = Message::Command::OOF | Message::Command::Publish | Message::Command::DeltaPublish;
6293  ClientImpl* me = (ClientImpl*) userData_;
6294  if(!messageHandle_)
6295  {
6296  if(me->_queueAckTimeout) me->checkQueueAcks();
6297  return AMPS_E_OK;
6298  }
 // Run anything queued through deferredExecution() before touching the message.
6299  me->processDeferredExecutions();
6300 
 // Re-point the client's reusable read message at the received handle.
6301  me->_readMessage.replace(messageHandle_);
6302  Message& message = me->_readMessage;
6303  Message::Command::Type commandType = message.getCommandEnum();
6304  if (commandType & SOWMask)
6305  {
6306 #if 0 // Not currently implemented, to avoid an extra branch in delivery
6307  // A small cheat here to get the right handler, using knowledge of the
6308  // Command values of SOW (8), GroupBegin (8192), and GroupEnd (16384)
6309  // and their GlobalCommandTypeHandlers values 1, 2, 3.
6310  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6311  me->_globalCommandTypeHandlers[1+(commandType/8192)].invoke(message));
6312 #endif
6313  AMPS_CALL_EXCEPTION_WRAPPER_2(me,me->_routes.deliverData(message,
6314  message.getQueryID()));
6315  }
6316  else if (commandType & PublishMask)
6317  {
6318 #if 0 // Not currently implemented, to avoid an extra branch in delivery
6319  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6320  me->_globalCommandTypeHandlers[(commandType==Message::Command::Publish ?
6321  GlobalCommandTypeHandlers::Publish :
6322  GlobalCommandTypeHandlers::OOF)].invoke(message));
6323 #endif
6324  const char* subIds = NULL;
6325  size_t subIdsLen = 0;
6326  // Publish command, send to subscriptions
6327  amps_message_get_field_value(messageHandle_, AMPS_SubscriptionIds, &subIds, &subIdsLen);
 // parseRoutes fills _routeCache with one entry per subscription id found in
 // the SubscriptionIds field and returns how many there are.
6328  size_t subIdCount = me->_routes.parseRoutes(AMPS::Field(subIds, subIdsLen), me->_routeCache);
6329  for(size_t i=0; i<subIdCount; ++i)
6330  {
6331  MessageRouter::RouteCache::value_type& lookupResult = me->_routeCache[i];
6332  MessageHandler& handler = lookupResult.handler;
6333  if (handler.isValid())
6334  {
 // Present the message to this handler with its own single
 // subscription id (a slice of the SubscriptionIds text).
6335  amps_message_set_field_value(messageHandle_, AMPS_SubscriptionId,
6336  subIds + lookupResult.idOffset, lookupResult.idLength);
6337  Message::Field bookmark = message.getBookmark();
 // A non-empty lease period is what marks the message as coming
 // from a message queue here.
6338  bool isMessageQueue = message.getLeasePeriod().len() != 0;
6339  bool isAutoAck = me->_isAutoAckEnabled;
6340 
 // Bookmark-subscription path: non-queue message that carries a
 // bookmark while a bookmark store is installed.
6341  if (!isMessageQueue && !bookmark.empty() &&
6342  me->_bookmarkStore.isValid())
6343  {
6344  if (me->_bookmarkStore.isDiscarded(me->_readMessage))
6345  {
6346  //Call duplicate message handler in handlers map
 // NOTE(review): unlike the other handler invocations in this
 // function, this call is not wrapped in
 // AMPS_CALL_EXCEPTION_WRAPPER_2.
6347  if (me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].isValid())
6348  {
6349  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::DuplicateMessage].invoke(message);
6350  }
6351  }
6352  else
6353  {
 // Log the bookmark in the store before the handler sees it.
6354  me->_bookmarkStore.log(me->_readMessage);
6355  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6356  handler.invoke(message));
6357  }
6358  }
6359  else
6360  {
6361  if(isMessageQueue && isAutoAck)
6362  {
 // Queue message with auto-ack: ack after the handler returns,
 // or ack with "cancel" if an exception reaches the catch below.
 // Both acks are skipped when the handler set the message's
 // ignore-auto-ack flag.
6363  try
6364  {
6365  AMPS_CALL_EXCEPTION_WRAPPER_STREAM_FULL_2(me, handler.invoke(message));
6366  if (!message.getIgnoreAutoAck())
6367  {
6368  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6369  me->_ack(message.getTopic(),message.getBookmark()));
6370  }
6371  }
6372  catch(std::exception& ex)
6373  {
6374  if (!message.getIgnoreAutoAck())
6375  {
6376  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6377  me->_ack(message.getTopic(),message.getBookmark(),"cancel"));
6378  }
6379  AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6380  }
6381  }
6382  else
6383  {
6384  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6385  handler.invoke(message));
6386  }
6387  }
6388  }
 // No valid handler for this subscription id.
6389  else me->lastChance(message);
6390  } // for (subidsEnd)
6391  }
6392  else if (commandType == Message::Command::Ack)
6393  {
6394  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6395  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Ack].invoke(message));
6396  unsigned ackType = message.getAckTypeEnum();
 // deliveries counts every consumer that accepted this ack; zero means
 // it falls through to lastChance.
6397  unsigned deliveries = 0U;
6398  switch (ackType)
6399  {
6400  case Message::AckType::Persisted:
6401  deliveries += me->persistedAck(message);
6402  break;
6403  case Message::AckType::Processed: // processed
6404  deliveries += me->processedAck(message);
6405  break;
6406  }
6407  AMPS_CALL_EXCEPTION_WRAPPER_2(me, deliveries += me->_routes.deliverAck(message, ackType));
6408  if (deliveries == 0)
6409  {
6410  me->lastChance(message);
6411  }
6412  }
6413  else if (commandType == Message::Command::Heartbeat)
6414  {
6415  AMPS_CALL_EXCEPTION_WRAPPER_2(me,
6416  me->_globalCommandTypeHandlers[GlobalCommandTypeHandlers::Heartbeat].invoke(message));
 // A non-zero timer timeout means heartbeating is configured: answer
 // right away. Otherwise the heartbeat goes to lastChance.
6417  if(me->_heartbeatTimer.getTimeout() != 0.0)
6418  {
6419  me->checkAndSendHeartbeat(true);
6420  }
6421  else
6422  {
6423  me->lastChance(message);
6424  }
6425  return AMPS_E_OK;
6426  }
6427  else if (!message.getCommandId().empty())
6428  {
6429  unsigned deliveries = 0U;
6430  try
6431  {
6432  while(me->_connected) // Keep sending heartbeats when stream is full
6433  {
6434  try
6435  {
6436  deliveries = me->_routes.deliverData(message, message.getCommandId());
6437  break;
6438  }
 // The Windows variant omits the unused exception name.
6439 #ifdef _WIN32
6440  catch(MessageStreamFullException&)
6441 #else
6442  catch(MessageStreamFullException& ex_)
6443 #endif
6444  {
 // Target stream is full: send a heartbeat if one is due, then
 // retry the same delivery.
6445  me->checkAndSendHeartbeat(false);
6446  }
6447  }
6448  }
6449  catch (std::exception& ex_)
6450  {
 // Report to the exception listener; anything the listener itself
 // throws is swallowed.
6451  try
6452  {
6453  me->_exceptionListener->exceptionThrown(ex_);
6454  }
6455  catch(...)
6456  {
6457  ;
6458  }
6459  }
6460  if (deliveries == 0)
6461  me->lastChance(message);
6462  }
6463  me->checkAndSendHeartbeat();
6464  return AMPS_E_OK;
6465 }
6466 
6467 inline void
6468 ClientImpl::ClientImplPreDisconnectHandler(amps_handle /*client*/, unsigned failedConnectionVersion, void* userData)
6469 {
6470  ClientImpl* me = (ClientImpl*) userData;
6471  //Client wrapper(me);
6472  // Go ahead and signal any waiters if they are around...
6473  me->clearAcks(failedConnectionVersion);
6474 }
6475 
// Disconnect callback. userData is the ClientImpl; the client handle argument
// is unused.
//
// Holding the client lock, it broadcasts Disconnected (if the client was
// connected) and then loops:
//   1. marks the client disconnected and invokes the user's disconnect
//      handler with the lock released;
//   2. if the client is still not connected afterwards, broadcasts Shutdown,
//      reports a DisconnectedException and returns AMPS_E_DISCONNECTED;
//   3. otherwise asks the subscription manager (if any) to resubscribe, again
//      with the lock released, broadcasts Resubscribed and returns AMPS_E_OK.
// An AMPSException during resubscribe is reported and the loop runs again;
// any other exception returns AMPS_E_RETRY.
6476 inline amps_result
6477 ClientImpl::ClientImplDisconnectHandler(amps_handle /*client*/, void* userData)
6478 {
6479  ClientImpl* me = (ClientImpl*) userData;
6480  Lock<Mutex> l(me->_lock);
 // Client handle around `me` for passing to user callbacks.
 // NOTE(review): the `false` argument presumably means the wrapper does not
 // take ownership of / add a reference to `me` - confirm in Client's ctor.
6481  Client wrapper(me,false);
6482  if (me->_connected)
6483  me->broadcastConnectionStateChanged(ConnectionStateListener::Disconnected);
6484  while(true)
6485  {
 // The two AtomicFlagFlip objects are scoped: subFlip lives for the whole
 // loop iteration, pubFlip only for the disconnect-handler call.
6486  AtomicFlagFlip subFlip(&me->_badTimeToHASubscribe);
6487  try
6488  {
6489  AtomicFlagFlip pubFlip(&me->_badTimeToHAPublish);
6490  me->_connected = false;
6491  {
6492  // Have to release the lock here or receive thread can't
6493  // invoke the message handler.
6494  Unlock<Mutex> unlock(me->_lock);
6495  me->_disconnectHandler.invoke(wrapper);
6496  }
6497  }
6498  catch(const std::exception& ex)
6499  {
6500  AMPS_UNHANDLED_EXCEPTION_2(me,ex);
6501  }
6502 
 // The disconnect handler is expected to have reconnected; if _connected
 // is still false the reconnect is treated as failed.
6503  if (!me->_connected)
6504  {
6505  me->broadcastConnectionStateChanged(ConnectionStateListener::Shutdown);
6506  AMPS_UNHANDLED_EXCEPTION_2(me,DisconnectedException("Reconnect failed."));
6507  return AMPS_E_DISCONNECTED;
6508  }
6509  try
6510  {
6511  // Resubscribe
6512  if (me->_subscriptionManager)
6513  {
6514  {
6515  // Have to release the lock here or receive thread can't
6516  // invoke the message handler.
6517  Unlock<Mutex> unlock(me->_lock);
6518  me->_subscriptionManager->resubscribe(wrapper);
6519  }
6520  me->broadcastConnectionStateChanged(ConnectionStateListener::Resubscribed);
6521  }
6522  return AMPS_E_OK;
6523  }
6524  catch(const AMPSException& subEx)
6525  {
 // No return here: after reporting, control falls out of the try block
 // and the while(true) loop starts another disconnect/reconnect pass.
6526  AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6527  }
6528  catch(const std::exception& subEx)
6529  {
6530  AMPS_UNHANDLED_EXCEPTION_2(me,subEx);
6531  return AMPS_E_RETRY;
6532  }
6533  catch(...)
6534  {
6535  return AMPS_E_RETRY;
6536  }
6537  }
 // Not reachable: the loop above has no break.
6538  return AMPS_E_RETRY;
6539 }
6540 
6541 class FIX
6542 {
6543  const char* _data;
6544  size_t _len;
6545  char _fieldSep;
6546 public:
6547  class iterator
6548  {
6549  const char* _data;
6550  size_t _len;
6551  size_t _pos;
6552  char _fieldSep;
6553  iterator(const char* data_, size_t len_, size_t pos_, char fieldSep_)
6554  : _data(data_), _len(len_),_pos(pos_), _fieldSep(fieldSep_)
6555  {
6556  while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6557  }
6558  public:
6559  typedef void* difference_type;
6560  typedef std::forward_iterator_tag iterator_category;
6561  typedef std::pair<Message::Field, Message::Field> value_type;
6562  typedef value_type* pointer;
6563  typedef value_type& reference;
6564  bool operator==(const iterator& rhs) const
6565  {
6566  return _pos == rhs._pos;
6567  }
6568  bool operator!=(const iterator& rhs) const
6569  {
6570  return _pos != rhs._pos;
6571  }
6572  iterator& operator++()
6573  {
6574  // Skip through the data
6575  while(_pos != _len && _data[_pos] != _fieldSep) ++_pos;
6576  // Skip through any field separators
6577  while(_pos != _len && _data[_pos] == _fieldSep) ++_pos;
6578  return *this;
6579  }
6580 
6581  value_type operator*() const
6582  {
6583  value_type result;
6584  size_t i = _pos, keyLength =0, valueStart = 0, valueLength = 0;
6585  for(; i < _len && _data[i] != '='; ++i) ++keyLength;
6586 
6587  result.first.assign(_data+_pos, keyLength);
6588 
6589  if (i < _len && _data[i] == '=')
6590  {
6591  ++i;
6592  valueStart = i;
6593  for(; i < _len && _data[i] != _fieldSep; ++i)
6594  {
6595  valueLength++;
6596  }
6597  }
6598  result.second.assign(_data+valueStart, valueLength);
6599  return result;
6600  }
6601 
6602  friend class FIX;
6603  };
6604  class reverse_iterator
6605  {
6606  const char* _data;
6607  size_t _len;
6608  const char* _pos;
6609  char _fieldSep;
6610  public:
6611  typedef std::pair<Message::Field, Message::Field> value_type;
6612  reverse_iterator(const char* data, size_t len, const char* pos, char fieldsep)
6613  : _data(data), _len(len), _pos(pos), _fieldSep(fieldsep)
6614  {
6615  if (_pos)
6616  {
6617  // skip past meaningless trailing fieldseps
6618  while(_pos >=_data && *_pos == _fieldSep) --_pos;
6619  while(_pos > _data && *_pos != _fieldSep) --_pos;
6620  // if we stopped before the 0th character, it's because
6621  // it's a field sep. advance one to point to the first character
6622  // of a key.
6623  if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
6624  if (_pos < _data) _pos = 0;
6625  }
6626  }
6627  bool operator==(const reverse_iterator& rhs) const
6628  {
6629  return _pos == rhs._pos;
6630  }
6631  bool operator!=(const reverse_iterator& rhs) const
6632  {
6633  return _pos != rhs._pos;
6634  }
6635  reverse_iterator& operator++()
6636  {
6637  if (_pos == _data)
6638  {
6639  _pos = 0;
6640  }
6641  else
6642  {
6643  // back up 1 to a field separator
6644  --_pos;
6645  // keep backing up through field separators
6646  while(_pos >=_data && *_pos == _fieldSep) --_pos;
6647  // now back up to the beginning of this field
6648  while(_pos >_data && *_pos != _fieldSep) --_pos;
6649  if (_pos > _data || (_pos==_data && *_pos == _fieldSep)) ++_pos;
6650  if (_pos < _data) _pos = 0;
6651  }
6652  return *this;
6653  }
6654  value_type operator*() const
6655  {
6656  value_type result;
6657  size_t keyLength = 0, valueStart = 0, valueLength = 0;
6658  size_t i = (size_t)(_pos - _data);
6659  for(; i < _len && _data[i] != '='; ++i) ++keyLength;
6660  result.first.assign(_pos, keyLength);
6661  if (i<_len && _data[i] == '=')
6662  {
6663  ++i;
6664  valueStart = i;
6665  for(; i<_len && _data[i] != _fieldSep; ++i)
6666  {
6667  valueLength++;
6668  }
6669  }
6670  result.second.assign(_data+valueStart, valueLength);
6671  return result;
6672  }
6673  };
6674  FIX(const Message::Field& data, char fieldSeparator=1)
6675  : _data(data.data()), _len(data.len()),
6676  _fieldSep(fieldSeparator)
6677  {
6678  }
6679 
6680  FIX(const char* data, size_t len, char fieldSeparator=1)
6681  : _data(data), _len(len), _fieldSep(fieldSeparator)
6682  {
6683  }
6684 
6685  iterator begin() const
6686  {
6687  return iterator(_data, _len, 0, _fieldSep);
6688  }
6689  iterator end() const
6690  {
6691  return iterator(_data, _len, _len, _fieldSep);
6692  }
6693 
6694 
6695  reverse_iterator rbegin() const
6696  {
6697  return reverse_iterator(_data, _len, _data+(_len-1), _fieldSep);
6698  }
6699 
6700  reverse_iterator rend() const
6701  {
6702  return reverse_iterator(_data, _len, 0, _fieldSep);
6703  }
6704 };
6705 
6706 
6719 
6720 template <class T>
6722 {
6723  std::stringstream _data;
6724  char _fs;
6725 public:
// Constructs an empty builder; fieldSep_ (default character value 1) is the
// separator written after every appended field.
6731  _FIXBuilder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
6732 
// Appends "tag=<bytes><sep>", where <bytes> are `length` raw bytes taken from
// value starting at `offset`. The bytes are written with write(), so they
// need not be NUL-terminated.
6740  void append(const T& tag, const char* value, size_t offset, size_t length)
6741  {
6742  _data << tag<<'=';
6743  _data.write(value+offset, (std::streamsize)length);
6744  _data << _fs;
6745  }
// Appends "tag=value<sep>" using the whole string value.
6751  void append(const T& tag, const std::string& value)
6752  {
6753  _data << tag << '=' << value << _fs;
6754  }
6755 
// Returns a copy of everything appended so far.
6758  std::string getString() const
6759  {
6760  return _data.str();
6761  }
// Implicit conversion to std::string; same result as getString().
6762  operator std::string() const
6763  {
6764  return _data.str();
6765  }
6766 
6768  void reset()
6769  {
6770  _data.str(std::string());
6771  }
6772 };
6773 
6777 
6779 
6783 
6785 
6786 
6794 
6796 {
6797  char _fs;
6798 public:
// Constructs a shredder that splits fields on fieldSep_ (default character
// value 1).
6803  FIXShredder(char fieldSep_ = (char)1) : _fs(fieldSep_) {;}
6804 
6807  typedef std::map<Message::Field, Message::Field> map_type;
6808 
6814  map_type toMap(const Message::Field& data)
6815  {
6816  FIX fix(data, _fs);
6817  map_type retval;
6818  for(FIX::iterator a = fix.begin(); a!= fix.end(); ++a)
6819  {
6820  retval.insert(*a);
6821  }
6822 
6823  return retval;
6824  }
6825 };
6826 
// Body object behind MessageStream: a mutex-protected queue of Messages.
// The static _messageHandler() is the producer (it deep-copies each incoming
// message into the queue) and next() is the consumer. The object registers
// itself as a connection state listener on its Client so that a disconnect
// moves it to the Disconnected state and wakes any waiter.
6827 class MessageStreamImpl : public AMPS::RefBody, AMPS::ConnectionStateListener
6828 {
6829  Mutex _lock;
6830  std::deque<Message> _q;
 // Ids used by close() to unsubscribe / remove handlers.
6831  std::string _commandId;
6832  std::string _subId;
6833  std::string _queryId;
6834  Client _client;
 // Passed to _lock.wait() in next(); 0 means next() keeps waiting
 // (in 1000-unit slices) while the stream is running.
6835  unsigned _timeout;
 // Queue size at which _messageHandler refuses more messages; ~0 = no limit set.
6836  unsigned _maxDepth;
 // Bitmask of ack types still expected in AcksOnly mode.
6837  unsigned _requestedAcks;
 // Topic/bookmark of the last queue message returned by next(), acked on
 // the following call to next().
6838  Message::Field _previousTopic;
6839  Message::Field _previousBookmark;
 // Every "running" state has the 0x10 bit (Running) set, which is what
 // next() tests with `_state & Running`.
6840  volatile enum { Unset=0x0, Running=0x10, Subscribe=0x11, SOWOnly=0x12, AcksOnly=0x13, Conflate=0x14, Closed=0x1, Disconnected=0x2 } _state;
 // Conflate mode only: SOW key -> pointer to the queued Message for that key.
6841  typedef std::map<std::string, Message*> SOWKeyMap;
6842  SOWKeyMap _sowKeyMap;
6843  public:
 // Creates an unset stream bound to client_ and, when the client is valid,
 // registers for its connection state changes.
6844  MessageStreamImpl(const Client& client_)
6845  : _client(client_),
6846  _timeout(0),
6847  _maxDepth((unsigned)~0),
6848  _requestedAcks(0),
6849  _state(Unset)
6850  {
6851  if (_client.isValid())
6852  {
6853  _client.addConnectionStateListener(this);
6854  }
6855  }
6856 
 // Same as above, starting from a raw ClientImpl pointer.
6857  MessageStreamImpl(ClientImpl* client_)
6858  : _client(client_),
6859  _timeout(0),
6860  _maxDepth((unsigned)~0),
6861  _requestedAcks(0),
6862  _state(Unset)
6863  {
6864  if (_client.isValid())
6865  {
6866  _client.addConnectionStateListener(this);
6867  }
6868  }
6869 
6870  ~MessageStreamImpl()
6871  {
6872  }
6873 
 // Closes the stream and disposes of this object. Exceptions from close()
 // are passed to the client's exception listener (and anything that throws
 // is swallowed). With a valid client the actual `delete` is handed to the
 // client via deferredExecution(); otherwise the object deletes itself here.
6874  virtual void destroy()
6875  {
6876  try
6877  {
6878  close();
6879  }
6880  catch(std::exception &e)
6881  {
6882  try
6883  {
6884  if (_client.isValid())
6885  {
6886  _client.getExceptionListener().exceptionThrown(e);
6887  }
6888  } catch (...) {}
6889  }
6890  if (_client.isValid())
6891  {
6892  _client.removeConnectionStateListener(this);
 // Keep a local handle and reset the member before scheduling deletion.
6893  Client c = _client;
6894  _client = Client((ClientImpl*)NULL);
6895  c.deferredExecution(MessageStreamImpl::destroyer, this);
6896  }
6897  else
6898  {
6899  delete this;
6900  }
6901  }
6902 
 // Deferred-execution callback used by destroy(): deletes the stream body.
6903  static void destroyer(void* vpMessageStreamImpl_)
6904  {
6905  delete ((MessageStreamImpl*)vpMessageStreamImpl_);
6906  }
6907 
 // Puts the stream in Subscribe state and records its ids. commandId_ and
 // queryId_ are stored only when non-empty and different from the ids
 // already covered. If the stream is already Disconnected the state is left
 // unchanged.
6908  void setSubscription(const std::string& subId_,
6909  const std::string& commandId_ = "",
6910  const std::string& queryId_ = "")
6911  {
6912  Lock<Mutex> lock(_lock);
6913  _subId = subId_;
6914  if (!commandId_.empty() && commandId_ != subId_)
6915  _commandId = commandId_;
6916  if (!queryId_.empty() && queryId_ != subId_ && queryId_ != commandId_)
6917  _queryId = queryId_;
6918  // It's possible to disconnect between creation/registration and here.
6919  if (Disconnected == _state) return;
6920  assert(Unset==_state);
6921  _state = Subscribe;
6922  }
6923 
 // Puts the stream in SOWOnly state; next() closes the stream after it
 // returns a "group_end" message.
6924  void setSOWOnly(const std::string& commandId_,
6925  const std::string& queryId_ = "")
6926  {
6927  Lock<Mutex> lock(_lock);
6928  _commandId = commandId_;
6929  if (!queryId_.empty() && queryId_ != commandId_)
6930  _queryId = queryId_;
6931  // It's possible to disconnect between creation/registration and here.
6932  if (Disconnected == _state) return;
6933  assert(Unset==_state);
6934  _state = SOWOnly;
6935  }
6936 
 // Puts the stream in AcksOnly state expecting only a Stats ack.
6937  void setStatsOnly(const std::string& commandId_,
6938  const std::string& queryId_ = "")
6939  {
6940  Lock<Mutex> lock(_lock);
6941  _commandId = commandId_;
6942  if (!queryId_.empty() && queryId_ != commandId_)
6943  _queryId = queryId_;
6944  // It's possible to disconnect between creation/registration and here.
6945  if (Disconnected == _state) return;
6946  assert(Unset==_state);
6947  _state = AcksOnly;
6948  _requestedAcks = Message::AckType::Stats;
6949  }
6950 
 // Puts the stream in AcksOnly state expecting the ack types in acks_.
6951  void setAcksOnly(const std::string& commandId_, unsigned acks_)
6952  {
6953  Lock<Mutex> lock(_lock);
6954  _commandId = commandId_;
6955  // It's possible to disconnect between creation/registration and here.
6956  if (Disconnected == _state) return;
6957  assert(Unset==_state);
6958  _state = AcksOnly;
6959  _requestedAcks = acks_;
6960  }
6961 
 // Connection state listener callback: a Disconnected notification moves
 // the stream to Disconnected; every notification wakes waiters on _lock.
6962  void connectionStateChanged(ConnectionStateListener::State state_)
6963  {
6964  Lock<Mutex> lock(_lock);
6965  if(state_ == AMPS::ConnectionStateListener::Disconnected)
6966  {
6967  _state = Disconnected;
6968  }
6969  _lock.signalAll();
6970  }
6971 
6972  void timeout(unsigned timeout_)
6973  {
6974  _timeout = timeout_;
6975  }
 // Switches a Subscribe stream to Conflate mode; no effect in other states.
6976  void conflate(void)
6977  {
6978  if(_state == Subscribe) _state = Conflate;
6979  }
 // Sets the queue limit; 0 restores "no limit" (~0).
6980  void maxDepth(unsigned maxDepth_)
6981  {
6982  if(maxDepth_) _maxDepth = maxDepth_;
6983  else _maxDepth = (unsigned)~0;
6984  }
6985  unsigned getMaxDepth(void) const
6986  {
6987  return _maxDepth;
6988  }
 // Current queue size; read without taking _lock.
6989  unsigned getDepth(void) const
6990  {
6991  return (unsigned)(_q.size());
6992  }
6993 
 // Fetches the next message into current_.
 // Returns true when the caller should keep iterating: either current_
 // holds a message, or a configured timeout expired (current_ is then
 // invalidated). Returns false when the stream is closed, when no timeout
 // is configured and nothing is available, or when acking the previously
 // returned queue message failed. Throws DisconnectedException when the
 // queue is empty and the stream is Disconnected.
6994  bool next(Message& current_)
6995  {
6996  Lock<Mutex> lock(_lock);
 // Deferred auto-ack: acknowledge the queue message handed out by the
 // previous call before producing another one.
6997  if (!_previousTopic.empty() && !_previousBookmark.empty())
6998  {
6999  try
7000  {
7001  if (_client.isValid())
7002  {
7003  _client.ackDeferredAutoAck(_previousTopic, _previousBookmark);
7004  }
7005  }
7006 #ifdef _WIN32
7007  catch (AMPSException&)
7008 #else
7009  catch (AMPSException& e)
7010 #endif
7011  {
7012  current_.invalidate();
7013  _previousTopic.clear();
7014  _previousBookmark.clear();
7015  return false;
7016  }
7017  _previousTopic.clear();
7018  _previousBookmark.clear();
7019  }
7020  double minWaitTime = (double)(_timeout ? _timeout : 1000);
7021  Timer timer(minWaitTime);
 // Wait while nothing is queued and the state still has the Running bit.
7022  while(_q.empty() && _state & Running)
7023  {
7024  // Using timeout so python can interrupt
7025  _lock.wait((long)minWaitTime);
7026  amps_invoke_waiting_function();
7027  if (_timeout)
7028  {
7029  // In case we woke up early, see how much longer to wait
7030  if(timer.checkAndGetRemaining(&minWaitTime))
7031  {
7032  break;
7033  }
7034  }
7035  }
7036  if(!_q.empty())
7037  {
7038  current_ = _q.front();
 // The queue was full: wake a producer that may be waiting for room.
7039  if(_q.size() == _maxDepth) _lock.signalAll();
7040  _q.pop_front();
7041  if(_state == Conflate)
7042  {
 // The message has left the queue, so its conflation entry must go too.
7043  std::string sowKey = current_.getSowKey();
7044  if(sowKey.length()) _sowKeyMap.erase(sowKey);
7045  }
7046  else if(_state == AcksOnly)
7047  {
 // Tick off the ack type(s) just received.
7048  _requestedAcks &= ~(current_.getAckTypeEnum());
7049  }
 // The stream is finished once all requested acks arrived, or a
 // SOW-only query delivered its group_end.
7050  if((_state == AcksOnly && _requestedAcks == 0) ||
7051  (_state == SOWOnly && current_.getCommand()=="group_end"))
7052  {
7053  _state = Closed;
7054  }
7055  else if (current_.getCommandEnum() == Message::Command::Publish &&
7056  _client.isValid() && _client.getAutoAck() &&
7057  !current_.getLeasePeriod().empty() &&
7058  !current_.getBookmark().empty())
7059  {
 // Queue message with auto-ack on: remember it so the next call
 // acknowledges it.
7060  _previousTopic = current_.getTopic().deepCopy();
7061  _previousBookmark = current_.getBookmark().deepCopy();
7062  }
7063  return true;
7064  }
7065  if(_state == Disconnected)
7066  {
7067  throw DisconnectedException("Connection closed.");
7068  }
7069  current_.invalidate();
7070  if(_state == Closed)
7071  {
7072  return false;
7073  }
 // Timed out (or never waited): keep iterating only if a timeout is set.
7074  return _timeout != 0;
7075  }
 // Releases the stream's registrations on the client. SOWOnly/Subscribe
 // streams unsubscribe each recorded id; all other states only remove the
 // message handlers. Afterwards SOWOnly, Subscribe and Unset become Closed.
 // NOTE(review): a stream switched to Conflate takes the "else" branch (no
 // unsubscribe) and is not moved to Closed - confirm this is intended.
7076  void close(void)
7077  {
7078  if (_client.isValid())
7079  {
7080  if (_state == SOWOnly || _state == Subscribe) //not delete
7081  {
7082  if (!_commandId.empty()) _client.unsubscribe(_commandId);
7083  if (!_subId.empty()) _client.unsubscribe(_subId);
7084  if (!_queryId.empty()) _client.unsubscribe(_queryId);
7085  }
7086  else
7087  {
7088  if (!_commandId.empty()) _client.removeMessageHandler(_commandId);
7089  if (!_subId.empty()) _client.removeMessageHandler(_subId);
7090  if (!_queryId.empty()) _client.removeMessageHandler(_queryId);
7091  }
7092  }
7093  if(_state==SOWOnly || _state==Subscribe || _state==Unset)
7094  {
7095  _state = Closed;
7096  }
7097  }
 // Message handler installed for the stream (see
 // MessageStream::operator MessageHandler). Deep-copies message_ into the
 // queue of this_ and wakes waiters.
 //  - Normal mode: throws MessageStreamFullException when the queue is at
 //    max depth; marks auto-acked queue messages as "ignore auto ack" so the
 //    ack is deferred to next().
 //  - Conflate mode: a message whose SOW key is already queued replaces the
 //    queued copy in place; a new key is appended (throwing when full); a
 //    message without a SOW key waits for room instead of throwing.
7098  static void _messageHandler(const Message& message_, MessageStreamImpl* this_)
7099  {
7100  Lock<Mutex> lock(this_->_lock);
7101  if(this_->_state != Conflate)
7102  {
7103  AMPS_TESTING_SLOW_MESSAGE_STREAM
7104  if(this_->_q.size() >= this_->_maxDepth)
7105  {
7106  // We throw here so that heartbeats can be sent. The exception
7107  // will be handled internally only, and the same Message will
7108  // come back to try again. Make sure to signal.
7109  this_->_lock.signalAll();
7110  throw MessageStreamFullException("Stream is currently full.");
7111  }
7112  this_->_q.push_back(message_.deepCopy());
7113  if (message_.getCommandEnum() == Message::Command::Publish &&
7114  this_->_client.isValid() && this_->_client.getAutoAck() &&
7115  !message_.getLeasePeriod().empty() &&
7116  !message_.getBookmark().empty())
7117  {
7118  message_.setIgnoreAutoAck();
7119  }
7120  }
7121  else
7122  {
7123  std::string sowKey = message_.getSowKey();
7124  if(sowKey.length())
7125  {
7126  SOWKeyMap::iterator it = this_->_sowKeyMap.find(sowKey);
7127  if(it != this_->_sowKeyMap.end())
7128  {
 // Same key already queued: overwrite it, keeping its queue position.
7129  *(it->second) = message_.deepCopy();
7130  }
7131  else
7132  {
7133  if(this_->_q.size() >= this_->_maxDepth)
7134  {
7135  // We throw here so that heartbeats can be sent. The
7136  // exception will be handled internally only, and the
7137  // same Message will come back to try again. Make sure
7138  // to signal.
7139  this_->_lock.signalAll();
7140  throw MessageStreamFullException("Stream is currently full.");
7141  }
7142  this_->_q.push_back(message_.deepCopy());
 // Remember where the queued copy lives for later conflation.
7143  this_->_sowKeyMap[sowKey] = &(this_->_q.back());
7144  }
7145  }
7146  else
7147  {
 // No SOW key: block in short waits until the consumer makes room.
7148  while(this_->_q.size() >= this_->_maxDepth)
7149  {
7150  this_->_lock.wait(1);
7151  }
7152  this_->_q.push_back(message_.deepCopy());
7153  }
7154  }
7155  this_->_lock.signalAll();
7156  }
7157 };
7158 inline MessageStream::MessageStream(void)
7159 {
7160 }
7161 inline MessageStream::MessageStream(const Client& client_)
7162  :_body(new MessageStreamImpl(client_))
7163 {
7164 }
7165 inline void MessageStream::iterator::advance(void)
7166 {
7167  _pStream = _pStream->_body->next(_current) ? _pStream:NULL;
7168 }
7169 inline MessageStream::operator MessageHandler(void)
7170 {
7171  return MessageHandler((void(*)(const Message&,void*))MessageStreamImpl::_messageHandler, &_body.get());
7172 }
7173 inline MessageStream MessageStream::fromExistingHandler(const MessageHandler& handler_)
7174 {
7175  MessageStream result;
7176  if(handler_._func == (MessageHandler::FunctionType)MessageStreamImpl::_messageHandler)
7177  {
7178  result._body = (MessageStreamImpl*)(handler_._userData);
7179  }
7180  return result;
7181 }
7182 
7183 inline void MessageStream::setSOWOnly(const std::string& commandId_,
7184  const std::string& queryId_)
7185 {
7186  _body->setSOWOnly(commandId_, queryId_);
7187 }
7188 inline void MessageStream::setSubscription(const std::string& subId_,
7189  const std::string& commandId_,
7190  const std::string& queryId_)
7191 {
7192  _body->setSubscription(subId_, commandId_, queryId_);
7193 }
7194 inline void MessageStream::setStatsOnly(const std::string& commandId_,
7195  const std::string& queryId_)
7196 {
7197  _body->setStatsOnly(commandId_, queryId_);
7198 }
7199 inline void MessageStream::setAcksOnly(const std::string& commandId_,
7200  unsigned acks_)
7201 {
7202  _body->setAcksOnly(commandId_, acks_);
7203 }
7204 inline MessageStream MessageStream::timeout(unsigned timeout_)
7205 {
7206  _body->timeout(timeout_);
7207  return *this;
7208 }
7210 {
7211  _body->conflate();
7212  return *this;
7213 }
7214 inline MessageStream MessageStream::maxDepth(unsigned maxDepth_)
7215 {
7216  _body->maxDepth(maxDepth_);
7217  return *this;
7218 }
7219 inline unsigned MessageStream::getMaxDepth(void) const
7220 {
7221  return _body->getMaxDepth();
7222 }
7223 inline unsigned MessageStream::getDepth(void) const
7224 {
7225  return _body->getDepth();
7226 }
7227 
7228 inline MessageStream ClientImpl::getEmptyMessageStream(void)
7229 {
7230  return *(_pEmptyMessageStream.get());
7231 }
7232 
7234 {
7235  // If the command is sow and has a sub_id, OR
7236  // if the command has a replace option, return the existing
7237  // messagestream, don't create a new one.
7238  Message& message = command_.getMessage();
7239  Field subId = message.getSubscriptionId();
7240  unsigned ackTypes = message.getAckTypeEnum();
7241  bool useExistingHandler = !subId.empty() && ((!message.getOptions().empty() && message.getOptions().contains("replace",7)) || message.getCommandEnum() == Message::Command::SOW);
7242  if(useExistingHandler)
7243  {
7244  // Try to find the existing message handler.
7245  if(!subId.empty())
7246  {
7247  MessageHandler existingHandler;
7248  if (_body.get()._routes.getRoute(subId, existingHandler))
7249  {
7250  // we found an existing handler. It might not be a message stream, but that's okay.
7251  _body.get().executeAsync(command_, existingHandler, false);
7252  return MessageStream::fromExistingHandler(existingHandler);
7253  }
7254  }
7255  // fall through; we'll a new handler altogether.
7256  }
7257  // Make sure something will be returned to the stream or use the empty one
7258  Message::Command::Type command = command_.getMessage().getCommandEnum();
7259  if ((command & Message::Command::NoDataCommands)
7260  && (ackTypes == Message::AckType::Persisted
7261  || ackTypes == Message::AckType::None))
7262  {
7263  executeAsync(command_, MessageHandler());
7264  if (!_body.get()._pEmptyMessageStream)
7265  {
7266  _body.get()._pEmptyMessageStream.reset(new MessageStream((ClientImpl*)0));
7267  _body.get()._pEmptyMessageStream.get()->_body->close();
7268  }
7269  return _body.get().getEmptyMessageStream();
7270  }
7271  MessageStream stream(*this);
7272  if (_body.get().getDefaultMaxDepth())
7273  stream.maxDepth(_body.get().getDefaultMaxDepth());
7274  MessageHandler handler = stream.operator MessageHandler();
7275  std::string commandID = _body.get().executeAsync(command_, handler, false);
7276  if (command_.hasStatsAck())
7277  {
7278  stream.setStatsOnly(commandID, command_.getMessage().getQueryId());
7279  }
7280  else if (command_.isSow())
7281  {
7282  stream.setSOWOnly(commandID, command_.getMessage().getQueryId());
7283  }
7284  else if (command_.isSubscribe())
7285  {
7286  stream.setSubscription(commandID,
7287  command_.getMessage().getCommandId(),
7288  command_.getMessage().getQueryId());
7289  }
7290  else
7291  {
7292  // Persisted acks for writes don't come back with command id
7293  if (command == Message::Command::Publish ||
7294  command == Message::Command::DeltaPublish ||
7295  command == Message::Command::SOWDelete)
7296  {
7297  stream.setAcksOnly(commandID,
7298  ackTypes & (unsigned)~Message::AckType::Persisted);
7299  }
7300  else
7301  {
7302  stream.setAcksOnly(commandID, ackTypes);
7303  }
7304  }
7305  return stream;
7306 }
7307 
7308 // This is here because it uses api from Client.
7309 inline void Message::ack(const char* options_) const
7310 {
7311  ClientImpl* pClient = _body.get().clientImpl();
7312  Message::Field bookmark = getBookmark();
7313  if(pClient && bookmark.len() &&
7314  !pClient->getAutoAck())
7315  //(!pClient->getAutoAck() || getIgnoreAutoAck()))
7316  pClient->ack(getTopic(),bookmark,options_);
7317 }
7318 }// end namespace AMPS
7319 #endif
Command & setCorrelationId(const std::string &v_)
Set the correlation ID for this command.
Definition: ampsplusplus.hpp:504
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:181
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1229
Core type and function declarations for the AMPS C client.
Client(const std::string &clientName="")
Constructs a new client with a given client name.
Definition: ampsplusplus.hpp:3937
Field getUserId() const
Retrieves the value of the UserId header of the Message as a new Field.
Definition: Message.hpp:1012
static const unsigned int IdentifierLength
The length of identifiers used for unique identification of commands and subscriptions.
Definition: Message.hpp:457
std::string getAckType() const
Definition: ampsplusplus.hpp:583
bool removeMessageHandler(const Field &commandId_)
Removes a MessageHandler for a given CommandId from self.
Definition: ampsplusplus.hpp:4133
Message & assignTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1032
Provides a convenient way of building messages in FIX format, typically referenced using the typedefs...
Definition: ampsplusplus.hpp:6721
void startTimer()
Definition: ampsplusplus.hpp:5494
MessageStream sowAndSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5051
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:706
Command & addAckType(const std::string &v_)
Definition: ampsplusplus.hpp:555
MessageStream conflate(void)
Sets self to conflation mode, where a new update for a matching sow key will replace the previous one...
Definition: ampsplusplus.hpp:7209
std::string subscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4618
Field getSequence() const
Retrieves the value of the Sequence header of the Message as a new Field.
Definition: Message.hpp:1119
Message & setCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1118
Message & setQueryID(const std::string &v)
Sets the value of the QueryID header for this Message.
Definition: Message.hpp:1112
Command(const std::string &command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:433
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:939
void setGlobalCommandTypeMessageHandler(const Message::Command::Type command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type: currently supported types are heartbeat message...
Definition: ampsplusplus.hpp:5785
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: BookmarkStore.hpp:404
void setDefaultMaxDepth(unsigned maxDepth_)
Sets a default max depth on all subsequently created MessageStream objects.
Definition: ampsplusplus.hpp:6060
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1105
static size_t convertVersionToNumber(const std::string &version_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4038
void unsubscribe(const std::string &commandId)
Unsubscribe from a topic.
Definition: ampsplusplus.hpp:4848
Command & setQueryId(const std::string &v_)
Definition: ampsplusplus.hpp:493
Message & setOrderBy(const std::string &v)
Sets the value of the OrderBy header for this Message.
Definition: Message.hpp:1116
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:678
void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Called internally to indicate messages up to and including bookmark are replicated to all replication...
Definition: BookmarkStore.hpp:384
unsigned getDefaultMaxDepth(void) const
Returns the default max depth for returned MessageStream objects.
Definition: ampsplusplus.hpp:6069
void ack(Message &message_, const char *options_=NULL)
Acknowledge a message queue message by supplying the message directly: this adds the ack to the curre...
Definition: ampsplusplus.hpp:5934
MessageStream bookmarkSubscribe(const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:4803
Command & setFilter(const std::string &v_)
Definition: ampsplusplus.hpp:487
void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: BookmarkStore.hpp:278
void setPublishStore(const Store &publishStore_)
Set the publish store to be used by the client.
Definition: ampsplusplus.hpp:4257
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4511
static size_t convertVersionToNumber(const char *data_, size_t len_)
Converts a string version, such as "3.8.1.5" into the same numeric form used internally and returned ...
Definition: ampsplusplus.hpp:4053
DisconnectHandler getDisconnectHandler(void)
Returns the callback function that is invoked when a disconnect occurs.
Definition: ampsplusplus.hpp:4183
void AMPSDLL amps_client_disconnect(amps_handle handle)
Disconnects from the AMPS server, if connected.
Message & setFilter(const std::string &v)
Sets the value of the Filter header for this Message.
Definition: Message.hpp:1033
amps_result amps_client_set_read_timeout(amps_handle client, int readTimeout)
Sets a read timeout (seconds), in which if no message is received, the connection is presumed dead...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:668
void setDuplicateMessageHandler(const MessageHandler &duplicateMessageHandler_)
Sets a callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4273
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:483
void removeConnectionStateListener(ConnectionStateListener *listener)
Attempts to remove listener from self's set of ConnectionStateListeners.
Definition: ampsplusplus.hpp:5812
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:657
int getAckTimeout(void) const
Returns the current value of the message queue ack timeout setting – that is, the amount of time aft...
Definition: ampsplusplus.hpp:6018
VersionInfo getServerVersionInfo() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4024
Message sowDeleteByKeys(const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5558
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:446
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1095
amps_uint64_t getLastPersisted()
Get the last persisted message sequence in the store.
Definition: ampsplusplus.hpp:895
std::string getString() const
Returns the current contents of this builder as a string.
Definition: ampsplusplus.hpp:6758
void setTransportFilterFunction(amps_transport_filter_function filter_, void *userData_)
Sets a filter function on the transport that is called with all raw data sent or received.
Definition: ampsplusplus.hpp:6080
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4409
iterator begin(void)
Returns an iterator representing the beginning of the topic or subscription.
Definition: ampsplusplus.hpp:3844
Field getLeasePeriod() const
Retrieves the value of the LeasePeriod header of the Message as a new Field.
Definition: Message.hpp:1110
std::string sowAndDeltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5237
void send(const Message &message)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4109
Message & setSowKey(const std::string &v)
Sets the value of the SowKey header for this Message.
Definition: Message.hpp:1128
static size_t getUnsetPosition()
Method to return the value used to represent not found or unset.
Definition: ampsplusplus.hpp:758
void setRetryOnDisconnect(bool isRetryOnDisconnect_)
Enables or disables automatic retry of a command to AMPS after a reconnect.
Definition: ampsplusplus.hpp:6042
unsigned getMaxDepth(void) const
Gets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7219
Message & assignUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1012
void connect(const std::string &uri)
Connect to an AMPS server.
Definition: ampsplusplus.hpp:4084
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
Abstract base class to manage all subscriptions placed on a client so that they can be re-established...
Definition: ampsplusplus.hpp:989
Success.
Definition: amps.h:189
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:999
std::string deltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:4701
amps_handle AMPSDLL amps_client_create(const amps_char *clientName)
Functions for creation of an AMPS client.
std::string authenticate(const std::string &, const std::string &password_)
A simple implementation that returns an unmodified password.
Definition: ampsplusplus.hpp:652
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4490
FIXShredder(char fieldSep_=(char) 1)
Construct an instance of FIXShredder using the specified value as the delimiter between fields...
Definition: ampsplusplus.hpp:6803
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:179
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:498
void addMessageHandler(const Field &commandId_, const AMPS::MessageHandler &messageHandler_, unsigned requestedAcks_, bool isSubscribe_)
Adds a MessageHandler to be invoked for Messages with the given CommandId as their command id...
Definition: ampsplusplus.hpp:4122
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
bool isValid() const
Returns true if self is a valid stream that may be iterated.
Definition: ampsplusplus.hpp:3840
amps_result
Return values from amps_xxx functions.
Definition: amps.h:184
FailedWriteHandler * getFailedWriteHandler()
Get the handler that is invoked to report on failed writes.
Definition: ampsplusplus.hpp:4310
Field getAckType() const
Retrieves the value of the AckType header of the Message as a new Field.
Definition: Message.hpp:1045
Field getCommand() const
Retrieves the value of the Command header of the Message as a new Field.
Definition: Message.hpp:1009
MessageStream execute(Command &command_)
Execute the provided command and return messages received in response in a MessageStream.
Definition: ampsplusplus.hpp:7233
void(* amps_transport_filter_function)(const unsigned char *, size_t, short, void *)
Prototype for a user-supplied callback function for filtering data before it is sent and after it is ...
Definition: amps.h:601
amps_result AMPSDLL amps_client_connect(amps_handle handle, const amps_char *uri)
Connects to the AMPS server specified in uri.
Message & setTopic(const std::string &v)
Sets the value of the Topic header for this Message.
Definition: Message.hpp:1032
Store getPublishStore()
Get the publish store used by the client.
Definition: ampsplusplus.hpp:4265
void setThreadCreatedCallback(amps_thread_created_callback callback_, void *userData_)
Sets a callback function on the transport that is called when a new thread is created to receive data...
Definition: ampsplusplus.hpp:6092
std::string sowAndSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5019
unsigned getAckTypeEnum() const
Definition: ampsplusplus.hpp:588
size_t getServerVersion() const
Returns the server version retrieved during logon.
Definition: ampsplusplus.hpp:4013
Command & setExpiration(unsigned v_)
Set the expiration time for a publish command.
Definition: ampsplusplus.hpp:553
State
Constants for the state of the connection.
Definition: ampsplusplus.hpp:1023
Command & setData(const char *v_, size_t length_)
Sets the data for this command.
Definition: ampsplusplus.hpp:525
std::string logon(int timeout_=0, Authenticator &authenticator_=DefaultAuthenticator::instance(), const char *options_=NULL)
Logon to the server, providing the client name, credentials (if available), and client information (s...
Definition: ampsplusplus.hpp:4557
void discardUpTo(amps_uint64_t index_)
Called by Client to indicate that all messages up to and including index_ may be discarded.
Definition: ampsplusplus.hpp:828
std::string sowAndDeltaSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5334
Command & setAckType(unsigned v_)
Definition: ampsplusplus.hpp:573
std::string send(MessageHandler messageHandler, Message &message, int timeout=0)
Sends a Message to the connected AMPS server, performing only minimal validation and bypassing client...
Definition: ampsplusplus.hpp:4161
MessageStream sowAndSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5193
Client represents a connection to an AMPS server, but does not provide failover or reconnection behav...
Definition: ampsplusplus.hpp:3920
void setGlobalCommandTypeMessageHandler(const std::string &command_, const MessageHandler &handler_)
Sets a handler for all messages of a particular type, or for messages that would be delivered to a pa...
Definition: ampsplusplus.hpp:5760
Message & newCommandId()
Creates and sets a new sequential value for the CommandId header for this Message.
Definition: Message.hpp:1010
void addConnectionStateListener(ConnectionStateListener *listener)
Adds a ConnectionStateListener to self's set of listeners.
Definition: ampsplusplus.hpp:5804
Command & reset(const std::string &command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:448
Message & setSequence(const std::string &v)
Sets the value of the Sequence header for this Message.
Definition: Message.hpp:1119
amps_uint64_t publish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4358
Message & setUserId(const std::string &v)
Sets the value of the UserId header for this Message.
Definition: Message.hpp:1012
void publishFlush(long timeout_=0)
Ensure that AMPS messages are sent and have been processed by the AMPS server.
Definition: ampsplusplus.hpp:4450
Field getTopic() const
Retrieves the value of the Topic header of the Message as a new Field.
Definition: Message.hpp:1032
SubscriptionManager * getSubscriptionManager()
Get the subscription manager being used by the client.
Definition: ampsplusplus.hpp:4221
Command(Message::Command::Type command_)
Creates an object to represent the given AMPS command, such as "sow" or "subscribe".
Definition: ampsplusplus.hpp:440
Message & setMessageType(const std::string &v)
Sets the value of the MessageType header for this Message.
Definition: Message.hpp:1034
Field getCommandId() const
Retrieves the value of the CommandId header of the Message as a new Field.
Definition: Message.hpp:1010
std::string sowDelete(MessageHandler messageHandler, const std::string &topic, const std::string &filter, long timeout)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5446
const ExceptionListener & getExceptionListener(void) const
Returns the exception listener set on this Client.
Definition: ampsplusplus.hpp:5671
static Authenticator & instance()
Static function to return a static instance used when no Authenticator is supplied to a Client...
Definition: ampsplusplus.hpp:669
Abstract base class for connection state listeners.
Definition: ampsplusplus.hpp:1019
Message & setSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1105
Command & setTopic(const std::string &v_)
Definition: ampsplusplus.hpp:485
amps_result amps_client_attempt_reconnect(amps_handle client, unsigned version)
Manually invokes the user-supplied disconnect handler for this client.
amps_result amps_client_set_thread_created_callback(amps_handle client, amps_thread_created_callback callback, void *userData)
Sets a user-supplied callback function to allow thread attributes to be set when a new thread is created...
Command & setBatchSize(unsigned v_)
Sets the batch size for this command, which controls how many records are sent together in the result...
Definition: ampsplusplus.hpp:542
std::string sowAndSubscribe(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5133
#define AMPS_BOOKMARK_RECENT
Start the subscription at the first undiscarded message in the bookmark store, or at the end of the b...
Definition: BookmarkStore.hpp:47
Message & assignExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1107
void setDisconnectHandler(DisconnectHandler disconnectHandler)
Sets the function to be called when the client is unintentionally disconnected.
Definition: ampsplusplus.hpp:4175
Command & setOrderBy(const std::string &v_)
Definition: ampsplusplus.hpp:489
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: ampsplusplus.hpp:849
void completed(const std::string &, const std::string &, const std::string &)
Called by Client once a logon completes successfully.
Definition: ampsplusplus.hpp:664
MessageStream deltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:4719
Interface for BookmarkStoreImpl classes.
Definition: BookmarkStore.hpp:225
std::string logon(const char *options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available) client information (such a...
Definition: ampsplusplus.hpp:4575
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void completed(const std::string &userName_, const std::string &password_, const std::string &reason_)=0
Called by Client once a logon completes successfully.
Command & setTimeout(unsigned v_)
Sets the client-side timeout for this command.
Definition: ampsplusplus.hpp:535
Message & setClientName(const std::string &v)
Sets the value of the ClientName header for this Message.
Definition: Message.hpp:1011
Field getSowKey() const
Retrieves the value of the SowKey header of the Message as a new Field.
Definition: Message.hpp:1128
void AMPSDLL amps_client_destroy(amps_handle handle)
Disconnects and destroys an AMPS client object.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
A default implementation of Authenticator that only uses an unchanged password and does not implement...
Definition: ampsplusplus.hpp:646
void setLogonCorrelationData(const std::string &logonCorrelationData_)
Sets the logon correlation data for the client.
Definition: ampsplusplus.hpp:3993
Message & setCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1009
std::string bookmarkSubscribe(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:4775
_FIXBuilder(char fieldSep_=(char) 1)
Construct an instance of _FIXBuilder, using the specified separator between fields.
Definition: ampsplusplus.hpp:6731
void ack(const std::string &topic_, const std::string &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark string: this adds the ack to th...
Definition: ampsplusplus.hpp:5946
std::string retry(const std::string &, const std::string &)
Throws an AuthenticationException because retry is not implemented.
Definition: ampsplusplus.hpp:659
Message & assignSubscriptionId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1105
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:700
MessageStream sowAndSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5087
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:783
void append(const T &tag, const char *value, size_t offset, size_t length)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:6740
Message & assignCommand(const std::string &v)
Sets the value of the Command header for this Message.
Definition: Message.hpp:1009
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:909
amps_result amps_client_send_with_version(amps_handle client, amps_handle message, unsigned *version_out)
Sends a message to the AMPS server.
virtual std::string retry(const std::string &userName_, const std::string &password_)=0
Called by Client when a logon ack is received with a status of retry.
BookmarkStore getBookmarkStore()
Get the bookmark store being used by the client.
Definition: ampsplusplus.hpp:4213
bool isValid() const
Method to return if there is an underlying implementation for the Store.
Definition: ampsplusplus.hpp:866
void setAckBatchSize(const unsigned ackBatchSize_)
Sets the queue ack batch size setting.
Definition: ampsplusplus.hpp:6007
void amps_client_set_message_handler(amps_handle client, amps_handler messageHandler, void *userData)
Sets the message handler function for this client.
Field getPassword() const
Retrieves the value of the Password header of the Message as a new Field.
Definition: Message.hpp:1132
Message & setTopNRecordsReturned(const std::string &v)
Sets the value of the TopNRecordsReturned header for this Message.
Definition: Message.hpp:1115
Class to handle when a client receives a duplicate publish message, or not entitled message...
Definition: ampsplusplus.hpp:937
Message sowDelete(const std::string &topic, const std::string &filter, long timeout=0)
Deletes one or more messages from a topic's SOW cache.
Definition: ampsplusplus.hpp:5469
Message & setSowKeys(const std::string &v)
Sets the value of the SowKeys header for this Message.
Definition: Message.hpp:1117
void setBookmarkStore(const BookmarkStore &bookmarkStore_)
Set the bookmark store to be used by the client.
Definition: ampsplusplus.hpp:4205
This class multiplexes messages from AMPS to multiple subscribers and uses the stream of acks from AM...
Definition: MessageRouter.hpp:121
Message & setExpiration(const std::string &v)
Sets the value of the Expiration header for this Message.
Definition: Message.hpp:1107
virtual std::string authenticate(const std::string &userName_, const std::string &password_)=0
Called by Client just before the logon command is sent.
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
Command & setCommandId(const std::string &v_)
Definition: ampsplusplus.hpp:483
void setExceptionListener(const ExceptionListener &listener_)
Definition: ampsplusplus.hpp:5664
Command & reset(Message::Command::Type command_)
Resets the fields of self, and sets the command to command_.
Definition: ampsplusplus.hpp:456
void setSubscriptionManager(SubscriptionManager *subscriptionManager_)
Set the subscription manager to be used by the client.
Definition: ampsplusplus.hpp:4233
void setUnhandledMessageHandler(const AMPS::MessageHandler &messageHandler)
Definition: ampsplusplus.hpp:5727
void append(const T &tag, const std::string &value)
Write a field with the provided tag and value to the message being constructed.
Definition: ampsplusplus.hpp:6751
unsigned getAckBatchSize(void) const
Returns the value of the queue ack batch size setting.
Definition: ampsplusplus.hpp:5997
amps_uint64_t deltaPublish(const std::string &topic_, const std::string &data_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4469
MessageStream sowAndSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Queries the SOW cache of a topic and initiates a new subscription on it.
Definition: ampsplusplus.hpp:5172
Command & setSequence(const std::string &v_)
Definition: ampsplusplus.hpp:509
Represents an iterator over messages in an AMPS topic.
Definition: ampsplusplus.hpp:3811
size_t log(Message &message_)
Log a bookmark to the persistent log.
Definition: BookmarkStore.hpp:265
MessageStream sowAndDeltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Queries the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5409
size_t unpersistedCount() const
Method to return how many messages are in the store that have not been discarded, indicating that the...
Definition: ampsplusplus.hpp:858
Message & assignAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1045
std::string executeAsync(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:5842
MessageStream sowAndDeltaSubscribe(const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, bool oofEnabled_=false, bool sendEmpties_=false, int topN_=DEFAULT_TOP_N)
Queries the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5370
std::string logon(const std::string &options_, int timeout_=0)
Logon to the server, providing the client name, credentials (if available), client information (such a...
Definition: ampsplusplus.hpp:4593
Command & setSequence(const amps_uint64_t v_)
Definition: ampsplusplus.hpp:511
void unsubscribe()
Unsubscribe from all topics.
Definition: ampsplusplus.hpp:4860
void setAutoAck(bool isAutoAckEnabled_)
Sets the queue auto-ack setting on this client.
Definition: ampsplusplus.hpp:5989
MessageStream deltaSubscribe(const char *topic_, long timeout_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Delta Subscribe to a topic.
Definition: ampsplusplus.hpp:4735
Message & assignCorrelationId(const std::string &v)
Sets the value of the CorrelationId header for this Message.
Definition: Message.hpp:1118
amps_uint64_t store(const Message &message_)
Called by Client to store a message being published.
Definition: ampsplusplus.hpp:819
void setExceptionListener(const std::shared_ptr< const ExceptionListener > &pListener_)
Sets the exception listener for exceptions that are not thrown back to the user (for example...
Definition: ampsplusplus.hpp:5651
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
StoreImpl * get()
Used to get a pointer to the implementation.
Definition: ampsplusplus.hpp:923
MessageHandler getDuplicateMessageHandler(void)
Returns the callback function that is invoked when a duplicate message is detected.
Definition: ampsplusplus.hpp:4288
amps_result amps_client_set_idle_time(amps_handle client, int idleTime)
Sets an idle-time (milliseconds).
amps_result AMPSDLL amps_client_set_name(amps_handle handle, const amps_char *clientName)
Sets the name on an amps client object.
Class for parsing a FIX format message into a std::map of keys and values, where the keys and values ...
Definition: ampsplusplus.hpp:6795
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
Message & setAckType(const std::string &v)
Sets the value of the AckType header for this Message.
Definition: Message.hpp:1045
virtual ConnectionInfo getConnectionInfo() const
Get the connection information for the current connection.
Definition: ampsplusplus.hpp:4192
void ack(Field &topic_, Field &bookmark_, const char *options_=NULL)
Acknowledge a message queue message by supplying a topic and bookmark: this adds the ack to the curre...
Definition: ampsplusplus.hpp:5922
std::map< Message::Field, Message::Field > map_type
Convenience definition for the std::map specialization used for this class.
Definition: ampsplusplus.hpp:6807
amps_result amps_client_set_transport_filter_function(amps_handle client, amps_transport_filter_function filter, void *userData)
Sets a user-supplied callback function for filtering data before it is sent and after it is received...
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:804
map_type toMap(const Message::Field &data)
Returns the key/value pairs within the message, represented as AMPS::Field objects that contain point...
Definition: ampsplusplus.hpp:6814
Exception listener for unhandled exceptions.
Definition: ampsplusplus.hpp:204
MessageStream sowAndDeltaSubscribe(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Queries the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5270
void setHeartbeat(unsigned heartbeatTime_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:5721
Command & setSubId(const std::string &v_)
Definition: ampsplusplus.hpp:491
MessageStream subscribe(const char *topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4674
An iterable object representing the results of an AMPS subscription and/or query. ...
Definition: ampsplusplus.hpp:3803
std::string sow(MessageHandler messageHandler_, const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query a State-of-the-World topic.
Definition: ampsplusplus.hpp:4895
Message & setCommandId(const std::string &v)
Sets the value of the CommandId header for this Message.
Definition: Message.hpp:1010
amps_result(* amps_thread_created_callback)(AMPS_THREAD_T, void *)
Prototype for a user-supplied callback function to allow thread attributes to be set when a new threa...
Definition: amps.h:627
amps_uint64_t getLowestUnpersisted()
Get the oldest unpersisted message sequence in the store.
Definition: ampsplusplus.hpp:887
MessageStream maxDepth(unsigned maxDepth_)
Sets the maximum number of messages that can be held in the underlying queue.
Definition: ampsplusplus.hpp:7214
void amps_client_set_disconnect_handler(amps_handle client, amps_handler disconnectHandler, void *userData)
Sets the disconnect handler function to be called when a disconnect occurs.
std::string sow(MessageHandler messageHandler_, const std::string &topic_, long timeout_, const std::string &filter_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:4987
Command & setTopN(unsigned v_)
Definition: ampsplusplus.hpp:537
Message & assignVersion(const std::string &v)
Sets the value of the Version header for this Message.
Definition: Message.hpp:1133
void disconnect()
Disconnect from an AMPS server.
Definition: ampsplusplus.hpp:4091
MessageStream sow(const std::string &topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:4933
Command & setAckType(const std::string &v_)
Definition: ampsplusplus.hpp:563
Command & setData(const std::string &v_)
Sets the data for this command from an existing string.
Definition: ampsplusplus.hpp:521
unsigned getDepth(void) const
Gets the current number of messages held in the underlying queue.
Definition: ampsplusplus.hpp:7223
Field getQueryID() const
Retrieves the value of the QueryID header of the Message as a new Field.
Definition: Message.hpp:1112
The operation has not succeeded, but ought to be retried.
Definition: amps.h:213
const std::string & getLogonCorrelationData() const
Returns the currently set logon correlation data for the client.
Definition: ampsplusplus.hpp:4000
void setHeartbeat(unsigned heartbeatTime_, unsigned readTimeout_)
Requests heartbeating with the AMPS server.
Definition: ampsplusplus.hpp:5697
const std::string & getURI() const
Returns the last URI this client is connected to.
Definition: ampsplusplus.hpp:4060
Command & setOptions(const std::string &v_)
Sets the options string for this command: see Message.Options for a helper class for constructing the...
Definition: ampsplusplus.hpp:507
void amps_client_set_predisconnect_handler(amps_handle client, amps_predisconnect_handler predisconnectHandler, void *userData)
Sets the predisconnect handler function to be called when a disconnect occurs.
void replay(StoreReplayer &replayer_)
Called by Client to get all stored and non-discarded messages replayed by the store onto the StoreRep...
Definition: ampsplusplus.hpp:837
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1120
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
std::string sowDeleteByData(MessageHandler messageHandler_, const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5593
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1075
BookmarkStoreImpl * get()
Used to get a pointer to the implementation.
Definition: BookmarkStore.hpp:434
Command & setBookmark(const std::string &v_)
Set the bookmark to be used by this command.
Definition: ampsplusplus.hpp:497
amps_handle getHandle()
Returns the underlying amps_handle for this client, to be used with amps_client_* functions from the ...
Definition: ampsplusplus.hpp:5638
void reset()
Clear all data from the builder.
Definition: ampsplusplus.hpp:6768
void setFailedWriteHandler(FailedWriteHandler *handler_)
Set the handler that is invoked to report when a publish fails, for example if the publisher is not e...
Definition: ampsplusplus.hpp:4302
MessageStream sowAndDeltaSubscribe(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Queries the SOW cache of a topic and initiates a new delta subscription on it.
Definition: ampsplusplus.hpp:5291
void setAckTimeout(const int ackTimeout_)
Sets the message queue ack timeout value.
Definition: ampsplusplus.hpp:6028
Command & setSowKeys(const std::string &sowKeys_)
Sets the SowKeys for the command.
Definition: ampsplusplus.hpp:481
Message & setPassword(const std::string &v)
Sets the value of the Password header for this Message.
Definition: Message.hpp:1132
bool getAutoAck(void) const
Returns the value of the queue auto-ack setting.
Definition: ampsplusplus.hpp:5979
amps_uint64_t publish(const std::string &topic_, const std::string &data_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store if on...
Definition: ampsplusplus.hpp:4333
Definition: ampsplusplus.hpp:105
void setName(const std::string &name)
Sets the name of this client, assuming no name was provided previously.
Definition: ampsplusplus.hpp:3975
std::string stopTimer(MessageHandler messageHandler)
Definition: ampsplusplus.hpp:5505
The interface for handling authentication with the AMPS server.
Definition: ampsplusplus.hpp:615
std::string sowDeleteByKeys(MessageHandler messageHandler_, const std::string &topic_, const std::string &keys_, long timeout_=0)
Deletes messages that match SOW keys from a topic's SOW cache.
Definition: ampsplusplus.hpp:5531
void flush(long timeout_=0)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: ampsplusplus.hpp:879
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1120
bool DangerousFlushPublishStoreResizeHandler(Store store_, size_t, void *data_)
PublishStoreResizeHandler that will block up to the timeout specified in user data milliseconds tryin...
Definition: ampsplusplus.hpp:965
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:762
MessageStream sow(const char *topic_, const std::string &filter_="", const std::string &orderBy_="", const std::string &bookmark_="", int batchSize_=DEFAULT_BATCH_SIZE, int topN_=DEFAULT_TOP_N, const std::string &options_="", long timeout_=DEFAULT_COMMAND_TIMEOUT)
Query the SOW cache of a topic.
Definition: ampsplusplus.hpp:4950
amps_uint64_t publish(const std::string &topic_, const std::string &data_, unsigned long expiration_)
Publish a message to an AMPS topic, returning the sequence number assigned by the publish store (if a...
Definition: ampsplusplus.hpp:4382
void setLastChanceMessageHandler(const AMPS::MessageHandler &messageHandler)
Sets the message handler called when no other handler matches.
Definition: ampsplusplus.hpp:5734
Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: BookmarkStore.hpp:301
amps_uint64_t deltaPublish(const char *topic_, size_t topicLength_, const char *data_, size_t dataLength_, unsigned long expiration_)
Publish the changed fields of a message to an AMPS topic.
Definition: ampsplusplus.hpp:4535
The client and server are disconnected.
Definition: amps.h:217
Message sowDeleteByData(const std::string &topic_, const std::string &data_, long timeout_=0)
Deletes the message whose keys match the message data provided.
Definition: ampsplusplus.hpp:5614
MessageStream timeout(unsigned timeout_)
Sets the maximum time to wait for the next message in milliseconds; if no message is available within...
Definition: ampsplusplus.hpp:7204
const std::string & getName() const
Returns the name of this client passed in the constructor.
Definition: ampsplusplus.hpp:3982
Command is an encapsulation of a single AMPS command sent by the client.
Definition: ampsplusplus.hpp:354
Message & setBatchSize(const std::string &v)
Sets the value of the BatchSize header for this Message.
Definition: Message.hpp:1114
MessageStream subscribe(const std::string &topic_, long timeout_=0, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic.
Definition: ampsplusplus.hpp:4644
MessageStream bookmarkSubscribe(const char *topic_, long timeout_, const std::string &bookmark_, const std::string &filter_="", const std::string &options_="", const std::string &subId_="")
Subscribe to a topic using a bookmark.
Definition: ampsplusplus.hpp:4822
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Command & setSowKey(const std::string &sowKey_)
Sets the SowKey field of the command, typically used for a publish command to a topic in the state of...
Definition: ampsplusplus.hpp:468
std::string executeAsyncNoResubscribe(Command &command_, MessageHandler handler_)
Execute the provided command and, once AMPS acknowledges the command, process messages in response to...
Definition: ampsplusplus.hpp:5876
amps_result amps_client_send(amps_handle client, amps_handle message)
Sends a message to the AMPS server.
iterator end(void)
Returns an iterator representing the end of the topic or subscription.
Definition: ampsplusplus.hpp:3855
bool getRetryOnDisconnect(void) const
Returns true if automatic retry of a command to AMPS after a reconnect is enabled.
Definition: ampsplusplus.hpp:6051
void flushAcks(void)
Sends any queued message queue ack messages to the server immediately.
Definition: ampsplusplus.hpp:5970