25 #ifndef __AMPS_MESSAGE_HPP__ 26 #define __AMPS_MESSAGE_HPP__ 28 #include "constants.hpp" 29 #include "amps_generated.h" 35 #define AMPS_UNSET_SEQUENCE (amps_uint64_t)-1 38 #if defined(__GXX_EXPERIMENTAL_CXX0X__) || (_MSC_VER >= 1600) 39 #define AMPS_USE_FUNCTIONAL 1 42 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=5 ) 43 #define AMPS_USE_LAMBDAS 1 46 #if (_MSC_VER >= 1600) || (__GNUC__ > 4) || ( (__GNUC__ == 4) && (__GNUC_MINOR__) >=8 ) 47 #define AMPS_USE_EMPLACE 1 50 #ifdef AMPS_USE_FUNCTIONAL 60 #define AMPS_OPTIONS_NONE "" 61 #define AMPS_OPTIONS_LIVE "live," 62 #define AMPS_OPTIONS_OOF "oof," 63 #define AMPS_OPTIONS_REPLACE "replace," 64 #define AMPS_OPTIONS_NOEMPTIES "no_empties," 65 #define AMPS_OPTIONS_SENDKEYS "send_keys," 66 #define AMPS_OPTIONS_TIMESTAMP "timestamp," 67 #define AMPS_OPTIONS_NOSOWKEY "no_sowkey," 68 #define AMPS_OPTIONS_CANCEL "cancel," 69 #define AMPS_OPTIONS_RESUME "resume," 70 #define AMPS_OPTIONS_PAUSE "pause," 71 #define AMPS_OPTIONS_FULLY_DURABLE "fully_durable," 72 #define AMPS_OPTIONS_EXPIRE "expire," 73 #define AMPS_OPTIONS_TOPN(x) "top_n=##x," 74 #define AMPS_OPTIONS_MAXBACKLOG(x) "max_backlog=##x," 75 #define AMPS_OPTIONS_RATE(x) "rate=##x," 79 typedef void* amps_subscription_handle;
93 mutable bool _isIgnoreAutoAck;
94 size_t _bookmarkSeqNo;
95 amps_subscription_handle _subscription;
96 ClientImpl* _clientImpl;
110 bool ignoreAutoAck_ =
false,
size_t bookmarkSeqNo_ = 0,
111 amps_subscription_handle subscription_ = NULL,
112 ClientImpl* clientImpl_ = NULL)
113 : _message(message_), _owner(owner_), _isIgnoreAutoAck(ignoreAutoAck_)
114 , _bookmarkSeqNo(bookmarkSeqNo_)
115 , _subscription(subscription_), _clientImpl(clientImpl_)
123 : _message(NULL), _owner(true), _isIgnoreAutoAck(false), _bookmarkSeqNo(0), _subscription(NULL), _clientImpl(NULL)
131 if (_owner && _message)
140 return new MessageImpl(copy,
true, _isIgnoreAutoAck, _bookmarkSeqNo,
141 _subscription, _clientImpl);
144 void setClientImpl(ClientImpl* clientImpl_)
146 _clientImpl = clientImpl_;
149 ClientImpl* clientImpl(
void)
const 163 Lock<Mutex> l(_lock);
166 _subscription = NULL;
167 _isIgnoreAutoAck =
false;
177 Lock<Mutex> l(_lock);
178 if (_message == message_)
182 if (_owner && _message)
189 _subscription = NULL;
190 _isIgnoreAutoAck =
false;
195 Lock<Mutex> l(_lock);
199 static unsigned long newId()
201 static ATOMIC_TYPE _id = 0;
202 return (
unsigned long)(AMPS_FETCH_ADD(&_id, 1));
205 void setBookmarkSeqNo(
size_t val_)
207 _bookmarkSeqNo = val_;
210 size_t getBookmarkSeqNo(
void)
const 212 return _bookmarkSeqNo;
215 void setSubscriptionHandle(amps_subscription_handle subscription_)
217 _subscription = subscription_;
220 amps_subscription_handle getSubscriptionHandle(
void)
const 222 return _subscription;
225 void setIgnoreAutoAck()
const 227 _isIgnoreAutoAck =
true;
230 bool getIgnoreAutoAck()
const 232 return _isIgnoreAutoAck;
242 #ifdef DOXYGEN_PREPROCESSOR 244 #define DOX_COMMENTHEAD(s) / ## ** ## s ## * ## / 245 #define DOX_GROUPNAME(s) DOX_COMMENTHEAD(@name s Functions) 246 #define DOX_OPENGROUP(s) DOX_COMMENTHEAD(@{) \ 248 #define DOX_CLOSEGROUP() DOX_COMMENTHEAD(@}) 249 #define DOX_MAKEGETCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of the Message as a new Field. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. ) 250 #define DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of self as a Field that references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. ) 251 #define DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. ) 252 #define DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. ) 253 #define DOX_MAKENEWCOMMENT(x) DOX_COMMENTHEAD(Creates and sets a new sequential value for the x header for this Message. This function is most useful for headers such as %CommandId and %SubId.) 257 #define DOX_COMMENTHEAD(s) 258 #define DOX_GROUPNAME(s) 259 #define DOX_OPENGROUP(x) 260 #define DOX_CLOSEGROUP() 261 #define DOX_MAKEGETCOMMENT(x) 262 #define DOX_MAKEGETRAWCOMMENT(x) 263 #define DOX_MAKESETCOMMENT(x) 264 #define DOX_MAKEASSIGNCOMMENT(x) 265 #define DOX_MAKENEWCOMMENT(x) 273 #define AMPS_FIELD(x) \ 275 DOX_MAKEGETCOMMENT(x) \ 276 Field get##x() const {\ 280 amps_message_get_field_value(_body.get().getMessage(),\ 281 AMPS_##x, &ptr, &sz);\ 282 returnValue.assign(ptr, sz);\ 285 DOX_MAKEGETRAWCOMMENT(x) \ 286 void getRaw##x(const char** dataptr, size_t* sizeptr) const {\ 287 amps_message_get_field_value(_body.get().getMessage(),\ 288 AMPS_##x, dataptr, sizeptr);\ 291 DOX_MAKESETCOMMENT(x) \ 292 Message& set##x(const std::string& v) {\ 293 amps_message_set_field_value(_body.get().getMessage(),\ 294 AMPS_##x, v.c_str(), v.length());\ 297 DOX_MAKESETCOMMENT(x) \ 298 Message& set##x(amps_uint64_t v) {\ 300 AMPS_snprintf_amps_uint64_t(buf,22,v);\ 301 amps_message_set_field_value_nts(_body.get().getMessage(),\ 305 DOX_MAKEASSIGNCOMMENT(x) \ 306 Message& assign##x(const std::string& v) {\ 307 amps_message_assign_field_value(_body.get().getMessage(),\ 308 AMPS_##x, v.c_str(), v.length());\ 311 DOX_MAKEASSIGNCOMMENT(x) \ 312 Message& assign##x(const char* data, size_t len) {\ 313 amps_message_assign_field_value(_body.get().getMessage(),\ 314 AMPS_##x, data, len);\ 317 DOX_MAKESETCOMMENT(x) \ 318 Message& set##x(const char* str) {\ 319 amps_message_set_field_value_nts(_body.get().getMessage(),\ 323 DOX_MAKESETCOMMENT(x) \ 324 Message& set##x(const char* str,size_t len) {\ 325 amps_message_set_field_value(_body.get().getMessage(),\ 329 DOX_MAKENEWCOMMENT(x) \ 331 char buf[Message::IdentifierLength+1];\ 332 buf[Message::IdentifierLength] = 0;\ 333 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%zx" , (size_t)(_body.get().newId()));\ 334 amps_message_set_field_value_nts(_body.get().getMessage(),\ 340 #define AMPS_FIELD_ALIAS(x,y) \ 342 DOX_MAKEGETCOMMENT(y) \ 343 Field get##y() const {\ 347 amps_message_get_field_value(_body.get().getMessage(),\ 348 AMPS_##y, &ptr, &sz);\ 349 returnValue.assign(ptr, sz);\ 352 DOX_MAKEGETRAWCOMMENT(y) \ 353 void getRaw##y(const char** dataptr, size_t* sizeptr) const {\ 354 amps_message_get_field_value(_body.get().getMessage(),\ 355 AMPS_##y, dataptr, sizeptr);\ 358 DOX_MAKESETCOMMENT(y) \ 359 Message& set##y(const std::string& v) {\ 360 amps_message_set_field_value(_body.get().getMessage(),\ 361 AMPS_##y, v.c_str(), v.length());\ 364 DOX_MAKESETCOMMENT(y) \ 365 Message& set##y(amps_uint64_t v) {\ 367 AMPS_snprintf_amps_uint64_t(buf,22,v);\ 368 amps_message_set_field_value_nts(_body.get().getMessage(),\ 372 DOX_MAKEASSIGNCOMMENT(y) \ 373 Message& assign##y(const std::string& v) {\ 374 amps_message_assign_field_value(_body.get().getMessage(),\ 375 AMPS_##y, v.c_str(), v.length());\ 378 DOX_MAKEASSIGNCOMMENT(y) \ 379 Message& assign##y(const char* data, size_t len) {\ 380 amps_message_assign_field_value(_body.get().getMessage(),\ 381 AMPS_##y, data, len);\ 384 DOX_MAKESETCOMMENT(y) \ 385 Message& set##y(const char* str) {\ 386 amps_message_set_field_value_nts(_body.get().getMessage(),\ 390 DOX_MAKESETCOMMENT(y) \ 391 Message& set##y(const char* str,size_t len) {\ 392 amps_message_set_field_value(_body.get().getMessage(),\ 396 DOX_MAKENEWCOMMENT(y) \ 398 char buf[Message::IdentifierLength+1];\ 399 buf[Message::IdentifierLength] = 0;\ 400 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%zx" , (size_t)(_body.get().newId()));\ 401 amps_message_set_field_value_nts(_body.get().getMessage(),\ 405 DOX_MAKEGETCOMMENT(y) \ 406 Field get##x() const {\ 410 amps_message_get_field_value(_body.get().getMessage(),\ 411 AMPS_##y, &ptr, &sz);\ 412 returnValue.assign(ptr, sz);\ 415 DOX_MAKEGETRAWCOMMENT(y) \ 416 void getRaw##x(const char** dataptr, size_t* sizeptr) const {\ 417 amps_message_get_field_value(_body.get().getMessage(),\ 418 AMPS_##y, dataptr, sizeptr);\ 421 DOX_MAKESETCOMMENT(y) \ 422 Message& set##x(const std::string& v) {\ 423 amps_message_set_field_value(_body.get().getMessage(),\ 424 AMPS_##y, v.c_str(), v.length());\ 427 DOX_MAKESETCOMMENT(y) \ 428 Message& set##x(amps_uint64_t v) {\ 430 AMPS_snprintf_amps_uint64_t(buf,22,v);\ 431 amps_message_set_field_value_nts(_body.get().getMessage(),\ 435 DOX_MAKEASSIGNCOMMENT(y) \ 436 Message& assign##x(const std::string& v) {\ 437 amps_message_assign_field_value(_body.get().getMessage(),\ 438 AMPS_##y, v.c_str(), v.length());\ 441 DOX_MAKEASSIGNCOMMENT(y) \ 442 Message& assign##x(const char* data, size_t len) {\ 443 amps_message_assign_field_value(_body.get().getMessage(),\ 444 AMPS_##y, data, len);\ 447 DOX_MAKESETCOMMENT(y) \ 448 Message& set##x(const char* str) {\ 449 amps_message_set_field_value_nts(_body.get().getMessage(),\ 453 DOX_MAKESETCOMMENT(y) \ 454 Message& set##x(const char* str,size_t len) {\ 455 amps_message_set_field_value(_body.get().getMessage(),\ 459 DOX_MAKENEWCOMMENT(y) \ 461 char buf[Message::IdentifierLength+1];\ 462 buf[Message::IdentifierLength] = 0;\ 463 AMPS_snprintf(buf, Message::IdentifierLength+1, "auto%zx" , (size_t)(_body.get().newId()));\ 464 amps_message_set_field_value_nts(_body.get().getMessage(),\ 513 RefHandle<MessageImpl> _body;
522 static const unsigned int IdentifierLength = 32;
526 static const size_t BOOKMARK_NONE = AMPS_UNSET_INDEX;
550 return Message(_body.get().copy());
566 static const char* None(
void)
568 return AMPS_OPTIONS_NONE;
570 static const char* Live(
void)
572 return AMPS_OPTIONS_LIVE;
574 static const char* OOF(
void)
576 return AMPS_OPTIONS_OOF;
578 static const char* Replace(
void)
580 return AMPS_OPTIONS_REPLACE;
582 static const char* NoEmpties(
void)
584 return AMPS_OPTIONS_NOEMPTIES;
586 static const char* SendKeys(
void)
588 return AMPS_OPTIONS_SENDKEYS;
590 static const char* Timestamp(
void)
592 return AMPS_OPTIONS_TIMESTAMP;
594 static const char* NoSowKey(
void)
596 return AMPS_OPTIONS_NOSOWKEY;
598 static const char* Cancel(
void)
600 return AMPS_OPTIONS_CANCEL;
602 static const char* Resume(
void)
604 return AMPS_OPTIONS_RESUME;
606 static const char* Pause(
void)
608 return AMPS_OPTIONS_PAUSE;
610 static const char* FullyDurable(
void)
612 return AMPS_OPTIONS_FULLY_DURABLE;
614 static const char* Expire(
void)
616 return AMPS_OPTIONS_EXPIRE;
618 static std::string Conflation(
const char* conflation_)
621 AMPS_snprintf(buf,
sizeof(buf),
"conflation=%s,", conflation_);
624 static std::string ConflationKey(
const char* conflationKey_)
626 std::string option(
"conflation_key=");
627 option.append(conflationKey_).append(
",");
630 static std::string TopN(
int topN_)
633 AMPS_snprintf(buf,
sizeof(buf),
"top_n=%d,", topN_);
636 static std::string MaxBacklog(
int maxBacklog_)
639 AMPS_snprintf(buf,
sizeof(buf),
"max_backlog=%d,", maxBacklog_);
642 static std::string Rate(
const char* rate_)
645 AMPS_snprintf(buf,
sizeof(buf),
"rate=%s,", rate_);
648 static std::string RateMaxGap(
const char* rateMaxGap_)
651 AMPS_snprintf(buf,
sizeof(buf),
"rate_max_gap=%s,", rateMaxGap_);
654 static std::string SkipN(
int skipN_)
657 AMPS_snprintf(buf,
sizeof(buf),
"skip_n=%d,", skipN_);
661 static std::string Projection(
const std::string& projection_)
663 return "projection=[" + projection_ +
"],";
666 template<
class Iterator>
667 static std::string Projection(Iterator begin_, Iterator end_)
669 std::string projection =
"projection=[";
670 for (Iterator i = begin_; i != end_; ++i)
675 projection.insert(projection.length() - 1,
"]");
679 static std::string Grouping(
const std::string& grouping_)
681 return "grouping=[" + grouping_ +
"],";
684 template<
class Iterator>
685 static std::string Grouping(Iterator begin_, Iterator end_)
687 std::string grouping =
"grouping=[";
688 for (Iterator i = begin_; i != end_; ++i)
693 grouping.insert(grouping.length() - 1,
"]");
697 static std::string Select(
const std::string& select_)
699 return "select=[" + select_ +
"],";
702 template<
class Iterator>
703 static std::string Select(Iterator begin_, Iterator end_)
705 std::string select =
"select=[";
706 for (Iterator i = begin_; i != end_; ++i)
711 select.insert(select.length() - 1,
"]");
715 static std::string AckConflationInterval(
const std::string& interval_)
717 return "ack_conflation=" + interval_ +
",";
720 static std::string AckConflationInterval(
const char* interval_)
722 static const std::string start(
"ack_conflation=");
723 return start + interval_ +
",";
729 : _optionStr(options_)
735 int getMaxBacklog(
void)
const 739 std::string getConflation(
void)
const 743 std::string getConflationKey(
void)
const 745 return _conflationKey;
747 int getTopN(
void)
const 751 std::string getRate(
void)
const 755 std::string getRateMaxGap(
void)
const 779 _optionStr += AMPS_OPTIONS_LIVE;
788 _optionStr += AMPS_OPTIONS_OOF;
797 _optionStr += AMPS_OPTIONS_REPLACE;
805 _optionStr += AMPS_OPTIONS_NOEMPTIES;
813 _optionStr += AMPS_OPTIONS_SENDKEYS;
822 _optionStr += AMPS_OPTIONS_TIMESTAMP;
830 _optionStr += AMPS_OPTIONS_NOSOWKEY;
838 _optionStr += AMPS_OPTIONS_CANCEL;
849 _optionStr += AMPS_OPTIONS_RESUME;
864 _optionStr += AMPS_OPTIONS_PAUSE;
875 _optionStr += AMPS_OPTIONS_FULLY_DURABLE;
891 AMPS_snprintf(buf,
sizeof(buf),
"max_backlog=%d,", maxBacklog_);
893 _maxBacklog = maxBacklog_;
904 AMPS_snprintf(buf,
sizeof(buf),
"conflation=%s,", conflation_);
906 _conflation = conflation_;
921 AMPS_snprintf(buf,
sizeof(buf),
"conflation_key=%s,", conflationKey_);
923 _conflationKey = conflationKey_;
934 AMPS_snprintf(buf,
sizeof(buf),
"top_n=%d,", topN_);
948 AMPS_snprintf(buf,
sizeof(buf),
"rate=%s,", rate_);
970 AMPS_snprintf(buf,
sizeof(buf),
"rate_max_gap=%s,", rateMaxGap_);
972 _rateMaxGap = rateMaxGap_;
983 AMPS_snprintf(buf,
sizeof(buf),
"skip_n=%d,", skipN_);
994 _projection =
"projection=[" + projection_ +
"],";
995 _optionStr += _projection;
1004 template<
class Iterator>
1007 _projection =
"projection=[";
1008 for (Iterator i = begin_; i != end_; ++i)
1013 _projection.insert(_projection.length() - 1,
"]");
1014 _optionStr += _projection;
1023 _grouping =
"grouping=[" + grouping_ +
"],";
1024 _optionStr += _grouping;
1033 template<
class Iterator>
1036 _grouping =
"grouping=[";
1037 for (Iterator i = begin_; i != end_; ++i)
1042 _grouping.insert(_grouping.length() - 1,
"]");
1043 _optionStr += _grouping;
1049 operator const std::string()
1051 return _optionStr.substr(0, _optionStr.length() - 1);
1055 std::string _optionStr;
1057 std::string _conflation;
1058 std::string _conflationKey;
1061 std::string _rateMaxGap;
1063 std::string _projection;
1064 std::string _grouping;
1073 None = 0, Received = 1, Parsed = 2, Processed = 4, Persisted = 8, Completed = 16, Stats = 32
1079 static inline
AckType::Type decodeSingleAckType(const
char* begin, const
char* end)
1081 switch (end - begin)
1084 return AckType::Stats;
1086 return AckType::Parsed;
1088 return AckType::Received;
1092 case 'e':
return AckType::Persisted;
1093 case 'r':
return AckType::Processed;
1094 case 'o':
return AckType::Completed;
1101 return AckType::None;
1108 unsigned result = AckType::None;
1109 const char* data = NULL;
size_t len = 0;
1111 const char* mark = data;
1112 for (
const char* end = data + len; data != end; ++data)
1116 result |= decodeSingleAckType(mark, data);
1122 result |= decodeSingleAckType(mark, data);
1131 if (ackType_ < AckTypeConstants<0>::Entries)
1134 AckTypeConstants<0>::Values[ackType_], AckTypeConstants<0>::Lengths[ackType_]);
1139 AMPS_FIELD(BatchSize)
1140 AMPS_FIELD(Bookmark)
1159 SOWAndSubscribe = 256,
1160 DeltaSubscribe = 512,
1161 SOWAndDeltaSubscribe = 1024,
1169 NoDataCommands = Publish | Unsubscribe | Heartbeat | SOWDelete | DeltaPublish
1170 | Logon | StartTimer | StopTimer | Flush
1176 const char* data = NULL;
size_t len = 0;
1180 case 1:
return Command::Publish;
1184 case 's':
return Command::SOW;
1185 case 'o':
return Command::OOF;
1186 case 'a':
return Command::Ack;
1192 case 'l':
return Command::Logon;
1193 case 'f':
return Command::Flush;
1197 return Command::Publish;
1202 case 's':
return Command::Subscribe;
1203 case 'h':
return Command::Heartbeat;
1204 case 'g':
return Command::GroupEnd;
1210 case 'o':
return Command::SOWDelete;
1211 case 't':
return Command::StopTimer;
1217 case 'g':
return Command::GroupBegin;
1218 case 'u':
return Command::Unsubscribe;
1222 return Command::DeltaPublish;
1224 return Command::DeltaSubscribe;
1226 return Command::SOWAndSubscribe;
1228 return Command::SOWAndDeltaSubscribe;
1230 return Command::Unknown;
1237 unsigned command = command_;
1244 CommandConstants<0>::Values[bits], CommandConstants<0>::Lengths[bits]);
1248 AMPS_FIELD(CommandId)
1249 AMPS_FIELD(ClientName)
1250 AMPS_FIELD(CorrelationId)
1251 AMPS_FIELD(Expiration)
1253 AMPS_FIELD(GroupSequenceNumber)
1254 AMPS_FIELD(Heartbeat)
1255 AMPS_FIELD(LeasePeriod)
1257 AMPS_FIELD(MessageLength)
1258 AMPS_FIELD(MessageType)
1262 Field getOptions()
const 1268 AMPS_Options, &ptr, &sz);
1269 if (sz && ptr[sz - 1] ==
',')
1273 returnValue.assign(ptr, sz);
1277 DOX_MAKEGETRAWCOMMENT(
Options)
1278 void getRawOptions(const
char** dataptr,
size_t* sizeptr)
const 1281 AMPS_Options, dataptr, sizeptr);
1282 if (*sizeptr && *dataptr && (*dataptr)[*sizeptr - 1] ==
',')
1292 size_t sz = v.length();
1293 if (sz && v[sz - 1] ==
',')
1298 AMPS_Options, v.c_str(), sz);
1302 DOX_MAKEASSIGNCOMMENT(
Options)
1305 size_t sz = v.length();
1306 if (sz && v[sz - 1] ==
',')
1311 AMPS_Options, v.c_str(), sz);
1315 DOX_MAKEASSIGNCOMMENT(
Options)
1316 Message& assignOptions(const
char* data,
size_t len)
1318 if (len && data[len - 1] ==
',')
1323 AMPS_Options, data, len);
1332 size_t sz = strlen(str);
1333 if (sz && str[sz - 1] ==
',')
1338 AMPS_Options, str, sz);
1343 AMPS_Options, str, 0);
1351 if (len && str[len - 1] ==
',')
1356 AMPS_Options, str, len);
1362 AMPS_FIELD(Password)
1363 AMPS_FIELD_ALIAS(QueryId, QueryID)
1365 AMPS_FIELD(RecordsInserted)
1366 AMPS_FIELD(RecordsReturned)
1367 AMPS_FIELD(RecordsUpdated)
1368 AMPS_FIELD(Sequence)
1369 AMPS_FIELD(SowDelete)
1373 AMPS_FIELD_ALIAS(SubId, SubscriptionId)
1374 AMPS_FIELD(SubscriptionIds)
1375 AMPS_FIELD(TimeoutInterval)
1376 AMPS_FIELD(Timestamp)
1383 return getTimestamp();
1392 getRawTimestamp(dataptr, sizeptr);
1396 AMPS_FIELD(TopicMatches)
1397 AMPS_FIELD(TopNRecordsReturned)
1412 returnValue.assign(ptr, sz);
1416 void getRawData(
const char** data,
size_t* sz)
const 1427 Message& assignData(
const std::string& v_)
1441 Message& assignData(
const char* data_,
size_t length_)
1454 Message& assignData(
const char* data_)
1461 return _body.get().getMessage();
1465 _body.get().replace(message, owner);
1469 _body.get().disown();
1475 bool isValid(
void)
const 1477 return _body.isValid();
1481 _body.get().reset();
1485 void setBookmarkSeqNo(
size_t val)
1487 _body.get().setBookmarkSeqNo(val);
1490 size_t getBookmarkSeqNo()
const 1492 return _body.get().getBookmarkSeqNo();
1497 _body.get().setSubscriptionHandle(val);
1502 return _body.get().getSubscriptionHandle();
1505 void ack(
const char* options_ = NULL)
const;
1507 void setClientImpl(ClientImpl* pClientImpl)
1509 _body.get().setClientImpl(pClientImpl);
1512 void setIgnoreAutoAck()
const 1514 _body.get().setIgnoreAutoAck();
1517 bool getIgnoreAutoAck()
const 1519 return _body.get().getIgnoreAutoAck();
1523 void throwFor(
const T& ,
const std::string& ackReason_)
const 1525 switch (ackReason_[0])
1528 throw AuthenticationException(
"Logon failed for user \"" +
1529 (std::string)getUserId() +
"\"");
1532 switch (ackReason_.length())
1535 throw BadFilterException(
"bad filter '" +
1536 (std::string)getFilter() +
1540 if (getSowKeys().len())
1542 throw BadSowKeyException(
"bad sow key '" +
1543 (std::string)getSowKeys() +
1548 throw BadSowKeyException(
"bad sow key '" +
1549 (std::string)getSowKey() +
1554 throw BadRegexTopicException(
"bad regex topic '" +
1555 (std::string)getTopic() +
1563 if (ackReason_.length() == 23)
1565 throw DuplicateLogonException(
"Client '" +
1566 (std::string)getClientName() +
1568 (std::string)getUserId() +
1569 "' duplicate logon attempt");
1573 if (ackReason_.length() >= 9)
1575 switch (ackReason_[8])
1578 throw InvalidBookmarkException(
"invalid bookmark '" +
1579 (std::string)getBookmark() +
1583 throw CommandException(std::string(
"invalid message type '") +
1584 (std::string)getMessageType() +
1588 if (ackReason_[9] ==
'p')
1590 throw InvalidOptionsException(
"invalid options '" +
1591 (std::string)getOptions() +
1594 else if (ackReason_[9] ==
'r')
1596 throw InvalidOrderByException(
"invalid order by '" +
1597 (std::string)getOrderBy() +
1602 throw InvalidSubIdException(
"invalid subid '" +
1603 (std::string)getSubscriptionId() +
1607 if (ackReason_.length() == 13)
1609 throw InvalidTopicException(
"invalid topic '" +
1610 (std::string)getTopic() +
1613 else if (ackReason_.length() == 23)
1615 throw InvalidTopicException(
"invalid topic or filter. Topic '" +
1616 (std::string)getTopic() +
1618 (std::string)getFilter() +
1628 if (ackReason_.length() == 14)
1630 throw LogonRequiredException(
"logon required before command");
1634 switch (ackReason_[4])
1637 throw NameInUseException(
"name in use '" +
1638 (std::string)getClientName() +
1642 throw NotEntitledException(
"User \"" +
1643 (std::string)getUserId() +
1644 "\" not entitled to topic \"" +
1645 (std::string)getTopic() +
1649 throw MissingFieldsException(
"command sent with no filter or bookmark.");
1652 throw MissingFieldsException(
"command sent with no client name.");
1655 throw MissingFieldsException(
"command sent with no topic or filter.");
1658 throw CommandException(
"operation on topic '" +
1659 (std::string)getTopic() +
1660 "' with options '" +
1661 (std::string)getOptions() +
1662 "' not supported.");
1669 switch (ackReason_.length())
1672 throw MissingFieldsException(
"orderby required");
1675 throw CommandException(
"orderby too large '" +
1676 (std::string)getOrderBy() +
1682 throw CommandException(
"projection clause too large in options '" +
1683 (std::string)getOptions() +
1687 switch (ackReason_[2])
1690 throw BadRegexTopicException(
"'regex topic not supported '" +
1691 (std::string)getTopic() +
1699 switch (ackReason_[5])
1702 throw SubidInUseException(
"subid in use '" +
1703 (std::string)getSubscriptionId() +
1707 throw CommandException(
"sow_delete command only supports one of: filter '" +
1708 (std::string)getFilter() +
1710 (std::string)getSowKeys() +
1712 (std::string)getBookmark() +
1714 (std::string)getData() +
1718 throw PublishException(
"sow store failed.");
1725 switch (ackReason_[2])
1728 throw PublishException(
"tx store failure.");
1731 throw CommandException(
"txn replay failed for '" +
1732 (std::string)getSubId() +
1740 throw CommandException(
"Error from server while processing this command: '" +
1748 return lhs + std::string(rhs);
1751 inline std::basic_ostream<char>&
1752 operator<<(std::basic_ostream<char>& os,
const Message::Field& rhs)
1754 os.write(rhs.
data(), (std::streamsize)rhs.
len());
1758 AMPS::Field::operator<(
const AMPS::Field& rhs)
const 1760 return std::lexicographical_compare(data(), data() + len(), rhs.
data(), rhs.
data() + rhs.
len());
void setFullyDurable(void)
Set the option to only provide messages that have been persisted to all replication destinations that...
Definition: Message.hpp:873
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:180
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1422
AMPSDLL amps_handle amps_message_create(amps_handle client)
Functions for creation and manipulation of AMPS messages.
void setRateMaxGap(const char *rateMaxGap_)
Set the option for the maximum amount of time that a bookmark replay with a specified rate will allow...
Definition: Message.hpp:967
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1174
AMPSDLL amps_handle amps_message_copy(amps_handle message)
Creates and returns a handle to a new AMPS message object that is a deep copy of the message passed i...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:803
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:548
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:786
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1129
void setNoSowKey(void)
Set the option to not set the SowKey header on messages.
Definition: Message.hpp:828
MessageImpl(amps_handle message_, bool owner_=false, bool ignoreAutoAck_=false, size_t bookmarkSeqNo_=0, amps_subscription_handle subscription_=NULL, ClientImpl *clientImpl_=NULL)
Constructs a messageImpl from an existing AMPS message.
Definition: Message.hpp:109
AMPSDLL void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:1234
void setSendKeys(void)
Set the option to send key fields with a delta subscription.
Definition: Message.hpp:811
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:196
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:563
AMPSDLL void amps_message_assign_data(amps_handle message, const amps_char *value, size_t length)
Assigns the data component of an AMPS message, without copying the value.
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
void setPause(void)
Set the option to pause a bookmark subscription.
Definition: Message.hpp:862
void setReplace(void)
Set the option to replace a current subscription with this one.
Definition: Message.hpp:795
Valid values for setCommandEnum() and getCommandEnum().
Definition: Message.hpp:1146
Message & setData(const char *data_)
Sets the data portion of self from a null-terminated string.
Definition: Message.hpp:1449
void setGrouping(Iterator begin_, Iterator end_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1034
AMPSDLL void amps_message_set_data_nts(amps_handle message, const amps_char *value)
Sets the data component of an AMPS message.
AMPSDLL void amps_message_reset(amps_handle message)
Clears all fields and data in a message.
void setTimestamp(void)
Set the option to send a timestamp that the message was processed on a subscription or query...
Definition: Message.hpp:820
amps_handle getMessage() const
Returns the underling AMPS message object from the C layer.
Definition: Message.hpp:156
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
void setResume(void)
Set the option to resume a subscription.
Definition: Message.hpp:847
AMPSDLL void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
void setConflation(const char *conflation_)
Set the options for conflation on a subscription.
Definition: Message.hpp:901
void replace(amps_handle message_, bool owner_=false)
Causes self to refer to a new AMPS message, freeing any current message owned by self along the way...
Definition: Message.hpp:175
AMPSDLL void amps_message_get_data(amps_handle message, amps_char **value_ptr, size_t *length_ptr)
Gets the data component of an AMPS message.
AMPSDLL void amps_message_destroy(amps_handle message)
Destroys and frees the memory associated with an AMPS message object.
Options(std::string options_="")
ctor - default to None
Definition: Message.hpp:728
AMPSDLL void amps_message_assign_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Assigns the value of a header field in an AMPS message, without copying the value.
void setRate(const char *rate_)
Set the option for the maximum rate at which messages are provided to the subscription.
Definition: Message.hpp:945
Valid values for the setAckTypeEnum() and getAckTypeEnum() methods.
Definition: Message.hpp:1069
void setGrouping(const std::string &grouping_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:1021
void setTopN(int topN_)
Set the top N option, which specifies the maximum number of messages to return for this command...
Definition: Message.hpp:931
AMPSDLL void amps_message_set_data(amps_handle message, const amps_char *value, size_t length)
Sets the data component of an AMPS message.
void setMaxBacklog(int maxBacklog_)
Set the option for maximum backlog this subscription is willing to accept.
Definition: Message.hpp:888
void getRawTransmissionTime(const char **dataptr, size_t *sizeptr) const
Definition: Message.hpp:1390
MessageImpl()
Constructs a MessageImpl with a new, empty AMPS message.
Definition: Message.hpp:122
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
void setConflationKey(const char *conflationKey_)
Set the options for the conflation key, the identifiers for the field or fields used by AMPS to deter...
Definition: Message.hpp:918
void setProjection(const std::string &projection_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:992
void setSkipN(int skipN_)
Set the option for skip N, the number of messages in the result set to skip before returning messages...
Definition: Message.hpp:980
Message(amps_handle message_, bool owner_=false)
Constructs a new Message to wrap message.
Definition: Message.hpp:534
void setNone(void)
Clear any previously set options and set the options to an empty string (AMPS_OPTIONS_NONE).
Definition: Message.hpp:763
Implementation class for a Message.
Definition: Message.hpp:87
void setProjection(Iterator begin_, Iterator end_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:1005
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1106
Message & setData(const char *data_, size_t length_)
Sets the data portion of self from a char array.
Definition: Message.hpp:1436
Definition: ampsplusplus.hpp:103
void setCancel(void)
Set the cancel option, used on a sow_delete command to return a message to the queue.
Definition: Message.hpp:836
Message()
Construct a new, empty Message.
Definition: Message.hpp:542
void setLive(void)
Set the live option for a bookmark subscription, which requests that the subscription receives messag...
Definition: Message.hpp:777