AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.2.0
MMapBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MMAPBOOKMARKSTORE_H_
27 #define _MMAPBOOKMARKSTORE_H_
28 
29 #include <MemoryBookmarkStore.hpp>
30 #include <RecoveryPoint.hpp>
31 #include <RecoveryPointAdapter.hpp>
32 #ifdef _WIN32
33 #include <windows.h>
34 #else
35 #include <sys/mman.h>
36 #include <unistd.h>
37 #endif
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 #include <fcntl.h>
41 #include <set>
42 
43 #define AMPS_INITIAL_LOG_SIZE 40960UL
44 
49 
50 namespace AMPS
51 {
57 {
58 private:
// Platform-specific handle type for the backing file. On Windows the file is
// a HANDLE and the store additionally keeps the handle of the file-mapping
// object (_mapFile); elsewhere the file is a plain int descriptor.
59 #ifdef _WIN32
60  typedef HANDLE FileType;
61  HANDLE _mapFile;
62 #else
63  typedef int FileType;
64 #endif
65  static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
66  {
67  Message::Field f(pair.first);
68  f.clear();
69  }
70 
71 public:
82  MMapBookmarkStore(const char* fileName_, bool useLastModifiedTime_ = false)
83  : MemoryBookmarkStore(), _fileName(fileName_), _fileSize(0)
84  , _logOffset(0) , _log(0), _fileTimestamp(0)
85 #ifdef _WIN32
86  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
87 #else
88  , _file(0)
89 #endif
90  {
91  if (init(useLastModifiedTime_))
92  recover(useLastModifiedTime_, false);
93  }
94 
105  MMapBookmarkStore(const std::string& fileName_,
106  bool useLastModifiedTime_ = false)
107  : MemoryBookmarkStore(), _fileName(fileName_), _fileSize(0)
108  , _logOffset(0) , _log(0), _fileTimestamp(0)
109 #ifdef _WIN32
110  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
111 #else
112  , _file(0)
113 #endif
114  {
115  if (init(useLastModifiedTime_))
116  recover(useLastModifiedTime_, false);
117  }
118 
133  MMapBookmarkStore(const RecoveryPointAdapter& adapter_,
134  const char* fileName_,
135  RecoveryPointFactory factory_ = NULL,
136  bool useLastModifiedTime_ = false)
137  : MemoryBookmarkStore(adapter_, factory_)
138  , _fileName(fileName_), _fileSize(0)
139  , _logOffset(0) , _log(0), _fileTimestamp(0)
140 #ifdef _WIN32
141  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
142 #else
143  , _file(0)
144 #endif
145  {
146  if (init(useLastModifiedTime_))
147  recover(useLastModifiedTime_, true);
148  }
149 
164  MMapBookmarkStore(const RecoveryPointAdapter& adapter_,
165  const std::string& fileName_,
166  RecoveryPointFactory factory_ = NULL,
167  bool useLastModifiedTime_ = false)
168  : MemoryBookmarkStore(adapter_, factory_)
169  , _fileName(fileName_), _fileSize(0)
170  , _logOffset(0) , _log(0), _fileTimestamp(0)
171 #ifdef _WIN32
172  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
173 #else
174  , _file(0)
175 #endif
176  {
177  if (init(useLastModifiedTime_))
178  recover(useLastModifiedTime_, true);
179  }
180 
181  virtual ~MMapBookmarkStore()
182  {
183 #ifdef _WIN32
184  UnmapViewOfFile(_log);
185  CloseHandle(_mapFile);
186  CloseHandle(_file);
187 #else
188  munmap(_log, _fileSize);
189  ::close(_file);
190 #endif
191  // In case _lock gets acquired by reader thread between end of this
192  // destructor and start of base class destructor, prevent write()
193  _recovering = true;
194  }
195 
201  virtual size_t log(Message& message_)
202  {
203  Message::Field bookmark = message_.getBookmark();
204  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
205  Lock<Mutex> guard(_lock);
206  if (!sub)
207  {
208  Message::Field subId = message_.getSubscriptionId();
209  if (subId.empty())
210  subId = message_.getSubscriptionIds();
211  sub = find(subId);
212  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
213  }
214  write(sub->id(), ENTRY_BOOKMARK, bookmark);
215  return MemoryBookmarkStore::_log(message_);
216  }
217 
222  virtual void discard(const Message& message_)
223  {
224  Message::Field bookmark = message_.getBookmark();
225  Message::Field subId = message_.getSubscriptionId();
226  if (subId.empty())
227  subId = message_.getSubscriptionIds();
228  Lock<Mutex> guard(_lock);
229  write(subId, ENTRY_DISCARD, bookmark);
230  MemoryBookmarkStore::_discard(message_);
231  }
232 
240  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
241  {
242  Lock<Mutex> guard(_lock);
243  Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
244  if (!entry || entry->_val.empty()) return;
245  write(subId_, ENTRY_DISCARD, entry->_val);
246  MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
247  }
248 
255  {
256  Lock<Mutex> guard(_lock);
257  return MemoryBookmarkStore::_getMostRecent(subId_);
258  }
259 
268  virtual bool isDiscarded(Message& message_)
269  {
270  Lock<Mutex> l(_lock);
271  bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
272  if (retVal)
273  {
274  Message::Field subId = message_.getSubscriptionId();
275  if (subId.empty())
276  subId = message_.getSubscriptionIds();
277  write(subId, ENTRY_BOOKMARK, message_.getBookmark());
278  write(subId, ENTRY_DISCARD, message_.getBookmark());
279  }
280  return retVal;
281  }
282 
288  virtual void purge()
289  {
290  Lock<Mutex> guard(_lock);
291  Lock<Mutex> fileGuard(_fileLock);
292  memset(_log, 0, _logOffset);
293  _logOffset = 0;
294  MemoryBookmarkStore::_purge();
295  }
296 
302  virtual void purge(const Message::Field& subId_)
303  {
304  Lock<Mutex> guard(_lock);
305  Lock<Mutex> fileGuard(_fileLock);
306  MemoryBookmarkStore::_purge(subId_);
307  std::string tmpFileName = _fileName + ".tmp";
308  __prune(tmpFileName);
309  }
310 
// Sets the server version (given as a VersionInfo) for this store while
// holding _lock.
// NOTE(review): listing line 314 — the statement between taking the lock and
// the closing brace — is absent from this extract. It presumably forwards
// version_ to the MemoryBookmarkStore base; confirm against the real header.
311  void setServerVersion(const VersionInfo& version_)
312  {
313  Lock<Mutex> guard(_lock);
315  }
316 
// Sets the server version (given as a numeric value) for this store while
// holding _lock.
// NOTE(review): listing line 320 — the statement between taking the lock and
// the closing brace — is absent from this extract. It presumably forwards
// version_ to the MemoryBookmarkStore base; confirm against the real header.
317  void setServerVersion(size_t version_)
318  {
319  Lock<Mutex> guard(_lock);
321  }
322 
323  // Yes, the argument is a non-const copy of what is passed in
324  void _prune(std::string tmpFileName_)
325  {
326  if (tmpFileName_.empty()) tmpFileName_ = _fileName + ".tmp";
327  Lock<Mutex> guard(_lock);
328  Lock<Mutex> fileGuard(_fileLock);
329  // If nothing's changed with most recent, don't rewrite the file
330  if (!_recentChanged)
331  {
332  return;
333  }
334  __prune(tmpFileName_);
335  _recentChanged = false;
336  }
337 
338 private:
// Rewrites the store's contents into a fresh temporary file and then replaces
// the current log file with it.
//  - tmpFileName_: path of the temporary file to build.
// Steps, as implemented below:
//  1. Create the temporary file and map it at AMPS_INITIAL_LOG_SIZE.
//  2. For every subscription in _subs, write to the temporary log: its range
//     (when valid), a BOOKMARK+DISCARD pair for each discarded publisher
//     entry other than the most recent publisher, the most recent bookmark,
//     the last persisted bookmark, and the entries returned by
//     getActiveEntries() (adding a DISCARD for inactive ones). The temporary
//     mapping is doubled whenever the next entry would not fit.
//  3. Unmap/close both the current and the temporary file, move the temporary
//     file over _fileName, call init() to reopen it, and set _logOffset to
//     the number of bytes written.
// Failures are reported by throwing StoreException (directly or via error()).
// Neither _lock nor _fileLock is taken here; the callers visible in this file
// (purge(subId_) and _prune) acquire both before calling.
339  void __prune(std::string tmpFileName_)
340  {
341  size_t sz = AMPS_INITIAL_LOG_SIZE;
342  FileType file;
343  char* log = NULL;
344  size_t bytesWritten = 0;
    // Step 1: create and map the temporary file (platform specific).
345 #ifdef _WIN32
346  file = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
347  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
348  if( file == INVALID_HANDLE_VALUE )
349  {
350  DWORD err = getErrorNo();
351  std::ostringstream os;
352  os << "Failed to create file " << tmpFileName_ << " for pruning MMapBookmarkStore";
353  error(os.str(), err);
354  }
355  HANDLE mapFile = NULL;
356  try
357  {
358  sz = _setFileSize(sz, &log, file, &mapFile);
359  }
360  catch (StoreException& ex)
361  {
362  if(mapFile == NULL || mapFile == INVALID_HANDLE_VALUE)
363  {
364  CloseHandle(file);
365  std::ostringstream os;
366  os << "Failed to create map of log file for prune"
367  << ex.what();
368  throw StoreException(os.str());
369  return;
370  }
371  if(log == NULL)
372  {
373  CloseHandle(mapFile);
374  CloseHandle(file);
375  std::ostringstream os;
376  os << "Failed to map log file to memory for prune"
377  << ex.what();
378  throw StoreException(os.str());
379  return;
380  }
381  }
382  if (sz == 0)
383  {
384  DWORD err = getErrorNo();
385  UnmapViewOfFile(log);
386  CloseHandle(mapFile);
387  CloseHandle(file);
388  error("Failed to prepare tmp file for prune.", err);
389  }
390 #else
391  file = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
392  if(file == -1)
393  {
394  int err = getErrorNo();
395  std::ostringstream os;
396  os << "Failed to open log file " << tmpFileName_ << " for MMapBookmarkStore";
397  error(os.str(), err);
398  return;
399  }
    // Same 4-byte zero header that init() writes into a brand-new file.
    // NOTE(review): `file` is not closed on this failure path.
400  if(::write(file, "\0\0\0\0", 4) == -1)
401  {
402  int err = getErrorNo();
403  std::ostringstream os;
404  os << "Failed to write header to log file " << tmpFileName_
405  << " in prune for MMapBookmarkStore";
406  error(os.str(), err);
407  return;
408  }
409  try
410  {
411  sz = _setFileSize(sz, &log, file, 0);
412  }
413  catch (StoreException& ex)
414  {
415  std::ostringstream os;
416  os << "Failed to prepare tmp file \"" << tmpFileName_
417  << "\" for prune. " << ex.what();
418  throw StoreException(os.str());
419  }
420  if (sz == 0)
421  {
422  int err = getErrorNo();
423  log = NULL;
424  ::close(file);
425  error("Failed to prepare tmp file for prune.", err);
426  }
427 #endif
    // Step 2: write the current in-memory state of every subscription into
    // the temporary log.
428  try
429  {
430  for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
431  {
432  Message::Field subId = i->first;
433  assert(!subId.empty());
434  size_t subIdLen = subId.len();
435  const BookmarkRange& range = i->second->getRange();
436  if (range.isValid())
437  {
438  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, range);
439  write(&log, &bytesWritten, subId, ENTRY_DISCARD, range);
440  }
441  Message::Field recent = i->second->getMostRecent(false);
442  amps_uint64_t recentPub, recentSeq;
443  Subscription::parseBookmark(recent, recentPub, recentSeq);
444  Subscription::PublisherMap publishersDiscarded =
445  i->second->_publishers;
446  MemoryBookmarkStore::EntryPtrList recovered;
447  i->second->getRecoveryEntries(recovered);
448  i->second->setPublishersToDiscarded(&recovered,
449  &publishersDiscarded);
450  char tmpBookmarkBuffer[128];
451  for (Subscription::PublisherIterator pub =
452  publishersDiscarded.begin(),
453  e = publishersDiscarded.end();
454  pub != e; ++pub)
455  {
456  // Don't log EPOCH if it got in the map
457  if (pub->first == 0 || pub->second == 0) continue;
458  // Don't log the most recent yet
459  if (pub->first == recentPub) continue;
    // Build a bookmark of the form "<publisher>|<sequence>|" in the buffer.
460  int written = AMPS_snprintf_amps_uint64_t(
461  tmpBookmarkBuffer,
462  sizeof(tmpBookmarkBuffer),
463  pub->first);
464  *(tmpBookmarkBuffer+written++) = '|';
465  written += AMPS_snprintf_amps_uint64_t(
466  tmpBookmarkBuffer+written,
467  sizeof(tmpBookmarkBuffer)
468  - (size_t)written,
469  pub->second);
470  *(tmpBookmarkBuffer+written++) = '|';
471  Message::Field tmpBookmark(tmpBookmarkBuffer, (size_t)written);
472  // Check we'll be in the current boundaries
    // Two entries (BOOKMARK and DISCARD) follow, hence blockLen twice.
473  size_t blockLen = subIdLen + 2*sizeof(size_t) + tmpBookmark.len() + 1;
474  if(bytesWritten + blockLen + blockLen >= sz)
475  {
476 #ifdef _WIN32
477  sz = _setFileSize(sz*2, &log, file, &mapFile);
478 #else
479  sz = _setFileSize(sz*2, &log, file, sz);
480 #endif
481  }
482  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, tmpBookmark);
483  write(&log, &bytesWritten, subId, ENTRY_DISCARD, tmpBookmark);
484  }
485  if (isWritableBookmark(recent.len()))
486  {
487  // Check we'll be in the current boundaries
488  size_t blockLen = subIdLen + 2*sizeof(size_t) + recent.len() + 1;
489  if(bytesWritten + blockLen + blockLen >= sz)
490  {
491 #ifdef _WIN32
492  sz = _setFileSize(sz*2, &log, file, &mapFile);
493 #else
494  sz = _setFileSize(sz*2, &log, file, sz);
495 #endif
496  }
497  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, recent);
498  write(&log, &bytesWritten, subId, ENTRY_DISCARD, recent);
499  }
500  else // set up _recentList
501  {
502  i->second->getMostRecentList();
503  }
504  Message::Field bookmark = i->second->getLastPersisted();
505  if (isWritableBookmark(bookmark.len()))
506  {
507  // Check we'll be in the current boundaries
508  size_t blockLen = subIdLen + 2*sizeof(size_t) +
509  bookmark.len() + 1;
510  if(bytesWritten + blockLen >= sz)
511  {
512 #ifdef _WIN32
513  sz = _setFileSize(sz*2, &log, file, &mapFile);
514 #else
515  sz = _setFileSize(sz*2, &log, file, sz);
516 #endif
517  }
518  write(&log, &bytesWritten, subId, ENTRY_PERSISTED,
519  i->second->getLastPersisted());
520  }
521  i->second->getActiveEntries(recovered);
522  for (MemoryBookmarkStore::EntryPtrList::iterator entry =
523  recovered.begin();
524  entry != recovered.end(); ++entry)
525  {
526  if ((*entry)->_val.empty() ||
527  !isWritableBookmark((*entry)->_val.len()))
528  continue;
529  // Check we'll be in the current boundaries
530  size_t blockLen = subIdLen + 2*sizeof(size_t) +
531  (*entry)->_val.len() + 1;
532  if(bytesWritten + blockLen >= sz)
533  {
534 #ifdef _WIN32
535  sz = _setFileSize(sz*2, &log, file, &mapFile);
536 #else
537  sz = _setFileSize(sz*2, &log, file, sz);
538 #endif
539  }
540  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK,
541  (*entry)->_val);
542  if (!(*entry)->_active)
543  {
544  // Check we'll be in the current boundaries
545  if(bytesWritten + blockLen >= sz)
546  {
547 #ifdef _WIN32
548  sz = _setFileSize(sz*2, &log, file, &mapFile);
549 #else
550  sz = _setFileSize(sz*2, &log, file, sz);
551 #endif
552  }
553  write(&log, &bytesWritten, subId, ENTRY_DISCARD,
554  (*entry)->_val);
555  }
556  }
557  }
558  }
559  catch (StoreException& ex)
560  {
    // Abandon the temporary file and rethrow with a prune-specific prefix.
    // NOTE(review): _setFileSize() already closes the file before it throws,
    // so the close below may act on an already-closed handle/descriptor; the
    // POSIX branch also does not munmap `log`.
561 #ifdef _WIN32
562  UnmapViewOfFile(log);
563  CloseHandle(mapFile);
564  CloseHandle(file);
565 #else
566  ::close(file);
567  ::unlink(tmpFileName_.c_str());
568 #endif
569  std::ostringstream os;
570  os << "Exception during prune: " << ex.what();
571  throw StoreException(os.str());
572  }
    // Step 3: release the current file and the temporary file, then move the
    // temporary file into place and reopen it through init().
