26 #ifndef _MMAPBOOKMARKSTORE_H_ 27 #define _MMAPBOOKMARKSTORE_H_ 38 #include <sys/types.h> 43 #define AMPS_INITIAL_LOG_SIZE 40960UL 60 typedef HANDLE FileType;
65 static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
84 , _logOffset(0) , _log(0), _fileTimestamp(0)
86 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
91 if (init(useLastModifiedTime_))
92 recover(useLastModifiedTime_,
false);
106 bool useLastModifiedTime_ =
false)
108 , _logOffset(0) , _log(0), _fileTimestamp(0)
110 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
115 if (init(useLastModifiedTime_))
116 recover(useLastModifiedTime_,
false);
134 const char* fileName_,
135 RecoveryPointFactory factory_ = NULL,
136 bool useLastModifiedTime_ =
false)
138 , _fileName(fileName_), _fileSize(0)
139 , _logOffset(0) , _log(0), _fileTimestamp(0)
141 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
146 if (init(useLastModifiedTime_))
147 recover(useLastModifiedTime_,
true);
165 const std::string& fileName_,
166 RecoveryPointFactory factory_ = NULL,
167 bool useLastModifiedTime_ =
false)
169 , _fileName(fileName_), _fileSize(0)
170 , _logOffset(0) , _log(0), _fileTimestamp(0)
172 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
177 if (init(useLastModifiedTime_))
178 recover(useLastModifiedTime_,
true);
184 UnmapViewOfFile(_log);
185 CloseHandle(_mapFile);
188 munmap(_log, _fileSize);
204 Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
205 Lock<Mutex> guard(_lock);
212 message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
214 write(sub->id(), ENTRY_BOOKMARK, bookmark);
215 return MemoryBookmarkStore::_log(message_);
228 Lock<Mutex> guard(_lock);
229 write(subId, ENTRY_DISCARD, bookmark);
230 MemoryBookmarkStore::_discard(message_);
242 Lock<Mutex> guard(_lock);
243 Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
244 if (!entry || entry->_val.empty())
return;
245 write(subId_, ENTRY_DISCARD, entry->_val);
246 MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
256 Lock<Mutex> guard(_lock);
257 return MemoryBookmarkStore::_getMostRecent(subId_);
270 Lock<Mutex> l(_lock);
271 bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
277 write(subId, ENTRY_BOOKMARK, message_.
getBookmark());
278 write(subId, ENTRY_DISCARD, message_.
getBookmark());
290 Lock<Mutex> guard(_lock);
291 Lock<Mutex> fileGuard(_fileLock);
292 memset(_log, 0, _logOffset);
294 MemoryBookmarkStore::_purge();
304 Lock<Mutex> guard(_lock);
305 Lock<Mutex> fileGuard(_fileLock);
306 MemoryBookmarkStore::_purge(subId_);
307 std::string tmpFileName = _fileName +
".tmp";
308 __prune(tmpFileName);
313 Lock<Mutex> guard(_lock);
319 Lock<Mutex> guard(_lock);
// Prunes the store's backing file by delegating to __prune() while holding
// both _lock and _fileLock.
// tmpFileName_: name of the temporary file to build; when it is empty,
// _fileName + ".tmp" is used instead.
// NOTE(review): original lines 328-332 are not visible in this listing, so any
// early-exit condition that precedes the calls below cannot be confirmed here.
324 void _prune(
const std::string& tmpFileName_)
326 Lock<Mutex> guard(_lock);
327 Lock<Mutex> fileGuard(_fileLock);
// Fall back to the default temp file name when the caller supplied none.
333 if (tmpFileName_.empty())
335 __prune(_fileName +
".tmp");
339 __prune(tmpFileName_);
// Reset the flag once the prune call has returned.
341 _recentChanged =
false;
// Rewrites the store into a fresh temporary file and then swaps that file in
// as the store's backing file.
// tmpFileName_: path of the temporary file to create and fill.
// Visible steps: create/open the temp file, size and map it with
// _setFileSize() starting at AMPS_INITIAL_LOG_SIZE, walk _subs writing records
// with the explicit-log write() overloads (doubling the temp file when more
// room is needed), release the current file's mapping, move the temp file over
// _fileName, and finally set _logOffset to the number of bytes written.
// Failures are reported by throwing StoreException (directly or via error()).
// The callers visible in this listing (_prune and purge(subId_)) take _lock and
// _fileLock before calling; this function does not lock by itself.
// NOTE(review): both Windows (HANDLE/CreateFileA) and POSIX (open/munmap)
// variants appear back to back below; the preprocessor guards that select
// between them are not visible in this listing.
345 void __prune(
const std::string& tmpFileName_)
347 size_t sz = AMPS_INITIAL_LOG_SIZE;
350 size_t bytesWritten = 0;
// Windows variant: create or open the temp file.
352 file = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
353 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
354 if( file == INVALID_HANDLE_VALUE )
356 DWORD err = getErrorNo();
357 std::ostringstream os;
358 os <<
"Failed to create temp store file " << tmpFileName_ <<
359 " to prune MMapBookmarkStore " << _fileName;
360 error(os.str(), err);
362 HANDLE mapFile = NULL;
// Size and map the temp file; failures are rethrown with extra context.
365 sz = _setFileSize(sz, &log, file, &mapFile);
367 catch (StoreException& ex)
369 if(mapFile == NULL || mapFile == INVALID_HANDLE_VALUE)
372 std::ostringstream os;
373 os <<
"Failed to create map of temp file " << tmpFileName_
374 <<
" while resizing it to prune MMapBookmarkStore " << _fileName
375 <<
": " << ex.what();
376 throw StoreException(os.str());
381 CloseHandle(mapFile);
383 std::ostringstream os;
384 os <<
"Failed to map temp file " << tmpFileName_
385 <<
" to memory while resizing it to prune MMapBookmarkStore " 386 << _fileName <<
": " << ex.what();
387 throw StoreException(os.str());
393 DWORD err = getErrorNo();
394 UnmapViewOfFile(log);
395 CloseHandle(mapFile);
397 std::ostringstream os;
398 os <<
"Failed to grow tmp file " << tmpFileName_
399 <<
" to prune MMapBookmarkStore " << _fileName;
400 error(os.str(), err);
// POSIX variant: create or open the temp file with mode 0644.
403 file = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
406 int err = getErrorNo();
407 std::ostringstream os;
408 os <<
"Failed to create temp store file " << tmpFileName_ <<
409 " to prune MMapBookmarkStore " << _fileName;
410 error(os.str(), err);
// Write four zero bytes at the start of the temp file.
413 if(::write(file,
"\0\0\0\0", 4) == -1)
415 int err = getErrorNo();
416 std::ostringstream os;
417 os <<
"Failed to write header to temp file " << tmpFileName_
418 <<
" to prune MMapBookmarkStore " << _fileName;
419 error(os.str(), err);
424 sz = _setFileSize(sz, &log, file, 0);
426 catch (StoreException& ex)
428 std::ostringstream os;
429 os <<
"Failed to grow tmp file " << tmpFileName_
430 <<
" to prune MMapBookmarkStore " << _fileName << ex.what();
431 throw StoreException(os.str());
435 int err = getErrorNo();
438 std::ostringstream os;
439 os <<
"Failed to grow tmp file " << tmpFileName_
440 <<
" to prune MMapBookmarkStore " << _fileName;
441 error(os.str(), err);
// Write the state of every known subscription into the temp log.
446 for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
449 assert(!subId.
empty());
450 size_t subIdLen = subId.
len();
451 Subscription* mapSubPtr = i->second;
452 const BookmarkRange& range = mapSubPtr->getRange();
455 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, range);
456 write(&log, &bytesWritten, subId, ENTRY_DISCARD, range);
459 amps_uint64_t recentPub, recentSeq;
460 Subscription::parseBookmark(recent, recentPub, recentSeq);
461 Subscription::PublisherMap publishersDiscarded =
462 mapSubPtr->_publishers;
463 MemoryBookmarkStore::EntryPtrList recovered;
464 mapSubPtr->getRecoveryEntries(recovered);
465 mapSubPtr->setPublishersToDiscarded(&recovered,
466 &publishersDiscarded);
467 char tmpBookmarkBuffer[128];
468 for (Subscription::PublisherIterator pub =
469 publishersDiscarded.begin(),
470 e = publishersDiscarded.end();
// Skip entries whose key or value is zero, and the publisher that was parsed
// out of `recent` above.
474 if (pub->first == 0 || pub->second == 0)
continue;
476 if (pub->first == recentPub)
continue;
477 int written = AMPS_snprintf_amps_uint64_t(
479 sizeof(tmpBookmarkBuffer),
481 *(tmpBookmarkBuffer+written++) =
'|';
482 written += AMPS_snprintf_amps_uint64_t(
483 tmpBookmarkBuffer+written,
484 sizeof(tmpBookmarkBuffer)
487 *(tmpBookmarkBuffer+written++) =
'|';
490 size_t blockLen = subIdLen + 2*
sizeof(size_t) + tmpBookmark.
len() + 1;
// Two records (bookmark + discard) are written below, so room for two blocks
// is required; otherwise the temp file is doubled first. The two
// _setFileSize calls are the Windows and POSIX alternatives.
491 if(bytesWritten + blockLen + blockLen >= sz)
494 sz = _setFileSize(sz*2, &log, file, &mapFile);
496 sz = _setFileSize(sz*2, &log, file, sz);
499 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, tmpBookmark);
500 write(&log, &bytesWritten, subId, ENTRY_DISCARD, tmpBookmark);
502 if (isWritableBookmark(recent.
len()))
505 size_t blockLen = subIdLen + 2*
sizeof(size_t) + recent.
len() + 1;
506 if(bytesWritten + blockLen + blockLen >= sz)
509 sz = _setFileSize(sz*2, &log, file, &mapFile);
511 sz = _setFileSize(sz*2, &log, file, sz);
514 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, recent);
515 write(&log, &bytesWritten, subId, ENTRY_DISCARD, recent);
519 mapSubPtr->getMostRecentList();
522 if (isWritableBookmark(bookmark.
len()))
525 size_t blockLen = subIdLen + 2*
sizeof(size_t) +
527 if(bytesWritten + blockLen >= sz)
530 sz = _setFileSize(sz*2, &log, file, &mapFile);
532 sz = _setFileSize(sz*2, &log, file, sz);
535 write(&log, &bytesWritten, subId, ENTRY_PERSISTED,
536 mapSubPtr->getLastPersisted());
538 mapSubPtr->getActiveEntries(recovered);
539 for (MemoryBookmarkStore::EntryPtrList::iterator entry =
541 entry != recovered.end(); ++entry)
543 if ((*entry)->_val.empty() ||
544 !isWritableBookmark((*entry)->_val.len()))
547 size_t blockLen = subIdLen + 2*
sizeof(size_t) +
548 (*entry)->_val.len() + 1;
549 if(bytesWritten + blockLen >= sz)
552 sz = _setFileSize(sz*2, &log, file, &mapFile);
554 sz = _setFileSize(sz*2, &log, file, sz);
557 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK,
// Entries that are no longer active also get a discard record.
559 if (!(*entry)->_active)
562 if(bytesWritten + blockLen >= sz)
565 sz = _setFileSize(sz*2, &log, file, &mapFile);
567 sz = _setFileSize(sz*2, &log, file, sz);
570 write(&log, &bytesWritten, subId, ENTRY_DISCARD,
576 catch (StoreException& ex)
// Release the temp file's resources and remove the temp file, then rethrow
// with a prefix identifying the prune.
579 UnmapViewOfFile(log);
580 CloseHandle(mapFile);
584 ::unlink(tmpFileName_.c_str());
586 std::ostringstream os;
587 os <<
"Exception during prune: " << ex.what();
588 throw StoreException(os.str());
// Windows variant: flush and release the CURRENT store file.
// NOTE(review): `success |= ...` leaves `success` non-zero if ANY of the calls
// succeeded. If the check on the (not visible) following line is meant to
// detect that any call failed, `&=` would be required - confirm.
591 BOOL success = FlushViewOfFile(_log,0);
592 success |= UnmapViewOfFile(_log);
594 success |= CloseHandle(_mapFile);
595 success |= CloseHandle(_file);
598 DWORD err = getErrorNo();
599 std::ostringstream os;
600 os <<
"Failed to flush, unmap, and close current file " 602 <<
" in prune in MMapBookmarkStore. ";
603 error(os.str(), err);
605 _mapFile = INVALID_HANDLE_VALUE;
606 _file = INVALID_HANDLE_VALUE;
// Flush and release the completed temp file so it can be moved.
607 success = FlushViewOfFile(log,0);
608 success |= UnmapViewOfFile(log);
610 success |= CloseHandle(mapFile);
611 success |= CloseHandle(file);
614 DWORD err = getErrorNo();
615 std::ostringstream os;
616 os <<
"Failed to flush, unmap and close completed temp file " 618 <<
" in prune in MMapBookmarkStore. ";
619 error(os.str(), err);
621 mapFile = INVALID_HANDLE_VALUE;
622 file = INVALID_HANDLE_VALUE;
// Move the temp file over the store file, retrying while retryCount allows.
// When retries are exhausted the store switches _fileName to the temp file
// and reports the failure through error().
625 while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
626 MOVEFILE_COPY_ALLOWED|MOVEFILE_REPLACE_EXISTING|MOVEFILE_WRITE_THROUGH))
628 DWORD err = getErrorNo();
629 if (--retryCount > 0)
continue;
631 std::string desiredFileName = _fileName;
632 _fileName = tmpFileName_;
634 std::ostringstream os;
635 os <<
"Failed to move completed temp file " << tmpFileName_
636 <<
" to " << desiredFileName
637 <<
" in prune in MMapBookmarkStore. Continuing by using " 638 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
639 error(os.str(), err);
// POSIX variant: unmap the current log, delete the old file, then rename the
// temp file into place. On either failure _fileName is switched to the temp
// file before error() throws.
644 munmap(_log, _fileSize);
649 if (-1 == ::unlink(_fileName.c_str()))
651 int err = getErrorNo();
653 std::string desiredFileName = _fileName;
654 _fileName = tmpFileName_;
656 std::ostringstream os;
657 os <<
"Failed to delete file " << desiredFileName
658 <<
" after creating temporary file " << tmpFileName_
659 <<
" in prune in MMapBookmarkStore. Continuing by using " 660 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
661 error(os.str(), err);
663 if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
665 int err = getErrorNo();
667 std::string desiredFileName = _fileName;
668 _fileName = tmpFileName_;
670 std::ostringstream os;
671 os <<
"Failed to move completed temp file " << tmpFileName_
672 <<
" to " << desiredFileName
673 <<
" in prune in MMapBookmarkStore. Continuing by using " 674 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
675 error(os.str(), err);
// The write offset now points just past the data written to the new file.
680 _logOffset = bytesWritten;
// Records a persisted bookmark for a subscription: while holding _lock, an
// ENTRY_PERSISTED record keyed by subP_->id() is appended through write(), and
// then the in-memory state is updated via MemoryBookmarkStore::_persisted().
// NOTE(review): the declaration of the second parameter (bookmarkField_) is on
// an original line that is not visible in this listing.
683 virtual void _persisted(Subscription* subP_,
686 Lock<Mutex> l(_lock);
687 write(subP_->id(), ENTRY_PERSISTED, bookmarkField_);
688 MemoryBookmarkStore::_persisted(subP_, bookmarkField_);
// Overload that identifies the bookmark by its index within the subscription.
// While holding _lock, the entry is looked up with getEntryByIndex(); for a
// usable entry an ENTRY_PERSISTED record is written, the in-memory state is
// updated, and the bookmark field is returned.
// NOTE(review): the statements guarded by the check below, and the definition
// of bookmarkField, are on original lines 696-697, which are not visible here;
// presumably a missing or empty entry returns early - confirm.
691 virtual Message::Field _persisted(Subscription* subP_,
size_t bookmark_)
693 Lock<Mutex> l(_lock);
694 Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
695 if (!entryPtr || entryPtr->_val.empty())
698 write(subP_->id(), ENTRY_PERSISTED, bookmarkField);
699 MemoryBookmarkStore::_persisted(subP_, bookmarkField);
700 return bookmarkField;
// Opens (creating if needed) the file named by _fileName, determines its
// current size, optionally captures the file's last-modified time into
// _fileTimestamp, and maps the file through setFileSize() using at least
// AMPS_INITIAL_LOG_SIZE bytes.
// useLastModifiedTime_: when true and the file already has content, the
// modification time is formatted as "YYYYMMDDTHHMMSS" with 'Z' stored at index
// AMPS_TIMESTAMP_LEN-1 of a newly allocated buffer.
// The constructors visible in this listing call recover() only when init()
// returns true; on the Windows variant the result is (fileSize != 0).
// Failures are reported through error(), which throws StoreException.
// NOTE(review): Windows and POSIX variants appear back to back below; the
// preprocessor guards that select between them are not visible in this listing.
705 bool init(
bool useLastModifiedTime_ =
false)
// Windows variant.
709 _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
710 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
711 if( _file == INVALID_HANDLE_VALUE )
713 DWORD err = getErrorNo();
714 std::ostringstream os;
715 os <<
"Failed to initialize file " << _fileName <<
" for MMapBookmarkStore";
716 error(os.str(), err);
718 LARGE_INTEGER liFileSize;
719 if(GetFileSizeEx(_file, &liFileSize) == 0)
721 DWORD err = getErrorNo();
723 std::ostringstream os;
724 os <<
"Failure getting initial file size for MMapBookmarkStore " << _fileName;
725 error(os.str(), err);
// Two alternative definitions of fileSize (full 64-bit value vs. low 32 bits);
// the guard choosing between them is not visible here.
729 size_t fileSize = liFileSize.QuadPart;
731 size_t fileSize = liFileSize.LowPart;
733 if (useLastModifiedTime_ && fileSize > 0)
735 FILETIME ftModifiedTime;
736 if(GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
738 DWORD err = getErrorNo();
741 error(
"Failure getting file time while trying to recover.", err);
745 if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
747 DWORD err = getErrorNo();
750 error(
"Failure converting file time while trying to recover.", err);
753 _fileTimestamp =
new char[AMPS_TIMESTAMP_LEN];
754 sprintf_s(_fileTimestamp, AMPS_TIMESTAMP_LEN,
755 "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
756 st.wDay, st.wHour, st.wMinute, st.wSecond);
757 _fileTimestamp[AMPS_TIMESTAMP_LEN-1] =
'Z';
759 retVal = (fileSize != 0);
// Never map less than the initial log size.
760 setFileSize( AMPS_INITIAL_LOG_SIZE > fileSize ?
761 AMPS_INITIAL_LOG_SIZE : fileSize);
// POSIX variant.
// NOTE(review): the message below spells "initilize"; it is a runtime string
// and is left untouched here.
763 _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
766 int err = getErrorNo();
767 std::ostringstream os;
768 os <<
"Failed to initilize log file " << _fileName <<
" for MMapBookmarkStore";
769 error(os.str(), err);
772 if(fstat(_file, &statBuf) == -1)
774 int err = getErrorNo();
776 std::ostringstream os;
777 os <<
"Failed to stat log file " << _fileName <<
" for MMapBookmarkStore";
778 error(os.str(), err);
781 size_t fSize = (size_t)statBuf.st_size;
// Writes a 4-byte zero header; the condition guarding this write is on an
// original line that is not visible (presumably the file is empty - confirm).
785 if(::write(_file,
"\0\0\0\0", 4) == -1)
787 int err = getErrorNo();
789 std::ostringstream os;
790 os <<
"Failed to write header to log file " << _fileName
791 <<
" for MMapBookmarkStore";
792 error(os.str(), err);
796 else if (useLastModifiedTime_)
798 _fileTimestamp =
new char[AMPS_TIMESTAMP_LEN];
800 gmtime_r(&statBuf.st_mtime, &timeInfo);
801 strftime(_fileTimestamp, AMPS_TIMESTAMP_LEN,
802 "%Y%m%dT%H%M%S", &timeInfo);
803 _fileTimestamp[AMPS_TIMESTAMP_LEN-1] =
'Z';
// NOTE(review): fSize-1 presumably compensates for the single extra byte that
// the POSIX _setFileSize() writes at offset sz - confirm.
806 setFileSize((fSize > AMPS_INITIAL_LOG_SIZE) ? fSize-1 : AMPS_INITIAL_LOG_SIZE);
812 DWORD getErrorNo()
const 814 return GetLastError();
// Windows variant of the error reporter: asks FormatMessageA for the system
// text belonging to error code `err` and throws a StoreException whose message
// is "File: <_fileName>. <message_> with error <system text>".
// message_: caller-supplied description of the failed operation.
// err: system error code, as returned by getErrorNo().
// NOTE(review): the declaration of pMsg is on an original line that is not
// visible in this listing; the return value `sz` of FormatMessageA is not
// consulted in the visible lines.
817 void error(
const std::string& message_, DWORD err)
819 std::ostringstream os;
820 static const DWORD msgSize = 2048;
822 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
823 FORMAT_MESSAGE_ARGUMENT_ARRAY,
824 NULL, err, LANG_NEUTRAL,
825 pMsg, msgSize, NULL);
826 os <<
"File: " << _fileName <<
". " << message_ <<
" with error " << pMsg;
827 throw StoreException(os.str());
// POSIX variants of getErrorNo() and error(). The body of getErrorNo() is on
// original lines that are not visible in this listing. error() throws a
// StoreException whose message is "<message_>. Error is <strerror(err)>";
// unlike the Windows variant it does not prefix the file name.
830 int getErrorNo()
const 835 void error(
const std::string& message_,
int err)
837 std::ostringstream os;
838 os << message_ <<
". Error is " << strerror(err);
839 throw StoreException(os.str());
843 #define AMPS_WRITE8(p,v) { p[0] = (v>>56)&0xFF; p[1] = (v>>48)&0xFF; p[2] = (v>>40)&0xFF; p[3] = (v>>32)&0xFF; p[4] = (v>>24)&0xFF; p[5] = (v>>16)&0xFF; p[6] = (v>>8)&0xFF; p[7]=v&0xFF; } 844 #define AMPS_READ8(p, v) { memcpy(&v,p,8); } 846 #define AMPS_WRITE8(p,v) { *(size_t*)p = (size_t)v; } 847 #define AMPS_READ8(p,v) { v = *(const size_t*)p; } 854 Lock<Mutex> guard(_fileLock);
855 write(&_log, &_logOffset, subId_, type_, bookmark_);
// Appends one record to the log addressed by *logPtr at offset *logOffsetPtr
// and advances *logOffsetPtr by the record size.
// Record size is: subId length + 2*sizeof(size_t) + bookmark length + 1.
// Visible layout: length of subId_ (AMPS_WRITE8), subId_ bytes, then length of
// bookmark_ (AMPS_WRITE8), bookmark_ bytes. The extra byte is presumably the
// entry type written on original lines 874-875, which are not visible here.
// Nothing is written while _recovering is set or when isWritableBookmark()
// rejects the bookmark length.
// NOTE(review): the capacity check uses _fileSize and grows the store's own
// file via setFileSize(), even when logPtr refers to another log; __prune()
// grows its temp file itself before calling this overload.
// NOTE(review): the remaining parameters (type_, bookmark_) are declared on
// original lines that are not visible in this listing.
858 void write(
char** logPtr,
size_t* logOffsetPtr,
const Message::Field& subId_,
861 if(!_recovering && isWritableBookmark(bookmark_.
len()))
863 size_t len = subId_.
len();
865 size_t blockLen = len + 2*
sizeof(size_t) + bookmark_.
len() + 1;
// Double the store file when this record would not fit.
866 if(*logOffsetPtr + blockLen >= _fileSize)
868 setFileSize(_fileSize*2);
// *logPtr is read after the possible resize, so a remapped _log is picked up.
870 char* offset = *logPtr+*logOffsetPtr;
871 AMPS_WRITE8(offset,len);
872 offset +=
sizeof(size_t);
873 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
876 len = bookmark_.
len();
877 AMPS_WRITE8(offset,len);
878 offset +=
sizeof(size_t);
879 memcpy(offset, static_cast<const void*>(bookmark_.
data()), len);
880 *logOffsetPtr += blockLen;
// Convenience overload for a numeric bookmark: takes _fileLock and forwards to
// the explicit-log overload using the store's own _log and _logOffset.
// subId_: subscription id the record belongs to.
// type_: entry type byte.
// bookmark_: numeric bookmark value to store.
887 void write(
const Message::Field& subId_,
char type_,
size_t bookmark_)
889 Lock<Mutex> guard(_fileLock);
890 write(&_log, &_logOffset, subId_, type_, bookmark_);
// Appends one record carrying a numeric bookmark to the log addressed by
// *logPtr at offset *logOffsetPtr and advances *logOffsetPtr by the record
// size, which is: subId length + 2*sizeof(size_t) + 1.
// Visible layout: length of subId_ as a native size_t, subId_ bytes, then
// bookmark_ as a native size_t. The extra byte is presumably the entry type
// written on original lines 909-910, which are not visible here.
// Unlike the Field overload, values are stored through reinterpret_cast rather
// than AMPS_WRITE8.
// NOTE(review): original lines 895-897 are not visible, so any guard condition
// preceding the body cannot be confirmed here.
893 void write(
char** logPtr,
size_t* logOffsetPtr,
const Message::Field& subId_,
894 char type_,
size_t bookmark_)
898 size_t len = subId_.
len();
899 size_t blockLen = len + 2*
sizeof(size_t) + 1;
// Double the store file when this record would not fit.
901 if(*logOffsetPtr + blockLen >= _fileSize)
903 setFileSize(_fileSize*2);
905 char* offset = *logPtr+*logOffsetPtr;
906 *(
reinterpret_cast<size_t*
>(offset)) = len;
907 offset +=
sizeof(size_t);
908 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
911 *(
reinterpret_cast<size_t*
>(offset)) = bookmark_;
912 *logOffsetPtr += blockLen;
// Resizes and remaps the store's own file by calling _setFileSize() on
// _log/_file and storing the returned size in _fileSize.
// newSize_: requested size in bytes.
// The two _setFileSize calls below are the Windows (&_mapFile) and POSIX
// (_fileSize) alternatives; the guard selecting between them is not visible.
// NOTE(review): the statement guarded by the first check is on an original
// line that is not visible; presumably a request that does not grow an
// already-mapped log returns without doing anything - confirm.
916 void setFileSize(
size_t newSize_)
918 if(_log && newSize_ <= _fileSize)
921 _fileSize = _setFileSize(newSize_, &_log, _file, &_mapFile);
923 _fileSize = _setFileSize(newSize_, &_log, _file, _fileSize);
// Resizes the given file to a page-aligned size and (re)maps it, storing the
// new mapping address in *log_.
// newSize_: requested size in bytes.
// log_: in/out pointer to the current mapping address.
// file_: handle/descriptor of the file to resize.
// The last parameter differs per platform in the visible callers: a HANDLE*
// for the file-mapping object on Windows, the previous mapped size on POSIX.
// Its declaration is on original lines that are not visible in this listing.
// Callers assign the return value as the new size. Failures are reported
// through error(), which throws StoreException.
// NOTE(review): Windows and POSIX variants appear back to back below; the
// preprocessor guards that select between them are not visible in this listing.
928 size_t _setFileSize(
size_t newSize_,
char** log_, FileType file_,
// Mask newSize_ down to a multiple of the page size (the mask technique
// assumes a power-of-two page size). The adjustment made when the result is
// smaller than requested, or zero, is on a line that is not visible here.
937 size_t sz = newSize_ & (size_t)(~(getPageSize()-1));
938 if(sz < newSize_ || sz == 0)
// Windows variant: drop any existing view and mapping object first.
943 if(*mapFile_ && *mapFile_ != INVALID_HANDLE_VALUE)
947 FlushViewOfFile(*log_, 0);
948 UnmapViewOfFile(*log_);
950 CloseHandle(*mapFile_);
// Two alternative calls: one passes the upper 32 bits of sz as the high-order
// size, the other passes 0; the guard choosing between them is not visible.
953 *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, (DWORD)((sz >> 32) & 0xffffffff), (DWORD)sz, NULL);
955 *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, 0, (DWORD)sz, NULL);
957 if(*mapFile_ == NULL || *mapFile_ == INVALID_HANDLE_VALUE)
959 DWORD errNo = getErrorNo();
961 std::ostringstream os;
962 os <<
"Failed to create map of MMapBookmarkStore file " << _fileName
963 <<
" during resize.";
964 error(os.str(), errNo);
970 *log_ = (
char*)MapViewOfFile(*mapFile_, FILE_MAP_ALL_ACCESS, 0, 0, sz);
973 DWORD errNo = getErrorNo();
974 CloseHandle(*mapFile_);
976 std::ostringstream os;
977 os <<
"Failed to map MMapBookmarkStore file " << _fileName
978 <<
" to memory during resize.";
979 error(os.str(), errNo);
// POSIX variant: extend the file by seeking to offset sz and writing one byte.
986 if(lseek(file_, (off_t)sz, SEEK_SET) == -1)
988 int err = getErrorNo();
990 std::ostringstream os;
991 os <<
"Failed to seek in MMapBookmarkStore file " << _fileName
992 <<
" during resize.";
993 error(os.str(), err);
995 if(::write(file_,
"", 1) == -1)
997 int err = getErrorNo();
999 std::ostringstream os;
1000 os <<
"Failed to grow MMapBookmarkStore file " << _fileName
1001 <<
" during resize.";
1002 error(os.str(), err);
// Three mapping strategies are visible: mremap of the existing mapping,
// munmap followed by a fresh mmap, and a plain mmap. The conditions selecting
// between them are on lines that are not visible here.
1007 *log_ =
static_cast<char*
>(mremap(*log_, fileSize_, sz,
1010 munmap(*log_,fileSize_);
1011 *log_ =
static_cast<char*
>(mmap(0, sz, PROT_READ | PROT_WRITE,
1012 MAP_SHARED, file_, 0));
1018 *log_ =
static_cast<char*
>(mmap(0, sz, PROT_READ | PROT_WRITE,
1019 MAP_SHARED, file_, 0));
// Any of the mapping calls above signals failure with MAP_FAILED.
1022 if((
void*)(*log_) == MAP_FAILED)
1024 int err = getErrorNo();
1027 std::ostringstream os;
1028 os <<
"Failed to map MMapBookmarkStore file " << _fileName
1029 <<
" to memory during resize.";
1030 error(os.str(), err);
// Replays the records stored in the mapped file to rebuild the in-memory
// subscription state, holding _lock and _fileLock throughout.
// useLastModifiedTime_: when true and _fileTimestamp is set, the timestamp is
// applied to subscriptions as their recovery timestamp.
// hasAdapter_: when true, MemoryBookmarkStore::__purge() is called before the
// first record is replayed.
// Each record is read as: size_t subscription-id length, the id bytes, one type
// byte, then an 8-byte bookmark length (AMPS_READ8) and the bookmark bytes.
// A subscription-id length of zero ends the scan. _logOffset is advanced as
// records are consumed and _recovering is cleared at the end.
// NOTE(review): several declarations (sub, bookmarkField, recovered) and case
// labels are on original lines that are not visible in this listing.
1037 void recover(
bool useLastModifiedTime_ =
false,
1038 bool hasAdapter_ =
false)
1042 size_t bookmarkLen = 0;
1043 size_t lastGoodOffset = 0;
1044 bool inError =
false;
1045 Lock<Mutex> guard(_lock);
1046 Lock<Mutex> fileGuard(_fileLock);
1049 typedef std::map<Message::Field, size_t, Message::Field::FieldHash> BookmarkMap;
1051 Message::Field::FieldHash>::iterator BookmarkMapIter;
1053 typedef std::map<Message::Field, BookmarkMap*,
1054 Message::Field::FieldHash> ReadMap;
1055 typedef std::map<Message::Field, BookmarkMap*,
1056 Message::Field::FieldHash>::iterator ReadMapIter;
// The first size_t in the file is the first record's subscription-id length.
1058 size_t subLen = *(
reinterpret_cast<size_t*
>(_log));
1059 while(!inError && subLen > 0)
// Before replaying the very first record, drop in-memory state when an
// adapter is in use.
1062 if (_logOffset == 0 && hasAdapter_)
1063 MemoryBookmarkStore::__purge();
1064 _logOffset +=
sizeof(size_t);
1065 sub.assign(_log+_logOffset, subLen);
1066 _logOffset += subLen;
// Dispatch on the one-byte record type that follows the subscription id.
1067 switch(_log[_logOffset++])
1071 case ENTRY_BOOKMARK:
1073 AMPS_READ8((_log+_logOffset),bookmarkLen);
1074 _logOffset +=
sizeof(size_t);
1075 bookmarkField.assign(_log+_logOffset, bookmarkLen);
1076 _logOffset += bookmarkLen;
1077 Subscription* subP = find(sub);
1078 BookmarkMap* bookmarks = NULL;
// `recovered` keeps, per subscription, the bookmarks seen so far mapped to
// the sequence number returned when they were logged.
1079 ReadMapIter iter = recovered.find(sub);
1080 if (iter == recovered.end())
1082 Message::Field subKey;
1084 bookmarks =
new BookmarkMap();
1085 recovered[subKey] = bookmarks;
1089 bookmarks = iter->second;
// A bookmark that was already seen for this subscription triggers a reset of
// the tracked bookmarks (details are on lines that are not visible here).
1091 if (bookmarks->find(bookmarkField) != bookmarks->end())
1093 std::for_each(bookmarks->begin(), bookmarks->end(),
1096 subP->getMostRecent(
true);
// Range bookmarks are logged and immediately discarded.
1098 if (BookmarkRange::isRange(bookmarkField))
1100 subP->discard(subP->log(bookmarkField));
1102 else if (!subP->isDiscarded(bookmarkField))
1104 size_t sequence = subP->log(bookmarkField);
1105 Message::Field copy;
1107 bookmarks->insert(std::make_pair(copy, sequence));
1113 Message::Field copy;
1115 bookmarks->insert(std::make_pair(copy,0));
// NOTE(review): the case label for this section is on a line that is not
// visible; the calls to discard() below suggest ENTRY_DISCARD - confirm.
1121 AMPS_READ8((_log+_logOffset),bookmarkLen);
1122 _logOffset +=
sizeof(size_t);
1123 bookmarkField.assign(_log+_logOffset, bookmarkLen);
1124 _logOffset += bookmarkLen;
1125 size_t sequence = AMPS_UNSET_INDEX;
1126 ReadMapIter iter = recovered.find(sub);
1127 if (iter != recovered.end())
1129 BookmarkMap* bookmarks = iter->second;
1130 BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1131 if (bookmarkIter != bookmarks->end())
// Take the stored sequence number, then release the key's data and drop the
// entry from the tracking map.
1133 sequence = bookmarkIter->second;
1134 Message::Field bookmarkToClear(bookmarkIter->first);
1135 bookmarkToClear.
clear();
1136 bookmarks->erase(bookmarkIter);
1139 if (!BookmarkRange::isRange(bookmarkField))
1141 Subscription* subP = find(sub);
1142 if (sequence != AMPS_UNSET_INDEX)
// A stored sequence of 0 is not passed to discard().
1145 if (sequence) subP->discard(sequence);
1149 subP->discard(bookmarkField);
1154 case ENTRY_PERSISTED:
1156 AMPS_READ8((_log+_logOffset),bookmarkLen);
1157 _logOffset +=
sizeof(size_t);
1158 bookmarkField.assign(_log+_logOffset, bookmarkLen);
1159 _logOffset += bookmarkLen;
1160 MemoryBookmarkStore::_persisted(find(sub), bookmarkField);
// NOTE(review): presumably the handler for an unrecognized record type (its
// label is not visible). If no record has been read successfully an error is
// raised; otherwise _logOffset is rolled back to the last good record.
1164 if (lastGoodOffset == 0)
1166 error(
"Error while recovering MMapBookmarkStore file.", getErrorNo());
1170 _logOffset = lastGoodOffset;
1174 lastGoodOffset = _logOffset;
1175 if (!inError) subLen = *(
reinterpret_cast<size_t*
>(_log + _logOffset));
// Post-process every subscription now that the whole log has been replayed.
1177 for (SubscriptionMap::iterator i=_subs.begin(); i != _subs.end(); ++i)
1179 if (recovered.count(i->first) && !recovered[i->first]->empty())
1181 if (i->second->getMostRecent(
false).len() > 1)
1183 i->second->justRecovered();
1190 _subs[i->first] =
new Subscription(
this, i->first);
1193 if (useLastModifiedTime_ && _fileTimestamp)
1195 _subs[i->first]->setRecoveryTimestamp(_fileTimestamp);
// The timestamp buffer allocated by init() is released here.
1200 delete[] _fileTimestamp;
// Release the bookmark copies held by the temporary tracking maps.
1203 for (ReadMapIter i = recovered.begin(), e = recovered.end(); i!=e; ++i)
1205 std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1207 Message::Field f = i->first;
// write() only appends while _recovering is false.
1210 _recovering =
false;
1214 std::string _fileName;
1218 char* _fileTimestamp;
// Returns the system page size, kept in a function-local static variable.
// The Windows variant reads dwPageSize from GetSystemInfo(); the POSIX variant
// uses sysconf(_SC_PAGESIZE). The guard selecting between them, and the
// condition that decides when the value is (re)computed, are on original
// lines that are not visible in this listing.
1222 static size_t getPageSize()
1224 static size_t pageSize;
1228 SYSTEM_INFO SYS_INFO;
1229 GetSystemInfo(&SYS_INFO);
1230 pageSize = SYS_INFO.dwPageSize;
1232 pageSize = (size_t)sysconf(_SC_PAGESIZE);
1243 #endif // _MMAPBOOKMARKSTORE_H_ virtual void purge()
Called to purge the contents of this store.
Definition: MMapBookmarkStore.hpp:288
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1107
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:447
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
A BookmarkStoreImpl implementation that uses a memory mapped file for storage of the bookmarks...
Definition: MMapBookmarkStore.hpp:56
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: MMapBookmarkStore.hpp:222
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MMapBookmarkStore.hpp:254
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:311
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: MMapBookmarkStore.hpp:240
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1132
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
MMapBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:82
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MMapBookmarkStore.hpp:302
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subsc...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1308
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:164
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:317
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MMapBookmarkStore.hpp:201
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:133
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Definition: ampsplusplus.hpp:103
MMapBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:105
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1122
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MMapBookmarkStore.hpp:268