26 #ifndef _RINGBOOKMARKSTORE_H_ 27 #define _RINGBOOKMARKSTORE_H_ 29 #define AMPS_RING_POSITIONS 3 31 #define AMPS_RING_BYTES_BOOKMARK (AMPS_MAX_BOOKMARK_LEN * 6 + 8) 32 #define AMPS_RING_ENTRY_SIZE 1024 33 #define AMPS_RING_BYTES_SUBID ( AMPS_RING_ENTRY_SIZE - ( AMPS_RING_POSITIONS * AMPS_RING_BYTES_BOOKMARK ) ) 34 #define AMPS_RING_ENTRIES 32 43 #include <sys/types.h> 47 #if !defined(MREMAP_MAYMOVE) 48 #define MREMAP_MAYMOVE 0 70 struct SubscriptionPosition
84 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
88 , _ringRecovering(true)
96 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
100 , _ringRecovering(
true)
102 init(fileName_.c_str());
108 UnmapViewOfFile(_log);
110 CloseHandle(_mapFile);
111 _mapFile = INVALID_HANDLE_VALUE;
113 _file = INVALID_HANDLE_VALUE;
115 munmap(_log, _fileSize);
122 _ringRecovering =
true;
131 Lock<Mutex> guard(_lock);
132 size_t ret = MemoryBookmarkStore::_log(message_);
133 if (BookmarkRange::isRange(message_.
getBookmark()))
138 write(subId, MemoryBookmarkStore::_getMostRecent(subId,
false));
150 Lock<Mutex> guard(_lock);
151 if (MemoryBookmarkStore::_discard(message_) && _recentChanged)
156 write(subId, MemoryBookmarkStore::_getMostRecent(subId,
false));
157 _recentChanged =
false;
170 Lock<Mutex> guard(_lock);
171 if (MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_)
174 write(subId_, MemoryBookmarkStore::_getMostRecent(subId_,
false));
175 _recentChanged =
false;
187 Lock<Mutex> guard(_lock);
188 MemoryBookmarkStore::_persisted(find(subId_), bookmark_);
189 if (_recentChanged) {
190 write(subId_, MemoryBookmarkStore::_getMostRecent(subId_,
false));
191 _recentChanged =
false;
202 Lock<Mutex> guard(_lock);
203 return MemoryBookmarkStore::_getMostRecent(subId_);
213 Lock<Mutex> guard(_lock);
214 _positionMap.clear();
215 memset(_log, 0, _fileSize);
216 MemoryBookmarkStore::_purge();
227 Lock<Mutex> guard(_lock);
228 Lock<Mutex> fileGuard(_fileLock);
229 Lock<Mutex> posGuard(_posLock);
230 if(_positionMap.count(subId_) == 0)
235 MemoryBookmarkStore::_purge(subId_);
237 SubscriptionPosition pos = _positionMap[subId_];
238 memset(_log + (pos._index * AMPS_RING_ENTRY_SIZE), 0,
239 AMPS_RING_ENTRY_SIZE);
242 for (
size_t index = pos._index; index < _currentIndex - 1; ++index)
244 char* start = _log + (index * AMPS_RING_ENTRY_SIZE);
245 memcpy(start, start + AMPS_RING_ENTRY_SIZE, AMPS_RING_ENTRY_SIZE);
246 char* end = (
char*)memchr(start,
'\0', AMPS_RING_BYTES_SUBID);
248 sub.assign(start, (
size_t)(end - start));
249 _positionMap[sub]._index = index;
251 _positionMap.erase(subId_);
255 memset(_log + (_currentIndex * AMPS_RING_ENTRY_SIZE), 0,
256 AMPS_RING_ENTRY_SIZE);
260 void init(
const char* fileName_)
263 _file = CreateFileA(fileName_, GENERIC_READ | GENERIC_WRITE, 0,
264 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
265 if( _file == INVALID_HANDLE_VALUE )
267 DWORD err = getErrorNo();
268 std::ostringstream os;
269 os <<
"Failed to create file " << fileName_ <<
" for RingBookmarkStore\n";
270 error(os.str(), err);
272 LARGE_INTEGER liFileSize;
273 if(GetFileSizeEx(_file, &liFileSize) == 0)
275 error(
"Failure getting file size for RingBookmarkStore.", getErrorNo());
277 DWORD fsLow = liFileSize.LowPart;
278 DWORD fsHigh = liFileSize.HighPart;
280 size_t fileSize = liFileSize.QuadPart;
282 size_t fileSize = liFileSize.LowPart;
284 size_t existingSize = AMPS_RING_ENTRIES*AMPS_RING_ENTRY_SIZE;
285 if(existingSize > fileSize)
287 fsLow = (DWORD)existingSize;
289 fsHigh = (DWORD)(existingSize >> 32);
292 setFileSize(fsHigh, fsLow);
294 _fd = open(fileName_, O_RDWR | O_CREAT, (mode_t)0644);
297 int err = getErrorNo();
298 std::ostringstream os;
299 os <<
"Failed to open log file " << fileName_ <<
" for RingBookmarkStore";
300 error(os.str(), err);
303 if(fstat(_fd, &statBuf) == -1)
305 int err = getErrorNo();
306 std::ostringstream os;
307 os <<
"Failed to stat log file " << fileName_ <<
" for RingBookmarkStore";
308 error(os.str(), err);
310 size_t fSize = (size_t)statBuf.st_size;
312 if (::write(_fd,
"\0\0\0\0", 4) != 4)
313 error(
"Failed to initialize empty file.", getErrorNo());
314 setFileSize((fSize > AMPS_RING_ENTRIES*AMPS_RING_ENTRY_SIZE ?
315 fSize - 1 : AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE));
321 DWORD getErrorNo()
const 323 return GetLastError();
326 void error(
const std::string& message_, DWORD err)
328 std::ostringstream os;
329 static const DWORD msgSize = 2048;
331 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
332 FORMAT_MESSAGE_ARGUMENT_ARRAY,
333 NULL, err, LANG_NEUTRAL,
334 pMsg, msgSize, NULL);
335 os << message_ <<
". Error is " << pMsg;
336 throw StoreException(os.str());
339 int getErrorNo()
const 344 void error(
const std::string& message_,
int err)
346 std::ostringstream os;
347 os << message_ <<
". Error is " << strerror(err);
348 throw StoreException(os.str());
356 Lock<Mutex> guard(_fileLock);
357 if( !_ringRecovering)
359 if (bookmark_.
len() > AMPS_RING_BYTES_BOOKMARK)
361 throw StoreException(
"Bookmark is too large for fixed size storage. Consider rebuilding after changing AMPS_RING_BYTES_BOOKMARK in include/RingBookmarkStore.hpp");
363 SubscriptionPosition& pos = findPos(subId_);
364 size_t nextPos = (pos._current + 1) % AMPS_RING_POSITIONS;
366 char* offset = _log + (pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (nextPos * AMPS_RING_BYTES_BOOKMARK);
370 offset = _log + ((pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (pos._current * AMPS_RING_BYTES_BOOKMARK) + 1);
371 size_t len = bookmark_.
len();
373 memcpy(offset, static_cast<const void*>(bookmark_.
data()), len);
376 memset(offset, 0, AMPS_RING_BYTES_BOOKMARK - (len + 2));
378 offset = offset - len - 1;
381 pos._current = nextPos;
386 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~((getPageSize()-1)&0xFFFFFFFFFFFFFFFF);
388 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & (
size_t)~(getPageSize()-1);
390 if(!FlushViewOfFile(_log + syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE))
392 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~(getPageSize()-1);
393 if(msync(_log+syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE, MS_ASYNC) != 0)
396 error(
"Failed to sync mapped memory", getErrorNo());
402 void setFileSize(DWORD newSizeHigh_, DWORD newSizeLow_)
404 bool remap = (_mapFile && _mapFile != INVALID_HANDLE_VALUE);
407 UnmapViewOfFile(_log);
408 CloseHandle(_mapFile);
409 _positionMap.clear();
411 _mapFile = CreateFileMappingA( _file, NULL, PAGE_READWRITE, newSizeHigh_, newSizeLow_, NULL);
412 if(_mapFile == NULL || _mapFile == INVALID_HANDLE_VALUE)
414 error(
"Failed to create map of log file", getErrorNo());
419 size_t sz = ((size_t)newSizeHigh_ << 32) | (size_t)newSizeLow_;
421 size_t sz = (size_t)newSizeLow_;
423 _log = (
char*)MapViewOfFile(_mapFile, FILE_MAP_ALL_ACCESS, 0, 0, sz);
426 error(
"Failed to map log file to memory", getErrorNo());
437 void setFileSize(
size_t newSize_)
439 bool remap = (_log != 0);
441 size_t sz = newSize_ & (size_t)(~(getPageSize()-1));
447 if(newSize_ <= _fileSize)
450 if(lseek(_fd, (off_t)sz, SEEK_SET) == -1)
452 error(
"Seek failed for RingBookmarkStore", getErrorNo());
454 if(::write(_fd,
"", 1) == -1)
456 error(
"Failed to grow RingBookmarkStore", getErrorNo());
458 void* newLog = MAP_FAILED;
461 _positionMap.clear();
464 newLog = (mremap(_log, _fileSize, sz, MREMAP_MAYMOVE));
467 newLog = mmap(_log + _fileSize, sz, PROT_READ|PROT_WRITE,
468 MAP_SHARED|MAP_FIXED, _fd, (off_t)sz);
472 munmap(_log,_fileSize);
473 newLog = mmap(0,sz,PROT_READ|PROT_WRITE,MAP_SHARED,_fd,0);
480 newLog = (mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0));
484 if(newLog == MAP_FAILED)
488 error(
"Failed to map log file to memory", getErrorNo());
490 _log =
static_cast<char*
>(newLog);
499 _ringRecovering =
true;
504 size_t maxEntries = _fileSize/AMPS_RING_ENTRY_SIZE > AMPS_RING_ENTRIES ? _fileSize/AMPS_RING_ENTRY_SIZE : AMPS_RING_ENTRIES;
505 for(; _currentIndex < maxEntries; ++_currentIndex)
507 char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
513 char* end = (
char*)memchr(offset,
'\0', AMPS_RING_BYTES_SUBID);
520 sub.assign(offset, (
size_t)(end - offset));
522 Subscription* subPtr = MemoryBookmarkStore::find(sub);
525 SubscriptionPosition& pos = _positionMap[sub];
526 pos._index = _currentIndex;
527 offset += AMPS_RING_BYTES_SUBID;
528 size_t foundCursor = AMPS_RING_POSITIONS;
529 for(pos._current = 0; pos._current < AMPS_RING_POSITIONS; pos._current++)
531 if(offset[pos._current*AMPS_RING_BYTES_BOOKMARK] ==
'*')
534 pos._current = (pos._current+(AMPS_RING_POSITIONS-1)) % AMPS_RING_POSITIONS;
536 if(offset[foundCursor*AMPS_RING_BYTES_BOOKMARK] ==
'*')
538 pos._current = (pos._current+(AMPS_RING_POSITIONS-1)) % AMPS_RING_POSITIONS;
543 if(pos._current >= AMPS_RING_POSITIONS)
551 offset += pos._current * AMPS_RING_BYTES_BOOKMARK;
553 end = (
char*)memchr(offset,
'\0', AMPS_RING_BYTES_BOOKMARK);
554 if(end && end != offset)
557 bookmarkField.assign(offset + 1, (
size_t)(end - offset - 1));
559 if (!BookmarkRange::isRange(bookmarkField))
562 subPtr->isDiscarded(bookmarkField);
564 subPtr->discard(subPtr->log(bookmarkField));
568 _ringRecovering =
false;
573 Lock<Mutex> guard(_posLock);
574 if(_positionMap.count(subId_) == 0)
578 char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
579 size_t len = subId_.
len();
580 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
584 subId.assign(offset, len);
585 _positionMap[subId]._index = _currentIndex;
586 _positionMap[subId]._current = 0;
589 memset(offset, 0, AMPS_RING_BYTES_SUBID - len);
593 return _positionMap[subId_];
600 throw StoreException(
"A valid subscription ID must be provided to the RingBookmarkStore");
603 return MemoryBookmarkStore::find(subId_);
609 size_t _currentIndex;
618 typedef std::map<Message::Field, SubscriptionPosition, Message::Field::FieldHash> PositionMap;
619 PositionMap _positionMap;
620 bool _ringRecovering;
622 static DWORD getPageSize()
624 static DWORD pageSize = 0;
627 SYSTEM_INFO SYS_INFO;
628 GetSystemInfo(&SYS_INFO);
629 pageSize = SYS_INFO.dwPageSize;
631 static size_t getPageSize()
633 static size_t pageSize = 0UL;
636 pageSize = (size_t)sysconf(_SC_PAGESIZE);
648 #endif // _RINGBOOKMARKSTORE_H_ Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1107
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:447
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
virtual void purge()
Called to purge the contents of this store.
Definition: RingBookmarkStore.hpp:211
A BookmarkStoreImpl that stores only the MOST_RECENT bookmark to a file for recovery and keeps any bo...
Definition: RingBookmarkStore.hpp:68
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1098
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: RingBookmarkStore.hpp:184
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1132
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: RingBookmarkStore.hpp:148
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: RingBookmarkStore.hpp:200
A BookmarkStoreImpl implmentation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
RingBookmarkStore(const char *fileName_)
Create a RingBookmarkStore using fileName_ for storage of most recent.
Definition: RingBookmarkStore.hpp:81
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
virtual size_t log(Message &message_)
Return the corresponding sequence number for this bookmark.
Definition: RingBookmarkStore.hpp:129
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1122
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: RingBookmarkStore.hpp:168
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: RingBookmarkStore.hpp:225