AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.0
Message.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef __AMPS_MESSAGE_HPP__
26 #define __AMPS_MESSAGE_HPP__
27 #include "util.hpp"
28 #include "constants.hpp"
29 #include "amps_generated.h"
30 #include "Field.hpp"
31 #include <stdio.h>
32 #include <algorithm>
33 #include <ostream>
34 #include <string>
// Sentinel sequence value: all bits set. The expansion is parenthesized so
// the cast cannot combine with operators around the macro's point of use.
#define AMPS_UNSET_SEQUENCE ((amps_uint64_t)-1)
36 
// Macros for determining what parts of C++11/TR1 we can depend on.
//
// AMPS_USE_FUNCTIONAL: <functional> is usable with any C++11 (or later)
// compiler, with GCC in C++0x mode, or with Visual Studio 2010 and later.
#if (defined(__cplusplus) && (__cplusplus >= 201103L)) \
 || defined(__GXX_EXPERIMENTAL_CXX0X__) \
 || (defined(_MSC_VER) && (_MSC_VER >= 1600))
#define AMPS_USE_FUNCTIONAL 1
#endif

// AMPS_USE_LAMBDAS: any C++11 (or later) compiler, Visual Studio 2010 and
// later, or GCC 4.5 and later. The GCC test compares major and minor
// together so that releases such as 5.1 or 7.3 (minor < 5) are accepted.
#if (defined(__cplusplus) && (__cplusplus >= 201103L)) \
 || (defined(_MSC_VER) && (_MSC_VER >= 1600)) \
 || (defined(__GNUC__) && ((__GNUC__ > 4) \
     || ((__GNUC__ == 4) && (__GNUC_MINOR__ >= 5))))
#define AMPS_USE_LAMBDAS 1
#endif
45 
46 #ifdef AMPS_USE_FUNCTIONAL
47 #include <functional>
48 #endif
49 
50 #include <algorithm>
51 
55 
// Option tokens for the Options header. Every token ends with a comma so
// that tokens can be concatenated directly.
#define AMPS_OPTIONS_NONE ""
#define AMPS_OPTIONS_LIVE "live,"
#define AMPS_OPTIONS_OOF "oof,"
#define AMPS_OPTIONS_REPLACE "replace,"
#define AMPS_OPTIONS_NOEMPTIES "no_empties,"
#define AMPS_OPTIONS_SENDKEYS "send_keys,"
#define AMPS_OPTIONS_TIMESTAMP "timestamp,"
#define AMPS_OPTIONS_NOSOWKEY "no_sowkey,"
#define AMPS_OPTIONS_CANCEL "cancel,"
#define AMPS_OPTIONS_RESUME "resume,"
#define AMPS_OPTIONS_PAUSE "pause,"
#define AMPS_OPTIONS_FULLY_DURABLE "fully_durable,"
#define AMPS_OPTIONS_EXPIRE "expire,"

// Two-level stringification so that an argument which is itself a macro is
// expanded before being turned into a string literal.
#define AMPS_OPTIONS_STR_(x) #x
#define AMPS_OPTIONS_STR(x) AMPS_OPTIONS_STR_(x)

