AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.0.5
RingBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #include <ampsplusplus.hpp>
27 
28 #ifndef _RINGBOOKMARKSTORE_H_
29 #define _RINGBOOKMARKSTORE_H_
30 
31 #include <MemoryBookmarkStore.hpp>
32 #ifdef _WIN32
33 #include <windows.h>
34 #else
35 #include <sys/mman.h>
36 #include <unistd.h>
37 #endif
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 #include <fcntl.h>
41 
42 #if !defined(MREMAP_MAYMOVE)
43 #define MREMAP_MAYMOVE 0
44 #endif
45 
50 
51 namespace AMPS
52 {
64 {
// Locates one subscription's record inside the mapped ring file.
65  struct SubscriptionPosition
66  {
// Entry number of the subscription; callers multiply it by
// AMPS_RING_ENTRY_SIZE to obtain the byte offset of the record in _log.
67  size_t _index;
// Bookmark slot (0 .. AMPS_RING_POSITIONS-1) within the record that
// write() fills next and recover() restores.
68  size_t _current;
69  };
70 
71 public:
76  RingBookmarkStore(const char* fileName_)
77  : MemoryBookmarkStore(false), _fileSize(0), _currentIndex(0), _log(0)
78 #ifdef _WIN32
79  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
80 #else
81  , _fd(0)
82 #endif
83  , _recovering(true)
84  {
85  init(fileName_);
86  }
87 
88  RingBookmarkStore(const std::string& fileName_)
89  : MemoryBookmarkStore(false), _fileSize(0), _currentIndex(0), _log(0)
90 #ifdef _WIN32
91  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
92 #else
93  , _fd(0)
94 #endif
95  , _recovering(true)
96  {
97  init(fileName_.c_str());
98  }
99 
100  virtual ~RingBookmarkStore()
101  {
102 #ifdef _WIN32
103  UnmapViewOfFile(_log);
104  CloseHandle(_mapFile);
105  CloseHandle(_file);
106 #else
107  munmap(_log, _fileSize);
108  close(_fd);
109 #endif
110  // In case _lock gets acquired by reader thread between end of this
111  // destructor and start of base class destructor, prevent write()
112  _recovering = true;
113  }
114 
119  virtual size_t log(Message& message_)
120  {
121  Lock<Mutex> guard(_lock);
122  size_t ret = MemoryBookmarkStore::_log(message_);
123  return ret;
124  }
125 
131  virtual void discard(const Message& message_)
132  {
133  Lock<Mutex> guard(_lock);
134  MemoryBookmarkStore::_discard(message_);
135  Message::Field subId = message_.getSubscriptionId();
136  if (subId.empty())
137  subId = message_.getSubscriptionIds();
138  write(subId, MemoryBookmarkStore::_getMostRecent(subId));
139  }
140 
148  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
149  {
150  Lock<Mutex> guard(_lock);
151  MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
152  write(subId_, MemoryBookmarkStore::_getMostRecent(subId_));
153  }
154 
161  {
162  Lock<Mutex> guard(_lock);
163  return find(subId_)->getMostRecent();
164  }
165 
172  virtual bool isDiscarded(Message& message_)
173  {
174  Message::Field subId = message_.getSubscriptionId();
175  if (subId.empty()) subId = message_.getSubscriptionIds();
176  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(MemoryBookmarkStore::find(subId)));
177  return false;
178  }
179 
185  virtual void purge()
186  {
187  Lock<Mutex> guard(_lock);
188  _positionMap.clear();
189  memset(_log, 0, _fileSize);
190  MemoryBookmarkStore::_purge();
191  }
192 
// Removes the subscription subId_ from the in-memory store and from the
// ring file, then closes the gap by moving every later entry back by one
// entry. Does nothing when subId_ has no entry in _positionMap.
// Holds _lock, _fileLock and _posLock for the whole operation.
198  virtual void purge(const Message::Field& subId_)
199  {
200  Lock<Mutex> guard(_lock);
201  Lock<Mutex> fileGuard(_fileLock);
202  Lock<Mutex> posGuard(_posLock);
203  if(_positionMap.count(subId_) == 0)
204  {
205  return;
206  }
207  // Remove from memory
208  MemoryBookmarkStore::_purge(subId_);
209  // Remove from the file
// pos is copied by value, so it stays usable after the entry is erased.
210  SubscriptionPosition pos = _positionMap[subId_];
211  memset(_log + (pos._index * AMPS_RING_ENTRY_SIZE), 0,
212  AMPS_RING_ENTRY_SIZE);
213  // Move any following subs back an index
// NOTE(review): findPos() and recover() build the _positionMap keys from
// bytes inside the mapped file. If Message::Field::assign does not copy
// those bytes, each memcpy below changes what existing keys refer to, which
// could break the lookups that follow - confirm against Field's semantics.
214  Message::Field sub;
215  for (size_t index = pos._index; index < _currentIndex - 1; ++index)
216  {
217  char* start = _log + (index * AMPS_RING_ENTRY_SIZE);
218  memcpy(start, start + AMPS_RING_ENTRY_SIZE, AMPS_RING_ENTRY_SIZE);
// The subscription id is the NUL-terminated prefix of the entry; stop
// shifting if no terminator is found within AMPS_RING_BYTES_SUBID bytes.
219  char* end = (char*)memchr(start, '\0', AMPS_RING_BYTES_SUBID);
220  if (!end) break;
221  sub.assign(start, (size_t)(end - start));
222  _positionMap[sub]._index = index;
223  }
224  _positionMap.erase(subId_);
225  // We have one less sub
226  --_currentIndex;
227  // Clear the end
228  memset(_log + (_currentIndex * AMPS_RING_ENTRY_SIZE), 0,
229  AMPS_RING_ENTRY_SIZE);
230  }
231 
234  virtual void noPersistedAcks(const Message::Field&)
235  {
236  }
237 
238 private:
239  void init(const char* fileName_)
240  {
241 #ifdef _WIN32
242  _file = CreateFileA(fileName_, GENERIC_READ | GENERIC_WRITE, 0,
243  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
244  if( _file == INVALID_HANDLE_VALUE )
245  {
246  DWORD err = getErrorNo();
247  std::ostringstream os;
248  os << "Failed to create file " << fileName_ << " for RingBookmarkStore\n";
249  error(os.str(), err);
250  }
251  LARGE_INTEGER liFileSize;
252  if(GetFileSizeEx(_file, &liFileSize) == 0)
253  {
254  error("Failure getting file size for RingBookmarkStore.", getErrorNo());
255  }
256  DWORD fsLow = liFileSize.LowPart;
257  DWORD fsHigh = liFileSize.HighPart;
258 #ifdef _WIN64
259  size_t fileSize = liFileSize.QuadPart;
260 #else
261  size_t fileSize = liFileSize.LowPart;
262 #endif
263  size_t existingSize = AMPS_RING_ENTRIES*AMPS_RING_ENTRY_SIZE;
264  if(existingSize > fileSize)
265  {
266  fsLow = (DWORD)existingSize;
267 #ifdef _WIN64
268  fsHigh = (DWORD)(existingSize >> 32);
269 #endif
270  }
271  setFileSize(fsHigh, fsLow);
272 #else
273  _fd = open(fileName_, O_RDWR | O_CREAT, (mode_t)0644);
274  if(_fd == -1)
275  {
276  int err = getErrorNo();
277  std::ostringstream os;
278  os << "Failed to open log file " << fileName_ << " for RingBookmarkStore";
279  error(os.str(), err);
280  }
281  struct stat statBuf;
282  if(fstat(_fd, &statBuf) == -1)
283  {
284  int err = getErrorNo();
285  std::ostringstream os;
286  os << "Failed to stat log file " << fileName_ << " for RingBookmarkStore";
287  error(os.str(), err);
288  }
289  size_t fSize = (size_t)statBuf.st_size;
290  if (fSize == 0)
291  if (::write(_fd, "\0\0\0\0", 4) != 4)
292  error("Failed to initialize empty file.", getErrorNo());
293  setFileSize((fSize > AMPS_RING_ENTRIES*AMPS_RING_ENTRY_SIZE ?
294  fSize - 1 : AMPS_RING_ENTRIES * AMPS_RING_ENTRY_SIZE));
295 #endif
296  recover();
297  }
298 
299 #ifdef _WIN32
300  DWORD getErrorNo() const
301  {
302  return GetLastError();
303  }
304 
305  void error(const std::string& message_, DWORD err)
306  {
307  std::ostringstream os;
308  static const DWORD msgSize = 2048;
309  char pMsg[msgSize];
310  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
311  FORMAT_MESSAGE_ARGUMENT_ARRAY,
312  NULL, err, LANG_NEUTRAL,
313  pMsg, msgSize, NULL);
314  os << message_ << ". Error is " << pMsg;
315  throw StoreException(os.str());
316  }
317 #else
318  int getErrorNo() const
319  {
320  return errno;
321  }
322 
323  void error(const std::string& message_, int err)
324  {
325  std::ostringstream os;
326  os << message_ << ". Error is " << strerror(err);
327  throw StoreException(os.str());
328  }
329 #endif
330 
331  // Used to log the new _mostRecent bookmark
332  void write(const Message::Field& subId_,
333  const Message::Field& bookmark_)
334  {
335  Lock<Mutex> guard(_fileLock);
336  if(!_recovering)
337  {
338  SubscriptionPosition& pos = findPos(subId_);
339  size_t nextPos = (pos._current + 1) % AMPS_RING_POSITIONS;
340  // Get pointer to start of next position for cursor
341  char* offset = _log + (pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (nextPos * AMPS_RING_BYTES_BOOKMARK);
342  // Write the 'cursor' to start of following entry and advance offset
343  *offset = '*';
344  // Change offset to beginning of current bookmark
345  offset = _log + ((pos._index * AMPS_RING_ENTRY_SIZE) + AMPS_RING_BYTES_SUBID + (pos._current * AMPS_RING_BYTES_BOOKMARK) + 1);
346  size_t len = bookmark_.len();
347  // Write the bookmark and advance offset
348  memcpy(offset, static_cast<const void*>(bookmark_.data()), len);
349  offset += len;
350  // Set extra bytes to NULL
351  memset(offset, 0, AMPS_RING_BYTES_BOOKMARK - (len + 2));
352  // Return to beginning and change the cursor
353  offset = offset - len - 1;
354  *offset = '+';
355  // Update current for the next write
356  pos._current = nextPos;
357 
358  // Sync the changes to disk
359 #ifdef _WIN32
360 #ifdef _WIN64
361  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~((getPageSize()-1)&0xFFFFFFFFFFFFFFFF);
362 #else
363  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & (size_t)~(getPageSize()-1);
364 #endif
365  if(!FlushViewOfFile(_log + syncStart, pos._index - syncStart + AMPS_RING_ENTRY_SIZE))
366 #else
367  size_t syncStart = (pos._index * AMPS_RING_ENTRY_SIZE) & ~(getPageSize()-1);
368  if(msync(_log+syncStart, pos._index - syncStart + AMPS_RING_ENTRY_SIZE, MS_ASYNC) != 0)
369 #endif
370  {
371  error("Failed to sync mapped memory", getErrorNo());
372  }
373  }
374  }
375 
376 #ifdef _WIN32
377  void setFileSize(DWORD newSizeHigh_, DWORD newSizeLow_)
378  {
379  bool remap = (_mapFile && _mapFile != INVALID_HANDLE_VALUE);
380  if(remap)
381  {
382  UnmapViewOfFile(_log);
383  CloseHandle(_mapFile);
384  _positionMap.clear();
385  }
386  _mapFile = CreateFileMappingA( _file, NULL, PAGE_READWRITE, newSizeHigh_, newSizeLow_, NULL);
387  if(_mapFile == NULL || _mapFile == INVALID_HANDLE_VALUE)
388  {
389  error("Failed to create map of log file", getErrorNo());
390  _log = 0;
391  _fileSize = 0;
392  }
393 #ifdef _WIN64
394  size_t sz = ((size_t)newSizeHigh_ << 32) | (size_t)newSizeLow_;
395 #else
396  size_t sz = (size_t)newSizeLow_;
397 #endif
398  _log = (char*)MapViewOfFile(_mapFile, FILE_MAP_ALL_ACCESS, 0, 0, sz);
399  if(_log == NULL)
400  {
401  error("Failed to map log file to memory", getErrorNo());
402  _log = 0;
403  _fileSize = 0;
404  return;
405  }
406  _fileSize = sz;
407  // Call recover to reset the _positionMap
408  if(remap)
409  recover();
410  }
411 #else
412  void setFileSize(size_t newSize_)
413  {
414  // Make sure we're using a multiple of page size
415  size_t sz = newSize_ & (size_t)(~(getPageSize()-1));
416  if(sz < newSize_)
417  {
418  sz += getPageSize();
419  }
420  // Improper resize attempt
421  if(newSize_ <= _fileSize)
422  return;
423  // Extend the underlying file
424  if(lseek(_fd, (off_t)sz, SEEK_SET) == -1)
425  {
426  error("Seek failed for RingBookmarkStore", getErrorNo());
427  }
428  if(::write(_fd, "", 1) == -1)
429  {
430  error("Failed to grow RingBookmarkStore", getErrorNo());
431  }
432  void* newLog = MAP_FAILED;
433  if(_log)
434  {
435  _positionMap.clear();
436 
437 #ifdef linux
438  newLog = (mremap(_log, _fileSize, sz, MREMAP_MAYMOVE));
439 #else
440  // try making a new mmap right after the current one.
441  newLog = mmap(_log + _fileSize, sz, PROT_READ|PROT_WRITE,
442  MAP_SHARED|MAP_FIXED, _fd, (off_t)sz);
443  if(newLog!=_log)
444  {
445  // this mmap is relatively small; better to just close the old mmap and reset.
446  munmap(_log,_fileSize);
447  newLog = mmap(0,sz,PROT_READ|PROT_WRITE,MAP_SHARED,_fd,0);
448  }
449 #endif
450  }
451  else // New mapping
452  {
453  // New mapping, map the full file size for recovery or else it std size
454  newLog = (mmap(0, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _fd, 0));
455  }
456  _fileSize = sz;
457 
458  if(newLog == MAP_FAILED)
459  {
460  error("Failed to map log file to memory", getErrorNo());
461  _log = 0;
462  _fileSize = 0;
463  return;
464  }
465  _log = static_cast<char*>(newLog);
466  recover();
467  }
468 #endif
469 
470  void recover(void)
471  {
472  //Lock<Mutex> guard(_lock);
473  _recovering = true;
474  Message::Field sub;
475  Message::Field bookmarkField;
476 
477  _currentIndex = 0;
478  size_t maxEntries = _fileSize/AMPS_RING_ENTRY_SIZE > AMPS_RING_ENTRIES ? _fileSize/AMPS_RING_ENTRY_SIZE : AMPS_RING_ENTRIES;
479  for(; _currentIndex < maxEntries; ++_currentIndex)
480  {
481  char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
482  if(*offset == '\0')
483  {
484  break;
485  }
486  //It's possible we wrote the subId and not NULLs, so be careful
487  char* end = (char*)memchr(offset, '\0', AMPS_RING_BYTES_SUBID);
488  if(!end)
489  {
490  // Failed subscription id read, we're done
491  break;
492  }
493  // Safe to continue
494  sub.assign(offset, (size_t)(end - offset));
495  // Put this sub into the MemoryBookmarkStore
496  MemoryBookmarkStore::find(sub);
497  // Put this sub into the _positionMap
498  // This is recovery, so do it directly and not with findPos
499  SubscriptionPosition& pos = _positionMap[sub];
500  pos._index = _currentIndex;
501  offset += AMPS_RING_BYTES_SUBID;
502  size_t foundCursor = AMPS_RING_POSITIONS;
503  for(pos._current = 0; pos._current < AMPS_RING_POSITIONS; pos._current++)
504  {
505  if(offset[pos._current*AMPS_RING_BYTES_BOOKMARK] == '*')
506  {
507  // Subtract one position
508  pos._current = (pos._current+(AMPS_RING_POSITIONS-1)) % AMPS_RING_POSITIONS;
509  // Subtract one more if a second bookmark is found
510  if(offset[foundCursor*AMPS_RING_BYTES_BOOKMARK] == '*')
511  {
512  pos._current = (pos._current+(AMPS_RING_POSITIONS-1)) % AMPS_RING_POSITIONS;
513  }
514  break;
515  }
516  }
517  if(pos._current >= AMPS_RING_POSITIONS)
518  {
519  // No valid bookmark found, just use 0
520  pos._current = 0;
521  }
522  else
523  {
524  // We found a cursor
525  offset += pos._current * AMPS_RING_BYTES_BOOKMARK;
526  //It's possible we wrote bookmark and not NULLs, so be careful
527  end = (char*)memchr(offset, '\0', AMPS_RING_BYTES_BOOKMARK);
528  if(end && end != offset)
529  {
530  // add 1 to account for leading '+'
531  bookmarkField.assign(offset + 1, (size_t)(end - offset - 1));
532  MemoryBookmarkStore::setMostRecent(sub, bookmarkField);
533  }
534  }
535  }
536  _recovering = false;
537  }
538 
539  SubscriptionPosition& findPos(const Message::Field& subId_)
540  {
541  Lock<Mutex> guard(_posLock);
542  if(_positionMap.count(subId_) == 0)
543  {
544  // New subid
545  // Move to its start position and write the sub id
546  char* offset = _log + (_currentIndex * AMPS_RING_ENTRY_SIZE);
547  size_t len = subId_.len();
548  memcpy(offset, static_cast<const void*>(subId_.data()), len);
549  // Add it to the map with the current index
550  // Use the data written to the mmap for the subid
551  Message::Field subId;
552  subId.assign(offset, len);
553  _positionMap[subId]._index = _currentIndex;
554  _positionMap[subId]._current = 0;
555  // Fill extra spaces with NULL
556  offset += len;
557  memset(offset, 0, AMPS_RING_BYTES_SUBID - len);
558  // Advance current index
559  ++_currentIndex;
560  }
561  return _positionMap[subId_];
562  }
563 
564  Subscription* find(const Message::Field& subId_)
565  {
566  if(subId_.empty())
567  {
568  throw StoreException("A valid subscription ID must be provided to the RingBookmarkStore");
569  }
570  findPos(subId_);
571  return MemoryBookmarkStore::find(subId_);
572  }
573 
574 
// Taken by write() and purge(subId_) while the mapped file is modified.
575  Mutex _fileLock;
// Size in bytes of the current mapping; 0 until the file has been mapped.
576  size_t _fileSize;
// Entry number at which the next new subscription is written.
577  size_t _currentIndex;
// Start of the memory-mapped file; 0 while nothing is mapped.
578  char* _log;
579 #ifdef _WIN32
// Handle of the backing file and of its file-mapping object.
580  HANDLE _file;
581  HANDLE _mapFile;
582 #else
// Descriptor of the backing file.
583  int _fd;
584 #endif
// Taken by findPos() and purge(subId_) while _positionMap is used.
585  Mutex _posLock;
586  typedef std::map<Message::Field, SubscriptionPosition, Message::Field::FieldHash> PositionMap;
// Maps a subscription id to the location of its entry in the file.
587  PositionMap _positionMap;
// While true, write() returns without touching the file.
588  bool _recovering;
#ifdef _WIN32
// Returns the system page size, queried once and cached.
static DWORD getPageSize()
{
  static DWORD pageSize = 0;
  if (pageSize == 0)
  {
    SYSTEM_INFO sysInfo;
    GetSystemInfo(&sysInfo);
    pageSize = sysInfo.dwPageSize;
  }
  return pageSize;
}
#else
// Returns the system page size, queried once and cached.
// sysconf() reports failure as -1; converting that straight to size_t
// would yield an enormous "page size" and break every alignment mask built
// from it, so 4096 is used as a fallback in that case.
static size_t getPageSize()
{
  static size_t pageSize = 0UL;
  if (pageSize == 0)
  {
    const long reported = sysconf(_SC_PAGESIZE);
    pageSize = (reported > 0) ? (size_t)reported : (size_t)4096;
  }
  return pageSize;
}
#endif
609 
610 
611 };
612 
613 } // end namespace AMPS
614 
615 
616 #endif // _RINGBOOKMARKSTORE_H_
617 
virtual bool isDiscarded(Message &message_)
Since we don't persist discarded messages after the most recently logged bookmark, we always assume any message tested for should be delivered and is not discarded.
Definition: RingBookmarkStore.hpp:172
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
virtual void purge()
Called to purge the contents of this store.
Definition: RingBookmarkStore.hpp:185
A BookmarkStoreImpl that stores only the MOST_RECENT bookmark to a file for recovery and keeps any bo...
Definition: RingBookmarkStore.hpp:63
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1074
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:93
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: RingBookmarkStore.hpp:131
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: RingBookmarkStore.hpp:160
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:50
RingBookmarkStore(const char *fileName_)
Create a RingBookmarkStore using fileName_ for storage of most recent.
Definition: RingBookmarkStore.hpp:76
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
virtual size_t log(Message &message_)
Return the corresponding sequence number for this bookmark.
Definition: RingBookmarkStore.hpp:119
virtual void noPersistedAcks(const Message::Field &)
RingBookmarkStore never uses persisted acks so this is a no-op.
Definition: RingBookmarkStore.hpp:234
Definition: ampsplusplus.hpp:136
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: RingBookmarkStore.hpp:148
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: RingBookmarkStore.hpp:198