AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
RingBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _RINGBOOKMARKSTORE_H_
27 #define _RINGBOOKMARKSTORE_H_
28 
29 #define AMPS_RING_POSITIONS 3
30 // Setting bookmark max at 6 bookmarks in a range (5 commas, open, :, and close)
31 #define AMPS_RING_BYTES_BOOKMARK (AMPS_MAX_BOOKMARK_LEN * 6 + 8)
32 #define AMPS_RING_ENTRY_SIZE 1024
33 #define AMPS_RING_BYTES_SUBID ( AMPS_RING_ENTRY_SIZE - ( AMPS_RING_POSITIONS * AMPS_RING_BYTES_BOOKMARK ) )
34 #define AMPS_RING_ENTRIES 32
35 
36 #include <MemoryBookmarkStore.hpp>
37 #ifdef _WIN32
38  #include <windows.h>
39 #else
40  #include <sys/mman.h>
41  #include <unistd.h>
42 #endif
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 
47 #if !defined(MREMAP_MAYMOVE)
48  #define MREMAP_MAYMOVE 0
49 #endif
50 
55 
56 namespace AMPS
57 {
69  {
// Locates one subscription's record inside the mapped ring file.
struct SubscriptionPosition
{
  // Number of the fixed-size entry (AMPS_RING_ENTRY_SIZE bytes each) that
  // holds this subscription's id and bookmark slots.
  size_t _index;
  // Which of the AMPS_RING_POSITIONS bookmark slots of that entry the next
  // write() stores its bookmark in.
  size_t _current;
};
75 
76  public:
81  RingBookmarkStore(const char* fileName_)
82  : MemoryBookmarkStore(), _fileSize(0), _currentIndex(0), _log(0)
83 #ifdef _WIN32
84  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
85 #else
86  , _fd(0)
87 #endif
88  , _ringRecovering(true)
89  {
90  init(fileName_);
91  }
92 
93  RingBookmarkStore(const std::string& fileName_)
94  : MemoryBookmarkStore(), _fileSize(0), _currentIndex(0), _log(0)
95 #ifdef _WIN32
96  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
97 #else
98  , _fd(0)
99 #endif
100  , _ringRecovering(true)
101  {
102  init(fileName_.c_str());
103  }
104 
105  virtual ~RingBookmarkStore()
106  {
107 #ifdef _WIN32
108  UnmapViewOfFile(_log);
109  _log = 0;
110  CloseHandle(_mapFile);
111  _mapFile = INVALID_HANDLE_VALUE;
112  CloseHandle(_file);
113  _file = INVALID_HANDLE_VALUE;
114 #else
115  munmap(_log, _fileSize);
116  _log = 0;
117  close(_fd);
118  _fd = 0;
119 #endif
120  // In case _lock gets acquired by reader thread between end of this
121  // destructor and start of base class destructor, prevent write()
122  _ringRecovering = true;
123  }
124 
129  virtual size_t log(Message& message_)
130  {
131  Lock<Mutex> guard(_lock);
132  size_t ret = MemoryBookmarkStore::_log(message_);
133  if (BookmarkRange::isRange(message_.getBookmark()))
134  {
135  Message::Field subId = message_.getSubscriptionId();
136  if (subId.empty())
137  {
138  subId = message_.getSubscriptionIds();
139  }
140  write(subId, MemoryBookmarkStore::_getMostRecent(subId, false));
141  }
142  return ret;
143  }
144 
150  virtual void discard(const Message& message_)
151  {
152  Lock<Mutex> guard(_lock);
153  if (MemoryBookmarkStore::_discard(message_) && _recentChanged)
154  {
155  Message::Field subId = message_.getSubscriptionId();
156  if (subId.empty())
157  {
158  subId = message_.getSubscriptionIds();
159  }
160  write(subId, MemoryBookmarkStore::_getMostRecent(subId, false));
161  _recentChanged = false;
162  }
163  }
164 
172  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
173  {
174  Lock<Mutex> guard(_lock);
175  if (MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_)
176  && _recentChanged)
177  {
178  write(subId_, MemoryBookmarkStore::_getMostRecent(subId_, false));
179  _recentChanged = false;
180  }
181  }
182 
188  virtual void persisted(const Message::Field& subId_,
189  const Message::Field& bookmark_)
190  {
191  Lock<Mutex> guard(_lock);
192  MemoryBookmarkStore::_persisted(find(subId_), bookmark_);
193  if (_recentChanged)
194  {
195  write(subId_, MemoryBookmarkStore::_getMostRecent(subId_, false));
196  _recentChanged = false;
197  }
198  }
199 
206  {
207  Lock<Mutex> guard(_lock);
208  return MemoryBookmarkStore::_getMostRecent(subId_);
209  }
210 
216  virtual void purge()
217  {
218  Lock<Mutex> guard(_lock);
219  _positionMap.clear();
220  memset(_log, 0, _fileSize);
221  MemoryBookmarkStore::_purge();
222  _currentIndex = 0;
223  }
224 
230  virtual void purge(const Message::Field& subId_)
231  {
232  Lock<Mutex> guard(_lock);
233  Lock<Mutex> fileGuard(_fileLock);
234  Lock<Mutex> posGuard(_posLock);
235  if (_positionMap.count(subId_) == 0)
236  {
237  return;
238  }
239  // Remove from memory
240  MemoryBookmarkStore::_purge(subId_);
241  // Remove from the file
242  SubscriptionPosition pos = _positionMap[subId_];
243  memset(_log + (pos._index * AMPS_RING_ENTRY_SIZE), 0,
244  AMPS_RING_ENTRY_SIZE);
245  // Move any following subs back an index
246  Message::Field sub;
247  for (size_t index = pos._index; index < _currentIndex - 1; ++index)
248  {
249  char* start = _log + (index * AMPS_RING_ENTRY_SIZE);
250  memcpy(start, start + AMPS_RING_ENTRY_SIZE, AMPS_RING_ENTRY_SIZE);
251  char* end = (char*)memchr(start, '\0', AMPS_RING_BYTES_SUBID);
252  if (!end)
253  {
254  break;
255  }
256  sub.assign(start, (size_t)(end - start));
257  _positionMap[sub]._index = index;
258  }
259  _positionMap.erase(subId_);
260  // We have one less sub
261  --_currentIndex;
262  // Clear the end
263  memset(_log + (_currentIndex * AMPS_RING_ENTRY_SIZE), 0,
264  AMPS_RING_ENTRY_SIZE);
265  }
266 
267  private:
268  void init(const char* fileName_)
269  {
270 #ifdef _WIN32
271  _file = CreateFileA(fileName_, GENERIC_READ | GENERIC_WRITE, 0,
272  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
273  if ( _file == INVALID_HANDLE_VALUE )
274  {
275  DWORD err = getErrorNo();
276  std::ostringstream os;
277  os << "Failed to create file " << fileName_ << " for RingBookmarkStore\n";
278  error(os.str(), err);
279  }
280  LARGE_INTEGER liFileSize;
281  if (GetFileSizeEx(_file, &liFileSize) == 0)
282  {
283  error("Failure getting file size for RingBookmarkStore.", getErrorNo());
284  }
285  DWORD fsLow = liFileSize.LowPart;
286  DWORD fsHigh = liFileSize.HighPart;
287 #ifdef _WIN64
288  size_t fileSize = liFileSize.QuadPart;
289 #else
290  size_t fileSize = liFileSize.LowPart;
291 #endif
292  size_t existingSize = AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE;
293  if (existingSize > fileSize)
294  {
295  fsLow = (DWORD)existingSize;
296 #ifdef _WIN64
297  fsHigh = (DWORD)(existingSize >> 32);
298 #endif
299  }
300  setFileSize(fsHigh, fsLow);
301 #else
302  _fd = open(fileName_, O_RDWR | O_CREAT, (mode_t)0644);
303  if (_fd == -1)
304  {
305  int err = getErrorNo();
306  std::ostringstream os;
307  os << "Failed to open log file " << fileName_ << " for RingBookmarkStore";
308  error(os.str(), err);
309  }
310  struct stat statBuf;
311  if (fstat(_fd, &statBuf) == -1)
312  {
313  int err = getErrorNo();
314  std::ostringstream os;
315  os << "Failed to stat log file " << fileName_ << " for RingBookmarkStore";
316  error(os.str(), err);
317  }
318  size_t fSize = (size_t)statBuf.st_size;
319  if (fSize == 0)
320  if (::write(_fd, "\0\0\0\0", 4) != 4)
321  {
322  error("Failed to initialize empty file.", getErrorNo());
323  }
324  setFileSize((fSize > AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE ?
325  fSize - 1 : AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE));
326 #endif
327  recover();
328  }
329 
330 #ifdef _WIN32
331  DWORD getErrorNo() const
332  {
333  return GetLastError();
334  }
335 
336  void error(const std::string& message_, DWORD err)
337  {
338  std::ostringstream os;
339  static const DWORD msgSize = 2048;
340  char pMsg[msgSize];
341  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
342  FORMAT_MESSAGE_ARGUMENT_ARRAY,
343  NULL, err, LANG_NEUTRAL,
344  pMsg, msgSize, NULL);
345  os << message_ << ". Error is " << pMsg;
346  throw StoreException(os.str());
347  }
348 #else
349  int getErrorNo() const
350  {
351  return errno;
352  }
353 
354  void error(const std::string& message_, int err)
355  {
356  std::ostringstream os;
357  os << message_ << ". Error is " << strerror(err);
358  throw StoreException(os.str());
359  }
360 #endif
361 
362  // Used to log the new _mostRecent bookmark
363  void write(const Message::Field& subId_,
364  const Message::Field& bookmark_)
365  {
366  Lock<Mutex> guard(_fileLock);
367  if ( !_ringRecovering)
368  {
369  if (bookmark_.len() > AMPS_RING_BYTES_BOOKMARK)
370  {
371  throw StoreException("Bookmark is too large for fixed size storage. Consider rebuilding after changing AMPS_RING_BYTES_BOOKMARK in include/RingBookmarkStore.hpp");
372  }
373  SubscriptionPosition& pos = findPos(subId_);
374  size_t nextPos = (pos._current + 1) % AMPS_RING_POSITIONS;
375  // Get pointer to start of next position for cursor
376  char* offset = _log + (pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (nextPos * AMPS_RING_BYTES_BOOKMARK);
377  // Write the 'cursor' to start of following entry and advance offset
378  *offset = '*';
379  // Change offset to beginning of current bookmark
380  offset = _log + ((pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (pos._current * AMPS_RING_BYTES_BOOKMARK) + 1);
381  size_t len = bookmark_.len();
382  // Write the bookmark and advance offset
383  memcpy(offset, static_cast<const void*>(bookmark_.data()), len);
384  offset += len;
385  // Set extra bytes to NULL
386  memset(offset, 0, AMPS_RING_BYTES_BOOKMARK - (len + 2));
387  // Return to beginning and change the cursor
388  offset = offset - len - 1;
389  *offset = '+';
390  // Update current for the next write
391  pos._current = nextPos;
392 
393  // Sync the changes to disk
394 #ifdef _WIN32
395 #ifdef _WIN64
396  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~((getPageSize() - 1) & 0xFFFFFFFFFFFFFFFF);
397 #else
398  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & (size_t)~(getPageSize() - 1);
399 #endif
400  if (!FlushViewOfFile(_log + syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE))
401 #else
402  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~(getPageSize() - 1);
403  if (msync(_log + syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE, MS_ASYNC) != 0)
404 #endif
405  {
406  error("Failed to sync mapped memory", getErrorNo());
407  }
408  }
409  }
410 
411 #ifdef _WIN32
412  void setFileSize(DWORD newSizeHigh_, DWORD newSizeLow_)
413  {
414  bool remap = (_mapFile && _mapFile != INVALID_HANDLE_VALUE);
415  if (remap)
416  {
417  UnmapViewOfFile(_log);
418  CloseHandle(_mapFile);
419  _positionMap.clear();
420  }
421  _mapFile = CreateFileMappingA( _file, NULL, PAGE_READWRITE, newSizeHigh_, newSizeLow_, NULL);
422  if (_mapFile == NULL || _mapFile == INVALID_HANDLE_VALUE)
423  {
424  error("Failed to create map of log file", getErrorNo());
425  _log = 0;
426  _fileSize = 0;
427  }
428 #ifdef _WIN64
429  size_t sz = ((size_t)newSizeHigh_ << 32) | (size_t)newSizeLow_;
430 #else
431  size_t sz = (size_t)newSizeLow_;
432 #endif
433  _log = (char*)MapViewOfFile(_mapFile, FILE_MAP_ALL_ACCESS, 0, 0, sz);
434  if (_log == NULL)
435  {
436  error("Failed to map log file to memory", getErrorNo());
437  _log = 0;
438  _fileSize = 0;
439  return;
440  }
441  _fileSize = sz;
442  // Call recover to reset the _positionMap
443  if (remap)
444  {
445  recover();
446  }
447  }
448 #else
449  void setFileSize(size_t newSize_)
450  {
451  bool remap = (_log != 0);
452  // Make sure we're using a multiple of page size
453  size_t sz = newSize_ & (size_t)(~(getPageSize() - 1));
454  if (sz < newSize_)
455  {
456  sz += getPageSize();
457  }
458  // Improper resize attempt
459  if (newSize_ <= _fileSize)
460  {
461  return;
462  }
463  // Extend the underlying file
464  if (lseek(_fd, (off_t)sz, SEEK_SET) == -1)
465  {
466  error("Seek failed for RingBookmarkStore", getErrorNo());
467  }
468  if (::write(_fd, "", 1) == -1)
469  {
470  error("Failed to grow RingBookmarkStore", getErrorNo());
471  }
472  void* newLog = MAP_FAILED;
473  if (_log)
474  {
475  _positionMap.clear();
476 
477 #ifdef linux
478  newLog = (mremap(_log, _fileSize, sz, MREMAP_MAYMOVE));
479 #else
480  // try making a new mmap right after the current one.
481  newLog = mmap(_log + _fileSize, sz, PROT_READ | PROT_WRITE,
482  MAP_SHARED | MAP_FIXED, _fd, (off_t)sz);
483  if (newLog != _log)
484  {
485  // this mmap is relatively small; better to just close the old mmap and reset.
486  munmap(_log, _fileSize);
487  newLog = mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0);
488  }
489 #endif
490  }
491  else // New mapping
492  {
493  // New mapping, map the full file size for recovery or else it std size
494  newLog = (mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0));
495  }
496  _fileSize = sz;
497 
498  if (newLog == MAP_FAILED)
499  {
500  _log = 0;
501  _fileSize = 0;
502  error("Failed to map log file to memory", getErrorNo());
503  }
504  _log = static_cast<char*>(newLog);
505  if (remap)
506  {
507  recover();
508  }
509  }
510 #endif
511 
512  void recover(void)
513  {
514  //Lock<Mutex> guard(_lock);
515  _ringRecovering = true;
516  Message::Field sub;
517  Message::Field bookmarkField;
518 
519  _currentIndex = 0;
520  size_t maxEntries = _fileSize / AMPS_RING_ENTRY_SIZE > AMPS_RING_ENTRIES ? _fileSize / AMPS_RING_ENTRY_SIZE : AMPS_RING_ENTRIES;
521  for (; _currentIndex < maxEntries; ++_currentIndex)
522  {
523  char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
524  if (*offset == '\0')
525  {
526  break;
527  }
528  //It's possible we wrote the subId and not NULLs, so be careful
529  char* end = (char*)memchr(offset, '\0', AMPS_RING_BYTES_SUBID);
530  if (!end)
531  {
532  // Failed subscription id read, we're done
533  break;
534  }
535  // Safe to continue
536  sub.assign(offset, (size_t)(end - offset));
537  // Put this sub into the MemoryBookmarkStore
538  Subscription* subPtr = MemoryBookmarkStore::find(sub);
539  // Put this sub into the _positionMap
540  // This is recovery, so do it directly and not with findPos
541  SubscriptionPosition& pos = _positionMap[sub];
542  pos._index = _currentIndex;
543  offset += AMPS_RING_BYTES_SUBID;
544  size_t foundCursor = AMPS_RING_POSITIONS;
545  for (pos._current = 0; pos._current < AMPS_RING_POSITIONS; pos._current++)
546  {
547  if (offset[pos._current * AMPS_RING_BYTES_BOOKMARK] == '*')
548  {
549  // Subtract one position
550  pos._current = (pos._current + (AMPS_RING_POSITIONS - 1)) % AMPS_RING_POSITIONS;
551  // Subtract one more if a second bookmark is found
552  if (offset[foundCursor * AMPS_RING_BYTES_BOOKMARK] == '*')
553  {
554  pos._current = (pos._current + (AMPS_RING_POSITIONS - 1)) % AMPS_RING_POSITIONS;
555  }
556  break;
557  }
558  }
559  if (pos._current >= AMPS_RING_POSITIONS)
560  {
561  // No valid bookmark found, just use 0
562  pos._current = 0;
563  }
564  else
565  {
566  // We found a cursor
567  offset += pos._current * AMPS_RING_BYTES_BOOKMARK;
568  //It's possible we wrote bookmark and not NULLs, so be careful
569  end = (char*)memchr(offset, '\0', AMPS_RING_BYTES_BOOKMARK);
570  if (end && end != offset)
571  {
572  // add 1 to account for leading '+'
573  bookmarkField.assign(offset + 1, (size_t)(end - offset - 1));
574  // log, discard to make it the most recent
575  if (!BookmarkRange::isRange(bookmarkField))
576  {
577  // This adds bookmark to _publishers
578  subPtr->isDiscarded(bookmarkField);
579  }
580  subPtr->discard(subPtr->log(bookmarkField));
581  }
582  }
583  }
584  _ringRecovering = false;
585  }
586 
587  SubscriptionPosition& findPos(const Message::Field& subId_)
588  {
589  Lock<Mutex> guard(_posLock);
590  if (_positionMap.count(subId_) == 0)
591  {
592  // New subid
593  // Move to its start position and write the sub id
594  char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
595  size_t len = subId_.len();
596  memcpy(offset, static_cast<const void*>(subId_.data()), len);
597  // Add it to the map with the current index
598  // Use the data written to the mmap for the subid
599  Message::Field subId;
600  subId.assign(offset, len);
601  _positionMap[subId]._index = _currentIndex;
602  _positionMap[subId]._current = 0;
603  // Fill extra spaces with NULL
604  offset += len;
605  memset(offset, 0, AMPS_RING_BYTES_SUBID - len);
606  // Advance current index
607  ++_currentIndex;
608  }
609  return _positionMap[subId_];
610  }
611 
612  Subscription* find(const Message::Field& subId_)
613  {
614  if (subId_.empty())
615  {
616  throw StoreException("A valid subscription ID must be provided to the RingBookmarkStore");
617  }
618  findPos(subId_);
619  return MemoryBookmarkStore::find(subId_);
620  }
621 
622 
623  Mutex _fileLock;
624  size_t _fileSize;
625  size_t _currentIndex;
626  char* _log;
627 #ifdef _WIN32
628  HANDLE _file;
629  HANDLE _mapFile;
630 #else
631  int _fd;
632 #endif
633  Mutex _posLock;
634  typedef std::map<Message::Field, SubscriptionPosition, Message::Field::FieldHash> PositionMap;
635  PositionMap _positionMap;
636  bool _ringRecovering;
// Returns the system's memory page size. The value is queried from the OS
// on the first call and kept in a function-local static for later calls.
#ifdef _WIN32
static DWORD getPageSize()
{
  static DWORD pageSize = 0;
  if (pageSize != 0)
  {
    return pageSize;
  }
  SYSTEM_INFO systemInfo;
  GetSystemInfo(&systemInfo);
  pageSize = systemInfo.dwPageSize;
  return pageSize;
}
#else
static size_t getPageSize()
{
  static size_t pageSize = 0UL;
  if (pageSize != 0)
  {
    return pageSize;
  }
  pageSize = (size_t)sysconf(_SC_PAGESIZE);
  return pageSize;
}
#endif
657 
658 
659  };
660 
661 } // end namespace AMPS
662 
663 
664 #endif // _RINGBOOKMARKSTORE_H_
665 
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
virtual void purge()
Called to purge the contents of this store.
Definition: RingBookmarkStore.hpp:216
A BookmarkStoreImpl that stores only the MOST_RECENT bookmark to a file for recovery and keeps any bo...
Definition: RingBookmarkStore.hpp:68
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1136
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: RingBookmarkStore.hpp:188
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1374
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:127
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: RingBookmarkStore.hpp:150
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: RingBookmarkStore.hpp:205
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
RingBookmarkStore(const char *fileName_)
Create a RingBookmarkStore using fileName_ for storage of most recent.
Definition: RingBookmarkStore.hpp:81
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
virtual size_t log(Message &message_)
Return the corresponding sequence number for this bookmark.
Definition: RingBookmarkStore.hpp:129
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1140
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: RingBookmarkStore.hpp:172
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: RingBookmarkStore.hpp:230