AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
Message.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef __AMPS_MESSAGE_HPP__
26 #define __AMPS_MESSAGE_HPP__
27 #include "util.hpp"
28 #include "constants.hpp"
29 #include "amps_generated.h"
30 #include "Field.hpp"
31 #include <stdio.h>
32 #include <algorithm>
33 #include <ostream>
34 #include <string>
// Marker value for an unset sequence: the all-ones amps_uint64_t.
// The expansion is fully parenthesized so it always behaves as one operand
// (for example, `sizeof AMPS_UNSET_SEQUENCE` measures the value, not the type
// name minus one).
#define AMPS_UNSET_SEQUENCE ((amps_uint64_t)-1)
36 
// Compiler feature detection. Each AMPS_USE_* macro is defined to 1 when the
// compiler test below passes and is left undefined otherwise.
// NOTE(review): the LAMBDAS and EMPLACE tests only look at _MSC_VER and the
// GCC version macros; presumably a compiler reporting an older GCC version
// gets neither even if it supports the feature -- confirm this is intended.
37 // Macros for determining what parts of TR1 we can depend on
38 #if defined(__GXX_EXPERIMENTAL_CXX0X__) || (_MSC_VER >= 1600)
39  #define AMPS_USE_FUNCTIONAL 1
40 #endif
41 
// Enabled for MSVC >= 1600 or GCC >= 4.5.
42 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=5 )
43  #define AMPS_USE_LAMBDAS 1
44 #endif
45 
// Enabled for MSVC >= 1600 or GCC >= 4.8.
46 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=8 )
47  #define AMPS_USE_EMPLACE 1
48 #endif
49 
// <functional> is only pulled in when AMPS_USE_FUNCTIONAL was enabled above.
50 #ifdef AMPS_USE_FUNCTIONAL
51  #include <functional>
52 #endif
53 
54 #include <algorithm>
55 
59 
// Option tokens for the Options header. Every token is a string literal that
// ends with ',' so adjacent tokens concatenate into a comma-separated list:
// AMPS_OPTIONS_LIVE AMPS_OPTIONS_OOF yields "live,oof,".
#define AMPS_OPTIONS_NONE ""
#define AMPS_OPTIONS_LIVE "live,"
#define AMPS_OPTIONS_OOF "oof,"
#define AMPS_OPTIONS_REPLACE "replace,"
#define AMPS_OPTIONS_NOEMPTIES "no_empties,"
#define AMPS_OPTIONS_SENDKEYS "send_keys,"
#define AMPS_OPTIONS_TIMESTAMP "timestamp,"
#define AMPS_OPTIONS_NOSOWKEY "no_sowkey,"
#define AMPS_OPTIONS_CANCEL "cancel,"
#define AMPS_OPTIONS_RESUME "resume,"
#define AMPS_OPTIONS_PAUSE "pause,"
#define AMPS_OPTIONS_FULLY_DURABLE "fully_durable,"
#define AMPS_OPTIONS_EXPIRE "expire,"
// Parameterized tokens. The argument is stringized with '#' and spliced in by
// literal concatenation ('##' inside a string literal is never substituted).
// The argument must therefore be written as a literal token:
// AMPS_OPTIONS_TOPN(10) yields "top_n=10,". Passing a variable name would
// embed the name, not its value; use Message::Options for runtime values.
#define AMPS_OPTIONS_TOPN(x) "top_n=" #x ","
#define AMPS_OPTIONS_MAXBACKLOG(x) "max_backlog=" #x ","
#define AMPS_OPTIONS_RATE(x) "rate=" #x ","
76 
77 namespace AMPS
78 {
// Untyped pointer type used by MessageImpl to carry a subscription reference
// alongside a message.
79  typedef void* amps_subscription_handle;
80 
// Forward declaration: MessageImpl only stores a ClientImpl pointer.
81  class ClientImpl;
82 
87  class MessageImpl : public RefBody
88  {
89  private:
90  amps_handle _message;
91  Mutex _lock;
92  bool _owner;
93  mutable bool _isIgnoreAutoAck;
94  size_t _bookmarkSeqNo;
95  amps_subscription_handle _subscription;
96  ClientImpl* _clientImpl;
97  public:
109  MessageImpl(amps_handle message_, bool owner_ = false,
110  bool ignoreAutoAck_ = false, size_t bookmarkSeqNo_ = 0,
111  amps_subscription_handle subscription_ = NULL,
112  ClientImpl* clientImpl_ = NULL)
113  : _message(message_), _owner(owner_), _isIgnoreAutoAck(ignoreAutoAck_)
114  , _bookmarkSeqNo(bookmarkSeqNo_)
115  , _subscription(subscription_), _clientImpl(clientImpl_)
116  {
117  }
118 
123  : _message(NULL), _owner(true), _isIgnoreAutoAck(false), _bookmarkSeqNo(0), _subscription(NULL), _clientImpl(NULL)
124  {
125  // try to create one
126  _message = amps_message_create(NULL);
127  }
128 
129  virtual ~MessageImpl()
130  {
131  if (_owner && _message)
132  {
133  amps_message_destroy(_message);
134  }
135  }
136 
137  MessageImpl* copy() const
138  {
139  amps_handle copy = amps_message_copy(_message);
140  return new MessageImpl(copy, true, _isIgnoreAutoAck, _bookmarkSeqNo,
141  _subscription, _clientImpl);
142  }
143 
144  void setClientImpl(ClientImpl* clientImpl_)
145  {
146  _clientImpl = clientImpl_;
147  }
148 
149  ClientImpl* clientImpl(void) const
150  {
151  return _clientImpl;
152  }
153 
157  {
158  return _message;
159  }
160 
161  void reset()
162  {
163  Lock<Mutex> l(_lock);
164  amps_message_reset(_message);
165  _bookmarkSeqNo = 0;
166  _subscription = NULL;
167  _isIgnoreAutoAck = false;
168  }
169 
175  void replace(amps_handle message_, bool owner_ = false)
176  {
177  Lock<Mutex> l(_lock);
178  if (_message == message_)
179  {
180  return;
181  }
182  if (_owner && _message)
183  {
184  amps_message_destroy(_message);
185  }
186  _owner = owner_;
187  _message = message_;
188  _bookmarkSeqNo = 0;
189  _subscription = NULL;
190  _isIgnoreAutoAck = false;
191  }
192 
193  void disown()
194  {
195  Lock<Mutex> l(_lock);
196  _owner = false;
197  }
198 
199  static unsigned long newId()
200  {
201  static ATOMIC_TYPE _id = 0;
202  return (unsigned long)(AMPS_FETCH_ADD(&_id, 1));
203  }
204 
205  void setBookmarkSeqNo(size_t val_)
206  {
207  _bookmarkSeqNo = val_;
208  }
209 
210  size_t getBookmarkSeqNo(void) const
211  {
212  return _bookmarkSeqNo;
213  }
214 
215  void setSubscriptionHandle(amps_subscription_handle subscription_)
216  {
217  _subscription = subscription_;
218  }
219 
220  amps_subscription_handle getSubscriptionHandle(void) const
221  {
222  return _subscription;
223  }
224 
225  void setIgnoreAutoAck() const
226  {
227  _isIgnoreAutoAck = true;
228  }
229 
230  bool getIgnoreAutoAck() const
231  {
232  return _isIgnoreAutoAck;
233  }
234  };
235 
236 
237 // This block of macros works with the Doxygen preprocessor to
238 // create documentation comments for fields defined with the AMPS_FIELD macro.
239 // A C++ compiler removes comments before expanding macros, so these macros
240 // must ONLY be defined for Doxygen and not for actual compilation.
// The same macros are used by AMPS_FIELD_ALIAS and by the hand-written
// Options accessors of Message.
241 
242 #ifdef DOXYGEN_PREPROCESSOR
243 
// DOX_COMMENTHEAD pastes its argument between the tokens of a comment opener
// and closer; the remaining macros supply the text for each kind of member.
244 #define DOX_COMMENTHEAD(s) / ## ** ## s ## * ## /
245 #define DOX_GROUPNAME(s) DOX_COMMENTHEAD(@name s Functions)
246 #define DOX_OPENGROUP(s) DOX_COMMENTHEAD(@{) \
247  DOX_GROUPNAME(s)
248 #define DOX_CLOSEGROUP() DOX_COMMENTHEAD(@})
249 #define DOX_MAKEGETCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of the Message as a new Field. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
250 #define DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of self as a Field that references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
251 #define DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
252 #define DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
253 #define DOX_MAKENEWCOMMENT(x) DOX_COMMENTHEAD(Creates and sets a new sequential value for the x header for this Message. This function is most useful for headers such as %CommandId and %SubId.)
254 
255 #else
256 
// Normal compilation: every DOX_* macro expands to nothing.
257 #define DOX_COMMENTHEAD(s)
258 #define DOX_GROUPNAME(s)
259 #define DOX_OPENGROUP(x)
260 #define DOX_CLOSEGROUP()
261 #define DOX_MAKEGETCOMMENT(x)
262 #define DOX_MAKEGETRAWCOMMENT(x)
263 #define DOX_MAKESETCOMMENT(x)
264 #define DOX_MAKEASSIGNCOMMENT(x)
265 #define DOX_MAKENEWCOMMENT(x)
266 
267 #endif
268 
269 // Macro for defining all of the necessary methods for a field in an AMPS
270 // message.
//
// AMPS_FIELD(x) must be expanded inside class Message (it uses _body and
// Message::IdentifierLength). For header x it generates:
//   Field    getx() const                      - value returned as a Field
//   void     getRawx(const char**, size_t*)    - pointer/length written to the out-parameters
//   Message& setx(const std::string&)          - via amps_message_set_field_value
//   Message& setx(amps_uint64_t)               - value formatted as text first
//   Message& setx(const char*)                 - null-terminated string
//   Message& setx(const char*, size_t)
//   Message& assignx(const std::string&)       - via amps_message_assign_field_value
//   Message& assignx(const char*, size_t)
//   Message& newx()                            - sets "auto" followed by MessageImpl::newId() in hex
// The header is identified by the constant AMPS_##x.
271 
272 
273 #define AMPS_FIELD(x) \
274  DOX_OPENGROUP(x) \
275  DOX_MAKEGETCOMMENT(x) \
276  Field get##x() const {\
277  Field returnValue;\
278  const char* ptr;\
279  size_t sz;\
280  amps_message_get_field_value(_body.get().getMessage(),\
281  AMPS_##x, &ptr, &sz);\
282  returnValue.assign(ptr, sz);\
283  return returnValue;\
284  }\
285  DOX_MAKEGETRAWCOMMENT(x) \
286  void getRaw##x(const char** dataptr, size_t* sizeptr) const {\
287  amps_message_get_field_value(_body.get().getMessage(),\
288  AMPS_##x, dataptr, sizeptr);\
289  return;\
290  }\
291  DOX_MAKESETCOMMENT(x) \
292  Message& set##x(const std::string& v) {\
293  amps_message_set_field_value(_body.get().getMessage(),\
294  AMPS_##x, v.c_str(), v.length());\
295  return *this;\
296  }\
297  DOX_MAKESETCOMMENT(x) \
298  Message& set##x(amps_uint64_t v) {\
299  char buf[22];\
300  AMPS_snprintf_amps_uint64_t(buf,22,v);\
301  amps_message_set_field_value_nts(_body.get().getMessage(),\
302  AMPS_##x, buf);\
303  return *this;\
304  }\
305  DOX_MAKEASSIGNCOMMENT(x) \
306  Message& assign##x(const std::string& v) {\
307  amps_message_assign_field_value(_body.get().getMessage(),\
308  AMPS_##x, v.c_str(), v.length());\
309  return *this;\
310  }\
311  DOX_MAKEASSIGNCOMMENT(x) \
312  Message& assign##x(const char* data, size_t len) {\
313  amps_message_assign_field_value(_body.get().getMessage(),\
314  AMPS_##x, data, len);\
315  return *this;\
316  }\
317  DOX_MAKESETCOMMENT(x) \
318  Message& set##x(const char* str) {\
319  amps_message_set_field_value_nts(_body.get().getMessage(),\
320  AMPS_##x, str);\
321  return *this;\
322  }\
323  DOX_MAKESETCOMMENT(x) \
324  Message& set##x(const char* str,size_t len) {\
325  amps_message_set_field_value(_body.get().getMessage(),\
326  AMPS_##x, str,len);\
327  return *this;\
328  }\
329  DOX_MAKENEWCOMMENT(x) \
330  Message& new##x() {\
331  char buf[Message::IdentifierLength+1];\
332  buf[Message::IdentifierLength] = 0;\
333  AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%zx" , (size_t)(_body.get().newId()));\
334  amps_message_set_field_value_nts(_body.get().getMessage(),\
335  AMPS_##x, buf);\
336  return *this;\
337  } \
338  DOX_CLOSEGROUP()
339 
// AMPS_FIELD_ALIAS(x,y) generates the same member set as AMPS_FIELD twice:
// first under the name y (gety, getRawy, sety, assigny, newy) and then under
// the name x (getx, getRawx, setx, assignx, newx). Both sets read and write
// the single header identified by AMPS_##y, so x is purely an alternate
// spelling of y. Like AMPS_FIELD, it must be expanded inside class Message.
340 #define AMPS_FIELD_ALIAS(x,y) \
341  DOX_OPENGROUP(y) \
342  DOX_MAKEGETCOMMENT(y) \
343  Field get##y() const {\
344  Field returnValue;\
345  const char* ptr;\
346  size_t sz;\
347  amps_message_get_field_value(_body.get().getMessage(),\
348  AMPS_##y, &ptr, &sz);\
349  returnValue.assign(ptr, sz);\
350  return returnValue;\
351  }\
352  DOX_MAKEGETRAWCOMMENT(y) \
353  void getRaw##y(const char** dataptr, size_t* sizeptr) const {\
354  amps_message_get_field_value(_body.get().getMessage(),\
355  AMPS_##y, dataptr, sizeptr);\
356  return;\
357  }\
358  DOX_MAKESETCOMMENT(y) \
359  Message& set##y(const std::string& v) {\
360  amps_message_set_field_value(_body.get().getMessage(),\
361  AMPS_##y, v.c_str(), v.length());\
362  return *this;\
363  }\
364  DOX_MAKESETCOMMENT(y) \
365  Message& set##y(amps_uint64_t v) {\
366  char buf[22];\
367  AMPS_snprintf_amps_uint64_t(buf,22,v);\
368  amps_message_set_field_value_nts(_body.get().getMessage(),\
369  AMPS_##y, buf);\
370  return *this;\
371  }\
372  DOX_MAKEASSIGNCOMMENT(y) \
373  Message& assign##y(const std::string& v) {\
374  amps_message_assign_field_value(_body.get().getMessage(),\
375  AMPS_##y, v.c_str(), v.length());\
376  return *this;\
377  }\
378  DOX_MAKEASSIGNCOMMENT(y) \
379  Message& assign##y(const char* data, size_t len) {\
380  amps_message_assign_field_value(_body.get().getMessage(),\
381  AMPS_##y, data, len);\
382  return *this;\
383  }\
384  DOX_MAKESETCOMMENT(y) \
385  Message& set##y(const char* str) {\
386  amps_message_set_field_value_nts(_body.get().getMessage(),\
387  AMPS_##y, str);\
388  return *this;\
389  }\
390  DOX_MAKESETCOMMENT(y) \
391  Message& set##y(const char* str,size_t len) {\
392  amps_message_set_field_value(_body.get().getMessage(),\
393  AMPS_##y, str,len);\
394  return *this;\
395  }\
396  DOX_MAKENEWCOMMENT(y) \
397  Message& new##y() {\
398  char buf[Message::IdentifierLength+1];\
399  buf[Message::IdentifierLength] = 0;\
400  AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%zx" , (size_t)(_body.get().newId()));\
401  amps_message_set_field_value_nts(_body.get().getMessage(),\
402  AMPS_##y, buf);\
403  return *this;\
404  }\
405  DOX_MAKEGETCOMMENT(y) \
406  Field get##x() const {\
407  Field returnValue;\
408  const char* ptr;\
409  size_t sz;\
410  amps_message_get_field_value(_body.get().getMessage(),\
411  AMPS_##y, &ptr, &sz);\
412  returnValue.assign(ptr, sz);\
413  return returnValue;\
414  }\
415  DOX_MAKEGETRAWCOMMENT(y) \
416  void getRaw##x(const char** dataptr, size_t* sizeptr) const {\
417  amps_message_get_field_value(_body.get().getMessage(),\
418  AMPS_##y, dataptr, sizeptr);\
419  return;\
420  }\
421  DOX_MAKESETCOMMENT(y) \
422  Message& set##x(const std::string& v) {\
423  amps_message_set_field_value(_body.get().getMessage(),\
424  AMPS_##y, v.c_str(), v.length());\
425  return *this;\
426  }\
427  DOX_MAKESETCOMMENT(y) \
428  Message& set##x(amps_uint64_t v) {\
429  char buf[22];\
430  AMPS_snprintf_amps_uint64_t(buf,22,v);\
431  amps_message_set_field_value_nts(_body.get().getMessage(),\
432  AMPS_##y, buf);\
433  return *this;\
434  }\
435  DOX_MAKEASSIGNCOMMENT(y) \
436  Message& assign##x(const std::string& v) {\
437  amps_message_assign_field_value(_body.get().getMessage(),\
438  AMPS_##y, v.c_str(), v.length());\
439  return *this;\
440  }\
441  DOX_MAKEASSIGNCOMMENT(y) \
442  Message& assign##x(const char* data, size_t len) {\
443  amps_message_assign_field_value(_body.get().getMessage(),\
444  AMPS_##y, data, len);\
445  return *this;\
446  }\
447  DOX_MAKESETCOMMENT(y) \
448  Message& set##x(const char* str) {\
449  amps_message_set_field_value_nts(_body.get().getMessage(),\
450  AMPS_##y, str);\
451  return *this;\
452  }\
453  DOX_MAKESETCOMMENT(y) \
454  Message& set##x(const char* str,size_t len) {\
455  amps_message_set_field_value(_body.get().getMessage(),\
456  AMPS_##y, str,len);\
457  return *this;\
458  }\
459  DOX_MAKENEWCOMMENT(y) \
460  Message& new##x() {\
461  char buf[Message::IdentifierLength+1];\
462  buf[Message::IdentifierLength] = 0;\
463  AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%zx" , (size_t)(_body.get().newId()));\
464  amps_message_set_field_value_nts(_body.get().getMessage(),\
465  AMPS_##y, buf);\
466  return *this;\
467  } \
468  DOX_CLOSEGROUP()
469 
470 
511  class Message
512  {
513  RefHandle<MessageImpl> _body;
514 
515  Message(MessageImpl* body_) : _body(body_) { ; }
516 
517  public:
// Makes AMPS::Field available as Message::Field.
518  typedef AMPS::Field Field;
519 
// The identifier buffers built by the generated newXxx() members are
// IdentifierLength + 1 bytes long.
522  static const unsigned int IdentifierLength = 32;
523 
// Alias for AMPS_UNSET_INDEX (defined outside this file).
// NOTE(review): presumably the "no bookmark sequence number" marker -- confirm.
526  static const size_t BOOKMARK_NONE = AMPS_UNSET_INDEX;
527 
534  Message(amps_handle message_, bool owner_ = false)
535  : _body(new MessageImpl(message_, owner_))
536  {
537  }
538 
542  Message() : _body(new MessageImpl())
543  {
544  }
545 
548  Message deepCopy(void) const
549  {
550  return Message(_body.get().copy());
551  }
552 
563  class Options
564  {
565  public:
566  static const char* None(void)
567  {
568  return AMPS_OPTIONS_NONE;
569  }
570  static const char* Live(void)
571  {
572  return AMPS_OPTIONS_LIVE;
573  }
574  static const char* OOF(void)
575  {
576  return AMPS_OPTIONS_OOF;
577  }
578  static const char* Replace(void)
579  {
580  return AMPS_OPTIONS_REPLACE;
581  }
582  static const char* NoEmpties(void)
583  {
584  return AMPS_OPTIONS_NOEMPTIES;
585  }
586  static const char* SendKeys(void)
587  {
588  return AMPS_OPTIONS_SENDKEYS;
589  }
590  static const char* Timestamp(void)
591  {
592  return AMPS_OPTIONS_TIMESTAMP;
593  }
594  static const char* NoSowKey(void)
595  {
596  return AMPS_OPTIONS_NOSOWKEY;
597  }
598  static const char* Cancel(void)
599  {
600  return AMPS_OPTIONS_CANCEL;
601  }
602  static const char* Resume(void)
603  {
604  return AMPS_OPTIONS_RESUME;
605  }
606  static const char* Pause(void)
607  {
608  return AMPS_OPTIONS_PAUSE;
609  }
610  static const char* FullyDurable(void)
611  {
612  return AMPS_OPTIONS_FULLY_DURABLE;
613  }
614  static const char* Expire(void)
615  {
616  return AMPS_OPTIONS_EXPIRE;
617  }
618  static std::string Conflation(const char* conflation_)
619  {
620  char buf[64];
621  AMPS_snprintf(buf, sizeof(buf), "conflation=%s,", conflation_);
622  return buf;
623  }
624  static std::string ConflationKey(const char* conflationKey_)
625  {
626  std::string option("conflation_key=");
627  option.append(conflationKey_).append(",");
628  return option;
629  }
630  static std::string TopN(int topN_)
631  {
632  char buf[24];
633  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
634  return buf;
635  }
636  static std::string MaxBacklog(int maxBacklog_)
637  {
638  char buf[24];
639  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
640  return buf;
641  }
642  static std::string Rate(const char* rate_)
643  {
644  char buf[64];
645  AMPS_snprintf(buf, sizeof(buf), "rate=%s,", rate_);
646  return buf;
647  }
648  static std::string RateMaxGap(const char* rateMaxGap_)
649  {
650  char buf[64];
651  AMPS_snprintf(buf, sizeof(buf), "rate_max_gap=%s,", rateMaxGap_);
652  return buf;
653  }
654  static std::string SkipN(int skipN_)
655  {
656  char buf[24];
657  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
658  return buf;
659  }
660 
661  static std::string Projection(const std::string& projection_)
662  {
663  return "projection=[" + projection_ + "],";
664  }
665 
666  template<class Iterator>
667  static std::string Projection(Iterator begin_, Iterator end_)
668  {
669  std::string projection = "projection=[";
670  for (Iterator i = begin_; i != end_; ++i)
671  {
672  projection += *i;
673  projection += ',';
674  }
675  projection.insert(projection.length() - 1, "]");
676  return projection;
677  }
678 
679  static std::string Grouping(const std::string& grouping_)
680  {
681  return "grouping=[" + grouping_ + "],";
682  }
683 
684  template<class Iterator>
685  static std::string Grouping(Iterator begin_, Iterator end_)
686  {
687  std::string grouping = "grouping=[";
688  for (Iterator i = begin_; i != end_; ++i)
689  {
690  grouping += *i;
691  grouping += ',';
692  }
693  grouping.insert(grouping.length() - 1, "]");
694  return grouping;
695  }
696 
697  static std::string Select(const std::string& select_)
698  {
699  return "select=[" + select_ + "],";
700  }
701 
702  template<class Iterator>
703  static std::string Select(Iterator begin_, Iterator end_)
704  {
705  std::string select = "select=[";
706  for (Iterator i = begin_; i != end_; ++i)
707  {
708  select += *i;
709  select += ',';
710  }
711  select.insert(select.length() - 1, "]");
712  return select;
713  }
714 
715  static std::string AckConflationInterval(const std::string& interval_)
716  {
717  return "ack_conflation=" + interval_ + ",";
718  }
719 
720  static std::string AckConflationInterval(const char* interval_)
721  {
722  static const std::string start("ack_conflation=");
723  return start + interval_ + ",";
724  }
725 
728  Options(std::string options_ = "")
729  : _optionStr(options_)
730  , _maxBacklog(0)
731  , _topN(0)
732  , _skipN(0)
733  {;}
734 
735  int getMaxBacklog(void) const
736  {
737  return _maxBacklog;
738  }
739  std::string getConflation(void) const
740  {
741  return _conflation;
742  }
743  std::string getConflationKey(void) const
744  {
745  return _conflationKey;
746  }
747  int getTopN(void) const
748  {
749  return _topN;
750  }
751  std::string getRate(void) const
752  {
753  return _rate;
754  }
755  std::string getRateMaxGap(void) const
756  {
757  return _rateMaxGap;
758  }
759 
763  void setNone(void)
764  {
765  _optionStr.clear();
766  }
767 
777  void setLive(void)
778  {
779  _optionStr += AMPS_OPTIONS_LIVE;
780  }
781 
786  void setOOF(void)
787  {
788  _optionStr += AMPS_OPTIONS_OOF;
789  }
790 
795  void setReplace(void)
796  {
797  _optionStr += AMPS_OPTIONS_REPLACE;
798  }
799 
803  void setNoEmpties(void)
804  {
805  _optionStr += AMPS_OPTIONS_NOEMPTIES;
806  }
807 
811  void setSendKeys(void)
812  {
813  _optionStr += AMPS_OPTIONS_SENDKEYS;
814  }
815 
820  void setTimestamp(void)
821  {
822  _optionStr += AMPS_OPTIONS_TIMESTAMP;
823  }
824 
828  void setNoSowKey(void)
829  {
830  _optionStr += AMPS_OPTIONS_NOSOWKEY;
831  }
832 
836  void setCancel(void)
837  {
838  _optionStr += AMPS_OPTIONS_CANCEL;
839  }
840 
847  void setResume(void)
848  {
849  _optionStr += AMPS_OPTIONS_RESUME;
850  }
851 
862  void setPause(void)
863  {
864  _optionStr += AMPS_OPTIONS_PAUSE;
865  }
866 
873  void setFullyDurable(void)
874  {
875  _optionStr += AMPS_OPTIONS_FULLY_DURABLE;
876  }
877 
888  void setMaxBacklog(int maxBacklog_)
889  {
890  char buf[24];
891  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
892  _optionStr += buf;
893  _maxBacklog = maxBacklog_;
894  }
895 
901  void setConflation(const char* conflation_)
902  {
903  char buf[64];
904  AMPS_snprintf(buf, sizeof(buf), "conflation=%s,", conflation_);
905  _optionStr += buf;
906  _conflation = conflation_;
907  }
908 
918  void setConflationKey(const char* conflationKey_)
919  {
920  char buf[64];
921  AMPS_snprintf(buf, sizeof(buf), "conflation_key=%s,", conflationKey_);
922  _optionStr += buf;
923  _conflationKey = conflationKey_;
924  }
925 
931  void setTopN(int topN_)
932  {
933  char buf[24];
934  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
935  _optionStr += buf;
936  _topN = topN_;
937  }
938 
945  void setRate(const char* rate_)
946  {
947  char buf[64];
948  AMPS_snprintf(buf, sizeof(buf), "rate=%s,", rate_);
949  _optionStr += buf;
950  _rate = rate_;
951  }
952 
967  void setRateMaxGap(const char* rateMaxGap_)
968  {
969  char buf[64];
970  AMPS_snprintf(buf, sizeof(buf), "rate_max_gap=%s,", rateMaxGap_);
971  _optionStr += buf;
972  _rateMaxGap = rateMaxGap_;
973  }
974 
980  void setSkipN(int skipN_)
981  {
982  char buf[24];
983  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
984  _optionStr += buf;
985  _skipN = skipN_;
986  }
987 
992  void setProjection(const std::string& projection_)
993  {
994  _projection = "projection=[" + projection_ + "],";
995  _optionStr += _projection;
996  }
997 
998 
1004  template<class Iterator>
1005  void setProjection(Iterator begin_, Iterator end_)
1006  {
1007  _projection = "projection=[";
1008  for (Iterator i = begin_; i != end_; ++i)
1009  {
1010  _projection += *i;
1011  _projection += ',';
1012  }
1013  _projection.insert(_projection.length() - 1, "]");
1014  _optionStr += _projection;
1015  }
1016 
1021  void setGrouping(const std::string& grouping_)
1022  {
1023  _grouping = "grouping=[" + grouping_ + "],";
1024  _optionStr += _grouping;
1025  }
1026 
1027 
1033  template<class Iterator>
1034  void setGrouping(Iterator begin_, Iterator end_)
1035  {
1036  _grouping = "grouping=[";
1037  for (Iterator i = begin_; i != end_; ++i)
1038  {
1039  _grouping += *i;
1040  _grouping += ',';
1041  }
1042  _grouping.insert(_grouping.length() - 1, "]");
1043  _optionStr += _grouping;
1044  }
1045 
1049  operator const std::string()
1050  {
1051  return _optionStr.substr(0, _optionStr.length() - 1);
1052  }
1053 
1054  private:
1055  std::string _optionStr;
1056  int _maxBacklog;
1057  std::string _conflation;
1058  std::string _conflationKey;
1059  int _topN;
1060  std::string _rate;
1061  std::string _rateMaxGap;
1062  int _skipN;
1063  std::string _projection;
1064  std::string _grouping;
1065  };
1066 
/// Bit flags for acknowledgment types; values can be OR-ed together
/// (see getAckTypeEnum()).
struct AckType
{
  typedef enum
  {
    None      = 0,
    Received  = 1,
    Parsed    = 2,
    Processed = 4,
    Persisted = 8,
    Completed = 16,
    Stats     = 32
  } Type;
};
// Generated string accessors (see AMPS_FIELD) for the AckType header.
1076  AMPS_FIELD(AckType)
1079  static inline AckType::Type decodeSingleAckType(const char* begin, const char* end)
1080  {
1081  switch (end - begin)
1082  {
1083  case 5:
1084  return AckType::Stats;
1085  case 6:
1086  return AckType::Parsed;
1087  case 8:
1088  return AckType::Received;
1089  case 9:
1090  switch (begin[1])
1091  {
1092  case 'e': return AckType::Persisted;
1093  case 'r': return AckType::Processed;
1094  case 'o': return AckType::Completed;
1095  default: break;
1096  }
1097  break;
1098  default:
1099  break;
1100  }
1101  return AckType::None;
1102  }
1106  unsigned getAckTypeEnum() const
1107  {
1108  unsigned result = AckType::None;
1109  const char* data = NULL; size_t len = 0;
1110  amps_message_get_field_value(_body.get().getMessage(), AMPS_AckType, &data, &len);
1111  const char* mark = data;
1112  for (const char* end = data + len; data != end; ++data)
1113  {
1114  if (*data == ',')
1115  {
1116  result |= decodeSingleAckType(mark, data);
1117  mark = data + 1;
1118  }
1119  }
1120  if (mark < data)
1121  {
1122  result |= decodeSingleAckType(mark, data);
1123  }
1124  return result;
1125  }
1129  Message& setAckTypeEnum(unsigned ackType_)
1130  {
1131  if (ackType_ < AckTypeConstants<0>::Entries)
1132  {
1133  amps_message_assign_field_value(_body.get().getMessage(), AMPS_AckType,
1134  AckTypeConstants<0>::Values[ackType_], AckTypeConstants<0>::Lengths[ackType_]);
1135  }
1136  return *this;
1137  }
1138 
// Generated string accessors (see AMPS_FIELD) for the BatchSize, Bookmark and
// Command headers.
1139  AMPS_FIELD(BatchSize)
1140  AMPS_FIELD(Bookmark)
1141  AMPS_FIELD(Command)
1142 
/// Command identifiers. Each command occupies its own bit so that sets of
/// commands (such as NoDataCommands) can be expressed as a mask.
struct Command
{
  typedef enum
  {
    Unknown              = 0,
    Publish              = 1 << 0,
    Subscribe            = 1 << 1,
    Unsubscribe          = 1 << 2,
    SOW                  = 1 << 3,
    Heartbeat            = 1 << 4,
    SOWDelete            = 1 << 5,
    DeltaPublish         = 1 << 6,
    Logon                = 1 << 7,
    SOWAndSubscribe      = 1 << 8,
    DeltaSubscribe       = 1 << 9,
    SOWAndDeltaSubscribe = 1 << 10,
    StartTimer           = 1 << 11,
    StopTimer            = 1 << 12,
    GroupBegin           = 1 << 13,
    GroupEnd             = 1 << 14,
    OOF                  = 1 << 15,
    Ack                  = 1 << 16,
    Flush                = 1 << 17,
    NoDataCommands = Publish | Unsubscribe | Heartbeat | SOWDelete | DeltaPublish
                     | Logon | StartTimer | StopTimer | Flush
  } Type;
};
1174  Command::Type getCommandEnum() const
1175  {
1176  const char* data = NULL; size_t len = 0;
1177  amps_message_get_field_value(_body.get().getMessage(), AMPS_Command, &data, &len);
1178  switch (len)
1179  {
1180  case 1: return Command::Publish; // -V1037
1181  case 3:
1182  switch (data[0])
1183  {
1184  case 's': return Command::SOW;
1185  case 'o': return Command::OOF;
1186  case 'a': return Command::Ack;
1187  }
1188  break;
1189  case 5:
1190  switch (data[0])
1191  {
1192  case 'l': return Command::Logon;
1193  case 'f': return Command::Flush;
1194  }
1195  break;
1196  case 7:
1197  return Command::Publish; // -V1037
1198  break;
1199  case 9:
1200  switch (data[0])
1201  {
1202  case 's': return Command::Subscribe;
1203  case 'h': return Command::Heartbeat;
1204  case 'g': return Command::GroupEnd;
1205  }
1206  break;
1207  case 10:
1208  switch (data[1])
1209  {
1210  case 'o': return Command::SOWDelete;
1211  case 't': return Command::StopTimer;
1212  }
1213  break;
1214  case 11:
1215  switch (data[0])
1216  {
1217  case 'g': return Command::GroupBegin;
1218  case 'u': return Command::Unsubscribe;
1219  }
1220  break;
1221  case 13:
1222  return Command::DeltaPublish;
1223  case 15:
1224  return Command::DeltaSubscribe;
1225  case 17:
1226  return Command::SOWAndSubscribe;
1227  case 23:
1228  return Command::SOWAndDeltaSubscribe;
1229  }
1230  return Command::Unknown;
1231  }
1232 
1234  Message& setCommandEnum(Command::Type command_)
1235  {
1236  unsigned bits = 0;
1237  unsigned command = command_;
1238  while (command > 0)
1239  {
1240  ++bits;
1241  command >>= 1;
1242  }
1243  amps_message_assign_field_value(_body.get().getMessage(), AMPS_Command,
1244  CommandConstants<0>::Values[bits], CommandConstants<0>::Lengths[bits]);
1245  return *this;
1246  }
1247 
// Generated string accessors (see AMPS_FIELD), one set per header named below.
1248  AMPS_FIELD(CommandId)
1249  AMPS_FIELD(ClientName)
1250  AMPS_FIELD(CorrelationId)
1251  AMPS_FIELD(Expiration)
1252  AMPS_FIELD(Filter)
1253  AMPS_FIELD(GroupSequenceNumber)
1254  AMPS_FIELD(Heartbeat)
1255  AMPS_FIELD(LeasePeriod)
1256  AMPS_FIELD(Matches)
1257  AMPS_FIELD(MessageLength)
1258  AMPS_FIELD(MessageType)
1259 
1260  DOX_OPENGROUP(Options)
1261  DOX_MAKEGETCOMMENT(Options)
1262  Field getOptions() const
1263  {
1264  Field returnValue;
1265  const char* ptr;
1266  size_t sz;
1267  amps_message_get_field_value(_body.get().getMessage(),
1268  AMPS_Options, &ptr, &sz);
1269  if (sz && ptr[sz - 1] == ',')
1270  {
1271  --sz;
1272  }
1273  returnValue.assign(ptr, sz);
1274  return returnValue;
1275  }
1276 
1277  DOX_MAKEGETRAWCOMMENT(Options)
1278  void getRawOptions(const char** dataptr, size_t* sizeptr) const
1279  {
1280  amps_message_get_field_value(_body.get().getMessage(),
1281  AMPS_Options, dataptr, sizeptr);
1282  if (*sizeptr && *dataptr && (*dataptr)[*sizeptr - 1] == ',')
1283  {
1284  --*sizeptr;
1285  }
1286  return;
1287  }
1288 
1289  DOX_MAKESETCOMMENT(Options)
1290  Message& setOptions(const std::string& v)
1291  {
1292  size_t sz = v.length();
1293  if (sz && v[sz - 1] == ',')
1294  {
1295  --sz;
1296  }
1297  amps_message_set_field_value(_body.get().getMessage(),
1298  AMPS_Options, v.c_str(), sz);
1299  return *this;
1300  }
1301 
1302  DOX_MAKEASSIGNCOMMENT(Options)
1303  Message& assignOptions(const std::string& v)
1304  {
1305  size_t sz = v.length();
1306  if (sz && v[sz - 1] == ',')
1307  {
1308  --sz;
1309  }
1310  amps_message_assign_field_value(_body.get().getMessage(),
1311  AMPS_Options, v.c_str(), sz);
1312  return *this;
1313  }
1314 
1315  DOX_MAKEASSIGNCOMMENT(Options)
1316  Message& assignOptions(const char* data, size_t len)
1317  {
1318  if (len && data[len - 1] == ',')
1319  {
1320  --len;
1321  }
1322  amps_message_assign_field_value(_body.get().getMessage(),
1323  AMPS_Options, data, len);
1324  return *this;
1325  }
1326 
1327  DOX_MAKESETCOMMENT(Options)
1328  Message& setOptions(const char* str)
1329  {
1330  if (str)
1331  {
1332  size_t sz = strlen(str);
1333  if (sz && str[sz - 1] == ',')
1334  {
1335  --sz;
1336  }
1337  amps_message_set_field_value(_body.get().getMessage(),
1338  AMPS_Options, str, sz);
1339  }
1340  else
1341  {
1342  amps_message_set_field_value(_body.get().getMessage(),
1343  AMPS_Options, str, 0);
1344  }
1345  return *this;
1346  }
1347 
1348  DOX_MAKESETCOMMENT(Options)
1349  Message& setOptions(const char* str, size_t len)
1350  {
1351  if (len && str[len - 1] == ',')
1352  {
1353  --len;
1354  }
1355  amps_message_set_field_value(_body.get().getMessage(),
1356  AMPS_Options, str, len);
1357  return *this;
1358  }
1359  DOX_CLOSEGROUP()
1360 
// Generated string accessors (see AMPS_FIELD / AMPS_FIELD_ALIAS), one set per
// header named below.
1361  AMPS_FIELD(OrderBy)
1362  AMPS_FIELD(Password)
// QueryId and QueryID name the same header (AMPS_QueryID).
1363  AMPS_FIELD_ALIAS(QueryId, QueryID)
1364  AMPS_FIELD(Reason)
1365  AMPS_FIELD(RecordsInserted)
1366  AMPS_FIELD(RecordsReturned)
1367  AMPS_FIELD(RecordsUpdated)
1368  AMPS_FIELD(Sequence)
1369  AMPS_FIELD(SowDelete)
1370  AMPS_FIELD(SowKey)
1371  AMPS_FIELD(SowKeys)
1372  AMPS_FIELD(Status)
// SubId and SubscriptionId name the same header (AMPS_SubscriptionId).
1373  AMPS_FIELD_ALIAS(SubId, SubscriptionId) // -V524
1374  AMPS_FIELD(SubscriptionIds)
1375  AMPS_FIELD(TimeoutInterval)
1376  AMPS_FIELD(Timestamp)
1377 
1381  Field getTransmissionTime() const
1382  {
1383  return getTimestamp();
1384  }
1385 
1390  void getRawTransmissionTime(const char** dataptr, size_t* sizeptr) const
1391  {
1392  getRawTimestamp(dataptr, sizeptr);
1393  }
1394 
// Generated string accessors (see AMPS_FIELD), one set per header named below.
1395  AMPS_FIELD(Topic)
1396  AMPS_FIELD(TopicMatches)
1397  AMPS_FIELD(TopNRecordsReturned)
1398  AMPS_FIELD(Version)
1399  AMPS_FIELD(UserId)
1400 
1405 
1406  Field getData() const
1407  {
1408  Field returnValue;
1409  char* ptr;
1410  size_t sz;
1411  amps_message_get_data(_body.get().getMessage(), &ptr, &sz);
1412  returnValue.assign(ptr, sz);
1413  return returnValue;
1414  }
1415 
1416  void getRawData(const char** data, size_t* sz) const
1417  {
1418  amps_message_get_data(_body.get().getMessage(), (char**)data, sz);
1419  }
1422  Message& setData(const std::string& v_)
1423  {
1424  amps_message_set_data(_body.get().getMessage(), v_.c_str(), v_.length());
1425  return *this;
1426  }
1427  Message& assignData(const std::string& v_)
1428  {
1429  amps_message_assign_data(_body.get().getMessage(), v_.c_str(), v_.length());
1430  return *this;
1431  }
1432 
1436  Message& setData(const char* data_, size_t length_)
1437  {
1438  amps_message_set_data(_body.get().getMessage(), data_, length_);
1439  return *this;
1440  }
1441  Message& assignData(const char* data_, size_t length_)
1442  {
1443  amps_message_assign_data(_body.get().getMessage(), data_, length_);
1444  return *this;
1445  }
1446 
1449  Message& setData(const char* data_)
1450  {
1451  amps_message_set_data_nts(_body.get().getMessage(), data_);
1452  return *this;
1453  }
1454  Message& assignData(const char* data_)
1455  {
1456  amps_message_assign_data(_body.get().getMessage(), data_, strlen(data_));
1457  return *this;
1458  }
// Returns the amps_handle held by this Message's body, i.e. the result of
// calling getMessage() on the object returned by _body.get().
1459  amps_handle getMessage() const
1460  {
1461  return _body.get().getMessage();
1462  }
// Forwards to the body's replace(), making self refer to the given message
// handle. owner defaults to false and is passed through unchanged.
// NOTE(review): presumably owner == true means the body takes over
// destroying the handle -- confirm in MessageImpl::replace.
1463  void replace(amps_handle message, bool owner = false)
1464  {
1465  _body.get().replace(message, owner);
1466  }
// Forwards to the body's disown().
// NOTE(review): presumably this gives up ownership of the underlying handle
// so the body no longer destroys it -- confirm in MessageImpl::disown.
1467  void disown()
1468  {
1469  _body.get().disown();
1470  }
// Detaches this Message from its body by assigning NULL to _body.
// NOTE(review): isValid() is presumably false afterwards -- confirm against
// the handle type's assignment operator.
1471  void invalidate()
1472  {
1473  _body = NULL;
1474  }
// Reports whether this Message's body handle is valid, as answered by
// _body.isValid().
1475  bool isValid(void) const
1476  {
1477  return _body.isValid();
1478  }
// Forwards to the body's reset() and returns *this so calls can be chained.
// NOTE(review): presumably this clears all fields and data of the
// underlying message (cf. amps_message_reset) -- confirm in MessageImpl.
1479  Message& reset()
1480  {
1481  _body.get().reset();
1482  return *this;
1483  }
1484 
// Stores val as the bookmark sequence number on the message body.
1485  void setBookmarkSeqNo(size_t val)
1486  {
1487  _body.get().setBookmarkSeqNo(val);
1488  }
1489 
// Returns the bookmark sequence number currently held by the message body.
1490  size_t getBookmarkSeqNo() const
1491  {
1492  return _body.get().getBookmarkSeqNo();
1493  }
1494 
// Stores val as the subscription handle on the message body.
1495  void setSubscriptionHandle(amps_handle val)
1496  {
1497  _body.get().setSubscriptionHandle(val);
1498  }
1499 
// Returns the subscription handle currently held by the message body.
1500  amps_handle getSubscriptionHandle() const
1501  {
1502  return _body.get().getSubscriptionHandle();
1503  }
1504 
// Declaration only; the definition is outside this section. options_ is
// optional and defaults to NULL.
// NOTE(review): presumably acknowledges this message to the server, with
// options_ supplying extra ack options -- confirm at the definition.
1505  void ack(const char* options_ = NULL) const;
1506 
// Stores the given ClientImpl pointer on the message body.
1507  void setClientImpl(ClientImpl* pClientImpl)
1508  {
1509  _body.get().setClientImpl(pClientImpl);
1510  }
1511 
// Sets the body's "ignore auto ack" flag by calling its setIgnoreAutoAck().
// Callable on a const Message.
1512  void setIgnoreAutoAck() const
1513  {
1514  _body.get().setIgnoreAutoAck();
1515  }
1516 
// Returns the body's "ignore auto ack" flag.
1517  bool getIgnoreAutoAck() const
1518  {
1519  return _body.get().getIgnoreAutoAck();
1520  }
1521 
1522  template <class T> // static
1523  void throwFor(const T& /*context_*/, const std::string& ackReason_) const
1524  {
1525  switch (ackReason_[0])
1526  {
1527  case 'a': // auth failure
1528  throw AuthenticationException("Logon failed for user \"" +
1529  (std::string)getUserId() + "\"");
1530  break;
1531  case 'b':
1532  switch (ackReason_.length())
1533  {
1534  case 10: // bad filter
1535  throw BadFilterException("bad filter '" +
1536  (std::string)getFilter() +
1537  "'");
1538  break;
1539  case 11: // bad sow key
1540  if (getSowKeys().len())
1541  {
1542  throw BadSowKeyException("bad sow key '" +
1543  (std::string)getSowKeys() +
1544  "'");
1545  }
1546  else
1547  {
1548  throw BadSowKeyException("bad sow key '" +
1549  (std::string)getSowKey() +
1550  "'");
1551  }
1552  break;
1553  case 15: // bad regex topic
1554  throw BadRegexTopicException("bad regex topic '" +
1555  (std::string)getTopic() +
1556  "'.");
1557  break;
1558  default:
1559  break;
1560  }
1561  break;
1562  case 'd':
1563  if (ackReason_.length() == 23) // duplicate logon attempt
1564  {
1565  throw DuplicateLogonException("Client '" +
1566  (std::string)getClientName() +
1567  "' with userid '" +
1568  (std::string)getUserId() +
1569  "' duplicate logon attempt");
1570  }
1571  break;
1572  case 'i':
1573  if (ackReason_.length() >= 9)
1574  {
1575  switch (ackReason_[8])
1576  {
1577  case 'b': // invalid bookmark
1578  throw InvalidBookmarkException("invalid bookmark '" +
1579  (std::string)getBookmark() +
1580  "'.");
1581  break;
1582  case 'm': // invalid message type
1583  throw CommandException(std::string("invalid message type '") +
1584  (std::string)getMessageType() +
1585  "'.");
1586  break;
1587  case 'o':
1588  if (ackReason_[9] == 'p') // invalid options
1589  {
1590  throw InvalidOptionsException("invalid options '" +
1591  (std::string)getOptions() +
1592  "'.");
1593  }
1594  else if (ackReason_[9] == 'r') // invalid order by
1595  {
1596  throw InvalidOrderByException("invalid order by '" +
1597  (std::string)getOrderBy() +
1598  "'.");
1599  }
1600  break;
1601  case 's': // invalid subId
1602  throw InvalidSubIdException("invalid subid '" +
1603  (std::string)getSubscriptionId() +
1604  "'.");
1605  break;
1606  case 't':
1607  if (ackReason_.length() == 13) // invalid topic
1608  {
1609  throw InvalidTopicException("invalid topic '" +
1610  (std::string)getTopic() +
1611  "'.");
1612  }
1613  else if (ackReason_.length() == 23) // invalid topic or filter
1614  {
1615  throw InvalidTopicException("invalid topic or filter. Topic '" +
1616  (std::string)getTopic() +
1617  "' Filter '" +
1618  (std::string)getFilter() +
1619  "'.");
1620  }
1621  break;
1622  default:
1623  break;
1624  }
1625  }
1626  break;
1627  case 'l':
1628  if (ackReason_.length() == 14) // logon required
1629  {
1630  throw LogonRequiredException("logon required before command");
1631  }
1632  break;
1633  case 'n':
1634  switch (ackReason_[4])
1635  {
1636  case ' ': // name in use
1637  throw NameInUseException("name in use '" +
1638  (std::string)getClientName() +
1639  "'.");
1640  break;
1641  case 'e': // not entitled
1642  throw NotEntitledException("User \"" +
1643  (std::string)getUserId() +
1644  "\" not entitled to topic \"" +
1645  (std::string)getTopic() +
1646  "\".");
1647  break;
1648  case 'i': // no filter or bookmark
1649  throw MissingFieldsException("command sent with no filter or bookmark.");
1650  break;
1651  case 'l': // no client name
1652  throw MissingFieldsException("command sent with no client name.");
1653  break;
1654  case 'o': // no topic or filter
1655  throw MissingFieldsException("command sent with no topic or filter.");
1656  break;
1657  case 's': // not supported
1658  throw CommandException("operation on topic '" +
1659  (std::string)getTopic() +
1660  "' with options '" +
1661  (std::string)getOptions() +
1662  "' not supported.");
1663  break;
1664  default:
1665  break;
1666  }
1667  break;
1668  case 'o':
1669  switch (ackReason_.length())
1670  {
1671  case 16: // orderby required
1672  throw MissingFieldsException("orderby required");
1673  break;
1674  case 17: // orderby too large
1675  throw CommandException("orderby too large '" +
1676  (std::string)getOrderBy() +
1677  "'.");
1678  break;
1679  }
1680  break;
1681  case 'p':
1682  throw CommandException("projection clause too large in options '" +
1683  (std::string)getOptions() +
1684  "'.");
1685  break;
1686  case 'r':
1687  switch (ackReason_[2])
1688  {
1689  case 'g': // regex topic not supported
1690  throw BadRegexTopicException("'regex topic not supported '" +
1691  (std::string)getTopic() +
1692  "'.");
1693  break;
1694  default:
1695  break;
1696  }
1697  break;
1698  case 's':
1699  switch (ackReason_[5])
1700  {
1701  case ' ': // subid in use
1702  throw SubidInUseException("subid in use '" +
1703  (std::string)getSubscriptionId() +
1704  "'.");
1705  break;
1706  case 'e': // sow_delete command only supports one of: filter, sow_keys, bookmark, or data
1707  throw CommandException("sow_delete command only supports one of: filter '" +
1708  (std::string)getFilter() +
1709  "', sow_keys '" +
1710  (std::string)getSowKeys() +
1711  "', bookmark '" +
1712  (std::string)getBookmark() +
1713  "', or data '" +
1714  (std::string)getData() +
1715  "'.");
1716  break;
1717  case 't': // sow store failed
1718  throw PublishException("sow store failed.");
1719  break;
1720  default:
1721  break;
1722  }
1723  break;
1724  case 't':
1725  switch (ackReason_[2])
1726  {
1727  case ' ': // tx store failure
1728  throw PublishException("tx store failure.");
1729  break;
1730  case 'n': // txn replay failed
1731  throw CommandException("txn replay failed for '" +
1732  (std::string)getSubId() +
1733  "'.");
1734  break;
1735  }
1736  break;
1737  default:
1738  break;
1739  }
1740  throw CommandException("Error from server while processing this command: '" +
1741  ackReason_ + "'");
1742  }
1743  };
1744 
1745  inline std::string
1746  operator+(const std::string& lhs, const Message::Field& rhs)
1747  {
1748  return lhs + std::string(rhs);
1749  }
1750 
1751  inline std::basic_ostream<char>&
1752  operator<<(std::basic_ostream<char>& os, const Message::Field& rhs)
1753  {
1754  os.write(rhs.data(), (std::streamsize)rhs.len());
1755  return os;
1756  }
1757  inline bool
1758  AMPS::Field::operator<(const AMPS::Field& rhs) const
1759  {
1760  return std::lexicographical_compare(data(), data() + len(), rhs.data(), rhs.data() + rhs.len());
1761  }
1762 
1763 }
1764 
1765 #endif
void setFullyDurable(void)
Set the option to only provide messages that have been persisted to all replication destinations that...
Definition: Message.hpp:873
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:180
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1422
AMPSDLL amps_handle amps_message_create(amps_handle client)
Functions for creation and manipulation of AMPS messages.
void setRateMaxGap(const char *rateMaxGap_)
Set the option for the maximum amount of time that a bookmark replay with a specified rate will allow...
Definition: Message.hpp:967
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1174
AMPSDLL amps_handle amps_message_copy(amps_handle message)
Creates and returns a handle to a new AMPS message object that is a deep copy of the message passed i...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:803
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:548
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:786
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1129
void setNoSowKey(void)
Set the option to not set the SowKey header on messages.
Definition: Message.hpp:828
MessageImpl(amps_handle message_, bool owner_=false, bool ignoreAutoAck_=false, size_t bookmarkSeqNo_=0, amps_subscription_handle subscription_=NULL, ClientImpl *clientImpl_=NULL)
Constructs a messageImpl from an existing AMPS message.
Definition: Message.hpp:109
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
STL namespace.
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1234
void setSendKeys(void)
Set the option to send key fields with a delta subscription.
Definition: Message.hpp:811
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:196
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:563
AMPSDLL void amps_message_assign_data(amps_handle message, const amps_char *value, size_t length)
Assigns the data component of an AMPS message, without copying the value.
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
void setPause(void)
Set the option to pause a bookmark subscription.
Definition: Message.hpp:862
void setReplace(void)
Set the option to replace a current subscription with this one.
Definition: Message.hpp:795
Valid values for setCommandEnum() and getCommandEnum().
Definition: Message.hpp:1146
Message & setData(const char *data_)
Sets the data portion of self from a null-terminated string.
Definition: Message.hpp:1449
void setGrouping(Iterator begin_, Iterator end_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1034
AMPSDLL void amps_message_set_data_nts(amps_handle message, const amps_char *value)
Sets the data component of an AMPS message.
AMPSDLL void amps_message_reset(amps_handle message)
Clears all fields and data in a message.
void setTimestamp(void)
Set the option to send a timestamp that the message was processed on a subscription or query...
Definition: Message.hpp:820
amps_handle getMessage() const
Returns the underlying AMPS message object from the C layer.
Definition: Message.hpp:156
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
void setResume(void)
Set the option to resume a subscription.
Definition: Message.hpp:847
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
void setConflation(const char *conflation_)
Set the options for conflation on a subscription.
Definition: Message.hpp:901
void replace(amps_handle message_, bool owner_=false)
Causes self to refer to a new AMPS message, freeing any current message owned by self along the way...
Definition: Message.hpp:175
AMPSDLL void amps_message_get_data(amps_handle message, amps_char **value_ptr, size_t *length_ptr)
Gets the data component of an AMPS message.
AMPSDLL void amps_message_destroy(amps_handle message)
Destroys and frees the memory associated with an AMPS message object.
Options(std::string options_="")
ctor - default to None
Definition: Message.hpp:728
AMPSDLL void amps_message_assign_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Assigns the value of a header field in an AMPS message, without copying the value.
void setRate(const char *rate_)
Set the option for the maximum rate at which messages are provided to the subscription.
Definition: Message.hpp:945
Valid values for the setAckTypeEnum() and getAckTypeEnum() methods.
Definition: Message.hpp:1069
void setGrouping(const std::string &grouping_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1021
void setTopN(int topN_)
Set the top N option, which specifies the maximum number of messages to return for this command...
Definition: Message.hpp:931
AMPSDLL void amps_message_set_data(amps_handle message, const amps_char *value, size_t length)
Sets the data component of an AMPS message.
void setMaxBacklog(int maxBacklog_)
Set the option for maximum backlog this subscription is willing to accept.
Definition: Message.hpp:888
void getRawTransmissionTime(const char **dataptr, size_t *sizeptr) const
Definition: Message.hpp:1390
MessageImpl()
Constructs a MessageImpl with a new, empty AMPS message.
Definition: Message.hpp:122
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
void setConflationKey(const char *conflationKey_)
Set the options for the conflation key, the identifiers for the field or fields used by AMPS to deter...
Definition: Message.hpp:918
void setProjection(const std::string &projection_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:992
void setSkipN(int skipN_)
Set the option for skip N, the number of messages in the result set to skip before returning messages...
Definition: Message.hpp:980
Message(amps_handle message_, bool owner_=false)
Constructs a new Message to wrap message.
Definition: Message.hpp:534
void setNone(void)
Clear any previously set options and set the options to an empty string (AMPS_OPTIONS_NONE).
Definition: Message.hpp:763
Implementation class for a Message.
Definition: Message.hpp:87
void setProjection(Iterator begin_, Iterator end_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:1005
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1106
Message & setData(const char *data_, size_t length_)
Sets the data portion of self from a char array.
Definition: Message.hpp:1436
Definition: ampsplusplus.hpp:103
void setCancel(void)
Set the cancel option, used on a sow_delete command to return a message to the queue.
Definition: Message.hpp:836
Message()
Construct a new, empty Message.
Definition: Message.hpp:542
void setLive(void)
Set the live option for a bookmark subscription, which requests that the subscription receives messag...
Definition: Message.hpp:777