573 #ifdef _WIN32
    // NOTE(review): results are combined with |=, so `success` is false only
    // when every call fails — confirm whether &= was intended.
574  BOOL success = FlushViewOfFile(_log,0);
575  success |= UnmapViewOfFile(_log);
576  _log = NULL;
577  success |= CloseHandle(_mapFile);
578  success |= CloseHandle(_file);
579  if (!success)
580  {
581  DWORD err = getErrorNo();
582  std::ostringstream os;
583  os << "Failed to flush, unmap, and close current file "
584  << _fileName
585  << " in prune in MMapBookmarkStore. ";
586  error(os.str(), err);
587  }
588  _mapFile = INVALID_HANDLE_VALUE;
589  _file = INVALID_HANDLE_VALUE;
590  success = FlushViewOfFile(log,0);
591  success |= UnmapViewOfFile(log);
592  log = NULL;
593  success |= CloseHandle(mapFile);
594  success |= CloseHandle(file);
595  if (!success)
596  {
597  DWORD err = getErrorNo();
598  std::ostringstream os;
599  os << "Failed to flush, unmap and close completed temp file "
600  << tmpFileName_
601  << " in prune in MMapBookmarkStore. ";
602  error(os.str(), err);
603  }
604  mapFile = INVALID_HANDLE_VALUE;
605  file = INVALID_HANDLE_VALUE;
606  // Replace current file with pruned file
    // The move is attempted up to three times before giving up.
