AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.2
Message.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 #ifndef __AMPS_MESSAGE_HPP__
26 #define __AMPS_MESSAGE_HPP__
27 #include "util.hpp"
28 #include "constants.hpp"
29 #include "amps_generated.h"
30 #include "Field.hpp"
31 #include <stdio.h>
32 #include <algorithm>
33 #include <ostream>
34 #include <string>
35 #define AMPS_UNSET_SEQUENCE (amps_uint64_t)-1
36 
37 // Macros for determining what parts of TR1 we can depend on
38 #if defined(__GXX_EXPERIMENTAL_CXX0X__) || (_MSC_VER >= 1600)
39 #define AMPS_USE_FUNCTIONAL 1
40 #endif
41 
// Lambdas are assumed available on Visual C++ 2010 (_MSC_VER 1600) or later
// and on GCC 4.5 or later.  The GCC test compares major and minor together:
// checking the minor version on its own wrongly rejects releases such as
// 5.0-5.4 or 11.2, whose minor number is below 5.
#if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__ >= 5) )
#define AMPS_USE_LAMBDAS 1
#endif
45 
46 #ifdef AMPS_USE_FUNCTIONAL
47 #include <functional>
48 #endif
49 
50 #include <algorithm>
51 
55 
// Option fragments for the Options header of a command.  Every non-empty
// fragment ends with a comma so that fragments can be concatenated directly.
#define AMPS_OPTIONS_NONE ""
#define AMPS_OPTIONS_LIVE "live,"
#define AMPS_OPTIONS_OOF "oof,"
#define AMPS_OPTIONS_REPLACE "replace,"
#define AMPS_OPTIONS_NOEMPTIES "no_empties,"
#define AMPS_OPTIONS_SENDKEYS "send_keys,"
#define AMPS_OPTIONS_TIMESTAMP "timestamp,"
#define AMPS_OPTIONS_NOSOWKEY "no_sowkey,"
#define AMPS_OPTIONS_CANCEL "cancel,"
#define AMPS_OPTIONS_RESUME "resume,"
#define AMPS_OPTIONS_PAUSE "pause,"
#define AMPS_OPTIONS_FULLY_DURABLE "fully_durable,"
#define AMPS_OPTIONS_EXPIRE "expire,"

// Two-level stringification: the outer macro lets an argument that is itself
// a macro expand before the inner macro turns it into a string literal.
#define AMPS_OPTIONS_STRINGIFY_IMPL(x) #x
#define AMPS_OPTIONS_STRINGIFY(x) AMPS_OPTIONS_STRINGIFY_IMPL(x)

