AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.0.5
Message.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef __AMPS_MESSAGE_HPP__
26 #define __AMPS_MESSAGE_HPP__
27 #include "util.hpp"
28 #include "constants.hpp"
29 #include "amps_generated.h"
30 #include "Field.hpp"
31 #include <stdio.h>
32 #ifdef _WIN32
33 #if (_MSC_VER >= 1400) // VS2005 or higher
34 #define AMPS_snprintf sprintf_s
35 #define AMPS_snprintf_amps_uint64_t(buf,sz,val) sprintf_s(buf,sz,"%I64u",val)
36 #ifdef _WIN64
37 #define AMPS_snprintf_sizet(buf,sz,val) sprintf_s(buf,sz,"%lu",val)
38 #else
39 #define AMPS_snprintf_sizet(buf,sz,val) sprintf_s(buf,sz,"%u",val)
40 #endif
41 #else // VS2003 or older
42 #define AMPS_snprintf _snprintf
43 #ifdef _WIN64
44 #define AMPS_snprintf_sizet(buf,sz,val) _sprintf(buf,sz,"%lu",val)
45 #else
46 #define AMPS_snprintf_sizet(buf,sz,val) _sprintf(buf,sz,"%u",val)
47 #endif
48 #endif
49 #else
50 #define AMPS_snprintf snprintf
51 #ifdef __x86_64__
52 #define AMPS_snprintf_amps_uint64_t(buf,sz,val) snprintf(buf,sz,"%lu",val)
53 #define AMPS_snprintf_sizet(buf,sz,val) snprintf(buf,sz,"%lu",val)
54 #else
55 #define AMPS_snprintf_amps_uint64_t(buf,sz,val) snprintf(buf,sz,"%llu",val)
56 #define AMPS_snprintf_sizet(buf,sz,val) snprintf(buf,sz,"%u",val)
57 #endif
58 #endif
59 #define AMPS_UNSET_INDEX (size_t)-1
60 #define AMPS_UNSET_SEQUENCE (amps_uint64_t)-1
61 
62 // Macros for determing what parts of TR1 we can depend on
63 #if defined(__GXX_EXPERIMENTAL_CXX0X__) || (_MSC_VER >= 1600)
64 #define AMPS_USE_FUNCTIONAL 1
65 #endif
66 
67 #if (_MSC_VER >= 1600) || ( (__GNUC__ >= 4) && (__GNUC_MINOR__) >=5 )
68 #define AMPS_USE_LAMBDAS 1
69 #endif
70 
71 #ifdef AMPS_USE_FUNCTIONAL
72 #include <functional>
73 #endif
74 
75 #include <algorithm>
76 
80 
81 #define AMPS_OPTIONS_NONE ""
82 #define AMPS_OPTIONS_LIVE "live,"
83 #define AMPS_OPTIONS_OOF "oof,"
84 #define AMPS_OPTIONS_REPLACE "replace,"
85 #define AMPS_OPTIONS_NOEMPTIES "no_empties,"
86 #define AMPS_OPTIONS_SENDKEYS "send_keys,"
87 #define AMPS_OPTIONS_TIMESTAMP "timestamp,"
88 #define AMPS_OPTIONS_NOSOWKEY "no_sowkey,"
89 #define AMPS_OPTIONS_CANCEL "cancel,"
90 #define AMPS_OPTIONS_RESUME "resume,"
91 #define AMPS_OPTIONS_PAUSE "pause,"
92 #define AMPS_OPTIONS_FULLY_DURABLE "fully_durable,"
93 #define AMPS_OPTIONS_EXPIRE "expire,"
94 #define AMPS_OPTIONS_TOPN(x) "top_n=##x,"
95 #define AMPS_OPTIONS_MAXBACKLOG(x) "max_backlog=##x,"
96 #define AMPS_OPTIONS_RATE(x) "rate=##x,"
97 
98 namespace AMPS
99 {
100 typedef void* amps_subscription_handle;
101 
102 class ClientImpl;
103 
108 class MessageImpl : public RefBody
109 {
110 private:
111  amps_handle _message;
112  Mutex _lock;
113  bool _owner;
114  mutable bool _isIgnoreAutoAck;
115  size_t _bookmarkSeqNo;
116  amps_subscription_handle _subscription;
117  ClientImpl* _clientImpl;
118 public:
130  MessageImpl(amps_handle message_, bool owner_ = false,
131  bool ignoreAutoAck_ = false, size_t bookmarkSeqNo_ = 0,
132  amps_subscription_handle subscription_ = NULL,
133  ClientImpl* clientImpl_=NULL)
134  : _message(message_), _owner(owner_), _isIgnoreAutoAck(ignoreAutoAck_)
135  , _bookmarkSeqNo(bookmarkSeqNo_)
136  , _subscription(subscription_), _clientImpl(clientImpl_)
137  {
138  }
139 
144  : _message(NULL), _owner(true), _isIgnoreAutoAck(false), _bookmarkSeqNo(0), _subscription(NULL), _clientImpl(NULL)
145  {
146  // try to create one
147  _message = amps_message_create(NULL);
148  }
149 
150  virtual ~MessageImpl()
151  {
152  if (_owner && _message) amps_message_destroy(_message);
153  }
154 
155  MessageImpl* copy() const
156  {
157  amps_handle copy = amps_message_copy(_message);
158  return new MessageImpl(copy, true, _isIgnoreAutoAck, _bookmarkSeqNo,
159  _subscription,_clientImpl);
160  }
161 
162  void setClientImpl(ClientImpl* clientImpl_)
163  {
164  _clientImpl = clientImpl_;
165  }
166 
167  ClientImpl* clientImpl(void) const
168  {
169  return _clientImpl;
170  }
171 
175  {
176  return _message;
177  }
178 
179  void reset()
180  {
181  Lock<Mutex> l(_lock);
182  amps_message_reset(_message);
183  _bookmarkSeqNo = 0;
184  _subscription = NULL;
185  _isIgnoreAutoAck = false;
186  }
187 
193  void replace(amps_handle message_, bool owner_ = false)
194  {
195  Lock<Mutex> l(_lock);
196  if (_message == message_) return;
197  if (_owner && _message)
198  {
199  amps_message_destroy(_message);
200  }
201  _owner = owner_;
202  _message = message_;
203  _bookmarkSeqNo = 0;
204  _subscription = NULL;
205  _isIgnoreAutoAck = false;
206  }
207 
208  void disown()
209  {
210  Lock<Mutex> l(_lock);
211  _owner = false;
212  }
213 
214  static unsigned long newId()
215  {
216  static ATOMIC_TYPE _id = 0;
217  return (unsigned long)(AMPS_FETCH_ADD(&_id,1));
218  }
219 
220  void setBookmarkSeqNo(size_t val_)
221  {
222  _bookmarkSeqNo = val_;
223  }
224 
225  size_t getBookmarkSeqNo(void) const
226  {
227  return _bookmarkSeqNo;
228  }
229 
230  void setSubscriptionHandle(amps_subscription_handle subscription_)
231  {
232  _subscription = subscription_;
233  }
234 
235  amps_subscription_handle getSubscriptionHandle(void) const
236  {
237  return _subscription;
238  }
239 
240  void setIgnoreAutoAck() const
241  {
242  _isIgnoreAutoAck = true;
243  }
244 
245  bool getIgnoreAutoAck() const
246  {
247  return _isIgnoreAutoAck;
248  }
249 };
250 
251 
252 // This block of macros works with the Doxygen preprocessor to
253 // create documentation comments for fields defined with the AMPS_FIELD macro.
254 // A C++ compiler removes comments before expanding macros, so these macros
255 // must ONLY be defined for Doxygen and not for actual compilation.
256 
257 #ifdef DOXYGEN_PREPROCESSOR
258 
259 #define DOX_COMMENTHEAD(s) / ## ** ## s ## * ## /
260 #define DOX_GROUPNAME(s) DOX_COMMENTHEAD(@name s Functions)
261 #define DOX_OPENGROUP(s) DOX_COMMENTHEAD(@{) DOX_GROUPNAME(s)
262 #define DOX_CLOSEGROUP() DOX_COMMENTHEAD(@})
263 #define DOX_MAKEGETCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of the Message as a new Field. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
264 #define DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of self as a Field that references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
265 #define DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
266 #define DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
267 #define DOX_MAKENEWCOMMENT(x) DOX_COMMENTHEAD(Creates and sets a new sequential value for the x header for this Message. This function is most useful for headers such as CommandId and SubId.)
268 
269 #else
270 
271 #define DOX_COMMENTHEAD(s)
272 #define DOX_GROUPNAME(s)
273 #define DOX_OPENGROUP(x)
274 #define DOX_CLOSEGROUP(x)
275 #define DOX_MAKEGETCOMMENT(x)
276 #define DOX_MAKEGETRAWCOMMENT(x)
277 #define DOX_MAKESETCOMMENT(x)
278 #define DOX_MAKEASSIGNCOMMENT(x)
279 #define DOX_MAKENEWCOMMENT(x)
280 
281 #endif
282 
283 // Macro for defining all of the necessary methods for a field in an AMPS
284 // message.
285 
286 #define AMPS_FIELD(x) \
287  DOX_OPENGROUP(x) \
288  DOX_MAKEGETCOMMENT(x) \
289  Field get##x() const { \
290  Field returnValue;\
291  const char* ptr;\
292  size_t sz;\
293  amps_message_get_field_value(_body.get().getMessage(),\
294  AMPS_##x, &ptr, &sz);\
295  returnValue.assign(ptr, sz);\
296  return returnValue;\
297  }\
298  DOX_MAKEGETRAWCOMMENT(x) \
299  void getRaw##x(const char** dataptr, size_t* sizeptr) const { \
300  amps_message_get_field_value(_body.get().getMessage(), \
301  AMPS_##x, dataptr, sizeptr);\
302  return;\
303  }\
304  DOX_MAKESETCOMMENT(x) \
305  Message& set##x(const std::string& v) {\
306  amps_message_set_field_value(_body.get().getMessage(),\
307  AMPS_##x, v.c_str(), v.length());\
308  return *this;\
309  }\
310  DOX_MAKESETCOMMENT(x) \
311  Message& set##x(amps_uint64_t v) {\
312  char buf[22];\
313  AMPS_snprintf_amps_uint64_t(buf,22,v);\
314  amps_message_set_field_value_nts(_body.get().getMessage(),\
315  AMPS_##x, buf);\
316  return *this;\
317  }\
318  DOX_MAKEASSIGNCOMMENT(x) \
319  Message& assign##x(const std::string& v) {\
320  amps_message_assign_field_value(_body.get().getMessage(),\
321  AMPS_##x, v.c_str(), v.length());\
322  return *this;\
323  }\
324  DOX_MAKEASSIGNCOMMENT(x) \
325  Message& assign##x(const char* data, size_t len) {\
326  amps_message_assign_field_value(_body.get().getMessage(),\
327  AMPS_##x, data, len);\
328  return *this;\
329  }\
330  DOX_MAKESETCOMMENT(x) \
331  Message& set##x(const char* str) {\
332  amps_message_set_field_value_nts(_body.get().getMessage(),\
333  AMPS_##x, str);\
334  return *this;\
335  }\
336  DOX_MAKESETCOMMENT(x) \
337  Message& set##x(const char* str,size_t len) {\
338  amps_message_set_field_value(_body.get().getMessage(),\
339  AMPS_##x, str,len);\
340  return *this;\
341  }\
342  DOX_MAKENEWCOMMENT(x) \
343  Message& new##x() {\
344  char buf[Message::IdentifierLength+1];\
345  buf[Message::IdentifierLength] = 0;\
346  AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , _body.get().newId());\
347  amps_message_set_field_value_nts(_body.get().getMessage(),\
348  AMPS_##x, buf);\
349  return *this;\
350  } \
351  DOX_CLOSEGROUP(x)
352 
393 class Message
394 {
395  RefHandle<MessageImpl> _body;
396 
397  Message(MessageImpl* body_) : _body(body_) { ; }
398 
399 public:
400  typedef AMPS::Field Field;
401 
404  static const unsigned int IdentifierLength = 16;
405 
408  static const size_t BOOKMARK_NONE = AMPS_UNSET_INDEX;
409 
416  Message(amps_handle message_, bool owner_ = false)
417  : _body(new MessageImpl(message_, owner_))
418  {
419  }
420 
424  Message() : _body(new MessageImpl())
425  {
426  }
427 
430  Message deepCopy(void) const
431  {
432  return Message(_body.get().copy());
433  }
434 
445  class Options
446  {
447  public:
448  static const char* None(void) { return AMPS_OPTIONS_NONE; }
449  static const char* Live(void) { return AMPS_OPTIONS_LIVE; }
450  static const char* OOF(void) { return AMPS_OPTIONS_OOF; }
451  static const char* Replace(void) { return AMPS_OPTIONS_REPLACE; }
452  static const char* NoEmpties(void) { return AMPS_OPTIONS_NOEMPTIES; }
453  static const char* SendKeys(void) { return AMPS_OPTIONS_SENDKEYS; }
454  static const char* Timestamp(void) { return AMPS_OPTIONS_TIMESTAMP; }
455  static const char* NoSowKey(void) { return AMPS_OPTIONS_NOSOWKEY; }
456  static const char* Cancel(void) { return AMPS_OPTIONS_CANCEL; }
457  static const char* Resume(void) { return AMPS_OPTIONS_RESUME; }
458  static const char* Pause(void) { return AMPS_OPTIONS_PAUSE; }
459  static const char* FullyDurable(void) { return AMPS_OPTIONS_FULLY_DURABLE; }
460  static const char* Expire(void) { return AMPS_OPTIONS_EXPIRE; }
461  static std::string Conflation(const char* conflation_)
462  {
463  char buf[64];
464  AMPS_snprintf(buf, sizeof(buf), "conflation=%s,", conflation_);
465  return buf;
466  }
467  static std::string ConflationKey(const char* conflationKey_)
468  {
469  std::string option("conflation_key=");
470  option.append(conflationKey_).append(",");
471  return option;
472  }
473  static std::string TopN(int topN_)
474  {
475  char buf[24];
476  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
477  return buf;
478  }
479  static std::string MaxBacklog(int maxBacklog_)
480  {
481  char buf[24];
482  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
483  return buf;
484  }
485  static std::string Rate(const char* rate_)
486  {
487  char buf[64];
488  AMPS_snprintf(buf, sizeof(buf), "rate=%s,", rate_);
489  return buf;
490  }
491  static std::string RateMaxGap(const char* rateMaxGap_)
492  {
493  char buf[64];
494  AMPS_snprintf(buf, sizeof(buf), "rate_max_gap=%s,", rateMaxGap_);
495  return buf;
496  }
497  static std::string SkipN(int skipN_)
498  {
499  char buf[24];
500  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
501  return buf;
502  }
503 
504  static std::string Projection(std::string projection_)
505  {
506  return "projection=[" + projection_ + "],";
507  }
508 
509  template<class Iterator>
510  static std::string Projection(Iterator begin_, Iterator end_)
511  {
512  std::string projection = "projection=[";
513  for (Iterator i = begin_; i != end_; ++i)
514  {
515  projection += *i;
516  projection += ',';
517  }
518  projection.insert(projection.length() - 1, "]");
519  return projection;
520  }
521 
522  static std::string Grouping(std::string grouping_)
523  {
524  return "grouping=[" + grouping_ + "],";
525  }
526 
527  template<class Iterator>
528  static std::string Grouping(Iterator begin_, Iterator end_)
529  {
530  std::string grouping = "grouping=[";
531  for (Iterator i = begin_; i != end_; ++i)
532  {
533  grouping += *i;
534  grouping += ',';
535  }
536  grouping.insert(grouping.length() - 1, "]");
537  return grouping;
538  }
539 
540  static std::string Select(const std::string& select_)
541  {
542  return "select=[" + select_ + "],";
543  }
544 
545  template<class Iterator>
546  static std::string Select(Iterator begin_, Iterator end_)
547  {
548  std::string select = "select=[";
549  for (Iterator i = begin_; i != end_; ++i)
550  {
551  select += *i;
552  select += ',';
553  }
554  select.insert(select.length() - 1, "]");
555  return select;
556  }
557 
558  static std::string AckConflationInterval(const std::string& interval_)
559  {
560  return "ack_conflation=" + interval_ + ",";
561  }
562 
563  static std::string AckConflationInterval(const char* interval_)
564  {
565  static const std::string start("ack_conflation=");
566  return start + interval_ + ",";
567  }
568 
571  Options(std::string options_ = "")
572  : _optionStr(options_)
573  , _maxBacklog(0)
574  , _topN(0)
575  {;}
576 
577  int getMaxBacklog(void) const { return _maxBacklog; }
578  std::string getConflation(void) const { return _conflation; }
579  std::string getConflationKey(void) const { return _conflationKey; }
580  int getTopN(void) const { return _topN; }
581  std::string getRate(void) const { return _rate; }
582  std::string getRateMaxGap(void) const { return _rateMaxGap; }
583 
587  void setNone(void) { _optionStr = AMPS_OPTIONS_NONE; }
588 
598  void setLive(void) { _optionStr += AMPS_OPTIONS_LIVE; }
599 
604  void setOOF(void) { _optionStr += AMPS_OPTIONS_OOF; }
605 
610  void setReplace(void) { _optionStr += AMPS_OPTIONS_REPLACE; }
611 
615  void setNoEmpties(void) { _optionStr += AMPS_OPTIONS_NOEMPTIES; }
616 
620  void setSendKeys(void) { _optionStr += AMPS_OPTIONS_SENDKEYS; }
621 
626  void setTimestamp(void) { _optionStr += AMPS_OPTIONS_TIMESTAMP; }
627 
631  void setNoSowKey(void) { _optionStr += AMPS_OPTIONS_NOSOWKEY; }
632 
636  void setCancel(void) { _optionStr += AMPS_OPTIONS_CANCEL; }
637 
644  void setResume(void) { _optionStr += AMPS_OPTIONS_RESUME; }
645 
656  void setPause(void) { _optionStr += AMPS_OPTIONS_PAUSE; }
657 
664  void setFullyDurable(void) { _optionStr += AMPS_OPTIONS_FULLY_DURABLE; }
665 
676  void setMaxBacklog(int maxBacklog_)
677  {
678  char buf[24];
679  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
680  _optionStr += buf;
681  _maxBacklog = maxBacklog_;
682  }
683 
689  void setConflation(const char* conflation_)
690  {
691  char buf[64];
692  AMPS_snprintf(buf, sizeof(buf), "conflation=%s,", conflation_);
693  _optionStr += buf;
694  _conflation = conflation_;
695  }
696 
706  void setConflationKey(const char* conflationKey_)
707  {
708  char buf[64];
709  AMPS_snprintf(buf, sizeof(buf), "conflation_key=%s,", conflationKey_);
710  _optionStr += buf;
711  _conflationKey = conflationKey_;
712  }
713 
719  void setTopN(int topN_)
720  {
721  char buf[24];
722  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
723  _optionStr += buf;
724  _topN = topN_;
725  }
726 
733  void setRate(const char* rate_)
734  {
735  char buf[64];
736  AMPS_snprintf(buf, sizeof(buf), "rate=%s,", rate_);
737  _optionStr += buf;
738  _rate = rate_;
739  }
740 
755  void setRateMaxGap(const char* rateMaxGap_)
756  {
757  char buf[64];
758  AMPS_snprintf(buf, sizeof(buf), "rate_max_gap=%s,", rateMaxGap_);
759  _optionStr += buf;
760  _rateMaxGap = rateMaxGap_;
761  }
762 
768  void setSkipN(int skipN_)
769  {
770  char buf[24];
771  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
772  _optionStr += buf;
773  _skipN = skipN_;
774  }
775 
780  void setProjection(std::string projection_)
781  {
782  _projection = "projection=[" + projection_ + "],";
783  _optionStr += _projection;
784  }
785 
786 
792  template<class Iterator>
793  void setProjection(Iterator begin_, Iterator end_)
794  {
795  _projection = "projection=[";
796  for (Iterator i = begin_; i != end_; ++i)
797  {
798  _projection += *i;
799  _projection += ',';
800  }
801  _projection.insert(_projection.length() - 1, "]");
802  _optionStr += _projection;
803  }
804 
810  void setGrouping(std::string grouping_)
811  {
812  _grouping = "grouping=[" + grouping_ + "],";
813  _optionStr += _grouping;
814  }
815 
816 
822  template<class Iterator>
823  void setGrouping(Iterator begin_, Iterator end_)
824  {
825  _grouping = "grouping=[";
826  for (Iterator i = begin_; i != end_; ++i)
827  {
828  _grouping += *i;
829  _grouping += ',';
830  }
831  _grouping.insert(_grouping.length() - 1, "]");
832  _optionStr += _grouping;
833  }
834 
838  operator const std::string()
839  {
840  return _optionStr.substr(0, _optionStr.length()-1);
841  }
842 
843  private:
844  std::string _optionStr;
845  int _maxBacklog;
846  std::string _conflation;
847  std::string _conflationKey;
848  int _topN;
849  std::string _rate;
850  std::string _rateMaxGap;
851  int _skipN;
852  std::string _projection;
853  std::string _grouping;
854  };
855 
859  struct Command
860  {
861  typedef enum
862  {
863  Unknown = 0,
864  Publish = 1,
865  Subscribe = 2,
866  Unsubscribe = 4,
867  SOW = 8,
868  Heartbeat = 16,
869  SOWDelete = 32,
870  DeltaPublish = 64,
871  Logon = 128,
872  SOWAndSubscribe = 256,
873  DeltaSubscribe = 512,
874  SOWAndDeltaSubscribe = 1024,
875  StartTimer = 2048,
876  StopTimer = 4096,
877  GroupBegin = 8192,
878  GroupEnd = 16384,
879  OOF = 32768,
880  Ack = 65536,
881  Flush = 131072
882  } Type;
883  };
885  Command::Type getCommandEnum() const
886  {
887  const char* data = NULL; size_t len = 0;
888  amps_message_get_field_value(_body.get().getMessage(), AMPS_Command, &data, &len);
889  switch (len)
890  {
891  case 1: return Command::Publish;
892  case 3:
893  switch (data[0])
894  {
895  case 's': return Command::SOW;
896  case 'o': return Command::OOF;
897  case 'a': return Command::Ack;
898  }
899  break;
900  case 5:
901  switch (data[0])
902  {
903  case 'l': return Command::Logon;
904  case 'f': return Command::Flush;
905  }
906  break;
907  case 7:
908  return Command::Publish;
909  break;
910  case 9:
911  switch (data[0])
912  {
913  case 's': return Command::Subscribe;
914  case 'h': return Command::Heartbeat;
915  case 'g': return Command::GroupEnd;
916  }
917  break;
918  case 10:
919  switch (data[1])
920  {
921  case 'o': return Command::SOWDelete;
922  case 't': return Command::StopTimer;
923  }
924  break;
925  case 11:
926  switch (data[0])
927  {
928  case 'g': return Command::GroupBegin;
929  case 'u': return Command::Unsubscribe;
930  }
931  break;
932  case 13:
933  return Command::DeltaPublish;
934  case 15:
935  return Command::DeltaSubscribe;
936  case 17:
937  return Command::SOWAndSubscribe;
938  case 23:
939  return Command::SOWAndDeltaSubscribe;
940  }
941  return Command::Unknown;
942  }
943 
945  Message& setCommandEnum(Command::Type command_)
946  {
947  unsigned bits = 0;
948  unsigned command = command_;
949  while (command > 0) { ++bits; command >>= 1; }
950  amps_message_assign_field_value(_body.get().getMessage(), AMPS_Command,
951  CommandConstants<0>::Values[bits], CommandConstants<0>::Lengths[bits]);
952  return *this;
953  }
954 
955  AMPS_FIELD(Command)
956  AMPS_FIELD(CommandId)
957  AMPS_FIELD(ClientName)
958  AMPS_FIELD(UserId)
959  AMPS_FIELD(Timestamp)
960 
964  Field getTransmissionTime() const
965  {
966  return getTimestamp();
967  }
968 
973  void getRawTransmissionTime(const char** dataptr, size_t* sizeptr) const
974  {
975  getRawTimestamp(dataptr, sizeptr);
976  }
977 
978  AMPS_FIELD(Topic)
979  AMPS_FIELD(Filter)
980  AMPS_FIELD(MessageType)
981 
984  struct AckType
985  {
986  typedef enum
987  {
988  None = 0, Received = 1, Parsed = 2, Processed = 4, Persisted = 8, Completed = 16, Stats = 32
989  } Type;
990  };
991  AMPS_FIELD(AckType);
994  static inline AckType::Type decodeSingleAckType(const char* begin, const char* end)
995  {
996  switch (end - begin)
997  {
998  case 5:
999  return AckType::Stats;
1000  case 6:
1001  return AckType::Parsed;
1002  case 8:
1003  return AckType::Received;
1004  case 9:
1005  switch (begin[1])
1006  {
1007  case 'e': return AckType::Persisted;
1008  case 'r': return AckType::Processed;
1009  case 'o': return AckType::Completed;
1010  default: break;
1011  }
1012  break;
1013  default:
1014  break;
1015  }
1016  return AckType::None;
1017  }
1021  unsigned getAckTypeEnum() const
1022  {
1023  unsigned result = AckType::None;
1024  const char* data = NULL; size_t len = 0;
1025  amps_message_get_field_value(_body.get().getMessage(), AMPS_AckType, &data, &len);
1026  const char* mark = data;
1027  for (const char* end = data + len; data != end; ++data)
1028  {
1029  if (*data == ',')
1030  {
1031  result |= decodeSingleAckType(mark, data);
1032  mark = data + 1;
1033  }
1034  }
1035  if (mark < data) result |= decodeSingleAckType(mark, data);
1036  return result;
1037  }
1041  Message& setAckTypeEnum(unsigned ackType_)
1042  {
1043  if(ackType_ < AckTypeConstants<0>::Entries)
1044  {
1045  amps_message_assign_field_value(_body.get().getMessage(), AMPS_AckType,
1046  AckTypeConstants<0>::Values[ackType_], AckTypeConstants<0>::Lengths[ackType_]);
1047  }
1048  return *this;
1049  }
1050 
1051  AMPS_FIELD(SubscriptionId)
1052  AMPS_FIELD(Expiration)
1053  AMPS_FIELD(Heartbeat)
1054  AMPS_FIELD(TimeoutInterval)
1055  AMPS_FIELD(LeasePeriod)
1056  AMPS_FIELD(Status)
1057  AMPS_FIELD(QueryID)
1058  AMPS_FIELD(BatchSize)
1059  AMPS_FIELD(TopNRecordsReturned)
1060  AMPS_FIELD(OrderBy)
1061  AMPS_FIELD(SowKeys)
1062  AMPS_FIELD(CorrelationId)
1063  AMPS_FIELD(Sequence)
1064  AMPS_FIELD(Bookmark)
1065  AMPS_FIELD(RecordsInserted)
1066  AMPS_FIELD(RecordsUpdated)
1067  AMPS_FIELD(SowDelete)
1068  AMPS_FIELD(RecordsReturned)
1069  AMPS_FIELD(TopicMatches)
1070  AMPS_FIELD(Matches)
1071  AMPS_FIELD(MessageLength)
1072  AMPS_FIELD(SowKey)
1073  AMPS_FIELD(GroupSequenceNumber)
1074  AMPS_FIELD(SubscriptionIds)
1075  AMPS_FIELD(Reason)
1076  AMPS_FIELD(Password)
1077  AMPS_FIELD(Version)
1078 
1079  DOX_OPENGROUP(x)
1080  DOX_MAKEGETCOMMENT(Options) DOX_COMMENTHEAD( Retrieves the value of the Options header of the Message as a new Field.)
1081  Field getOptions() const {
1082  Field returnValue;
1083  const char* ptr;
1084  size_t sz;
1085  amps_message_get_field_value(_body.get().getMessage(),
1086  AMPS_Options, &ptr, &sz);
1087  if (sz && ptr[sz-1] == ',') --sz;
1088  returnValue.assign(ptr, sz);
1089  return returnValue;
1090  }
1091 
1092  DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the Options header of self as a Field that references the underlying buffer managed by this Message. )
1093  void getRawOptions(const char** dataptr, size_t* sizeptr) const {
1094  amps_message_get_field_value(_body.get().getMessage(),
1095  AMPS_Options, dataptr, sizeptr);
1096  if (*sizeptr && *dataptr && (*dataptr)[*sizeptr-1] == ',') --*sizeptr;
1097  return;
1098  }
1099 
1100  DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the Options header for this Message.)
1101  Message& setOptions(const std::string& v) {
1102  size_t sz = v.length();
1103  if (sz && v[sz-1] == ',') --sz;
1104  amps_message_set_field_value(_body.get().getMessage(),
1105  AMPS_Options, v.c_str(), sz);
1106  return *this;
1107  }
1108 
1109  DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the Options header for this Message.)
1110  Message& assignOptions(const std::string& v) {
1111  size_t sz = v.length();
1112  if (sz && v[sz-1] == ',') --sz;
1113  amps_message_assign_field_value(_body.get().getMessage(),
1114  AMPS_Options, v.c_str(), sz);
1115  return *this;
1116  }
1117 
1118  DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the Options header for this Message.)
1119  Message& assignOptions(const char* data, size_t len) {
1120  if (len && data[len-1] == ',') --len;
1121  amps_message_assign_field_value(_body.get().getMessage(),
1122  AMPS_Options, data, len);
1123  return *this;
1124  }
1125 
1126  DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the Options header for this Message.)
1127  Message& setOptions(const char* str) {
1128  if (str)
1129  {
1130  size_t sz = strlen(str);
1131  if (sz && str[sz-1] == ',') --sz;
1132  amps_message_set_field_value(_body.get().getMessage(),
1133  AMPS_Options, str, sz);
1134  }
1135  else
1136  {
1137  amps_message_set_field_value(_body.get().getMessage(),
1138  AMPS_Options, str, 0);
1139  }
1140  return *this;
1141  }
1142 
1143  DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the Options header for this Message.)
1144  Message& setOptions(const char* str,size_t len) {
1145  if (len && str[len-1] == ',') --len;
1146  amps_message_set_field_value(_body.get().getMessage(),
1147  AMPS_Options, str, len);
1148  return *this;
1149  }
1151 
1152 
1153 
1157  Field getData() const
1158  {
1159  Field returnValue;
1160  char* ptr;
1161  size_t sz;
1162  amps_message_get_data(_body.get().getMessage(), &ptr, &sz);
1163  returnValue.assign(ptr, sz);
1164  return returnValue;
1165  }
1166 
1167  void getRawData(const char **data, size_t *sz) const
1168  {
1169  amps_message_get_data(_body.get().getMessage(), (char**)data, sz);
1170  }
1173  Message& setData(const std::string &v_)
1174  {
1175  amps_message_set_data(_body.get().getMessage(), v_.c_str(), v_.length());
1176  return *this;
1177  }
1178  Message& assignData(const std::string &v_)
1179  {
1180  amps_message_assign_data(_body.get().getMessage(), v_.c_str(), v_.length());
1181  return *this;
1182  }
1183 
1187  Message& setData(const char* data_, size_t length_)
1188  {
1189  amps_message_set_data(_body.get().getMessage(), data_, length_);
1190  return *this;
1191  }
1192  Message& assignData(const char* data_,size_t length_)
1193  {
1194  amps_message_assign_data(_body.get().getMessage(),data_,length_);
1195  return *this;
1196  }
1197 
1200  Message& setData(const char* data_)
1201  {
1202  amps_message_set_data_nts(_body.get().getMessage(), data_);
1203  return *this;
1204  }
1205  Message& assignData(const char* data_)
1206  {
1207  amps_message_assign_data(_body.get().getMessage(), data_, strlen(data_));
1208  return *this;
1209  }
1210  amps_handle getMessage() const
1211  {
1212  return _body.get().getMessage();
1213  }
1214  void replace(amps_handle message, bool owner = false)
1215  {
1216  _body.get().replace(message, owner);
1217  }
1218  void disown()
1219  {
1220  _body.get().disown();
1221  }
1222  void invalidate()
1223  {
1224  _body = NULL;
1225  }
1226  bool isValid(void) const
1227  {
1228  return _body.isValid();
1229  }
1230  Message& reset()
1231  {
1232  _body.get().reset();
1233  return *this;
1234  }
1235 
1236  void setBookmarkSeqNo(size_t val)
1237  {
1238  _body.get().setBookmarkSeqNo(val);
1239  }
1240 
1241  size_t getBookmarkSeqNo() const
1242  {
1243  return _body.get().getBookmarkSeqNo();
1244  }
1245 
1246  void setSubscriptionHandle(amps_handle val)
1247  {
1248  _body.get().setSubscriptionHandle(val);
1249  }
1250 
1251  amps_handle getSubscriptionHandle() const
1252  {
1253  return _body.get().getSubscriptionHandle();
1254  }
1255 
1256  void ack(const char* options_ = NULL) const;
1257 
1258  void setClientImpl(ClientImpl *pClientImpl)
1259  {
1260  _body.get().setClientImpl(pClientImpl);
1261  }
1262 
1263  void setIgnoreAutoAck() const
1264  {
1265  _body.get().setIgnoreAutoAck();
1266  }
1267 
1268  bool getIgnoreAutoAck() const
1269  {
1270  return _body.get().getIgnoreAutoAck();
1271  }
1272 
1273  template <class T> // static
1274  void throwFor(const T& /*context_*/, const std::string& ackReason_) const
1275  {
1276  switch (ackReason_[0])
1277  {
1278  case 'a': // auth failure
1279  throw AuthenticationException("Logon failed for user \"" +
1280  (std::string)getUserId() + "\"");
1281  break;
1282  case 'b':
1283  switch (ackReason_.length())
1284  {
1285  case 10: // bad filter
1286  throw BadFilterException("Filter '" +
1287  (std::string)getFilter() +
1288  "' is invalid");
1289  break;
1290  case 15: // bad regex topic
1291  throw BadRegexTopicException("Regular Expression Topic '" +
1292  (std::string)getTopic() +
1293  "' is invalid.");
1294  break;
1295  default:
1296  break;
1297  }
1298  break;
1299  case 'i':
1300  if (ackReason_.length() == 13)
1301  {
1302  switch (ackReason_[8])
1303  {
1304  case 't': // invalid topic
1305  throw InvalidTopicException("Topic '" +
1306  (std::string)getTopic() +
1307  "' is invalid.");
1308  break;
1309  case 's': // invalid subId
1310  throw InvalidSubIdException("SubId '" +
1311  (std::string)getSubscriptionId() +
1312  "' is invalid.");
1313  break;
1314  default:
1315  break;
1316  }
1317  }
1318  break;
1319  case 'n':
1320  switch (ackReason_[1])
1321  {
1322  case 'a': // name in use
1323  throw NameInUseException("Client name '" +
1324  (std::string)getClientName() +
1325  "' already exists.");
1326  break;
1327  case 'o': // not entitled
1328  throw NotEntitledException("User \"" +
1329  (std::string)getUserId() +
1330  "\" not entitled to topic \"" +
1331  (std::string)getTopic() +
1332  "\".");
1333  break;
1334  default:
1335  break;
1336  }
1337  break;
1338  case 's':
1339  switch (ackReason_.length())
1340  {
1341  case 27: // subscription already exists
1342  throw SubscriptionAlreadyExistsException(
1343  "Subscription for command '" +
1344  (std::string)getCommandId() +
1345  "' already exists.");
1346  break;
1347  case 12: // subid in use
1348  throw SubidInUseException("Subscription with id '" +
1349  (std::string)getSubscriptionId() +
1350  "' already exists.");
1351  break;
1352  default:
1353  break;
1354  }
1355  break;
1356  default:
1357  break;
1358  }
1359  throw CommandException("Error from server while processing this command: '" +
1360  ackReason_ + "'");
1361  }
1362 };
1363 
1364 inline std::string
1365 operator+(const std::string& lhs, const Message::Field& rhs)
1366 {
1367  return lhs + std::string(rhs);
1368 }
1369 
1370 inline std::basic_ostream<char>&
1371 operator<<(std::basic_ostream<char>& os, const Message::Field& rhs)
1372 {
1373  os.write(rhs.data(), (std::streamsize)rhs.len());
1374  return os;
1375 }
1376 inline bool
1377 AMPS::Field::operator<(const AMPS::Field& rhs) const
1378 {
1379  return std::lexicographical_compare(data(), data()+len(), rhs.data(), rhs.data()+rhs.len());
1380 }
1381 
1382 }
1383 
1384 #endif
void setFullyDurable(void)
Set the option to only provide messages that have been persisted to all replication destinations that...
Definition: Message.hpp:664
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:212
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1173
void setProjection(std::string projection_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:780
void amps_message_reset(amps_handle message)
Clears all fields and data in a message.
void setRateMaxGap(const char *rateMaxGap_)
Set the option for the maximum amount of time that a bookmark replay with a specified rate will allow...
Definition: Message.hpp:755
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:885
void amps_message_assign_data(amps_handle message, const amps_char *value, size_t length)
Assigns the data component of an AMPS message, without copying the value.
amps_handle amps_message_copy(amps_handle message)
Creates and returns a handle to a new AMPS message object that is a deep copy of the message passed i...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:615
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:430
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:604
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
Message & setAckTypeEnum(unsigned ackType_)
Encode self&#39;s "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1041
void setNoSowKey(void)
Set the option to not set the SowKey header on messages.
Definition: Message.hpp:631
MessageImpl(amps_handle message_, bool owner_=false, bool ignoreAutoAck_=false, size_t bookmarkSeqNo_=0, amps_subscription_handle subscription_=NULL, ClientImpl *clientImpl_=NULL)
Constructs a messageImpl from an existing AMPS message.
Definition: Message.hpp:130
Message & setCommandEnum(Command::Type command_)
Set self&#39;s "command" field from one of the values in Command.
Definition: Message.hpp:945
void setSendKeys(void)
Set the option to send key fields with a delta subscription.
Definition: Message.hpp:620
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:106
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:445
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
void setPause(void)
Set the option to pause a bookmark subscription.
Definition: Message.hpp:656
DOX_CLOSEGROUP(x) Field getData() const
Returns the data from this message.
Definition: Message.hpp:1150
void setReplace(void)
Set the option to replace a current subscription with this one.
Definition: Message.hpp:610
Valid values for setCommandEnum() and getCommandEnum().
Definition: Message.hpp:859
Message & setData(const char *data_)
Sets the data portion of self from a null-terminated string.
Definition: Message.hpp:1200
void setGrouping(Iterator begin_, Iterator end_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:823
void setTimestamp(void)
Set the option to send a timestamp that the message was processed on a subscription or query...
Definition: Message.hpp:626
amps_handle getMessage() const
Returns the underling AMPS message object from the C layer.
Definition: Message.hpp:174
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
void setResume(void)
Set the option to resume a subscription.
Definition: Message.hpp:644
static AckType::Type decodeSingleAckType(const char *begin, const char *end)
Decodes a single ack string.
Definition: Message.hpp:994
void setConflation(const char *conflation_)
Set the options for conflation on a subscription.
Definition: Message.hpp:689
void replace(amps_handle message_, bool owner_=false)
Causes self to refer to a new AMPS message, freeing any current message owned by self along the way...
Definition: Message.hpp:193
Options(std::string options_="")
ctor - default to None
Definition: Message.hpp:571
void amps_message_get_data(amps_handle message, amps_char **value_ptr, size_t *length_ptr)
Gets the data component of an AMPS message.
amps_handle amps_message_create(amps_handle client)
Functions for creation and manipulation of AMPS messages.
void setRate(const char *rate_)
Set the option for the maximum rate at which messages are provided to the subscription.
Definition: Message.hpp:733
Valid values for the setAckTypeEnum() and getAckTypeEnum() methods.
Definition: Message.hpp:984
void amps_message_assign_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Assigns the value of a header field in an AMPS message, without copying the value.
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
void setTopN(int topN_)
Set the top N option, which specifies the maximum number of messages to return for this command...
Definition: Message.hpp:719
void amps_message_set_data_nts(amps_handle message, const amps_char *value)
Sets the data component of an AMPS message.
void setMaxBacklog(int maxBacklog_)
Set the option for maximum backlog this subscription is willing to accept.
Definition: Message.hpp:676
void getRawTransmissionTime(const char **dataptr, size_t *sizeptr) const
Definition: Message.hpp:973
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageImpl()
Constructs a MessageImpl with a new, empty AMPS message.
Definition: Message.hpp:143
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
void setGrouping(std::string grouping_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:810
void setConflationKey(const char *conflationKey_)
Set the options for the conflation key, the identifiers for the field or fields used by AMPS to deter...
Definition: Message.hpp:706
void setSkipN(int skipN_)
Set the option for skip N, the number of messages in the result set to skip before returning messages...
Definition: Message.hpp:768
void amps_message_set_data(amps_handle message, const amps_char *value, size_t length)
Sets the data component of an AMPS message.
Message(amps_handle message_, bool owner_=false)
Constructs a new Message to wrap message.
Definition: Message.hpp:416
void setNone(void)
Clear any previously set options and set the options to an empty string (AMPS_OPTIONS_NONE).
Definition: Message.hpp:587
Implementation class for a Message.
Definition: Message.hpp:108
void setProjection(Iterator begin_, Iterator end_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:793
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1021
Message & setData(const char *data_, size_t length_)
Sets the data portion of self from a char array.
Definition: Message.hpp:1187
Definition: ampsplusplus.hpp:136
void setCancel(void)
Set the cancel option, used on a sow_delete command to return a message to the queue.
Definition: Message.hpp:636
Message()
Construct a new, empty Message.
Definition: Message.hpp:424
void setLive(void)
Set the live option for a bookmark subscription, which requests that the subscription receives messag...
Definition: Message.hpp:598
void amps_message_destroy(amps_handle message)
Destroys and frees the memory associated with an AMPS message object.