28 #ifndef _LOGGEDBOOKMARKSTORE_H_ 29 #define _LOGGEDBOOKMARKSTORE_H_ 42 #include <sys/types.h> 49 typedef char* amps_iovec_base_ptr;
51 typedef void* amps_iovec_base_ptr;
67 static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
73 typedef HANDLE FileType;
89 bool useLastModifiedTime_ =
false)
92 , _file(INVALID_HANDLE_VALUE)
96 , _fileName(fileName_)
99 recover(useLastModifiedTime_,
false);
110 bool useLastModifiedTime_ =
false)
113 , _file(INVALID_HANDLE_VALUE)
117 , _fileName(fileName_)
120 recover(useLastModifiedTime_,
false);
138 const char* fileName_,
139 RecoveryPointFactory factory_ = NULL,
140 bool useLastModifiedTime_ =
false)
143 , _file(INVALID_HANDLE_VALUE)
147 , _fileName(fileName_)
150 recover(useLastModifiedTime_,
true);
165 const std::string& fileName_,
166 RecoveryPointFactory factory_ = NULL,
167 bool useLastModifiedTime_ =
false)
170 , _file(INVALID_HANDLE_VALUE)
174 , _fileName(fileName_)
177 recover(useLastModifiedTime_,
true);
185 _recoveringFile =
true;
205 Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
206 Lock<Mutex> guard(_lock);
213 message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
215 write(_file, sub->id(), ENTRY_BOOKMARK, bookmark);
216 return MemoryBookmarkStore::_log(message_);
229 Lock<Mutex> guard(_lock);
230 write(_file, subId, ENTRY_DISCARD, bookmark);
231 MemoryBookmarkStore::_discard(message_);
243 Lock<Mutex> l(_lock);
244 Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
245 if (!entry || entry->_val.empty())
return;
246 write(_file, subId_, ENTRY_DISCARD, entry->_val);
247 MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
257 Lock<Mutex> l(_lock);
258 return MemoryBookmarkStore::_getMostRecent(subId_);
271 Lock<Mutex> l(_lock);
272 bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
278 write(_file, subId, ENTRY_BOOKMARK, message_.
getBookmark());
279 write(_file, subId, ENTRY_DISCARD, message_.
getBookmark());
291 Lock<Mutex> guard(_lock);
293 if(_file != INVALID_HANDLE_VALUE)
295 DeleteFileA(_fileName.c_str());
296 _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
297 NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
298 if( _file == INVALID_HANDLE_VALUE )
300 DWORD err = getErrorNo();
301 std::ostringstream os;
302 os <<
"Failed to recreate log file after purge for LoggedBookmarkStore" << _fileName <<
" for LoggedBookmarkStore";
303 error(os.str(), err);
308 ::unlink(_fileName.c_str());
309 _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
312 error(
"Failed to recreate log file after purge for LoggedBookmarkStore", getErrorNo());
316 MemoryBookmarkStore::_purge();
326 Lock<Mutex> guard(_lock);
327 MemoryBookmarkStore::_purge(subId_);
328 std::string tmpFileName = _fileName +
".tmp";
329 __prune(tmpFileName);
343 void _prune(
const std::string& tmpFileName_)
345 Lock<Mutex> guard(_lock);
351 if (tmpFileName_.empty())
353 __prune(_fileName +
".tmp");
357 __prune(tmpFileName_);
359 _recentChanged =
false;
362 void __prune(
const std::string& tmpFileName_)
366 tmpFile = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
367 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
368 if(tmpFile == INVALID_HANDLE_VALUE )
370 DWORD err = getErrorNo();
371 std::ostringstream os;
372 os <<
"Failed to create temp log file " << tmpFileName_ <<
373 " to prune LoggedBookmarkStore " << _fileName;
374 error(os.str(), err);
379 tmpFile = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
382 int err = getErrorNo();
383 std::ostringstream os;
384 os <<
"Failed to create temp log file " << tmpFileName_ <<
385 " to prune LoggedBookmarkStore " << _fileName;
386 error(os.str(), err);
392 for (SubscriptionMap::iterator i = _subs.begin();
393 i != _subs.end(); ++i)
396 assert(!subId.
empty());
397 Subscription* subPtr = i->second;
398 const BookmarkRange& range = subPtr->getRange();
401 write(tmpFile, subId, ENTRY_BOOKMARK, range);
402 write(tmpFile, subId, ENTRY_DISCARD, range);
405 amps_uint64_t recentPub, recentSeq;
406 Subscription::parseBookmark(recent, recentPub, recentSeq);
407 Subscription::PublisherMap publishersDiscarded =
409 MemoryBookmarkStore::EntryPtrList recovered;
410 subPtr->getRecoveryEntries(recovered);
411 subPtr->setPublishersToDiscarded(&recovered,
412 &publishersDiscarded);
413 char tmpBookmarkBuffer[128];
414 for (Subscription::PublisherIterator pub =
415 publishersDiscarded.begin(),
416 e = publishersDiscarded.end();
420 if (pub->first == 0 || pub->second == 0)
continue;
422 if (pub->first == recentPub)
continue;
423 int written = AMPS_snprintf_amps_uint64_t(
425 sizeof(tmpBookmarkBuffer),
427 *(tmpBookmarkBuffer+written++) =
'|';
428 written += AMPS_snprintf_amps_uint64_t(
429 tmpBookmarkBuffer+written,
430 sizeof(tmpBookmarkBuffer)
433 *(tmpBookmarkBuffer+written++) =
'|';
435 write(tmpFile, subId, ENTRY_BOOKMARK, tmpBookmark);
436 write(tmpFile, subId, ENTRY_DISCARD, tmpBookmark);
438 if (isWritableBookmark(recent.
len()))
440 write(tmpFile, subId, ENTRY_BOOKMARK, recent);
441 write(tmpFile, subId, ENTRY_DISCARD, recent);
445 subPtr->getMostRecentList();
447 if (isWritableBookmark(subPtr->getLastPersisted().len()))
449 write(tmpFile, subId, ENTRY_PERSISTED,
450 subPtr->getLastPersisted());
452 subPtr->getActiveEntries(recovered);
453 for (MemoryBookmarkStore::EntryPtrList::iterator entry =
455 entry != recovered.end(); ++entry)
457 if ((*entry)->_val.empty() ||
458 !isWritableBookmark((*entry)->_val.len()))
460 write(tmpFile, subId, ENTRY_BOOKMARK, (*entry)->_val);
461 if (!(*entry)->_active)
462 write(tmpFile, subId, ENTRY_DISCARD, (*entry)->_val);
466 catch (StoreException& ex)
469 CloseHandle(tmpFile);
470 DeleteFileA(tmpFileName_.c_str());
473 unlink(tmpFileName_.c_str());
475 std::ostringstream os;
476 os <<
"Exception during prune: " << ex.what();
477 throw StoreException(os.str());
481 CloseHandle(tmpFile);
482 _file = INVALID_HANDLE_VALUE;
483 tmpFile = INVALID_HANDLE_VALUE;
486 while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
487 MOVEFILE_COPY_ALLOWED|MOVEFILE_REPLACE_EXISTING|MOVEFILE_WRITE_THROUGH))
489 DWORD err = getErrorNo();
490 if (--retryCount > 0)
continue;
492 std::string desiredFileName = _fileName;
493 _fileName = tmpFileName_;
495 std::ostringstream os;
496 os <<
"Failed to move completed temp file " << tmpFileName_
497 <<
" to " << desiredFileName
498 <<
" in prune in LoggedBookmarkStore. Continuing by using " 499 << tmpFileName_ <<
" as the LoggedBookmarkStore file.";
500 error(os.str(), err);
504 SetFilePointer(_file, 0, NULL, FILE_END);
508 if (-1 == ::unlink(_fileName.c_str()))
510 int err = getErrorNo();
512 std::string desiredFileName = _fileName;
513 _fileName = tmpFileName_;
515 std::ostringstream os;
516 os <<
"Failed to delete file " << desiredFileName
517 <<
" after creating temporary file " << tmpFileName_
518 <<
" in prune in LoggedBookmarkStore. Continuing by using " 519 << tmpFileName_ <<
" as the LoggedBookmarkStore file.";
520 error(os.str(), err);
523 if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
525 int err = getErrorNo();
527 std::string desiredFileName = _fileName;
528 _fileName = tmpFileName_;
530 std::ostringstream os;
531 os <<
"Failed to move completed temp file " << tmpFileName_
532 <<
" to " << desiredFileName
533 <<
" in prune in LoggedBookmarkStore. Continuing by using " 534 << tmpFileName_ <<
" as the LoggedBookmarkStore file.";
535 error(os.str(), err);
540 if (-1 == ::fstat(_file, &fst))
542 int err = getErrorNo();
543 std::ostringstream os;
544 os <<
"Failed to get size of pruned file " << _fileName
545 <<
" in prune in LoggedBookmarkStore. ";
546 error(os.str(), err);
549 ::lseek(_file, (off_t)fst.st_size, SEEK_SET);
554 virtual void _persisted(Subscription* subP_,
557 Lock<Mutex> guard(_lock);
558 write(_file, subP_->id(), ENTRY_PERSISTED, bookmark_);
559 MemoryBookmarkStore::_persisted(subP_, bookmark_);
562 virtual Message::Field _persisted(Subscription* subP_,
size_t bookmark_)
564 Lock<Mutex> l(_lock);
565 Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
566 if (!entryPtr || entryPtr->_val.empty())
569 write(_file, subP_->id(), ENTRY_PERSISTED, bookmarkField);
570 MemoryBookmarkStore::_persisted(subP_, bookmarkField);
571 return bookmarkField;
575 typedef DWORD ERRTYPE ;
576 ERRTYPE getErrorNo()
const 578 return GetLastError();
581 void error(
const std::string& message_, ERRTYPE err)
583 std::ostringstream os;
584 static const DWORD msgSize = 2048;
586 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
587 FORMAT_MESSAGE_ARGUMENT_ARRAY,
588 NULL, err, LANG_NEUTRAL,
589 pMsg, msgSize, NULL);
590 os <<
"File: " << _fileName <<
". " << message_;
593 os <<
" with error " << pMsg;
595 throw StoreException(os.str());
599 ERRTYPE getErrorNo()
const 604 void error(
const std::string& message_, ERRTYPE err)
606 std::ostringstream os;
607 os <<
"File: " << _fileName <<
". " << message_;
610 os <<
" with error " << strerror(err);
613 throw StoreException(os.str());
620 _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
621 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
622 if( _file == INVALID_HANDLE_VALUE )
624 DWORD err = getErrorNo();
625 std::ostringstream os;
626 os <<
"Failed to initialize log file " << _fileName <<
" for LoggedBookmarkStore";
627 error(os.str(), err);
631 _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
634 int err = getErrorNo();
635 std::ostringstream os;
636 os <<
"Failed to initialize log file " << _fileName <<
" for LoggedBookmarkStore";
637 error(os.str(), err);
646 void write(FileType file_,
const Message::Field& subId_,
char type_,
649 Lock<Mutex> guard(_fileLock);
650 if(!_recoveringFile && isWritableBookmark(bookmark_.
len()))
654 size_t len = subId_.
len();
655 BOOL ok =WriteFile(file_, (LPVOID)&len,
sizeof(
size_t), &written, NULL);
656 ok |= WriteFile(file_, (LPVOID)subId_.
data(), (DWORD)len, &written, NULL);
657 ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
658 len = bookmark_.
len();
659 ok |= WriteFile(file_, (LPVOID)&len,
sizeof(
size_t), &written, NULL);
660 ok |= WriteFile(file_, (LPVOID)bookmark_.
data(), (DWORD)len,
664 error(
"Failed to write to bookmark log.", getErrorNo());
671 file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
674 int err = getErrorNo();
675 std::ostringstream os;
676 os <<
"Failed to open file " << _fileName
677 <<
" for write in LoggedBookmarkStore. ";
678 error(os.str(), err);
682 struct iovec data[5];
683 size_t len = subId_.
len();
684 data[0].iov_base = (amps_iovec_base_ptr)(
void*)&len;
685 data[0].iov_len =
sizeof(size_t);
686 data[1].iov_base = (amps_iovec_base_ptr)(
void*)subId_.
data();
687 data[1].iov_len = len;
688 data[2].iov_base = (amps_iovec_base_ptr)(
void*)&type_;
690 size_t bookmarkLen = bookmark_.
len();
691 data[3].iov_base = (amps_iovec_base_ptr)(
void*)&bookmarkLen;
692 data[3].iov_len =
sizeof(size_t);
693 data[4].iov_base = (amps_iovec_base_ptr)(
void*)bookmark_.
data();
694 data[4].iov_len = bookmarkLen;
695 ssize_t written = ::writev(file_, data, 5);
698 error(
"Failed to write to bookmark log.", getErrorNo());
709 char type_,
size_t bookmark_)
711 Lock<Mutex> guard(_fileLock);
716 size_t len = subId_.
len();
717 BOOL ok =WriteFile(file_, (LPVOID)&len,
sizeof(
size_t), &written, NULL);
718 ok |= WriteFile(file_, (LPVOID)subId_.
data(), (DWORD)len, &written, NULL);
719 ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
720 ok |= WriteFile(file_, (LPVOID)&bookmark_,
sizeof(
size_t),
724 error(
"Failed to write bookmark sequence to file.", getErrorNo());
731 file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
734 int err = getErrorNo();
735 std::ostringstream os;
736 os <<
"Failed to open file " << _fileName
737 <<
" to write bookmark sequence in LoggedBookmarkStore. ";
738 error(os.str(), err);
742 struct iovec data[4];
743 size_t len = subId_.
len();
744 data[0].iov_base = (amps_iovec_base_ptr)(
void*)&len;
745 data[0].iov_len =
sizeof(size_t);
746 data[1].iov_base = (amps_iovec_base_ptr)(
void*)subId_.
data();
747 data[1].iov_len = len;
748 data[2].iov_base = (amps_iovec_base_ptr)(
void*)&type_;
750 data[3].iov_base = (amps_iovec_base_ptr)(
void*)&bookmark_;
751 data[3].iov_len =
sizeof(size_t);
752 ssize_t written = ::writev(file_, data, 4);
755 error(
"Failed to write bookmark sequence to file.", getErrorNo());
763 #define VOID_P(buf) (LPVOID)buf 764 bool readFileBytes(LPVOID buffer,
size_t numBytes, DWORD *bytesRead)
766 return (ReadFile(_file, buffer, (DWORD)numBytes, bytesRead, NULL) == TRUE);
769 #define VOID_P(buf) (void*)buf 770 bool readFileBytes(
void* buffer,
size_t numBytes, ssize_t *bytesRead)
772 *bytesRead = ::read(_file, buffer, numBytes);
773 return (*bytesRead >= 0);
777 void recover(
bool useLastModifiedTime_,
bool hasAdapter_)
779 size_t bufferLen = 128;
780 char* buffer =
new char[bufferLen];
781 size_t subIdBufferLen = 128;
782 char* subIdBuffer =
new char[bufferLen];
786 size_t bookmarkLen = 0;
787 Lock<Mutex> l(_lock);
788 Lock<Mutex> guard(_fileLock);
789 _recoveringFile =
true;
790 char* fileTimestamp =
new char[AMPS_TIMESTAMP_LEN];
791 fileTimestamp[0] =
'\0';
793 LARGE_INTEGER lifileSize;
794 if(GetFileSizeEx(_file, &lifileSize) == 0)
796 DWORD err = getErrorNo();
798 delete[] subIdBuffer;
799 _recoveringFile =
false;
800 error(
"Failure getting file size while trying to recover.", err);
804 size_t fileSize = lifileSize.QuadPart;
806 size_t fileSize = lifileSize.LowPart;
808 if (useLastModifiedTime_ && fileSize > 0)
810 FILETIME ftModifiedTime;
811 if(GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
813 DWORD err = getErrorNo();
815 delete[] subIdBuffer;
816 _recoveringFile =
false;
817 error(
"Failure getting file time while trying to recover.", err);
821 if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
823 DWORD err = getErrorNo();
825 delete[] subIdBuffer;
826 _recoveringFile =
false;
827 error(
"Failure converting file time while trying to recover.", err);
830 sprintf_s(fileTimestamp, AMPS_TIMESTAMP_LEN,
831 "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
832 st.wDay, st.wHour, st.wMinute, st.wSecond);
833 fileTimestamp[AMPS_TIMESTAMP_LEN-1] =
'Z';
835 else if (fileSize == 0)
837 delete[] fileTimestamp;
839 delete[] subIdBuffer;
840 _recoveringFile =
false;
845 SetFilePointer(_file, 0, NULL, FILE_BEGIN);
848 ::fstat(_file, &fst);
849 ssize_t fileSize = fst.st_size;
850 ssize_t readBytes = 0;
851 if (useLastModifiedTime_ && fileSize > 0)
854 gmtime_r(&fst.st_mtime, &timeInfo);
855 strftime(fileTimestamp, AMPS_TIMESTAMP_LEN,
856 "%Y%m%dT%H%M%S", &timeInfo);
857 fileTimestamp[AMPS_TIMESTAMP_LEN-1] =
'Z';
859 else if (fileSize == 0)
861 delete[] fileTimestamp;
863 delete[] subIdBuffer;
864 _recoveringFile =
false;
868 ::lseek(_file, loc, SEEK_SET);
873 MemoryBookmarkStore::__purge();
875 if (!readFileBytes(VOID_P(&subLen),
sizeof(
size_t), &readBytes)
878 delete[] fileTimestamp;
880 delete[] subIdBuffer;
881 _recoveringFile =
false;
882 error(
"Failure reading file while trying to recover.", getErrorNo());
886 size_t totalBytes = readBytes;
888 ssize_t totalBytes = readBytes;
891 size_t tooManyBytes = 0;
893 Message::Field::FieldHash> BookmarkMap;
894 typedef std::map<Message::Field, size_t,
895 Message::Field::FieldHash>::iterator BookmarkMapIter;
897 typedef std::map<Message::Field, BookmarkMap*,
898 Message::Field::FieldHash> ReadMap;
899 typedef std::map<Message::Field, BookmarkMap*,
900 Message::Field::FieldHash>::iterator ReadMapIter;
902 while(subLen > 0 && (
size_t)readBytes ==
sizeof(
size_t) &&
903 (
size_t)totalBytes <= (
size_t)fileSize)
905 if (subLen >= ((
size_t)fileSize - (
size_t)totalBytes)
908 tooManyBytes = subLen + 1;
914 if (subIdBufferLen < subLen)
916 delete [] subIdBuffer;
917 subIdBufferLen = 2 * subLen;
918 subIdBuffer =
new char[subIdBufferLen];
920 if (!readFileBytes(VOID_P(subIdBuffer), subLen, &readBytes))
923 tooManyBytes = subLen;
926 totalBytes += readBytes;
927 sub.assign(subIdBuffer, subLen);
928 if (!readFileBytes(VOID_P(buffer), 1, &readBytes))
934 totalBytes += readBytes;
939 if ((
size_t)totalBytes +
sizeof(
size_t) >= (
size_t)fileSize)
943 tooManyBytes =
sizeof(size_t);
946 if (!readFileBytes(VOID_P(&bookmarkLen),
sizeof(size_t), &readBytes))
949 tooManyBytes =
sizeof(size_t);
952 totalBytes += readBytes;
953 if (bookmarkLen > (
size_t)fileSize - (size_t)totalBytes)
957 tooManyBytes = bookmarkLen;
960 if (bufferLen < bookmarkLen)
963 bufferLen = 2 * bookmarkLen;
964 buffer =
new char[bufferLen];
966 if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
969 tooManyBytes = bookmarkLen;
972 totalBytes += readBytes;
973 bookmarkField.assign(buffer, bookmarkLen);
974 Subscription* subP = find(sub);
975 BookmarkMap* bookmarks = NULL;
976 ReadMapIter iter = recovered.find(sub);
977 if (iter == recovered.end())
979 Message::Field subKey;
981 bookmarks =
new BookmarkMap();
982 recovered[subKey] = bookmarks;
986 bookmarks = iter->second;
988 if (bookmarks->find(bookmarkField) != bookmarks->end())
990 std::for_each(bookmarks->begin(), bookmarks->end(),
993 subP->getMostRecent(
true);
995 if (BookmarkRange::isRange(bookmarkField))
997 subP->discard(subP->log(bookmarkField));
999 else if (!subP->isDiscarded(bookmarkField))
1001 size_t sequence = subP->log(bookmarkField);
1002 Message::Field copy;
1004 bookmarks->insert(std::make_pair(copy, sequence));
1010 Message::Field copy;
1012 bookmarks->insert(std::make_pair(copy,0));
1019 if ((
size_t)totalBytes +
sizeof(
size_t) >= (
size_t)fileSize)
1023 tooManyBytes =
sizeof(size_t);
1026 if (!readFileBytes(VOID_P(&bookmarkLen),
sizeof(size_t), &readBytes))
1029 tooManyBytes =
sizeof(size_t);
1032 totalBytes += readBytes;
1033 if (bookmarkLen > (
size_t)fileSize - (size_t)totalBytes)
1037 tooManyBytes = bookmarkLen;
1040 if (bufferLen < bookmarkLen)
1043 bufferLen = 2 * bookmarkLen;
1044 buffer =
new char[bufferLen];
1046 if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1049 tooManyBytes = bookmarkLen;
1052 totalBytes += readBytes;
1053 bookmarkField.assign(buffer, bookmarkLen);
1054 size_t sequence = AMPS_UNSET_INDEX;
1055 ReadMapIter iter = recovered.find(sub);
1056 if (iter != recovered.end())
1058 BookmarkMap* bookmarks = iter->second;
1059 BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1060 if (bookmarkIter != bookmarks->end())
1062 sequence = bookmarkIter->second;
1063 Message::Field bookmarkToClear(bookmarkIter->first);
1064 bookmarkToClear.
clear();
1065 bookmarks->erase(bookmarkIter);
1068 Subscription* subP = find(sub);
1069 if (sequence != AMPS_UNSET_INDEX)
1072 if (sequence) subP->discard(sequence);
1076 subP->discard(bookmarkField);
1080 case ENTRY_PERSISTED:
1082 if ((
size_t)totalBytes +
sizeof(
size_t) >= (
size_t)fileSize)
1086 tooManyBytes =
sizeof(size_t);
1089 if (!readFileBytes(VOID_P(&bookmarkLen),
sizeof(size_t), &readBytes))
1092 tooManyBytes =
sizeof(size_t);
1095 totalBytes += readBytes;
1096 if (bookmarkLen > (
size_t)fileSize - (size_t)totalBytes)
1100 tooManyBytes = bookmarkLen;
1103 if (bufferLen < bookmarkLen)
1106 bufferLen = 2 * bookmarkLen;
1107 buffer =
new char[bufferLen];
1109 if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1112 tooManyBytes = bookmarkLen;
1115 totalBytes += readBytes;
1116 bookmarkField.assign(buffer, bookmarkLen);
1117 Subscription* subP = find(sub);
1118 MemoryBookmarkStore::_persisted(subP, bookmarkField);
1124 tooManyBytes = (size_t)fileSize - (
size_t)totalBytes;
1131 if (!readFileBytes(VOID_P(&subLen),
sizeof(size_t), &readBytes))
1134 tooManyBytes =
sizeof(size_t);
1137 totalBytes += readBytes;
1140 delete[] subIdBuffer;
1143 for (SubscriptionMap::iterator i=_subs.begin(); i != _subs.end(); ++i)
1145 if (recovered.count(i->first) && !recovered[i->first]->empty())
1147 Subscription* subPtr = i->second;
1148 if (subPtr->getMostRecent(
false).len() > 1)
1150 subPtr->justRecovered();
1157 _subs[i->first] =
new Subscription(
this, i->first);
1160 if (useLastModifiedTime_ && fileTimestamp[0] !=
'\0')
1162 _subs[i->first]->setRecoveryTimestamp(fileTimestamp);
1166 for (ReadMapIter i = recovered.begin(), e = recovered.end(); i!=e; ++i)
1168 std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1170 Message::Field f = i->first;
1173 delete[] fileTimestamp;
1174 _recoveringFile =
false;
1180 if (err != (ERRTYPE)-1 || loc == 0 || fileSize - loc > 128)
1182 std::ostringstream os;
1183 os <<
"Error while recovering LoggedBookmarkStore from " 1185 <<
". Record starting at " << loc
1186 <<
" reading at " << totalBytes
1187 <<
" requested " << tooManyBytes
1188 <<
" and file size is " << fileSize;
1189 error(os.str(), (err != (ERRTYPE)-1 ? err : 0));
1195 LONG low = (LONG)loc;
1196 LONG high = (LONG)((loc >> 32)&0xffffffff);
1197 SetFilePointer(_file, low, &high, FILE_BEGIN);
1199 SetFilePointer(_file, loc, NULL, FILE_BEGIN);
1202 ::lseek(_file, loc, SEEK_SET);
1211 std::string _fileName;
1212 bool _recoveringFile;
1218 #endif // _LOGGEDBOOKMARKSTORE_H_ virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: LoggedBookmarkStore.hpp:269
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:332
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
LoggedBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:109
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:137
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: LoggedBookmarkStore.hpp:241
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:164
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:337
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1236
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
Core type, function, and class declarations for the AMPS C++ client.
A BookmarkStoreImpl implementation that logs all messages to a file.
Definition: LoggedBookmarkStore.hpp:64
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
Provides AMPS::RecoveryPointAdapter, an iterface for impplementing external storage of bookmark subsc...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1308
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: LoggedBookmarkStore.hpp:324
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
virtual void purge()
Called to purge the contents of this store.
Definition: LoggedBookmarkStore.hpp:289
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: LoggedBookmarkStore.hpp:255
A BookmarkStoreImpl implmentation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: BookmarkStore.hpp:203
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: LoggedBookmarkStore.hpp:202
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
LoggedBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:88
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: LoggedBookmarkStore.hpp:223