607  int retryCount = 3;
608  while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
609  MOVEFILE_COPY_ALLOWED|MOVEFILE_REPLACE_EXISTING|MOVEFILE_WRITE_THROUGH))
610  {
611  DWORD err = getErrorNo();
612  if (--retryCount > 0) continue;
613  // Try to set _file to the tmp file that won't move then throw
614  std::string desiredFileName = _fileName;
615  _fileName = tmpFileName_;
616  init();
617  std::ostringstream os;
618  os << "Failed to move completed temp file " << tmpFileName_
619  << " to " << desiredFileName
620  << " in prune in MMapBookmarkStore. Continuing by using "
621  << tmpFileName_ << " as the MMapBookmarkStore file.";
622  error(os.str(), err);
623  }
624  // Call init to set up file again
625  init();
626 #else
627  munmap(_log, _fileSize);
628  _log = NULL;
629  ::close(_file);
630  munmap(log, sz);
631  ::close(file);
    // The existing file is removed first, then the temporary file is renamed
    // onto its name.
632  if (-1 == ::unlink(_fileName.c_str()))
633  {
634  int err = getErrorNo();
635  // Try to set _file to the tmp file that won't move then throw
636  std::string desiredFileName = _fileName;
637  _fileName = tmpFileName_;
638  init();
639  std::ostringstream os;
640  os << "Failed to delete file " << desiredFileName
641  << " after creating temporary file " << tmpFileName_
642  << " in prune in MMapBookmarkStore. Continuing by using "
643  << tmpFileName_ << " as the MMapBookmarkStore file.";
644  error(os.str(), err);
645  }
646  if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
647  {
648  int err = getErrorNo();
649  // Try to set _file to the tmp file that won't move then throw
650  std::string desiredFileName = _fileName;
651  _fileName = tmpFileName_;
652  init();
653  std::ostringstream os;
654  os << "Failed to move completed temp file " << tmpFileName_
655  << " to " << desiredFileName
656  << " in prune in MMapBookmarkStore. Continuing by using "
657  << tmpFileName_ << " as the MMapBookmarkStore file.";
658  error(os.str(), err);
659  }
660  // Call init to set up file again
661  init();
662 #endif
    // New entries are appended after the data just written.
