28 #ifndef _MMAPBOOKMARKSTORE_H_ 29 #define _MMAPBOOKMARKSTORE_H_ 38 #include <sys/types.h> 59 typedef HANDLE FileType;
64 static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
79 , _logOffset(0) , _log(0)
81 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
98 , _logOffset(0) , _log(0)
100 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
112 UnmapViewOfFile(_log);
113 CloseHandle(_mapFile);
116 munmap(_log, _fileSize);
132 Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
133 Lock<Mutex> guard(_lock);
140 message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
142 write(sub->id(), ENTRY_BOOKMARK, bookmark);
143 return MemoryBookmarkStore::_log(message_);
156 Lock<Mutex> guard(_lock);
157 MemoryBookmarkStore::_discard(message_);
158 write(subId, ENTRY_DISCARD, bookmark);
170 Lock<Mutex> guard(_lock);
171 Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
172 if (!entry || entry->_val.empty())
return;
173 write(subId_, ENTRY_DISCARD, entry->_val);
174 MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
184 Lock<Mutex> guard(_lock);
198 Lock<Mutex> l(_lock);
199 bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
205 write(subId, ENTRY_BOOKMARK, message_.
getBookmark());
206 write(subId, ENTRY_DISCARD, message_.
getBookmark());
218 Lock<Mutex> guard(_lock);
219 Lock<Mutex> fileGuard(_fileLock);
220 memset(_log, 0, _logOffset);
222 MemoryBookmarkStore::_purge();
232 Lock<Mutex> guard(_lock);
233 Lock<Mutex> fileGuard(_fileLock);
234 MemoryBookmarkStore::_purge(subId_);
235 std::string tmpFileName = _fileName +
".tmp";
236 __prune(tmpFileName);
246 Lock<Mutex> guard(_lock);
252 Lock<Mutex> guard(_lock);
257 void _prune(std::string tmpFileName_)
259 if (tmpFileName_.empty()) tmpFileName_ = _fileName +
".tmp";
260 Lock<Mutex> guard(_lock);
261 Lock<Mutex> fileGuard(_fileLock);
267 __prune(tmpFileName_);
268 _recentChanged =
false;
272 void __prune(std::string tmpFileName_)
274 size_t sz = AMPS_INITIAL_LOG_SIZE;
277 size_t bytesWritten = 0;
279 file = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
280 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
281 if( file == INVALID_HANDLE_VALUE )
283 DWORD err = getErrorNo();
284 std::ostringstream os;
285 os <<
"Failed to create file " << tmpFileName_ <<
" for pruning MMapBookmarkStore";
286 error(os.str(), err);
288 HANDLE mapFile = NULL;
291 sz = _setFileSize(sz, &log, file, &mapFile);
293 catch (StoreException& ex)
295 if(mapFile == NULL || mapFile == INVALID_HANDLE_VALUE)
298 std::ostringstream os;
299 os <<
"Failed to create map of log file for prune" 301 throw StoreException(os.str());
306 CloseHandle(mapFile);
308 std::ostringstream os;
309 os <<
"Failed to map log file to memory for prune" 311 throw StoreException(os.str());
317 DWORD err = getErrorNo();
318 UnmapViewOfFile(log);
319 CloseHandle(mapFile);
321 error(
"Failed to prepare tmp file for prune.", err);
324 file = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
327 int err = getErrorNo();
328 std::ostringstream os;
329 os <<
"Failed to open log file " << tmpFileName_ <<
" for MMapBookmarkStore";
330 error(os.str(), err);
333 if(::write(file,
"\0\0\0\0", 4) == -1)
335 int err = getErrorNo();
336 std::ostringstream os;
337 os <<
"Failed to write header to log file " << tmpFileName_
338 <<
" in prune for MMapBookmarkStore";
339 error(os.str(), err);
344 sz = _setFileSize(sz, &log, file, 0);
346 catch (StoreException& ex)
348 std::ostringstream os;
349 os <<
"Failed to prepare tmp file \"" << tmpFileName_
350 <<
"\" for prune. " << ex.what();
351 throw StoreException(os.str());
355 int err = getErrorNo();
358 error(
"Failed to prepare tmp file for prune.", err);
363 for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
366 assert(!subId.
empty());
367 size_t subIdLen = subId.
len();
369 amps_uint64_t recentPub, recentSeq;
370 Subscription::parseBookmark(recent, recentPub, recentSeq);
371 Subscription::PublisherMap publishersDiscarded =
372 i->second->_publishers;
373 MemoryBookmarkStore::EntryPtrList recovered;
374 i->second->getRecoveryEntries(recovered);
375 i->second->setPublishersToDiscarded(&recovered,
376 &publishersDiscarded);
377 char tmpBookmarkBuffer[128];
378 for (Subscription::PublisherIterator pub =
379 publishersDiscarded.begin(),
380 e = publishersDiscarded.end();
384 if (pub->first == 0 || pub->second == 0)
continue;
386 if (pub->first == recentPub)
continue;
387 int written = AMPS_snprintf_amps_uint64_t(
389 sizeof(tmpBookmarkBuffer),
391 *(tmpBookmarkBuffer+written++) =
'|';
392 written += AMPS_snprintf_amps_uint64_t(
393 tmpBookmarkBuffer+written,
394 sizeof(tmpBookmarkBuffer)
397 *(tmpBookmarkBuffer+written++) =
'|';
400 size_t blockLen = subIdLen + 2*
sizeof(size_t) + tmpBookmark.
len() + 1;
401 if(bytesWritten + blockLen + blockLen >= sz)
404 sz = _setFileSize(sz*2, &log, file, &mapFile);
406 sz = _setFileSize(sz*2, &log, file, sz);
409 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, tmpBookmark);
410 write(&log, &bytesWritten, subId, ENTRY_DISCARD, tmpBookmark);
412 if (isWritableBookmark(recent.
len()))
415 size_t blockLen = subIdLen + 2*
sizeof(size_t) + recent.
len() + 1;
416 if(bytesWritten + blockLen + blockLen >= sz)
419 sz = _setFileSize(sz*2, &log, file, &mapFile);
421 sz = _setFileSize(sz*2, &log, file, sz);
424 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, recent);
425 write(&log, &bytesWritten, subId, ENTRY_DISCARD, recent);
429 i->second->getMostRecentList();
432 if (isWritableBookmark(bookmark.
len()))
435 size_t blockLen = subIdLen + 2*
sizeof(size_t) +
437 if(bytesWritten + blockLen >= sz)
440 sz = _setFileSize(sz*2, &log, file, &mapFile);
442 sz = _setFileSize(sz*2, &log, file, sz);
445 write(&log, &bytesWritten, subId, ENTRY_PERSISTED,
446 i->second->getLastPersisted());
448 i->second->getActiveEntries(recovered);
449 for (MemoryBookmarkStore::EntryPtrList::iterator entry =
451 entry != recovered.end(); ++entry)
453 if ((*entry)->_val.empty() ||
454 !isWritableBookmark((*entry)->_val.len()))
457 size_t blockLen = subIdLen + 2*
sizeof(size_t) +
458 (*entry)->_val.len() + 1;
459 if(bytesWritten + blockLen >= sz)
462 sz = _setFileSize(sz*2, &log, file, &mapFile);
464 sz = _setFileSize(sz*2, &log, file, sz);
467 write(&log, &bytesWritten, subId, ENTRY_BOOKMARK,
469 if (!(*entry)->_active)
472 if(bytesWritten + blockLen >= sz)
475 sz = _setFileSize(sz*2, &log, file, &mapFile);
477 sz = _setFileSize(sz*2, &log, file, sz);
480 write(&log, &bytesWritten, subId, ENTRY_DISCARD,
486 catch (StoreException& ex)
489 UnmapViewOfFile(log);
490 CloseHandle(mapFile);
494 ::unlink(tmpFileName_.c_str());
496 std::ostringstream os;
497 os <<
"Exception during prune: " << ex.what();
498 throw StoreException(os.str());
501 BOOL success = FlushViewOfFile(_log,0);
502 success |= UnmapViewOfFile(_log);
504 success |= CloseHandle(_mapFile);
505 success |= CloseHandle(_file);
508 DWORD err = getErrorNo();
509 std::ostringstream os;
510 os <<
"Failed to flush, unmap, and close current file " 512 <<
" in prune in MMapBookmarkStore. ";
513 error(os.str(), err);
515 _mapFile = INVALID_HANDLE_VALUE;
516 _file = INVALID_HANDLE_VALUE;
517 success = FlushViewOfFile(log,0);
518 success |= UnmapViewOfFile(log);
520 success |= CloseHandle(mapFile);
521 success |= CloseHandle(file);
524 DWORD err = getErrorNo();
525 std::ostringstream os;
526 os <<
"Failed to flush, unmap and close completed temp file " 528 <<
" in prune in MMapBookmarkStore. ";
529 error(os.str(), err);
531 mapFile = INVALID_HANDLE_VALUE;
532 file = INVALID_HANDLE_VALUE;
535 while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
536 MOVEFILE_COPY_ALLOWED|MOVEFILE_REPLACE_EXISTING|MOVEFILE_WRITE_THROUGH))
538 DWORD err = getErrorNo();
539 if (--retryCount > 0)
continue;
541 std::string desiredFileName = _fileName;
542 _fileName = tmpFileName_;
544 std::ostringstream os;
545 os <<
"Failed to move completed temp file " << tmpFileName_
546 <<
" to " << desiredFileName
547 <<
" in prune in MMapBookmarkStore. Continuing by using " 548 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
549 error(os.str(), err);
554 munmap(_log, _fileSize);
559 if (-1 == ::unlink(_fileName.c_str()))
561 int err = getErrorNo();
563 std::string desiredFileName = _fileName;
564 _fileName = tmpFileName_;
566 std::ostringstream os;
567 os <<
"Failed to delete file " << desiredFileName
568 <<
" after creating temporary file " << tmpFileName_
569 <<
" in prune in MMapBookmarkStore. Continuing by using " 570 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
571 error(os.str(), err);
573 if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
575 int err = getErrorNo();
577 std::string desiredFileName = _fileName;
578 _fileName = tmpFileName_;
580 std::ostringstream os;
581 os <<
"Failed to move completed temp file " << tmpFileName_
582 <<
" to " << desiredFileName
583 <<
" in prune in MMapBookmarkStore. Continuing by using " 584 << tmpFileName_ <<
" as the MMapBookmarkStore file.";
585 error(os.str(), err);
590 _logOffset = bytesWritten;
593 virtual void _persisted(Subscription* subP_,
596 Lock<Mutex> l(_lock);
597 write(subP_->id(), ENTRY_PERSISTED, bookmarkField_);
598 MemoryBookmarkStore::_persisted(subP_, bookmarkField_);
601 virtual Message::Field _persisted(Subscription* subP_,
size_t bookmark_)
603 Lock<Mutex> l(_lock);
604 Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
605 if (!entryPtr || entryPtr->_val.empty())
608 write(subP_->id(), ENTRY_PERSISTED, bookmarkField);
609 MemoryBookmarkStore::_persisted(subP_, bookmarkField);
610 return bookmarkField;
616 _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
617 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
618 if( _file == INVALID_HANDLE_VALUE )
620 DWORD err = getErrorNo();
621 std::ostringstream os;
622 os <<
"Failed to create file " << _fileName <<
" for MMapBookmarkStore";
623 error(os.str(), err);
625 LARGE_INTEGER liFileSize;
626 if(GetFileSizeEx(_file, &liFileSize) == 0)
628 DWORD err = getErrorNo();
630 std::ostringstream os;
631 os <<
"Failure getting file size for MMapBookmarkStore " << _fileName;
632 error(os.str(), err);
635 size_t fileSize = liFileSize.QuadPart;
637 size_t fileSize = liFileSize.LowPart;
639 setFileSize( AMPS_INITIAL_LOG_SIZE > fileSize ?
640 AMPS_INITIAL_LOG_SIZE : fileSize);
642 _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
645 int err = getErrorNo();
646 std::ostringstream os;
647 os <<
"Failed to open log file " << _fileName <<
" for MMapBookmarkStore";
648 error(os.str(), err);
651 if(fstat(_file, &statBuf) == -1)
653 int err = getErrorNo();
655 std::ostringstream os;
656 os <<
"Failed to stat log file " << _fileName <<
" for MMapBookmarkStore";
657 error(os.str(), err);
659 size_t fSize = (size_t)statBuf.st_size;
662 if(::write(_file,
"\0\0\0\0", 4) == -1)
664 int err = getErrorNo();
666 std::ostringstream os;
667 os <<
"Failed to write header to log file " << _fileName
668 <<
" for MMapBookmarkStore";
669 error(os.str(), err);
673 setFileSize((fSize > AMPS_INITIAL_LOG_SIZE) ? fSize-1 : AMPS_INITIAL_LOG_SIZE);
678 DWORD getErrorNo()
const 680 return GetLastError();
683 void error(
const std::string& message_, DWORD err)
685 std::ostringstream os;
686 static const DWORD msgSize = 2048;
688 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
689 FORMAT_MESSAGE_ARGUMENT_ARRAY,
690 NULL, err, LANG_NEUTRAL,
691 pMsg, msgSize, NULL);
692 os <<
"File: " << _fileName <<
". " << message_ <<
" with error " << pMsg;
693 throw StoreException(os.str());
696 int getErrorNo()
const 701 void error(
const std::string& message_,
int err)
703 std::ostringstream os;
704 os << message_ <<
". Error is " << strerror(err);
705 throw StoreException(os.str());
709 #define AMPS_WRITE8(p,v) { p[0] = (v>>56)&0xFF; p[1] = (v>>48)&0xFF; p[2] = (v>>40)&0xFF; p[3] = (v>>32)&0xFF; p[4] = (v>>24)&0xFF; p[5] = (v>>16)&0xFF; p[6] = (v>>8)&0xFF; p[7]=v&0xFF; } 710 #define AMPS_READ8(p, v) { memcpy(&v,p,8); } 712 #define AMPS_WRITE8(p,v) { *(size_t*)p = (size_t)v; } 713 #define AMPS_READ8(p,v) { v = *(const size_t*)p; } 720 Lock<Mutex> guard(_fileLock);
721 write(&_log, &_logOffset, subId_, type_, bookmark_);
724 void write(
char** logPtr,
size_t* logOffsetPtr,
const Message::Field& subId_,
727 if(!_recovering && isWritableBookmark(bookmark_.
len()))
729 size_t len = subId_.
len();
731 size_t blockLen = len + 2*
sizeof(size_t) + bookmark_.
len() + 1;
732 if(*logOffsetPtr + blockLen >= _fileSize)
734 setFileSize(_fileSize*2);
736 char* offset = *logPtr+*logOffsetPtr;
737 AMPS_WRITE8(offset,len);
738 offset +=
sizeof(size_t);
739 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
742 len = bookmark_.
len();
743 AMPS_WRITE8(offset,len);
744 offset +=
sizeof(size_t);
745 memcpy(offset, static_cast<const void*>(bookmark_.
data()), len);
746 *logOffsetPtr += blockLen;
753 void write(
const Message::Field& subId_,
char type_,
size_t bookmark_)
755 Lock<Mutex> guard(_fileLock);
756 write(&_log, &_logOffset, subId_, type_, bookmark_);
759 void write(
char** logPtr,
size_t* logOffsetPtr,
const Message::Field& subId_,
760 char type_,
size_t bookmark_)
764 size_t len = subId_.
len();
765 size_t blockLen = len + 2*
sizeof(size_t) + 1;
767 if(*logOffsetPtr + blockLen >= _fileSize)
769 setFileSize(_fileSize*2);
771 char* offset = *logPtr+*logOffsetPtr;
772 *(
reinterpret_cast<size_t*
>(offset)) = len;
773 offset +=
sizeof(size_t);
774 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
777 *(
reinterpret_cast<size_t*
>(offset)) = bookmark_;
778 *logOffsetPtr += blockLen;
782 void setFileSize(
size_t newSize_)
784 if(_log && newSize_ <= _fileSize)
787 _fileSize = _setFileSize(newSize_, &_log, _file, &_mapFile);
789 _fileSize = _setFileSize(newSize_, &_log, _file, _fileSize);
794 size_t _setFileSize(
size_t newSize_,
char** log_, FileType file_,
803 size_t sz = newSize_ & (size_t)(~(getPageSize()-1));
804 if(sz < newSize_ || sz == 0)
809 if(*mapFile_ && *mapFile_ != INVALID_HANDLE_VALUE)
813 FlushViewOfFile(*log_, 0);
814 UnmapViewOfFile(*log_);
816 CloseHandle(*mapFile_);
819 *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, (DWORD)((sz >> 32) & 0xffffffff), (DWORD)sz, NULL);
821 *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, 0, (DWORD)sz, NULL);
823 if(*mapFile_ == NULL || *mapFile_ == INVALID_HANDLE_VALUE)
825 DWORD errNo = getErrorNo();
827 error(
"Failed to create map of log file", errNo);
833 *log_ = (
char*)MapViewOfFile(*mapFile_, FILE_MAP_ALL_ACCESS, 0, 0, sz);
836 DWORD errNo = getErrorNo();
837 CloseHandle(*mapFile_);
839 error(
"Failed to map log file to memory", errNo);
846 if(lseek(file_, (off_t)sz, SEEK_SET) == -1)
848 int err = getErrorNo();
850 error(
"Seek failed for bookmark log", err);
852 if(::write(file_,
"", 1) == -1)
854 int err = getErrorNo();
856 error(
"Failed to grow bookmark log", err);
861 *log_ =
static_cast<char*
>(mremap(*log_, fileSize_, sz,
864 munmap(*log_,fileSize_);
865 *log_ =
static_cast<char*
>(mmap(0, sz, PROT_READ | PROT_WRITE,
866 MAP_SHARED, file_, 0));
872 *log_ =
static_cast<char*
>(mmap(0, sz, PROT_READ | PROT_WRITE,
873 MAP_SHARED, file_, 0));
876 if((
void*)(*log_) == MAP_FAILED)
878 int err = getErrorNo();
880 error(
"Failed to map log file to memory", err);
892 size_t bookmarkLen = 0;
893 size_t lastGoodOffset = 0;
894 bool inError =
false;
895 Lock<Mutex> guard(_lock);
896 Lock<Mutex> fileGuard(_fileLock);
899 typedef std::map<Message::Field, size_t, Message::Field::FieldHash> BookmarkMap;
901 Message::Field::FieldHash>::iterator BookmarkMapIter;
903 typedef std::map<Message::Field, BookmarkMap*,
904 Message::Field::FieldHash> ReadMap;
905 typedef std::map<Message::Field, BookmarkMap*,
906 Message::Field::FieldHash>::iterator ReadMapIter;
908 size_t subLen = *(
reinterpret_cast<size_t*
>(_log));
909 while(!inError && subLen > 0)
911 _logOffset +=
sizeof(size_t);
912 sub.assign(_log+_logOffset, subLen);
913 _logOffset += subLen;
914 switch(_log[_logOffset++])
920 AMPS_READ8((_log+_logOffset),bookmarkLen);
921 _logOffset +=
sizeof(size_t);
922 bookmarkField.assign(_log+_logOffset, bookmarkLen);
923 _logOffset += bookmarkLen;
924 Subscription* subP = find(sub);
925 BookmarkMap* bookmarks = NULL;
926 ReadMapIter iter = recovered.find(sub);
927 if (iter == recovered.end())
929 Message::Field subKey;
931 bookmarks =
new BookmarkMap();
932 recovered[subKey] = bookmarks;
936 bookmarks = iter->second;
938 if (bookmarks->find(bookmarkField) != bookmarks->end())
940 std::for_each(bookmarks->begin(), bookmarks->end(),
943 subP->getMostRecent();
945 if (!subP->isDiscarded(bookmarkField))
947 size_t sequence = subP->log(bookmarkField);
950 bookmarks->insert(std::make_pair(copy, sequence));
958 bookmarks->insert(std::make_pair(copy,0));
964 AMPS_READ8((_log+_logOffset),bookmarkLen);
965 _logOffset +=
sizeof(size_t);
966 bookmarkField.assign(_log+_logOffset, bookmarkLen);
967 _logOffset += bookmarkLen;
968 size_t sequence = AMPS_UNSET_INDEX;
969 ReadMapIter iter = recovered.find(sub);
970 if (iter != recovered.end())
972 BookmarkMap* bookmarks = iter->second;
973 BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
974 if (bookmarkIter != bookmarks->end())
976 sequence = bookmarkIter->second;
977 Message::Field bookmarkToClear(bookmarkIter->first);
978 bookmarkToClear.
clear();
979 bookmarks->erase(bookmarkIter);
982 Subscription* subP = find(sub);
983 if (sequence != AMPS_UNSET_INDEX)
986 if (sequence) subP->discard(sequence);
990 subP->discard(bookmarkField);
994 case ENTRY_PERSISTED:
996 AMPS_READ8((_log+_logOffset),bookmarkLen);
997 _logOffset +=
sizeof(size_t);
998 bookmarkField.assign(_log+_logOffset, bookmarkLen);
999 _logOffset += bookmarkLen;
1000 MemoryBookmarkStore::_persisted(find(sub), bookmarkField);
1004 if (lastGoodOffset == 0)
1006 error(
"Error while recovering bookmark store file.", getErrorNo());
1010 _logOffset = lastGoodOffset;
1014 lastGoodOffset = _logOffset;
1015 if (!inError) subLen = *(
reinterpret_cast<size_t*
>(_log + _logOffset));
1017 for (SubscriptionMap::iterator i=_subs.begin(); i != _subs.end(); ++i)
1019 if (recovered.count(i->first) && !recovered[i->first]->empty())
1021 if (i->second->getMostRecent(
false).len() > 1)
1023 i->second->justRecovered();
1030 _subs[i->first] =
new Subscription(
this, i->first);
1034 for (ReadMapIter i = recovered.begin(), e = recovered.end(); i!=e; ++i)
1036 std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1038 Message::Field f = i->first;
1041 _recovering =
false;
1045 std::string _fileName;
1052 static size_t getPageSize()
1054 static size_t pageSize;
1058 SYSTEM_INFO SYS_INFO;
1059 GetSystemInfo(&SYS_INFO);
1060 pageSize = SYS_INFO.dwPageSize;
1062 pageSize = (size_t)sysconf(_SC_PAGESIZE);
1073 #endif // _MMAPBOOKMARKSTORE_H_ virtual void purge()
Called to purge the contents of this store.
Definition: MMapBookmarkStore.hpp:216
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:984
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
A BookmarkStoreImpl implementation that uses a memory mapped file for storage of the bookmarks...
Definition: MMapBookmarkStore.hpp:55
void clear()
Deletes the data associated with this Field; should only be used on Fields that were created as deepC...
Definition: Field.hpp:196
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: MMapBookmarkStore.hpp:150
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MMapBookmarkStore.hpp:182
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:244
virtual void noPersistedAcks(const Message::Field &subId_)
Flag the subscription as not requesting persisted acks. No persisted acks will be sent for any bookmar...
Definition: MMapBookmarkStore.hpp:239
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: MMapBookmarkStore.hpp:168
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1074
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:93
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MMapBookmarkStore.hpp:230
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1076
MMapBookmarkStore(const std::string &fileName_)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:96
MMapBookmarkStore(const char *fileName_)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:77
virtual void noPersistedAcks(const Message::Field &subId_)
Flag the subscription as not requesting persisted acks. No persisted acks will be sent for any bookmar...
Definition: MemoryBookmarkStore.hpp:1065
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:50
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:250
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MMapBookmarkStore.hpp:129
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:167
Definition: ampsplusplus.hpp:136
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1064
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MMapBookmarkStore.hpp:196