26 #ifndef _MEMORYBOOKMARKSTORE_H_ 27 #define _MEMORYBOOKMARKSTORE_H_ 29 #include <BookmarkStore.hpp> 40 #define AMPS_MIN_BOOKMARK_LEN 3 41 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL 62 typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
63 typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
64 typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
66 typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
72 : _current(1), _currentBase(0), _least(1), _leastBase(0)
73 , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
74 , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
75 , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
80 _store->resize(_id, (
char**)&_entries,
81 sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE,
false);
82 setLastPersistedToEpoch();
87 Lock<Mutex> guard(_subLock);
90 for(
size_t i = 0; i < _entriesLength; ++i)
92 _entries[i]._val.clear();
95 _store->resize(_id, (
char**)&_entries, 0);
99 _lastPersisted.clear();
102 _recoveryTimestamp.clear();
109 Lock<Mutex> guard(_subLock);
111 size_t index = recover(bookmark_,
true);
112 if (index == AMPS_UNSET_INDEX)
115 if(_current >= _entriesLength)
118 _currentBase += _entriesLength;
122 if((_current == _least && _leastBase < _currentBase) ||
123 (_current == _recoveryMin && _recoveryBase < _currentBase))
125 if (!_store->resize(_id, (
char**)&_entries,
126 sizeof(Entry) * _entriesLength * 2))
129 return log(bookmark_);
162 if (!BookmarkRange::isRange(bookmark_))
164 _entries[_current]._val.deepCopy(bookmark_);
169 _range.set(bookmark_);
171 if (!_range.isValid())
173 throw CommandException(
"Invalid bookmark range specified.");
175 _store->updateAdapter(
this);
176 if (!_range.isStartInclusive())
179 amps_uint64_t publisher,sequence;
180 parseBookmark(_range.getStart(), publisher, sequence);
181 _publishers[publisher] = sequence;
187 _entries[_current]._active =
true;
190 return index + _currentBase;
195 Lock<Mutex> guard(_subLock);
196 return _discard(index_);
206 Lock<Mutex> guard(_subLock);
207 size_t search = _least;
208 size_t searchBase = _leastBase;
209 size_t searchMax = _current;
210 size_t searchMaxBase = _currentBase;
211 if (_least + _leastBase == _current + _currentBase)
213 if (_recoveryMin != AMPS_UNSET_INDEX)
215 search = _recoveryMin;
216 searchBase = _recoveryBase;
217 searchMax = _recoveryMax;
218 searchMaxBase = _recoveryMaxBase;
225 assert(searchMax != AMPS_UNSET_INDEX);
226 assert(searchMaxBase != AMPS_UNSET_INDEX);
227 assert(search != AMPS_UNSET_INDEX);
228 assert(searchBase != AMPS_UNSET_INDEX);
230 while (search + searchBase < searchMax + searchMaxBase)
232 if (_entries[search]._val == bookmark_)
234 return _discard(search+searchBase);
236 if(++search == _entriesLength)
239 searchBase += _entriesLength;
248 amps_uint64_t& publisherId_,
249 amps_uint64_t& sequenceNumber_)
251 Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
258 Lock<Mutex> guard(_subLock);
259 if (BookmarkRange::isRange(bookmark_))
264 size_t recoveredIdx = recover(bookmark_,
false);
266 amps_uint64_t publisher,sequence;
267 parseBookmark(bookmark_, publisher, sequence);
269 PublisherIterator pub = _publishers.find(publisher);
270 if(pub == _publishers.end() || pub->second < sequence)
272 _publishers[publisher] = sequence;
273 if (recoveredIdx == AMPS_UNSET_INDEX)
278 if (recoveredIdx != AMPS_UNSET_INDEX)
280 if (!_entries[recoveredIdx]._active)
282 _recovered.erase(bookmark_);
290 if (_store->_recovering)
return false;
295 size_t base = _leastBase;
296 for(
size_t i = _least; i + base < _current + _currentBase; i++)
298 if( i >= _entriesLength )
303 if(_entries[i]._val == bookmark_)
305 return !_entries[i]._active;
312 bool empty(
void)
const 314 if (_least == AMPS_UNSET_INDEX ||
315 ((_least+_leastBase)==(_current+_currentBase) &&
316 _recoveryMin == AMPS_UNSET_INDEX))
321 void updateMostRecent()
323 Lock<Mutex> guard(_subLock);
327 const BookmarkRange& getRange()
const 334 Lock<Mutex> guard(_subLock);
335 bool useLastPersisted = !_lastPersisted.empty() &&
336 _lastPersisted.len() > 1;
341 bool useRecent = !_recent.empty() && _recent.len() > 1;
342 amps_uint64_t lastPublisher = 0;
343 amps_uint64_t lastSeq = 0;
344 amps_uint64_t recentPublisher = 0;
345 amps_uint64_t recentSeq = 0;
346 if (useLastPersisted)
348 parseBookmark(_lastPersisted,lastPublisher,lastSeq);
352 parseBookmark(_recent,recentPublisher,recentSeq);
353 if (empty() && useLastPersisted)
359 if (useLastPersisted && lastPublisher == recentPublisher)
361 if (lastSeq <= recentSeq) useRecent =
false;
362 else useLastPersisted =
false;
367 size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
368 if (useRecent) totalLen += _recent.len() + 1;
372 if (usePublishersList_
373 && ((!useLastPersisted && !useRecent)
376 std::ostringstream os;
377 for (PublisherIterator pub = _publishers.begin();
378 pub != _publishers.end(); ++pub)
380 if (pub->first == 0 && pub->second == 0)
continue;
381 if (pub->first == recentPublisher && recentSeq < pub->second)
383 os << recentPublisher <<
'|' << recentSeq <<
"|,";
387 os << pub->first <<
'|' << pub->second <<
"|,";
390 std::string recent = os.str();
391 totalLen = recent.length();
394 if (!_recoveryTimestamp.empty())
396 totalLen += _recoveryTimestamp.len();
397 recent += std::string(_recoveryTimestamp);
402 recent.erase(--totalLen);
407 if (_range.isValid())
409 if (_range.getStart() != recent
412 _range.replaceStart(_recentList,
true);
414 else if (_range.isStartInclusive())
416 amps_uint64_t publisher,sequence;
417 parseBookmark(_range.getStart(), publisher,
419 PublisherIterator pub = _publishers.find(publisher);
420 if(pub != _publishers.end()
421 && pub->second >= sequence)
423 _range.makeStartExclusive();
430 if (_range.isValid())
435 if (!_recoveryTimestamp.empty() && !_range.isValid())
437 totalLen += _recoveryTimestamp.len()+1;
441 || (_recent.len() < 2 && !empty()))
443 if (_range.isValid())
451 _setLastPersistedToEpoch();
452 return _lastPersisted;
456 char* field =
new char[totalLen];
461 memcpy(field, _recent.data(), len);
462 if (len < totalLen) field[len++] =
',';
464 if (useLastPersisted)
466 memcpy(field+len, _lastPersisted.data(), _lastPersisted.len());
467 len += _lastPersisted.len();
468 if (len < totalLen) field[len++] =
',';
470 if (!_recoveryTimestamp.empty() && !_range.isValid())
472 memcpy(field+len, _recoveryTimestamp.data(),
473 _recoveryTimestamp.len());
480 _recentList.assign(field, totalLen);
481 if (_range.isValid())
485 if (_range.getStart() != _recentList)
487 _range.replaceStart(_recentList,
true);
489 else if (_range.isStartInclusive())
491 amps_uint64_t publisher,sequence;
492 parseBookmark(_range.getStart(), publisher,
494 PublisherIterator pub = _publishers.find(publisher);
495 if(pub != _publishers.end()
496 && pub->second >= sequence)
498 _range.makeStartExclusive();
509 Lock<Mutex> guard(_subLock);
512 if (update_ && _store->_recentChanged)
526 Lock<Mutex> guard(_subLock);
527 return _lastPersisted;
533 _recent.deepCopy(recent_);
536 void setRecoveryTimestamp(
const char* recoveryTimestamp_,
539 _recoveryTimestamp.clear();
540 size_t len = (len_==0)?AMPS_TIMESTAMP_LEN:len_;
541 char* ts =
new char[len];
542 memcpy((
void*)ts, (
const void*)recoveryTimestamp_, len);
543 _recoveryTimestamp.assign(ts, len);
546 void moveEntries(
char* old_,
char* new_,
size_t newSize_)
548 size_t least = _least;
549 size_t leastBase = _leastBase;
550 if (_recoveryMin != AMPS_UNSET_INDEX)
552 least = _recoveryMin;
553 leastBase = _recoveryBase;
558 if (newSize_ - (
sizeof(Entry)*_entriesLength) >
sizeof(Entry)*least)
560 memcpy(new_ + (
sizeof(Entry)*_entriesLength),
561 old_, (
sizeof(Entry)*least));
563 memset(old_, 0,
sizeof(Entry)*least);
567 Entry* buffer =
new Entry[least];
568 memcpy((
void*)buffer, (
void*)old_,
sizeof(Entry)*least);
570 memcpy((
void*)new_, (
void*)((
char*)old_ + (
sizeof(Entry)*least)),
571 (_entriesLength - least)*
sizeof(Entry));
573 memcpy((
void*)((
char*)new_+((_entriesLength - least)*
sizeof(Entry))),
574 (
void*)buffer, least*
sizeof(Entry));
584 memcpy((
void*)new_, (
void*)((
char*)old_ + (
sizeof(Entry)*least)),
585 (_entriesLength - least)*
sizeof(Entry));
587 memcpy((
void*)((
char*)new_ + ((_entriesLength - least)*
sizeof(Entry))),
588 (
void*)old_, least*
sizeof(Entry));
593 if (_recoveryMin != AMPS_UNSET_INDEX)
595 _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
596 _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
597 (_recoveryMin + _recoveryBase);
598 _recoveryMaxBase = leastBase;
599 _recoveryMin = least;
600 _recoveryBase = leastBase;
606 _leastBase = leastBase;
608 _currentBase = _leastBase;
609 _current = least + _entriesLength;
614 Lock<Mutex> guard(_subLock);
616 return ((_least+_leastBase)==(_current+_currentBase)) ? AMPS_UNSET_INDEX :
624 || BookmarkRange::isRange(bookmark_))
628 Lock<Mutex> guard(_subLock);
629 return _setLastPersisted(bookmark_);
634 if (!_lastPersisted.empty())
636 amps_uint64_t publisher, publisher_lastPersisted;
637 amps_uint64_t sequence, sequence_lastPersisted;
638 parseBookmark(bookmark_,publisher,sequence);
639 parseBookmark(_lastPersisted, publisher_lastPersisted,
640 sequence_lastPersisted);
641 if(publisher == publisher_lastPersisted &&
642 sequence <= sequence_lastPersisted)
648 _lastPersisted.deepCopy(bookmark_);
649 _store->_recentChanged =
true;
650 _recoveryTimestamp.clear();
656 Lock<Mutex> guard(_subLock);
660 || BookmarkRange::isRange(bookmark))
664 _setLastPersisted(bookmark);
672 size_t recover(
const Message::Field& bookmark_,
bool relogIfNotDiscarded)
674 size_t retVal = AMPS_UNSET_INDEX;
675 if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
679 RecoveryIterator item = _recovered.find(bookmark_);
680 if(item != _recovered.end())
682 size_t seqNo = item->second;
683 size_t index = (seqNo - _recoveryBase) % _entriesLength;
686 if (_least+_leastBase == _current+_currentBase &&
687 !_entries[index]._active)
689 _store->_recentChanged =
true;
691 _recent = _entries[index]._val.deepCopy();
692 retVal = moveEntry(index);
693 if (retVal == AMPS_UNSET_INDEX) recover(bookmark_,
694 relogIfNotDiscarded);
696 _leastBase = _currentBase;
698 else if (!_entries[index]._active || relogIfNotDiscarded)
700 retVal = moveEntry(index);
701 if (retVal == AMPS_UNSET_INDEX) recover(bookmark_,
702 relogIfNotDiscarded);
708 _recovered.erase(item);
709 if (_recovered.empty())
711 _recoveryMin = AMPS_UNSET_INDEX;
712 _recoveryBase = AMPS_UNSET_INDEX;
713 _recoveryMax = AMPS_UNSET_INDEX;
714 _recoveryMaxBase = AMPS_UNSET_INDEX;
716 else if (index == _recoveryMin)
718 while (_entries[_recoveryMin]._val.empty() &&
719 (_recoveryMin+_recoveryBase) < (_recoveryMax+_recoveryMaxBase))
721 if (++_recoveryMin == _entriesLength)
724 _recoveryBase += _entriesLength;
744 Entry() : _active(
false)
752 Field::FieldHash _hasher;
754 size_t operator()(
const Entry* entryPtr_)
const 756 return _hasher(entryPtr_->_val);
759 bool operator()(
const Entry* lhsPtr_,
const Entry* rhsPtr_)
const 761 return _hasher(lhsPtr_->_val, rhsPtr_->_val);
766 typedef std::vector<Entry*> EntryPtrList;
768 void getRecoveryEntries(EntryPtrList& list_)
770 if (_recoveryMin == AMPS_UNSET_INDEX ||
771 _recoveryMax == AMPS_UNSET_INDEX)
773 size_t base = _recoveryBase;
774 size_t max = _recoveryMax + _recoveryMaxBase;
775 for (
size_t i=_recoveryMin; i+base<max; ++i)
777 if (i == _entriesLength)
780 base = _recoveryMaxBase;
783 list_.push_back(&(_entries[i]));
788 void getActiveEntries(EntryPtrList& list_)
790 size_t base = _leastBase;
791 for (
size_t i=_least; i+base < _current + _currentBase; ++i)
793 if (i >= _entriesLength)
799 list_.push_back(&(_entries[i]));
804 Entry* getEntryByIndex(
size_t index_)
806 Lock<Mutex> guard(_subLock);
807 size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
808 index_ >= _least + _leastBase)
809 ? _leastBase : _recoveryBase;
811 size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
812 _least + _leastBase :
813 _recoveryMin + _recoveryBase);
814 if(index_ >= _current+_currentBase || index_ < min)
return NULL;
815 return &(_entries[(index_ - base) % _entriesLength]);
820 Lock<Mutex> guard(_subLock);
823 getRecoveryEntries(list);
824 setPublishersToDiscarded(&list, &_publishers);
827 void setPublishersToDiscarded(EntryPtrList* recovered_,
828 PublisherMap* publishers_)
834 for (EntryPtrList::iterator i = recovered_->begin();
835 i != recovered_->end(); ++i)
837 amps_uint64_t publisher = (amps_uint64_t)0;
838 amps_uint64_t sequence = (amps_uint64_t)0;
839 parseBookmark((*i)->_val,publisher,sequence);
840 if (publisher && sequence && (*i)->_active &&
841 (*publishers_)[publisher] >= sequence)
843 (*publishers_)[publisher] = sequence - 1;
848 void clearLastPersisted()
850 Lock<Mutex> guard(_subLock);
851 _lastPersisted.clear();
854 void setLastPersistedToEpoch()
856 Lock<Mutex> guard(_subLock);
857 _setLastPersistedToEpoch();
861 Subscription(
const Subscription&);
862 Subscription& operator=(
const Subscription&);
864 size_t moveEntry(
size_t index_)
867 if (_current >= _entriesLength)
870 _currentBase += _entriesLength;
874 if((_current == _least%_entriesLength &&
875 _leastBase < _currentBase) ||
876 (_current == _recoveryMin && _recoveryBase < _currentBase))
878 if (!_store->resize(_id, (
char**)&_entries,
879 sizeof(Entry) * _entriesLength * 2))
881 return AMPS_UNSET_INDEX;
886 _entries[_current]._val = _entries[index_]._val;
887 _entries[_current]._active = _entries[index_]._active;
889 _entries[index_]._val.assign(NULL, 0);
890 _entries[index_]._active =
false;
894 void _setLastPersistedToEpoch()
897 char* field =
new char[fieldLen];
899 _lastPersisted.clear();
900 _lastPersisted.assign(field, fieldLen);
903 bool _discard(
size_t index_)
907 assert((_recoveryBase==AMPS_UNSET_INDEX && _recoveryMin==AMPS_UNSET_INDEX) ||
908 (_recoveryBase!=AMPS_UNSET_INDEX && _recoveryMin!=AMPS_UNSET_INDEX));
909 size_t base = (_recoveryBase == AMPS_UNSET_INDEX
910 || index_ >= _least + _leastBase)
911 ? _leastBase : _recoveryBase;
913 size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
914 _recoveryMin + _recoveryBase);
915 if(index_ >= _current+_currentBase || index_ < min)
922 Entry& e = _entries[(index_ - base) % _entriesLength];
925 size_t index = index_;
926 if (_recoveryMin != AMPS_UNSET_INDEX &&
927 index_ == _recoveryMin + _recoveryBase)
930 size_t j = _recoveryMin;
931 while(j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
932 !_entries[j]._active)
953 if (!bookmark.
empty())
955 _recovered.erase(bookmark);
956 if (_least + _leastBase == _current + _currentBase ||
957 ((_least + _leastBase) % _entriesLength) ==
958 ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
962 _store->_recentChanged =
true;
963 _recoveryTimestamp.
clear();
966 bookmark.assign(NULL, 0);
976 if(++j == _entriesLength)
979 _recoveryBase += _entriesLength;
983 assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
985 if (_recovered.empty())
987 _recoveryMin = AMPS_UNSET_INDEX;
988 _recoveryBase = AMPS_UNSET_INDEX;
989 _recoveryMax = AMPS_UNSET_INDEX;
990 _recoveryMaxBase = AMPS_UNSET_INDEX;
992 index = _least + _leastBase;
1001 if(index == _least + _leastBase)
1005 while(j + _leastBase < _current + _currentBase &&
1006 !_entries[j]._active)
1010 _recent = _entries[j]._val;
1011 _entries[j]._val.assign(NULL, 0);
1012 _store->_recentChanged =
true;
1014 _recoveryTimestamp.clear();
1017 if(++j == _entriesLength)
1020 _leastBase += _entriesLength;
1029 void _updateMostRecent()
1033 assert((_recoveryBase==AMPS_UNSET_INDEX && _recoveryMin==AMPS_UNSET_INDEX) ||
1034 (_recoveryBase!=AMPS_UNSET_INDEX && _recoveryMin!=AMPS_UNSET_INDEX));
1035 size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
1036 size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
1037 _recoveryMin = AMPS_UNSET_INDEX;
1038 _recoveryBase = AMPS_UNSET_INDEX;
1039 _recoveryMax = AMPS_UNSET_INDEX;
1040 _recoveryMaxBase = AMPS_UNSET_INDEX;
1041 for(
size_t i = start; i + base < _current + _currentBase; i++)
1043 if( i >= _entriesLength )
1046 base = _currentBase;
1048 if (i >= _recoveryMax+_recoveryBase && i < _least+_leastBase)
1050 Entry& entry = _entries[i];
1051 if (!entry._val.empty())
1053 _recovered[entry._val] = i+base;
1054 if (_recoveryMin == AMPS_UNSET_INDEX)
1057 _recoveryBase = base;
1058 _recoveryMax = _current;
1059 _recoveryMaxBase = _currentBase;
1063 if (_current == _entriesLength)
1066 _currentBase += _entriesLength;
1069 _leastBase = _currentBase;
1076 BookmarkRange _range;
1079 size_t _currentBase;
1082 size_t _recoveryMin;
1083 size_t _recoveryBase;
1084 size_t _recoveryMax;
1085 size_t _recoveryMaxBase;
1086 size_t _entriesLength;
1090 RecoveryMap _recovered;
1092 PublisherMap _publishers;
1101 _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1102 _recentChanged(true),
1104 _recoveryPointAdapter(NULL),
1105 _recoveryPointFactory(NULL)
1108 typedef RecoveryPointAdapter::iterator RecoveryIterator;
1117 RecoveryPointFactory factory_ = NULL)
1121 , _serverVersion(AMPS_DEFAULT_MIN_VERSION)
1122 , _recentChanged(true)
1124 , _recoveryPointAdapter(adapter_)
1125 , _recoveryPointFactory(factory_)
1128 for (RecoveryIterator recoveryPoint = _recoveryPointAdapter.begin();
1129 recoveryPoint != _recoveryPointAdapter.end();
1132 Field subId(recoveryPoint->getSubId());
1133 msg.setSubscriptionHandle(static_cast<amps_subscription_handle>(0));
1135 Field bookmark = recoveryPoint->getBookmark();
1136 if (BookmarkRange::isRange(bookmark))
1143 const char* start = bookmark.
data();
1144 size_t remain = bookmark.
len();
1145 const char* comma = (
const char*)memchr((
const void*)start,
1149 size_t len = (size_t)(comma-start);
1153 find(subId)->setRecoveryTimestamp(start,len);
1162 remain = bookmark.
len() - (size_t)(start-bookmark.
data());
1163 comma = (
const char*)memchr((
const void*)start,
1169 find(subId)->setRecoveryTimestamp(start,remain);
1179 _recovering =
false;
1194 Lock<Mutex> guard(_lock);
1195 return _log(message_);
1205 Lock<Mutex> guard(_lock);
1206 (void)_discard(message_);
1218 Lock<Mutex> guard(_lock);
1219 (void)_discard(subId_, bookmarkSeqNo_);
1229 Lock<Mutex> guard(_lock);
1230 return _getMostRecent(subId_);
1243 Lock<Mutex> guard(_lock);
1244 return _isDiscarded(message_);
1254 Lock<Mutex> guard(_lock);
1265 Lock<Mutex> guard(_lock);
1275 Lock<Mutex> guard(_lock);
1276 return _getOldestBookmarkSeq(subId_);
1287 Lock<Mutex> guard(_lock);
1288 _persisted(find(subId_), bookmark_);
1300 Lock<Mutex> guard(_lock);
1301 return _persisted(find(subId_), bookmark_);
1319 Lock<Mutex> guard(_subsLock);
1320 _serverVersion = version_;
1323 inline bool isWritableBookmark(
size_t length)
1325 return length >= AMPS_MIN_BOOKMARK_LEN;
1328 typedef Subscription::EntryPtrList EntryPtrList;
1333 size_t _log(
Message& message_)
1336 Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1343 message_.setSubscriptionHandle(
1344 static_cast<amps_subscription_handle>(pSub));
1346 size_t retVal = pSub->log(bookmark);
1347 message_.setBookmarkSeqNo(retVal);
1352 bool _discard(
const Message& message_)
1354 size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1355 Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1363 bool retVal = pSub->discard(bookmarkSeqNo);
1366 updateAdapter(pSub);
1372 bool _discard(
const Message::Field& subId_,
size_t bookmarkSeqNo_)
1374 Subscription* pSub = find(subId_);
1375 bool retVal = pSub->discard(bookmarkSeqNo_);
1378 updateAdapter(pSub);
1385 bool usePublishersList_ =
true)
1387 Subscription* pSub = find(subId_);
1388 return pSub->getMostRecentList(usePublishersList_);
1392 bool _isDiscarded(
Message& message_)
1397 Subscription* pSub = find(subId);
1398 message_.setSubscriptionHandle(
1399 static_cast<amps_subscription_handle>(pSub));
1406 Subscription* pSub = find(subId_);
1407 return pSub->getOldestBookmarkSeq();
1411 virtual void _persisted(Subscription* pSub_,
1414 if (pSub_->lastPersisted(bookmark_))
1416 updateAdapter(pSub_);
1421 virtual Message::Field _persisted(Subscription* pSub_,
size_t bookmark_)
1423 return pSub_->lastPersisted(bookmark_);
1429 if (_recoveryPointAdapter.isValid())
1431 _recoveryPointAdapter.purge();
1440 while(!_subs.empty())
1442 SubscriptionMap::iterator iter = _subs.begin();
1446 delete (iter->second);
1455 if (_recoveryPointAdapter.isValid())
1457 _recoveryPointAdapter.purge(subId_);
1465 Lock<Mutex> guard(_subsLock);
1466 SubscriptionMap::iterator iter = _subs.find(subId_);
1467 if (iter == _subs.end())
return;
1469 delete (iter->second);
1477 find(subId_)->setMostRecent(recent_);
1482 static const char ENTRY_BOOKMARK =
'b';
1483 static const char ENTRY_DISCARD =
'd';
1484 static const char ENTRY_PERSISTED =
'p';
1490 throw StoreException(
"A valid subscription ID must be provided to the Bookmark Store");
1492 Lock<Mutex> guard(_subsLock);
1493 if(_subs.count(subId_) == 0)
1498 _subs[id] =
new Subscription(
this,
id);
1501 return _subs[subId_];
1504 virtual bool resize(
const Message::Field& subId_,
char** newBuffer_,
size_t size_,
1505 bool callResizeHandler_ =
true)
1507 assert(newBuffer_ != 0);
1517 if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1521 char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1522 *newBuffer_ = (
char*)malloc(size_);
1523 memset(*newBuffer_, 0, size_);
1526 find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1533 void updateAdapter(Subscription* pSub_)
1535 if (_recovering || !_recentChanged || !_recoveryPointAdapter.isValid())
1539 if (_recoveryPointFactory)
1542 pSub_->getMostRecentList(
false)));
1543 _recoveryPointAdapter.update(update);
1548 pSub_->getMostRecentList(
false)));
1549 _recoveryPointAdapter.update(update);
1553 typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap;
1554 SubscriptionMap _subs;
1555 size_t _serverVersion;
1556 bool _recentChanged;
1558 typedef std::set<Subscription*> SubscriptionSet;
1565 #endif //_MEMORYBOOKMARKSTORE_H_ Defines the AMPS::Message class and related classes.
Abstract base class for storing received bookmarks for HA clients.
Definition: BookmarkStore.hpp:77
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1252
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: MemoryBookmarkStore.hpp:1216
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:1227
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1297
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
MemoryBookmarkStore(const RecoveryPointAdapter &adapter_, RecoveryPointFactory factory_=NULL)
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1116
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1098
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1284
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1317
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1236
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:1241
Provides AMPS::RecoveryPointAdapter, an iterface for impplementing external storage of bookmark subsc...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1308
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1203
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
Message & setSubId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
RecoveryPoint(* RecoveryPointFactory)(const Field &subId_, const Field &bookmark_)
RecoveryPointFactory is a function type for producing a RecoveryPoint that is sent to a RecoveryPoint...
Definition: RecoveryPoint.hpp:126
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
A BookmarkStoreImpl implmentation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:1192
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
FixedRecoveryPoint is a RecoveryPoint implementation where subId and bookmark are set explicitly...
Definition: RecoveryPoint.hpp:133
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1273
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MemoryBookmarkStore.hpp:1263
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1034
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034