26 #ifndef _MEMORYBOOKMARKSTORE_H_ 27 #define _MEMORYBOOKMARKSTORE_H_ 37 #define AMPS_MIN_BOOKMARK_LEN 3 56 typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
57 typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
58 typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
60 typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
66 : _current(1), _currentBase(0), _least(1), _leastBase(0)
67 , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
68 , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
69 , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
74 _store->resize(_id, (
char**)&_entries,
75 sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE,
false);
76 setLastPersistedToEpoch();
81 Lock<Mutex> guard(_subLock);
84 for(
size_t i = 0; i < _entriesLength; ++i)
86 _entries[i]._val.clear();
89 _store->resize(_id, (
char**)&_entries, 0);
93 _lastPersisted.clear();
101 Lock<Mutex> guard(_subLock);
103 size_t index = recover(bookmark_,
true);
104 if (index == AMPS_UNSET_INDEX)
107 if(_current >= _entriesLength)
110 _currentBase += _entriesLength;
114 if((_current == _least && _leastBase < _currentBase) ||
115 (_current == _recoveryMin && _recoveryBase < _currentBase))
117 if (!_store->resize(_id, (
char**)&_entries,
118 sizeof(Entry) * _entriesLength * 2))
121 return log(bookmark_);
131 char* nowTimestamp =
new char[AMPS_TIMESTAMP_LEN];
136 gmtime_s(&timeInfo, &now);
138 gmtime_r(&now, &timeInfo);
140 strftime(nowTimestamp, AMPS_TIMESTAMP_LEN,
141 "%Y%m%dT%H%M%S", &timeInfo);
142 nowTimestamp[AMPS_TIMESTAMP_LEN-1] =
'Z';
143 _entries[_current]._val.assign(nowTimestamp,
148 _entries[_current]._val.deepCopy(bookmark_);
150 _entries[_current]._active =
true;
154 return index + _currentBase;
159 Lock<Mutex> guard(_subLock);
168 Lock<Mutex> guard(_subLock);
169 size_t search = _least;
170 size_t searchBase = _leastBase;
171 size_t searchMax = _current;
172 size_t searchMaxBase = _currentBase;
173 if (_least + _leastBase == _current + _currentBase)
175 if (_recoveryMin != AMPS_UNSET_INDEX)
177 search = _recoveryMin;
178 searchBase = _recoveryBase;
179 searchMax = _recoveryMax;
180 searchMaxBase = _recoveryMaxBase;
187 assert(searchMax != AMPS_UNSET_INDEX);
188 assert(searchMaxBase != AMPS_UNSET_INDEX);
189 assert(search != AMPS_UNSET_INDEX);
190 assert(searchBase != AMPS_UNSET_INDEX);
192 while (search + searchBase < searchMax + searchMaxBase)
194 if (_entries[search]._val == bookmark_)
196 _discard(search+searchBase);
199 if(++search == _entriesLength)
202 searchBase += _entriesLength;
210 amps_uint64_t& publisherId_,
211 amps_uint64_t& sequenceNumber_)
213 Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
220 Lock<Mutex> guard(_subLock);
222 size_t recoveredIdx = recover(bookmark_,
false);
223 if (recoveredIdx != AMPS_UNSET_INDEX)
225 return !_entries[recoveredIdx]._active;
228 amps_uint64_t publisher,sequence;
229 parseBookmark(bookmark_, publisher, sequence);
231 PublisherIterator pub = _publishers.find(publisher);
232 if(pub == _publishers.end() || pub->second < sequence)
234 _publishers[publisher] = sequence;
240 if (_store->_recovering)
return false;
245 size_t base = _leastBase;
246 for(
size_t i = _least; i + base < _current + _currentBase; i++)
248 if( i >= _entriesLength )
253 if(_entries[i]._val == bookmark_)
255 return !_entries[i]._active;
262 bool empty(
void)
const 264 if (_least == AMPS_UNSET_INDEX ||
265 ((_least+_leastBase)==(_current+_currentBase) &&
266 _recoveryMin == AMPS_UNSET_INDEX))
271 void updateMostRecent()
273 Lock<Mutex> guard(_subLock);
279 Lock<Mutex> guard(_subLock);
280 bool useLastPersisted = !_lastPersisted.empty() &&
281 _lastPersisted.len() > 1;
286 bool useRecent = !_recent.empty() && _recent.len() > 1;
287 amps_uint64_t lastPublisher = 0;
288 amps_uint64_t lastSeq = 0;
289 amps_uint64_t recentPublisher = 0;
290 amps_uint64_t recentSeq = 0;
291 if (useLastPersisted)
293 parseBookmark(_lastPersisted,lastPublisher,lastSeq);
297 if (empty() && useLastPersisted)
303 parseBookmark(_recent,recentPublisher,recentSeq);
304 if (useLastPersisted && lastPublisher == recentPublisher)
306 if (lastSeq <= recentSeq) useRecent =
false;
307 else useLastPersisted =
false;
312 size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
313 if (useRecent) totalLen += _recent.len() + 1;
318 if (!useLastPersisted &&
321 std::ostringstream os;
322 for (PublisherIterator pub = _publishers.begin();
323 pub != _publishers.end(); ++pub)
325 if (pub->first == 0 && pub->second == 0)
continue;
326 if (pub->first == recentPublisher && recentSeq < pub->second)
327 os << recentPublisher <<
'|' << recentSeq <<
"|,";
329 os << pub->first <<
'|' << pub->second <<
"|,";
331 std::string recent = os.str();
332 totalLen = recent.length();
336 recent.erase(--totalLen);
346 _setLastPersistedToEpoch();
347 return _lastPersisted;
351 char* field =
new char[totalLen];
356 memcpy(field, _recent.data(), len);
357 if (len < totalLen) field[len++] =
',';
359 if (useLastPersisted)
361 memcpy(field+len, _lastPersisted.data(), _lastPersisted.len());
367 _recentList.assign(field, totalLen);
373 Lock<Mutex> guard(_subLock);
376 if (update_ && _store->_recentChanged)
390 Lock<Mutex> guard(_subLock);
391 return _lastPersisted;
397 _recent.deepCopy(recent_);
400 void moveEntries(
char* old_,
char* new_,
size_t newSize_)
402 size_t least = _least;
403 size_t leastBase = _leastBase;
404 if (_recoveryMin != AMPS_UNSET_INDEX)
406 least = _recoveryMin;
407 leastBase = _recoveryBase;
412 if (newSize_ - (
sizeof(Entry)*_entriesLength) >
sizeof(Entry)*least)
414 memcpy(new_ + (
sizeof(Entry)*_entriesLength),
415 old_, (
sizeof(Entry)*least));
417 memset(old_, 0,
sizeof(Entry)*least);
421 Entry* buffer =
new Entry[least];
422 memcpy((
void*)buffer, (
void*)old_,
sizeof(Entry)*least);
424 memcpy((
void*)new_, (
void*)((
char*)old_ + (
sizeof(Entry)*least)),
425 (_entriesLength - least)*
sizeof(Entry));
427 memcpy((
void*)((
char*)new_+((_entriesLength - least)*
sizeof(Entry))),
428 (
void*)buffer, least*
sizeof(Entry));
437 memcpy((
void*)new_, (
void*)((
char*)old_ + (
sizeof(Entry)*least)),
438 (_entriesLength - least)*
sizeof(Entry));
440 memcpy((
void*)((
char*)new_ + ((_entriesLength - least)*
sizeof(Entry))),
441 (
void*)old_, least*
sizeof(Entry));
446 if (_recoveryMin != AMPS_UNSET_INDEX)
448 _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
449 _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
450 (_recoveryMin + _recoveryBase);
451 _recoveryMaxBase = leastBase;
452 _recoveryMin = least;
453 _recoveryBase = leastBase;
459 _leastBase = leastBase;
461 _currentBase = _leastBase;
462 _current = least + _entriesLength;
467 Lock<Mutex> guard(_subLock);
469 return ((_least+_leastBase)==(_current+_currentBase)) ? AMPS_UNSET_INDEX :
475 Lock<Mutex> guard(_subLock);
476 _setLastPersisted(bookmark_);
481 if (!_lastPersisted.empty())
483 amps_uint64_t publisher, publisher_lastPersisted;
484 amps_uint64_t sequence, sequence_lastPersisted;
485 parseBookmark(bookmark_,publisher,sequence);
486 parseBookmark(_lastPersisted, publisher_lastPersisted,
487 sequence_lastPersisted);
488 if(publisher == publisher_lastPersisted &&
489 sequence <= sequence_lastPersisted)
493 parseBookmark(_recent, publisher_lastPersisted,
494 sequence_lastPersisted);
497 if(publisher == publisher_lastPersisted
498 || (_least + _leastBase == _current + _currentBase))
499 _store->_recentChanged =
true;
502 _lastPersisted.deepCopy(bookmark_);
507 Lock<Mutex> guard(_subLock);
509 _setLastPersisted(bookmark);
517 size_t recover(
const Message::Field& bookmark_,
bool relogIfNotDiscarded)
519 size_t retVal = AMPS_UNSET_INDEX;
520 if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
524 RecoveryIterator item = _recovered.find(bookmark_);
525 if(item != _recovered.end())
527 size_t seqNo = item->second;
528 size_t index = (seqNo - _recoveryBase) % _entriesLength;
531 if (_least+_leastBase == _current+_currentBase &&
532 !_entries[index]._active)
535 _recent = _entries[index]._val.deepCopy();
536 retVal = moveEntry(index);
537 if (retVal == AMPS_UNSET_INDEX) recover(bookmark_,
538 relogIfNotDiscarded);
540 _leastBase = _currentBase;
542 else if (!_entries[index]._active || relogIfNotDiscarded)
544 retVal = moveEntry(index);
545 if (retVal == AMPS_UNSET_INDEX) recover(bookmark_,
546 relogIfNotDiscarded);
552 _recovered.erase(item);
553 if (_recovered.empty())
555 _recoveryMin = AMPS_UNSET_INDEX;
556 _recoveryBase = AMPS_UNSET_INDEX;
557 _recoveryMax = AMPS_UNSET_INDEX;
558 _recoveryMaxBase = AMPS_UNSET_INDEX;
560 else if (index == _recoveryMin)
562 while (_entries[_recoveryMin]._val.empty() &&
563 (_recoveryMin+_recoveryBase) < (_recoveryMax+_recoveryMaxBase))
565 if (++_recoveryMin == _entriesLength)
568 _recoveryBase += _entriesLength;
588 Entry() : _active(
false) {}
593 Field::FieldHash _hasher;
595 size_t operator()(
const Entry* entryPtr_)
const 597 return _hasher(entryPtr_->_val);
600 bool operator()(
const Entry* lhsPtr_,
const Entry* rhsPtr_)
const 602 return _hasher(lhsPtr_->_val, rhsPtr_->_val);
607 typedef std::vector<Entry*> EntryPtrList;
609 void getRecoveryEntries(EntryPtrList& list_)
611 if (_recoveryMin == AMPS_UNSET_INDEX ||
612 _recoveryMax == AMPS_UNSET_INDEX)
614 size_t base = _recoveryBase;
615 size_t max = _recoveryMax + _recoveryMaxBase;
616 for (
size_t i=_recoveryMin; i+base<max; ++i)
618 if (i == _entriesLength)
621 base = _recoveryMaxBase;
624 list_.push_back(&(_entries[i]));
629 void getActiveEntries(EntryPtrList& list_)
631 size_t base = _leastBase;
632 for (
size_t i=_least; i+base < _current + _currentBase; ++i)
634 if (i >= _entriesLength)
640 list_.push_back(&(_entries[i]));
645 Entry* getEntryByIndex(
size_t index_)
647 Lock<Mutex> guard(_subLock);
648 size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
649 index_ >= _least + _leastBase)
650 ? _leastBase : _recoveryBase;
652 size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
653 _least + _leastBase :
654 _recoveryMin + _recoveryBase);
655 if(index_ >= _current+_currentBase || index_ < min)
return NULL;
656 return &(_entries[(index_ - base) % _entriesLength]);
661 Lock<Mutex> guard(_subLock);
664 getRecoveryEntries(list);
665 setPublishersToDiscarded(&list, &_publishers);
668 void setPublishersToDiscarded(EntryPtrList* recovered_,
669 PublisherMap* publishers_)
675 for (EntryPtrList::iterator i = recovered_->begin();
676 i != recovered_->end(); i++)
678 amps_uint64_t publisher = (amps_uint64_t)0;
679 amps_uint64_t sequence = (amps_uint64_t)0;
680 parseBookmark((*i)->_val,publisher,sequence);
681 if (publisher && sequence && (*i)->_active &&
682 (*publishers_)[publisher] >= sequence)
684 (*publishers_)[publisher] = sequence - 1;
689 void clearLastPersisted()
691 Lock<Mutex> guard(_subLock);
692 _lastPersisted.clear();
695 void setLastPersistedToEpoch()
697 Lock<Mutex> guard(_subLock);
698 _setLastPersistedToEpoch();
702 Subscription(
const Subscription&);
703 Subscription& operator=(
const Subscription&);
705 size_t moveEntry(
size_t index_)
708 if (_current >= _entriesLength)
711 _currentBase += _entriesLength;
715 if((_current == _least%_entriesLength &&
716 _leastBase < _currentBase) ||
717 (_current == _recoveryMin && _recoveryBase < _currentBase))
719 if (!_store->resize(_id, (
char**)&_entries,
720 sizeof(Entry) * _entriesLength * 2))
722 return AMPS_UNSET_INDEX;
727 _entries[_current]._val = _entries[index_]._val;
728 _entries[_current]._active = _entries[index_]._active;
730 _entries[index_]._val.assign(NULL, 0);
731 _entries[index_]._active =
false;
735 void _setLastPersistedToEpoch()
738 char* field =
new char[fieldLen];
740 _lastPersisted.clear();
741 _lastPersisted.assign(field, fieldLen);
744 void _discard(
size_t index_)
747 assert((_recoveryBase==AMPS_UNSET_INDEX && _recoveryMin==AMPS_UNSET_INDEX) ||
748 (_recoveryBase!=AMPS_UNSET_INDEX && _recoveryMin!=AMPS_UNSET_INDEX));
749 size_t base = (_recoveryBase == AMPS_UNSET_INDEX
750 || index_ >= _least + _leastBase)
751 ? _leastBase : _recoveryBase;
753 size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
754 _recoveryMin + _recoveryBase);
755 if(index_ >= _current+_currentBase || index_ < min)
762 Entry& e = _entries[(index_ - base) % _entriesLength];
765 size_t index = index_;
766 if (_recoveryMin != AMPS_UNSET_INDEX &&
767 index_ == _recoveryMin + _recoveryBase)
770 size_t j = _recoveryMin;
771 while(j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
772 !_entries[j]._active)
793 if (!bookmark.
empty())
795 _recovered.erase(bookmark);
796 if (_least + _leastBase == _current + _currentBase ||
797 ((_least + _leastBase) % _entriesLength) ==
798 ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
801 _store->_recentChanged =
true;
804 bookmark.assign(NULL, 0);
814 if(++j == _entriesLength)
817 _recoveryBase += _entriesLength;
821 assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
823 if (_recovered.empty())
825 _recoveryMin = AMPS_UNSET_INDEX;
826 _recoveryBase = AMPS_UNSET_INDEX;
827 _recoveryMax = AMPS_UNSET_INDEX;
828 _recoveryMaxBase = AMPS_UNSET_INDEX;
830 index = _least + _leastBase;
839 if(index == _least + _leastBase)
843 while(j + _leastBase < _current + _currentBase &&
844 !_entries[j]._active)
848 _recent = _entries[j]._val;
849 _entries[j]._val.assign(NULL, 0);
850 _store->_recentChanged =
true;
853 if(++j == _entriesLength)
856 _leastBase += _entriesLength;
865 void _updateMostRecent()
869 assert((_recoveryBase==AMPS_UNSET_INDEX && _recoveryMin==AMPS_UNSET_INDEX) ||
870 (_recoveryBase!=AMPS_UNSET_INDEX && _recoveryMin!=AMPS_UNSET_INDEX));
871 size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
872 size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
873 _recoveryMin = AMPS_UNSET_INDEX;
874 _recoveryBase = AMPS_UNSET_INDEX;
875 _recoveryMax = AMPS_UNSET_INDEX;
876 _recoveryMaxBase = AMPS_UNSET_INDEX;
877 for(
size_t i = start; i + base < _current + _currentBase; i++)
879 if( i >= _entriesLength )
884 if (i >= _recoveryMax+_recoveryBase && i < _least+_leastBase)
886 Entry& entry = _entries[i];
887 if (!entry._val.empty())
889 _recovered[entry._val] = i+base;
890 if (_recoveryMin == AMPS_UNSET_INDEX)
893 _recoveryBase = base;
894 _recoveryMax = _current;
895 _recoveryMaxBase = _currentBase;
899 if (_current == _entriesLength)
902 _currentBase += _entriesLength;
905 _leastBase = _currentBase;
917 size_t _recoveryBase;
919 size_t _recoveryMaxBase;
920 size_t _entriesLength;
924 RecoveryMap _recovered;
926 PublisherMap _publishers;
933 _serverVersion(AMPS_DEFAULT_MIN_VERSION),
934 _recentChanged(
true),
936 _supportsPersistedAcks(
true)
951 Lock<Mutex> guard(_lock);
952 return _log(message_);
962 Lock<Mutex> guard(_lock);
975 Lock<Mutex> guard(_lock);
976 _discard(subId_, bookmarkSeqNo_);
986 Lock<Mutex> guard(_lock);
987 return _getMostRecent(subId_);
1000 Lock<Mutex> guard(_lock);
1001 return _isDiscarded(message_);
1011 Lock<Mutex> guard(_lock);
1022 Lock<Mutex> guard(_lock);
1032 Lock<Mutex> guard(_lock);
1033 return _getOldestBookmarkSeq(subId_);
1044 Lock<Mutex> guard(_lock);
1045 _persisted(find(subId_), bookmark_);
1057 Lock<Mutex> guard(_lock);
1058 return _persisted(find(subId_), bookmark_);
1067 Lock<Mutex> guard(_lock);
1068 Subscription* sub = find(subId_);
1069 _noPersistedAcks.insert(sub);
1087 Lock<Mutex> guard(_subsLock);
1088 _serverVersion = version_;
1091 inline bool isWritableBookmark(
size_t length)
1093 return length >= AMPS_MIN_BOOKMARK_LEN;
1096 typedef Subscription::EntryPtrList EntryPtrList;
1101 size_t _log(
Message& message_)
1104 Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
1111 message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
1113 size_t retVal = sub->log(bookmark);
1114 message_.setBookmarkSeqNo(retVal);
1119 void _discard(
const Message& message_)
1121 size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1122 Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
1130 sub->discard(bookmarkSeqNo);
1135 void _discard(
const Message::Field& subId_,
size_t bookmarkSeqNo_)
1137 Subscription* sub = find(subId_);
1138 sub->discard(bookmarkSeqNo_);
1145 Subscription* sub = find(subId_);
1146 if (!_supportsPersistedAcks || _noPersistedAcks.count(sub) > 0)
1148 return sub->getMostRecent();
1150 return sub->getMostRecentList();
1154 bool _isDiscarded(
Message& message_)
1160 Subscription* sub = find(subId);
1161 message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
1162 return sub->isDiscarded(bookmark);
1168 Subscription* sub = find(subId_);
1169 return sub->getOldestBookmarkSeq();
1173 virtual void _persisted(Subscription* subP_,
1176 subP_->lastPersisted(bookmark_);
1180 virtual Message::Field _persisted(Subscription* subP_,
size_t bookmark_)
1182 return subP_->lastPersisted(bookmark_);
1189 while(!_subs.empty())
1191 SubscriptionMap::iterator iter = _subs.begin();
1195 delete (iter->second);
1199 _noPersistedAcks.clear();
1205 Lock<Mutex> guard(_subsLock);
1206 SubscriptionMap::iterator iter = _subs.find(subId_);
1207 if (iter == _subs.end())
return;
1209 delete (iter->second);
1217 find(subId_)->setMostRecent(recent_);
1222 static const char ENTRY_BOOKMARK =
'b';
1223 static const char ENTRY_DISCARD =
'd';
1224 static const char ENTRY_PERSISTED =
'p';
1230 throw StoreException(
"A valid subscription ID must be provided to the Bookmark Store");
1232 Lock<Mutex> guard(_subsLock);
1233 if(_subs.count(subId_) == 0)
1238 _subs[id] =
new Subscription(
this,
id);
1239 if (!_supportsPersistedAcks)
1241 _noPersistedAcks.insert(_subs[
id]);
1245 return _subs[subId_];
1248 virtual bool resize(
const Message::Field& subId_,
char** newBuffer_,
size_t size_,
1249 bool callResizeHandler_ =
true)
1251 assert(newBuffer_ != 0);
1261 if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1265 char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1266 *newBuffer_ = (
char*)malloc(size_);
1267 memset(*newBuffer_, 0, size_);
1270 find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1277 typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap;
1278 SubscriptionMap _subs;
1279 size_t _serverVersion;
1280 bool _recentChanged;
1282 bool _supportsPersistedAcks;
1283 typedef std::set<Subscription*> SubscriptionSet;
1284 SubscriptionSet _noPersistedAcks;
1290 _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1291 _recentChanged(
true),
1293 _supportsPersistedAcks(supportsPersistedAcks_)
1300 #endif //_MEMORYBOOKMARKSTORE_H_ Abstract base class for storing received bookmarks for HA clients.
Definition: ampsplusplus.hpp:679
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1009
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: MemoryBookmarkStore.hpp:973
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:984
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1054
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:196
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1041
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1085
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1074
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:93
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: ampsplusplus.hpp:105
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:998
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1076
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:960
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: ampsplusplus.hpp:109
virtual void noPersistedAcks(const Message::Field &subId_)
Flag the subscription as not requesting persisted acks. No persisted acks will be sent for any bookmar...
Definition: MemoryBookmarkStore.hpp:1065
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:50
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:949
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1030
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MemoryBookmarkStore.hpp:1020
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:167
Definition: ampsplusplus.hpp:136
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1064