AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.0.5
MMapBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #include <ampsplusplus.hpp>
27 
28 #ifndef _MMAPBOOKMARKSTORE_H_
29 #define _MMAPBOOKMARKSTORE_H_
30 
31 #include <MemoryBookmarkStore.hpp>
32 #ifdef _WIN32
33 #include <windows.h>
34 #else
35 #include <sys/mman.h>
36 #include <unistd.h>
37 #endif
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 #include <fcntl.h>
41 #include <set>
42 
43 
48 
49 namespace AMPS
50 {
56 {
57 private:
58 #ifdef _WIN32
59  typedef HANDLE FileType;
60  HANDLE _mapFile;
61 #else
62  typedef int FileType;
63 #endif
64  static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
65  {
66  Message::Field f(pair.first);
67  f.clear();
68  }
69 
70 public:
77  MMapBookmarkStore(const char* fileName_)
78  : MemoryBookmarkStore(true), _fileName(fileName_), _fileSize(0)
79  , _logOffset(0) , _log(0)
80 #ifdef _WIN32
81  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
82 #else
83  , _file(0)
84 #endif
85  {
86  init();
87  recover();
88  }
89 
96  MMapBookmarkStore(const std::string& fileName_)
97  : MemoryBookmarkStore(true), _fileName(fileName_), _fileSize(0)
98  , _logOffset(0) , _log(0)
99 #ifdef _WIN32
100  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
101 #else
102  , _file(0)
103 #endif
104  {
105  init();
106  recover();
107  }
108 
// Unmaps the log and closes the backing file (and, on Windows, the
// file-mapping handle as well).
109  virtual ~MMapBookmarkStore()
110  {
111 #ifdef _WIN32
112  UnmapViewOfFile(_log);
113  CloseHandle(_mapFile);
114  CloseHandle(_file);
115 #else
116  munmap(_log, _fileSize);
117  ::close(_file);
118 #endif
119  // In case _lock gets acquired by reader thread between end of this
120  // destructor and start of base class destructor, prevent write()
// The write() overloads do nothing while _recovering is true, so no
// late caller can touch the mapping that was just released.
121  _recovering = true;
122  }
123 
129  virtual size_t log(Message& message_)
130  {
131  Message::Field bookmark = message_.getBookmark();
132  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
133  Lock<Mutex> guard(_lock);
134  if (!sub)
135  {
136  Message::Field subId = message_.getSubscriptionId();
137  if (subId.empty())
138  subId = message_.getSubscriptionIds();
139  sub = find(subId);
140  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
141  }
142  write(sub->id(), ENTRY_BOOKMARK, bookmark);
143  return MemoryBookmarkStore::_log(message_);
144  }
145 
150  virtual void discard(const Message& message_)
151  {
152  Message::Field bookmark = message_.getBookmark();
153  Message::Field subId = message_.getSubscriptionId();
154  if (subId.empty())
155  subId = message_.getSubscriptionIds();
156  Lock<Mutex> guard(_lock);
157  MemoryBookmarkStore::_discard(message_);
158  write(subId, ENTRY_DISCARD, bookmark);
159  }
160 
168  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
169  {
170  Lock<Mutex> guard(_lock);
171  Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
172  if (!entry || entry->_val.empty()) return;
173  write(subId_, ENTRY_DISCARD, entry->_val);
174  MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
175  }
176 
183  {
184  Lock<Mutex> guard(_lock);
185  return MemoryBookmarkStore::getMostRecent(subId_);
186  }
187 
196  virtual bool isDiscarded(Message& message_)
197  {
198  Lock<Mutex> l(_lock);
199  bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
200  if (retVal)
201  {
202  Message::Field subId = message_.getSubscriptionId();
203  if (subId.empty())
204  subId = message_.getSubscriptionIds();
205  write(subId, ENTRY_BOOKMARK, message_.getBookmark());
206  write(subId, ENTRY_DISCARD, message_.getBookmark());
207  }
208  return retVal;
209  }
210 
216  virtual void purge()
217  {
218  Lock<Mutex> guard(_lock);
219  Lock<Mutex> fileGuard(_fileLock);
220  memset(_log, 0, _logOffset);
221  _logOffset = 0;
222  MemoryBookmarkStore::_purge();
223  }
224 
230  virtual void purge(const Message::Field& subId_)
231  {
232  Lock<Mutex> guard(_lock);
233  Lock<Mutex> fileGuard(_fileLock);
234  MemoryBookmarkStore::_purge(subId_);
235  std::string tmpFileName = _fileName + ".tmp";
236  __prune(tmpFileName);
237  }
238 
239  virtual void noPersistedAcks(const Message::Field& subId_)
240  {
242  }
243 
244  void setServerVersion(const VersionInfo& version_)
245  {
246  Lock<Mutex> guard(_lock);
248  }
249 
250  void setServerVersion(size_t version_)
251  {
252  Lock<Mutex> guard(_lock);
254  }
255 
256  // Yes, the argument is a non-const copy of what is passed in
257  void _prune(std::string tmpFileName_)
258  {
259  if (tmpFileName_.empty()) tmpFileName_ = _fileName + ".tmp";
260  Lock<Mutex> guard(_lock);
261  Lock<Mutex> fileGuard(_fileLock);
262  // If nothing's changed with most recent, don't rewrite the file
263  if (!_recentChanged)
264  {
265  return;
266  }
267  __prune(tmpFileName_);
268  _recentChanged = false;
269  }
270 
271 private:
// Builds a compacted copy of the bookmark log in tmpFileName_ and then
// replaces the current file with it.
// For every subscription in _subs the temp log receives, in order:
// bookmark+discard pairs for the publishers reported as discarded, a
// bookmark+discard pair for the most recent bookmark, an ENTRY_PERSISTED
// record for the last persisted bookmark, and the entries returned by
// getActiveEntries (with a discard record for those that are not active).
// Afterwards the current mapping and file are released, the temp file is
// moved over _fileName, init() maps it again and _logOffset is set to the
// number of bytes written.
// Both callers visible in this class (purge(subId_) and _prune) hold _lock
// and _fileLock while calling this.
// Throws StoreException on failure.
272  void __prune(std::string tmpFileName_)
273  {
274  size_t sz = AMPS_INITIAL_LOG_SIZE;
275  FileType file;
276  char* log = NULL;
277  size_t bytesWritten = 0;
278 #ifdef _WIN32
279  file = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
280  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
281  if( file == INVALID_HANDLE_VALUE )
282  {
283  DWORD err = getErrorNo();
284  std::ostringstream os;
285  os << "Failed to create file " << tmpFileName_ << " for pruning MMapBookmarkStore";
286  error(os.str(), err);
287  }
288  HANDLE mapFile = NULL;
289  try
290  {
291  sz = _setFileSize(sz, &log, file, &mapFile);
292  }
293  catch (StoreException& ex)
294  {
// NOTE(review): _setFileSize already closes the handles on these failure
// paths before throwing, so the CloseHandle calls below look like second
// closes. The `return` statements after `throw` are unreachable.
295  if(mapFile == NULL || mapFile == INVALID_HANDLE_VALUE)
296  {
297  CloseHandle(file);
298  std::ostringstream os;
299  os << "Failed to create map of log file for prune"
300  << ex.what();
301  throw StoreException(os.str());
302  return;
303  }
304  if(log == NULL)
305  {
306  CloseHandle(mapFile);
307  CloseHandle(file);
308  std::ostringstream os;
309  os << "Failed to map log file to memory for prune"
310  << ex.what();
311  throw StoreException(os.str());
312  return;
313  }
314  }
315  if (sz == 0)
316  {
317  DWORD err = getErrorNo();
318  UnmapViewOfFile(log);
319  CloseHandle(mapFile);
320  CloseHandle(file);
321  error("Failed to prepare tmp file for prune.", err);
322  }
323 #else
324  file = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
325  if(file == -1)
326  {
327  int err = getErrorNo();
328  std::ostringstream os;
329  os << "Failed to open log file " << tmpFileName_ << " for MMapBookmarkStore";
330  error(os.str(), err);
331  return;
332  }
// Same 4-byte zero header that init() writes to a newly created file.
333  if(::write(file, "\0\0\0\0", 4) == -1)
334  {
335  int err = getErrorNo();
336  std::ostringstream os;
337  os << "Failed to write header to log file " << tmpFileName_
338  << " in prune for MMapBookmarkStore";
339  error(os.str(), err);
340  return;
341  }
342  try
343  {
// log is NULL here, so this creates a fresh mapping; the last argument
// (previous mapping size) is not used on that path.
344  sz = _setFileSize(sz, &log, file, 0);
345  }
346  catch (StoreException& ex)
347  {
348  std::ostringstream os;
349  os << "Failed to prepare tmp file \"" << tmpFileName_
350  << "\" for prune. " << ex.what();
351  throw StoreException(os.str());
352  }
353  if (sz == 0)
354  {
355  int err = getErrorNo();
356  log = NULL;
357  ::close(file);
358  error("Failed to prepare tmp file for prune.", err);
359  }
360 #endif
// Write the compacted state of every subscription into the temp mapping.
// NOTE(review): the five-argument write() used below bounds-checks against
// _fileSize (the live file) rather than sz, so the explicit size checks in
// this loop are what keep the temp mapping large enough.
361  try
362  {
363  for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
364  {
365  Message::Field subId = i->first;
366  assert(!subId.empty());
367  size_t subIdLen = subId.len();
368  Message::Field recent = i->second->getMostRecent(false);
369  amps_uint64_t recentPub, recentSeq;
370  Subscription::parseBookmark(recent, recentPub, recentSeq);
371  Subscription::PublisherMap publishersDiscarded =
372  i->second->_publishers;
373  MemoryBookmarkStore::EntryPtrList recovered;
374  i->second->getRecoveryEntries(recovered);
375  i->second->setPublishersToDiscarded(&recovered,
376  &publishersDiscarded);
377  char tmpBookmarkBuffer[128];
378  for (Subscription::PublisherIterator pub =
379  publishersDiscarded.begin(),
380  e = publishersDiscarded.end();
381  pub != e; ++pub)
382  {
383  // Don't log EPOCH if it got in the map
384  if (pub->first == 0 || pub->second == 0) continue;
385  // Don't log the most recent yet
386  if (pub->first == recentPub) continue;
// Format the pair as the text "<first>|<second>|".
387  int written = AMPS_snprintf_amps_uint64_t(
388  tmpBookmarkBuffer,
389  sizeof(tmpBookmarkBuffer),
390  pub->first);
391  *(tmpBookmarkBuffer+written++) = '|';
392  written += AMPS_snprintf_amps_uint64_t(
393  tmpBookmarkBuffer+written,
394  sizeof(tmpBookmarkBuffer)
395  - (size_t)written,
396  pub->second);
397  *(tmpBookmarkBuffer+written++) = '|';
398  Message::Field tmpBookmark(tmpBookmarkBuffer, (size_t)written);
399  // Check we'll be in the current boundaries
// Two records (bookmark + discard) are written, hence blockLen twice.
400  size_t blockLen = subIdLen + 2*sizeof(size_t) + tmpBookmark.len() + 1;
401  if(bytesWritten + blockLen + blockLen >= sz)
402  {
403 #ifdef _WIN32
404  sz = _setFileSize(sz*2, &log, file, &mapFile);
405 #else
406  sz = _setFileSize(sz*2, &log, file, sz);
407 #endif
408  }
409  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, tmpBookmark);
410  write(&log, &bytesWritten, subId, ENTRY_DISCARD, tmpBookmark);
411  }
412  if (isWritableBookmark(recent.len()))
413  {
414  // Check we'll be in the current boundaries
415  size_t blockLen = subIdLen + 2*sizeof(size_t) + recent.len() + 1;
416  if(bytesWritten + blockLen + blockLen >= sz)
417  {
418 #ifdef _WIN32
419  sz = _setFileSize(sz*2, &log, file, &mapFile);
420 #else
421  sz = _setFileSize(sz*2, &log, file, sz);
422 #endif
423  }
424  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, recent);
425  write(&log, &bytesWritten, subId, ENTRY_DISCARD, recent);
426  }
427  else // set up _recentList
428  {
429  i->second->getMostRecentList();
430  }
431  Message::Field bookmark = i->second->getLastPersisted();
432  if (isWritableBookmark(bookmark.len()))
433  {
434  // Check we'll be in the current boundaries
435  size_t blockLen = subIdLen + 2*sizeof(size_t) +
436  bookmark.len() + 1;
437  if(bytesWritten + blockLen >= sz)
438  {
439 #ifdef _WIN32
440  sz = _setFileSize(sz*2, &log, file, &mapFile);
441 #else
442  sz = _setFileSize(sz*2, &log, file, sz);
443 #endif
444  }
445  write(&log, &bytesWritten, subId, ENTRY_PERSISTED,
446  i->second->getLastPersisted());
447  }
448  i->second->getActiveEntries(recovered);
449  for (MemoryBookmarkStore::EntryPtrList::iterator entry =
450  recovered.begin();
451  entry != recovered.end(); ++entry)
452  {
453  if ((*entry)->_val.empty() ||
454  !isWritableBookmark((*entry)->_val.len()))
455  continue;
456  // Check we'll be in the current boundaries
457  size_t blockLen = subIdLen + 2*sizeof(size_t) +
458  (*entry)->_val.len() + 1;
459  if(bytesWritten + blockLen >= sz)
460  {
461 #ifdef _WIN32
462  sz = _setFileSize(sz*2, &log, file, &mapFile);
463 #else
464  sz = _setFileSize(sz*2, &log, file, sz);
465 #endif
466  }
467  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK,
468  (*entry)->_val);
469  if (!(*entry)->_active)
470  {
471  // Check we'll be in the current boundaries
472  if(bytesWritten + blockLen >= sz)
473  {
474 #ifdef _WIN32
475  sz = _setFileSize(sz*2, &log, file, &mapFile);
476 #else
477  sz = _setFileSize(sz*2, &log, file, sz);
478 #endif
479  }
480  write(&log, &bytesWritten, subId, ENTRY_DISCARD,
481  (*entry)->_val);
482  }
483  }
484  }
485  }
486  catch (StoreException& ex)
487  {
488 #ifdef _WIN32
489  UnmapViewOfFile(log);
490  CloseHandle(mapFile);
491  CloseHandle(file);
492 #else
// NOTE(review): the temp mapping in `log` is not munmap'ed on this path.
493  ::close(file);
494  ::unlink(tmpFileName_.c_str());
495 #endif
496  std::ostringstream os;
497  os << "Exception during prune: " << ex.what();
498  throw StoreException(os.str());
499  }
500 #ifdef _WIN32
// Release the current file, then the finished temp file, so it can be moved.
// NOTE(review): the results are combined with |=, so `success` is false
// only when every call failed; a single failing call goes unreported.
501  BOOL success = FlushViewOfFile(_log,0);
502  success |= UnmapViewOfFile(_log);
503  _log = NULL;
504  success |= CloseHandle(_mapFile);
505  success |= CloseHandle(_file);
506  if (!success)
507  {
508  DWORD err = getErrorNo();
509  std::ostringstream os;
510  os << "Failed to flush, unmap, and close current file "
511  << _fileName
512  << " in prune in MMapBookmarkStore. ";
513  error(os.str(), err);
514  }
515  _mapFile = INVALID_HANDLE_VALUE;
516  _file = INVALID_HANDLE_VALUE;
517  success = FlushViewOfFile(log,0);
518  success |= UnmapViewOfFile(log);
519  log = NULL;
520  success |= CloseHandle(mapFile);
521  success |= CloseHandle(file);
522  if (!success)
523  {
524  DWORD err = getErrorNo();
525  std::ostringstream os;
526  os << "Failed to flush, unmap and close completed temp file "
527  << tmpFileName_
528  << " in prune in MMapBookmarkStore. ";
529  error(os.str(), err);
530  }
531  mapFile = INVALID_HANDLE_VALUE;
532  file = INVALID_HANDLE_VALUE;
533  // Replace current file with pruned file
// The move is attempted up to three times before giving up.
534  int retryCount = 3;
535  while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
536  MOVEFILE_COPY_ALLOWED|MOVEFILE_REPLACE_EXISTING|MOVEFILE_WRITE_THROUGH))
537  {
538  DWORD err = getErrorNo();
539  if (--retryCount > 0) continue;
540  // Try to set _file to the tmp file that won't move then throw
541  std::string desiredFileName = _fileName;
542  _fileName = tmpFileName_;
543  init();
544  std::ostringstream os;
545  os << "Failed to move completed temp file " << tmpFileName_
546  << " to " << desiredFileName
547  << " in prune in MMapBookmarkStore. Continuing by using "
548  << tmpFileName_ << " as the MMapBookmarkStore file.";
549  error(os.str(), err);
550  }
551  // Call init to set up file again
552  init();
553 #else
// Release the current file and the finished temp file, then swap them.
554  munmap(_log, _fileSize);
555  _log = NULL;
556  ::close(_file);
557  munmap(log, sz);
558  ::close(file);
// NOTE(review): the old file is unlinked before the rename, so for a moment
// no file exists under _fileName.
559  if (-1 == ::unlink(_fileName.c_str()))
560  {
561  int err = getErrorNo();
562  // Try to set _file to the tmp file that won't move then throw
563  std::string desiredFileName = _fileName;
564  _fileName = tmpFileName_;
565  init();
566  std::ostringstream os;
567  os << "Failed to delete file " << desiredFileName
568  << " after creating temporary file " << tmpFileName_
569  << " in prune in MMapBookmarkStore. Continuing by using "
570  << tmpFileName_ << " as the MMapBookmarkStore file.";
571  error(os.str(), err);
572  }
573  if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
574  {
575  int err = getErrorNo();
576  // Try to set _file to the tmp file that won't move then throw
577  std::string desiredFileName = _fileName;
578  _fileName = tmpFileName_;
579  init();
580  std::ostringstream os;
581  os << "Failed to move completed temp file " << tmpFileName_
582  << " to " << desiredFileName
583  << " in prune in MMapBookmarkStore. Continuing by using "
584  << tmpFileName_ << " as the MMapBookmarkStore file.";
585  error(os.str(), err);
586  }
587  // Call init to set up file again
588  init();
589 #endif
// Subsequent writes append after the records that were just written.
590  _logOffset = bytesWritten;
591  }
592 
593  virtual void _persisted(Subscription* subP_,
594  const Message::Field& bookmarkField_)
595  {
596  Lock<Mutex> l(_lock);
597  write(subP_->id(), ENTRY_PERSISTED, bookmarkField_);
598  MemoryBookmarkStore::_persisted(subP_, bookmarkField_);
599  }
600 
601  virtual Message::Field _persisted(Subscription* subP_, size_t bookmark_)
602  {
603  Lock<Mutex> l(_lock);
604  Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
605  if (!entryPtr || entryPtr->_val.empty())
606  return Message::Field();
607  Message::Field bookmarkField = entryPtr->_val;
608  write(subP_->id(), ENTRY_PERSISTED, bookmarkField);
609  MemoryBookmarkStore::_persisted(subP_, bookmarkField);
610  return bookmarkField;
611  }
612 
// Opens (creating if necessary) the file named by _fileName, determines
// its current size and maps it through setFileSize(), using at least
// AMPS_INITIAL_LOG_SIZE bytes.
// Throws StoreException (via error()) when the file cannot be opened or
// its size cannot be obtained.
613  void init(void)
614  {
615 #ifdef _WIN32
616  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
617  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
618  if( _file == INVALID_HANDLE_VALUE )
619  {
620  DWORD err = getErrorNo();
621  std::ostringstream os;
622  os << "Failed to create file " << _fileName << " for MMapBookmarkStore";
623  error(os.str(), err);
624  }
625  LARGE_INTEGER liFileSize;
626  if(GetFileSizeEx(_file, &liFileSize) == 0)
627  {
628  DWORD err = getErrorNo();
629  CloseHandle(_file);
630  std::ostringstream os;
631  os << "Failure getting file size for MMapBookmarkStore " << _fileName;
632  error(os.str(), err);
633  }
// Builds without _WIN64 use only the low 32 bits of the reported size.
634 #ifdef _WIN64
635  size_t fileSize = liFileSize.QuadPart;
636 #else
637  size_t fileSize = liFileSize.LowPart;
638 #endif
639  setFileSize( AMPS_INITIAL_LOG_SIZE > fileSize ?
640  AMPS_INITIAL_LOG_SIZE : fileSize);
641 #else
642  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
643  if(_file == -1)
644  {
645  int err = getErrorNo();
646  std::ostringstream os;
647  os << "Failed to open log file " << _fileName << " for MMapBookmarkStore";
648  error(os.str(), err);
649  }
650  struct stat statBuf;
651  if(fstat(_file, &statBuf) == -1)
652  {
653  int err = getErrorNo();
654  ::close(_file);
655  std::ostringstream os;
656  os << "Failed to stat log file " << _fileName << " for MMapBookmarkStore";
657  error(os.str(), err);
658  }
659  size_t fSize = (size_t)statBuf.st_size;
// An empty (newly created) file gets a 4-byte zero header.
660  if(fSize == 0)
661  {
662  if(::write(_file, "\0\0\0\0", 4) == -1)
663  {
664  int err = getErrorNo();
665  ::close(_file);
666  std::ostringstream os;
667  os << "Failed to write header to log file " << _fileName
668  << " for MMapBookmarkStore";
669  error(os.str(), err);
670  }
671  }
672 
// NOTE(review): fSize-1 presumably compensates for the extra byte that
// _setFileSize writes just past the mapped size when it extends the file
// -- confirm.
673  setFileSize((fSize > AMPS_INITIAL_LOG_SIZE) ? fSize-1 : AMPS_INITIAL_LOG_SIZE);
674 #endif
675  }
676 
677 #ifdef _WIN32
678  DWORD getErrorNo() const
679  {
680  return GetLastError();
681  }
682 
683  void error(const std::string& message_, DWORD err)
684  {
685  std::ostringstream os;
686  static const DWORD msgSize = 2048;
687  char pMsg[msgSize];
688  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
689  FORMAT_MESSAGE_ARGUMENT_ARRAY,
690  NULL, err, LANG_NEUTRAL,
691  pMsg, msgSize, NULL);
692  os << "File: " << _fileName << ". " << message_ << " with error " << pMsg;
693  throw StoreException(os.str());
694  }
695 #else
696  int getErrorNo() const
697  {
698  return errno;
699  }
700 
701  void error(const std::string& message_, int err)
702  {
703  std::ostringstream os;
704  os << message_ << ". Error is " << strerror(err);
705  throw StoreException(os.str());
706  }
707 #endif
// Helpers used to store and load the length fields of a log record.
// AMPS_WRITE8(p,v) stores v at p; AMPS_READ8(p,v) loads the value at p into v.
// On sparc the value is written one byte at a time, most significant byte
// first, and read back with an 8-byte memcpy (presumably to avoid
// unaligned word access -- confirm). Elsewhere p is accessed directly as a
// size_t.
// NOTE(review): the macro arguments are not parenthesized in the
// expansions, so callers should pass simple expressions.
708 #if defined(sparc)
709 #define AMPS_WRITE8(p,v) { p[0] = (v>>56)&0xFF; p[1] = (v>>48)&0xFF; p[2] = (v>>40)&0xFF; p[3] = (v>>32)&0xFF; p[4] = (v>>24)&0xFF; p[5] = (v>>16)&0xFF; p[6] = (v>>8)&0xFF; p[7]=v&0xFF; }
710 #define AMPS_READ8(p, v) { memcpy(&v,p,8); }
711 #else
712 #define AMPS_WRITE8(p,v) { *(size_t*)p = (size_t)v; }
713 #define AMPS_READ8(p,v) { v = *(const size_t*)p; }
714 #endif
715 
716  // This implementation will use this when logging a bookmark or a persisted
717  void write(const Message::Field& subId_,
718  char type_, const Message::Field& bookmark_)
719  {
720  Lock<Mutex> guard(_fileLock);
721  write(&_log, &_logOffset, subId_, type_, bookmark_);
722  }
723 
724  void write(char** logPtr, size_t* logOffsetPtr, const Message::Field& subId_,
725  char type_, const Message::Field& bookmark_)
726  {
727  if(!_recovering && isWritableBookmark(bookmark_.len()))
728  {
729  size_t len = subId_.len();
730  // Check we'll be in the current boundaries
731  size_t blockLen = len + 2*sizeof(size_t) + bookmark_.len() + 1;
732  if(*logOffsetPtr + blockLen >= _fileSize)
733  {
734  setFileSize(_fileSize*2);
735  }
736  char* offset = *logPtr+*logOffsetPtr;
737  AMPS_WRITE8(offset,len);
738  offset += sizeof(size_t);
739  memcpy(offset, static_cast<const void*>(subId_.data()), len);
740  offset += len;
741  *offset++ = type_;
742  len = bookmark_.len();
743  AMPS_WRITE8(offset,len);
744  offset += sizeof(size_t);
745  memcpy(offset, static_cast<const void*>(bookmark_.data()), len);
746  *logOffsetPtr += blockLen;
747  }
748  }
749 
750  // This implementation will only ever use this when discarding a bookmark
751  // Could be used to add a feature where generated bookmarks are logged in
752  // addition to the bookmark field.
753  void write(const Message::Field& subId_, char type_, size_t bookmark_)
754  {
755  Lock<Mutex> guard(_fileLock);
756  write(&_log, &_logOffset, subId_, type_, bookmark_);
757  }
758 
759  void write(char** logPtr, size_t* logOffsetPtr, const Message::Field& subId_,
760  char type_, size_t bookmark_)
761  {
762  if(!_recovering)
763  {
764  size_t len = subId_.len();
765  size_t blockLen = len + 2*sizeof(size_t) + 1;
766  // Check we'll be in the current boundaries
767  if(*logOffsetPtr + blockLen >= _fileSize)
768  {
769  setFileSize(_fileSize*2);
770  }
771  char* offset = *logPtr+*logOffsetPtr;
772  *(reinterpret_cast<size_t*>(offset)) = len;
773  offset += sizeof(size_t);
774  memcpy(offset, static_cast<const void*>(subId_.data()), len);
775  offset += len;
776  *offset++ = type_;
777  *(reinterpret_cast<size_t*>(offset)) = bookmark_;
778  *logOffsetPtr += blockLen;
779  }
780  }
781 
782  void setFileSize(size_t newSize_)
783  {
784  if(_log && newSize_ <= _fileSize) // Improper resize attempt
785  return;
786 #ifdef _WIN32
787  _fileSize = _setFileSize(newSize_, &_log, _file, &_mapFile);
788 #else
789  _fileSize = _setFileSize(newSize_, &_log, _file, _fileSize);
790 #endif
791  }
792 
793  // Returns new file size, 0 if there is a failure
794  size_t _setFileSize(size_t newSize_, char** log_, FileType file_,
795 #ifdef WIN32
796  HANDLE* mapFile_
797 #else
798  size_t fileSize_
799 #endif
800  )
801  {
802  // Make sure we're using a multiple of page size
803  size_t sz = newSize_ & (size_t)(~(getPageSize()-1));
804  if(sz < newSize_ || sz == 0)
805  {
806  sz += getPageSize();
807  }
808 #ifdef _WIN32
809  if(*mapFile_ && *mapFile_ != INVALID_HANDLE_VALUE)
810  {
811  if(*log_)
812  {
813  FlushViewOfFile(*log_, 0);
814  UnmapViewOfFile(*log_);
815  }
816  CloseHandle(*mapFile_);
817  }
818 #ifdef _WIN64
819  *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, (DWORD)((sz >> 32) & 0xffffffff), (DWORD)sz, NULL);
820 #else
821  *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, 0, (DWORD)sz, NULL);
822 #endif
823  if(*mapFile_ == NULL || *mapFile_ == INVALID_HANDLE_VALUE)
824  {
825  DWORD errNo = getErrorNo();
826  CloseHandle(file_);
827  error("Failed to create map of log file", errNo);
828  *log_ = 0;
829  return 0;
830  }
831  else
832  {
833  *log_ = (char*)MapViewOfFile(*mapFile_, FILE_MAP_ALL_ACCESS, 0, 0, sz);
834  if(*log_ == NULL)
835  {
836  DWORD errNo = getErrorNo();
837  CloseHandle(*mapFile_);
838  CloseHandle(file_);
839  error("Failed to map log file to memory", errNo);
840  *log_ = 0;
841  return 0;
842  }
843  }
844 #else
845  // Extend the underlying file
846  if(lseek(file_, (off_t)sz, SEEK_SET) == -1)
847  {
848  int err = getErrorNo();
849  ::close(file_);
850  error("Seek failed for bookmark log", err);
851  }
852  if(::write(file_, "", 1) == -1)
853  {
854  int err = getErrorNo();
855  ::close(file_);
856  error("Failed to grow bookmark log", err);
857  }
858  if(*log_)
859  {
860 #if defined(linux)
861  *log_ = static_cast<char*>(mremap(*log_, fileSize_, sz,
862  MREMAP_MAYMOVE));
863 #else
864  munmap(*log_,fileSize_);
865  *log_ = static_cast<char*>(mmap(0, sz, PROT_READ | PROT_WRITE,
866  MAP_SHARED, file_, 0));
867 #endif
868  }
869  else // New mapping
870  {
871  // New mapping, map the full file size for recovery or else it std size
872  *log_ = static_cast<char*>(mmap(0, sz, PROT_READ | PROT_WRITE,
873  MAP_SHARED, file_, 0));
874  }
875 
876  if((void*)(*log_) == MAP_FAILED)
877  {
878  int err = getErrorNo();
879  ::close(file_);
880  error("Failed to map log file to memory", err);
881  *log_ = 0;
882  return 0;
883  }
884 #endif
885  return sz;
886  }
887 
// Replays the records found in the mapped log into the in-memory store.
// Records are read starting at _logOffset in the layout
//   [size_t subId length][subId][type byte][size_t bookmark length][bookmark]
// until a zero subId length or an unknown type byte is reached.
// _recovering is set while replaying, which turns the write() overloads
// into no-ops so that replayed operations are not logged again.
// Holds _lock and _fileLock for the whole replay.
// Throws StoreException (via error()) when the very first record has an
// unknown type.
// NOTE(review): lengths read from the file are not checked against
// _fileSize.
888  void recover()
889  {
890  Message::Field sub;
891  Message::Field bookmarkField;
892  size_t bookmarkLen = 0;
893  size_t lastGoodOffset = 0;
894  bool inError = false;
895  Lock<Mutex> guard(_lock);
896  Lock<Mutex> fileGuard(_fileLock);
897  _recovering = true;
898  // Map of bookmark to sequence number
899  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> BookmarkMap;
900  typedef std::map<Message::Field, size_t,
901  Message::Field::FieldHash>::iterator BookmarkMapIter;
902  // Map of subId to set of recovered bookmarks
903  typedef std::map<Message::Field, BookmarkMap*,
904  Message::Field::FieldHash> ReadMap;
905  typedef std::map<Message::Field, BookmarkMap*,
906  Message::Field::FieldHash>::iterator ReadMapIter;
// Keys of `recovered` and of the BookmarkMaps it owns are deep copies;
// they are cleared explicitly at the end of this function.
907  ReadMap recovered;
// NOTE(review): subLen is read through a cast pointer here and at the end
// of the loop, while the bookmark lengths below go through AMPS_READ8.
908  size_t subLen = *(reinterpret_cast<size_t*>(_log));
909  while(!inError && subLen > 0)
910  {
911  _logOffset += sizeof(size_t);
912  sub.assign(_log+_logOffset, subLen);
913  _logOffset += subLen;
914  switch(_log[_logOffset++])
915  {
// NOTE(review): this early return leaves _recovering set to true and does
// not release the maps held in `recovered` -- confirm this is intended.
916  case -1:
917  return;
918  case ENTRY_BOOKMARK:
919  {
920  AMPS_READ8((_log+_logOffset),bookmarkLen);
921  _logOffset += sizeof(size_t);
922  bookmarkField.assign(_log+_logOffset, bookmarkLen);
923  _logOffset += bookmarkLen;
924  Subscription* subP = find(sub);
925  BookmarkMap* bookmarks = NULL;
926  ReadMapIter iter = recovered.find(sub);
927  if (iter == recovered.end())
928  {
929  Message::Field subKey;
930  subKey.deepCopy(sub);
931  bookmarks = new BookmarkMap();
932  recovered[subKey] = bookmarks;
933  }
934  else
935  {
936  bookmarks = iter->second;
937  }
// The same bookmark was already seen for this subscription: forget
// everything recorded for it so far before processing this occurrence.
938  if (bookmarks->find(bookmarkField) != bookmarks->end())
939  {
940  std::for_each(bookmarks->begin(), bookmarks->end(),
941  _clearBookmark);
942  bookmarks->clear();
943  subP->getMostRecent();
944  }
945  if (!subP->isDiscarded(bookmarkField))
946  {
947  size_t sequence = subP->log(bookmarkField);
948  Message::Field copy;
949  copy.deepCopy(bookmarkField);
950  bookmarks->insert(std::make_pair(copy, sequence));
951  }
952  else
953  {
954  // We know it's discarded, but there may still be a
955  // discard entry in the log, so avoid a search.
956  Message::Field copy;
957  copy.deepCopy(bookmarkField);
958  bookmarks->insert(std::make_pair(copy,0));
959  }
960  }
961  break;
962  case ENTRY_DISCARD:
963  {
964  AMPS_READ8((_log+_logOffset),bookmarkLen);
965  _logOffset += sizeof(size_t);
966  bookmarkField.assign(_log+_logOffset, bookmarkLen);
967  _logOffset += bookmarkLen;
968  size_t sequence = AMPS_UNSET_INDEX;
969  ReadMapIter iter = recovered.find(sub);
970  if (iter != recovered.end())
971  {
972  BookmarkMap* bookmarks = iter->second;
973  BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
974  if (bookmarkIter != bookmarks->end())
975  {
976  sequence = bookmarkIter->second;
// Free the deep-copied key before removing the map entry.
977  Message::Field bookmarkToClear(bookmarkIter->first);
978  bookmarkToClear.clear();
979  bookmarks->erase(bookmarkIter);
980  }
981  }
982  Subscription* subP = find(sub);
983  if (sequence != AMPS_UNSET_INDEX)
984  {
985  // A sequence of 0 means it was already discarded
986  if (sequence) subP->discard(sequence);
987  }
988  else // Shouldn't end up here, but just in case we'll search
989  {
990  subP->discard(bookmarkField);
991  }
992  }
993  break;
994  case ENTRY_PERSISTED:
995  {
996  AMPS_READ8((_log+_logOffset),bookmarkLen);
997  _logOffset += sizeof(size_t);
998  bookmarkField.assign(_log+_logOffset, bookmarkLen);
999  _logOffset += bookmarkLen;
1000  MemoryBookmarkStore::_persisted(find(sub), bookmarkField);
1001  }
1002  break;
// Unknown type byte: fail if nothing was read successfully yet, otherwise
// rewind to the end of the last good record and stop replaying.
1003  default:
1004  if (lastGoodOffset == 0)
1005  {
1006  error("Error while recovering bookmark store file.", getErrorNo());
1007  }
1008  else
1009  {
1010  _logOffset = lastGoodOffset;
1011  inError = true;
1012  }
1013  }
1014  lastGoodOffset = _logOffset;
1015  if (!inError) subLen = *(reinterpret_cast<size_t*>(_log + _logOffset));
1016  }
// Finalize each subscription that still has recovered bookmarks recorded
// for it.
1017  for (SubscriptionMap::iterator i=_subs.begin(); i != _subs.end(); ++i)
1018  {
1019  if (recovered.count(i->first) && !recovered[i->first]->empty())
1020  {
1021  if (i->second->getMostRecent(false).len() > 1)
1022  {
1023  i->second->justRecovered();
1024  }
1025  else
1026  {
1027  // Unlikely, but we may have recovered only undiscarded bookmarks
1028  // so we should really just restart as a new subscription.
1029  delete i->second;
1030  _subs[i->first] = new Subscription(this, i->first);
1031  }
1032  }
1033  }
// Release the deep-copied keys and the per-subscription maps.
1034  for (ReadMapIter i = recovered.begin(), e = recovered.end(); i!=e; ++i)
1035  {
1036  std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1037  delete i->second;
1038  Message::Field f = i->first;
1039  f.clear();
1040  }
1041  _recovering = false;
1042  }
1043 
1044  Mutex _fileLock;
1045  std::string _fileName;
1046  size_t _fileSize;
1047  size_t _logOffset;
1048  char* _log;
1049  FileType _file;
1050  // Each entry begins with a single byte indicating the type of entry:
1051  // a new bookmark, or a discard of a previous one.
// Returns the system page size in bytes. The value is queried from the
// operating system on the first call and cached in a function-local
// static for later calls.
static size_t getPageSize()
{
  static size_t cachedPageSize;
  if (cachedPageSize != 0)
  {
    return cachedPageSize;
  }
#ifdef _WIN32
  SYSTEM_INFO systemInfo;
  GetSystemInfo(&systemInfo);
  cachedPageSize = systemInfo.dwPageSize;
#else
  cachedPageSize = (size_t)sysconf(_SC_PAGESIZE);
#endif
  return cachedPageSize;
}
1067 
1068 };
1069 
1070 } // end namespace AMPS
1071 
1072 
1073 #endif // _MMAPBOOKMARKSTORE_H_
1074 
virtual void purge()
Called to purge the contents of this store.
Definition: MMapBookmarkStore.hpp:216
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:984
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
A BookmarkStoreImpl implementation that uses a memory mapped file for storage of the bookmarks...
Definition: MMapBookmarkStore.hpp:55
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:196
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: MMapBookmarkStore.hpp:150
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MMapBookmarkStore.hpp:182
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:244
virtual void noPersistedAcks(const Message::Field &subId_)
Flag the subscription as not requesting persisted acks No persisted acks will be sent for any bookmar...
Definition: MMapBookmarkStore.hpp:239
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: MMapBookmarkStore.hpp:168
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1074
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:93
Core type, function, and class declarations for the AMPS C++ client.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MMapBookmarkStore.hpp:230
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1076
MMapBookmarkStore(const std::string &fileName_)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:96
MMapBookmarkStore(const char *fileName_)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:77
virtual void noPersistedAcks(const Message::Field &subId_)
Flag the subscription as not requesting persisted acks No persisted acks will be sent for any bookmar...
Definition: MemoryBookmarkStore.hpp:1065
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:50
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:250
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MMapBookmarkStore.hpp:129
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:167
Definition: ampsplusplus.hpp:136
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1064
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MMapBookmarkStore.hpp:196