663  _logOffset = bytesWritten;
664  }
665 
666  virtual void _persisted(Subscription* subP_,
667  const Message::Field& bookmarkField_)
668  {
669  Lock<Mutex> l(_lock);
670  write(subP_->id(), ENTRY_PERSISTED, bookmarkField_);
671  MemoryBookmarkStore::_persisted(subP_, bookmarkField_);
672  }
673 
674  virtual Message::Field _persisted(Subscription* subP_, size_t bookmark_)
675  {
676  Lock<Mutex> l(_lock);
677  Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
678  if (!entryPtr || entryPtr->_val.empty())
679  return Message::Field();
680  Message::Field bookmarkField = entryPtr->_val;
681  write(subP_->id(), ENTRY_PERSISTED, bookmarkField);
682  MemoryBookmarkStore::_persisted(subP_, bookmarkField);
683  return bookmarkField;
684  }
685 
// Opens (creating if necessary) the file named _fileName, sizes it to at
// least AMPS_INITIAL_LOG_SIZE and maps it through setFileSize().
//  - useLastModifiedTime_: when true and the file already has content, the
//    file's last-modified time is formatted as "YYYYMMDDTHHMMSS" with a
//    trailing 'Z' into a newly allocated _fileTimestamp buffer.
686  // Returns true if file exists and is larger than 0 bytes and therefore
687  // should be used for recovery.
// Failures are reported through error(), which throws StoreException.
688  bool init(bool useLastModifiedTime_ = false)
689  {
690  bool retVal = true;
691 #ifdef _WIN32
692  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
693  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
694  if( _file == INVALID_HANDLE_VALUE )
695  {
696  DWORD err = getErrorNo();
697  std::ostringstream os;
698  os << "Failed to create file " << _fileName << " for MMapBookmarkStore";
699  error(os.str(), err);
700  }
701  LARGE_INTEGER liFileSize;
702  if(GetFileSizeEx(_file, &liFileSize) == 0)
703  {
704  DWORD err = getErrorNo();
705  CloseHandle(_file);
706  std::ostringstream os;
707  os << "Failure getting file size for MMapBookmarkStore " << _fileName;
708  error(os.str(), err);
709  return false;
710  }
    // size_t is 32 bits on Win32, so only the low part of the size is used.
711 #ifdef _WIN64
712  size_t fileSize = liFileSize.QuadPart;
713 #else
714  size_t fileSize = liFileSize.LowPart;
715 #endif
716  if (useLastModifiedTime_ && fileSize > 0)
717  {
718  FILETIME ftModifiedTime;
719  if(GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
720  {
721  DWORD err = getErrorNo();
722  CloseHandle(_file);
723  _recovering = false;
724  error("Failure getting file time while trying to recover.", err);
725  return false;
726  }
727  SYSTEMTIME st;
728  if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
729  {
730  DWORD err = getErrorNo();
731  CloseHandle(_file);
732  _recovering = false;
733  error("Failure converting file time while trying to recover.", err);
734  return false;
735  }
    // Buffer is released by recover() once the timestamp has been applied.
736  _fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
737  sprintf_s(_fileTimestamp, AMPS_TIMESTAMP_LEN,
738  "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
739  st.wDay, st.wHour, st.wMinute, st.wSecond);
740  _fileTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
741  }
742  retVal = (fileSize != 0);
743  setFileSize( AMPS_INITIAL_LOG_SIZE > fileSize ?
744  AMPS_INITIAL_LOG_SIZE : fileSize);
745 #else
746  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
747  if(_file == -1)
748  {
749  int err = getErrorNo();
750  std::ostringstream os;
751  os << "Failed to open log file " << _fileName << " for MMapBookmarkStore";
752  error(os.str(), err);
753  }
754  struct stat statBuf;
755  if(fstat(_file, &statBuf) == -1)
756  {
757  int err = getErrorNo();
758  ::close(_file);
759  std::ostringstream os;
760  os << "Failed to stat log file " << _fileName << " for MMapBookmarkStore";
761  error(os.str(), err);
762  return false;
763  }
764  size_t fSize = (size_t)statBuf.st_size;
765  if(fSize == 0)
766  {
    // Brand-new file: nothing to recover; start it with four zero bytes.
767  retVal = false;
768  if(::write(_file, "\0\0\0\0", 4) == -1)
769  {
770  int err = getErrorNo();
771  ::close(_file);
772  std::ostringstream os;
773  os << "Failed to write header to log file " << _fileName
774  << " for MMapBookmarkStore";
775  error(os.str(), err);
776  return false;
777  }
778  }
779  else if (useLastModifiedTime_)
780  {
    // Buffer is released by recover() once the timestamp has been applied.
781  _fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
782  struct tm timeInfo;
783  gmtime_r(&statBuf.st_mtime, &timeInfo);
784  strftime(_fileTimestamp, AMPS_TIMESTAMP_LEN,
785  "%Y%m%dT%H%M%S", &timeInfo);
786  _fileTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
787  }
788 
    // _setFileSize() grows the file by seeking to the requested size and
    // writing one byte, leaving the file one byte longer than the mapping;
    // fSize-1 asks for the same mapping size again for an existing file.
789  setFileSize((fSize > AMPS_INITIAL_LOG_SIZE) ? fSize-1 : AMPS_INITIAL_LOG_SIZE);
790 #endif
791  return retVal;
792  }
793 
794 #ifdef _WIN32
795  DWORD getErrorNo() const
796  {
797  return GetLastError();
798  }
799 
800  void error(const std::string& message_, DWORD err)
801  {
802  std::ostringstream os;
803  static const DWORD msgSize = 2048;
804  char pMsg[msgSize];
805  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
806  FORMAT_MESSAGE_ARGUMENT_ARRAY,
807  NULL, err, LANG_NEUTRAL,
808  pMsg, msgSize, NULL);
809  os << "File: " << _fileName << ". " << message_ << " with error " << pMsg;
810  throw StoreException(os.str());
811  }
812 #else
813  int getErrorNo() const
814  {
815  return errno;
816  }
817 
818  void error(const std::string& message_, int err)
819  {
820  std::ostringstream os;
821  os << message_ << ". Error is " << strerror(err);
822  throw StoreException(os.str());
823  }
824 #endif
// Helpers for storing/loading the length fields of a log entry at an
// arbitrary byte offset p.
//  - sparc: AMPS_WRITE8 stores v one byte at a time, most significant byte
//    first, and AMPS_READ8 copies 8 bytes with memcpy, so neither performs a
//    word-sized access through p.
//  - elsewhere: both access p directly as a size_t.
// NOTE(review): the sparc variants always move 8 bytes and so assume an
// 8-byte size_t; callers advance by sizeof(size_t) — confirm for 32-bit
// builds.
825 #if defined(sparc)
826 #define AMPS_WRITE8(p,v) { p[0] = (v>>56)&0xFF; p[1] = (v>>48)&0xFF; p[2] = (v>>40)&0xFF; p[3] = (v>>32)&0xFF; p[4] = (v>>24)&0xFF; p[5] = (v>>16)&0xFF; p[6] = (v>>8)&0xFF; p[7]=v&0xFF; }
827 #define AMPS_READ8(p, v) { memcpy(&v,p,8); }
828 #else
829 #define AMPS_WRITE8(p,v) { *(size_t*)p = (size_t)v; }
830 #define AMPS_READ8(p,v) { v = *(const size_t*)p; }
831 #endif
832 
833  // This implementation will use this when logging a bookmark or a persisted
834  void write(const Message::Field& subId_,
835  char type_, const Message::Field& bookmark_)
836  {
837  Lock<Mutex> guard(_fileLock);
838  write(&_log, &_logOffset, subId_, type_, bookmark_);
839  }
840 
841  void write(char** logPtr, size_t* logOffsetPtr, const Message::Field& subId_,
842  char type_, const Message::Field& bookmark_)
843  {
844  if(!_recovering && isWritableBookmark(bookmark_.len()))
845  {
846  size_t len = subId_.len();
847  // Check we'll be in the current boundaries
848  size_t blockLen = len + 2*sizeof(size_t) + bookmark_.len() + 1;
849  if(*logOffsetPtr + blockLen >= _fileSize)
850  {
851  setFileSize(_fileSize*2);
852  }
853  char* offset = *logPtr+*logOffsetPtr;
854  AMPS_WRITE8(offset,len);
855  offset += sizeof(size_t);
856  memcpy(offset, static_cast<const void*>(subId_.data()), len);
857  offset += len;
858  *offset++ = type_;
859  len = bookmark_.len();
860  AMPS_WRITE8(offset,len);
861  offset += sizeof(size_t);
862  memcpy(offset, static_cast<const void*>(bookmark_.data()), len);
863  *logOffsetPtr += blockLen;
864  }
865  }
866 
867  // This implementation will only ever use this when discarding a bookmark
868  // Could be used to add a feature where generated bookmarks are logged in
869  // addition to the bookmark field.
870  void write(const Message::Field& subId_, char type_, size_t bookmark_)
871  {
872  Lock<Mutex> guard(_fileLock);
873  write(&_log, &_logOffset, subId_, type_, bookmark_);
874  }
875 
876  void write(char** logPtr, size_t* logOffsetPtr, const Message::Field& subId_,
877  char type_, size_t bookmark_)
878  {
879  if(!_recovering)
880  {
881  size_t len = subId_.len();
882  size_t blockLen = len + 2*sizeof(size_t) + 1;
883  // Check we'll be in the current boundaries
884  if(*logOffsetPtr + blockLen >= _fileSize)
885  {
886  setFileSize(_fileSize*2);
887  }
888  char* offset = *logPtr+*logOffsetPtr;
889  *(reinterpret_cast<size_t*>(offset)) = len;
890  offset += sizeof(size_t);
891  memcpy(offset, static_cast<const void*>(subId_.data()), len);
892  offset += len;
893  *offset++ = type_;
894  *(reinterpret_cast<size_t*>(offset)) = bookmark_;
895  *logOffsetPtr += blockLen;
896  }
897  }
898 
899  void setFileSize(size_t newSize_)
900  {
901  if(_log && newSize_ <= _fileSize) // Improper resize attempt
902  return;
903 #ifdef _WIN32
904  _fileSize = _setFileSize(newSize_, &_log, _file, &_mapFile);
905 #else
906  _fileSize = _setFileSize(newSize_, &_log, _file, _fileSize);
907 #endif
908  }
909 
910  // Returns new file size, 0 if there is a failure
911  size_t _setFileSize(size_t newSize_, char** log_, FileType file_,
912 #ifdef WIN32
913  HANDLE* mapFile_
914 #else
915  size_t fileSize_
916 #endif
917  )
918  {
919  // Make sure we're using a multiple of page size
920  size_t sz = newSize_ & (size_t)(~(getPageSize()-1));
921  if(sz < newSize_ || sz == 0)
922  {
923  sz += getPageSize();
924  }
925 #ifdef _WIN32
926  if(*mapFile_ && *mapFile_ != INVALID_HANDLE_VALUE)
927  {
928  if(*log_)
929  {
930  FlushViewOfFile(*log_, 0);
931  UnmapViewOfFile(*log_);
932  }
933  CloseHandle(*mapFile_);
934  }
935 #ifdef _WIN64
936  *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, (DWORD)((sz >> 32) & 0xffffffff), (DWORD)sz, NULL);
937 #else
938  *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, 0, (DWORD)sz, NULL);
939 #endif
940  if(*mapFile_ == NULL || *mapFile_ == INVALID_HANDLE_VALUE)
941  {
942  DWORD errNo = getErrorNo();
943  CloseHandle(file_);
944  error("Failed to create map of log file", errNo);
945  *log_ = 0;
946  return 0;
947  }
948  else
949  {
950  *log_ = (char*)MapViewOfFile(*mapFile_, FILE_MAP_ALL_ACCESS, 0, 0, sz);
951  if(*log_ == NULL)
952  {
953  DWORD errNo = getErrorNo();
954  CloseHandle(*mapFile_);
955  CloseHandle(file_);
956  error("Failed to map log file to memory", errNo);
957  *log_ = 0;
958  return 0;
959  }
960  }
961 #else
962  // Extend the underlying file
963  if(lseek(file_, (off_t)sz, SEEK_SET) == -1)
964  {
965  int err = getErrorNo();
966  ::close(file_);
967  error("Seek failed for bookmark log", err);
968  }
969  if(::write(file_, "", 1) == -1)
970  {
971  int err = getErrorNo();
972  ::close(file_);
973  error("Failed to grow bookmark log", err);
974  }
975  if(*log_)
976  {
977 #if defined(linux)
978  *log_ = static_cast<char*>(mremap(*log_, fileSize_, sz,
979  MREMAP_MAYMOVE));
980 #else
981  munmap(*log_,fileSize_);
982  *log_ = static_cast<char*>(mmap(0, sz, PROT_READ | PROT_WRITE,
983  MAP_SHARED, file_, 0));
984 #endif
985  }
986  else // New mapping
987  {
988  // New mapping, map the full file size for recovery or else it std size
989  *log_ = static_cast<char*>(mmap(0, sz, PROT_READ | PROT_WRITE,
990  MAP_SHARED, file_, 0));
991  }
992 
993  if((void*)(*log_) == MAP_FAILED)
994  {
995  int err = getErrorNo();
996  ::close(file_);
997  error("Failed to map log file to memory", err);
998  *log_ = 0;
999  return 0;
1000  }
1001 #endif
1002  return sz;
1003  }
1004 
// Replays the entries stored in the mapped log into the in-memory store.
//  - useLastModifiedTime_: when true and init() captured _fileTimestamp, that
//    timestamp is set as the recovery timestamp of every subscription.
//  - hasAdapter_: when true, the in-memory state is purged before the first
//    entry from the file is applied.
// Holds _lock and _fileLock and sets _recovering for the duration, which
// makes write() a no-op while entries are replayed. Entries are read starting
// at _logOffset until a zero subscription-id length is found; _logOffset is
// left just past the last entry read. An unknown entry type throws through
// error() if it is the first entry, otherwise reading stops at the last good
// offset.
1005  void recover(bool useLastModifiedTime_ = false,
1006  bool hasAdapter_ = false)
1007  {
1008  Message::Field sub;
1009  Message::Field bookmarkField;
1010  size_t bookmarkLen = 0;
1011  size_t lastGoodOffset = 0;
1012  bool inError = false;
1013  Lock<Mutex> guard(_lock);
1014  Lock<Mutex> fileGuard(_fileLock);
1015  _recovering = true;
1016  // Map of bookmark to sequence number
1017  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> BookmarkMap;
1018  typedef std::map<Message::Field, size_t,
1019  Message::Field::FieldHash>::iterator BookmarkMapIter;
1020  // Map of subId to set of recovered bookmarks
1021  typedef std::map<Message::Field, BookmarkMap*,
1022  Message::Field::FieldHash> ReadMap;
1023  typedef std::map<Message::Field, BookmarkMap*,
1024  Message::Field::FieldHash>::iterator ReadMapIter;
    // Keys in `recovered` and in each BookmarkMap are deep copies; they are
    // released with clear() in the cleanup loop at the end.
1025  ReadMap recovered;
    // Each entry starts with the length of its subscription id; a zero
    // length marks the end of the written data.
1026  size_t subLen = *(reinterpret_cast<size_t*>(_log));
1027  while(!inError && subLen > 0)
1028  {
1029  // If we recover something, remove anything adapter recovered
1030  if (_logOffset == 0 && hasAdapter_)
1031  MemoryBookmarkStore::__purge();
1032  _logOffset += sizeof(size_t);
1033  sub.assign(_log+_logOffset, subLen);
1034  _logOffset += subLen;
1035  switch(_log[_logOffset++])
1036  {
    // NOTE(review): this returns with _recovering still true and without
    // releasing `recovered` or _fileTimestamp — confirm this is intended.
1037  case -1:
1038  return;
1039  case ENTRY_BOOKMARK:
1040  {
1041  AMPS_READ8((_log+_logOffset),bookmarkLen);
1042  _logOffset += sizeof(size_t);
1043  bookmarkField.assign(_log+_logOffset, bookmarkLen);
1044  _logOffset += bookmarkLen;
1045  Subscription* subP = find(sub);
1046  BookmarkMap* bookmarks = NULL;
1047  ReadMapIter iter = recovered.find(sub);
1048  if (iter == recovered.end())
1049  {
1050  Message::Field subKey;
1051  subKey.deepCopy(sub);
1052  bookmarks = new BookmarkMap();
1053  recovered[subKey] = bookmarks;
1054  }
1055  else
1056  {
1057  bookmarks = iter->second;
1058  }
    // Seeing a bookmark that is already in the map: drop everything tracked
    // so far for this subscription before handling it.
1059  if (bookmarks->find(bookmarkField) != bookmarks->end())
1060  {
1061  std::for_each(bookmarks->begin(), bookmarks->end(),
1062  _clearBookmark);
1063  bookmarks->clear();
1064  subP->getMostRecent(true);
1065  }
1066  if (BookmarkRange::isRange(bookmarkField))
1067  {
1068  subP->discard(subP->log(bookmarkField));
1069  }
1070  else if (!subP->isDiscarded(bookmarkField))
1071  {
1072  size_t sequence = subP->log(bookmarkField);
1073  Message::Field copy;
1074  copy.deepCopy(bookmarkField);
1075  bookmarks->insert(std::make_pair(copy, sequence));
1076  }
1077  else
1078  {
1079  // We know it's discarded, but there may still be a
1080  // discard entry in the log, so avoid a search.
1081  Message::Field copy;
1082  copy.deepCopy(bookmarkField);
1083  bookmarks->insert(std::make_pair(copy,0));
1084  }
1085  }
1086  break;
1087  case ENTRY_DISCARD:
1088  {
1089  AMPS_READ8((_log+_logOffset),bookmarkLen);
1090  _logOffset += sizeof(size_t);
1091  bookmarkField.assign(_log+_logOffset, bookmarkLen);
1092  _logOffset += bookmarkLen;
1093  size_t sequence = AMPS_UNSET_INDEX;
1094  ReadMapIter iter = recovered.find(sub);
1095  if (iter != recovered.end())
1096  {
1097  BookmarkMap* bookmarks = iter->second;
1098  BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1099  if (bookmarkIter != bookmarks->end())
1100  {
1101  sequence = bookmarkIter->second;
1102  Message::Field bookmarkToClear(bookmarkIter->first);
1103  bookmarkToClear.clear();
1104  bookmarks->erase(bookmarkIter);
1105  }
1106  }
1107  if (!BookmarkRange::isRange(bookmarkField))
1108  {
1109  Subscription* subP = find(sub);
1110  if (sequence != AMPS_UNSET_INDEX)
1111  {
1112  // A sequence of 0 means it was already discarded
1113  if (sequence) subP->discard(sequence);
1114  }
1115  else // Shouldn't end up here, but just in case we'll search
1116  {
1117  subP->discard(bookmarkField);
1118  }
1119  }
1120  }
1121  break;
1122  case ENTRY_PERSISTED:
1123  {
1124  AMPS_READ8((_log+_logOffset),bookmarkLen);
1125  _logOffset += sizeof(size_t);
1126  bookmarkField.assign(_log+_logOffset, bookmarkLen);
1127  _logOffset += bookmarkLen;
1128  MemoryBookmarkStore::_persisted(find(sub), bookmarkField);
1129  }
1130  break;
1131  default:
    // Unknown entry type: fatal if nothing has been read successfully yet,
    // otherwise rewind to the end of the last good entry and stop reading.
1132  if (lastGoodOffset == 0)
1133  {
1134  error("Error while recovering bookmark store file.", getErrorNo());
1135  }
1136  else
1137  {
1138  _logOffset = lastGoodOffset;
1139  inError = true;
1140  }
1141  }
1142  lastGoodOffset = _logOffset;
1143  if (!inError) subLen = *(reinterpret_cast<size_t*>(_log + _logOffset));
1144  }
    // Finalise each subscription for which undiscarded bookmarks remain.
1145  for (SubscriptionMap::iterator i=_subs.begin(); i != _subs.end(); ++i)
1146  {
1147  if (recovered.count(i->first) && !recovered[i->first]->empty())
1148  {
1149  if (i->second->getMostRecent(false).len() > 1)
1150  {
1151  i->second->justRecovered();
1152  }
1153  else
1154  {
1155  // Unlikely, but we may have recovered only undiscarded
1156  // bookmarks so just restart as a new subscription.
1157  delete i->second;
1158  _subs[i->first] = new Subscription(this, i->first);
1159  }
1160  }
1161  if (useLastModifiedTime_ && _fileTimestamp)
1162  {
1163  _subs[i->first]->setRecoveryTimestamp(_fileTimestamp);
1164  }
1165  }
    // The timestamp buffer allocated by init() is no longer needed.
1166  if (_fileTimestamp)
1167  {
1168  delete[] _fileTimestamp;
1169  _fileTimestamp = 0;
1170  }
    // Release the deep-copied keys and the per-subscription maps.
1171  for (ReadMapIter i = recovered.begin(), e = recovered.end(); i!=e; ++i)
1172  {
1173  std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1174  delete i->second;
1175  Message::Field f = i->first;
1176  f.clear();
1177  }
1178  _recovering = false;
1179  }
1180 
// Taken around every access to the mapped log by write(), purge(), _prune()
// and recover().
1181  Mutex _fileLock;
// Path of the backing file; replaced with the temporary file's name if a
// prune cannot move the temporary file into place.
1182  std::string _fileName;
// Size of the current mapping, as returned by _setFileSize().
1183  size_t _fileSize;
// Byte offset within the mapping at which the next entry is written.
1184  size_t _logOffset;
// Base address of the mapping; null when no mapping exists.
1185  char* _log;
// Last-modified time of the file formatted by init(); freed by recover().
1186  char* _fileTimestamp;
// Handle (Windows) or descriptor (elsewhere) of the backing file.
1187  FileType _file;
1188  // Each entry begins with a single byte indicating the type of entry:
1189  // a new bookmark, or a discard of a previous one.
// Returns the system memory page size in bytes. The value is queried once and
// cached in a function-local static for later calls.
// If the platform query fails (sysconf reports a non-positive value), 4096 is
// used so that the page-rounding arithmetic in _setFileSize stays valid.
static size_t getPageSize()
{
  static size_t pageSize;
  if(pageSize == 0)
  {
#ifdef _WIN32
    SYSTEM_INFO SYS_INFO;
    GetSystemInfo(&SYS_INFO);
    pageSize = SYS_INFO.dwPageSize;
#else
    // sysconf returns -1 on error; never let that become a huge size_t.
    const long reported = sysconf(_SC_PAGESIZE);
    pageSize = (reported > 0) ? (size_t)reported : (size_t)4096;
#endif
  }
  return pageSize;
}
1205 
1206 };
1207 
1208 } // end namespace AMPS
1209 
1210 
1211 #endif // _MMAPBOOKMARKSTORE_H_
1212 
virtual void purge()
Called to purge the contents of this store.
Definition: MMapBookmarkStore.hpp:288
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1105
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:446
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
A BookmarkStoreImpl implementation that uses a memory mapped file for storage of the bookmarks...
Definition: MMapBookmarkStore.hpp:56
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: MMapBookmarkStore.hpp:222
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MMapBookmarkStore.hpp:254
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:311
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: MMapBookmarkStore.hpp:240
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1130
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
MMapBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:82
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MMapBookmarkStore.hpp:302
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1307
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:164
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:317
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MMapBookmarkStore.hpp:201
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:133
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Definition: ampsplusplus.hpp:105
MMapBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:105
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1120
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MMapBookmarkStore.hpp:268