26 #ifndef _MEMORYBOOKMARKSTORE_H_ 27 #define _MEMORYBOOKMARKSTORE_H_ 29 #include <BookmarkStore.hpp> 40 #define AMPS_MIN_BOOKMARK_LEN 3 41 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL 62 typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
63 typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
64 typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
66 typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
72 : _current(1), _currentBase(0), _least(1), _leastBase(0)
73 , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
74 , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
75 , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
80 _store->resize(_id, (
char**)&_entries,
81 sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE,
false);
82 setLastPersistedToEpoch();
87 Lock<Mutex> guard(_subLock);
90 for (
size_t i = 0; i < _entriesLength; ++i)
92 _entries[i]._val.clear();
95 _store->resize(_id, (
char**)&_entries, 0);
99 _lastPersisted.clear();
102 _recoveryTimestamp.clear();
111 Lock<Mutex> guard(_subLock);
113 size_t index = recover(bookmark_,
true);
114 if (index == AMPS_UNSET_INDEX)
117 if (_current >= _entriesLength)
120 _currentBase += _entriesLength;
124 if ((_current == _least && _leastBase < _currentBase) ||
125 (_current == _recoveryMin && _recoveryBase < _currentBase))
127 if (!_store->resize(_id, (
char**)&_entries,
128 sizeof(Entry) * _entriesLength * 2))
131 return log(bookmark_);
164 if (!BookmarkRange::isRange(bookmark_))
166 _entries[_current]._val.deepCopy(bookmark_);
171 _range.set(bookmark_);
173 if (!_range.isValid())
175 throw CommandException(
"Invalid bookmark range specified.");
177 _store->updateAdapter(
this);
178 if (!_range.isStartInclusive())
181 amps_uint64_t publisher, sequence;
182 parseBookmark(_range.getStart(), publisher, sequence);
183 _publishers[publisher] = sequence;
189 _entries[_current]._active =
true;
192 return index + _currentBase;
197 Lock<Mutex> guard(_subLock);
198 return _discard(index_);
208 Lock<Mutex> guard(_subLock);
209 size_t search = _least;
210 size_t searchBase = _leastBase;
211 size_t searchMax = _current;
212 size_t searchMaxBase = _currentBase;
213 if (_least + _leastBase == _current + _currentBase)
215 if (_recoveryMin != AMPS_UNSET_INDEX)
217 search = _recoveryMin;
218 searchBase = _recoveryBase;
219 searchMax = _recoveryMax;
220 searchMaxBase = _recoveryMaxBase;
227 assert(searchMax != AMPS_UNSET_INDEX);
228 assert(searchMaxBase != AMPS_UNSET_INDEX);
229 assert(search != AMPS_UNSET_INDEX);
230 assert(searchBase != AMPS_UNSET_INDEX);
232 while (search + searchBase < searchMax + searchMaxBase)
234 if (_entries[search]._val == bookmark_)
236 return _discard(search + searchBase);
238 if (++search == _entriesLength)
241 searchBase += _entriesLength;
250 amps_uint64_t& publisherId_,
251 amps_uint64_t& sequenceNumber_)
253 Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
260 Lock<Mutex> guard(_subLock);
261 if (BookmarkRange::isRange(bookmark_))
266 size_t recoveredIdx = recover(bookmark_,
false);
268 amps_uint64_t publisher, sequence;
269 parseBookmark(bookmark_, publisher, sequence);
271 PublisherIterator pub = _publishers.find(publisher);
272 if (pub == _publishers.end() || pub->second < sequence)
274 _publishers[publisher] = sequence;
275 if (recoveredIdx == AMPS_UNSET_INDEX)
280 if (recoveredIdx != AMPS_UNSET_INDEX)
282 if (!_entries[recoveredIdx]._active)
284 _recovered.erase(bookmark_);
292 if (_store->_recovering)
300 size_t base = _leastBase;
301 for (
size_t i = _least; i + base < _current + _currentBase; i++)
303 if ( i >= _entriesLength )
308 if (_entries[i]._val == bookmark_)
310 return !_entries[i]._active;
317 bool empty(
void)
const 319 if (_least == AMPS_UNSET_INDEX ||
320 ((_least + _leastBase) == (_current + _currentBase) &&
321 _recoveryMin == AMPS_UNSET_INDEX))
328 void updateMostRecent()
330 Lock<Mutex> guard(_subLock);
334 const BookmarkRange& getRange()
const 341 Lock<Mutex> guard(_subLock);
342 bool useLastPersisted = !_lastPersisted.empty() &&
343 _lastPersisted.len() > 1;
348 bool useRecent = !_recent.empty() && _recent.len() > 1;
349 amps_uint64_t lastPublisher = 0;
350 amps_uint64_t lastSeq = 0;
351 amps_uint64_t recentPublisher = 0;
352 amps_uint64_t recentSeq = 0;
353 if (useLastPersisted)
355 parseBookmark(_lastPersisted, lastPublisher, lastSeq);
359 parseBookmark(_recent, recentPublisher, recentSeq);
360 if (empty() && useLastPersisted)
366 if (useLastPersisted && lastPublisher == recentPublisher)
368 if (lastSeq <= recentSeq)
374 useLastPersisted =
false;
380 size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
383 totalLen += _recent.len() + 1;
388 if (usePublishersList_
389 && ((!useLastPersisted && !useRecent)
392 std::ostringstream os;
393 for (PublisherIterator pub = _publishers.begin();
394 pub != _publishers.end(); ++pub)
396 if (pub->first == 0 && pub->second == 0)
400 if (pub->first == recentPublisher && recentSeq < pub->second)
402 os << recentPublisher <<
'|' << recentSeq <<
"|,";
406 os << pub->first <<
'|' << pub->second <<
"|,";
409 std::string recent = os.str();
410 totalLen = recent.length();
413 if (!_recoveryTimestamp.empty())
415 totalLen += _recoveryTimestamp.len();
416 recent += std::string(_recoveryTimestamp);
421 recent.erase(--totalLen);
426 if (_range.isValid())
428 if (_range.getStart() != recent
431 _range.replaceStart(_recentList,
true);
433 else if (_range.isStartInclusive())
435 amps_uint64_t publisher, sequence;
436 parseBookmark(_range.getStart(), publisher,
438 PublisherIterator pub = _publishers.find(publisher);
439 if (pub != _publishers.end()
440 && pub->second >= sequence)
442 _range.makeStartExclusive();
449 if (_range.isValid())
454 if (!_recoveryTimestamp.empty() && !_range.isValid())
456 totalLen += _recoveryTimestamp.len() + 1;
460 || (_recent.len() < 2 && !empty()))
462 if (_range.isValid())
470 _setLastPersistedToEpoch();
471 return _lastPersisted;
475 char* field =
new char[totalLen];
480 memcpy(field, _recent.data(), len);
486 if (useLastPersisted)
488 memcpy(field + len, _lastPersisted.data(), _lastPersisted.len());
489 len += _lastPersisted.len();
495 if (!_recoveryTimestamp.empty() && !_range.isValid())
497 memcpy(field + len, _recoveryTimestamp.data(),
498 _recoveryTimestamp.len());
505 _recentList.assign(field, totalLen);
506 if (_range.isValid())
510 if (_range.getStart() != _recentList)
512 _range.replaceStart(_recentList,
true);
514 else if (_range.isStartInclusive())
516 amps_uint64_t publisher, sequence;
517 parseBookmark(_range.getStart(), publisher,
519 PublisherIterator pub = _publishers.find(publisher);
520 if (pub != _publishers.end()
521 && pub->second >= sequence)
523 _range.makeStartExclusive();
534 Lock<Mutex> guard(_subLock);
537 if (update_ && _store->_recentChanged)
553 Lock<Mutex> guard(_subLock);
554 return _lastPersisted;
560 _recent.deepCopy(recent_);
563 void setRecoveryTimestamp(
const char* recoveryTimestamp_,
566 _recoveryTimestamp.clear();
567 size_t len = (len_ == 0) ? AMPS_TIMESTAMP_LEN : len_;
568 char* ts =
new char[len];
569 memcpy((
void*)ts, (
const void*)recoveryTimestamp_, len);
570 _recoveryTimestamp.assign(ts, len);
573 void moveEntries(
char* old_,
char* new_,
size_t newSize_)
575 size_t least = _least;
576 size_t leastBase = _leastBase;
577 if (_recoveryMin != AMPS_UNSET_INDEX)
579 least = _recoveryMin;
580 leastBase = _recoveryBase;
585 if (newSize_ - (
sizeof(Entry)*_entriesLength) >
sizeof(Entry)*least)
587 memcpy(new_ + (
sizeof(Entry)*_entriesLength),
588 old_, (
sizeof(Entry)*least));
590 memset(old_, 0,
sizeof(Entry)*least);
594 Entry* buffer =
new Entry[least];
595 memcpy((
void*)buffer, (
void*)old_,
sizeof(Entry)*least);
597 memcpy((
void*)new_, (
void*)((
char*)old_ + (
sizeof(Entry)*least)),
598 (_entriesLength - least)*
sizeof(Entry));
600 memcpy((
void*)((
char*)new_ + ((_entriesLength - least)*
sizeof(Entry))),
601 (
void*)buffer, least *
sizeof(Entry));
611 memcpy((
void*)new_, (
void*)((
char*)old_ + (
sizeof(Entry)*least)),
612 (_entriesLength - least)*
sizeof(Entry));
614 memcpy((
void*)((
char*)new_ + ((_entriesLength - least)*
sizeof(Entry))),
615 (
void*)old_, least *
sizeof(Entry));
620 if (_recoveryMin != AMPS_UNSET_INDEX)
622 _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
623 _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
624 (_recoveryMin + _recoveryBase);
625 _recoveryMaxBase = leastBase;
626 _recoveryMin = least;
627 _recoveryBase = leastBase;
633 _leastBase = leastBase;
635 _currentBase = _leastBase;
636 _current = least + _entriesLength;
641 Lock<Mutex> guard(_subLock);
643 return ((_least + _leastBase) == (_current + _currentBase)) ? AMPS_UNSET_INDEX :
651 || BookmarkRange::isRange(bookmark_))
655 Lock<Mutex> guard(_subLock);
656 return _setLastPersisted(bookmark_);
661 if (!_lastPersisted.empty())
663 amps_uint64_t publisher, publisher_lastPersisted;
664 amps_uint64_t sequence, sequence_lastPersisted;
665 parseBookmark(bookmark_, publisher, sequence);
666 parseBookmark(_lastPersisted, publisher_lastPersisted,
667 sequence_lastPersisted);
668 if (publisher == publisher_lastPersisted &&
669 sequence <= sequence_lastPersisted)
675 _lastPersisted.deepCopy(bookmark_);
676 _store->_recentChanged =
true;
677 _recoveryTimestamp.clear();
683 Lock<Mutex> guard(_subLock);
687 || BookmarkRange::isRange(bookmark))
691 _setLastPersisted(bookmark);
699 size_t recover(
const Message::Field& bookmark_,
bool relogIfNotDiscarded)
701 size_t retVal = AMPS_UNSET_INDEX;
702 if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
708 RecoveryIterator item = _recovered.find(bookmark_);
709 if (item != _recovered.end())
711 size_t seqNo = item->second;
712 size_t index = (seqNo - _recoveryBase) % _entriesLength;
715 if (_least + _leastBase == _current + _currentBase &&
716 !_entries[index]._active)
718 _store->_recentChanged =
true;
720 _recent = _entries[index]._val.deepCopy();
721 retVal = moveEntry(index);
722 if (retVal == AMPS_UNSET_INDEX)
724 relogIfNotDiscarded);
726 _leastBase = _currentBase;
728 else if (!_entries[index]._active || relogIfNotDiscarded)
730 retVal = moveEntry(index);
731 if (retVal == AMPS_UNSET_INDEX)
733 relogIfNotDiscarded);
739 _recovered.erase(item);
740 if (_recovered.empty())
742 _recoveryMin = AMPS_UNSET_INDEX;
743 _recoveryBase = AMPS_UNSET_INDEX;
744 _recoveryMax = AMPS_UNSET_INDEX;
745 _recoveryMaxBase = AMPS_UNSET_INDEX;
747 else if (index == _recoveryMin)
749 while (_entries[_recoveryMin]._val.empty() &&
750 (_recoveryMin + _recoveryBase) < (_recoveryMax + _recoveryMaxBase))
752 if (++_recoveryMin == _entriesLength)
755 _recoveryBase += _entriesLength;
775 Entry() : _active(
false)
783 Field::FieldHash _hasher;
785 size_t operator()(
const Entry* entryPtr_)
const 787 return _hasher(entryPtr_->_val);
790 bool operator()(
const Entry* lhsPtr_,
const Entry* rhsPtr_)
const 792 return _hasher(lhsPtr_->_val, rhsPtr_->_val);
797 typedef std::vector<Entry*> EntryPtrList;
799 void getRecoveryEntries(EntryPtrList& list_)
801 if (_recoveryMin == AMPS_UNSET_INDEX ||
802 _recoveryMax == AMPS_UNSET_INDEX)
806 size_t base = _recoveryBase;
807 size_t max = _recoveryMax + _recoveryMaxBase;
808 for (
size_t i = _recoveryMin; i + base < max; ++i)
810 if (i == _entriesLength)
813 base = _recoveryMaxBase;
816 list_.push_back(&(_entries[i]));
821 void getActiveEntries(EntryPtrList& list_)
823 size_t base = _leastBase;
824 for (
size_t i = _least; i + base < _current + _currentBase; ++i)
826 if (i >= _entriesLength)
832 list_.push_back(&(_entries[i]));
837 Entry* getEntryByIndex(
size_t index_)
839 Lock<Mutex> guard(_subLock);
840 size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
841 index_ >= _least + _leastBase)
842 ? _leastBase : _recoveryBase;
844 size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
845 _least + _leastBase :
846 _recoveryMin + _recoveryBase);
847 if (index_ >= _current + _currentBase || index_ < min)
851 return &(_entries[(index_ - base) % _entriesLength]);
856 Lock<Mutex> guard(_subLock);
859 getRecoveryEntries(list);
860 setPublishersToDiscarded(&list, &_publishers);
863 void setPublishersToDiscarded(EntryPtrList* recovered_,
864 PublisherMap* publishers_)
870 for (EntryPtrList::iterator i = recovered_->begin();
871 i != recovered_->end(); ++i)
873 amps_uint64_t publisher = (amps_uint64_t)0;
874 amps_uint64_t sequence = (amps_uint64_t)0;
875 parseBookmark((*i)->_val, publisher, sequence);
876 if (publisher && sequence && (*i)->_active &&
877 (*publishers_)[publisher] >= sequence)
879 (*publishers_)[publisher] = sequence - 1;
884 void clearLastPersisted()
886 Lock<Mutex> guard(_subLock);
887 _lastPersisted.clear();
890 void setLastPersistedToEpoch()
892 Lock<Mutex> guard(_subLock);
893 _setLastPersistedToEpoch();
897 Subscription(
const Subscription&);
898 Subscription& operator=(
const Subscription&);
900 size_t moveEntry(
size_t index_)
903 if (_current >= _entriesLength)
906 _currentBase += _entriesLength;
910 if ((_current == _least % _entriesLength &&
911 _leastBase < _currentBase) ||
912 (_current == _recoveryMin && _recoveryBase < _currentBase))
914 if (!_store->resize(_id, (
char**)&_entries,
915 sizeof(Entry) * _entriesLength * 2))
917 return AMPS_UNSET_INDEX;
922 _entries[_current]._val = _entries[index_]._val;
923 _entries[_current]._active = _entries[index_]._active;
925 _entries[index_]._val.assign(NULL, 0);
926 _entries[index_]._active =
false;
930 void _setLastPersistedToEpoch()
933 char* field =
new char[fieldLen];
935 _lastPersisted.clear();
936 _lastPersisted.assign(field, fieldLen);
939 bool _discard(
size_t index_)
943 assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
944 (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
945 size_t base = (_recoveryBase == AMPS_UNSET_INDEX
946 || index_ >= _least + _leastBase)
947 ? _leastBase : _recoveryBase;
949 size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
950 _recoveryMin + _recoveryBase);
951 if (index_ >= _current + _currentBase || index_ < min)
958 Entry& e = _entries[(index_ - base) % _entriesLength];
961 size_t index = index_;
962 if (_recoveryMin != AMPS_UNSET_INDEX &&
963 index_ == _recoveryMin + _recoveryBase)
966 size_t j = _recoveryMin;
967 while (j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
968 !_entries[j]._active)
989 if (!bookmark.
empty())
991 _recovered.erase(bookmark);
992 if (_least + _leastBase == _current + _currentBase ||
993 ((_least + _leastBase) % _entriesLength) ==
994 ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
998 _store->_recentChanged =
true;
999 _recoveryTimestamp.
clear();
1002 bookmark.assign(NULL, 0);
1012 if (++j == _entriesLength)
1015 _recoveryBase += _entriesLength;
1019 assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
1020 _recovered.empty());
1021 if (_recovered.empty())
1023 _recoveryMin = AMPS_UNSET_INDEX;
1024 _recoveryBase = AMPS_UNSET_INDEX;
1025 _recoveryMax = AMPS_UNSET_INDEX;
1026 _recoveryMaxBase = AMPS_UNSET_INDEX;
1028 index = _least + _leastBase;
1037 if (index == _least + _leastBase)
1041 while (j + _leastBase < _current + _currentBase &&
1042 !_entries[j]._active)
1046 _recent = _entries[j]._val;
1047 _entries[j]._val.assign(NULL, 0);
1048 _store->_recentChanged =
true;
1050 _recoveryTimestamp.clear();
1053 if (++j == _entriesLength)
1056 _leastBase += _entriesLength;
1065 void _updateMostRecent()
1069 assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
1070 (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
1071 size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
1072 size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
1073 _recoveryMin = AMPS_UNSET_INDEX;
1074 _recoveryBase = AMPS_UNSET_INDEX;
1075 _recoveryMax = AMPS_UNSET_INDEX;
1076 _recoveryMaxBase = AMPS_UNSET_INDEX;
1077 for (
size_t i = start; i + base < _current + _currentBase; i++)
1079 if ( i >= _entriesLength )
1082 base = _currentBase;
1084 if (i >= _recoveryMax + _recoveryBase && i < _least + _leastBase)
1088 Entry& entry = _entries[i];
1089 if (!entry._val.empty())
1091 _recovered[entry._val] = i + base;
1092 if (_recoveryMin == AMPS_UNSET_INDEX)
1095 _recoveryBase = base;
1096 _recoveryMax = _current;
1097 _recoveryMaxBase = _currentBase;
1101 if (_current == _entriesLength)
1104 _currentBase += _entriesLength;
1107 _leastBase = _currentBase;
1114 BookmarkRange _range;
1117 size_t _currentBase;
1120 size_t _recoveryMin;
1121 size_t _recoveryBase;
1122 size_t _recoveryMax;
1123 size_t _recoveryMaxBase;
1124 size_t _entriesLength;
1128 RecoveryMap _recovered;
1130 PublisherMap _publishers;
1139 _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1140 _recentChanged(true),
1142 _recoveryPointAdapter(NULL),
1143 _recoveryPointFactory(NULL)
1146 typedef RecoveryPointAdapter::iterator RecoveryIterator;
1155 RecoveryPointFactory factory_ = NULL)
1159 , _serverVersion(AMPS_DEFAULT_MIN_VERSION)
1160 , _recentChanged(true)
1162 , _recoveryPointAdapter(adapter_)
1163 , _recoveryPointFactory(factory_)
1166 for (RecoveryIterator recoveryPoint = _recoveryPointAdapter.begin();
1167 recoveryPoint != _recoveryPointAdapter.end();
1170 Field subId(recoveryPoint->getSubId());
1171 msg.setSubscriptionHandle(static_cast<amps_subscription_handle>(0));
1173 Field bookmark = recoveryPoint->getBookmark();
1174 if (BookmarkRange::isRange(bookmark))
1181 const char* start = bookmark.
data();
1182 size_t remain = bookmark.
len();
1183 const char* comma = (
const char*)memchr((
const void*)start,
1187 size_t len = (size_t)(comma - start);
1191 find(subId)->setRecoveryTimestamp(start, len);
1200 remain = bookmark.
len() - (size_t)(start - bookmark.
data());
1201 comma = (
const char*)memchr((
const void*)start,
1207 find(subId)->setRecoveryTimestamp(start, remain);
1217 _recovering =
false;
1232 Lock<Mutex> guard(_lock);
1233 return _log(message_);
1243 Lock<Mutex> guard(_lock);
1244 (void)_discard(message_);
1256 Lock<Mutex> guard(_lock);
1257 (void)_discard(subId_, bookmarkSeqNo_);
1267 Lock<Mutex> guard(_lock);
1268 return _getMostRecent(subId_);
1281 Lock<Mutex> guard(_lock);
1282 return _isDiscarded(message_);
1292 Lock<Mutex> guard(_lock);
1303 Lock<Mutex> guard(_lock);
1313 Lock<Mutex> guard(_lock);
1314 return _getOldestBookmarkSeq(subId_);
1325 Lock<Mutex> guard(_lock);
1326 _persisted(find(subId_), bookmark_);
1338 Lock<Mutex> guard(_lock);
1339 return _persisted(find(subId_), bookmark_);
1357 Lock<Mutex> guard(_subsLock);
1358 _serverVersion = version_;
1361 inline bool isWritableBookmark(
size_t length)
1363 return length >= AMPS_MIN_BOOKMARK_LEN;
1366 typedef Subscription::EntryPtrList EntryPtrList;
1371 size_t _log(
Message& message_)
1374 Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1383 message_.setSubscriptionHandle(
1384 static_cast<amps_subscription_handle>(pSub));
1386 size_t retVal = pSub->log(bookmark);
1387 message_.setBookmarkSeqNo(retVal);
1392 bool _discard(
const Message& message_)
1394 size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1395 Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1405 bool retVal = pSub->discard(bookmarkSeqNo);
1408 updateAdapter(pSub);
1414 bool _discard(
const Message::Field& subId_,
size_t bookmarkSeqNo_)
1416 Subscription* pSub = find(subId_);
1417 bool retVal = pSub->discard(bookmarkSeqNo_);
1420 updateAdapter(pSub);
1427 bool usePublishersList_ =
true)
1429 Subscription* pSub = find(subId_);
1430 return pSub->getMostRecentList(usePublishersList_);
1434 bool _isDiscarded(
Message& message_)
1441 Subscription* pSub = find(subId);
1442 message_.setSubscriptionHandle(
1443 static_cast<amps_subscription_handle>(pSub));
1450 Subscription* pSub = find(subId_);
1451 return pSub->getOldestBookmarkSeq();
1455 virtual void _persisted(Subscription* pSub_,
1458 if (pSub_->lastPersisted(bookmark_))
1460 updateAdapter(pSub_);
1465 virtual Message::Field _persisted(Subscription* pSub_,
size_t bookmark_)
1467 return pSub_->lastPersisted(bookmark_);
1473 if (_recoveryPointAdapter.isValid())
1475 _recoveryPointAdapter.purge();
1484 while (!_subs.empty())
1486 SubscriptionMap::iterator iter = _subs.begin();
1490 delete (iter->second);
1499 if (_recoveryPointAdapter.isValid())
1501 _recoveryPointAdapter.purge(subId_);
1509 Lock<Mutex> guard(_subsLock);
1510 SubscriptionMap::iterator iter = _subs.find(subId_);
1511 if (iter == _subs.end())
1516 delete (iter->second);
1524 find(subId_)->setMostRecent(recent_);
1529 static const char ENTRY_BOOKMARK =
'b';
1530 static const char ENTRY_DISCARD =
'd';
1531 static const char ENTRY_PERSISTED =
'p';
1537 throw StoreException(
"A valid subscription ID must be provided to the Bookmark Store");
1539 Lock<Mutex> guard(_subsLock);
1540 if (_subs.count(subId_) == 0)
1545 _subs[id] =
new Subscription(
this,
id);
1548 return _subs[subId_];
1551 virtual bool resize(
const Message::Field& subId_,
char** newBuffer_,
size_t size_,
1552 bool callResizeHandler_ =
true)
1554 assert(newBuffer_ != 0);
1564 if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1568 char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1569 *newBuffer_ = (
char*)malloc(size_);
1570 memset(*newBuffer_, 0, size_);
1573 find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1580 void updateAdapter(Subscription* pSub_)
1582 if (_recovering || !_recentChanged || !_recoveryPointAdapter.isValid())
1586 if (_recoveryPointFactory)
1589 pSub_->getMostRecentList(
false)));
1590 _recoveryPointAdapter.update(update);
1595 pSub_->getMostRecentList(
false)));
1596 _recoveryPointAdapter.update(update);
1600 typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap;
1601 SubscriptionMap _subs;
1602 size_t _serverVersion;
1603 bool _recentChanged;
1605 typedef std::set<Subscription*> SubscriptionSet;
1612 #endif //_MEMORYBOOKMARKSTORE_H_ Defines the AMPS::Message class and related classes.
Abstract base class for storing received bookmarks for HA clients.
Definition: BookmarkStore.hpp:77
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1290
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1254
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:1265
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1335
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
MemoryBookmarkStore(const RecoveryPointAdapter &adapter_, RecoveryPointFactory factory_=NULL)
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1154
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1136
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1322
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1355
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1374
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:127
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:1279
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1346
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1241
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
Message & setSubId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1373
RecoveryPoint(* RecoveryPointFactory)(const Field &subId_, const Field &bookmark_)
RecoveryPointFactory is a function type for producing a RecoveryPoint that is sent to a RecoveryPoint...
Definition: RecoveryPoint.hpp:126
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:1230
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
FixedRecoveryPoint is a RecoveryPoint implementation where subId and bookmark are set explicitly...
Definition: RecoveryPoint.hpp:133
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1311
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MemoryBookmarkStore.hpp:1301
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1140
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1140