AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.0
RingBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _RINGBOOKMARKSTORE_H_
27 #define _RINGBOOKMARKSTORE_H_
28 
// Number of bookmark slots kept for each subscription; writes rotate
// through them.
#define AMPS_RING_POSITIONS 3
// Bytes in one bookmark slot: room for a range of up to 6 bookmarks plus
// 8 bytes of punctuation (5 commas, open, ':', and close).
#define AMPS_RING_BYTES_BOOKMARK (AMPS_MAX_BOOKMARK_LEN * 6 + 8)
// Total bytes occupied by one subscription's entry in the file.
#define AMPS_RING_ENTRY_SIZE 1024
// Bytes left at the start of an entry for the subscription id once the
// bookmark slots have been accounted for.
#define AMPS_RING_BYTES_SUBID (AMPS_RING_ENTRY_SIZE - (AMPS_RING_POSITIONS * AMPS_RING_BYTES_BOOKMARK))
// Number of entries the file is initially sized to hold.
#define AMPS_RING_ENTRIES 32
35 
36 #include <MemoryBookmarkStore.hpp>
37 #ifdef _WIN32
38 #include <windows.h>
39 #else
40 #include <sys/mman.h>
41 #include <unistd.h>
42 #endif
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 
47 #if !defined(MREMAP_MAYMOVE)
48 #define MREMAP_MAYMOVE 0
49 #endif
50 
55 
56 namespace AMPS
57 {
69 {
/// Where one subscription's record lives inside the mapped file.
struct SubscriptionPosition
{
  /// Entry number of the record; its byte offset in the mapping is
  /// _index * AMPS_RING_ENTRY_SIZE.
  size_t _index;
  /// Bookmark slot within the entry that write() stores the next
  /// bookmark into.
  size_t _current;
};
75 
76 public:
81  RingBookmarkStore(const char* fileName_)
82  : MemoryBookmarkStore(), _fileSize(0), _currentIndex(0), _log(0)
83 #ifdef _WIN32
84  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
85 #else
86  , _fd(0)
87 #endif
88  , _ringRecovering(true)
89  {
90  init(fileName_);
91  }
92 
93  RingBookmarkStore(const std::string& fileName_)
94  : MemoryBookmarkStore(), _fileSize(0), _currentIndex(0), _log(0)
95 #ifdef _WIN32
96  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
97 #else
98  , _fd(0)
99 #endif
100  , _ringRecovering(true)
101  {
102  init(fileName_.c_str());
103  }
104 
105  virtual ~RingBookmarkStore()
106  {
107 #ifdef _WIN32
108  UnmapViewOfFile(_log);
109  _log = 0;
110  CloseHandle(_mapFile);
111  _mapFile = INVALID_HANDLE_VALUE;
112  CloseHandle(_file);
113  _file = INVALID_HANDLE_VALUE;
114 #else
115  munmap(_log, _fileSize);
116  _log = 0;
117  close(_fd);
118  _fd = 0;
119 #endif
120  // In case _lock gets acquired by reader thread between end of this
121  // destructor and start of base class destructor, prevent write()
122  _ringRecovering = true;
123  }
124 
129  virtual size_t log(Message& message_)
130  {
131  Lock<Mutex> guard(_lock);
132  size_t ret = MemoryBookmarkStore::_log(message_);
133  if (BookmarkRange::isRange(message_.getBookmark()))
134  {
135  Message::Field subId = message_.getSubscriptionId();
136  if (subId.empty())
137  subId = message_.getSubscriptionIds();
138  write(subId, MemoryBookmarkStore::_getMostRecent(subId, false));
139  }
140  return ret;
141  }
142 
148  virtual void discard(const Message& message_)
149  {
150  Lock<Mutex> guard(_lock);
151  if (MemoryBookmarkStore::_discard(message_) && _recentChanged)
152  {
153  Message::Field subId = message_.getSubscriptionId();
154  if (subId.empty())
155  subId = message_.getSubscriptionIds();
156  write(subId, MemoryBookmarkStore::_getMostRecent(subId, false));
157  _recentChanged = false;
158  }
159  }
160 
168  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
169  {
170  Lock<Mutex> guard(_lock);
171  if (MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_)
172  && _recentChanged)
173  {
174  write(subId_, MemoryBookmarkStore::_getMostRecent(subId_, false));
175  _recentChanged = false;
176  }
177  }
178 
184  virtual void persisted(const Message::Field& subId_,
185  const Message::Field& bookmark_)
186  {
187  Lock<Mutex> guard(_lock);
188  MemoryBookmarkStore::_persisted(find(subId_), bookmark_);
189  if (_recentChanged) {
190  write(subId_, MemoryBookmarkStore::_getMostRecent(subId_, false));
191  _recentChanged = false;
192  }
193  }
194 
201  {
202  Lock<Mutex> guard(_lock);
203  return MemoryBookmarkStore::_getMostRecent(subId_);
204  }
205 
211  virtual void purge()
212  {
213  Lock<Mutex> guard(_lock);
214  _positionMap.clear();
215  memset(_log, 0, _fileSize);
216  MemoryBookmarkStore::_purge();
217  _currentIndex = 0;
218  }
219 
225  virtual void purge(const Message::Field& subId_)
226  {
227  Lock<Mutex> guard(_lock);
228  Lock<Mutex> fileGuard(_fileLock);
229  Lock<Mutex> posGuard(_posLock);
230  if(_positionMap.count(subId_) == 0)
231  {
232  return;
233  }
234  // Remove from memory
235  MemoryBookmarkStore::_purge(subId_);
236  // Remove from the file
237  SubscriptionPosition pos = _positionMap[subId_];
238  memset(_log + (pos._index * AMPS_RING_ENTRY_SIZE), 0,
239  AMPS_RING_ENTRY_SIZE);
240  // Move any following subs back an index
241  Message::Field sub;
242  for (size_t index = pos._index; index < _currentIndex - 1; ++index)
243  {
244  char* start = _log + (index * AMPS_RING_ENTRY_SIZE);
245  memcpy(start, start + AMPS_RING_ENTRY_SIZE, AMPS_RING_ENTRY_SIZE);
246  char* end = (char*)memchr(start, '\0', AMPS_RING_BYTES_SUBID);
247  if (!end) break;
248  sub.assign(start, (size_t)(end - start));
249  _positionMap[sub]._index = index;
250  }
251  _positionMap.erase(subId_);
252  // We have one less sub
253  --_currentIndex;
254  // Clear the end
255  memset(_log + (_currentIndex * AMPS_RING_ENTRY_SIZE), 0,
256  AMPS_RING_ENTRY_SIZE);
257  }
258 
259 private:
260  void init(const char* fileName_)
261  {
262 #ifdef _WIN32
263  _file = CreateFileA(fileName_, GENERIC_READ | GENERIC_WRITE, 0,
264  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
265  if( _file == INVALID_HANDLE_VALUE )
266  {
267  DWORD err = getErrorNo();
268  std::ostringstream os;
269  os << "Failed to create file " << fileName_ << " for RingBookmarkStore\n";
270  error(os.str(), err);
271  }
272  LARGE_INTEGER liFileSize;
273  if(GetFileSizeEx(_file, &liFileSize) == 0)
274  {
275  error("Failure getting file size for RingBookmarkStore.", getErrorNo());
276  }
277  DWORD fsLow = liFileSize.LowPart;
278  DWORD fsHigh = liFileSize.HighPart;
279 #ifdef _WIN64
280  size_t fileSize = liFileSize.QuadPart;
281 #else
282  size_t fileSize = liFileSize.LowPart;
283 #endif
284  size_t existingSize = AMPS_RING_ENTRIES*AMPS_RING_ENTRY_SIZE;
285  if(existingSize > fileSize)
286  {
287  fsLow = (DWORD)existingSize;
288 #ifdef _WIN64
289  fsHigh = (DWORD)(existingSize >> 32);
290 #endif
291  }
292  setFileSize(fsHigh, fsLow);
293 #else
294  _fd = open(fileName_, O_RDWR | O_CREAT, (mode_t)0644);
295  if(_fd == -1)
296  {
297  int err = getErrorNo();
298  std::ostringstream os;
299  os << "Failed to open log file " << fileName_ << " for RingBookmarkStore";
300  error(os.str(), err);
301  }
302  struct stat statBuf;
303  if(fstat(_fd, &statBuf) == -1)
304  {
305  int err = getErrorNo();
306  std::ostringstream os;
307  os << "Failed to stat log file " << fileName_ << " for RingBookmarkStore";
308  error(os.str(), err);
309  }
310  size_t fSize = (size_t)statBuf.st_size;
311  if (fSize == 0)
312  if (::write(_fd, "\0\0\0\0", 4) != 4)
313  error("Failed to initialize empty file.", getErrorNo());
314  setFileSize((fSize > AMPS_RING_ENTRIES*AMPS_RING_ENTRY_SIZE ?
315  fSize - 1 : AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE));
316 #endif
317  recover();
318  }
319 
320 #ifdef _WIN32
321  DWORD getErrorNo() const
322  {
323  return GetLastError();
324  }
325 
326  void error(const std::string& message_, DWORD err)
327  {
328  std::ostringstream os;
329  static const DWORD msgSize = 2048;
330  char pMsg[msgSize];
331  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
332  FORMAT_MESSAGE_ARGUMENT_ARRAY,
333  NULL, err, LANG_NEUTRAL,
334  pMsg, msgSize, NULL);
335  os << message_ << ". Error is " << pMsg;
336  throw StoreException(os.str());
337  }
338 #else
339  int getErrorNo() const
340  {
341  return errno;
342  }
343 
344  void error(const std::string& message_, int err)
345  {
346  std::ostringstream os;
347  os << message_ << ". Error is " << strerror(err);
348  throw StoreException(os.str());
349  }
350 #endif
351 
352  // Used to log the new _mostRecent bookmark
353  void write(const Message::Field& subId_,
354  const Message::Field& bookmark_)
355  {
356  Lock<Mutex> guard(_fileLock);
357  if( !_ringRecovering)
358  {
359  if (bookmark_.len() > AMPS_RING_BYTES_BOOKMARK)
360  {
361  throw StoreException("Bookmark is too large for fixed size storage. Consider rebuilding after changing AMPS_RING_BYTES_BOOKMARK in include/RingBookmarkStore.hpp");
362  }
363  SubscriptionPosition& pos = findPos(subId_);
364  size_t nextPos = (pos._current + 1) % AMPS_RING_POSITIONS;
365  // Get pointer to start of next position for cursor
366  char* offset = _log + (pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (nextPos * AMPS_RING_BYTES_BOOKMARK);
367  // Write the 'cursor' to start of following entry and advance offset
368  *offset = '*';
369  // Change offset to beginning of current bookmark
370  offset = _log + ((pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (pos._current * AMPS_RING_BYTES_BOOKMARK) + 1);
371  size_t len = bookmark_.len();
372  // Write the bookmark and advance offset
373  memcpy(offset, static_cast<const void*>(bookmark_.data()), len);
374  offset += len;
375  // Set extra bytes to NULL
376  memset(offset, 0, AMPS_RING_BYTES_BOOKMARK - (len + 2));
377  // Return to beginning and change the cursor
378  offset = offset - len - 1;
379  *offset = '+';
380  // Update current for the next write
381  pos._current = nextPos;
382 
383  // Sync the changes to disk
384 #ifdef _WIN32
385 #ifdef _WIN64
386  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~((getPageSize()-1)&0xFFFFFFFFFFFFFFFF);
387 #else
388  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & (size_t)~(getPageSize()-1);
389 #endif
390  if(!FlushViewOfFile(_log + syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE))
391 #else
392  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~(getPageSize()-1);
393  if(msync(_log+syncStart, (pos._index * AMPS_RING_ENTRY_SIZE) - syncStart + AMPS_RING_ENTRY_SIZE, MS_ASYNC) != 0)
394 #endif
395  {
396  error("Failed to sync mapped memory", getErrorNo());
397  }
398  }
399  }
400 
401 #ifdef _WIN32
402  void setFileSize(DWORD newSizeHigh_, DWORD newSizeLow_)
403  {
404  bool remap = (_mapFile && _mapFile != INVALID_HANDLE_VALUE);
405  if(remap)
406  {
407  UnmapViewOfFile(_log);
408  CloseHandle(_mapFile);
409  _positionMap.clear();
410  }
411  _mapFile = CreateFileMappingA( _file, NULL, PAGE_READWRITE, newSizeHigh_, newSizeLow_, NULL);
412  if(_mapFile == NULL || _mapFile == INVALID_HANDLE_VALUE)
413  {
414  error("Failed to create map of log file", getErrorNo());
415  _log = 0;
416  _fileSize = 0;
417  }
418 #ifdef _WIN64
419  size_t sz = ((size_t)newSizeHigh_ << 32) | (size_t)newSizeLow_;
420 #else
421  size_t sz = (size_t)newSizeLow_;
422 #endif
423  _log = (char*)MapViewOfFile(_mapFile, FILE_MAP_ALL_ACCESS, 0, 0, sz);
424  if(_log == NULL)
425  {
426  error("Failed to map log file to memory", getErrorNo());
427  _log = 0;
428  _fileSize = 0;
429  return;
430  }
431  _fileSize = sz;
432  // Call recover to reset the _positionMap
433  if(remap)
434  recover();
435  }
436 #else
437  void setFileSize(size_t newSize_)
438  {
439  bool remap = (_log != 0);
440  // Make sure we're using a multiple of page size
441  size_t sz = newSize_ & (size_t)(~(getPageSize()-1));
442  if(sz < newSize_)
443  {
444  sz += getPageSize();
445  }
446  // Improper resize attempt
447  if(newSize_ <= _fileSize)
448  return;
449  // Extend the underlying file
450  if(lseek(_fd, (off_t)sz, SEEK_SET) == -1)
451  {
452  error("Seek failed for RingBookmarkStore", getErrorNo());
453  }
454  if(::write(_fd, "", 1) == -1)
455  {
456  error("Failed to grow RingBookmarkStore", getErrorNo());
457  }
458  void* newLog = MAP_FAILED;
459  if(_log)
460  {
461  _positionMap.clear();
462 
463 #ifdef linux
464  newLog = (mremap(_log, _fileSize, sz, MREMAP_MAYMOVE));
465 #else
466  // try making a new mmap right after the current one.
467  newLog = mmap(_log + _fileSize, sz, PROT_READ|PROT_WRITE,
468  MAP_SHARED|MAP_FIXED, _fd, (off_t)sz);
469  if(newLog!=_log)
470  {
471  // this mmap is relatively small; better to just close the old mmap and reset.
472  munmap(_log,_fileSize);
473  newLog = mmap(0,sz,PROT_READ|PROT_WRITE,MAP_SHARED,_fd,0);
474  }
475 #endif
476  }
477  else // New mapping
478  {
479  // New mapping, map the full file size for recovery or else it std size
480  newLog = (mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0));
481  }
482  _fileSize = sz;
483 
484  if(newLog == MAP_FAILED)
485  {
486  _log = 0;
487  _fileSize = 0;
488  error("Failed to map log file to memory", getErrorNo());
489  }
490  _log = static_cast<char*>(newLog);
491  if(remap)
492  recover();
493  }
494 #endif
495 
496  void recover(void)
497  {
498  //Lock<Mutex> guard(_lock);
499  _ringRecovering = true;
500  Message::Field sub;
501  Message::Field bookmarkField;
502 
503  _currentIndex = 0;
504  size_t maxEntries = _fileSize/AMPS_RING_ENTRY_SIZE > AMPS_RING_ENTRIES ? _fileSize/AMPS_RING_ENTRY_SIZE : AMPS_RING_ENTRIES;
505  for(; _currentIndex < maxEntries; ++_currentIndex)
506  {
507  char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
508  if(*offset == '\0')
509  {
510  break;
511  }
512  //It's possible we wrote the subId and not NULLs, so be careful
513  char* end = (char*)memchr(offset, '\0', AMPS_RING_BYTES_SUBID);
514  if(!end)
515  {
516  // Failed subscription id read, we're done
517  break;
518  }
519  // Safe to continue
520  sub.assign(offset, (size_t)(end - offset));
521  // Put this sub into the MemoryBookmarkStore
522  Subscription* subPtr = MemoryBookmarkStore::find(sub);
523  // Put this sub into the _positionMap
524  // This is recovery, so do it directly and not with findPos
525  SubscriptionPosition& pos = _positionMap[sub];
526  pos._index = _currentIndex;
527  offset += AMPS_RING_BYTES_SUBID;
528  size_t foundCursor = AMPS_RING_POSITIONS;
529  for(pos._current = 0; pos._current < AMPS_RING_POSITIONS; pos._current++)
530  {
531  if(offset[pos._current*AMPS_RING_BYTES_BOOKMARK] == '*')
532  {
533  // Subtract one position
534  pos._current = (pos._current+(AMPS_RING_POSITIONS-1)) % AMPS_RING_POSITIONS;
535  // Subtract one more if a second bookmark is found
536  if(offset[foundCursor*AMPS_RING_BYTES_BOOKMARK] == '*')
537  {
538  pos._current = (pos._current+(AMPS_RING_POSITIONS-1)) % AMPS_RING_POSITIONS;
539  }
540  break;
541  }
542  }
543  if(pos._current >= AMPS_RING_POSITIONS)
544  {
545  // No valid bookmark found, just use 0
546  pos._current = 0;
547  }
548  else
549  {
550  // We found a cursor
551  offset += pos._current * AMPS_RING_BYTES_BOOKMARK;
552  //It's possible we wrote bookmark and not NULLs, so be careful
553  end = (char*)memchr(offset, '\0', AMPS_RING_BYTES_BOOKMARK);
554  if(end && end != offset)
555  {
556  // add 1 to account for leading '+'
557  bookmarkField.assign(offset + 1, (size_t)(end - offset - 1));
558  // log, discard to make it the most recent
559  if (!BookmarkRange::isRange(bookmarkField))
560  {
561  // This adds bookmark to _publishers
562  subPtr->isDiscarded(bookmarkField);
563  }
564  subPtr->discard(subPtr->log(bookmarkField));
565  }
566  }
567  }
568  _ringRecovering = false;
569  }
570 
571  SubscriptionPosition& findPos(const Message::Field& subId_)
572  {
573  Lock<Mutex> guard(_posLock);
574  if(_positionMap.count(subId_) == 0)
575  {
576  // New subid
577  // Move to its start position and write the sub id
578  char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
579  size_t len = subId_.len();
580  memcpy(offset, static_cast<const void*>(subId_.data()), len);
581  // Add it to the map with the current index
582  // Use the data written to the mmap for the subid
583  Message::Field subId;
584  subId.assign(offset, len);
585  _positionMap[subId]._index = _currentIndex;
586  _positionMap[subId]._current = 0;
587  // Fill extra spaces with NULL
588  offset += len;
589  memset(offset, 0, AMPS_RING_BYTES_SUBID - len);
590  // Advance current index
591  ++_currentIndex;
592  }
593  return _positionMap[subId_];
594  }
595 
596  Subscription* find(const Message::Field& subId_)
597  {
598  if(subId_.empty())
599  {
600  throw StoreException("A valid subscription ID must be provided to the RingBookmarkStore");
601  }
602  findPos(subId_);
603  return MemoryBookmarkStore::find(subId_);
604  }
605 
606 
607  Mutex _fileLock;
608  size_t _fileSize;
609  size_t _currentIndex;
610  char* _log;
611 #ifdef _WIN32
612  HANDLE _file;
613  HANDLE _mapFile;
614 #else
615  int _fd;
616 #endif
617  Mutex _posLock;
618  typedef std::map<Message::Field, SubscriptionPosition, Message::Field::FieldHash> PositionMap;
619  PositionMap _positionMap;
620  bool _ringRecovering;
#ifdef _WIN32
/// \return The system page size, queried with GetSystemInfo on the first
///         call and cached for later calls.
static DWORD getPageSize()
{
  static DWORD pageSize = 0;
  if (pageSize != 0)
  {
    return pageSize;
  }
  SYSTEM_INFO sysInfo;
  GetSystemInfo(&sysInfo);
  pageSize = sysInfo.dwPageSize;
  return pageSize;
}
#else
/// \return The system page size, queried with sysconf(_SC_PAGESIZE) on
///         the first call and cached for later calls.
static size_t getPageSize()
{
  static size_t pageSize = 0UL;
  if (pageSize != 0)
  {
    return pageSize;
  }
  pageSize = (size_t)sysconf(_SC_PAGESIZE);
  return pageSize;
}
#endif
641 
642 
643 };
644 
645 } // end namespace AMPS
646 
647 
648 #endif // _RINGBOOKMARKSTORE_H_
649 
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1107
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:447
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
virtual void purge()
Called to purge the contents of this store.
Definition: RingBookmarkStore.hpp:211
A BookmarkStoreImpl that stores only the MOST_RECENT bookmark to a file for recovery and keeps any bo...
Definition: RingBookmarkStore.hpp:68
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1098
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: RingBookmarkStore.hpp:184
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1132
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: RingBookmarkStore.hpp:148
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: RingBookmarkStore.hpp:200
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
RingBookmarkStore(const char *fileName_)
Create a RingBookmarkStore using fileName_ for storage of most recent.
Definition: RingBookmarkStore.hpp:81
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
virtual size_t log(Message &message_)
Return the corresponding sequence number for this bookmark.
Definition: RingBookmarkStore.hpp:129
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1122
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: RingBookmarkStore.hpp:168
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: RingBookmarkStore.hpp:225