// Parameterized tokens. The argument is stringified and spliced in through
// adjacent string-literal concatenation; writing ##x inside the quotes would
// leave the characters "##x" in the option text.
#define AMPS_OPTIONS_TOPN(x) "top_n=" AMPS_OPTIONS_STR(x) ","
#define AMPS_OPTIONS_MAXBACKLOG(x) "max_backlog=" AMPS_OPTIONS_STR(x) ","
#define AMPS_OPTIONS_RATE(x) "rate=" AMPS_OPTIONS_STR(x) ","
72 
73 namespace AMPS
74 {
75 typedef void* amps_subscription_handle;
76 
77 class ClientImpl;
78 
83 class MessageImpl : public RefBody
84 {
85 private:
86  amps_handle _message;
87  Mutex _lock;
88  bool _owner;
89  mutable bool _isIgnoreAutoAck;
90  size_t _bookmarkSeqNo;
91  amps_subscription_handle _subscription;
92  ClientImpl* _clientImpl;
93 public:
105  MessageImpl(amps_handle message_, bool owner_ = false,
106  bool ignoreAutoAck_ = false, size_t bookmarkSeqNo_ = 0,
107  amps_subscription_handle subscription_ = NULL,
108  ClientImpl* clientImpl_=NULL)
109  : _message(message_), _owner(owner_), _isIgnoreAutoAck(ignoreAutoAck_)
110  , _bookmarkSeqNo(bookmarkSeqNo_)
111  , _subscription(subscription_), _clientImpl(clientImpl_)
112  {
113  }
114 
119  : _message(NULL), _owner(true), _isIgnoreAutoAck(false), _bookmarkSeqNo(0), _subscription(NULL), _clientImpl(NULL)
120  {
121  // try to create one
122  _message = amps_message_create(NULL);
123  }
124 
125  virtual ~MessageImpl()
126  {
127  if (_owner && _message) amps_message_destroy(_message);
128  }
129 
130  MessageImpl* copy() const
131  {
132  amps_handle copy = amps_message_copy(_message);
133  return new MessageImpl(copy, true, _isIgnoreAutoAck, _bookmarkSeqNo,
134  _subscription,_clientImpl);
135  }
136 
137  void setClientImpl(ClientImpl* clientImpl_)
138  {
139  _clientImpl = clientImpl_;
140  }
141 
142  ClientImpl* clientImpl(void) const
143  {
144  return _clientImpl;
145  }
146 
150  {
151  return _message;
152  }
153 
154  void reset()
155  {
156  Lock<Mutex> l(_lock);
157  amps_message_reset(_message);
158  _bookmarkSeqNo = 0;
159  _subscription = NULL;
160  _isIgnoreAutoAck = false;
161  }
162 
168  void replace(amps_handle message_, bool owner_ = false)
169  {
170  Lock<Mutex> l(_lock);
171  if (_message == message_) return;
172  if (_owner && _message)
173  {
174  amps_message_destroy(_message);
175  }
176  _owner = owner_;
177  _message = message_;
178  _bookmarkSeqNo = 0;
179  _subscription = NULL;
180  _isIgnoreAutoAck = false;
181  }
182 
183  void disown()
184  {
185  Lock<Mutex> l(_lock);
186  _owner = false;
187  }
188 
189  static unsigned long newId()
190  {
191  static ATOMIC_TYPE _id = 0;
192  return (unsigned long)(AMPS_FETCH_ADD(&_id,1));
193  }
194 
195  void setBookmarkSeqNo(size_t val_)
196  {
197  _bookmarkSeqNo = val_;
198  }
199 
200  size_t getBookmarkSeqNo(void) const
201  {
202  return _bookmarkSeqNo;
203  }
204 
205  void setSubscriptionHandle(amps_subscription_handle subscription_)
206  {
207  _subscription = subscription_;
208  }
209 
210  amps_subscription_handle getSubscriptionHandle(void) const
211  {
212  return _subscription;
213  }
214 
215  void setIgnoreAutoAck() const
216  {
217  _isIgnoreAutoAck = true;
218  }
219 
220  bool getIgnoreAutoAck() const
221  {
222  return _isIgnoreAutoAck;
223  }
224 };
225 
226 
227 // This block of macros works with the Doxygen preprocessor to
228 // create documentation comments for fields defined with the AMPS_FIELD macro.
229 // A C++ compiler removes comments before expanding macros, so these macros
230 // must ONLY be defined for Doxygen and not for actual compilation.
231 
232 #ifdef DOXYGEN_PREPROCESSOR
233 
#define DOX_COMMENTHEAD(s) / ## ** ## s ## * ## /
#define DOX_GROUPNAME(s) DOX_COMMENTHEAD(@name s Functions)
#define DOX_OPENGROUP(s) DOX_COMMENTHEAD(@{) DOX_GROUPNAME(s)
// Accepts (and ignores) one argument, matching both the non-Doxygen
// definition below and the DOX_CLOSEGROUP(x) invocations in AMPS_FIELD.
#define DOX_CLOSEGROUP(x) DOX_COMMENTHEAD(@})
238 #define DOX_MAKEGETCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of the Message as a new Field. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
239 #define DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of self as a Field that references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
240 #define DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
241 #define DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
242 #define DOX_MAKENEWCOMMENT(x) DOX_COMMENTHEAD(Creates and sets a new sequential value for the x header for this Message. This function is most useful for headers such as CommandId and SubId.)
243 
244 #define DOX_ALIAS_MAKEGETCOMMENT(x,y) DOX_COMMENTHEAD( Retrieves the value of the y [x] header of the Message as a new Field. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
245 #define DOX_ALIAS_MAKEGETRAWCOMMENT(x,y) DOX_COMMENTHEAD( Retrieves the value of the y [x] header of self as a Field that references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
246 #define DOX_ALIAS_MAKESETCOMMENT(x,y) DOX_COMMENTHEAD( Sets the value of the y [x] header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
247 #define DOX_ALIAS_MAKEASSIGNCOMMENT(x,y) DOX_COMMENTHEAD( Sets the value of the y [x] header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
248 #define DOX_ALIAS_MAKENEWCOMMENT(x,y) DOX_COMMENTHEAD(Creates and sets a new sequential value for the y [x] header for this Message. This function is most useful for headers such as CommandId and SubId.)
249 
250 #else
251 
252 #define DOX_COMMENTHEAD(s)
253 #define DOX_GROUPNAME(s)
254 #define DOX_OPENGROUP(x)
255 #define DOX_CLOSEGROUP(x)
256 #define DOX_MAKEGETCOMMENT(x)
257 #define DOX_MAKEGETRAWCOMMENT(x)
258 #define DOX_MAKESETCOMMENT(x)
259 #define DOX_MAKEASSIGNCOMMENT(x)
260 #define DOX_MAKENEWCOMMENT(x)
261 #define DOX_ALIAS_MAKEGETCOMMENT(x,y)
262 #define DOX_ALIAS_MAKEGETRAWCOMMENT(x,y)
263 #define DOX_ALIAS_MAKESETCOMMENT(x,y)
264 #define DOX_ALIAS_MAKEASSIGNCOMMENT(x,y)
265 #define DOX_ALIAS_MAKENEWCOMMENT(x,y)
266 
267 #endif
268 
// Macro for defining all of the necessary methods for a field in an AMPS
// message. For a header named x it generates, as members of Message:
//   get##x()     - copy of the header value as a Field
//   getRaw##x()  - pointer/length of the header value
//   set##x()     - overloads for std::string, amps_uint64_t, NUL-terminated
//                  const char*, and const char* + length
//   assign##x()  - overloads for std::string and const char* + length
//   new##x()     - sets the header to "auto" followed by a hex id from newId()
// Each one forwards to the C client using the AMPS_##x field constant.
#define AMPS_FIELD(x) \
  DOX_OPENGROUP(x) \
  DOX_MAKEGETCOMMENT(x) \
  Field get##x() const { \
    Field returnValue; \
    const char* ptr = NULL; \
    size_t sz = 0; \
    amps_message_get_field_value(_body.get().getMessage(), \
                                 AMPS_##x, &ptr, &sz); \
    returnValue.assign(ptr, sz); \
    return returnValue; \
  } \
  DOX_MAKEGETRAWCOMMENT(x) \
  void getRaw##x(const char** dataptr, size_t* sizeptr) const { \
    amps_message_get_field_value(_body.get().getMessage(), \
                                 AMPS_##x, dataptr, sizeptr); \
    return; \
  } \
  DOX_MAKESETCOMMENT(x) \
  Message& set##x(const std::string& v) { \
    amps_message_set_field_value(_body.get().getMessage(), \
                                 AMPS_##x, v.c_str(), v.length()); \
    return *this; \
  } \
  DOX_MAKESETCOMMENT(x) \
  Message& set##x(amps_uint64_t v) { \
    char buf[22]; \
    AMPS_snprintf_amps_uint64_t(buf, 22, v); \
    amps_message_set_field_value_nts(_body.get().getMessage(), \
                                     AMPS_##x, buf); \
    return *this; \
  } \
  DOX_MAKEASSIGNCOMMENT(x) \
  Message& assign##x(const std::string& v) { \
    amps_message_assign_field_value(_body.get().getMessage(), \
                                    AMPS_##x, v.c_str(), v.length()); \
    return *this; \
  } \
  DOX_MAKEASSIGNCOMMENT(x) \
  Message& assign##x(const char* data, size_t len) { \
    amps_message_assign_field_value(_body.get().getMessage(), \
                                    AMPS_##x, data, len); \
    return *this; \
  } \
  DOX_MAKESETCOMMENT(x) \
  Message& set##x(const char* str) { \
    amps_message_set_field_value_nts(_body.get().getMessage(), \
                                     AMPS_##x, str); \
    return *this; \
  } \
  DOX_MAKESETCOMMENT(x) \
  Message& set##x(const char* str, size_t len) { \
    amps_message_set_field_value(_body.get().getMessage(), \
                                 AMPS_##x, str, len); \
    return *this; \
  } \
  DOX_MAKENEWCOMMENT(x) \
  Message& new##x() { \
    char buf[Message::IdentifierLength+1]; \
    buf[Message::IdentifierLength] = 0; \
    AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%lx", \
                  _body.get().newId()); \
    amps_message_set_field_value_nts(_body.get().getMessage(), \
                                     AMPS_##x, buf); \
    return *this; \
  } \
  DOX_CLOSEGROUP(x)
339 
// Generates the same family of accessors as AMPS_FIELD, but under the
// method name x while reading and writing the header identified by
// AMPS_##y. new##x() writes the hex id from newId() without any prefix.
#define AMPS_FIELD_ALIAS(x,y) \
  DOX_OPENGROUP(x) \
  DOX_ALIAS_MAKEGETCOMMENT(x,y) \
  Field get##x() const \
  { \
    const char* data; \
    size_t length; \
    amps_message_get_field_value(_body.get().getMessage(), \
                                 AMPS_##y, &data, &length); \
    Field result; \
    result.assign(data, length); \
    return result; \
  } \
  DOX_ALIAS_MAKEGETRAWCOMMENT(x,y) \
  void getRaw##x(const char** dataptr, size_t* sizeptr) const \
  { \
    amps_message_get_field_value(_body.get().getMessage(), \
                                 AMPS_##y, dataptr, sizeptr); \
  } \
  DOX_ALIAS_MAKESETCOMMENT(x,y) \
  Message& set##x(const std::string& v) \
  { \
    amps_message_set_field_value(_body.get().getMessage(), \
                                 AMPS_##y, v.c_str(), v.length()); \
    return *this; \
  } \
  DOX_ALIAS_MAKESETCOMMENT(x,y) \
  Message& set##x(amps_uint64_t v) \
  { \
    char digits[22]; \
    AMPS_snprintf_amps_uint64_t(digits, 22, v); \
    amps_message_set_field_value_nts(_body.get().getMessage(), \
                                     AMPS_##y, digits); \
    return *this; \
  } \
  DOX_ALIAS_MAKEASSIGNCOMMENT(x,y) \
  Message& assign##x(const std::string& v) \
  { \
    amps_message_assign_field_value(_body.get().getMessage(), \
                                    AMPS_##y, v.c_str(), v.length()); \
    return *this; \
  } \
  DOX_ALIAS_MAKEASSIGNCOMMENT(x,y) \
  Message& assign##x(const char* data, size_t len) \
  { \
    amps_message_assign_field_value(_body.get().getMessage(), \
                                    AMPS_##y, data, len); \
    return *this; \
  } \
  DOX_ALIAS_MAKESETCOMMENT(x,y) \
  Message& set##x(const char* str) \
  { \
    amps_message_set_field_value_nts(_body.get().getMessage(), \
                                     AMPS_##y, str); \
    return *this; \
  } \
  DOX_ALIAS_MAKESETCOMMENT(x,y) \
  Message& set##x(const char* str, size_t len) \
  { \
    amps_message_set_field_value(_body.get().getMessage(), \
                                 AMPS_##y, str, len); \
    return *this; \
  } \
  DOX_ALIAS_MAKENEWCOMMENT(x,y) \
  Message& new##x() \
  { \
    char identifier[Message::IdentifierLength+1]; \
    identifier[Message::IdentifierLength] = 0; \
    AMPS_snprintf(identifier, Message::IdentifierLength+1, "%lx", \
                  _body.get().newId()); \
    amps_message_set_field_value_nts(_body.get().getMessage(), \
                                     AMPS_##y, identifier); \
    return *this; \
  } \
  DOX_CLOSEGROUP(x)
406 
447 class Message
448 {
449  RefHandle<MessageImpl> _body;
450 
451  Message(MessageImpl* body_) : _body(body_) { ; }
452 
453 public:
454  typedef AMPS::Field Field;
455 
458  static const unsigned int IdentifierLength = 32;
459 
462  static const size_t BOOKMARK_NONE = AMPS_UNSET_INDEX;
463 
470  Message(amps_handle message_, bool owner_ = false)
471  : _body(new MessageImpl(message_, owner_))
472  {
473  }
474 
478  Message() : _body(new MessageImpl())
479  {
480  }
481 
484  Message deepCopy(void) const
485  {
486  return Message(_body.get().copy());
487  }
488 
499  class Options
500  {
501  public:
502  static const char* None(void) { return AMPS_OPTIONS_NONE; }
503  static const char* Live(void) { return AMPS_OPTIONS_LIVE; }
504  static const char* OOF(void) { return AMPS_OPTIONS_OOF; }
505  static const char* Replace(void) { return AMPS_OPTIONS_REPLACE; }
506  static const char* NoEmpties(void) { return AMPS_OPTIONS_NOEMPTIES; }
507  static const char* SendKeys(void) { return AMPS_OPTIONS_SENDKEYS; }
508  static const char* Timestamp(void) { return AMPS_OPTIONS_TIMESTAMP; }
509  static const char* NoSowKey(void) { return AMPS_OPTIONS_NOSOWKEY; }
510  static const char* Cancel(void) { return AMPS_OPTIONS_CANCEL; }
511  static const char* Resume(void) { return AMPS_OPTIONS_RESUME; }
512  static const char* Pause(void) { return AMPS_OPTIONS_PAUSE; }
513  static const char* FullyDurable(void) { return AMPS_OPTIONS_FULLY_DURABLE; }
514  static const char* Expire(void) { return AMPS_OPTIONS_EXPIRE; }
/// Builds the option token "conflation=<value>,".
/// The token is assembled in a std::string so a long value is never cut
/// short by a fixed-size buffer.
/// \param conflation_ NUL-terminated value; NULL is treated as empty.
/// \return the complete token, including the trailing comma.
static std::string Conflation(const char* conflation_)
{
  std::string option("conflation=");
  if (conflation_)
  {
    option.append(conflation_);
  }
  option.append(",");
  return option;
}
/// Builds the option token "conflation_key=<value>,".
/// \param conflationKey_ NUL-terminated value to embed in the token.
static std::string ConflationKey(const char* conflationKey_)
{
  return std::string("conflation_key=") + conflationKey_ + ",";
}
527  static std::string TopN(int topN_)
528  {
529  char buf[24];
530  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
531  return buf;
532  }
533  static std::string MaxBacklog(int maxBacklog_)
534  {
535  char buf[24];
536  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
537  return buf;
538  }
/// Builds the option token "rate=<value>,".
/// The token is assembled in a std::string so a long value is never cut
/// short by a fixed-size buffer.
/// \param rate_ NUL-terminated value; NULL is treated as empty.
/// \return the complete token, including the trailing comma.
static std::string Rate(const char* rate_)
{
  std::string option("rate=");
  if (rate_)
  {
    option.append(rate_);
  }
  option.append(",");
  return option;
}
/// Builds the option token "rate_max_gap=<value>,".
/// The token is assembled in a std::string so a long value is never cut
/// short by a fixed-size buffer.
/// \param rateMaxGap_ NUL-terminated value; NULL is treated as empty.
/// \return the complete token, including the trailing comma.
static std::string RateMaxGap(const char* rateMaxGap_)
{
  std::string option("rate_max_gap=");
  if (rateMaxGap_)
  {
    option.append(rateMaxGap_);
  }
  option.append(",");
  return option;
}
551  static std::string SkipN(int skipN_)
552  {
553  char buf[24];
554  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
555  return buf;
556  }
557 
/// Builds the option token "projection=[<projection_>],".
/// \param projection_ text placed verbatim between the brackets.
static std::string Projection(const std::string& projection_)
{
  std::string option("projection=[");
  option += projection_;
  option += "],";
  return option;
}
562 
/// Builds "projection=[e1,e2,...]," from a range of elements that can be
/// appended to a std::string. An empty range produces "projection=[],",
/// the same text the std::string overload produces for an empty argument.
/// \param begin_ iterator to the first element.
/// \param end_ iterator one past the last element.
template<class Iterator>
static std::string Projection(Iterator begin_, Iterator end_)
{
  std::string projection = "projection=[";
  bool first = true;
  for (Iterator i = begin_; i != end_; ++i)
  {
    // Separator goes between elements, so none is emitted for an empty
    // range and none has to be patched afterwards.
    if (!first)
    {
      projection += ',';
    }
    projection += *i;
    first = false;
  }
  projection += "],";
  return projection;
}
575 
/// Builds the option token "grouping=[<grouping_>],".
/// \param grouping_ text placed verbatim between the brackets.
static std::string Grouping(const std::string& grouping_)
{
  std::string option("grouping=[");
  option += grouping_;
  option += "],";
  return option;
}
580 
/// Builds "grouping=[e1,e2,...]," from a range of elements that can be
/// appended to a std::string. An empty range produces "grouping=[],".
/// \param begin_ iterator to the first element.
/// \param end_ iterator one past the last element.
template<class Iterator>
static std::string Grouping(Iterator begin_, Iterator end_)
{
  std::string grouping = "grouping=[";
  bool first = true;
  for (Iterator i = begin_; i != end_; ++i)
  {
    // Separator goes between elements only.
    if (!first)
    {
      grouping += ',';
    }
    grouping += *i;
    first = false;
  }
  grouping += "],";
  return grouping;
}
593 
/// Builds the option token "select=[<select_>],".
/// \param select_ text placed verbatim between the brackets.
static std::string Select(const std::string& select_)
{
  std::string option("select=[");
  option += select_;
  option += "],";
  return option;
}
598 
/// Builds "select=[e1,e2,...]," from a range of elements that can be
/// appended to a std::string. An empty range produces "select=[],".
/// \param begin_ iterator to the first element.
/// \param end_ iterator one past the last element.
template<class Iterator>
static std::string Select(Iterator begin_, Iterator end_)
{
  std::string select = "select=[";
  bool first = true;
  for (Iterator i = begin_; i != end_; ++i)
  {
    // Separator goes between elements only.
    if (!first)
    {
      select += ',';
    }
    select += *i;
    first = false;
  }
  select += "],";
  return select;
}
611 
/// Builds the option token "ack_conflation=<interval_>,".
static std::string AckConflationInterval(const std::string& interval_)
{
  std::string option("ack_conflation=");
  option += interval_;
  option += ",";
  return option;
}
616 
/// Builds the option token "ack_conflation=<interval_>," from a
/// NUL-terminated string.
static std::string AckConflationInterval(const char* interval_)
{
  std::string option("ack_conflation=");
  option += interval_;
  option += ",";
  return option;
}
622 
625  Options(std::string options_ = "")
626  : _optionStr(options_)
627  , _maxBacklog(0)
628  , _topN(0)
629  , _skipN(0)
630  {;}
631 
632  int getMaxBacklog(void) const { return _maxBacklog; }
633  std::string getConflation(void) const { return _conflation; }
634  std::string getConflationKey(void) const { return _conflationKey; }
635  int getTopN(void) const { return _topN; }
636  std::string getRate(void) const { return _rate; }
637  std::string getRateMaxGap(void) const { return _rateMaxGap; }
638 
642  void setNone(void) { _optionStr.clear(); }
643 
653  void setLive(void) { _optionStr += AMPS_OPTIONS_LIVE; }
654 
659  void setOOF(void) { _optionStr += AMPS_OPTIONS_OOF; }
660 
665  void setReplace(void) { _optionStr += AMPS_OPTIONS_REPLACE; }
666 
670  void setNoEmpties(void) { _optionStr += AMPS_OPTIONS_NOEMPTIES; }
671 
675  void setSendKeys(void) { _optionStr += AMPS_OPTIONS_SENDKEYS; }
676 
681  void setTimestamp(void) { _optionStr += AMPS_OPTIONS_TIMESTAMP; }
682 
686  void setNoSowKey(void) { _optionStr += AMPS_OPTIONS_NOSOWKEY; }
687 
691  void setCancel(void) { _optionStr += AMPS_OPTIONS_CANCEL; }
692 
699  void setResume(void) { _optionStr += AMPS_OPTIONS_RESUME; }
700 
711  void setPause(void) { _optionStr += AMPS_OPTIONS_PAUSE; }
712 
719  void setFullyDurable(void) { _optionStr += AMPS_OPTIONS_FULLY_DURABLE; }
720 
731  void setMaxBacklog(int maxBacklog_)
732  {
733  char buf[24];
734  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
735  _optionStr += buf;
736  _maxBacklog = maxBacklog_;
737  }
738 
744  void setConflation(const char* conflation_)
745  {
746  char buf[64];
747  AMPS_snprintf(buf, sizeof(buf), "conflation=%s,", conflation_);
748  _optionStr += buf;
749  _conflation = conflation_;
750  }
751 
761  void setConflationKey(const char* conflationKey_)
762  {
763  char buf[64];
764  AMPS_snprintf(buf, sizeof(buf), "conflation_key=%s,", conflationKey_);
765  _optionStr += buf;
766  _conflationKey = conflationKey_;
767  }
768 
774  void setTopN(int topN_)
775  {
776  char buf[24];
777  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
778  _optionStr += buf;
779  _topN = topN_;
780  }
781 
788  void setRate(const char* rate_)
789  {
790  char buf[64];
791  AMPS_snprintf(buf, sizeof(buf), "rate=%s,", rate_);
792  _optionStr += buf;
793  _rate = rate_;
794  }
795 
810  void setRateMaxGap(const char* rateMaxGap_)
811  {
812  char buf[64];
813  AMPS_snprintf(buf, sizeof(buf), "rate_max_gap=%s,", rateMaxGap_);
814  _optionStr += buf;
815  _rateMaxGap = rateMaxGap_;
816  }
817 
823  void setSkipN(int skipN_)
824  {
825  char buf[24];
826  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
827  _optionStr += buf;
828  _skipN = skipN_;
829  }
830 
835  void setProjection(const std::string& projection_)
836  {
837  _projection = "projection=[" + projection_ + "],";
838  _optionStr += _projection;
839  }
840 
841 
847  template<class Iterator>
848  void setProjection(Iterator begin_, Iterator end_)
849  {
850  _projection = "projection=[";
851  for (Iterator i = begin_; i != end_; ++i)
852  {
853  _projection += *i;
854  _projection += ',';
855  }
856  _projection.insert(_projection.length() - 1, "]");
857  _optionStr += _projection;
858  }
859 
864  void setGrouping(const std::string& grouping_)
865  {
866  _grouping = "grouping=[" + grouping_ + "],";
867  _optionStr += _grouping;
868  }
869 
870 
876  template<class Iterator>
877  void setGrouping(Iterator begin_, Iterator end_)
878  {
879  _grouping = "grouping=[";
880  for (Iterator i = begin_; i != end_; ++i)
881  {
882  _grouping += *i;
883  _grouping += ',';
884  }
885  _grouping.insert(_grouping.length() - 1, "]");
886  _optionStr += _grouping;
887  }
888 
892  operator const std::string()
893  {
894  return _optionStr.substr(0, _optionStr.length()-1);
895  }
896 
897  private:
898  std::string _optionStr;
899  int _maxBacklog;
900  std::string _conflation;
901  std::string _conflationKey;
902  int _topN;
903  std::string _rate;
904  std::string _rateMaxGap;
905  int _skipN;
906  std::string _projection;
907  std::string _grouping;
908  };
909 
/// Command identifiers. Every command other than Unknown is a distinct
/// single bit so values can be combined into masks; NoDataCommands is the
/// bitwise OR of the commands listed in its definition.
struct Command
{
  typedef enum
  {
    Unknown              = 0,
    Publish              = 1 << 0,
    Subscribe            = 1 << 1,
    Unsubscribe          = 1 << 2,
    SOW                  = 1 << 3,
    Heartbeat            = 1 << 4,
    SOWDelete            = 1 << 5,
    DeltaPublish         = 1 << 6,
    Logon                = 1 << 7,
    SOWAndSubscribe      = 1 << 8,
    DeltaSubscribe       = 1 << 9,
    SOWAndDeltaSubscribe = 1 << 10,
    StartTimer           = 1 << 11,
    StopTimer            = 1 << 12,
    GroupBegin           = 1 << 13,
    GroupEnd             = 1 << 14,
    OOF                  = 1 << 15,
    Ack                  = 1 << 16,
    Flush                = 1 << 17,
    NoDataCommands       = Publish | Unsubscribe | Heartbeat | SOWDelete
                           | DeltaPublish | Logon | StartTimer | StopTimer
                           | Flush
  } Type;
};
941  Command::Type getCommandEnum() const
942  {
943  const char* data = NULL; size_t len = 0;
944  amps_message_get_field_value(_body.get().getMessage(), AMPS_Command, &data, &len);
945  switch (len)
946  {
947  case 1: return Command::Publish; // -V1037
948  case 3:
949  switch (data[0])
950  {
951  case 's': return Command::SOW;
952  case 'o': return Command::OOF;
953  case 'a': return Command::Ack;
954  }
955  break;
956  case 5:
957  switch (data[0])
958  {
959  case 'l': return Command::Logon;
960  case 'f': return Command::Flush;
961  }
962  break;
963  case 7:
964  return Command::Publish; // -V1037
965  break;
966  case 9:
967  switch (data[0])
968  {
969  case 's': return Command::Subscribe;
970  case 'h': return Command::Heartbeat;
971  case 'g': return Command::GroupEnd;
972  }
973  break;
974  case 10:
975  switch (data[1])
976  {
977  case 'o': return Command::SOWDelete;
978  case 't': return Command::StopTimer;
979  }
980  break;
981  case 11:
982  switch (data[0])
983  {
984  case 'g': return Command::GroupBegin;
985  case 'u': return Command::Unsubscribe;
986  }
987  break;
988  case 13:
989  return Command::DeltaPublish;
990  case 15:
991  return Command::DeltaSubscribe;
992  case 17:
993  return Command::SOWAndSubscribe;
994  case 23:
995  return Command::SOWAndDeltaSubscribe;
996  }
997  return Command::Unknown;
998  }
999 
1001  Message& setCommandEnum(Command::Type command_)
1002  {
1003  unsigned bits = 0;
1004  unsigned command = command_;
1005  while (command > 0) { ++bits; command >>= 1; }
1006  amps_message_assign_field_value(_body.get().getMessage(), AMPS_Command,
1007  CommandConstants<0>::Values[bits], CommandConstants<0>::Lengths[bits]);
1008  return *this;
1009  }
1010 
1011  AMPS_FIELD(Command)
1012  AMPS_FIELD(CommandId)
1013  AMPS_FIELD(ClientName)
1014  AMPS_FIELD(UserId)
1015  AMPS_FIELD(Timestamp)
1016 
1020  Field getTransmissionTime() const
1021  {
1022  return getTimestamp();
1023  }
1024 
1029  void getRawTransmissionTime(const char** dataptr, size_t* sizeptr) const
1030  {
1031  getRawTimestamp(dataptr, sizeptr);
1032  }
1033 
1034  AMPS_FIELD(Topic)
1035  AMPS_FIELD(Filter)
1036  AMPS_FIELD(MessageType)
1037 
/// Acknowledgement types. Each type other than None is a distinct single
/// bit, so several types can be combined in one unsigned value.
struct AckType
{
  typedef enum
  {
    None      = 0,
    Received  = 1 << 0,
    Parsed    = 1 << 1,
    Processed = 1 << 2,
    Persisted = 1 << 3,
    Completed = 1 << 4,
    Stats     = 1 << 5
  } Type;
};
1047  AMPS_FIELD(AckType);
1050  static inline AckType::Type decodeSingleAckType(const char* begin, const char* end)
1051  {
1052  switch (end - begin)
1053  {
1054  case 5:
1055  return AckType::Stats;
1056  case 6:
1057  return AckType::Parsed;
1058  case 8:
1059  return AckType::Received;
1060  case 9:
1061  switch (begin[1])
1062  {
1063  case 'e': return AckType::Persisted;
1064  case 'r': return AckType::Processed;
1065  case 'o': return AckType::Completed;
1066  default: break;
1067  }
1068  break;
1069  default:
1070  break;
1071  }
1072  return AckType::None;
1073  }
1077  unsigned getAckTypeEnum() const
1078  {
1079  unsigned result = AckType::None;
1080  const char* data = NULL; size_t len = 0;
1081  amps_message_get_field_value(_body.get().getMessage(), AMPS_AckType, &data, &len);
1082  const char* mark = data;
1083  for (const char* end = data + len; data != end; ++data)
1084  {
1085  if (*data == ',')
1086  {
1087  result |= decodeSingleAckType(mark, data);
1088  mark = data + 1;
1089  }
1090  }
1091  if (mark < data) result |= decodeSingleAckType(mark, data);
1092  return result;
1093  }
1097  Message& setAckTypeEnum(unsigned ackType_)
1098  {
1099  if(ackType_ < AckTypeConstants<0>::Entries)
1100  {
1101  amps_message_assign_field_value(_body.get().getMessage(), AMPS_AckType,
1102  AckTypeConstants<0>::Values[ackType_], AckTypeConstants<0>::Lengths[ackType_]);
1103  }
1104  return *this;
1105  }
1106 
1107  AMPS_FIELD(SubscriptionId)
1108  AMPS_FIELD_ALIAS(SubId, SubscriptionId) // -V524
1109  AMPS_FIELD(Expiration)
1110  AMPS_FIELD(Heartbeat)
1111  AMPS_FIELD(TimeoutInterval)
1112  AMPS_FIELD(LeasePeriod)
1113  AMPS_FIELD(Status)
1114  AMPS_FIELD(QueryID)
1115  AMPS_FIELD_ALIAS(QueryId, QueryID)
1116  AMPS_FIELD(BatchSize)
1117  AMPS_FIELD(TopNRecordsReturned)
1118  AMPS_FIELD(OrderBy)
1119  AMPS_FIELD(SowKeys)
1120  AMPS_FIELD(CorrelationId)
1121  AMPS_FIELD(Sequence)
1122  AMPS_FIELD(Bookmark)
1123  AMPS_FIELD(RecordsInserted)
1124  AMPS_FIELD(RecordsUpdated)
1125  AMPS_FIELD(SowDelete)
1126  AMPS_FIELD(RecordsReturned)
1127  AMPS_FIELD(TopicMatches)
1128  AMPS_FIELD(Matches)
1129  AMPS_FIELD(MessageLength)
1130  AMPS_FIELD(SowKey)
1131  AMPS_FIELD(GroupSequenceNumber)
1132  AMPS_FIELD(SubscriptionIds)
1133  AMPS_FIELD(Reason)
1134  AMPS_FIELD(Password)
1135  AMPS_FIELD(Version)
1136 
1137  DOX_OPENGROUP(x)
1138  DOX_MAKEGETCOMMENT(Options) DOX_COMMENTHEAD( Retrieves the value of the Options header of the Message as a new Field.)
1139  Field getOptions() const {
1140  Field returnValue;
1141  const char* ptr;
1142  size_t sz;
1143  amps_message_get_field_value(_body.get().getMessage(),
1144  AMPS_Options, &ptr, &sz);
1145  if (sz && ptr[sz-1] == ',') --sz;
1146  returnValue.assign(ptr, sz);
1147  return returnValue;
1148  }
1149 
1150  DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the Options header of self as a Field that references the underlying buffer managed by this Message. )
1151  void getRawOptions(const char** dataptr, size_t* sizeptr) const {
1152  amps_message_get_field_value(_body.get().getMessage(),
1153  AMPS_Options, dataptr, sizeptr);
1154  if (*sizeptr && *dataptr && (*dataptr)[*sizeptr-1] == ',') --*sizeptr;
1155  return;
1156  }
1157 
1158  DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the Options header for this Message.)
1159  Message& setOptions(const std::string& v) {
1160  size_t sz = v.length();
1161  if (sz && v[sz-1] == ',') --sz;
1162  amps_message_set_field_value(_body.get().getMessage(),
1163  AMPS_Options, v.c_str(), sz);
1164  return *this;
1165  }
1166 
1167  DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the Options header for this Message.)
1168  Message& assignOptions(const std::string& v) {
1169  size_t sz = v.length();
1170  if (sz && v[sz-1] == ',') --sz;
1171  amps_message_assign_field_value(_body.get().getMessage(),
1172  AMPS_Options, v.c_str(), sz);
1173  return *this;
1174  }
1175 
1176  DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the Options header for this Message.)
1177  Message& assignOptions(const char* data, size_t len) {
1178  if (len && data[len-1] == ',') --len;
1179  amps_message_assign_field_value(_body.get().getMessage(),
1180  AMPS_Options, data, len);
1181  return *this;
1182  }
1183 
1184  DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the Options header for this Message.)
1185  Message& setOptions(const char* str) {
1186  if (str)
1187  {
1188  size_t sz = strlen(str);
1189  if (sz && str[sz-1] == ',') --sz;
1190  amps_message_set_field_value(_body.get().getMessage(),
1191  AMPS_Options, str, sz);
1192  }
1193  else
1194  {
1195  amps_message_set_field_value(_body.get().getMessage(),
1196  AMPS_Options, str, 0);
1197  }
1198  return *this;
1199  }
1200 
1201  DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the Options header for this Message.)
1202  Message& setOptions(const char* str,size_t len) {
1203  if (len && str[len-1] == ',') --len;
1204  amps_message_set_field_value(_body.get().getMessage(),
1205  AMPS_Options, str, len);
1206  return *this;
1207  }
1209 
1210 
1211 
1215  Field getData() const
1216  {
1217  Field returnValue;
1218  char* ptr;
1219  size_t sz;
1220  amps_message_get_data(_body.get().getMessage(), &ptr, &sz);
1221  returnValue.assign(ptr, sz);
1222  return returnValue;
1223  }
1224 
1225  void getRawData(const char **data, size_t *sz) const
1226  {
1227  amps_message_get_data(_body.get().getMessage(), (char**)data, sz);
1228  }
1231  Message& setData(const std::string &v_)
1232  {
1233  amps_message_set_data(_body.get().getMessage(), v_.c_str(), v_.length());
1234  return *this;
1235  }
1236  Message& assignData(const std::string &v_)
1237  {
1238  amps_message_assign_data(_body.get().getMessage(), v_.c_str(), v_.length());
1239  return *this;
1240  }
1241 
1245  Message& setData(const char* data_, size_t length_)
1246  {
1247  amps_message_set_data(_body.get().getMessage(), data_, length_);
1248  return *this;
1249  }
1250  Message& assignData(const char* data_,size_t length_)
1251  {
1252  amps_message_assign_data(_body.get().getMessage(),data_,length_);
1253  return *this;
1254  }
1255 
1258  Message& setData(const char* data_)
1259  {
1260  amps_message_set_data_nts(_body.get().getMessage(), data_);
1261  return *this;
1262  }
1263  Message& assignData(const char* data_)
1264  {
1265  amps_message_assign_data(_body.get().getMessage(), data_, strlen(data_));
1266  return *this;
1267  }
1268  amps_handle getMessage() const
1269  {
1270  return _body.get().getMessage();
1271  }
1272  void replace(amps_handle message, bool owner = false)
1273  {
1274  _body.get().replace(message, owner);
1275  }
1276  void disown()
1277  {
1278  _body.get().disown();
1279  }
1280  void invalidate()
1281  {
1282  _body = NULL;
1283  }
1284  bool isValid(void) const
1285  {
1286  return _body.isValid();
1287  }
1288  Message& reset()
1289  {
1290  _body.get().reset();
1291  return *this;
1292  }
1293 
1294  void setBookmarkSeqNo(size_t val)
1295  {
1296  _body.get().setBookmarkSeqNo(val);
1297  }
1298 
1299  size_t getBookmarkSeqNo() const
1300  {
1301  return _body.get().getBookmarkSeqNo();
1302  }
1303 
1304  void setSubscriptionHandle(amps_handle val)
1305  {
1306  _body.get().setSubscriptionHandle(val);
1307  }
1308 
1309  amps_handle getSubscriptionHandle() const
1310  {
1311  return _body.get().getSubscriptionHandle();
1312  }
1313 
1314  void ack(const char* options_ = NULL) const;
1315 
1316  void setClientImpl(ClientImpl *pClientImpl)
1317  {
1318  _body.get().setClientImpl(pClientImpl);
1319  }
1320 
1321  void setIgnoreAutoAck() const
1322  {
1323  _body.get().setIgnoreAutoAck();
1324  }
1325 
1326  bool getIgnoreAutoAck() const
1327  {
1328  return _body.get().getIgnoreAutoAck();
1329  }
1330 
1331  template <class T> // static
1332  void throwFor(const T& /*context_*/, const std::string& ackReason_) const
1333  {
1334  switch (ackReason_[0])
1335  {
1336  case 'a': // auth failure
1337  throw AuthenticationException("Logon failed for user \"" +
1338  (std::string)getUserId() + "\"");
1339  break;
1340  case 'b':
1341  switch (ackReason_.length())
1342  {
1343  case 10: // bad filter
1344  throw BadFilterException("bad filter '" +
1345  (std::string)getFilter() +
1346  "'");
1347  break;
1348  case 11: // bad sow key
1349  if (getSowKeys().len())
1350  {
1351  throw BadSowKeyException("bad sow key '" +
1352  (std::string)getSowKeys() +
1353  "'");
1354  }
1355  else
1356  {
1357  throw BadSowKeyException("bad sow key '" +
1358  (std::string)getSowKey() +
1359  "'");
1360  }
1361  break;
1362  case 15: // bad regex topic
1363  throw BadRegexTopicException("bad regex topic '" +
1364  (std::string)getTopic() +
1365  "'.");
1366  break;
1367  default:
1368  break;
1369  }
1370  break;
1371  case 'd':
1372  if (ackReason_.length() == 23) // duplicate logon attempt
1373  {
1374  throw DuplicateLogonException("Client '" +
1375  (std::string)getClientName() +
1376  "' with userid '" +
1377  (std::string)getUserId() +
1378  "' duplicate logon attempt");
1379  }
1380  break;
1381  case 'i':
1382  if (ackReason_.length() >= 9)
1383  {
1384  switch (ackReason_[8])
1385  {
1386  case 'b': // invalid bookmark
1387  throw InvalidBookmarkException("invalid bookmark '" +
1388  (std::string)getBookmark() +
1389  "'.");
1390  break;
1391  case 'm': // invalid message type
1392  throw CommandException(std::string("invalid message type '") +
1393  (std::string)getMessageType() +
1394  "'.");
1395  break;
1396  case 'o':
1397  if (ackReason_[9] == 'p') // invalid options
1398  {
1399  throw InvalidOptionsException("invalid options '" +
1400  (std::string)getOptions() +
1401  "'.");
1402  }
1403  else if (ackReason_[9] == 'r') // invalid order by
1404  {
1405  throw InvalidOrderByException("invalid order by '" +
1406  (std::string)getOrderBy() +
1407  "'.");
1408  }
1409  break;
1410  case 's': // invalid subId
1411  throw InvalidSubIdException("invalid subid '" +
1412  (std::string)getSubscriptionId() +
1413  "'.");
1414  break;
1415  case 't':
1416  if (ackReason_.length() == 13) // invalid topic
1417  {
1418  throw InvalidTopicException("invalid topic '" +
1419  (std::string)getTopic() +
1420  "'.");
1421  }
1422  else if (ackReason_.length() == 23) // invalid topic or filter
1423  {
1424  throw InvalidTopicException("invalid topic or filter. Topic '" +
1425  (std::string)getTopic() +
1426  "' Filter '" +
1427  (std::string)getFilter() +
1428  "'.");
1429  }
1430  break;
1431  default:
1432  break;
1433  }
1434  }
1435  break;
1436  case 'l':
1437  if (ackReason_.length() == 14) // logon required
1438  {
1439  throw LogonRequiredException("logon required before command");
1440  }
1441  break;
1442  case 'n':
1443  switch (ackReason_[5])
1444  {
1445  case ' ': // name in use
1446  throw NameInUseException("name in use '" +
1447  (std::string)getClientName() +
1448  "'.");
1449  break;
1450  case 'e': // not entitled
1451  throw NotEntitledException("User \"" +
1452  (std::string)getUserId() +
1453  "\" not entitled to topic \"" +
1454  (std::string)getTopic() +
1455  "\".");
1456  break;
1457  case 'i': // no filter or bookmark
1458  throw MissingFieldsException("command sent with no filter or bookmark.");
1459  break;
1460  case 'l': // no client name
1461  throw MissingFieldsException("command sent with no client name.");
1462  break;
1463  case 'o': // no topic or filter
1464  throw MissingFieldsException("command sent with no topic or filter.");
1465  break;
1466  case 's': // not supported
1467  throw CommandException("operation on topic '" +
1468  (std::string)getTopic() +
1469  "' with options '" +
1470  (std::string)getOptions() +
1471  "' not supported.");
1472  break;
1473  default:
1474  break;
1475  }
1476  break;
1477  case 'o':
1478  switch (ackReason_.length())
1479  {
1480  case 16: // orderby required
1481  throw MissingFieldsException("orderby required");
1482  break;
1483  case 17: // orderby too large
1484  throw CommandException("orderby too large '" +
1485  (std::string)getOrderBy() +
1486  "'.");
1487  break;
1488  }
1489  break;
1490  case 'p':
1491  switch (ackReason_[1])
1492  {
1493  case 'r': // projection clause too large
1494  throw CommandException("projection clause too large in options '" +
1495  (std::string)getOptions() +
1496  "'.");
1497  break;
1498  case 'u': // publish filter no match
1499  throw PublishFilterException("Publish filter '" +
1500  (std::string)getFilter() +
1501  "' doesn't match any records.");
1502  break;
1503  default:
1504  break;
1505  }
1506  break;
1507  case 'r':
1508  switch (ackReason_[2])
1509  {
1510  case 'g': // regex topic not supported
1511  throw BadRegexTopicException("'regex topic not supported '" +
1512  (std::string)getTopic() +
1513  "'.");
1514  break;
1515  default:
1516  break;
1517  }
1518  break;
1519  case 's':
1520  switch (ackReason_[5])
1521  {
1522  case ' ': // subid in use
1523  throw SubidInUseException("subid in use '" +
1524  (std::string)getSubscriptionId() +
1525  "'.");
1526  break;
1527  case 'e': // sow_delete command only supports one of: filter, sow_keys, bookmark, or data
1528  throw CommandException("sow_delete command only supports one of: filter '" +
1529  (std::string)getFilter() +
1530  "', sow_keys '" +
1531  (std::string)getSowKeys() +
1532  "', bookmark '" +
1533  (std::string)getBookmark() +
1534  "', or data '" +
1535  (std::string)getData() +
1536  "'.");
1537  break;
1538  case 't': // sow store failed
1539  throw PublishException("sow store failed.");
1540  break;
1541  default:
1542  break;
1543  }
1544  break;
1545  case 't': // sow store failed
1546  switch (ackReason_[2])
1547  {
1548  case ' ': // tx store failure
1549  throw PublishException("tx store failure.");
1550  break;
1551  case 'n': // txn replay failed
1552  throw CommandException("txn replay failed for '" +
1553  (std::string)getSubId() +
1554  "'.");
1555  break;
1556  }
1557  break;
1558  default:
1559  break;
1560  }
1561  throw CommandException("Error from server while processing this command: '" +
1562  ackReason_ + "'");
1563  }
1564 };
1565 
1566 inline std::string
1567 operator+(const std::string& lhs, const Message::Field& rhs)
1568 {
1569  return lhs + std::string(rhs);
1570 }
1571 
1572 inline std::basic_ostream<char>&
1573 operator<<(std::basic_ostream<char>& os, const Message::Field& rhs)
1574 {
1575  os.write(rhs.data(), (std::streamsize)rhs.len());
1576  return os;
1577 }
1578 inline bool
1579 AMPS::Field::operator<(const AMPS::Field& rhs) const
1580 {
1581  return std::lexicographical_compare(data(), data()+len(), rhs.data(), rhs.data()+rhs.len());
1582 }
1583 
1584 }
1585 
1586 #endif
void setFullyDurable(void)
Set the option to only provide messages that have been persisted to all replication destinations that...
Definition: Message.hpp:719
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:179
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1231
void amps_message_reset(amps_handle message)
Clears all fields and data in a message.
void setRateMaxGap(const char *rateMaxGap_)
Set the option for the maximum amount of time that a bookmark replay with a specified rate will allow...
Definition: Message.hpp:810
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:941
void amps_message_assign_data(amps_handle message, const amps_char *value, size_t length)
Assigns the data component of an AMPS message, without copying the value.
amps_handle amps_message_copy(amps_handle message)
Creates and returns a handle to a new AMPS message object that is a deep copy of the message passed i...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:670
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:484
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:659
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:447
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1097
void setNoSowKey(void)
Set the option to not set the SowKey header on messages.
Definition: Message.hpp:686
MessageImpl(amps_handle message_, bool owner_=false, bool ignoreAutoAck_=false, size_t bookmarkSeqNo_=0, amps_subscription_handle subscription_=NULL, ClientImpl *clientImpl_=NULL)
Constructs a messageImpl from an existing AMPS message.
Definition: Message.hpp:105
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1001
void setSendKeys(void)
Set the option to send key fields with a delta subscription.
Definition: Message.hpp:675
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:179
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:499
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
void setPause(void)
Set the option to pause a bookmark subscription.
Definition: Message.hpp:711
DOX_CLOSEGROUP(x) Field getData() const
Returns the data from this message.
Definition: Message.hpp:1208
void setReplace(void)
Set the option to replace a current subscription with this one.
Definition: Message.hpp:665
Valid values for setCommandEnum() and getCommandEnum().
Definition: Message.hpp:913
Message & setData(const char *data_)
Sets the data portion of self from a null-terminated string.
Definition: Message.hpp:1258
void setGrouping(Iterator begin_, Iterator end_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:877
void setTimestamp(void)
Set the option to send a timestamp that the message was processed on a subscription or query...
Definition: Message.hpp:681
amps_handle getMessage() const
Returns the underlying AMPS message object from the C layer.
Definition: Message.hpp:149
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
void setResume(void)
Set the option to resume a subscription.
Definition: Message.hpp:699
static AckType::Type decodeSingleAckType(const char *begin, const char *end)
Decodes a single ack string.
Definition: Message.hpp:1050
void setConflation(const char *conflation_)
Set the options for conflation on a subscription.
Definition: Message.hpp:744
void replace(amps_handle message_, bool owner_=false)
Causes self to refer to a new AMPS message, freeing any current message owned by self along the way...
Definition: Message.hpp:168
Options(std::string options_="")
ctor - default to None
Definition: Message.hpp:625
void amps_message_get_data(amps_handle message, amps_char **value_ptr, size_t *length_ptr)
Gets the data component of an AMPS message.
amps_handle amps_message_create(amps_handle client)
Functions for creation and manipulation of AMPS messages.
void setRate(const char *rate_)
Set the option for the maximum rate at which messages are provided to the subscription.
Definition: Message.hpp:788
Valid values for the setAckTypeEnum() and getAckTypeEnum() methods.
Definition: Message.hpp:1040
void amps_message_assign_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Assigns the value of a header field in an AMPS message, without copying the value.
void setGrouping(const std::string &grouping_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:864
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
void setTopN(int topN_)
Set the top N option, which specifies the maximum number of messages to return for this command...
Definition: Message.hpp:774
void amps_message_set_data_nts(amps_handle message, const amps_char *value)
Sets the data component of an AMPS message.
void setMaxBacklog(int maxBacklog_)
Set the option for maximum backlog this subscription is willing to accept.
Definition: Message.hpp:731
void getRawTransmissionTime(const char **dataptr, size_t *sizeptr) const
Definition: Message.hpp:1029
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageImpl()
Constructs a MessageImpl with a new, empty AMPS message.
Definition: Message.hpp:118
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
void setConflationKey(const char *conflationKey_)
Set the options for the conflation key, the identifiers for the field or fields used by AMPS to deter...
Definition: Message.hpp:761
void setProjection(const std::string &projection_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:835
void setSkipN(int skipN_)
Set the option for skip N, the number of messages in the result set to skip before returning messages...
Definition: Message.hpp:823
void amps_message_set_data(amps_handle message, const amps_char *value, size_t length)
Sets the data component of an AMPS message.
Message(amps_handle message_, bool owner_=false)
Constructs a new Message to wrap message.
Definition: Message.hpp:470
void setNone(void)
Clear any previously set options and set the options to an empty string (AMPS_OPTIONS_NONE).
Definition: Message.hpp:642
Implementation class for a Message.
Definition: Message.hpp:83
void setProjection(Iterator begin_, Iterator end_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:848
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1077
Message & setData(const char *data_, size_t length_)
Sets the data portion of self from a char array.
Definition: Message.hpp:1245
Definition: ampsplusplus.hpp:103
void setCancel(void)
Set the cancel option, used on a sow_delete command to return a message to the queue.
Definition: Message.hpp:691
Message()
Construct a new, empty Message.
Definition: Message.hpp:478
void setLive(void)
Set the live option for a bookmark subscription, which requests that the subscription receives messag...
Definition: Message.hpp:653
void amps_message_destroy(amps_handle message)
Destroys and frees the memory associated with an AMPS message object.