25 #ifndef __AMPS_MESSAGE_HPP__ 26 #define __AMPS_MESSAGE_HPP__ 28 #include "constants.hpp" 29 #include "amps_generated.h" 35 #define AMPS_UNSET_SEQUENCE (amps_uint64_t)-1 38 #if defined(__GXX_EXPERIMENTAL_CXX0X__) || (_MSC_VER >= 1600) 39 #define AMPS_USE_FUNCTIONAL 1 42 #if (_MSC_VER >= 1600) || ( (__GNUC__ >= 4) && (__GNUC_MINOR__) >=5 ) 43 #define AMPS_USE_LAMBDAS 1 46 #ifdef AMPS_USE_FUNCTIONAL 56 #define AMPS_OPTIONS_NONE "" 57 #define AMPS_OPTIONS_LIVE "live," 58 #define AMPS_OPTIONS_OOF "oof," 59 #define AMPS_OPTIONS_REPLACE "replace," 60 #define AMPS_OPTIONS_NOEMPTIES "no_empties," 61 #define AMPS_OPTIONS_SENDKEYS "send_keys," 62 #define AMPS_OPTIONS_TIMESTAMP "timestamp," 63 #define AMPS_OPTIONS_NOSOWKEY "no_sowkey," 64 #define AMPS_OPTIONS_CANCEL "cancel," 65 #define AMPS_OPTIONS_RESUME "resume," 66 #define AMPS_OPTIONS_PAUSE "pause," 67 #define AMPS_OPTIONS_FULLY_DURABLE "fully_durable," 68 #define AMPS_OPTIONS_EXPIRE "expire," 69 #define AMPS_OPTIONS_TOPN(x) "top_n=##x," 70 #define AMPS_OPTIONS_MAXBACKLOG(x) "max_backlog=##x," 71 #define AMPS_OPTIONS_RATE(x) "rate=##x," 75 typedef void* amps_subscription_handle;
89 mutable bool _isIgnoreAutoAck;
90 size_t _bookmarkSeqNo;
91 amps_subscription_handle _subscription;
92 ClientImpl* _clientImpl;
106 bool ignoreAutoAck_ =
false,
size_t bookmarkSeqNo_ = 0,
107 amps_subscription_handle subscription_ = NULL,
108 ClientImpl* clientImpl_=NULL)
109 : _message(message_), _owner(owner_), _isIgnoreAutoAck(ignoreAutoAck_)
110 , _bookmarkSeqNo(bookmarkSeqNo_)
111 , _subscription(subscription_), _clientImpl(clientImpl_)
119 : _message(NULL), _owner(true), _isIgnoreAutoAck(false), _bookmarkSeqNo(0), _subscription(NULL), _clientImpl(NULL)
133 return new MessageImpl(copy,
true, _isIgnoreAutoAck, _bookmarkSeqNo,
134 _subscription,_clientImpl);
137 void setClientImpl(ClientImpl* clientImpl_)
139 _clientImpl = clientImpl_;
142 ClientImpl* clientImpl(
void)
const 156 Lock<Mutex> l(_lock);
159 _subscription = NULL;
160 _isIgnoreAutoAck =
false;
170 Lock<Mutex> l(_lock);
171 if (_message == message_)
return;
172 if (_owner && _message)
179 _subscription = NULL;
180 _isIgnoreAutoAck =
false;
185 Lock<Mutex> l(_lock);
189 static unsigned long newId()
191 static ATOMIC_TYPE _id = 0;
192 return (
unsigned long)(AMPS_FETCH_ADD(&_id,1));
195 void setBookmarkSeqNo(
size_t val_)
197 _bookmarkSeqNo = val_;
200 size_t getBookmarkSeqNo(
void)
const 202 return _bookmarkSeqNo;
205 void setSubscriptionHandle(amps_subscription_handle subscription_)
207 _subscription = subscription_;
210 amps_subscription_handle getSubscriptionHandle(
void)
const 212 return _subscription;
215 void setIgnoreAutoAck()
const 217 _isIgnoreAutoAck =
true;
220 bool getIgnoreAutoAck()
const 222 return _isIgnoreAutoAck;
232 #ifdef DOXYGEN_PREPROCESSOR 234 #define DOX_COMMENTHEAD(s) / ## ** ## s ## * ## / 235 #define DOX_GROUPNAME(s) DOX_COMMENTHEAD(@name s Functions) 236 #define DOX_OPENGROUP(s) DOX_COMMENTHEAD(@{) \ 238 #define DOX_CLOSEGROUP() DOX_COMMENTHEAD(@}) 239 #define DOX_MAKEGETCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of the Message as a new Field. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. ) 240 #define DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of self as a Field that references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. ) 241 #define DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. ) 242 #define DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. ) 243 #define DOX_MAKENEWCOMMENT(x) DOX_COMMENTHEAD(Creates and sets a new sequential value for the x header for this Message. This function is most useful for headers such as %CommandId and %SubId.) 247 #define DOX_COMMENTHEAD(s) 248 #define DOX_GROUPNAME(s) 249 #define DOX_OPENGROUP(x) 250 #define DOX_CLOSEGROUP() 251 #define DOX_MAKEGETCOMMENT(x) 252 #define DOX_MAKEGETRAWCOMMENT(x) 253 #define DOX_MAKESETCOMMENT(x) 254 #define DOX_MAKEASSIGNCOMMENT(x) 255 #define DOX_MAKENEWCOMMENT(x) 263 #define AMPS_FIELD(x) \ 265 DOX_MAKEGETCOMMENT(x) \ 266 Field get##x() const {\ 270 amps_message_get_field_value(_body.get().getMessage(),\ 271 AMPS_##x, &ptr, &sz);\ 272 returnValue.assign(ptr, sz);\ 275 DOX_MAKEGETRAWCOMMENT(x) \ 276 void getRaw##x(const char** dataptr, size_t* sizeptr) const {\ 277 amps_message_get_field_value(_body.get().getMessage(),\ 278 AMPS_##x, dataptr, sizeptr);\ 281 DOX_MAKESETCOMMENT(x) \ 282 Message& set##x(const std::string& v) {\ 283 amps_message_set_field_value(_body.get().getMessage(),\ 284 AMPS_##x, v.c_str(), v.length());\ 287 DOX_MAKESETCOMMENT(x) \ 288 Message& set##x(amps_uint64_t v) {\ 290 AMPS_snprintf_amps_uint64_t(buf,22,v);\ 291 amps_message_set_field_value_nts(_body.get().getMessage(),\ 295 DOX_MAKEASSIGNCOMMENT(x) \ 296 Message& assign##x(const std::string& v) {\ 297 amps_message_assign_field_value(_body.get().getMessage(),\ 298 AMPS_##x, v.c_str(), v.length());\ 301 DOX_MAKEASSIGNCOMMENT(x) \ 302 Message& assign##x(const char* data, size_t len) {\ 303 amps_message_assign_field_value(_body.get().getMessage(),\ 304 AMPS_##x, data, len);\ 307 DOX_MAKESETCOMMENT(x) \ 308 Message& set##x(const char* str) {\ 309 amps_message_set_field_value_nts(_body.get().getMessage(),\ 313 DOX_MAKESETCOMMENT(x) \ 314 Message& set##x(const char* str,size_t len) {\ 315 amps_message_set_field_value(_body.get().getMessage(),\ 319 DOX_MAKENEWCOMMENT(x) \ 321 char buf[Message::IdentifierLength+1];\ 322 buf[Message::IdentifierLength] = 0;\ 323 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%zx" , (size_t)(_body.get().newId()));\ 324 amps_message_set_field_value_nts(_body.get().getMessage(),\ 330 #define AMPS_FIELD_ALIAS(x,y) \ 332 DOX_MAKEGETCOMMENT(y) \ 333 Field get##y() const {\ 337 amps_message_get_field_value(_body.get().getMessage(),\ 338 AMPS_##y, &ptr, &sz);\ 339 returnValue.assign(ptr, sz);\ 342 DOX_MAKEGETRAWCOMMENT(y) \ 343 void getRaw##y(const char** dataptr, size_t* sizeptr) const {\ 344 amps_message_get_field_value(_body.get().getMessage(),\ 345 AMPS_##y, dataptr, sizeptr);\ 348 DOX_MAKESETCOMMENT(y) \ 349 Message& set##y(const std::string& v) {\ 350 amps_message_set_field_value(_body.get().getMessage(),\ 351 AMPS_##y, v.c_str(), v.length());\ 354 DOX_MAKESETCOMMENT(y) \ 355 Message& set##y(amps_uint64_t v) {\ 357 AMPS_snprintf_amps_uint64_t(buf,22,v);\ 358 amps_message_set_field_value_nts(_body.get().getMessage(),\ 362 DOX_MAKEASSIGNCOMMENT(y) \ 363 Message& assign##y(const std::string& v) {\ 364 amps_message_assign_field_value(_body.get().getMessage(),\ 365 AMPS_##y, v.c_str(), v.length());\ 368 DOX_MAKEASSIGNCOMMENT(y) \ 369 Message& assign##y(const char* data, size_t len) {\ 370 amps_message_assign_field_value(_body.get().getMessage(),\ 371 AMPS_##y, data, len);\ 374 DOX_MAKESETCOMMENT(y) \ 375 Message& set##y(const char* str) {\ 376 amps_message_set_field_value_nts(_body.get().getMessage(),\ 380 DOX_MAKESETCOMMENT(y) \ 381 Message& set##y(const char* str,size_t len) {\ 382 amps_message_set_field_value(_body.get().getMessage(),\ 386 DOX_MAKENEWCOMMENT(y) \ 388 char buf[Message::IdentifierLength+1];\ 389 buf[Message::IdentifierLength] = 0;\ 390 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%zx" , (size_t)(_body.get().newId()));\ 391 amps_message_set_field_value_nts(_body.get().getMessage(),\ 395 DOX_MAKEGETCOMMENT(y) \ 396 Field get##x() const {\ 400 amps_message_get_field_value(_body.get().getMessage(),\ 401 AMPS_##y, &ptr, &sz);\ 402 returnValue.assign(ptr, sz);\ 405 DOX_MAKEGETRAWCOMMENT(y) \ 406 void getRaw##x(const char** dataptr, size_t* sizeptr) const {\ 407 amps_message_get_field_value(_body.get().getMessage(),\ 408 AMPS_##y, dataptr, sizeptr);\ 411 DOX_MAKESETCOMMENT(y) \ 412 Message& set##x(const std::string& v) {\ 413 amps_message_set_field_value(_body.get().getMessage(),\ 414 AMPS_##y, v.c_str(), v.length());\ 417 DOX_MAKESETCOMMENT(y) \ 418 Message& set##x(amps_uint64_t v) {\ 420 AMPS_snprintf_amps_uint64_t(buf,22,v);\ 421 amps_message_set_field_value_nts(_body.get().getMessage(),\ 425 DOX_MAKEASSIGNCOMMENT(y) \ 426 Message& assign##x(const std::string& v) {\ 427 amps_message_assign_field_value(_body.get().getMessage(),\ 428 AMPS_##y, v.c_str(), v.length());\ 431 DOX_MAKEASSIGNCOMMENT(y) \ 432 Message& assign##x(const char* data, size_t len) {\ 433 amps_message_assign_field_value(_body.get().getMessage(),\ 434 AMPS_##y, data, len);\ 437 DOX_MAKESETCOMMENT(y) \ 438 Message& set##x(const char* str) {\ 439 amps_message_set_field_value_nts(_body.get().getMessage(),\ 443 DOX_MAKESETCOMMENT(y) \ 444 Message& set##x(const char* str,size_t len) {\ 445 amps_message_set_field_value(_body.get().getMessage(),\ 449 DOX_MAKENEWCOMMENT(y) \ 451 char buf[Message::IdentifierLength+1];\ 452 buf[Message::IdentifierLength] = 0;\ 453 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%zx" , (size_t)(_body.get().newId()));\ 454 amps_message_set_field_value_nts(_body.get().getMessage(),\ 503 RefHandle<MessageImpl> _body;
512 static const unsigned int IdentifierLength = 32;
516 static const size_t BOOKMARK_NONE = AMPS_UNSET_INDEX;
540 return Message(_body.get().copy());
556 static const char* None(
void) {
return AMPS_OPTIONS_NONE; }
557 static const char* Live(
void) {
return AMPS_OPTIONS_LIVE; }
558 static const char* OOF(
void) {
return AMPS_OPTIONS_OOF; }
559 static const char* Replace(
void) {
return AMPS_OPTIONS_REPLACE; }
560 static const char* NoEmpties(
void) {
return AMPS_OPTIONS_NOEMPTIES; }
561 static const char* SendKeys(
void) {
return AMPS_OPTIONS_SENDKEYS; }
562 static const char* Timestamp(
void) {
return AMPS_OPTIONS_TIMESTAMP; }
563 static const char* NoSowKey(
void) {
return AMPS_OPTIONS_NOSOWKEY; }
564 static const char* Cancel(
void) {
return AMPS_OPTIONS_CANCEL; }
565 static const char* Resume(
void) {
return AMPS_OPTIONS_RESUME; }
566 static const char* Pause(
void) {
return AMPS_OPTIONS_PAUSE; }
567 static const char* FullyDurable(
void) {
return AMPS_OPTIONS_FULLY_DURABLE; }
568 static const char* Expire(
void) {
return AMPS_OPTIONS_EXPIRE; }
569 static std::string Conflation(
const char* conflation_)
572 AMPS_snprintf(buf,
sizeof(buf),
"conflation=%s,", conflation_);
575 static std::string ConflationKey(
const char* conflationKey_)
577 std::string option(
"conflation_key=");
578 option.append(conflationKey_).append(
",");
581 static std::string TopN(
int topN_)
584 AMPS_snprintf(buf,
sizeof(buf),
"top_n=%d,", topN_);
587 static std::string MaxBacklog(
int maxBacklog_)
590 AMPS_snprintf(buf,
sizeof(buf),
"max_backlog=%d,", maxBacklog_);
593 static std::string Rate(
const char* rate_)
596 AMPS_snprintf(buf,
sizeof(buf),
"rate=%s,", rate_);
599 static std::string RateMaxGap(
const char* rateMaxGap_)
602 AMPS_snprintf(buf,
sizeof(buf),
"rate_max_gap=%s,", rateMaxGap_);
605 static std::string SkipN(
int skipN_)
608 AMPS_snprintf(buf,
sizeof(buf),
"skip_n=%d,", skipN_);
612 static std::string Projection(
const std::string& projection_)
614 return "projection=[" + projection_ +
"],";
617 template<
class Iterator>
618 static std::string Projection(Iterator begin_, Iterator end_)
620 std::string projection =
"projection=[";
621 for (Iterator i = begin_; i != end_; ++i)
626 projection.insert(projection.length() - 1,
"]");
630 static std::string Grouping(
const std::string& grouping_)
632 return "grouping=[" + grouping_ +
"],";
635 template<
class Iterator>
636 static std::string Grouping(Iterator begin_, Iterator end_)
638 std::string grouping =
"grouping=[";
639 for (Iterator i = begin_; i != end_; ++i)
644 grouping.insert(grouping.length() - 1,
"]");
648 static std::string Select(
const std::string& select_)
650 return "select=[" + select_ +
"],";
653 template<
class Iterator>
654 static std::string Select(Iterator begin_, Iterator end_)
656 std::string select =
"select=[";
657 for (Iterator i = begin_; i != end_; ++i)
662 select.insert(select.length() - 1,
"]");
666 static std::string AckConflationInterval(
const std::string& interval_)
668 return "ack_conflation=" + interval_ +
",";
671 static std::string AckConflationInterval(
const char* interval_)
673 static const std::string start(
"ack_conflation=");
674 return start + interval_ +
",";
680 : _optionStr(options_)
686 int getMaxBacklog(
void)
const {
return _maxBacklog; }
687 std::string getConflation(
void)
const {
return _conflation; }
688 std::string getConflationKey(
void)
const {
return _conflationKey; }
689 int getTopN(
void)
const {
return _topN; }
690 std::string getRate(
void)
const {
return _rate; }
691 std::string getRateMaxGap(
void)
const {
return _rateMaxGap; }
707 void setLive(
void) { _optionStr += AMPS_OPTIONS_LIVE; }
713 void setOOF(
void) { _optionStr += AMPS_OPTIONS_OOF; }
719 void setReplace(
void) { _optionStr += AMPS_OPTIONS_REPLACE; }
745 void setCancel(
void) { _optionStr += AMPS_OPTIONS_CANCEL; }
753 void setResume(
void) { _optionStr += AMPS_OPTIONS_RESUME; }
765 void setPause(
void) { _optionStr += AMPS_OPTIONS_PAUSE; }
788 AMPS_snprintf(buf,
sizeof(buf),
"max_backlog=%d,", maxBacklog_);
790 _maxBacklog = maxBacklog_;
801 AMPS_snprintf(buf,
sizeof(buf),
"conflation=%s,", conflation_);
803 _conflation = conflation_;
818 AMPS_snprintf(buf,
sizeof(buf),
"conflation_key=%s,", conflationKey_);
820 _conflationKey = conflationKey_;
831 AMPS_snprintf(buf,
sizeof(buf),
"top_n=%d,", topN_);
845 AMPS_snprintf(buf,
sizeof(buf),
"rate=%s,", rate_);
867 AMPS_snprintf(buf,
sizeof(buf),
"rate_max_gap=%s,", rateMaxGap_);
869 _rateMaxGap = rateMaxGap_;
880 AMPS_snprintf(buf,
sizeof(buf),
"skip_n=%d,", skipN_);
891 _projection =
"projection=[" + projection_ +
"],";
892 _optionStr += _projection;
901 template<
class Iterator>
904 _projection =
"projection=[";
905 for (Iterator i = begin_; i != end_; ++i)
910 _projection.insert(_projection.length() - 1,
"]");
911 _optionStr += _projection;
920 _grouping =
"grouping=[" + grouping_ +
"],";
921 _optionStr += _grouping;
930 template<
class Iterator>
933 _grouping =
"grouping=[";
934 for (Iterator i = begin_; i != end_; ++i)
939 _grouping.insert(_grouping.length() - 1,
"]");
940 _optionStr += _grouping;
946 operator const std::string()
948 return _optionStr.substr(0, _optionStr.length()-1);
952 std::string _optionStr;
954 std::string _conflation;
955 std::string _conflationKey;
958 std::string _rateMaxGap;
960 std::string _projection;
961 std::string _grouping;
970 None = 0, Received = 1, Parsed = 2, Processed = 4, Persisted = 8, Completed = 16, Stats = 32
976 static inline
AckType::Type decodeSingleAckType(const
char* begin, const
char* end)
981 return AckType::Stats;
983 return AckType::Parsed;
985 return AckType::Received;
989 case 'e':
return AckType::Persisted;
990 case 'r':
return AckType::Processed;
991 case 'o':
return AckType::Completed;
998 return AckType::None;
1005 unsigned result = AckType::None;
1006 const char* data = NULL;
size_t len = 0;
1008 const char* mark = data;
1009 for (
const char* end = data + len; data != end; ++data)
1013 result |= decodeSingleAckType(mark, data);
1017 if (mark < data) result |= decodeSingleAckType(mark, data);
1025 if(ackType_ < AckTypeConstants<0>::Entries)
1028 AckTypeConstants<0>::Values[ackType_], AckTypeConstants<0>::Lengths[ackType_]);
1033 AMPS_FIELD(BatchSize)
1034 AMPS_FIELD(Bookmark)
1053 SOWAndSubscribe = 256,
1054 DeltaSubscribe = 512,
1055 SOWAndDeltaSubscribe = 1024,
1063 NoDataCommands = Publish|Unsubscribe|Heartbeat|SOWDelete|DeltaPublish
1064 |Logon|StartTimer|StopTimer|Flush
1070 const char* data = NULL;
size_t len = 0;
1074 case 1:
return Command::Publish;
1078 case 's':
return Command::SOW;
1079 case 'o':
return Command::OOF;
1080 case 'a':
return Command::Ack;
1086 case 'l':
return Command::Logon;
1087 case 'f':
return Command::Flush;
1091 return Command::Publish;
1096 case 's':
return Command::Subscribe;
1097 case 'h':
return Command::Heartbeat;
1098 case 'g':
return Command::GroupEnd;
1104 case 'o':
return Command::SOWDelete;
1105 case 't':
return Command::StopTimer;
1111 case 'g':
return Command::GroupBegin;
1112 case 'u':
return Command::Unsubscribe;
1116 return Command::DeltaPublish;
1118 return Command::DeltaSubscribe;
1120 return Command::SOWAndSubscribe;
1122 return Command::SOWAndDeltaSubscribe;
1124 return Command::Unknown;
1131 unsigned command = command_;
1132 while (command > 0) { ++bits; command >>= 1; }
1134 CommandConstants<0>::Values[bits], CommandConstants<0>::Lengths[bits]);
1138 AMPS_FIELD(CommandId)
1139 AMPS_FIELD(ClientName)
1140 AMPS_FIELD(CorrelationId)
1141 AMPS_FIELD(Expiration)
1143 AMPS_FIELD(GroupSequenceNumber)
1144 AMPS_FIELD(Heartbeat)
1145 AMPS_FIELD(LeasePeriod)
1147 AMPS_FIELD(MessageLength)
1148 AMPS_FIELD(MessageType)
1152 Field getOptions()
const {
1157 AMPS_Options, &ptr, &sz);
1158 if (sz && ptr[sz-1] ==
',') --sz;
1159 returnValue.assign(ptr, sz);
1163 DOX_MAKEGETRAWCOMMENT(
Options)
1164 void getRawOptions(const
char** dataptr,
size_t* sizeptr)
const {
1166 AMPS_Options, dataptr, sizeptr);
1167 if (*sizeptr && *dataptr && (*dataptr)[*sizeptr-1] ==
',') --*sizeptr;
1173 size_t sz = v.length();
1174 if (sz && v[sz-1] ==
',') --sz;
1176 AMPS_Options, v.c_str(), sz);
1180 DOX_MAKEASSIGNCOMMENT(
Options)
1182 size_t sz = v.length();
1183 if (sz && v[sz-1] ==
',') --sz;
1185 AMPS_Options, v.c_str(), sz);
1189 DOX_MAKEASSIGNCOMMENT(
Options)
1190 Message& assignOptions(const
char* data,
size_t len) {
1191 if (len && data[len-1] ==
',') --len;
1193 AMPS_Options, data, len);
1201 size_t sz = strlen(str);
1202 if (sz && str[sz-1] ==
',') --sz;
1204 AMPS_Options, str, sz);
1209 AMPS_Options, str, 0);
1216 if (len && str[len-1] ==
',') --len;
1218 AMPS_Options, str, len);
1224 AMPS_FIELD(Password)
1225 AMPS_FIELD_ALIAS(QueryId, QueryID)
1227 AMPS_FIELD(RecordsInserted)
1228 AMPS_FIELD(RecordsReturned)
1229 AMPS_FIELD(RecordsUpdated)
1230 AMPS_FIELD(Sequence)
1231 AMPS_FIELD(SowDelete)
1235 AMPS_FIELD_ALIAS(SubId, SubscriptionId)
1236 AMPS_FIELD(SubscriptionIds)
1237 AMPS_FIELD(TimeoutInterval)
1238 AMPS_FIELD(Timestamp)
1245 return getTimestamp();
1254 getRawTimestamp(dataptr, sizeptr);
1258 AMPS_FIELD(TopicMatches)
1259 AMPS_FIELD(TopNRecordsReturned)
1274 returnValue.assign(ptr, sz);
1278 void getRawData(
const char **data,
size_t *sz)
const 1289 Message& assignData(
const std::string &v_)
1303 Message& assignData(
const char* data_,
size_t length_)
1316 Message& assignData(
const char* data_)
1323 return _body.get().getMessage();
1327 _body.get().replace(message, owner);
1331 _body.get().disown();
1337 bool isValid(
void)
const 1339 return _body.isValid();
1343 _body.get().reset();
1347 void setBookmarkSeqNo(
size_t val)
1349 _body.get().setBookmarkSeqNo(val);
1352 size_t getBookmarkSeqNo()
const 1354 return _body.get().getBookmarkSeqNo();
1359 _body.get().setSubscriptionHandle(val);
1364 return _body.get().getSubscriptionHandle();
1367 void ack(
const char* options_ = NULL)
const;
1369 void setClientImpl(ClientImpl *pClientImpl)
1371 _body.get().setClientImpl(pClientImpl);
1374 void setIgnoreAutoAck()
const 1376 _body.get().setIgnoreAutoAck();
1379 bool getIgnoreAutoAck()
const 1381 return _body.get().getIgnoreAutoAck();
1385 void throwFor(
const T& ,
const std::string& ackReason_)
const 1387 switch (ackReason_[0])
1390 throw AuthenticationException(
"Logon failed for user \"" +
1391 (std::string)getUserId() +
"\"");
1394 switch (ackReason_.length())
1397 throw BadFilterException(
"bad filter '" +
1398 (std::string)getFilter() +
1402 if (getSowKeys().len())
1404 throw BadSowKeyException(
"bad sow key '" +
1405 (std::string)getSowKeys() +
1410 throw BadSowKeyException(
"bad sow key '" +
1411 (std::string)getSowKey() +
1416 throw BadRegexTopicException(
"bad regex topic '" +
1417 (std::string)getTopic() +
1425 if (ackReason_.length() == 23)
1427 throw DuplicateLogonException(
"Client '" +
1428 (std::string)getClientName() +
1430 (std::string)getUserId() +
1431 "' duplicate logon attempt");
1435 if (ackReason_.length() >= 9)
1437 switch (ackReason_[8])
1440 throw InvalidBookmarkException(
"invalid bookmark '" +
1441 (std::string)getBookmark() +
1445 throw CommandException(std::string(
"invalid message type '") +
1446 (std::string)getMessageType() +
1450 if (ackReason_[9] ==
'p')
1452 throw InvalidOptionsException(
"invalid options '" +
1453 (std::string)getOptions() +
1456 else if (ackReason_[9] ==
'r')
1458 throw InvalidOrderByException(
"invalid order by '" +
1459 (std::string)getOrderBy() +
1464 throw InvalidSubIdException(
"invalid subid '" +
1465 (std::string)getSubscriptionId() +
1469 if (ackReason_.length() == 13)
1471 throw InvalidTopicException(
"invalid topic '" +
1472 (std::string)getTopic() +
1475 else if (ackReason_.length() == 23)
1477 throw InvalidTopicException(
"invalid topic or filter. Topic '" +
1478 (std::string)getTopic() +
1480 (std::string)getFilter() +
1490 if (ackReason_.length() == 14)
1492 throw LogonRequiredException(
"logon required before command");
1496 switch (ackReason_[4])
1499 throw NameInUseException(
"name in use '" +
1500 (std::string)getClientName() +
1504 throw NotEntitledException(
"User \"" +
1505 (std::string)getUserId() +
1506 "\" not entitled to topic \"" +
1507 (std::string)getTopic() +
1511 throw MissingFieldsException(
"command sent with no filter or bookmark.");
1514 throw MissingFieldsException(
"command sent with no client name.");
1517 throw MissingFieldsException(
"command sent with no topic or filter.");
1520 throw CommandException(
"operation on topic '" +
1521 (std::string)getTopic() +
1522 "' with options '" +
1523 (std::string)getOptions() +
1524 "' not supported.");
1531 switch (ackReason_.length())
1534 throw MissingFieldsException(
"orderby required");
1537 throw CommandException(
"orderby too large '" +
1538 (std::string)getOrderBy() +
1544 switch (ackReason_[1])
1547 throw CommandException(
"projection clause too large in options '" +
1548 (std::string)getOptions() +
1552 throw PublishFilterException(
"Publish filter '" +
1553 (std::string)getFilter() +
1554 "' doesn't match any records.");
1561 switch (ackReason_[2])
1564 throw BadRegexTopicException(
"'regex topic not supported '" +
1565 (std::string)getTopic() +
1573 switch (ackReason_[5])
1576 throw SubidInUseException(
"subid in use '" +
1577 (std::string)getSubscriptionId() +
1581 throw CommandException(
"sow_delete command only supports one of: filter '" +
1582 (std::string)getFilter() +
1584 (std::string)getSowKeys() +
1586 (std::string)getBookmark() +
1588 (std::string)getData() +
1592 throw PublishException(
"sow store failed.");
1599 switch (ackReason_[2])
1602 throw PublishException(
"tx store failure.");
1605 throw CommandException(
"txn replay failed for '" +
1606 (std::string)getSubId() +
1614 throw CommandException(
"Error from server while processing this command: '" +
1622 return lhs + std::string(rhs);
1625 inline std::basic_ostream<char>&
1626 operator<<(std::basic_ostream<char>& os,
const Message::Field& rhs)
1628 os.write(rhs.
data(), (std::streamsize)rhs.
len());
1632 AMPS::Field::operator<(
const AMPS::Field& rhs)
const 1634 return std::lexicographical_compare(data(), data()+len(), rhs.
data(), rhs.
data()+rhs.
len());
void setFullyDurable(void)
Set the option to only provide messages that have been persisted to all replication destinations that...
Definition: Message.hpp:773
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:179
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1284
void amps_message_reset(amps_handle message)
Clears all fields and data in a message.
void setRateMaxGap(const char *rateMaxGap_)
Set the option for the maximum amount of time that a bookmark replay with a specified rate will allow...
Definition: Message.hpp:864
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1068
void amps_message_assign_data(amps_handle message, const amps_char *value, size_t length)
Assigns the data component of an AMPS message, without copying the value.
amps_handle amps_message_copy(amps_handle message)
Creates and returns a handle to a new AMPS message object that is a deep copy of the message passed i...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:724
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:538
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:713
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1023
void setNoSowKey(void)
Set the option to not set the SowKey header on messages.
Definition: Message.hpp:740
MessageImpl(amps_handle message_, bool owner_=false, bool ignoreAutoAck_=false, size_t bookmarkSeqNo_=0, amps_subscription_handle subscription_=NULL, ClientImpl *clientImpl_=NULL)
Constructs a messageImpl from an existing AMPS message.
Definition: Message.hpp:105
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1128
void setSendKeys(void)
Set the option to send key fields with a delta subscription.
Definition: Message.hpp:729
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:179
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:553
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
void setPause(void)
Set the option to pause a bookmark subscription.
Definition: Message.hpp:765
void setReplace(void)
Set the option to replace a current subscription with this one.
Definition: Message.hpp:719
Valid values for setCommandEnum() and getCommandEnum().
Definition: Message.hpp:1040
Message & setData(const char *data_)
Sets the data portion of self from a null-terminated string.
Definition: Message.hpp:1311
void setGrouping(Iterator begin_, Iterator end_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:931
void setTimestamp(void)
Set the option to send a timestamp that the message was processed on a subscription or query...
Definition: Message.hpp:735
amps_handle getMessage() const
Returns the underling AMPS message object from the C layer.
Definition: Message.hpp:149
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
void setResume(void)
Set the option to resume a subscription.
Definition: Message.hpp:753
void setConflation(const char *conflation_)
Set the options for conflation on a subscription.
Definition: Message.hpp:798
void replace(amps_handle message_, bool owner_=false)
Causes self to refer to a new AMPS message, freeing any current message owned by self along the way...
Definition: Message.hpp:168
Options(std::string options_="")
ctor - default to None
Definition: Message.hpp:679
void amps_message_get_data(amps_handle message, amps_char **value_ptr, size_t *length_ptr)
Gets the data component of an AMPS message.
amps_handle amps_message_create(amps_handle client)
Functions for creation and manipulation of AMPS messages.
void setRate(const char *rate_)
Set the option for the maximum rate at which messages are provided to the subscription.
Definition: Message.hpp:842
Valid values for the setAckTypeEnum() and getAckTypeEnum() methods.
Definition: Message.hpp:966
void amps_message_assign_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Assigns the value of a header field in an AMPS message, without copying the value.
void setGrouping(const std::string &grouping_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:918
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
void setTopN(int topN_)
Set the top N option, which specifies the maximum number of messages to return for this command...
Definition: Message.hpp:828
void amps_message_set_data_nts(amps_handle message, const amps_char *value)
Sets the data component of an AMPS message.
void setMaxBacklog(int maxBacklog_)
Set the option for maximum backlog this subscription is willing to accept.
Definition: Message.hpp:785
void getRawTransmissionTime(const char **dataptr, size_t *sizeptr) const
Definition: Message.hpp:1252
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageImpl()
Constructs a MessageImpl with a new, empty AMPS message.
Definition: Message.hpp:118
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
void setConflationKey(const char *conflationKey_)
Set the options for the conflation key, the identifiers for the field or fields used by AMPS to deter...
Definition: Message.hpp:815
void setProjection(const std::string &projection_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:889
void setSkipN(int skipN_)
Set the option for skip N, the number of messages in the result set to skip before returning messages...
Definition: Message.hpp:877
void amps_message_set_data(amps_handle message, const amps_char *value, size_t length)
Sets the data component of an AMPS message.
Message(amps_handle message_, bool owner_=false)
Constructs a new Message to wrap message.
Definition: Message.hpp:524
void setNone(void)
Clear any previously set options and set the options to an empty string (AMPS_OPTIONS_NONE).
Definition: Message.hpp:696
Implementation class for a Message.
Definition: Message.hpp:83
void setProjection(Iterator begin_, Iterator end_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:902
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1003
Message & setData(const char *data_, size_t length_)
Sets the data portion of self from a char array.
Definition: Message.hpp:1298
Definition: ampsplusplus.hpp:103
void setCancel(void)
Set the cancel option, used on a sow_delete command to return a message to the queue.
Definition: Message.hpp:745
Message()
Construct a new, empty Message.
Definition: Message.hpp:532
void setLive(void)
Set the live option for a bookmark subscription, which requests that the subscription receives messag...
Definition: Message.hpp:707
void amps_message_destroy(amps_handle message)
Destroys and frees the memory associated with an AMPS message object.