28 #ifndef _RINGBOOKMARKSTORE_H_ 29 #define _RINGBOOKMARKSTORE_H_ 38 #include <sys/types.h> 42 #if !defined(MREMAP_MAYMOVE) 43 #define MREMAP_MAYMOVE 0 65 struct SubscriptionPosition
79 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
91 , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
97 init(fileName_.c_str());
103 UnmapViewOfFile(_log);
104 CloseHandle(_mapFile);
107 munmap(_log, _fileSize);
121 Lock<Mutex> guard(_lock);
122 size_t ret = MemoryBookmarkStore::_log(message_);
133 Lock<Mutex> guard(_lock);
134 MemoryBookmarkStore::_discard(message_);
138 write(subId, MemoryBookmarkStore::_getMostRecent(subId));
150 Lock<Mutex> guard(_lock);
151 MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
152 write(subId_, MemoryBookmarkStore::_getMostRecent(subId_));
162 Lock<Mutex> guard(_lock);
163 return find(subId_)->getMostRecent();
176 message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(MemoryBookmarkStore::find(subId)));
187 Lock<Mutex> guard(_lock);
188 _positionMap.clear();
189 memset(_log, 0, _fileSize);
190 MemoryBookmarkStore::_purge();
200 Lock<Mutex> guard(_lock);
201 Lock<Mutex> fileGuard(_fileLock);
202 Lock<Mutex> posGuard(_posLock);
203 if(_positionMap.count(subId_) == 0)
208 MemoryBookmarkStore::_purge(subId_);
210 SubscriptionPosition pos = _positionMap[subId_];
211 memset(_log + (pos._index * AMPS_RING_ENTRY_SIZE), 0,
212 AMPS_RING_ENTRY_SIZE);
215 for (
size_t index = pos._index; index < _currentIndex - 1; ++index)
217 char* start = _log + (index * AMPS_RING_ENTRY_SIZE);
218 memcpy(start, start + AMPS_RING_ENTRY_SIZE, AMPS_RING_ENTRY_SIZE);
219 char* end = (
char*)memchr(start,
'\0', AMPS_RING_BYTES_SUBID);
221 sub.assign(start, (
size_t)(end - start));
222 _positionMap[sub]._index = index;
224 _positionMap.erase(subId_);
228 memset(_log + (_currentIndex * AMPS_RING_ENTRY_SIZE), 0,
229 AMPS_RING_ENTRY_SIZE);
239 void init(
const char* fileName_)
242 _file = CreateFileA(fileName_, GENERIC_READ | GENERIC_WRITE, 0,
243 NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
244 if( _file == INVALID_HANDLE_VALUE )
246 DWORD err = getErrorNo();
247 std::ostringstream os;
248 os <<
"Failed to create file " << fileName_ <<
" for RingBookmarkStore\n";
249 error(os.str(), err);
251 LARGE_INTEGER liFileSize;
252 if(GetFileSizeEx(_file, &liFileSize) == 0)
254 error(
"Failure getting file size for RingBookmarkStore.", getErrorNo());
256 DWORD fsLow = liFileSize.LowPart;
257 DWORD fsHigh = liFileSize.HighPart;
259 size_t fileSize = liFileSize.QuadPart;
261 size_t fileSize = liFileSize.LowPart;
263 size_t existingSize = AMPS_RING_ENTRIES*AMPS_RING_ENTRY_SIZE;
264 if(existingSize > fileSize)
266 fsLow = (DWORD)existingSize;
268 fsHigh = (DWORD)(existingSize >> 32);
271 setFileSize(fsHigh, fsLow);
273 _fd = open(fileName_, O_RDWR | O_CREAT, (mode_t)0644);
276 int err = getErrorNo();
277 std::ostringstream os;
278 os <<
"Failed to open log file " << fileName_ <<
" for RingBookmarkStore";
279 error(os.str(), err);
282 if(fstat(_fd, &statBuf) == -1)
284 int err = getErrorNo();
285 std::ostringstream os;
286 os <<
"Failed to stat log file " << fileName_ <<
" for RingBookmarkStore";
287 error(os.str(), err);
289 size_t fSize = (size_t)statBuf.st_size;
291 if (::write(_fd,
"\0\0\0\0", 4) != 4)
292 error(
"Failed to initialize empty file.", getErrorNo());
293 setFileSize((fSize > AMPS_RING_ENTRIES*AMPS_RING_ENTRY_SIZE ?
294 fSize - 1 : AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE));
300 DWORD getErrorNo()
const 302 return GetLastError();
305 void error(
const std::string& message_, DWORD err)
307 std::ostringstream os;
308 static const DWORD msgSize = 2048;
310 DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
311 FORMAT_MESSAGE_ARGUMENT_ARRAY,
312 NULL, err, LANG_NEUTRAL,
313 pMsg, msgSize, NULL);
314 os << message_ <<
". Error is " << pMsg;
315 throw StoreException(os.str());
318 int getErrorNo()
const 323 void error(
const std::string& message_,
int err)
325 std::ostringstream os;
326 os << message_ <<
". Error is " << strerror(err);
327 throw StoreException(os.str());
335 Lock<Mutex> guard(_fileLock);
338 SubscriptionPosition& pos = findPos(subId_);
339 size_t nextPos = (pos._current + 1) % AMPS_RING_POSITIONS;
341 char* offset = _log + (pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (nextPos * AMPS_RING_BYTES_BOOKMARK);
345 offset = _log + ((pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (pos._current * AMPS_RING_BYTES_BOOKMARK) + 1);
346 size_t len = bookmark_.
len();
348 memcpy(offset, static_cast<const void*>(bookmark_.
data()), len);
351 memset(offset, 0, AMPS_RING_BYTES_BOOKMARK - (len + 2));
353 offset = offset - len - 1;
356 pos._current = nextPos;
361 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~((getPageSize()-1)&0xFFFFFFFFFFFFFFFF);
363 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & (
size_t)~(getPageSize()-1);
365 if(!FlushViewOfFile(_log + syncStart, pos._index - syncStart + AMPS_RING_ENTRY_SIZE))
367 size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~(getPageSize()-1);
368 if(msync(_log+syncStart, pos._index - syncStart + AMPS_RING_ENTRY_SIZE, MS_ASYNC) != 0)
371 error(
"Failed to sync mapped memory", getErrorNo());
377 void setFileSize(DWORD newSizeHigh_, DWORD newSizeLow_)
379 bool remap = (_mapFile && _mapFile != INVALID_HANDLE_VALUE);
382 UnmapViewOfFile(_log);
383 CloseHandle(_mapFile);
384 _positionMap.clear();
386 _mapFile = CreateFileMappingA( _file, NULL, PAGE_READWRITE, newSizeHigh_, newSizeLow_, NULL);
387 if(_mapFile == NULL || _mapFile == INVALID_HANDLE_VALUE)
389 error(
"Failed to create map of log file", getErrorNo());
394 size_t sz = ((size_t)newSizeHigh_ << 32) | (size_t)newSizeLow_;
396 size_t sz = (size_t)newSizeLow_;
398 _log = (
char*)MapViewOfFile(_mapFile, FILE_MAP_ALL_ACCESS, 0, 0, sz);
401 error(
"Failed to map log file to memory", getErrorNo());
412 void setFileSize(
size_t newSize_)
415 size_t sz = newSize_ & (size_t)(~(getPageSize()-1));
421 if(newSize_ <= _fileSize)
424 if(lseek(_fd, (off_t)sz, SEEK_SET) == -1)
426 error(
"Seek failed for RingBookmarkStore", getErrorNo());
428 if(::write(_fd,
"", 1) == -1)
430 error(
"Failed to grow RingBookmarkStore", getErrorNo());
432 void* newLog = MAP_FAILED;
435 _positionMap.clear();
438 newLog = (mremap(_log, _fileSize, sz, MREMAP_MAYMOVE));
441 newLog = mmap(_log + _fileSize, sz, PROT_READ|PROT_WRITE,
442 MAP_SHARED|MAP_FIXED, _fd, (off_t)sz);
446 munmap(_log,_fileSize);
447 newLog = mmap(0,sz,PROT_READ|PROT_WRITE,MAP_SHARED,_fd,0);
454 newLog = (mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0));
458 if(newLog == MAP_FAILED)
460 error(
"Failed to map log file to memory", getErrorNo());
465 _log =
static_cast<char*
>(newLog);
478 size_t maxEntries = _fileSize/AMPS_RING_ENTRY_SIZE > AMPS_RING_ENTRIES ? _fileSize/AMPS_RING_ENTRY_SIZE : AMPS_RING_ENTRIES;
479 for(; _currentIndex < maxEntries; ++_currentIndex)
481 char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
487 char* end = (
char*)memchr(offset,
'\0', AMPS_RING_BYTES_SUBID);
494 sub.assign(offset, (
size_t)(end - offset));
496 MemoryBookmarkStore::find(sub);
499 SubscriptionPosition& pos = _positionMap[sub];
500 pos._index = _currentIndex;
501 offset += AMPS_RING_BYTES_SUBID;
502 size_t foundCursor = AMPS_RING_POSITIONS;
503 for(pos._current = 0; pos._current < AMPS_RING_POSITIONS; pos._current++)
505 if(offset[pos._current*AMPS_RING_BYTES_BOOKMARK] ==
'*')
508 pos._current = (pos._current+(AMPS_RING_POSITIONS-1)) % AMPS_RING_POSITIONS;
510 if(offset[foundCursor*AMPS_RING_BYTES_BOOKMARK] ==
'*')
512 pos._current = (pos._current+(AMPS_RING_POSITIONS-1)) % AMPS_RING_POSITIONS;
517 if(pos._current >= AMPS_RING_POSITIONS)
525 offset += pos._current * AMPS_RING_BYTES_BOOKMARK;
527 end = (
char*)memchr(offset,
'\0', AMPS_RING_BYTES_BOOKMARK);
528 if(end && end != offset)
531 bookmarkField.assign(offset + 1, (
size_t)(end - offset - 1));
532 MemoryBookmarkStore::setMostRecent(sub, bookmarkField);
541 Lock<Mutex> guard(_posLock);
542 if(_positionMap.count(subId_) == 0)
546 char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
547 size_t len = subId_.
len();
548 memcpy(offset, static_cast<const void*>(subId_.
data()), len);
552 subId.assign(offset, len);
553 _positionMap[subId]._index = _currentIndex;
554 _positionMap[subId]._current = 0;
557 memset(offset, 0, AMPS_RING_BYTES_SUBID - len);
561 return _positionMap[subId_];
568 throw StoreException(
"A valid subscription ID must be provided to the RingBookmarkStore");
571 return MemoryBookmarkStore::find(subId_);
577 size_t _currentIndex;
586 typedef std::map<Message::Field, SubscriptionPosition, Message::Field::FieldHash> PositionMap;
587 PositionMap _positionMap;
590 static DWORD getPageSize()
592 static DWORD pageSize = 0;
595 SYSTEM_INFO SYS_INFO;
596 GetSystemInfo(&SYS_INFO);
597 pageSize = SYS_INFO.dwPageSize;
599 static size_t getPageSize()
601 static size_t pageSize = 0UL;
604 pageSize = (size_t)sysconf(_SC_PAGESIZE);
616 #endif // _RINGBOOKMARKSTORE_H_ virtual bool isDiscarded(Message &message_)
Since we don't persist discarded messages after the most recently logged bookmark, we always assume any message tested for should be delivered and is not discarded.
Definition: RingBookmarkStore.hpp:172
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
virtual void purge()
Called to purge the contents of this store.
Definition: RingBookmarkStore.hpp:185
A BookmarkStoreImpl that stores only the MOST_RECENT bookmark to a file for recovery and keeps any bo...
Definition: RingBookmarkStore.hpp:63
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1074
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:93
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: RingBookmarkStore.hpp:131
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: RingBookmarkStore.hpp:160
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:50
RingBookmarkStore(const char *fileName_)
Create a RingBookmarkStore using fileName_ for storage of the most recent bookmark.
Definition: RingBookmarkStore.hpp:76
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
virtual size_t log(Message &message_)
Return the corresponding sequence number for this bookmark.
Definition: RingBookmarkStore.hpp:119
virtual void noPersistedAcks(const Message::Field &)
RingBookmarkStore never uses persisted acks so this is a no-op.
Definition: RingBookmarkStore.hpp:234
Definition: ampsplusplus.hpp:136
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: RingBookmarkStore.hpp:148
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: RingBookmarkStore.hpp:198