26 #ifndef _MMAPBOOKMARKSTORE_H_ 27 #define _MMAPBOOKMARKSTORE_H_ 38 #include <sys/types.h> 43 #define AMPS_INITIAL_LOG_SIZE 40960UL 60 typedef HANDLE FileType;
65 static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
84 , _logOffset(0), _log(0), _fileTimestamp(0)
86 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
91 if (init(useLastModifiedTime_))
93 recover(useLastModifiedTime_,
false);
108 bool useLastModifiedTime_ =
false)
110 , _logOffset(0), _log(0), _fileTimestamp(0)
112 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
117 if (init(useLastModifiedTime_))
119 recover(useLastModifiedTime_,
false);
138 const char* fileName_,
139 RecoveryPointFactory factory_ = NULL,
140 bool useLastModifiedTime_ =
false)
142 , _fileName(fileName_), _fileSize(0)
143 , _logOffset(0), _log(0), _fileTimestamp(0)
145 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
150 if (init(useLastModifiedTime_))
152 recover(useLastModifiedTime_,
true);
171 const std::string& fileName_,
172 RecoveryPointFactory factory_ = NULL,
173 bool useLastModifiedTime_ =
false)
175 , _fileName(fileName_), _fileSize(0)
176 , _logOffset(0), _log(0), _fileTimestamp(0)
178 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
183 if (init(useLastModifiedTime_))
185 recover(useLastModifiedTime_,
true);
192 UnmapViewOfFile(_log);
193 CloseHandle(_mapFile);
196 munmap(_log, _fileSize);
212 Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
213 Lock<Mutex> guard(_lock);
222 message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
224 write(sub->id(), ENTRY_BOOKMARK, bookmark);
225 return MemoryBookmarkStore::_log(message_);
240 Lock<Mutex> guard(_lock);
241 write(subId, ENTRY_DISCARD, bookmark);
242 MemoryBookmarkStore::_discard(message_);
254 Lock<Mutex> guard(_lock);
255 Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
256 if (!entry || entry->_val.empty())
260 write(subId_, ENTRY_DISCARD, entry->_val);
261 MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
271 Lock<Mutex> guard(_lock);
272 return MemoryBookmarkStore::_getMostRecent(subId_);
285 Lock<Mutex> l(_lock);
286 bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
294 write(subId, ENTRY_BOOKMARK, message_.
getBookmark());
295 write(subId, ENTRY_DISCARD, message_.
getBookmark());
307 Lock<Mutex> guard(_lock);
308 Lock<Mutex> fileGuard(_fileLock);
309 memset(_log, 0, _logOffset);
311 MemoryBookmarkStore::_purge();
321 Lock<Mutex> guard(_lock);
322 Lock<Mutex> fileGuard(_fileLock);
323 MemoryBookmarkStore::_purge(subId_);
324 std::string tmpFileName = _fileName +
".tmp";
325 __prune(tmpFileName);
330 Lock<Mutex> guard(_lock);
336 Lock<Mutex> guard(_lock);
341 void _prune(
const std::string& tmpFileName_)
343 Lock<Mutex> guard(_lock);
344 Lock<Mutex> fileGuard(_fileLock);
350 if (tmpFileName_.empty())
352 __prune(_fileName +
".tmp");
356 __prune(tmpFileName_);
358 _recentChanged =
false;
362 void __prune(
const std::string& tmpFileName_)
364 size_t sz = AMPS_INITIAL_LOG_SIZE;
367 size_t bytesWritten = 0;
369 file = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
370 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
371 if ( file == INVALID_HANDLE_VALUE )
373 DWORD err = getErrorNo();
374 std::ostringstream os;
375 os <<
"Failed to create temp store file " << tmpFileName_ <<
376 " to prune MMapBookmarkStore " << _fileName;
377 error(os.str(), err);
379 HANDLE mapFile = NULL;
382 sz = _setFileSize(sz, &log, file, &mapFile);
384 catch (StoreException& ex)
386 if (mapFile == NULL || mapFile == INVALID_HANDLE_VALUE)
389 std::ostringstream os;
390 os <<
"Failed to create map of temp file " << tmpFileName_
391 <<
" while resizing it to prune MMapBookmarkStore " << _fileName
392 <<
": " << ex.what();
393 throw StoreException(os.str());
398 CloseHandle(mapFile);
400 std::ostringstream os;
401 os <<
"Failed to map temp file " << tmpFileName_
402 <<
" to memory while resizing it to prune MMapBookmarkStore " 403 << _fileName <<
": " << ex.what();
404 throw StoreException(os.str());
410 DWORD err = getErrorNo();
411 UnmapViewOfFile(log);
412 CloseHandle(mapFile);
414 std::ostringstream os;
415 os <<
"Failed to grow tmp file " << tmpFileName_
416 <<
" to prune MMapBookmarkStore " << _fileName;
417 error(os.str(), err);
420 file = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
423 int err = getErrorNo();
424 std::ostringstream os;
425 os <<
"Failed to create temp store file " << tmpFileName_ <<
426 " to prune MMapBookmarkStore " << _fileName;
427 error(os.str(), err);
430 if (::write(file,
"\0\0\0\0", 4) == -1)
432 int err = getErrorNo();
433 std::ostringstream os;
434 os <<
"Failed to write header to temp file " << tmpFileName_
435 <<
" to prune MMapBookmarkStore " << _fileName;
436 error(os.str(), err);
441 sz = _setFileSize(sz, &log, file, 0);
443 catch (StoreException& ex)
445 std::ostringstream os;
446 os <<
"Failed to grow tmp file " << tmpFileName_
447 <<
" to prune MMapBookmarkStore " << _fileName << ex.what();
448 throw StoreException(os.str());
452 int err = getErrorNo();
455 std::ostringstream os;
456 os <<
"Failed to grow tmp file " << tmpFileName_
457 <<
" to prune MMapBookmarkStore " << _fileName;
458 error(os.str(), err);
463 for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
466 assert(!subId.
empty());
467 size_t subIdLen = subId.
len();
468 Subscription* mapSubPtr = i->second;
469 const BookmarkRange& range = mapSubPtr->getRange();
472 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, range);
473 write(&log, &bytesWritten, subId, ENTRY_DISCARD, range);
476 amps_uint64_t recentPub, recentSeq;
477 Subscription::parseBookmark(recent, recentPub, recentSeq);
478 Subscription::PublisherMap publishersDiscarded =
479 mapSubPtr->_publishers;
480 MemoryBookmarkStore::EntryPtrList recovered;
481 mapSubPtr->getRecoveryEntries(recovered);
482 mapSubPtr->setPublishersToDiscarded(&recovered,
483 &publishersDiscarded);
484 char tmpBookmarkBuffer[128];
485 for (Subscription::PublisherIterator pub =
486 publishersDiscarded.begin(),
487 e = publishersDiscarded.end();
491 if (pub->first == 0 || pub->second == 0)
496 if (pub->first == recentPub)
500 int written = AMPS_snprintf_amps_uint64_t(
502 sizeof(tmpBookmarkBuffer),
504 *(tmpBookmarkBuffer + written++) =
'|';
505 written += AMPS_snprintf_amps_uint64_t(
506 tmpBookmarkBuffer + written,
507 sizeof(tmpBookmarkBuffer)
510 *(tmpBookmarkBuffer + written++) =
'|';
513 size_t blockLen = subIdLen + 2 *
sizeof(size_t) + tmpBookmark.
len() + 1;
514 if (bytesWritten + blockLen + blockLen >= sz)
517 sz = _setFileSize(sz * 2, &log, file, &mapFile);
519 sz = _setFileSize(sz * 2, &log, file, sz);
522 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, tmpBookmark);
523 write(&log, &bytesWritten, subId, ENTRY_DISCARD, tmpBookmark);
525 if (isWritableBookmark(recent.
len()))
528 size_t blockLen = subIdLen + 2 *
sizeof(size_t) + recent.
len() + 1;
529 if (bytesWritten + blockLen + blockLen >= sz)
532 sz = _setFileSize(sz * 2, &log, file, &mapFile);
534 sz = _setFileSize(sz * 2, &log, file, sz);
537 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, recent);
538 write(&log, &bytesWritten, subId, ENTRY_DISCARD, recent);
542 mapSubPtr->getMostRecentList();
545 if (isWritableBookmark(bookmark.
len()))
548 size_t blockLen = subIdLen + 2 *
sizeof(size_t) +
550 if (bytesWritten + blockLen >= sz)
553 sz = _setFileSize(sz * 2, &log, file, &mapFile);
555 sz = _setFileSize(sz * 2, &log, file, sz);
558 write(&log, &bytesWritten, subId, ENTRY_PERSISTED,
559 mapSubPtr->getLastPersisted());
561 mapSubPtr->getActiveEntries(recovered);
562 for (MemoryBookmarkStore::EntryPtrList::iterator entry =
564 entry != recovered.end(); ++entry)
566 if ((*entry)->_val.empty() ||
567 !isWritableBookmark((*entry)->_val.len()))
572 size_t blockLen = subIdLen + 2 *
sizeof(size_t) +
573 (*entry)->_val.len() + 1;
574 if (bytesWritten + blockLen >= sz)
577 sz = _setFileSize(sz * 2, &log, file, &mapFile);
579 sz = _setFileSize(sz * 2, &log, file, sz);
582 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK,
584 if (!(*entry)->_active)
587 if (bytesWritten + blockLen >= sz)
590 sz = _setFileSize(sz * 2, &log, file, &mapFile);
592 sz = _setFileSize(sz * 2, &log, file, sz);
595 write(&log, &bytesWritten, subId, ENTRY_DISCARD,
601 catch (StoreException& ex)
604 UnmapViewOfFile(log);
605 CloseHandle(mapFile);
609 ::unlink(tmpFileName_.c_str());
611 std::ostringstream os;
612 os <<
"Exception during prune: " << ex.what();
613 throw StoreException(os.str());
616 BOOL success = FlushViewOfFile(_log, 0);
617 success |= UnmapViewOfFile(_log);
619 success |= CloseHandle(_mapFile);
620 success |= CloseHandle(_file);
623 DWORD err = getErrorNo();
624 std::ostringstream os;
625 os <<
"Failed to flush, unmap, and close current file " 627 <<
" in prune in MMapBookmarkStore. ";
628 error(os.str(), err);
630 _mapFile = INVALID_HANDLE_VALUE;
631 _file = INVALID_HANDLE_VALUE;
632 success = FlushViewOfFile(log, 0);
633 success |= UnmapViewOfFile(log);
635 success |= CloseHandle(mapFile);
636 success |= CloseHandle(file);
639 DWORD err = getErrorNo();
640 std::ostringstream os;
641 os <<
"Failed to flush, unmap and close completed temp file " 643 <<
" in prune in MMapBookmarkStore. ";
644 error(os.str(), err);
646 mapFile = INVALID_HANDLE_VALUE;
647 file = INVALID_HANDLE_VALUE;
650 while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
651 MOVEFILE_COPY_ALLOWED | MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH))
653 DWORD err = getErrorNo();
654 if (--retryCount > 0)
659 std::string desiredFileName = _fileName;
660 _fileName = tmpFileName_;
662 std::ostringstream os;
663 os <<
"Failed to move completed temp file " << tmpFileName_
664 <<
" to " << desiredFileName
665 <<
" in prune in MMapBookmarkStore. Continuing by using " 666 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
667 error(os.str(), err);
672 munmap(_log, _fileSize);
677 if (-1 == ::unlink(_fileName.c_str()))
679 int err = getErrorNo();
681 std::string desiredFileName = _fileName;
682 _fileName = tmpFileName_;
684 std::ostringstream os;
685 os <<
"Failed to delete file " << desiredFileName
686 <<
" after creating temporary file " << tmpFileName_
687 <<
" in prune in MMapBookmarkStore. Continuing by using " 688 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
689 error(os.str(), err);
691 if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
693 int err = getErrorNo();
695 std::string desiredFileName = _fileName;
696 _fileName = tmpFileName_;
698 std::ostringstream os;
699 os <<
"Failed to move completed temp file " << tmpFileName_
700 <<
" to " << desiredFileName
701 <<
" in prune in MMapBookmarkStore. Continuing by using " 702 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
703 error(os.str(), err);
708 _logOffset = bytesWritten;
711 virtual void _persisted(Subscription* subP_,
714 Lock<Mutex> l(_lock);
715 write(subP_->id(), ENTRY_PERSISTED, bookmarkField_);
716 MemoryBookmarkStore::_persisted(subP_, bookmarkField_);
719 virtual Message::Field _persisted(Subscription* subP_,
size_t bookmark_)
721 Lock<Mutex> l(_lock);
722 Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
723 if (!entryPtr || entryPtr->_val.empty())
728 write(subP_->id(), ENTRY_PERSISTED, bookmarkField);
729 MemoryBookmarkStore::_persisted(subP_, bookmarkField);
730 return bookmarkField;
735 bool init(
bool useLastModifiedTime_ =
false)
739 _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
740 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
741 if ( _file == INVALID_HANDLE_VALUE )
743 DWORD err = getErrorNo();
744 std::ostringstream os;
745 os <<
"Failed to initialize file " << _fileName <<
" for MMapBookmarkStore";
746 error(os.str(), err);
748 LARGE_INTEGER liFileSize;
749 if (GetFileSizeEx(_file, &liFileSize) == 0)
751 DWORD err = getErrorNo();
753 std::ostringstream os;
754 os <<
"Failure getting initial file size for MMapBookmarkStore " << _fileName;
755 error(os.str(), err);
759 size_t fileSize = liFileSize.QuadPart;
761 size_t fileSize = liFileSize.LowPart;
763 if (useLastModifiedTime_ && fileSize > 0)
765 FILETIME ftModifiedTime;
766 if (GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
768 DWORD err = getErrorNo();
771 error(
"Failure getting file time while trying to recover.", err);
775 if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
777 DWORD err = getErrorNo();
780 error(
"Failure converting file time while trying to recover.", err);
783 _fileTimestamp =
new char[AMPS_TIMESTAMP_LEN];
784 sprintf_s(_fileTimestamp, AMPS_TIMESTAMP_LEN,
785 "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
786 st.wDay, st.wHour, st.wMinute, st.wSecond);
787 _fileTimestamp[AMPS_TIMESTAMP_LEN - 1] =
'Z';
789 retVal = (fileSize != 0);
790 setFileSize( AMPS_INITIAL_LOG_SIZE > fileSize ?
791 AMPS_INITIAL_LOG_SIZE : fileSize);
793 _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
796 int err = getErrorNo();
797 std::ostringstream os;
798 os <<
"Failed to initialize log file " << _fileName <<
" for MMapBookmarkStore";
799 error(os.str(), err);
802 if (fstat(_file, &statBuf) == -1)
804 int err = getErrorNo();
806 std::ostringstream os;
807 os <<
"Failed to stat log file " << _fileName <<
" for MMapBookmarkStore";
808 error(os.str(), err);
811 size_t fSize = (size_t)statBuf.st_size;
815 if (::write(_file,
"\0\0\0\0", 4) == -1)
817 int err = getErrorNo();
819 std::ostringstream os;
820 os <<
"Failed to write header to log file " << _fileName
821 <<
" for MMapBookmarkStore";
822 error(os.str(), err);
826 else if (useLastModifiedTime_)
828 _fileTimestamp =
new char[AMPS_TIMESTAMP_LEN];
830 gmtime_r(&statBuf.st_mtime, &timeInfo);
831 strftime(_fileTimestamp, AMPS_TIMESTAMP_LEN,
832 "%Y%m%dT%H%M%S", &timeInfo);
833 _fileTimestamp[AMPS_TIMESTAMP_LEN - 1] =
'Z';
836 setFileSize((fSize > AMPS_INITIAL_LOG_SIZE) ? fSize - 1 : AMPS_INITIAL_LOG_SIZE);
842 DWORD getErrorNo()
const 844 return GetLastError();
847 void error(
const std::string& message_, DWORD err)
849 std::ostringstream os;
850 static const DWORD msgSize = 2048;
852 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
853 FORMAT_MESSAGE_ARGUMENT_ARRAY,
854 NULL, err, LANG_NEUTRAL,
855 pMsg, msgSize, NULL);
856 os <<
"File: " << _fileName <<
". " << message_ <<
" with error " << pMsg;
857 throw StoreException(os.str());
860 int getErrorNo()
const 865 void error(
const std::string& message_,
int err)
867 std::ostringstream os;
868 os << message_ <<
". Error is " << strerror(err);
869 throw StoreException(os.str());
873 #define AMPS_WRITE8(p,v) { p[0] = (v>>56)&0xFF; p[1] = (v>>48)&0xFF; p[2] = (v>>40)&0xFF; p[3] = (v>>32)&0xFF; p[4] = (v>>24)&0xFF; p[5] = (v>>16)&0xFF; p[6] = (v>>8)&0xFF; p[7]=v&0xFF; } 874 #define AMPS_READ8(p, v) { memcpy(&v,p,8); } 876 #define AMPS_WRITE8(p,v) { *(size_t*)p = (size_t)v; } 877 #define AMPS_READ8(p,v) { v = *(const size_t*)p; } 884 Lock<Mutex> guard(_fileLock);
885 write(&_log, &_logOffset, subId_, type_, bookmark_);
888 void write(
char** logPtr,
size_t* logOffsetPtr,
const Message::Field& subId_,
891 if (!_recovering && isWritableBookmark(bookmark_.
len()))
893 size_t len = subId_.
len();
895 size_t blockLen = len + 2 *
sizeof(size_t) + bookmark_.
len() + 1;
896 if (*logOffsetPtr + blockLen >= _fileSize)
898 setFileSize(_fileSize * 2);
900 char* offset = *logPtr + *logOffsetPtr;
901 AMPS_WRITE8(offset, len);
902 offset +=
sizeof(size_t);
903 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
906 len = bookmark_.
len();
907 AMPS_WRITE8(offset, len);
908 offset +=
sizeof(size_t);
909 memcpy(offset, static_cast<const void*>(bookmark_.
data()), len);
910 *logOffsetPtr += blockLen;
917 void write(
const Message::Field& subId_,
char type_,
size_t bookmark_)
919 Lock<Mutex> guard(_fileLock);
920 write(&_log, &_logOffset, subId_, type_, bookmark_);
923 void write(
char** logPtr,
size_t* logOffsetPtr,
const Message::Field& subId_,
924 char type_,
size_t bookmark_)
928 size_t len = subId_.
len();
929 size_t blockLen = len + 2 *
sizeof(size_t) + 1;
931 if (*logOffsetPtr + blockLen >= _fileSize)
933 setFileSize(_fileSize * 2);
935 char* offset = *logPtr + *logOffsetPtr;
936 *(
reinterpret_cast<size_t*
>(offset)) = len;
937 offset +=
sizeof(size_t);
938 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
941 *(
reinterpret_cast<size_t*
>(offset)) = bookmark_;
942 *logOffsetPtr += blockLen;
946 void setFileSize(
size_t newSize_)
948 if (_log && newSize_ <= _fileSize)
953 _fileSize = _setFileSize(newSize_, &_log, _file, &_mapFile);
955 _fileSize = _setFileSize(newSize_, &_log, _file, _fileSize);
960 size_t _setFileSize(
size_t newSize_,
char** log_, FileType file_,
969 size_t sz = newSize_ & (size_t)(~(getPageSize() - 1));
970 if (sz < newSize_ || sz == 0)
975 if (*mapFile_ && *mapFile_ != INVALID_HANDLE_VALUE)
979 FlushViewOfFile(*log_, 0);
980 UnmapViewOfFile(*log_);
982 CloseHandle(*mapFile_);
985 *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, (DWORD)((sz >> 32) & 0xffffffff), (DWORD)sz, NULL);
987 *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, 0, (DWORD)sz, NULL);
989 if (*mapFile_ == NULL || *mapFile_ == INVALID_HANDLE_VALUE)
991 DWORD errNo = getErrorNo();
993 std::ostringstream os;
994 os <<
"Failed to create map of MMapBookmarkStore file " << _fileName
995 <<
" during resize.";
996 error(os.str(), errNo);
1002 *log_ = (
char*)MapViewOfFile(*mapFile_, FILE_MAP_ALL_ACCESS, 0, 0, sz);
1005 DWORD errNo = getErrorNo();
1006 CloseHandle(*mapFile_);
1008 std::ostringstream os;
1009 os <<
"Failed to map MMapBookmarkStore file " << _fileName
1010 <<
" to memory during resize.";
1011 error(os.str(), errNo);
1018 if (lseek(file_, (off_t)sz, SEEK_SET) == -1)
1020 int err = getErrorNo();
1022 std::ostringstream os;
1023 os <<
"Failed to seek in MMapBookmarkStore file " << _fileName
1024 <<
" during resize.";
1025 error(os.str(), err);
1027 if (::write(file_,
"", 1) == -1)
1029 int err = getErrorNo();
1031 std::ostringstream os;
1032 os <<
"Failed to grow MMapBookmarkStore file " << _fileName
1033 <<
" during resize.";
1034 error(os.str(), err);
1039 *log_ =
static_cast<char*
>(mremap(*log_, fileSize_, sz,
1042 munmap(*log_, fileSize_);
1043 *log_ =
static_cast<char*
>(mmap(0, sz, PROT_READ | PROT_WRITE,
1044 MAP_SHARED, file_, 0));
1050 *log_ =
static_cast<char*
>(mmap(0, sz, PROT_READ | PROT_WRITE,
1051 MAP_SHARED, file_, 0));
1054 if ((
void*)(*log_) == MAP_FAILED)
1056 int err = getErrorNo();
1059 std::ostringstream os;
1060 os <<
"Failed to map MMapBookmarkStore file " << _fileName
1061 <<
" to memory during resize.";
1062 error(os.str(), err);
1069 void recover(
bool useLastModifiedTime_ =
false,
1070 bool hasAdapter_ =
false)
1074 size_t bookmarkLen = 0;
1075 size_t lastGoodOffset = 0;
1076 bool inError =
false;
1077 Lock<Mutex> guard(_lock);
1078 Lock<Mutex> fileGuard(_fileLock);
1081 typedef std::map<Message::Field, size_t, Message::Field::FieldHash> BookmarkMap;
1083 Message::Field::FieldHash>::iterator BookmarkMapIter;
1085 typedef std::map<Message::Field, BookmarkMap*,
1086 Message::Field::FieldHash> ReadMap;
1087 typedef std::map<Message::Field, BookmarkMap*,
1088 Message::Field::FieldHash>::iterator ReadMapIter;
1090 size_t subLen = *(
reinterpret_cast<size_t*
>(_log));
1091 while (!inError && subLen > 0)
1094 if (_logOffset == 0 && hasAdapter_)
1096 MemoryBookmarkStore::__purge();
1098 _logOffset +=
sizeof(size_t);
1099 sub.assign(_log + _logOffset, subLen);
1100 _logOffset += subLen;
1101 switch (_log[_logOffset++])
1105 case ENTRY_BOOKMARK:
1107 AMPS_READ8((_log + _logOffset), bookmarkLen);
1108 _logOffset +=
sizeof(size_t);
1109 bookmarkField.assign(_log + _logOffset, bookmarkLen);
1110 _logOffset += bookmarkLen;
1111 Subscription* subP = find(sub);
1112 BookmarkMap* bookmarks = NULL;
1113 ReadMapIter iter = recovered.find(sub);
1114 if (iter == recovered.end())
1116 Message::Field subKey;
1118 bookmarks =
new BookmarkMap();
1119 recovered[subKey] = bookmarks;
1123 bookmarks = iter->second;
1125 if (bookmarks->find(bookmarkField) != bookmarks->end())
1127 std::for_each(bookmarks->begin(), bookmarks->end(),
1130 subP->getMostRecent(
true);
1132 if (BookmarkRange::isRange(bookmarkField))
1134 subP->discard(subP->log(bookmarkField));
1136 else if (!subP->isDiscarded(bookmarkField))
1138 size_t sequence = subP->log(bookmarkField);
1139 Message::Field copy;
1141 bookmarks->insert(std::make_pair(copy, sequence));
1147 Message::Field copy;
1149 bookmarks->insert(std::make_pair(copy, 0));
1155 AMPS_READ8((_log + _logOffset), bookmarkLen);
1156 _logOffset +=
sizeof(size_t);
1157 bookmarkField.assign(_log + _logOffset, bookmarkLen);
1158 _logOffset += bookmarkLen;
1159 size_t sequence = AMPS_UNSET_INDEX;
1160 ReadMapIter iter = recovered.find(sub);
1161 if (iter != recovered.end())
1163 BookmarkMap* bookmarks = iter->second;
1164 BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1165 if (bookmarkIter != bookmarks->end())
1167 sequence = bookmarkIter->second;
1168 Message::Field bookmarkToClear(bookmarkIter->first);
1169 bookmarkToClear.
clear();
1170 bookmarks->erase(bookmarkIter);
1173 if (!BookmarkRange::isRange(bookmarkField))
1175 Subscription* subP = find(sub);
1176 if (sequence != AMPS_UNSET_INDEX)
1181 subP->discard(sequence);
1186 subP->discard(bookmarkField);
1191 case ENTRY_PERSISTED:
1193 AMPS_READ8((_log + _logOffset), bookmarkLen);
1194 _logOffset +=
sizeof(size_t);
1195 bookmarkField.assign(_log + _logOffset, bookmarkLen);
1196 _logOffset += bookmarkLen;
1197 MemoryBookmarkStore::_persisted(find(sub), bookmarkField);
1201 if (lastGoodOffset == 0)
1203 error(
"Error while recovering MMapBookmarkStore file.", getErrorNo());
1207 _logOffset = lastGoodOffset;
1211 lastGoodOffset = _logOffset;
1214 subLen = *(
reinterpret_cast<size_t*
>(_log + _logOffset));
1217 for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
1219 if (recovered.count(i->first) && !recovered[i->first]->empty())
1221 if (i->second->getMostRecent(
false).len() > 1)
1223 i->second->justRecovered();
1230 _subs[i->first] =
new Subscription(
this, i->first);
1233 if (useLastModifiedTime_ && _fileTimestamp)
1235 _subs[i->first]->setRecoveryTimestamp(_fileTimestamp);
1240 delete[] _fileTimestamp;
1243 for (ReadMapIter i = recovered.begin(), e = recovered.end(); i != e; ++i)
1245 std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1247 Message::Field f = i->first;
1250 _recovering =
false;
1254 std::string _fileName;
1258 char* _fileTimestamp;
1262 static size_t getPageSize()
1264 static size_t pageSize;
1268 SYSTEM_INFO SYS_INFO;
1269 GetSystemInfo(&SYS_INFO);
1270 pageSize = SYS_INFO.dwPageSize;
1272 pageSize = (size_t)sysconf(_SC_PAGESIZE);
1283 #endif // _MMAPBOOKMARKSTORE_H_ virtual void purge()
Called to purge the contents of this store.
Definition: MMapBookmarkStore.hpp:305
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
A BookmarkStoreImpl implementation that uses a memory mapped file for storage of the bookmarks...
Definition: MMapBookmarkStore.hpp:56
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: MMapBookmarkStore.hpp:232
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
RecoveryPointAdapter is a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MMapBookmarkStore.hpp:269
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:328
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MMapBookmarkStore.hpp:252
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1374
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:127
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
MMapBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:82
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MMapBookmarkStore.hpp:319
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1346
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:170
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:334
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MMapBookmarkStore.hpp:209
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:137
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
Definition: ampsplusplus.hpp:103
MMapBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:107
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1140
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MMapBookmarkStore.hpp:283