25 #ifndef __AMPS_MESSAGE_HPP__ 26 #define __AMPS_MESSAGE_HPP__ 28 #include "constants.hpp" 29 #include "amps_generated.h" 33 #if (_MSC_VER >= 1400) // VS2005 or higher 34 #define AMPS_snprintf sprintf_s 35 #define AMPS_snprintf_amps_uint64_t(buf,sz,val) sprintf_s(buf,sz,"%I64u",val) 37 #define AMPS_snprintf_sizet(buf,sz,val) sprintf_s(buf,sz,"%lu",val) 39 #define AMPS_snprintf_sizet(buf,sz,val) sprintf_s(buf,sz,"%u",val) 41 #else // VS2003 or older 42 #define AMPS_snprintf _snprintf 44 #define AMPS_snprintf_sizet(buf,sz,val) _sprintf(buf,sz,"%lu",val) 46 #define AMPS_snprintf_sizet(buf,sz,val) _sprintf(buf,sz,"%u",val) 50 #define AMPS_snprintf snprintf 52 #define AMPS_snprintf_amps_uint64_t(buf,sz,val) snprintf(buf,sz,"%lu",val) 53 #define AMPS_snprintf_sizet(buf,sz,val) snprintf(buf,sz,"%lu",val) 55 #define AMPS_snprintf_amps_uint64_t(buf,sz,val) snprintf(buf,sz,"%llu",val) 56 #define AMPS_snprintf_sizet(buf,sz,val) snprintf(buf,sz,"%u",val) 59 #define AMPS_UNSET_INDEX (size_t)-1 60 #define AMPS_UNSET_SEQUENCE (amps_uint64_t)-1 63 #if defined(__GXX_EXPERIMENTAL_CXX0X__) || (_MSC_VER >= 1600) 64 #define AMPS_USE_FUNCTIONAL 1 67 #if (_MSC_VER >= 1600) || ( (__GNUC__ >= 4) && (__GNUC_MINOR__) >=5 ) 68 #define AMPS_USE_LAMBDAS 1 71 #ifdef AMPS_USE_FUNCTIONAL 81 #define AMPS_OPTIONS_NONE "" 82 #define AMPS_OPTIONS_LIVE "live," 83 #define AMPS_OPTIONS_OOF "oof," 84 #define AMPS_OPTIONS_REPLACE "replace," 85 #define AMPS_OPTIONS_NOEMPTIES "no_empties," 86 #define AMPS_OPTIONS_SENDKEYS "send_keys," 87 #define AMPS_OPTIONS_TIMESTAMP "timestamp," 88 #define AMPS_OPTIONS_NOSOWKEY "no_sowkey," 89 #define AMPS_OPTIONS_CANCEL "cancel," 90 #define AMPS_OPTIONS_RESUME "resume," 91 #define AMPS_OPTIONS_PAUSE "pause," 92 #define AMPS_OPTIONS_FULLY_DURABLE "fully_durable," 93 #define AMPS_OPTIONS_EXPIRE "expire," 94 #define AMPS_OPTIONS_TOPN(x) "top_n=##x," 95 #define AMPS_OPTIONS_MAXBACKLOG(x) "max_backlog=##x," 96 #define AMPS_OPTIONS_RATE(x) "rate=##x," 100 typedef void* 
amps_subscription_handle;
114 mutable bool _isIgnoreAutoAck;
115 size_t _bookmarkSeqNo;
116 amps_subscription_handle _subscription;
117 ClientImpl* _clientImpl;
131 bool ignoreAutoAck_ =
false,
size_t bookmarkSeqNo_ = 0,
132 amps_subscription_handle subscription_ = NULL,
133 ClientImpl* clientImpl_=NULL)
134 : _message(message_), _owner(owner_), _isIgnoreAutoAck(ignoreAutoAck_)
135 , _bookmarkSeqNo(bookmarkSeqNo_)
136 , _subscription(subscription_), _clientImpl(clientImpl_)
144 : _message(NULL), _owner(true), _isIgnoreAutoAck(false), _bookmarkSeqNo(0), _subscription(NULL), _clientImpl(NULL)
158 return new MessageImpl(copy,
true, _isIgnoreAutoAck, _bookmarkSeqNo,
159 _subscription,_clientImpl);
162 void setClientImpl(ClientImpl* clientImpl_)
164 _clientImpl = clientImpl_;
167 ClientImpl* clientImpl(
void)
const 181 Lock<Mutex> l(_lock);
184 _subscription = NULL;
185 _isIgnoreAutoAck =
false;
195 Lock<Mutex> l(_lock);
196 if (_message == message_)
return;
197 if (_owner && _message)
204 _subscription = NULL;
205 _isIgnoreAutoAck =
false;
210 Lock<Mutex> l(_lock);
214 static unsigned long newId()
216 static ATOMIC_TYPE _id = 0;
217 return (
unsigned long)(AMPS_FETCH_ADD(&_id,1));
220 void setBookmarkSeqNo(
size_t val_)
222 _bookmarkSeqNo = val_;
225 size_t getBookmarkSeqNo(
void)
const 227 return _bookmarkSeqNo;
230 void setSubscriptionHandle(amps_subscription_handle subscription_)
232 _subscription = subscription_;
235 amps_subscription_handle getSubscriptionHandle(
void)
const 237 return _subscription;
240 void setIgnoreAutoAck()
const 242 _isIgnoreAutoAck =
true;
245 bool getIgnoreAutoAck()
const 247 return _isIgnoreAutoAck;
257 #ifdef DOXYGEN_PREPROCESSOR 259 #define DOX_COMMENTHEAD(s) / ## ** ## s ## * ## / 260 #define DOX_GROUPNAME(s) DOX_COMMENTHEAD(@name s Functions) 261 #define DOX_OPENGROUP(s) DOX_COMMENTHEAD(@{) DOX_GROUPNAME(s) 262 #define DOX_CLOSEGROUP() DOX_COMMENTHEAD(@}) 263 #define DOX_MAKEGETCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of the Message as a new Field. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. ) 264 #define DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the x header of self as a Field that references the underlying buffer managed by this Message. Notice that not all headers are present on all messages returned by AMPS. See the AMPS %Command Reference for details on which fields will be present in response to a specific command. ) 265 #define DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. ) 266 #define DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the x header for this Message. Not all headers are processed by AMPS for all commands. See the AMPS %Command Reference for which headers are used by AMPS for a specific command. ) 267 #define DOX_MAKENEWCOMMENT(x) DOX_COMMENTHEAD(Creates and sets a new sequential value for the x header for this Message. This function is most useful for headers such as CommandId and SubId.) 
271 #define DOX_COMMENTHEAD(s) 272 #define DOX_GROUPNAME(s) 273 #define DOX_OPENGROUP(x) 274 #define DOX_CLOSEGROUP(x) 275 #define DOX_MAKEGETCOMMENT(x) 276 #define DOX_MAKEGETRAWCOMMENT(x) 277 #define DOX_MAKESETCOMMENT(x) 278 #define DOX_MAKEASSIGNCOMMENT(x) 279 #define DOX_MAKENEWCOMMENT(x) 286 #define AMPS_FIELD(x) \ 288 DOX_MAKEGETCOMMENT(x) \ 289 Field get##x() const { \ 293 amps_message_get_field_value(_body.get().getMessage(),\ 294 AMPS_##x, &ptr, &sz);\ 295 returnValue.assign(ptr, sz);\ 298 DOX_MAKEGETRAWCOMMENT(x) \ 299 void getRaw##x(const char** dataptr, size_t* sizeptr) const { \ 300 amps_message_get_field_value(_body.get().getMessage(), \ 301 AMPS_##x, dataptr, sizeptr);\ 304 DOX_MAKESETCOMMENT(x) \ 305 Message& set##x(const std::string& v) {\ 306 amps_message_set_field_value(_body.get().getMessage(),\ 307 AMPS_##x, v.c_str(), v.length());\ 310 DOX_MAKESETCOMMENT(x) \ 311 Message& set##x(amps_uint64_t v) {\ 313 AMPS_snprintf_amps_uint64_t(buf,22,v);\ 314 amps_message_set_field_value_nts(_body.get().getMessage(),\ 318 DOX_MAKEASSIGNCOMMENT(x) \ 319 Message& assign##x(const std::string& v) {\ 320 amps_message_assign_field_value(_body.get().getMessage(),\ 321 AMPS_##x, v.c_str(), v.length());\ 324 DOX_MAKEASSIGNCOMMENT(x) \ 325 Message& assign##x(const char* data, size_t len) {\ 326 amps_message_assign_field_value(_body.get().getMessage(),\ 327 AMPS_##x, data, len);\ 330 DOX_MAKESETCOMMENT(x) \ 331 Message& set##x(const char* str) {\ 332 amps_message_set_field_value_nts(_body.get().getMessage(),\ 336 DOX_MAKESETCOMMENT(x) \ 337 Message& set##x(const char* str,size_t len) {\ 338 amps_message_set_field_value(_body.get().getMessage(),\ 342 DOX_MAKENEWCOMMENT(x) \ 344 char buf[Message::IdentifierLength+1];\ 345 buf[Message::IdentifierLength] = 0;\ 346 AMPS_snprintf(buf, Message::IdentifierLength+1, "%lx" , _body.get().newId());\ 347 amps_message_set_field_value_nts(_body.get().getMessage(),\ 395 RefHandle<MessageImpl> _body;
404 static const unsigned int IdentifierLength = 16;
408 static const size_t BOOKMARK_NONE = AMPS_UNSET_INDEX;
432 return Message(_body.get().copy());
448 static const char* None(
void) {
return AMPS_OPTIONS_NONE; }
449 static const char* Live(
void) {
return AMPS_OPTIONS_LIVE; }
450 static const char* OOF(
void) {
return AMPS_OPTIONS_OOF; }
451 static const char* Replace(
void) {
return AMPS_OPTIONS_REPLACE; }
452 static const char* NoEmpties(
void) {
return AMPS_OPTIONS_NOEMPTIES; }
453 static const char* SendKeys(
void) {
return AMPS_OPTIONS_SENDKEYS; }
454 static const char* Timestamp(
void) {
return AMPS_OPTIONS_TIMESTAMP; }
455 static const char* NoSowKey(
void) {
return AMPS_OPTIONS_NOSOWKEY; }
456 static const char* Cancel(
void) {
return AMPS_OPTIONS_CANCEL; }
457 static const char* Resume(
void) {
return AMPS_OPTIONS_RESUME; }
458 static const char* Pause(
void) {
return AMPS_OPTIONS_PAUSE; }
459 static const char* FullyDurable(
void) {
return AMPS_OPTIONS_FULLY_DURABLE; }
460 static const char* Expire(
void) {
return AMPS_OPTIONS_EXPIRE; }
461 static std::string Conflation(
const char* conflation_)
464 AMPS_snprintf(buf,
sizeof(buf),
"conflation=%s,", conflation_);
467 static std::string ConflationKey(
const char* conflationKey_)
469 std::string option(
"conflation_key=");
470 option.append(conflationKey_).append(
",");
473 static std::string TopN(
int topN_)
476 AMPS_snprintf(buf,
sizeof(buf),
"top_n=%d,", topN_);
479 static std::string MaxBacklog(
int maxBacklog_)
482 AMPS_snprintf(buf,
sizeof(buf),
"max_backlog=%d,", maxBacklog_);
485 static std::string Rate(
const char* rate_)
488 AMPS_snprintf(buf,
sizeof(buf),
"rate=%s,", rate_);
491 static std::string RateMaxGap(
const char* rateMaxGap_)
494 AMPS_snprintf(buf,
sizeof(buf),
"rate_max_gap=%s,", rateMaxGap_);
497 static std::string SkipN(
int skipN_)
500 AMPS_snprintf(buf,
sizeof(buf),
"skip_n=%d,", skipN_);
504 static std::string Projection(std::string projection_)
506 return "projection=[" + projection_ +
"],";
509 template<
class Iterator>
510 static std::string Projection(Iterator begin_, Iterator end_)
512 std::string projection =
"projection=[";
513 for (Iterator i = begin_; i != end_; ++i)
518 projection.insert(projection.length() - 1,
"]");
522 static std::string Grouping(std::string grouping_)
524 return "grouping=[" + grouping_ +
"],";
527 template<
class Iterator>
528 static std::string Grouping(Iterator begin_, Iterator end_)
530 std::string grouping =
"grouping=[";
531 for (Iterator i = begin_; i != end_; ++i)
536 grouping.insert(grouping.length() - 1,
"]");
540 static std::string Select(
const std::string& select_)
542 return "select=[" + select_ +
"],";
545 template<
class Iterator>
546 static std::string Select(Iterator begin_, Iterator end_)
548 std::string select =
"select=[";
549 for (Iterator i = begin_; i != end_; ++i)
554 select.insert(select.length() - 1,
"]");
558 static std::string AckConflationInterval(
const std::string& interval_)
560 return "ack_conflation=" + interval_ +
",";
563 static std::string AckConflationInterval(
const char* interval_)
565 static const std::string start(
"ack_conflation=");
566 return start + interval_ +
",";
572 : _optionStr(options_)
577 int getMaxBacklog(
void)
const {
return _maxBacklog; }
578 std::string getConflation(
void)
const {
return _conflation; }
579 std::string getConflationKey(
void)
const {
return _conflationKey; }
580 int getTopN(
void)
const {
return _topN; }
581 std::string getRate(
void)
const {
return _rate; }
582 std::string getRateMaxGap(
void)
const {
return _rateMaxGap; }
587 void setNone(
void) { _optionStr = AMPS_OPTIONS_NONE; }
598 void setLive(
void) { _optionStr += AMPS_OPTIONS_LIVE; }
604 void setOOF(
void) { _optionStr += AMPS_OPTIONS_OOF; }
610 void setReplace(
void) { _optionStr += AMPS_OPTIONS_REPLACE; }
636 void setCancel(
void) { _optionStr += AMPS_OPTIONS_CANCEL; }
644 void setResume(
void) { _optionStr += AMPS_OPTIONS_RESUME; }
656 void setPause(
void) { _optionStr += AMPS_OPTIONS_PAUSE; }
679 AMPS_snprintf(buf,
sizeof(buf),
"max_backlog=%d,", maxBacklog_);
681 _maxBacklog = maxBacklog_;
692 AMPS_snprintf(buf,
sizeof(buf),
"conflation=%s,", conflation_);
694 _conflation = conflation_;
709 AMPS_snprintf(buf,
sizeof(buf),
"conflation_key=%s,", conflationKey_);
711 _conflationKey = conflationKey_;
722 AMPS_snprintf(buf,
sizeof(buf),
"top_n=%d,", topN_);
736 AMPS_snprintf(buf,
sizeof(buf),
"rate=%s,", rate_);
758 AMPS_snprintf(buf,
sizeof(buf),
"rate_max_gap=%s,", rateMaxGap_);
760 _rateMaxGap = rateMaxGap_;
771 AMPS_snprintf(buf,
sizeof(buf),
"skip_n=%d,", skipN_);
782 _projection =
"projection=[" + projection_ +
"],";
783 _optionStr += _projection;
792 template<
class Iterator>
795 _projection =
"projection=[";
796 for (Iterator i = begin_; i != end_; ++i)
801 _projection.insert(_projection.length() - 1,
"]");
802 _optionStr += _projection;
812 _grouping =
"grouping=[" + grouping_ +
"],";
813 _optionStr += _grouping;
822 template<
class Iterator>
825 _grouping =
"grouping=[";
826 for (Iterator i = begin_; i != end_; ++i)
831 _grouping.insert(_grouping.length() - 1,
"]");
832 _optionStr += _grouping;
838 operator const std::string()
840 return _optionStr.substr(0, _optionStr.length()-1);
844 std::string _optionStr;
846 std::string _conflation;
847 std::string _conflationKey;
850 std::string _rateMaxGap;
852 std::string _projection;
853 std::string _grouping;
872 SOWAndSubscribe = 256,
873 DeltaSubscribe = 512,
874 SOWAndDeltaSubscribe = 1024,
887 const char* data = NULL;
size_t len = 0;
891 case 1:
return Command::Publish;
895 case 's':
return Command::SOW;
896 case 'o':
return Command::OOF;
897 case 'a':
return Command::Ack;
903 case 'l':
return Command::Logon;
904 case 'f':
return Command::Flush;
908 return Command::Publish;
913 case 's':
return Command::Subscribe;
914 case 'h':
return Command::Heartbeat;
915 case 'g':
return Command::GroupEnd;
921 case 'o':
return Command::SOWDelete;
922 case 't':
return Command::StopTimer;
928 case 'g':
return Command::GroupBegin;
929 case 'u':
return Command::Unsubscribe;
933 return Command::DeltaPublish;
935 return Command::DeltaSubscribe;
937 return Command::SOWAndSubscribe;
939 return Command::SOWAndDeltaSubscribe;
941 return Command::Unknown;
948 unsigned command = command_;
949 while (command > 0) { ++bits; command >>= 1; }
951 CommandConstants<0>::Values[bits], CommandConstants<0>::Lengths[bits]);
956 AMPS_FIELD(CommandId)
957 AMPS_FIELD(ClientName)
959 AMPS_FIELD(Timestamp)
964 Field getTransmissionTime()
const 966 return getTimestamp();
975 getRawTimestamp(dataptr, sizeptr);
980 AMPS_FIELD(MessageType)
988 None = 0, Received = 1, Parsed = 2, Processed = 4, Persisted = 8, Completed = 16, Stats = 32
999 return AckType::Stats;
1001 return AckType::Parsed;
1003 return AckType::Received;
1007 case 'e':
return AckType::Persisted;
1008 case 'r':
return AckType::Processed;
1009 case 'o':
return AckType::Completed;
1016 return AckType::None;
1023 unsigned result = AckType::None;
1024 const char* data = NULL;
size_t len = 0;
1026 const char* mark = data;
1027 for (
const char* end = data + len; data != end; ++data)
1031 result |= decodeSingleAckType(mark, data);
1035 if (mark < data) result |= decodeSingleAckType(mark, data);
1043 if(ackType_ < AckTypeConstants<0>::Entries)
1046 AckTypeConstants<0>::Values[ackType_], AckTypeConstants<0>::Lengths[ackType_]);
1051 AMPS_FIELD(SubscriptionId)
1052 AMPS_FIELD(Expiration)
1053 AMPS_FIELD(Heartbeat)
1054 AMPS_FIELD(TimeoutInterval)
1055 AMPS_FIELD(LeasePeriod)
1058 AMPS_FIELD(BatchSize)
1059 AMPS_FIELD(TopNRecordsReturned)
1062 AMPS_FIELD(CorrelationId)
1063 AMPS_FIELD(Sequence)
1064 AMPS_FIELD(Bookmark)
1065 AMPS_FIELD(RecordsInserted)
1066 AMPS_FIELD(RecordsUpdated)
1067 AMPS_FIELD(SowDelete)
1068 AMPS_FIELD(RecordsReturned)
1069 AMPS_FIELD(TopicMatches)
1071 AMPS_FIELD(MessageLength)
1073 AMPS_FIELD(GroupSequenceNumber)
1074 AMPS_FIELD(SubscriptionIds)
1076 AMPS_FIELD(Password)
1080 DOX_MAKEGETCOMMENT(
Options) DOX_COMMENTHEAD( Retrieves the value of the
Options header of the
Message as a new Field.)
1081 Field getOptions()
const {
1086 AMPS_Options, &ptr, &sz);
1087 if (sz && ptr[sz-1] ==
',') --sz;
1088 returnValue.assign(ptr, sz);
1092 DOX_MAKEGETRAWCOMMENT(x) DOX_COMMENTHEAD( Retrieves the value of the
Options header of
self as a Field that references the underlying buffer managed by
this Message. )
1093 void getRawOptions(
const char** dataptr,
size_t* sizeptr)
const {
1095 AMPS_Options, dataptr, sizeptr);
1096 if (*sizeptr && *dataptr && (*dataptr)[*sizeptr-1] ==
',') --*sizeptr;
1100 DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the
Options header
for this Message.)
1101 Message& setOptions(
const std::string& v) {
1102 size_t sz = v.length();
1103 if (sz && v[sz-1] ==
',') --sz;
1105 AMPS_Options, v.c_str(), sz);
1109 DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the
Options header
for this Message.)
1110 Message& assignOptions(
const std::string& v) {
1111 size_t sz = v.length();
1112 if (sz && v[sz-1] ==
',') --sz;
1114 AMPS_Options, v.c_str(), sz);
1118 DOX_MAKEASSIGNCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the
Options header
for this Message.)
1119 Message& assignOptions(
const char* data,
size_t len) {
1120 if (len && data[len-1] ==
',') --len;
1122 AMPS_Options, data, len);
1126 DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the
Options header
for this Message.)
1127 Message& setOptions(
const char* str) {
1130 size_t sz = strlen(str);
1131 if (sz && str[sz-1] ==
',') --sz;
1133 AMPS_Options, str, sz);
1138 AMPS_Options, str, 0);
1143 DOX_MAKESETCOMMENT(x) DOX_COMMENTHEAD( Sets the value of the
Options header
for this Message.)
1144 Message& setOptions(
const char* str,
size_t len) {
1145 if (len && str[len-1] ==
',') --len;
1147 AMPS_Options, str, len);
1157 Field getData()
const 1163 returnValue.assign(ptr, sz);
1167 void getRawData(
const char **data,
size_t *sz)
const 1178 Message& assignData(
const std::string &v_)
1192 Message& assignData(
const char* data_,
size_t length_)
1205 Message& assignData(
const char* data_)
1212 return _body.get().getMessage();
1216 _body.get().replace(message, owner);
1220 _body.get().disown();
1226 bool isValid(
void)
const 1228 return _body.isValid();
1232 _body.get().reset();
1236 void setBookmarkSeqNo(
size_t val)
1238 _body.get().setBookmarkSeqNo(val);
1241 size_t getBookmarkSeqNo()
const 1243 return _body.get().getBookmarkSeqNo();
1248 _body.get().setSubscriptionHandle(val);
1253 return _body.get().getSubscriptionHandle();
1256 void ack(
const char* options_ = NULL)
const;
1258 void setClientImpl(ClientImpl *pClientImpl)
1260 _body.get().setClientImpl(pClientImpl);
1263 void setIgnoreAutoAck()
const 1265 _body.get().setIgnoreAutoAck();
1268 bool getIgnoreAutoAck()
const 1270 return _body.get().getIgnoreAutoAck();
1274 void throwFor(
const T& ,
const std::string& ackReason_)
const 1276 switch (ackReason_[0])
1279 throw AuthenticationException(
"Logon failed for user \"" +
1280 (std::string)getUserId() +
"\"");
1283 switch (ackReason_.length())
1286 throw BadFilterException(
"Filter '" +
1287 (std::string)getFilter() +
1291 throw BadRegexTopicException(
"Regular Expression Topic '" +
1292 (std::string)getTopic() +
1300 if (ackReason_.length() == 13)
1302 switch (ackReason_[8])
1305 throw InvalidTopicException(
"Topic '" +
1306 (std::string)getTopic() +
1310 throw InvalidSubIdException(
"SubId '" +
1311 (std::string)getSubscriptionId() +
1320 switch (ackReason_[1])
1323 throw NameInUseException(
"Client name '" +
1324 (std::string)getClientName() +
1325 "' already exists.");
1328 throw NotEntitledException(
"User \"" +
1329 (std::string)getUserId() +
1330 "\" not entitled to topic \"" +
1331 (std::string)getTopic() +
1339 switch (ackReason_.length())
1342 throw SubscriptionAlreadyExistsException(
1343 "Subscription for command '" +
1344 (std::string)getCommandId() +
1345 "' already exists.");
1348 throw SubidInUseException(
"Subscription with id '" +
1349 (std::string)getSubscriptionId() +
1350 "' already exists.");
1359 throw CommandException(
"Error from server while processing this command: '" +
1367 return lhs + std::string(rhs);
1370 inline std::basic_ostream<char>&
1371 operator<<(std::basic_ostream<char>& os,
const Message::Field& rhs)
1373 os.write(rhs.
data(), (std::streamsize)rhs.
len());
1377 AMPS::Field::operator<(
const AMPS::Field& rhs)
const 1379 return std::lexicographical_compare(data(), data()+len(), rhs.
data(), rhs.
data()+rhs.
len());
void setFullyDurable(void)
Set the option to only provide messages that have been persisted to all replication destinations that...
Definition: Message.hpp:664
Class to hold string versions of failure reasons.
Definition: ampsplusplus.hpp:212
Message & setData(const std::string &v_)
Sets the data portion of self.
Definition: Message.hpp:1173
void setProjection(std::string projection_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:780
void amps_message_reset(amps_handle message)
Clears all fields and data in a message.
void setRateMaxGap(const char *rateMaxGap_)
Set the option for the maximum amount of time that a bookmark replay with a specified rate will allow...
Definition: Message.hpp:755
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:885
void amps_message_assign_data(amps_handle message, const amps_char *value, size_t length)
Assigns the data component of an AMPS message, without copying the value.
amps_handle amps_message_copy(amps_handle message)
Creates and returns a handle to a new AMPS message object that is a deep copy of the message passed i...
void setNoEmpties(void)
Set the option to not send empty messages on a delta subscription.
Definition: Message.hpp:615
Message deepCopy(void) const
Returns a deep copy of self.
Definition: Message.hpp:430
void setOOF(void)
Set the option to receive out of focus (OOF) messages on a subscription, where applicable.
Definition: Message.hpp:604
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
Message & setAckTypeEnum(unsigned ackType_)
Encode self's "ack type" field from a bitmask of values from AckType.
Definition: Message.hpp:1041
void setNoSowKey(void)
Set the option to not set the SowKey header on messages.
Definition: Message.hpp:631
MessageImpl(amps_handle message_, bool owner_=false, bool ignoreAutoAck_=false, size_t bookmarkSeqNo_=0, amps_subscription_handle subscription_=NULL, ClientImpl *clientImpl_=NULL)
Constructs a messageImpl from an existing AMPS message.
Definition: Message.hpp:130
Message & setCommandEnum(Command::Type command_)
Set self's "command" field from one of the values in Command.
Definition: Message.hpp:945
void setSendKeys(void)
Set the option to send key fields with a delta subscription.
Definition: Message.hpp:620
void * amps_handle
Opaque handle type used to refer to objects in the AMPS api.
Definition: amps.h:106
Class for constructing the options string to pass to AMPS in a Message.
Definition: Message.hpp:445
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
void setPause(void)
Set the option to pause a bookmark subscription.
Definition: Message.hpp:656
Field getData() const
Returns the data from this message.
Definition: Message.hpp:1150
void setReplace(void)
Set the option to replace a current subscription with this one.
Definition: Message.hpp:610
Valid values for setCommandEnum() and getCommandEnum().
Definition: Message.hpp:859
Message & setData(const char *data_)
Sets the data portion of self from a null-terminated string.
Definition: Message.hpp:1200
void setGrouping(Iterator begin_, Iterator end_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:823
void setTimestamp(void)
Set the option to send a timestamp that the message was processed on a subscription or query...
Definition: Message.hpp:626
amps_handle getMessage() const
Returns the underlying AMPS message object from the C layer.
Definition: Message.hpp:174
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
void setResume(void)
Set the option to resume a subscription.
Definition: Message.hpp:644
static AckType::Type decodeSingleAckType(const char *begin, const char *end)
Decodes a single ack string.
Definition: Message.hpp:994
void setConflation(const char *conflation_)
Set the options for conflation on a subscription.
Definition: Message.hpp:689
void replace(amps_handle message_, bool owner_=false)
Causes self to refer to a new AMPS message, freeing any current message owned by self along the way...
Definition: Message.hpp:193
Options(std::string options_="")
Constructor; defaults to an empty options string (None).
Definition: Message.hpp:571
void amps_message_get_data(amps_handle message, amps_char **value_ptr, size_t *length_ptr)
Gets the data component of an AMPS message.
amps_handle amps_message_create(amps_handle client)
Functions for creation and manipulation of AMPS messages.
void setRate(const char *rate_)
Set the option for the maximum rate at which messages are provided to the subscription.
Definition: Message.hpp:733
Valid values for the setAckTypeEnum() and getAckTypeEnum() methods.
Definition: Message.hpp:984
void amps_message_assign_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Assigns the value of a header field in an AMPS message, without copying the value.
void amps_message_set_field_value(amps_handle message, FieldId field, const amps_char *value, size_t length)
Sets the value of a header field in an AMPS message.
void setTopN(int topN_)
Set the top N option, which specifies the maximum number of messages to return for this command...
Definition: Message.hpp:719
void amps_message_set_data_nts(amps_handle message, const amps_char *value)
Sets the data component of an AMPS message.
void setMaxBacklog(int maxBacklog_)
Set the option for maximum backlog this subscription is willing to accept.
Definition: Message.hpp:676
void getRawTransmissionTime(const char **dataptr, size_t *sizeptr) const
Definition: Message.hpp:973
void amps_message_get_field_value(amps_handle message, FieldId field, const amps_char **value_ptr, size_t *length_ptr)
Retrieves the value of a header field in an AMPS message.
MessageImpl()
Constructs a MessageImpl with a new, empty AMPS message.
Definition: Message.hpp:143
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
void setGrouping(std::string grouping_)
Set the option for grouping the results of an aggregated query or subscription.
Definition: Message.hpp:810
void setConflationKey(const char *conflationKey_)
Set the options for the conflation key, the identifiers for the field or fields used by AMPS to deter...
Definition: Message.hpp:706
void setSkipN(int skipN_)
Set the option for skip N, the number of messages in the result set to skip before returning messages...
Definition: Message.hpp:768
void amps_message_set_data(amps_handle message, const amps_char *value, size_t length)
Sets the data component of an AMPS message.
Message(amps_handle message_, bool owner_=false)
Constructs a new Message to wrap message.
Definition: Message.hpp:416
void setNone(void)
Clear any previously set options and set the options to an empty string (AMPS_OPTIONS_NONE).
Definition: Message.hpp:587
Implementation class for a Message.
Definition: Message.hpp:108
void setProjection(Iterator begin_, Iterator end_)
Set the option for projecting the results of an aggregated query or subscription. ...
Definition: Message.hpp:793
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1021
Message & setData(const char *data_, size_t length_)
Sets the data portion of self from a char array.
Definition: Message.hpp:1187
Definition: ampsplusplus.hpp:136
void setCancel(void)
Set the cancel option, used on a sow_delete command to return a message to the queue.
Definition: Message.hpp:636
Message()
Construct a new, empty Message.
Definition: Message.hpp:424
void setLive(void)
Set the live option for a bookmark subscription, which requests that the subscription receives messag...
Definition: Message.hpp:598
void amps_message_destroy(amps_handle message)
Destroys and frees the memory associated with an AMPS message object.