// Parameterized fragments, e.g. AMPS_OPTIONS_TOPN(10) yields "top_n=10,".
// The preprocessor does not substitute parameters or apply ## inside a
// string literal, so the argument is stringified and joined by adjacent
// literal concatenation instead.
#define AMPS_OPTIONS_TOPN(x) "top_n=" AMPS_OPTIONS_STRINGIFY(x) ","
#define AMPS_OPTIONS_MAXBACKLOG(x) "max_backlog=" AMPS_OPTIONS_STRINGIFY(x) ","
#define AMPS_OPTIONS_RATE(x) "rate=" AMPS_OPTIONS_STRINGIFY(x) ","
72 
73 namespace AMPS
74 {
75 typedef void* amps_subscription_handle;
76 
77 class ClientImpl;
78 
83 class MessageImpl : public RefBody
84 {
85 private:
86  amps_handle _message;
87  Mutex _lock;
88  bool _owner;
89  mutable bool _isIgnoreAutoAck;
90  size_t _bookmarkSeqNo;
91  amps_subscription_handle _subscription;
92  ClientImpl* _clientImpl;
93 public:
105  MessageImpl(amps_handle message_, bool owner_ = false,
106  bool ignoreAutoAck_ = false, size_t bookmarkSeqNo_ = 0,
107  amps_subscription_handle subscription_ = NULL,
108  ClientImpl* clientImpl_=NULL)
109  : _message(message_), _owner(owner_), _isIgnoreAutoAck(ignoreAutoAck_)
110  , _bookmarkSeqNo(bookmarkSeqNo_)
111  , _subscription(subscription_), _clientImpl(clientImpl_)
112  {
113  }
114 
119  : _message(NULL), _owner(true), _isIgnoreAutoAck(false), _bookmarkSeqNo(0), _subscription(NULL), _clientImpl(NULL)
120  {
121  // try to create one
122  _message = amps_message_create(NULL);
123  }
124 
125  virtual ~MessageImpl()
126  {
127  if (_owner && _message) amps_message_destroy(_message);
128  }
129 
130  MessageImpl* copy() const
131  {
132  amps_handle copy = amps_message_copy(_message);
133  return new MessageImpl(copy, true, _isIgnoreAutoAck, _bookmarkSeqNo,
134  _subscription,_clientImpl);
135  }
136 
137  void setClientImpl(ClientImpl* clientImpl_)
138  {
139  _clientImpl = clientImpl_;
140  }
141 
142  ClientImpl* clientImpl(void) const
143  {
144  return _clientImpl;
145  }
146 
150  {
151  return _message;
152  }
153 
154  void reset()
155  {
156  Lock<Mutex> l(_lock);
157  amps_message_reset(_message);
158  _bookmarkSeqNo = 0;
159  _subscription = NULL;
160  _isIgnoreAutoAck = false;
161  }
162 
168  void replace(amps_handle message_, bool owner_ = false)
169  {
170  Lock<Mutex> l(_lock);
171  if (_message == message_) return;
172  if (_owner && _message)
173  {
174  amps_message_destroy(_message);
175  }
176  _owner = owner_;
177  _message = message_;
178  _bookmarkSeqNo = 0;
179  _subscription = NULL;
180  _isIgnoreAutoAck = false;
181  }
182 
183  void disown()
184  {
185  Lock<Mutex> l(_lock);
186  _owner = false;
187  }
188 
189  static unsigned long newId()
190  {
191  static ATOMIC_TYPE _id = 0;
192  return (unsigned long)(AMPS_FETCH_ADD(&_id,1));
193  }
194 
195  void setBookmarkSeqNo(size_t val_)
196  {
197  _bookmarkSeqNo = val_;
198  }
199 
200  size_t getBookmarkSeqNo(void) const
201  {
202  return _bookmarkSeqNo;
203  }
204 
205  void setSubscriptionHandle(amps_subscription_handle subscription_)
206  {
207  _subscription = subscription_;
208  }
209 
210  amps_subscription_handle getSubscriptionHandle(void) const
211  {
212  return _subscription;
213  }
214 
215  void setIgnoreAutoAck() const
216  {
217  _isIgnoreAutoAck = true;
218  }
219 
220  bool getIgnoreAutoAck() const
221  {
222  return _isIgnoreAutoAck;
223  }
224 };
225 
226 
227 // This block of macros works with the Doxygen preprocessor to
228 // create documentation comments for fields defined with the AMPS_FIELD macro.
229 // A C++ compiler removes comments before expanding macros, so these macros
230 // must ONLY be defined for Doxygen and not for actual compilation.
231 
232 #ifdef DOXYGEN_PREPROCESSOR
233 
234 #define DOX_COMMENTHEAD(s) / ## ** ## s ## * ## /
235 #define DOX_GROUPNAME(s) DOX_COMMENTHEAD(@name s Functions)
236 #define DOX_OPENGROUP(s) DOX_COMMENTHEAD(@{) \
237  DOX_GROUPNAME(s)
238 #define DOX_CLOSEGROUP() DOX_COMMENTHEAD(@})
239 #define DOX_MAKEGETCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of the Message as a new Field. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
240 #define DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of self as a Field that references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. )
241 #define DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
242 #define DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. )
243 #define DOX_MAKENEWCOMMENT(x) DOX_COMMENTHEAD(Creates and sets a new sequential value for the x header for this Message. This function is most useful for headers such as %CommandId and %SubId.)
244 
245 #else
246 
247 #define DOX_COMMENTHEAD(s)
248 #define DOX_GROUPNAME(s)
249 #define DOX_OPENGROUP(x)
250 #define DOX_CLOSEGROUP()
251 #define DOX_MAKEGETCOMMENT(x)
252 #define DOX_MAKEGETRAWCOMMENT(x)
253 #define DOX_MAKESETCOMMENT(x)
254 #define DOX_MAKEASSIGNCOMMENT(x)
255 #define DOX_MAKENEWCOMMENT(x)
256 
257 #endif
258 
259 // Macro for defining all of the necessary methods for a field in an AMPS
260 // message.
261 
262 
// AMPS_FIELD(x) expands, inside class Message, to the accessor set for the
// header identified by the constant AMPS_x:
//   getX / getRawX  read the header value,
//   setX            overloads call amps_message_set_field_value[_nts],
//   assignX         overloads call amps_message_assign_field_value,
//   newX            sets the header to "auto" followed by a fresh id in hex.
// getX starts from an empty (NULL, 0) value so the returned Field is
// well-defined even if the C call leaves its outputs untouched.
#define AMPS_FIELD(x) \
  DOX_OPENGROUP(x) \
  DOX_MAKEGETCOMMENT(x) \
  Field get##x() const {\
    Field returnValue;\
    const char* ptr = NULL;\
    size_t sz = 0;\
    amps_message_get_field_value(_body.get().getMessage(),\
                                 AMPS_##x, &ptr, &sz);\
    returnValue.assign(ptr, sz);\
    return returnValue;\
  }\
  DOX_MAKEGETRAWCOMMENT(x) \
  void getRaw##x(const char** dataptr, size_t* sizeptr) const {\
    amps_message_get_field_value(_body.get().getMessage(),\
                                 AMPS_##x, dataptr, sizeptr);\
  }\
  DOX_MAKESETCOMMENT(x) \
  Message& set##x(const std::string& v) {\
    amps_message_set_field_value(_body.get().getMessage(),\
                                 AMPS_##x, v.c_str(), v.length());\
    return *this;\
  }\
  DOX_MAKESETCOMMENT(x) \
  Message& set##x(amps_uint64_t v) {\
    char buf[22];\
    AMPS_snprintf_amps_uint64_t(buf,22,v);\
    amps_message_set_field_value_nts(_body.get().getMessage(),\
                                     AMPS_##x, buf);\
    return *this;\
  }\
  DOX_MAKEASSIGNCOMMENT(x) \
  Message& assign##x(const std::string& v) {\
    amps_message_assign_field_value(_body.get().getMessage(),\
                                    AMPS_##x, v.c_str(), v.length());\
    return *this;\
  }\
  DOX_MAKEASSIGNCOMMENT(x) \
  Message& assign##x(const char* data, size_t len) {\
    amps_message_assign_field_value(_body.get().getMessage(),\
                                    AMPS_##x, data, len);\
    return *this;\
  }\
  DOX_MAKESETCOMMENT(x) \
  Message& set##x(const char* str) {\
    amps_message_set_field_value_nts(_body.get().getMessage(),\
                                     AMPS_##x, str);\
    return *this;\
  }\
  DOX_MAKESETCOMMENT(x) \
  Message& set##x(const char* str,size_t len) {\
    amps_message_set_field_value(_body.get().getMessage(),\
                                 AMPS_##x, str,len);\
    return *this;\
  }\
  DOX_MAKENEWCOMMENT(x) \
  Message& new##x() {\
    char buf[Message::IdentifierLength+1];\
    buf[Message::IdentifierLength] = 0;\
    AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%zx" , (size_t)(_body.get().newId()));\
    amps_message_set_field_value_nts(_body.get().getMessage(),\
                                     AMPS_##x, buf);\
    return *this;\
  } \
  DOX_CLOSEGROUP()
329 
// AMPS_FIELD_ACCESSORS(n, f) expands to the accessor set named getN, getRawN,
// setN, assignN and newN, all operating on the header constant AMPS_f and
// documented (for Doxygen) as header f.  It opens no Doxygen group itself.
// getN starts from an empty (NULL, 0) value so the returned Field is
// well-defined even if the C call leaves its outputs untouched.
#define AMPS_FIELD_ACCESSORS(n,f) \
  DOX_MAKEGETCOMMENT(f) \
  Field get##n() const {\
    Field returnValue;\
    const char* ptr = NULL;\
    size_t sz = 0;\
    amps_message_get_field_value(_body.get().getMessage(),\
                                 AMPS_##f, &ptr, &sz);\
    returnValue.assign(ptr, sz);\
    return returnValue;\
  }\
  DOX_MAKEGETRAWCOMMENT(f) \
  void getRaw##n(const char** dataptr, size_t* sizeptr) const {\
    amps_message_get_field_value(_body.get().getMessage(),\
                                 AMPS_##f, dataptr, sizeptr);\
  }\
  DOX_MAKESETCOMMENT(f) \
  Message& set##n(const std::string& v) {\
    amps_message_set_field_value(_body.get().getMessage(),\
                                 AMPS_##f, v.c_str(), v.length());\
    return *this;\
  }\
  DOX_MAKESETCOMMENT(f) \
  Message& set##n(amps_uint64_t v) {\
    char buf[22];\
    AMPS_snprintf_amps_uint64_t(buf,22,v);\
    amps_message_set_field_value_nts(_body.get().getMessage(),\
                                     AMPS_##f, buf);\
    return *this;\
  }\
  DOX_MAKEASSIGNCOMMENT(f) \
  Message& assign##n(const std::string& v) {\
    amps_message_assign_field_value(_body.get().getMessage(),\
                                    AMPS_##f, v.c_str(), v.length());\
    return *this;\
  }\
  DOX_MAKEASSIGNCOMMENT(f) \
  Message& assign##n(const char* data, size_t len) {\
    amps_message_assign_field_value(_body.get().getMessage(),\
                                    AMPS_##f, data, len);\
    return *this;\
  }\
  DOX_MAKESETCOMMENT(f) \
  Message& set##n(const char* str) {\
    amps_message_set_field_value_nts(_body.get().getMessage(),\
                                     AMPS_##f, str);\
    return *this;\
  }\
  DOX_MAKESETCOMMENT(f) \
  Message& set##n(const char* str,size_t len) {\
    amps_message_set_field_value(_body.get().getMessage(),\
                                 AMPS_##f, str,len);\
    return *this;\
  }\
  DOX_MAKENEWCOMMENT(f) \
  Message& new##n() {\
    char buf[Message::IdentifierLength+1];\
    buf[Message::IdentifierLength] = 0;\
    AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%zx" , (size_t)(_body.get().newId()));\
    amps_message_set_field_value_nts(_body.get().getMessage(),\
                                     AMPS_##f, buf);\
    return *this;\
  }

// AMPS_FIELD_ALIAS(x, y) defines the accessor set for header y twice inside
// one Doxygen group: first under the name y, then under the alias name x.
// Both sets read and write the same header constant, AMPS_y.
#define AMPS_FIELD_ALIAS(x,y) \
  DOX_OPENGROUP(y) \
  AMPS_FIELD_ACCESSORS(y,y) \
  AMPS_FIELD_ACCESSORS(x,y) \
  DOX_CLOSEGROUP()
459 
460 
501 class Message
502 {
503  RefHandle<MessageImpl> _body;
504 
505  Message(MessageImpl* body_) : _body(body_) { ; }
506 
507 public:
508  typedef AMPS::Field Field;
509 
512  static const unsigned int IdentifierLength = 32;
513 
516  static const size_t BOOKMARK_NONE = AMPS_UNSET_INDEX;
517 
524  Message(amps_handle message_, bool owner_ = false)
525  : _body(new MessageImpl(message_, owner_))
526  {
527  }
528 
532  Message() : _body(new MessageImpl())
533  {
534  }
535 
538  Message deepCopy(void) const
539  {
540  return Message(_body.get().copy());
541  }
542 
553  class Options
554  {
555  public:
556  static const char* None(void) { return AMPS_OPTIONS_NONE; }
557  static const char* Live(void) { return AMPS_OPTIONS_LIVE; }
558  static const char* OOF(void) { return AMPS_OPTIONS_OOF; }
559  static const char* Replace(void) { return AMPS_OPTIONS_REPLACE; }
560  static const char* NoEmpties(void) { return AMPS_OPTIONS_NOEMPTIES; }
561  static const char* SendKeys(void) { return AMPS_OPTIONS_SENDKEYS; }
562  static const char* Timestamp(void) { return AMPS_OPTIONS_TIMESTAMP; }
563  static const char* NoSowKey(void) { return AMPS_OPTIONS_NOSOWKEY; }
564  static const char* Cancel(void) { return AMPS_OPTIONS_CANCEL; }
565  static const char* Resume(void) { return AMPS_OPTIONS_RESUME; }
566  static const char* Pause(void) { return AMPS_OPTIONS_PAUSE; }
567  static const char* FullyDurable(void) { return AMPS_OPTIONS_FULLY_DURABLE; }
568  static const char* Expire(void) { return AMPS_OPTIONS_EXPIRE; }
569  static std::string Conflation(const char* conflation_)
570  {
571  char buf[64];
572  AMPS_snprintf(buf, sizeof(buf), "conflation=%s,", conflation_);
573  return buf;
574  }
575  static std::string ConflationKey(const char* conflationKey_)
576  {
577  std::string option("conflation_key=");
578  option.append(conflationKey_).append(",");
579  return option;
580  }
581  static std::string TopN(int topN_)
582  {
583  char buf[24];
584  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
585  return buf;
586  }
587  static std::string MaxBacklog(int maxBacklog_)
588  {
589  char buf[24];
590  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
591  return buf;
592  }
593  static std::string Rate(const char* rate_)
594  {
595  char buf[64];
596  AMPS_snprintf(buf, sizeof(buf), "rate=%s,", rate_);
597  return buf;
598  }
599  static std::string RateMaxGap(const char* rateMaxGap_)
600  {
601  char buf[64];
602  AMPS_snprintf(buf, sizeof(buf), "rate_max_gap=%s,", rateMaxGap_);
603  return buf;
604  }
605  static std::string SkipN(int skipN_)
606  {
607  char buf[24];
608  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
609  return buf;
610  }
611 
612  static std::string Projection(const std::string& projection_)
613  {
614  return "projection=[" + projection_ + "],";
615  }
616 
617  template<class Iterator>
618  static std::string Projection(Iterator begin_, Iterator end_)
619  {
620  std::string projection = "projection=[";
621  for (Iterator i = begin_; i != end_; ++i)
622  {
623  projection += *i;
624  projection += ',';
625  }
626  projection.insert(projection.length() - 1, "]");
627  return projection;
628  }
629 
630  static std::string Grouping(const std::string& grouping_)
631  {
632  return "grouping=[" + grouping_ + "],";
633  }
634 
635  template<class Iterator>
636  static std::string Grouping(Iterator begin_, Iterator end_)
637  {
638  std::string grouping = "grouping=[";
639  for (Iterator i = begin_; i != end_; ++i)
640  {
641  grouping += *i;
642  grouping += ',';
643  }
644  grouping.insert(grouping.length() - 1, "]");
645  return grouping;
646  }
647 
648  static std::string Select(const std::string& select_)
649  {
650  return "select=[" + select_ + "],";
651  }
652 
653  template<class Iterator>
654  static std::string Select(Iterator begin_, Iterator end_)
655  {
656  std::string select = "select=[";
657  for (Iterator i = begin_; i != end_; ++i)
658  {
659  select += *i;
660  select += ',';
661  }
662  select.insert(select.length() - 1, "]");
663  return select;
664  }
665 
666  static std::string AckConflationInterval(const std::string& interval_)
667  {
668  return "ack_conflation=" + interval_ + ",";
669  }
670 
671  static std::string AckConflationInterval(const char* interval_)
672  {
673  static const std::string start("ack_conflation=");
674  return start + interval_ + ",";
675  }
676 
679  Options(std::string options_ = "")
680  : _optionStr(options_)
681  , _maxBacklog(0)
682  , _topN(0)
683  , _skipN(0)
684  {;}
685 
686  int getMaxBacklog(void) const { return _maxBacklog; }
687  std::string getConflation(void) const { return _conflation; }
688  std::string getConflationKey(void) const { return _conflationKey; }
689  int getTopN(void) const { return _topN; }
690  std::string getRate(void) const { return _rate; }
691  std::string getRateMaxGap(void) const { return _rateMaxGap; }
692 
696  void setNone(void) { _optionStr.clear(); }
697 
707  void setLive(void) { _optionStr += AMPS_OPTIONS_LIVE; }
708 
713  void setOOF(void) { _optionStr += AMPS_OPTIONS_OOF; }
714 
719  void setReplace(void) { _optionStr += AMPS_OPTIONS_REPLACE; }
720 
724  void setNoEmpties(void) { _optionStr += AMPS_OPTIONS_NOEMPTIES; }
725 
729  void setSendKeys(void) { _optionStr += AMPS_OPTIONS_SENDKEYS; }
730 
735  void setTimestamp(void) { _optionStr += AMPS_OPTIONS_TIMESTAMP; }
736 
740  void setNoSowKey(void) { _optionStr += AMPS_OPTIONS_NOSOWKEY; }
741 
745  void setCancel(void) { _optionStr += AMPS_OPTIONS_CANCEL; }
746 
753  void setResume(void) { _optionStr += AMPS_OPTIONS_RESUME; }
754 
765  void setPause(void) { _optionStr += AMPS_OPTIONS_PAUSE; }
766 
773  void setFullyDurable(void) { _optionStr += AMPS_OPTIONS_FULLY_DURABLE; }
774 
785  void setMaxBacklog(int maxBacklog_)
786  {
787  char buf[24];
788  AMPS_snprintf(buf, sizeof(buf), "max_backlog=%d,", maxBacklog_);
789  _optionStr += buf;
790  _maxBacklog = maxBacklog_;
791  }
792 
798  void setConflation(const char* conflation_)
799  {
800  char buf[64];
801  AMPS_snprintf(buf, sizeof(buf), "conflation=%s,", conflation_);
802  _optionStr += buf;
803  _conflation = conflation_;
804  }
805 
815  void setConflationKey(const char* conflationKey_)
816  {
817  char buf[64];
818  AMPS_snprintf(buf, sizeof(buf), "conflation_key=%s,", conflationKey_);
819  _optionStr += buf;
820  _conflationKey = conflationKey_;
821  }
822 
828  void setTopN(int topN_)
829  {
830  char buf[24];
831  AMPS_snprintf(buf, sizeof(buf), "top_n=%d,", topN_);
832  _optionStr += buf;
833  _topN = topN_;
834  }
835 
842  void setRate(const char* rate_)
843  {
844  char buf[64];
845  AMPS_snprintf(buf, sizeof(buf), "rate=%s,", rate_);
846  _optionStr += buf;
847  _rate = rate_;
848  }
849 
864  void setRateMaxGap(const char* rateMaxGap_)
865  {
866  char buf[64];
867  AMPS_snprintf(buf, sizeof(buf), "rate_max_gap=%s,", rateMaxGap_);
868  _optionStr += buf;
869  _rateMaxGap = rateMaxGap_;
870  }
871 
877  void setSkipN(int skipN_)
878  {
879  char buf[24];
880  AMPS_snprintf(buf, sizeof(buf), "skip_n=%d,", skipN_);
881  _optionStr += buf;
882  _skipN = skipN_;
883  }
884 
889  void setProjection(const std::string& projection_)
890  {
891  _projection = "projection=[" + projection_ + "],";
892  _optionStr += _projection;
893  }
894 
895 
901  template<class Iterator>
902  void setProjection(Iterator begin_, Iterator end_)
903  {
904  _projection = "projection=[";
905  for (Iterator i = begin_; i != end_; ++i)
906  {
907  _projection += *i;
908  _projection += ',';
909  }
910  _projection.insert(_projection.length() - 1, "]");
911  _optionStr += _projection;
912  }
913 
918  void setGrouping(const std::string& grouping_)
919  {
920  _grouping = "grouping=[" + grouping_ + "],";
921  _optionStr += _grouping;
922  }
923 
924 
930  template<class Iterator>
931  void setGrouping(Iterator begin_, Iterator end_)
932  {
933  _grouping = "grouping=[";
934  for (Iterator i = begin_; i != end_; ++i)
935  {
936  _grouping += *i;
937  _grouping += ',';
938  }
939  _grouping.insert(_grouping.length() - 1, "]");
940  _optionStr += _grouping;
941  }
942 
946  operator const std::string()
947  {
948  return _optionStr.substr(0, _optionStr.length()-1);
949  }
950 
951  private:
952  std::string _optionStr;
953  int _maxBacklog;
954  std::string _conflation;
955  std::string _conflationKey;
956  int _topN;
957  std::string _rate;
958  std::string _rateMaxGap;
959  int _skipN;
960  std::string _projection;
961  std::string _grouping;
962  };
963 
/// Bit flags for acknowledgment types; values can be combined with |.
struct AckType
{
  typedef enum
  {
    None      = 0,
    Received  = 1 << 0,
    Parsed    = 1 << 1,
    Processed = 1 << 2,
    Persisted = 1 << 3,
    Completed = 1 << 4,
    Stats     = 1 << 5
  } Type;
};
973  AMPS_FIELD(AckType)
976  static inline AckType::Type decodeSingleAckType(const char* begin, const char* end)
977  {
978  switch (end - begin)
979  {
980  case 5:
981  return AckType::Stats;
982  case 6:
983  return AckType::Parsed;
984  case 8:
985  return AckType::Received;
986  case 9:
987  switch (begin[1])
988  {
989  case 'e': return AckType::Persisted;
990  case 'r': return AckType::Processed;
991  case 'o': return AckType::Completed;
992  default: break;
993  }
994  break;
995  default:
996  break;
997  }
998  return AckType::None;
999  }
1003  unsigned getAckTypeEnum() const
1004  {
1005  unsigned result = AckType::None;
1006  const char* data = NULL; size_t len = 0;
1007  amps_message_get_field_value(_body.get().getMessage(), AMPS_AckType, &data, &len);
1008  const char* mark = data;
1009  for (const char* end = data + len; data != end; ++data)
1010  {
1011  if (*data == ',')
1012  {
1013  result |= decodeSingleAckType(mark, data);
1014  mark = data + 1;
1015  }
1016  }
1017  if (mark < data) result |= decodeSingleAckType(mark, data);
1018  return result;
1019  }
1023  Message& setAckTypeEnum(unsigned ackType_)
1024  {
1025  if(ackType_ < AckTypeConstants<0>::Entries)
1026  {
1027  amps_message_assign_field_value(_body.get().getMessage(), AMPS_AckType,
1028  AckTypeConstants<0>::Values[ackType_], AckTypeConstants<0>::Lengths[ackType_]);
1029  }
1030  return *this;
1031  }
1032 
1033  AMPS_FIELD(BatchSize)
1034  AMPS_FIELD(Bookmark)
1035  AMPS_FIELD(Command)
1036 
/// Bit flags identifying command types.  NoDataCommands is the union of
/// the nine flags listed in its definition.
struct Command
{
  typedef enum
  {
    Unknown              = 0,
    Publish              = 1 << 0,
    Subscribe            = 1 << 1,
    Unsubscribe          = 1 << 2,
    SOW                  = 1 << 3,
    Heartbeat            = 1 << 4,
    SOWDelete            = 1 << 5,
    DeltaPublish         = 1 << 6,
    Logon                = 1 << 7,
    SOWAndSubscribe      = 1 << 8,
    DeltaSubscribe       = 1 << 9,
    SOWAndDeltaSubscribe = 1 << 10,
    StartTimer           = 1 << 11,
    StopTimer            = 1 << 12,
    GroupBegin           = 1 << 13,
    GroupEnd             = 1 << 14,
    OOF                  = 1 << 15,
    Ack                  = 1 << 16,
    Flush                = 1 << 17,
    NoDataCommands       = Publish | Unsubscribe | Heartbeat | SOWDelete
                           | DeltaPublish | Logon | StartTimer | StopTimer
                           | Flush
  } Type;
};
1068  Command::Type getCommandEnum() const
1069  {
1070  const char* data = NULL; size_t len = 0;
1071  amps_message_get_field_value(_body.get().getMessage(), AMPS_Command, &data, &len);
1072  switch (len)
1073  {
1074  case 1: return Command::Publish; // -V1037
1075  case 3:
1076  switch (data[0])
1077  {
1078  case 's': return Command::SOW;
1079  case 'o': return Command::OOF;
1080  case 'a': return Command::Ack;
1081  }
1082  break;
1083  case 5:
1084  switch (data[0])
1085  {
1086  case 'l': return Command::Logon;
1087  case 'f': return Command::Flush;
1088  }
1089  break;
1090  case 7:
1091  return Command::Publish; // -V1037
1092  break;
1093  case 9:
1094  switch (data[0])
1095  {
1096  case 's': return Command::Subscribe;
1097  case 'h': return Command::Heartbeat;
1098  case 'g': return Command::GroupEnd;
1099  }
1100  break;
1101  case 10:
1102  switch (data[1])
1103  {
1104  case 'o': return Command::SOWDelete;
1105  case 't': return Command::StopTimer;
1106  }
1107  break;
1108  case 11:
1109  switch (data[0])
1110  {
1111  case 'g': return Command::GroupBegin;
1112  case 'u': return Command::Unsubscribe;
1113  }
1114  break;
1115  case 13:
1116  return Command::DeltaPublish;
1117  case 15:
1118  return Command::DeltaSubscribe;
1119  case 17:
1120  return Command::SOWAndSubscribe;
1121  case 23:
1122  return Command::SOWAndDeltaSubscribe;
1123  }
1124  return Command::Unknown;
1125  }
1126 
1128  Message& setCommandEnum(Command::Type command_)
1129  {
1130  unsigned bits = 0;
1131  unsigned command = command_;
1132  while (command > 0) { ++bits; command >>= 1; }
1133  amps_message_assign_field_value(_body.get().getMessage(), AMPS_Command,
1134  CommandConstants<0>::Values[bits], CommandConstants<0>::Lengths[bits]);
1135  return *this;
1136  }
1137 
1138  AMPS_FIELD(CommandId)
1139  AMPS_FIELD(ClientName)
1140  AMPS_FIELD(CorrelationId)
1141  AMPS_FIELD(Expiration)
1142  AMPS_FIELD(Filter)
1143  AMPS_FIELD(GroupSequenceNumber)
1144  AMPS_FIELD(Heartbeat)
1145  AMPS_FIELD(LeasePeriod)
1146  AMPS_FIELD(Matches)
1147  AMPS_FIELD(MessageLength)
1148  AMPS_FIELD(MessageType)
1149 
1150  DOX_OPENGROUP(Options)
1151  DOX_MAKEGETCOMMENT(Options)
1152  Field getOptions() const {
1153  Field returnValue;
1154  const char* ptr;
1155  size_t sz;
1156  amps_message_get_field_value(_body.get().getMessage(),
1157  AMPS_Options, &ptr, &sz);
1158  if (sz && ptr[sz-1] == ',') --sz;
1159  returnValue.assign(ptr, sz);
1160  return returnValue;
1161  }
1162 
1163  DOX_MAKEGETRAWCOMMENT(Options)
1164  void getRawOptions(const char** dataptr, size_t* sizeptr) const {
1165  amps_message_get_field_value(_body.get().getMessage(),
1166  AMPS_Options, dataptr, sizeptr);
1167  if (*sizeptr && *dataptr && (*dataptr)[*sizeptr-1] == ',') --*sizeptr;
1168  return;
1169  }
1170 
1171  DOX_MAKESETCOMMENT(Options)
1172  Message& setOptions(const std::string& v) {
1173  size_t sz = v.length();
1174  if (sz && v[sz-1] == ',') --sz;
1175  amps_message_set_field_value(_body.get().getMessage(),
1176  AMPS_Options, v.c_str(), sz);
1177  return *this;
1178  }
1179 
1180  DOX_MAKEASSIGNCOMMENT(Options)
1181  Message& assignOptions(const std::string& v) {
1182  size_t sz = v.length();
1183  if (sz && v[sz-1] == ',') --sz;
1184  amps_message_assign_field_value(_body.get().getMessage(),
1185  AMPS_Options, v.c_str(), sz);
1186  return *this;
1187  }
1188 
1189  DOX_MAKEASSIGNCOMMENT(Options)
1190  Message& assignOptions(const char* data, size_t len) {
1191  if (len && data[len-1] == ',') --len;
1192  amps_message_assign_field_value(_body.get().getMessage(),
1193  AMPS_Options, data, len);
1194  return *this;
1195  }
1196 
1197  DOX_MAKESETCOMMENT(Options)
1198  Message& setOptions(const char* str) {
1199  if (str)
1200  {
1201  size_t sz = strlen(str);
1202  if (sz && str[sz-1] == ',') --sz;
1203  amps_message_set_field_value(_body.get().getMessage(),
1204  AMPS_Options, str, sz);
1205  }
1206  else
1207  {
1208  amps_message_set_field_value(_body.get().getMessage(),
1209  AMPS_Options, str, 0);
1210  }
1211  return *this;
1212  }
1213 
1214  DOX_MAKESETCOMMENT(Options)
1215  Message& setOptions(const char* str,size_t len) {
1216  if (len && str[len-1] == ',') --len;
1217  amps_message_set_field_value(_body.get().getMessage(),
1218  AMPS_Options, str, len);
1219  return *this;
1220  }
1221  DOX_CLOSEGROUP()
1222 
1223  AMPS_FIELD(OrderBy)
1224  AMPS_FIELD(Password)
1225  AMPS_FIELD_ALIAS(QueryId, QueryID)
1226  AMPS_FIELD(Reason)
1227  AMPS_FIELD(RecordsInserted)
1228  AMPS_FIELD(RecordsReturned)
1229  AMPS_FIELD(RecordsUpdated)
1230  AMPS_FIELD(Sequence)
1231  AMPS_FIELD(SowDelete)
1232  AMPS_FIELD(SowKey)
1233  AMPS_FIELD(SowKeys)
1234  AMPS_FIELD(Status)
1235  AMPS_FIELD_ALIAS(SubId, SubscriptionId) // -V524
1236  AMPS_FIELD(SubscriptionIds)
1237  AMPS_FIELD(TimeoutInterval)
1238  AMPS_FIELD(Timestamp)
1239 
1243  Field getTransmissionTime() const
1244  {
1245  return getTimestamp();
1246  }
1247 
1252  void getRawTransmissionTime(const char** dataptr, size_t* sizeptr) const
1253  {
1254  getRawTimestamp(dataptr, sizeptr);
1255  }
1256 
1257  AMPS_FIELD(Topic)
1258  AMPS_FIELD(TopicMatches)
1259  AMPS_FIELD(TopNRecordsReturned)
1260  AMPS_FIELD(Version)
1261  AMPS_FIELD(UserId)
1262 
1267 
1268  Field getData() const
1269  {
1270  Field returnValue;
1271  char* ptr;
1272  size_t sz;
1273  amps_message_get_data(_body.get().getMessage(), &ptr, &sz);
1274  returnValue.assign(ptr, sz);
1275  return returnValue;
1276  }
1277 
1278  void getRawData(const char **data, size_t *sz) const
1279  {
1280  amps_message_get_data(_body.get().getMessage(), (char**)data, sz);
1281  }
1284  Message& setData(const std::string &v_)
1285  {
1286  amps_message_set_data(_body.get().getMessage(), v_.c_str(), v_.length());
1287  return *this;
1288  }
1289  Message& assignData(const std::string &v_)
1290  {
1291  amps_message_assign_data(_body.get().getMessage(), v_.c_str(), v_.length());
1292  return *this;
1293  }
1294 
1298  Message& setData(const char* data_, size_t length_)
1299  {
1300  amps_message_set_data(_body.get().getMessage(), data_, length_);
1301  return *this;
1302  }
1303  Message& assignData(const char* data_,size_t length_)
1304  {
1305  amps_message_assign_data(_body.get().getMessage(),data_,length_);
1306  return *this;
1307  }
1308 
1311  Message& setData(const char* data_)
1312  {
1313  amps_message_set_data_nts(_body.get().getMessage(), data_);
1314  return *this;
1315  }
// Assigns the data portion of self from a null-terminated string. The length
// is computed with strlen(data_), so data_ must be a non-NULL, null-terminated
// string. The value goes to amps_message_assign_data(), which is described as
// assigning without copying.
// NOTE(review): presumably data_ must outlive the message's use of it --
// confirm against the C API. Returns *this for chaining.
1316  Message& assignData(const char* data_)
1317  {
1318  amps_message_assign_data(_body.get().getMessage(), data_, strlen(data_));
1319  return *this;
1320  }
// Returns the amps_handle held by the message body, i.e. whatever
// _body.get().getMessage() reports.
1321  amps_handle getMessage() const
1322  {
1323  return _body.get().getMessage();
1324  }
// Forwards to the body's replace(), passing `message` and `owner` through
// unchanged. `owner` defaults to false.
// NOTE(review): presumably `owner` says whether the body takes ownership of
// `message` -- confirm against MessageImpl::replace.
1325  void replace(amps_handle message, bool owner = false)
1326  {
1327  _body.get().replace(message, owner);
1328  }
// Forwards to the body's disown().
// NOTE(review): presumably this gives up ownership of the underlying handle
// without destroying it -- confirm against MessageImpl::disown.
1329  void disown()
1330  {
1331  _body.get().disown();
1332  }
// Assigns NULL to _body, detaching this Message from its current body.
// NOTE(review): presumably isValid() reports false afterwards -- confirm
// against the type of _body.
1333  void invalidate()
1334  {
1335  _body = NULL;
1336  }
// Reports whether this Message currently has a valid body, as answered by
// _body.isValid().
1337  bool isValid(void) const
1338  {
1339  return _body.isValid();
1340  }
// Forwards to the body's reset() and returns *this so calls can be chained.
// NOTE(review): presumably this clears the message's fields and data --
// confirm against MessageImpl::reset.
1341  Message& reset()
1342  {
1343  _body.get().reset();
1344  return *this;
1345  }
1346 
// Passes val to the body's setBookmarkSeqNo().
1347  void setBookmarkSeqNo(size_t val)
1348  {
1349  _body.get().setBookmarkSeqNo(val);
1350  }
1351 
// Returns the value reported by the body's getBookmarkSeqNo().
1352  size_t getBookmarkSeqNo() const
1353  {
1354  return _body.get().getBookmarkSeqNo();
1355  }
1356 
// Passes val to the body's setSubscriptionHandle().
1357  void setSubscriptionHandle(amps_handle val)
1358  {
1359  _body.get().setSubscriptionHandle(val);
1360  }
1361 
// Returns the handle reported by the body's getSubscriptionHandle().
1362  amps_handle getSubscriptionHandle() const
1363  {
1364  return _body.get().getSubscriptionHandle();
1365  }
1366 
// Declaration only; the definition is outside this excerpt. options_ defaults
// to NULL.
// NOTE(review): presumably acknowledges this message to the server, with
// options_ as an optional options string -- confirm against the definition.
1367  void ack(const char* options_ = NULL) const;
1368 
// Passes pClientImpl to the body's setClientImpl().
1369  void setClientImpl(ClientImpl *pClientImpl)
1370  {
1371  _body.get().setClientImpl(pClientImpl);
1372  }
1373 
// Calls the body's setIgnoreAutoAck(). Declared const: it goes through
// _body.get() and so can be called on a const Message.
// NOTE(review): presumably marks the message so automatic acknowledgement
// skips it -- confirm against MessageImpl.
1374  void setIgnoreAutoAck() const
1375  {
1376  _body.get().setIgnoreAutoAck();
1377  }
1378 
// Returns the flag reported by the body's getIgnoreAutoAck().
1379  bool getIgnoreAutoAck() const
1380  {
1381  return _body.get().getIgnoreAutoAck();
1382  }
1383 
1384  template <class T> // static
1385  void throwFor(const T& /*context_*/, const std::string& ackReason_) const
1386  {
1387  switch (ackReason_[0])
1388  {
1389  case 'a': // auth failure
1390  throw AuthenticationException("Logon failed for user \"" +
1391  (std::string)getUserId() + "\"");
1392  break;
1393  case 'b':
1394  switch (ackReason_.length())
1395  {
1396  case 10: // bad filter
1397  throw BadFilterException("bad filter '" +
1398  (std::string)getFilter() +
1399  "'");
1400  break;
1401  case 11: // bad sow key
1402  if (getSowKeys().len())
1403  {
1404  throw BadSowKeyException("bad sow key '" +
1405  (std::string)getSowKeys() +
1406  "'");
1407  }
1408  else
1409  {
1410  throw BadSowKeyException("bad sow key '" +
1411  (std::string)getSowKey() +
1412  "'");
1413  }
1414  break;
1415  case 15: // bad regex topic
1416  throw BadRegexTopicException("bad regex topic '" +
1417  (std::string)getTopic() +
1418  "'.");
1419  break;
1420  default:
1421  break;
1422  }
1423  break;
1424  case 'd':
1425  if (ackReason_.length() == 23) // duplicate logon attempt
1426  {
1427  throw DuplicateLogonException("Client '" +
1428  (std::string)getClientName() +
1429  "' with userid '" +
1430  (std::string)getUserId() +
1431  "' duplicate logon attempt");
1432  }
1433  break;
1434  case 'i':
1435  if (ackReason_.length() >= 9)
1436  {
1437  switch (ackReason_[8])
1438  {
1439  case 'b': // invalid bookmark
1440  throw InvalidBookmarkException("invalid bookmark '" +
1441  (std::string)getBookmark() +
1442  "'.");
1443  break;
1444  case 'm': // invalid message type
1445  throw CommandException(std::string("invalid message type '") +
1446  (std::string)getMessageType() +
1447  "'.");
1448  break;
1449  case 'o':
1450  if (ackReason_[9] == 'p') // invalid options
1451  {
1452  throw InvalidOptionsException("invalid options '" +
1453  (std::string)getOptions() +
1454  "'.");
1455  }
1456  else if (ackReason_[9] == 'r') // invalid order by
1457  {
1458  throw InvalidOrderByException("invalid order by '" +
1459  (std::string)getOrderBy() +
1460  "'.");
1461  }
1462  break;
1463  case 's': // invalid subId
1464  throw InvalidSubIdException("invalid subid '" +
1465  (std::string)getSubscriptionId() +
1466  "'.");
1467  break;
1468  case 't':
1469  if (ackReason_.length() == 13) // invalid topic
1470  {
1471  throw InvalidTopicException("invalid topic '" +
1472  (std::string)getTopic() +
1473  "'.");
1474  }
1475  else if (ackReason_.length() == 23) // invalid topic or filter
1476  {
1477  throw InvalidTopicException("invalid topic or filter. Topic '" +
1478  (std::string)getTopic() +
1479  "' Filter '" +
1480  (std::string)getFilter() +
1481  "'.");
1482  }
1483  break;
1484  default:
1485  break;
1486  }
1487  }
1488  break;
1489  case 'l':
1490  if (ackReason_.length() == 14) // logon required
1491  {
1492  throw LogonRequiredException("logon required before command");
1493  }
1494  break;
1495  case 'n':
1496  switch (ackReason_[4])
1497  {
1498  case ' ': // name in use
1499  throw NameInUseException("name in use '" +
1500  (std::string)getClientName() +
1501  "'.");
1502  break;
1503  case 'e': // not entitled
1504  throw NotEntitledException("User \"" +
1505  (std::string)getUserId() +
1506  "\" not entitled to topic \"" +
1507  (std::string)getTopic() +
1508  "\".");
1509  break;
1510  case 'i': // no filter or bookmark
1511  throw MissingFieldsException("command sent with no filter or bookmark.");
1512  break;
1513  case 'l': // no client name
1514  throw MissingFieldsException("command sent with no client name.");
1515  break;
1516  case 'o': // no topic or filter
1517  throw MissingFieldsException("command sent with no topic or filter.");
1518  break;
1519  case 's': // not supported
1520  throw CommandException("operation on topic '" +
1521  (std::string)getTopic() +
1522  "' with options '" +
1523  (std::string)getOptions() +
1524  "' not supported.");
1525  break;
1526  default:
1527  break;
1528  }
1529  break;
1530  case 'o':
1531  switch (ackReason_.length())
1532  {
1533  case 16: // orderby required
1534  throw MissingFieldsException("orderby required");
1535  break;
1536  case 17: // orderby too large
1537  throw CommandException("orderby too large '" +
1538  (std::string)getOrderBy() +
1539  "'.");
1540  break;
1541  }
1542  break;
1543  case 'p':
1544  switch (ackReason_[1])
1545  {
1546  case 'r': // projection clause too large
1547  throw CommandException("projection clause too large in options '" +
1548  (std::string)getOptions() +
1549  "'.");
1550  break;
1551  case 'u': // publish filter no match
1552  throw PublishFilterException("Publish filter '" +
1553  (std::string)getFilter() +
1554  "' doesn't match any records.");
1555  break;
1556  default:
1557  break;
1558  }
1559  break;
1560  case 'r':
1561  switch (ackReason_[2])
1562  {
1563  case 'g': // regex topic not supported
1564  throw BadRegexTopicException("'regex topic not supported '" +
1565  (std::string)getTopic() +
1566  "'.");
1567  break;
1568  default:
1569  break;
1570  }
1571  break;
1572  case 's':
1573  switch (ackReason_[5])
1574  {
1575  case ' ': // subid in use
1576  throw SubidInUseException("subid in use '" +
1577  (std::string)getSubscriptionId() +
1578  "'.");
1579  break;
1580  case 'e': // sow_delete command only supports one of: filter, sow_keys, bookmark, or data
1581  throw CommandException("sow_delete command only supports one of: filter '" +
1582  (std::string)getFilter() +
1583  "', sow_keys '" +
1584  (std::string)getSowKeys() +
1585  "', bookmark '" +
1586  (std::string)getBookmark() +
1587  "', or data '" +
1588  (std::string)getData() +
1589  "'.");
1590  break;
1591  case 't': // sow store failed
1592  throw PublishException("sow store failed.");
1593  break;
1594  default:
1595  break;
1596  }
1597  break;
1598  case 't':
1599  switch (ackReason_[2])
1600  {
1601  case ' ': // tx store failure
1602  throw PublishException("tx store failure.");
1603  break;
1604  case 'n': // txn replay failed
1605  throw CommandException("txn replay failed for '" +
1606  (std::string)getSubId() +
1607  "'.");
1608  break;
1609  }
1610  break;
1611  default:
1612  break;
1613  }
1614  throw CommandException("Error from server while processing this command: '" +
1615  ackReason_ + "'");
1616  }
1617 };
1618 
// Concatenates a std::string with a Field: rhs is first converted to a
// std::string and then appended to a copy of lhs.
1619 inline std::string
1620 operator+(const std::string& lhs, const Message::Field& rhs)
1621 {
1622  return lhs + std::string(rhs);
1623 }
1624 
// Streams a Field by writing exactly rhs.len() bytes starting at rhs.data()
// with os.write(), so the output does not depend on a terminating null
// character. Returns os to allow chaining.
1625 inline std::basic_ostream<char>&
1626 operator<<(std::basic_ostream<char>& os, const Message::Field& rhs)
1627 {
1628  os.write(rhs.data(), (std::streamsize)rhs.len());
1629  return os;
1630 }
// Orders Fields lexicographically: the len() chars at data() of each side are
// compared element by element with std::lexicographical_compare.
1631 inline bool
1632 AMPS::Field::operator<(const AMPS::Field& rhs) const
1633 {
1634  return std::lexicographical_compare(data(), data()+len(), rhs.data(), rhs.data()+rhs.len());
1635 }
1636 
1637 }
1638 
1639 #endif
void setFullyDurable(void)
Set the option to only provide messages that have been persisted to all replication destinations that...
Definition: Message.hpp:773
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:179
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1284
void amps_message_reset(amps_handle message)
Clears all fields and data in a message.
void setRateMaxGap(const char *rateMaxGap_)
Set the option for the maximum amount of time that a bookmark replay with a specified rate will allow...
Definition: Message.hpp:864
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1068
void amps_message_assign_data(amps_handle message, const amps_char *value, size_t length)
Assigns the data component of an AMPS message, without copying the value.
amps_handle amps_message_copy(amps_handle message)
Creates and returns a handle to a new AMPS message object that is a deep copy of the message passed i...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:724
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:538
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:713
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1023
void setNoSowKey(void)
Set the option to not set the SowKey header on messages.
Definition: Message.hpp:740
MessageImpl(amps_handle message_, bool owner_=false, bool ignoreAutoAck_=false, size_t bookmarkSeqNo_=0, amps_subscription_handle subscription_=NULL, ClientImpl *clientImpl_=NULL)
Constructs a MessageImpl from an existing AMPS message.
Definition: Message.hpp:105
STL namespace.
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1128
void setSendKeys(void)
Set the option to send key fields with a delta subscription.
Definition: Message.hpp:729
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:179
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:553
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
void setPause(void)
Set the option to pause a bookmark subscription.
Definition: Message.hpp:765
void setReplace(void)
Set the option to replace a current subscription with this one.
Definition: Message.hpp:719
Valid values for setCommandEnum() and getCommandEnum().
Definition: Message.hpp:1040
Message & setData(const char *data_)
Sets the data portion of self from a null-terminated string.
Definition: Message.hpp:1311
void setGrouping(Iterator begin_, Iterator end_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:931
void setTimestamp(void)
Set the option to send a timestamp that the message was processed on a subscription or query...
Definition: Message.hpp:735
amps_handle getMessage() const
Returns the underlying AMPS message object from the C layer.
Definition: Message.hpp:149
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
void setResume(void)
Set the option to resume a subscription.
Definition: Message.hpp:753
void setConflation(const char *conflation_)
Set the options for conflation on a subscription.
Definition: Message.hpp:798
void replace(amps_handle message_, bool owner_=false)
Causes self to refer to a new AMPS message, freeing any current message owned by self along the way...
Definition: Message.hpp:168
Options(std::string options_="")
ctor - default to None
Definition: Message.hpp:679
void amps_message_get_data(amps_handle message, amps_char **value_ptr, size_t *length_ptr)
Gets the data component of an AMPS message.
amps_handle amps_message_create(amps_handle client)
Functions for creation and manipulation of AMPS messages.
void setRate(const char *rate_)
Set the option for the maximum rate at which messages are provided to the subscription.
Definition: Message.hpp:842
Valid values for the setAckTypeEnum() and getAckTypeEnum() methods.
Definition: Message.hpp:966
void amps_message_assign_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Assigns the value of a header field in an AMPS message, without copying the value.
void setGrouping(const std::string &grouping_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:918
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
void setTopN(int topN_)
Set the top N option, which specifies the maximum number of messages to return for this command...
Definition: Message.hpp:828
void amps_message_set_data_nts(amps_handle message, const amps_char *value)
Sets the data component of an AMPS message.
void setMaxBacklog(int maxBacklog_)
Set the option for maximum backlog this subscription is willing to accept.
Definition: Message.hpp:785
void getRawTransmissionTime(const char **dataptr, size_t *sizeptr) const
Definition: Message.hpp:1252
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageImpl()
Constructs a MessageImpl with a new, empty AMPS message.
Definition: Message.hpp:118
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
void setConflationKey(const char *conflationKey_)
Set the options for the conflation key, the identifiers for the field or fields used by AMPS to deter...
Definition: Message.hpp:815
void setProjection(const std::string &projection_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:889
void setSkipN(int skipN_)
Set the option for skip N, the number of messages in the result set to skip before returning messages...
Definition: Message.hpp:877
void amps_message_set_data(amps_handle message, const amps_char *value, size_t length)
Sets the data component of an AMPS message.
Message(amps_handle message_, bool owner_=false)
Constructs a new Message to wrap message.
Definition: Message.hpp:524
void setNone(void)
Clear any previously set options and set the options to an empty string (AMPS_OPTIONS_NONE).
Definition: Message.hpp:696
Implementation class for a Message.
Definition: Message.hpp:83
void setProjection(Iterator begin_, Iterator end_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:902
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1003
Message & setData(const char *data_, size_t length_)
Sets the data portion of self from a char array.
Definition: Message.hpp:1298
Definition: ampsplusplus.hpp:103
void setCancel(void)
Set the cancel option, used on a sow_delete command to return a message to the queue.
Definition: Message.hpp:745
Message()
Construct a new, empty Message.
Definition: Message.hpp:532
void setLive(void)
Set the live option for a bookmark subscription, which requests that the subscription receives messag...
Definition: Message.hpp:707
void amps_message_destroy(amps_handle message)
Destroys and frees the memory associated with an AMPS message object.