26 #ifndef _MMAPBOOKMARKSTORE_H_ 27 #define _MMAPBOOKMARKSTORE_H_ 30 #include <RecoveryPoint.hpp> 31 #include <RecoveryPointAdapter.hpp> 38 #include <sys/types.h> 43 #define AMPS_INITIAL_LOG_SIZE 40960UL 60 typedef HANDLE FileType;
// Per-entry helper: recover() passes this to std::for_each over each map of
// recovered bookmarks. Only the signature survives in this listing; the body
// is not shown, so its exact effect on the pair cannot be stated from here.
65 static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
// Tail of a file-name-only constructor (the signature lines are missing from
// this listing). Members start out "unmapped": zero offset, null log pointer,
// no timestamp, invalid handles.
84 , _logOffset(0) , _log(0), _fileTimestamp(0)
86 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
// init() opens and maps the file; only when it returns true is the existing
// content replayed. The second recover() argument (hasAdapter_) is false here.
91 if (init(useLastModifiedTime_))
92 recover(useLastModifiedTime_,
false);
// Fragment of a second file-name-only constructor (the earlier signature
// lines are missing from this listing). useLastModifiedTime_ defaults to
// false and is forwarded to both init() and recover().
106 bool useLastModifiedTime_ =
false)
108 , _logOffset(0) , _log(0), _fileTimestamp(0)
110 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
// Replay existing file content only when init() returns true; recover() is
// told there is no adapter (second argument false).
115 if (init(useLastModifiedTime_))
116 recover(useLastModifiedTime_,
false);
// Fragment of the constructor taking a C-string file name plus an optional
// RecoveryPointFactory (the first parameter line is missing from this
// listing). factory_ defaults to NULL and useLastModifiedTime_ to false.
134 const char* fileName_,
135 RecoveryPointFactory factory_ = NULL,
136 bool useLastModifiedTime_ =
false)
138 , _fileName(fileName_), _fileSize(0)
139 , _logOffset(0) , _log(0), _fileTimestamp(0)
141 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
// Unlike the file-name-only constructors, recover() is called with
// hasAdapter_ = true, which makes it run MemoryBookmarkStore::__purge()
// before the first file record is applied.
146 if (init(useLastModifiedTime_))
147 recover(useLastModifiedTime_,
true);
// Fragment of the constructor taking a std::string file name plus an optional
// RecoveryPointFactory (the first parameter line is missing from this
// listing). factory_ defaults to NULL and useLastModifiedTime_ to false.
165 const std::string& fileName_,
166 RecoveryPointFactory factory_ = NULL,
167 bool useLastModifiedTime_ =
false)
169 , _fileName(fileName_), _fileSize(0)
170 , _logOffset(0) , _log(0), _fileTimestamp(0)
172 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
// recover() is called with hasAdapter_ = true, which makes it run
// MemoryBookmarkStore::__purge() before the first file record is applied.
177 if (init(useLastModifiedTime_))
178 recover(useLastModifiedTime_,
true);
// Teardown fragment (presumably the destructor body; the signature is not
// shown). Releases the mapping of the log: the Windows calls and the munmap()
// call are presumably alternative platform branches.
// NOTE(review): no release of _fileTimestamp or of the file handle/descriptor
// is visible in these lines; confirm against the full source.
184 UnmapViewOfFile(_log);
185 CloseHandle(_mapFile);
188 munmap(_log, _fileSize);
// Body fragment of log(Message&) (signature not shown in this listing).
// The Subscription is taken from the handle cached on the message.
204 Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
205 Lock<Mutex> guard(_lock);
// Listing lines 206-211 are missing; presumably they resolve `sub` and
// `bookmark` when the handle is not yet set. The handle is then stored back
// on the message.
212 message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
// Append an ENTRY_BOOKMARK record to the mapped file first, then let the
// in-memory store assign and return the sequence number.
214 write(sub->id(), ENTRY_BOOKMARK, bookmark);
215 return MemoryBookmarkStore::_log(message_);
// Body fragment of discard(const Message&) (signature and the lines that
// obtain subId/bookmark are not shown). Under _lock, an ENTRY_DISCARD record
// is appended to the file and then the in-memory store is updated.
228 Lock<Mutex> guard(_lock);
229 write(subId, ENTRY_DISCARD, bookmark);
230 MemoryBookmarkStore::_discard(message_);
// Body fragment of discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
// (signature not shown). Resolves the sequence number to its stored entry so
// the bookmark text can be written to the file.
242 Lock<Mutex> guard(_lock);
243 Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
// Nothing is written or discarded when the entry is unknown or has an empty
// bookmark value.
244 if (!entry || entry->_val.empty())
return;
245 write(subId_, ENTRY_DISCARD, entry->_val);
246 MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
// Body fragment of getMostRecent(const Message::Field& subId_) (signature not
// shown). Takes _lock and delegates to the in-memory store; nothing is
// written to the file in the visible lines.
256 Lock<Mutex> guard(_lock);
257 return MemoryBookmarkStore::_getMostRecent(subId_);
// Body fragment of isDiscarded(Message&) (signature not shown). The in-memory
// store answers the question under _lock.
270 Lock<Mutex> l(_lock);
271 bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
// Listing lines 272-276 are missing, so the condition guarding the two writes
// below is not visible here (presumably it depends on retVal; confirm). When
// reached, the bookmark is recorded as both logged and discarded.
277 write(subId, ENTRY_BOOKMARK, message_.
getBookmark());
278 write(subId, ENTRY_DISCARD, message_.
getBookmark());
// Body fragment of purge() (signature not shown). Holds both the store lock
// and the file lock, in that order.
290 Lock<Mutex> guard(_lock);
291 Lock<Mutex> fileGuard(_fileLock);
// Zero the part of the mapping written so far (the first _logOffset bytes).
// Listing line 293 is missing; presumably it resets _logOffset; confirm.
292 memset(_log, 0, _logOffset);
294 MemoryBookmarkStore::_purge();
// Body fragment of purge(const Message::Field& subId_) (signature not shown).
// Holds both locks, drops the subscription from the in-memory store, then
// rewrites the file through __prune() using "<_fileName>.tmp" as the
// temporary file.
304 Lock<Mutex> guard(_lock);
305 Lock<Mutex> fileGuard(_fileLock);
306 MemoryBookmarkStore::_purge(subId_);
307 std::string tmpFileName = _fileName +
".tmp";
308 __prune(tmpFileName);
// Only surviving line of a function body near listing line 311 (presumably a
// setServerVersion overload; its signature and remaining statements are
// missing from this listing). It acquires _lock.
313 Lock<Mutex> guard(_lock);
// Only surviving line of a function body near listing line 317 (presumably a
// second setServerVersion overload; its signature and remaining statements
// are missing from this listing). It acquires _lock.
319 Lock<Mutex> guard(_lock);
// Locking entry point for pruning the store file.
// tmpFileName_: temp file to build the pruned copy in; an empty string
// selects "<_fileName>.tmp".
324 void _prune(std::string tmpFileName_)
326 if (tmpFileName_.empty()) tmpFileName_ = _fileName +
".tmp";
327 Lock<Mutex> guard(_lock);
328 Lock<Mutex> fileGuard(_fileLock);
// Listing lines 329-333 are missing (possibly an early-out; confirm).
// __prune() does the rewrite; afterwards _recentChanged is cleared.
334 __prune(tmpFileName_);
335 _recentChanged =
false;
// Rewrites the store into tmpFileName_ from the in-memory subscription state,
// then replaces the current store file with it and continues appending after
// the bytes just written (_logOffset = bytesWritten at the end).
// The visible callers (purge(subId) and _prune) hold _lock and _fileLock
// before calling. Failures are reported by throwing StoreException, either
// directly or via error().
// This listing interleaves the Windows and POSIX variants; the preprocessor
// lines that select between them are not shown.
339 void __prune(std::string tmpFileName_)
341 size_t sz = AMPS_INITIAL_LOG_SIZE;
344 size_t bytesWritten = 0;
// --- Windows: create the temp file and map it at the initial size.
346 file = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
347 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
348 if( file == INVALID_HANDLE_VALUE )
350 DWORD err = getErrorNo();
351 std::ostringstream os;
352 os <<
"Failed to create file " << tmpFileName_ <<
" for pruning MMapBookmarkStore";
353 error(os.str(), err);
355 HANDLE mapFile = NULL;
358 sz = _setFileSize(sz, &log, file, &mapFile);
// A StoreException from _setFileSize is re-thrown with a prune-specific
// message; the message depends on whether a mapping handle was obtained.
360 catch (StoreException& ex)
362 if(mapFile == NULL || mapFile == INVALID_HANDLE_VALUE)
365 std::ostringstream os;
366 os <<
"Failed to create map of log file for prune" 368 throw StoreException(os.str());
373 CloseHandle(mapFile);
375 std::ostringstream os;
376 os <<
"Failed to map log file to memory for prune" 378 throw StoreException(os.str());
384 DWORD err = getErrorNo();
385 UnmapViewOfFile(log);
386 CloseHandle(mapFile);
388 error(
"Failed to prepare tmp file for prune.", err);
// --- POSIX: open/create the temp file, write a 4-byte zero header, map it.
391 file = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
394 int err = getErrorNo();
395 std::ostringstream os;
396 os <<
"Failed to open log file " << tmpFileName_ <<
" for MMapBookmarkStore";
397 error(os.str(), err);
400 if(::write(file,
"\0\0\0\0", 4) == -1)
402 int err = getErrorNo();
403 std::ostringstream os;
404 os <<
"Failed to write header to log file " << tmpFileName_
405 <<
" in prune for MMapBookmarkStore";
406 error(os.str(), err);
411 sz = _setFileSize(sz, &log, file, 0);
413 catch (StoreException& ex)
415 std::ostringstream os;
416 os <<
"Failed to prepare tmp file \"" << tmpFileName_
417 <<
"\" for prune. " << ex.what();
418 throw StoreException(os.str());
422 int err = getErrorNo();
425 error(
"Failed to prepare tmp file for prune.", err);
// --- Both platforms: emit the surviving state of every subscription.
430 for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
433 assert(!subId.
empty());
434 size_t subIdLen = subId.
len();
435 const BookmarkRange& range = i->second->getRange();
// The range is written as a logged+discarded pair. Listing lines 436-437 are
// missing, so any guard around these two writes is not visible here.
438 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, range);
439 write(&log, &bytesWritten, subId, ENTRY_DISCARD, range);
442 amps_uint64_t recentPub, recentSeq;
443 Subscription::parseBookmark(recent, recentPub, recentSeq);
// Work on a copy of the publisher map; setPublishersToDiscarded adjusts the
// copy using the recovery entries.
444 Subscription::PublisherMap publishersDiscarded =
445 i->second->_publishers;
446 MemoryBookmarkStore::EntryPtrList recovered;
447 i->second->getRecoveryEntries(recovered);
448 i->second->setPublishersToDiscarded(&recovered,
449 &publishersDiscarded);
450 char tmpBookmarkBuffer[128];
451 for (Subscription::PublisherIterator pub =
452 publishersDiscarded.begin(),
453 e = publishersDiscarded.end();
// Skip publishers with a zero id or zero sequence, and the publisher of the
// most recent bookmark (that bookmark is written separately below).
457 if (pub->first == 0 || pub->second == 0)
continue;
459 if (pub->first == recentPub)
continue;
// Format a bookmark in tmpBookmarkBuffer as two numbers, each followed by
// '|' (several argument lines of these calls are missing from the listing).
460 int written = AMPS_snprintf_amps_uint64_t(
462 sizeof(tmpBookmarkBuffer),
464 *(tmpBookmarkBuffer+written++) =
'|';
465 written += AMPS_snprintf_amps_uint64_t(
466 tmpBookmarkBuffer+written,
467 sizeof(tmpBookmarkBuffer)
470 *(tmpBookmarkBuffer+written++) =
'|';
// Two records follow (BOOKMARK + DISCARD), so room for 2*blockLen is checked;
// the temp mapping is doubled when needed (Windows and POSIX call variants).
473 size_t blockLen = subIdLen + 2*
sizeof(size_t) + tmpBookmark.
len() + 1;
474 if(bytesWritten + blockLen + blockLen >= sz)
477 sz = _setFileSize(sz*2, &log, file, &mapFile);
479 sz = _setFileSize(sz*2, &log, file, sz);
482 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, tmpBookmark);
483 write(&log, &bytesWritten, subId, ENTRY_DISCARD, tmpBookmark);
// The most recent bookmark is also written as a logged+discarded pair.
485 if (isWritableBookmark(recent.
len()))
488 size_t blockLen = subIdLen + 2*
sizeof(size_t) + recent.
len() + 1;
489 if(bytesWritten + blockLen + blockLen >= sz)
492 sz = _setFileSize(sz*2, &log, file, &mapFile);
494 sz = _setFileSize(sz*2, &log, file, sz);
497 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, recent);
498 write(&log, &bytesWritten, subId, ENTRY_DISCARD, recent);
502 i->second->getMostRecentList();
// A single ENTRY_PERSISTED record carries the subscription's last-persisted
// bookmark (the line defining `bookmark` is missing from the listing).
505 if (isWritableBookmark(bookmark.
len()))
508 size_t blockLen = subIdLen + 2*
sizeof(size_t) +
510 if(bytesWritten + blockLen >= sz)
513 sz = _setFileSize(sz*2, &log, file, &mapFile);
515 sz = _setFileSize(sz*2, &log, file, sz);
518 write(&log, &bytesWritten, subId, ENTRY_PERSISTED,
519 i->second->getLastPersisted());
// Active entries: each writable one is logged; entries whose _active flag is
// false additionally get a DISCARD record.
521 i->second->getActiveEntries(recovered);
522 for (MemoryBookmarkStore::EntryPtrList::iterator entry =
524 entry != recovered.end(); ++entry)
526 if ((*entry)->_val.empty() ||
527 !isWritableBookmark((*entry)->_val.len()))
530 size_t blockLen = subIdLen + 2*
sizeof(size_t) +
531 (*entry)->_val.len() + 1;
532 if(bytesWritten + blockLen >= sz)
535 sz = _setFileSize(sz*2, &log, file, &mapFile);
537 sz = _setFileSize(sz*2, &log, file, sz);
540 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK,
542 if (!(*entry)->_active)
545 if(bytesWritten + blockLen >= sz)
548 sz = _setFileSize(sz*2, &log, file, &mapFile);
550 sz = _setFileSize(sz*2, &log, file, sz);
553 write(&log, &bytesWritten, subId, ENTRY_DISCARD,
// Any StoreException while writing: release the temp mapping, remove the temp
// file, and re-throw with an "Exception during prune: " prefix.
559 catch (StoreException& ex)
562 UnmapViewOfFile(log);
563 CloseHandle(mapFile);
567 ::unlink(tmpFileName_.c_str());
569 std::ostringstream os;
570 os <<
"Exception during prune: " << ex.what();
571 throw StoreException(os.str());
// --- Windows: flush/unmap/close the current store file.
// NOTE(review): the results are combined with `|=`, so `success` is non-zero
// as soon as any one call succeeds. The test of `success` (listing line 579)
// is not shown; if it is `!success`, a single failed call goes unreported.
// Confirm whether `&=` was intended.
574 BOOL success = FlushViewOfFile(_log,0);
575 success |= UnmapViewOfFile(_log);
577 success |= CloseHandle(_mapFile);
578 success |= CloseHandle(_file);
581 DWORD err = getErrorNo();
582 std::ostringstream os;
583 os <<
"Failed to flush, unmap, and close current file " 585 <<
" in prune in MMapBookmarkStore. ";
586 error(os.str(), err);
588 _mapFile = INVALID_HANDLE_VALUE;
589 _file = INVALID_HANDLE_VALUE;
// Same sequence for the completed temp file (same `|=` caveat as above).
590 success = FlushViewOfFile(log,0);
591 success |= UnmapViewOfFile(log);
593 success |= CloseHandle(mapFile);
594 success |= CloseHandle(file);
597 DWORD err = getErrorNo();
598 std::ostringstream os;
599 os <<
"Failed to flush, unmap and close completed temp file " 601 <<
" in prune in MMapBookmarkStore. ";
602 error(os.str(), err);
604 mapFile = INVALID_HANDLE_VALUE;
605 file = INVALID_HANDLE_VALUE;
// Move the temp file over the store file, retrying while retryCount (declared
// on a missing line) allows. On final failure the store switches _fileName to
// the temp file name before error() throws.
608 while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
609 MOVEFILE_COPY_ALLOWED|MOVEFILE_REPLACE_EXISTING|MOVEFILE_WRITE_THROUGH))
611 DWORD err = getErrorNo();
612 if (--retryCount > 0)
continue;
614 std::string desiredFileName = _fileName;
615 _fileName = tmpFileName_;
617 std::ostringstream os;
618 os <<
"Failed to move completed temp file " << tmpFileName_
619 <<
" to " << desiredFileName
620 <<
" in prune in MMapBookmarkStore. Continuing by using " 621 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
622 error(os.str(), err);
// --- POSIX: unmap the current file, delete it, then rename the temp file
// into place. On either failure _fileName is switched to the temp file name
// before error() throws.
// NOTE(review): the original is unlinked before the rename, so between the
// two calls no file exists under _fileName; confirm this window is
// acceptable.
627 munmap(_log, _fileSize);
632 if (-1 == ::unlink(_fileName.c_str()))
634 int err = getErrorNo();
636 std::string desiredFileName = _fileName;
637 _fileName = tmpFileName_;
639 std::ostringstream os;
640 os <<
"Failed to delete file " << desiredFileName
641 <<
" after creating temporary file " << tmpFileName_
642 <<
" in prune in MMapBookmarkStore. Continuing by using " 643 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
644 error(os.str(), err);
646 if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
648 int err = getErrorNo();
650 std::string desiredFileName = _fileName;
651 _fileName = tmpFileName_;
653 std::ostringstream os;
654 os <<
"Failed to move completed temp file " << tmpFileName_
655 <<
" to " << desiredFileName
656 <<
" in prune in MMapBookmarkStore. Continuing by using " 657 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
658 error(os.str(), err);
// Subsequent appends continue right after the pruned content. (Listing lines
// 659-662 are missing; presumably they adopt the new file/mapping; confirm.)
663 _logOffset = bytesWritten;
// Records that bookmarkField_ has been persisted for the given subscription
// (the second parameter line is missing from this listing). Under _lock, an
// ENTRY_PERSISTED record is appended to the file before the in-memory store
// is updated.
666 virtual void _persisted(Subscription* subP_,
669 Lock<Mutex> l(_lock);
670 write(subP_->id(), ENTRY_PERSISTED, bookmarkField_);
671 MemoryBookmarkStore::_persisted(subP_, bookmarkField_);
// Sequence-number overload: resolves bookmark_ (an entry index) to its stored
// entry, records it as persisted in the file and in memory, and returns the
// bookmark field that was recorded.
674 virtual Message::Field _persisted(Subscription* subP_,
size_t bookmark_)
676 Lock<Mutex> l(_lock);
677 Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
// Unknown index or empty bookmark: the handling on listing lines 679-680 is
// missing from this listing (presumably an early return; confirm).
678 if (!entryPtr || entryPtr->_val.empty())
681 write(subP_->id(), ENTRY_PERSISTED, bookmarkField);
682 MemoryBookmarkStore::_persisted(subP_, bookmarkField);
683 return bookmarkField;
// Opens (creating if necessary) the store file, optionally captures its
// last-modified time as a timestamp string, and maps it into memory via
// setFileSize(). On Windows the return value is true when the file already
// had content (retVal = fileSize != 0); the POSIX return statement is not
// visible in this listing. Failures throw through error().
// The listing interleaves the Windows and POSIX variants; the preprocessor
// lines selecting between them are not shown.
688 bool init(
bool useLastModifiedTime_ =
false)
// --- Windows variant.
692 _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
693 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
694 if( _file == INVALID_HANDLE_VALUE )
696 DWORD err = getErrorNo();
697 std::ostringstream os;
698 os <<
"Failed to create file " << _fileName <<
" for MMapBookmarkStore";
699 error(os.str(), err);
701 LARGE_INTEGER liFileSize;
702 if(GetFileSizeEx(_file, &liFileSize) == 0)
704 DWORD err = getErrorNo();
706 std::ostringstream os;
707 os <<
"Failure getting file size for MMapBookmarkStore " << _fileName;
708 error(os.str(), err);
// Two alternative definitions of fileSize (full 64-bit QuadPart vs. LowPart),
// presumably chosen by a 64/32-bit build condition that is not shown.
712 size_t fileSize = liFileSize.QuadPart;
714 size_t fileSize = liFileSize.LowPart;
// Only an existing, non-empty file has a meaningful modification time.
716 if (useLastModifiedTime_ && fileSize > 0)
718 FILETIME ftModifiedTime;
719 if(GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
721 DWORD err = getErrorNo();
724 error(
"Failure getting file time while trying to recover.", err);
728 if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
730 DWORD err = getErrorNo();
733 error(
"Failure converting file time while trying to recover.", err);
// Timestamp is formatted as YYYYMMDDTHHMMSS and the last buffer byte is then
// set to 'Z'.
// NOTE(review): writing 'Z' at index AMPS_TIMESTAMP_LEN-1 leaves no room for
// a NUL terminator in that slot; whether the buffer stays NUL-terminated
// depends on AMPS_TIMESTAMP_LEN, which is not shown. Confirm against how
// setRecoveryTimestamp() consumes it.
736 _fileTimestamp =
new char[AMPS_TIMESTAMP_LEN];
737 sprintf_s(_fileTimestamp, AMPS_TIMESTAMP_LEN,
738 "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
739 st.wDay, st.wHour, st.wMinute, st.wSecond);
740 _fileTimestamp[AMPS_TIMESTAMP_LEN-1] =
'Z';
// Map at least AMPS_INITIAL_LOG_SIZE bytes, or the existing size if larger.
742 retVal = (fileSize != 0);
743 setFileSize( AMPS_INITIAL_LOG_SIZE > fileSize ?
744 AMPS_INITIAL_LOG_SIZE : fileSize);
// --- POSIX variant.
746 _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
749 int err = getErrorNo();
750 std::ostringstream os;
751 os <<
"Failed to open log file " << _fileName <<
" for MMapBookmarkStore";
752 error(os.str(), err);
755 if(fstat(_file, &statBuf) == -1)
757 int err = getErrorNo();
759 std::ostringstream os;
760 os <<
"Failed to stat log file " << _fileName <<
" for MMapBookmarkStore";
761 error(os.str(), err);
764 size_t fSize = (size_t)statBuf.st_size;
// A 4-byte zero header is written; the condition guarding this write (listing
// lines 765-767) is missing, presumably "file is empty"; confirm.
768 if(::write(_file,
"\0\0\0\0", 4) == -1)
770 int err = getErrorNo();
772 std::ostringstream os;
773 os <<
"Failed to write header to log file " << _fileName
774 <<
" for MMapBookmarkStore";
775 error(os.str(), err);
// Otherwise, when requested, capture st_mtime (UTC via gmtime_r) as
// YYYYMMDDTHHMMSS with 'Z' in the last buffer byte (same NUL-terminator
// caveat as the Windows branch).
779 else if (useLastModifiedTime_)
781 _fileTimestamp =
new char[AMPS_TIMESTAMP_LEN];
783 gmtime_r(&statBuf.st_mtime, &timeInfo);
784 strftime(_fileTimestamp, AMPS_TIMESTAMP_LEN,
785 "%Y%m%dT%H%M%S", &timeInfo);
786 _fileTimestamp[AMPS_TIMESTAMP_LEN-1] =
'Z';
// fSize-1: the POSIX branch of _setFileSize seeks to the requested size and
// writes one byte there, so a file it sized is one byte longer than the size
// requested.
789 setFileSize((fSize > AMPS_INITIAL_LOG_SIZE) ? fSize-1 : AMPS_INITIAL_LOG_SIZE);
// Windows variant: returns the calling thread's last error code as reported
// by GetLastError().
795 DWORD getErrorNo()
const 797 return GetLastError();
// Windows variant: always throws StoreException. The text is
// "File: <_fileName>. <message_> with error <system message for err>".
// message_: caller-supplied description; err: a GetLastError()-style code.
800 void error(
const std::string& message_, DWORD err)
802 std::ostringstream os;
803 static const DWORD msgSize = 2048;
// pMsg is declared on a listing line that is missing here.
// NOTE(review): FORMAT_MESSAGE_ARGUMENT_ARRAY is passed with a NULL
// Arguments pointer; FORMAT_MESSAGE_IGNORE_INSERTS may have been intended for
// system messages; confirm against the FormatMessage documentation. The
// result `sz` is not checked in the visible lines.
805 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
806 FORMAT_MESSAGE_ARGUMENT_ARRAY,
807 NULL, err, LANG_NEUTRAL,
808 pMsg, msgSize, NULL);
809 os <<
"File: " << _fileName <<
". " << message_ <<
" with error " << pMsg;
810 throw StoreException(os.str());
// POSIX variants. getErrorNo()'s body is missing from this listing
// (presumably it returns errno; confirm).
// error() always throws StoreException with "<message_>. Error is
// <strerror(err)>". Unlike the Windows variant, it does not prepend the file
// name.
813 int getErrorNo()
const 818 void error(
const std::string& message_,
int err)
820 std::ostringstream os;
821 os << message_ <<
". Error is " << strerror(err);
822 throw StoreException(os.str());
// Two alternative definitions of the 8-byte length helpers, followed by the
// body of the Field-based write() wrapper (its signature is missing from this
// listing). The preprocessor condition choosing between the macro pairs is
// not shown.
//  - First pair: AMPS_WRITE8 stores v one byte at a time, most significant
//    byte first; AMPS_READ8 copies 8 raw bytes with memcpy.
//    NOTE(review): a byte-wise MSB-first store paired with a native memcpy
//    load only round-trips on a big-endian host; confirm that is what the
//    hidden condition selects.
//  - Second pair: direct size_t load/store through a cast pointer.
// The wrapper takes _fileLock and appends to the member mapping at
// _logOffset.
826 #define AMPS_WRITE8(p,v) { p[0] = (v>>56)&0xFF; p[1] = (v>>48)&0xFF; p[2] = (v>>40)&0xFF; p[3] = (v>>32)&0xFF; p[4] = (v>>24)&0xFF; p[5] = (v>>16)&0xFF; p[6] = (v>>8)&0xFF; p[7]=v&0xFF; } 827 #define AMPS_READ8(p, v) { memcpy(&v,p,8); } 829 #define AMPS_WRITE8(p,v) { *(size_t*)p = (size_t)v; } 830 #define AMPS_READ8(p,v) { v = *(const size_t*)p; } 837 Lock<Mutex> guard(_fileLock);
838 write(&_log, &_logOffset, subId_, type_, bookmark_);
// Appends one record to the mapping at *logPtr + *logOffsetPtr and advances
// *logOffsetPtr. Record layout, as far as the visible lines show: size_t
// sub-id length, sub-id bytes, [listing lines 857-858 missing, presumably the
// one-byte entry type], size_t bookmark length, bookmark bytes.
// Nothing is written while _recovering is set or when the bookmark length
// fails isWritableBookmark().
// The remaining parameters (type_ and bookmark_) are on listing lines that
// are missing here.
841 void write(
char** logPtr,
size_t* logOffsetPtr,
const Message::Field& subId_,
844 if(!_recovering && isWritableBookmark(bookmark_.
len()))
846 size_t len = subId_.
len();
// blockLen = both length prefixes + both payloads + 1 extra byte.
848 size_t blockLen = len + 2*
sizeof(size_t) + bookmark_.
len() + 1;
// NOTE(review): the capacity check and growth use the members _fileSize and
// setFileSize() (which remaps _log), regardless of which mapping logPtr
// refers to. __prune() passes its own temp mapping here; it pre-sizes that
// mapping before most of its calls, but confirm that this path cannot be
// reached with a non-member mapping.
849 if(*logOffsetPtr + blockLen >= _fileSize)
851 setFileSize(_fileSize*2);
853 char* offset = *logPtr+*logOffsetPtr;
854 AMPS_WRITE8(offset,len);
855 offset +=
sizeof(size_t);
856 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
859 len = bookmark_.
len();
860 AMPS_WRITE8(offset,len);
861 offset +=
sizeof(size_t);
862 memcpy(offset, static_cast<const void*>(bookmark_.
data()), len);
863 *logOffsetPtr += blockLen;
// Locking wrapper for records whose payload is a numeric bookmark (size_t)
// rather than a Field: takes _fileLock and appends to the member mapping at
// _logOffset.
870 void write(
const Message::Field& subId_,
char type_,
size_t bookmark_)
872 Lock<Mutex> guard(_fileLock);
873 write(&_log, &_logOffset, subId_, type_, bookmark_);
// Appends a record carrying a numeric bookmark to the mapping at
// *logPtr + *logOffsetPtr and advances *logOffsetPtr by
// subId length + 2*sizeof(size_t) + 1.
// Visible layout: size_t sub-id length, sub-id bytes, [listing lines 892-893
// missing, presumably the type byte], then bookmark_ stored as a raw size_t.
// Unlike the Field overload, the integers are stored with direct
// reinterpret_cast writes instead of AMPS_WRITE8, and no _recovering check is
// visible in these lines.
876 void write(
char** logPtr,
size_t* logOffsetPtr,
const Message::Field& subId_,
877 char type_,
size_t bookmark_)
881 size_t len = subId_.
len();
882 size_t blockLen = len + 2*
sizeof(size_t) + 1;
// Growth is keyed to the member _fileSize and remaps the member mapping via
// setFileSize().
884 if(*logOffsetPtr + blockLen >= _fileSize)
886 setFileSize(_fileSize*2);
888 char* offset = *logPtr+*logOffsetPtr;
889 *(
reinterpret_cast<size_t*
>(offset)) = len;
890 offset +=
sizeof(size_t);
891 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
894 *(
reinterpret_cast<size_t*
>(offset)) = bookmark_;
895 *logOffsetPtr += blockLen;
// Resizes and (re)maps the member store file, updating _fileSize with the
// size actually mapped as returned by _setFileSize().
899 void setFileSize(
size_t newSize_)
// When a mapping already exists, a request that does not exceed the current
// size is tested here; the action taken (listing lines 902-903) is missing,
// presumably an early return; confirm.
901 if(_log && newSize_ <= _fileSize)
// Two call variants: the first passes &_mapFile (Windows), the second passes
// the current _fileSize (POSIX). The selecting preprocessor lines are not
// shown.
904 _fileSize = _setFileSize(newSize_, &_log, _file, &_mapFile);
906 _fileSize = _setFileSize(newSize_, &_log, _file, _fileSize);
// Grows the given file to a page-aligned size and maps (or remaps) it,
// storing the new base address in *log_. The last parameter, on a missing
// listing line, is a mapping-handle pointer (mapFile_) on Windows and the
// previous mapped size (fileSize_) on POSIX. The return statement is not
// visible; callers assign the result to their size variable. Failures throw
// through error().
911 size_t _setFileSize(
size_t newSize_,
char** log_, FileType file_,
// Round down to a multiple of the page size; the adjustment applied when that
// falls short of the request or is zero (listing lines 922-924) is missing,
// presumably a round-up; confirm.
920 size_t sz = newSize_ & (size_t)(~(getPageSize()-1));
921 if(sz < newSize_ || sz == 0)
// --- Windows: drop any existing view/mapping, then create a mapping of sz
// bytes and map a view of it.
926 if(*mapFile_ && *mapFile_ != INVALID_HANDLE_VALUE)
930 FlushViewOfFile(*log_, 0);
931 UnmapViewOfFile(*log_);
933 CloseHandle(*mapFile_);
// Two variants: the first passes the high 32 bits of sz, the second passes 0
// for the high part (presumably 64-bit vs 32-bit builds).
936 *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, (DWORD)((sz >> 32) & 0xffffffff), (DWORD)sz, NULL);
938 *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, 0, (DWORD)sz, NULL);
940 if(*mapFile_ == NULL || *mapFile_ == INVALID_HANDLE_VALUE)
942 DWORD errNo = getErrorNo();
944 error(
"Failed to create map of log file", errNo);
950 *log_ = (
char*)MapViewOfFile(*mapFile_, FILE_MAP_ALL_ACCESS, 0, 0, sz);
953 DWORD errNo = getErrorNo();
954 CloseHandle(*mapFile_);
956 error(
"Failed to map log file to memory", errNo);
// --- POSIX: extend the file by seeking to offset sz and writing one byte, so
// the file ends up sz+1 bytes long.
963 if(lseek(file_, (off_t)sz, SEEK_SET) == -1)
965 int err = getErrorNo();
967 error(
"Seek failed for bookmark log", err);
969 if(::write(file_,
"", 1) == -1)
971 int err = getErrorNo();
973 error(
"Failed to grow bookmark log", err);
// Remap alternatives: mremap() of the existing mapping, or munmap() followed
// by a fresh mmap(); a plain mmap() is presumably used when nothing was
// mapped before. The selecting conditions are on missing lines.
978 *log_ =
static_cast<char*
>(mremap(*log_, fileSize_, sz,
981 munmap(*log_,fileSize_);
982 *log_ =
static_cast<char*
>(mmap(0, sz, PROT_READ | PROT_WRITE,
983 MAP_SHARED, file_, 0));
989 *log_ =
static_cast<char*
>(mmap(0, sz, PROT_READ | PROT_WRITE,
990 MAP_SHARED, file_, 0));
993 if((
void*)(*log_) == MAP_FAILED)
995 int err = getErrorNo();
997 error(
"Failed to map log file to memory", err);
// Replays the mapped store file from the start to rebuild the in-memory
// state, leaving _logOffset positioned after the last good record.
// useLastModifiedTime_: when set and a file timestamp was captured, it is
//   applied to subscriptions via setRecoveryTimestamp().
// hasAdapter_: when set, MemoryBookmarkStore::__purge() runs before the first
//   file record is applied.
// Holds _lock and _fileLock throughout. Several declarations (sub,
// bookmarkField, recovered) and all braces are on lines missing from this
// listing.
1005 void recover(
bool useLastModifiedTime_ =
false,
1006 bool hasAdapter_ =
false)
1010 size_t bookmarkLen = 0;
1011 size_t lastGoodOffset = 0;
1012 bool inError =
false;
1013 Lock<Mutex> guard(_lock);
1014 Lock<Mutex> fileGuard(_fileLock);
// `recovered` (declared on a missing line) maps sub id -> map of bookmark ->
// sequence number for bookmarks seen during the replay.
1017 typedef std::map<Message::Field, size_t, Message::Field::FieldHash> BookmarkMap;
1019 Message::Field::FieldHash>::iterator BookmarkMapIter;
1021 typedef std::map<Message::Field, BookmarkMap*,
1022 Message::Field::FieldHash> ReadMap;
1023 typedef std::map<Message::Field, BookmarkMap*,
1024 Message::Field::FieldHash>::iterator ReadMapIter;
// Each record starts with a size_t sub-id length; a zero length ends the
// replay.
1026 size_t subLen = *(
reinterpret_cast<size_t*
>(_log));
1027 while(!inError && subLen > 0)
1030 if (_logOffset == 0 && hasAdapter_)
1031 MemoryBookmarkStore::__purge();
1032 _logOffset +=
sizeof(size_t);
1033 sub.assign(_log+_logOffset, subLen);
1034 _logOffset += subLen;
// One type byte follows the sub id and selects how the record is applied.
1035 switch(_log[_logOffset++])
// ENTRY_BOOKMARK: read the length-prefixed bookmark and log it into the
// subscription, remembering its sequence number in `recovered`.
1039 case ENTRY_BOOKMARK:
1041 AMPS_READ8((_log+_logOffset),bookmarkLen);
1042 _logOffset +=
sizeof(size_t);
1043 bookmarkField.assign(_log+_logOffset, bookmarkLen);
1044 _logOffset += bookmarkLen;
1045 Subscription* subP = find(sub);
1046 BookmarkMap* bookmarks = NULL;
1047 ReadMapIter iter = recovered.find(sub);
1048 if (iter == recovered.end())
1050 Message::Field subKey;
1052 bookmarks =
new BookmarkMap();
1053 recovered[subKey] = bookmarks;
1057 bookmarks = iter->second;
// A bookmark already present in the map triggers the handling below; parts of
// it are on missing lines (the for_each call is cut off), so its full effect
// is not visible here.
1059 if (bookmarks->find(bookmarkField) != bookmarks->end())
1061 std::for_each(bookmarks->begin(), bookmarks->end(),
1064 subP->getMostRecent(
true);
// Ranges are logged and immediately discarded; other bookmarks are logged
// only if the subscription does not already consider them discarded.
1066 if (BookmarkRange::isRange(bookmarkField))
1068 subP->discard(subP->log(bookmarkField));
1070 else if (!subP->isDiscarded(bookmarkField))
1072 size_t sequence = subP->log(bookmarkField);
1073 Message::Field copy;
1075 bookmarks->insert(std::make_pair(copy, sequence));
// Otherwise the bookmark is remembered with sequence 0.
1081 Message::Field copy;
1083 bookmarks->insert(std::make_pair(copy,0));
// Next case (its label is on a missing line, presumably ENTRY_DISCARD): look
// up the sequence recorded for this bookmark, drop it from `recovered`, and
// discard it from the subscription by sequence or, failing that, by value.
1089 AMPS_READ8((_log+_logOffset),bookmarkLen);
1090 _logOffset +=
sizeof(size_t);
1091 bookmarkField.assign(_log+_logOffset, bookmarkLen);
1092 _logOffset += bookmarkLen;
1093 size_t sequence = AMPS_UNSET_INDEX;
1094 ReadMapIter iter = recovered.find(sub);
1095 if (iter != recovered.end())
1097 BookmarkMap* bookmarks = iter->second;
1098 BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1099 if (bookmarkIter != bookmarks->end())
1101 sequence = bookmarkIter->second;
1102 Message::Field bookmarkToClear(bookmarkIter->first);
1103 bookmarkToClear.
clear();
1104 bookmarks->erase(bookmarkIter);
1107 if (!BookmarkRange::isRange(bookmarkField))
1109 Subscription* subP = find(sub);
1110 if (sequence != AMPS_UNSET_INDEX)
1113 if (sequence) subP->discard(sequence);
1117 subP->discard(bookmarkField);
// ENTRY_PERSISTED: forward the bookmark to the in-memory store only (no file
// write).
1122 case ENTRY_PERSISTED:
1124 AMPS_READ8((_log+_logOffset),bookmarkLen);
1125 _logOffset +=
sizeof(size_t);
1126 bookmarkField.assign(_log+_logOffset, bookmarkLen);
1127 _logOffset += bookmarkLen;
1128 MemoryBookmarkStore::_persisted(find(sub), bookmarkField);
// Unrecognized type byte (label on a missing line): with no good record yet
// this is a hard error; otherwise rewind to the end of the last good record.
1132 if (lastGoodOffset == 0)
1134 error(
"Error while recovering bookmark store file.", getErrorNo());
1138 _logOffset = lastGoodOffset;
1142 lastGoodOffset = _logOffset;
1143 if (!inError) subLen = *(
reinterpret_cast<size_t*
>(_log + _logOffset));
// Post-processing per subscription. The conditions linking the statements
// below are partly on missing lines.
1145 for (SubscriptionMap::iterator i=_subs.begin(); i != _subs.end(); ++i)
1147 if (recovered.count(i->first) && !recovered[i->first]->empty())
1149 if (i->second->getMostRecent(
false).len() > 1)
1151 i->second->justRecovered();
// NOTE(review): a fresh Subscription replaces the map entry here; what
// happens to the previous object is on lines not shown; confirm it is
// released.
1158 _subs[i->first] =
new Subscription(
this, i->first);
1161 if (useLastModifiedTime_ && _fileTimestamp)
1163 _subs[i->first]->setRecoveryTimestamp(_fileTimestamp);
// The timestamp buffer allocated by init() is released here.
1168 delete[] _fileTimestamp;
// Clean up the temporary bookkeeping built during the replay.
1171 for (ReadMapIter i = recovered.begin(), e = recovered.end(); i!=e; ++i)
1173 std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1175 Message::Field f = i->first;
// From here on write() is allowed to append again (the Field overload of
// write() skips writes while _recovering is set).
1178 _recovering =
false;
1182 std::string _fileName;
1186 char* _fileTimestamp;
// Returns the system memory page size, kept in a function-local static. The
// guard that decides when the value is (re)computed and the return statement
// are on lines missing from this listing.
1190 static size_t getPageSize()
1192 static size_t pageSize;
// Windows: page size from GetSystemInfo().
1196 SYSTEM_INFO SYS_INFO;
1197 GetSystemInfo(&SYS_INFO);
1198 pageSize = SYS_INFO.dwPageSize;
// POSIX: page size from sysconf(_SC_PAGESIZE).
1200 pageSize = (size_t)sysconf(_SC_PAGESIZE);
1211 #endif // _MMAPBOOKMARKSTORE_H_ virtual void purge()
Called to purge the contents of this store.
Definition: MMapBookmarkStore.hpp:288
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1105
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:446
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
A BookmarkStoreImpl implementation that uses a memory mapped file for storage of the bookmarks...
Definition: MMapBookmarkStore.hpp:56
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: MMapBookmarkStore.hpp:222
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MMapBookmarkStore.hpp:254
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:311
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: MMapBookmarkStore.hpp:240
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1130
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
MMapBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:82
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MMapBookmarkStore.hpp:302
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1307
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:164
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:317
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MMapBookmarkStore.hpp:201
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:133
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Definition: ampsplusplus.hpp:105
MMapBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:105
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1120
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MMapBookmarkStore.hpp:268