AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
MMapBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MMAPBOOKMARKSTORE_H_
27 #define _MMAPBOOKMARKSTORE_H_
28 
29 #include <MemoryBookmarkStore.hpp>
30 #include <RecoveryPoint.hpp>
31 #include <RecoveryPointAdapter.hpp>
32 #ifdef _WIN32
33  #include <windows.h>
34 #else
35  #include <sys/mman.h>
36  #include <unistd.h>
37 #endif
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 #include <fcntl.h>
41 #include <set>
42 
43 #define AMPS_INITIAL_LOG_SIZE 40960UL
44 
49 
50 namespace AMPS
51 {
57  {
58  private:
59 #ifdef _WIN32
60  typedef HANDLE FileType;
61  HANDLE _mapFile;
62 #else
63  typedef int FileType;
64 #endif
65  static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
66  {
67  Message::Field f(pair.first);
68  f.clear();
69  }
70 
71  public:
82  MMapBookmarkStore(const char* fileName_, bool useLastModifiedTime_ = false)
83  : MemoryBookmarkStore(), _fileName(fileName_), _fileSize(0)
84  , _logOffset(0), _log(0), _fileTimestamp(0)
85 #ifdef _WIN32
86  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
87 #else
88  , _file(0)
89 #endif
90  {
91  if (init(useLastModifiedTime_))
92  {
93  recover(useLastModifiedTime_, false);
94  }
95  }
96 
107  MMapBookmarkStore(const std::string& fileName_,
108  bool useLastModifiedTime_ = false)
109  : MemoryBookmarkStore(), _fileName(fileName_), _fileSize(0)
110  , _logOffset(0), _log(0), _fileTimestamp(0)
111 #ifdef _WIN32
112  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
113 #else
114  , _file(0)
115 #endif
116  {
117  if (init(useLastModifiedTime_))
118  {
119  recover(useLastModifiedTime_, false);
120  }
121  }
122 
138  const char* fileName_,
139  RecoveryPointFactory factory_ = NULL,
140  bool useLastModifiedTime_ = false)
141  : MemoryBookmarkStore(adapter_, factory_)
142  , _fileName(fileName_), _fileSize(0)
143  , _logOffset(0), _log(0), _fileTimestamp(0)
144 #ifdef _WIN32
145  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
146 #else
147  , _file(0)
148 #endif
149  {
150  if (init(useLastModifiedTime_))
151  {
152  recover(useLastModifiedTime_, true);
153  }
154  }
155 
171  const std::string& fileName_,
172  RecoveryPointFactory factory_ = NULL,
173  bool useLastModifiedTime_ = false)
174  : MemoryBookmarkStore(adapter_, factory_)
175  , _fileName(fileName_), _fileSize(0)
176  , _logOffset(0), _log(0), _fileTimestamp(0)
177 #ifdef _WIN32
178  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
179 #else
180  , _file(0)
181 #endif
182  {
183  if (init(useLastModifiedTime_))
184  {
185  recover(useLastModifiedTime_, true);
186  }
187  }
188 
189  virtual ~MMapBookmarkStore()
190  {
191 #ifdef _WIN32
192  UnmapViewOfFile(_log);
193  CloseHandle(_mapFile);
194  CloseHandle(_file);
195 #else
196  munmap(_log, _fileSize);
197  ::close(_file);
198 #endif
199  // In case _lock gets acquired by reader thread between end of this
200  // destructor and start of base class destructor, prevent write()
201  _recovering = true;
202  }
203 
209  virtual size_t log(Message& message_)
210  {
211  Message::Field bookmark = message_.getBookmark();
212  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
213  Lock<Mutex> guard(_lock);
214  if (!sub)
215  {
216  Message::Field subId = message_.getSubscriptionId();
217  if (subId.empty())
218  {
219  subId = message_.getSubscriptionIds();
220  }
221  sub = find(subId);
222  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
223  }
224  write(sub->id(), ENTRY_BOOKMARK, bookmark);
225  return MemoryBookmarkStore::_log(message_);
226  }
227 
232  virtual void discard(const Message& message_)
233  {
234  Message::Field bookmark = message_.getBookmark();
235  Message::Field subId = message_.getSubscriptionId();
236  if (subId.empty())
237  {
238  subId = message_.getSubscriptionIds();
239  }
240  Lock<Mutex> guard(_lock);
241  write(subId, ENTRY_DISCARD, bookmark);
242  MemoryBookmarkStore::_discard(message_);
243  }
244 
252  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
253  {
254  Lock<Mutex> guard(_lock);
255  Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
256  if (!entry || entry->_val.empty())
257  {
258  return;
259  }
260  write(subId_, ENTRY_DISCARD, entry->_val);
261  MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
262  }
263 
270  {
271  Lock<Mutex> guard(_lock);
272  return MemoryBookmarkStore::_getMostRecent(subId_);
273  }
274 
283  virtual bool isDiscarded(Message& message_)
284  {
285  Lock<Mutex> l(_lock);
286  bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
287  if (retVal)
288  {
289  Message::Field subId = message_.getSubscriptionId();
290  if (subId.empty())
291  {
292  subId = message_.getSubscriptionIds();
293  }
294  write(subId, ENTRY_BOOKMARK, message_.getBookmark());
295  write(subId, ENTRY_DISCARD, message_.getBookmark());
296  }
297  return retVal;
298  }
299 
305  virtual void purge()
306  {
307  Lock<Mutex> guard(_lock);
308  Lock<Mutex> fileGuard(_fileLock);
309  memset(_log, 0, _logOffset);
310  _logOffset = 0;
311  MemoryBookmarkStore::_purge();
312  }
313 
319  virtual void purge(const Message::Field& subId_)
320  {
321  Lock<Mutex> guard(_lock);
322  Lock<Mutex> fileGuard(_fileLock);
323  MemoryBookmarkStore::_purge(subId_);
324  std::string tmpFileName = _fileName + ".tmp";
325  __prune(tmpFileName);
326  }
327 
328  void setServerVersion(const VersionInfo& version_)
329  {
330  Lock<Mutex> guard(_lock);
332  }
333 
334  void setServerVersion(size_t version_)
335  {
336  Lock<Mutex> guard(_lock);
338  }
339 
340  // Yes, the argument is a non-const copy of what is passed in
341  void _prune(const std::string& tmpFileName_)
342  {
343  Lock<Mutex> guard(_lock);
344  Lock<Mutex> fileGuard(_fileLock);
345  // If nothing's changed with most recent, don't rewrite the file
346  if (!_recentChanged)
347  {
348  return;
349  }
350  if (tmpFileName_.empty())
351  {
352  __prune(_fileName + ".tmp");
353  }
354  else
355  {
356  __prune(tmpFileName_);
357  }
358  _recentChanged = false;
359  }
360 
361  private:
362  void __prune(const std::string& tmpFileName_)
363  {
364  size_t sz = AMPS_INITIAL_LOG_SIZE;
365  FileType file;
366  char* log = NULL;
367  size_t bytesWritten = 0;
368 #ifdef _WIN32
369  file = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
370  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
371  if ( file == INVALID_HANDLE_VALUE )
372  {
373  DWORD err = getErrorNo();
374  std::ostringstream os;
375  os << "Failed to create temp store file " << tmpFileName_ <<
376  " to prune MMapBookmarkStore " << _fileName;
377  error(os.str(), err);
378  }
379  HANDLE mapFile = NULL;
380  try
381  {
382  sz = _setFileSize(sz, &log, file, &mapFile);
383  }
384  catch (StoreException& ex)
385  {
386  if (mapFile == NULL || mapFile == INVALID_HANDLE_VALUE)
387  {
388  CloseHandle(file);
389  std::ostringstream os;
390  os << "Failed to create map of temp file " << tmpFileName_
391  << " while resizing it to prune MMapBookmarkStore " << _fileName
392  << ": " << ex.what();
393  throw StoreException(os.str());
394  return;
395  }
396  if (log == NULL)
397  {
398  CloseHandle(mapFile);
399  CloseHandle(file);
400  std::ostringstream os;
401  os << "Failed to map temp file " << tmpFileName_
402  << " to memory while resizing it to prune MMapBookmarkStore "
403  << _fileName << ": " << ex.what();
404  throw StoreException(os.str());
405  return;
406  }
407  }
408  if (sz == 0)
409  {
410  DWORD err = getErrorNo();
411  UnmapViewOfFile(log);
412  CloseHandle(mapFile);
413  CloseHandle(file);
414  std::ostringstream os;
415  os << "Failed to grow tmp file " << tmpFileName_
416  << " to prune MMapBookmarkStore " << _fileName;
417  error(os.str(), err);
418  }
419 #else
420  file = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
421  if (file == -1)
422  {
423  int err = getErrorNo();
424  std::ostringstream os;
425  os << "Failed to create temp store file " << tmpFileName_ <<
426  " to prune MMapBookmarkStore " << _fileName;
427  error(os.str(), err);
428  return;
429  }
430  if (::write(file, "\0\0\0\0", 4) == -1)
431  {
432  int err = getErrorNo();
433  std::ostringstream os;
434  os << "Failed to write header to temp file " << tmpFileName_
435  << " to prune MMapBookmarkStore " << _fileName;
436  error(os.str(), err);
437  return;
438  }
439  try
440  {
441  sz = _setFileSize(sz, &log, file, 0);
442  }
443  catch (StoreException& ex)
444  {
445  std::ostringstream os;
446  os << "Failed to grow tmp file " << tmpFileName_
447  << " to prune MMapBookmarkStore " << _fileName << ex.what();
448  throw StoreException(os.str());
449  }
450  if (sz == 0)
451  {
452  int err = getErrorNo();
453  log = NULL;
454  ::close(file);
455  std::ostringstream os;
456  os << "Failed to grow tmp file " << tmpFileName_
457  << " to prune MMapBookmarkStore " << _fileName;
458  error(os.str(), err);
459  }
460 #endif
461  try
462  {
463  for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
464  {
465  Message::Field subId = i->first;
466  assert(!subId.empty());
467  size_t subIdLen = subId.len();
468  Subscription* mapSubPtr = i->second;
469  const BookmarkRange& range = mapSubPtr->getRange();
470  if (range.isValid())
471  {
472  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, range);
473  write(&log, &bytesWritten, subId, ENTRY_DISCARD, range);
474  }
475  Message::Field recent = mapSubPtr->getMostRecent(false);
476  amps_uint64_t recentPub, recentSeq;
477  Subscription::parseBookmark(recent, recentPub, recentSeq);
478  Subscription::PublisherMap publishersDiscarded =
479  mapSubPtr->_publishers;
480  MemoryBookmarkStore::EntryPtrList recovered;
481  mapSubPtr->getRecoveryEntries(recovered);
482  mapSubPtr->setPublishersToDiscarded(&recovered,
483  &publishersDiscarded);
484  char tmpBookmarkBuffer[128];
485  for (Subscription::PublisherIterator pub =
486  publishersDiscarded.begin(),
487  e = publishersDiscarded.end();
488  pub != e; ++pub)
489  {
490  // Don't log EPOCH if it got in the map
491  if (pub->first == 0 || pub->second == 0)
492  {
493  continue;
494  }
495  // Don't log the most recent yet
496  if (pub->first == recentPub)
497  {
498  continue;
499  }
500  int written = AMPS_snprintf_amps_uint64_t(
501  tmpBookmarkBuffer,
502  sizeof(tmpBookmarkBuffer),
503  pub->first);
504  *(tmpBookmarkBuffer + written++) = '|';
505  written += AMPS_snprintf_amps_uint64_t(
506  tmpBookmarkBuffer + written,
507  sizeof(tmpBookmarkBuffer)
508  - (size_t)written,
509  pub->second);
510  *(tmpBookmarkBuffer + written++) = '|';
511  Message::Field tmpBookmark(tmpBookmarkBuffer, (size_t)written);
512  // Check we'll be in the current boundaries
513  size_t blockLen = subIdLen + 2 * sizeof(size_t) + tmpBookmark.len() + 1;
514  if (bytesWritten + blockLen + blockLen >= sz)
515  {
516 #ifdef _WIN32
517  sz = _setFileSize(sz * 2, &log, file, &mapFile);
518 #else
519  sz = _setFileSize(sz * 2, &log, file, sz);
520 #endif
521  }
522  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, tmpBookmark);
523  write(&log, &bytesWritten, subId, ENTRY_DISCARD, tmpBookmark);
524  }
525  if (isWritableBookmark(recent.len()))
526  {
527  // Check we'll be in the current boundaries
528  size_t blockLen = subIdLen + 2 * sizeof(size_t) + recent.len() + 1;
529  if (bytesWritten + blockLen + blockLen >= sz)
530  {
531 #ifdef _WIN32
532  sz = _setFileSize(sz * 2, &log, file, &mapFile);
533 #else
534  sz = _setFileSize(sz * 2, &log, file, sz);
535 #endif
536  }
537  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, recent);
538  write(&log, &bytesWritten, subId, ENTRY_DISCARD, recent);
539  }
540  else // set up _recentList
541  {
542  mapSubPtr->getMostRecentList();
543  }
544  Message::Field bookmark = mapSubPtr->getLastPersisted();
545  if (isWritableBookmark(bookmark.len()))
546  {
547  // Check we'll be in the current boundaries
548  size_t blockLen = subIdLen + 2 * sizeof(size_t) +
549  bookmark.len() + 1;
550  if (bytesWritten + blockLen >= sz)
551  {
552 #ifdef _WIN32
553  sz = _setFileSize(sz * 2, &log, file, &mapFile);
554 #else
555  sz = _setFileSize(sz * 2, &log, file, sz);
556 #endif
557  }
558  write(&log, &bytesWritten, subId, ENTRY_PERSISTED,
559  mapSubPtr->getLastPersisted());
560  }
561  mapSubPtr->getActiveEntries(recovered);
562  for (MemoryBookmarkStore::EntryPtrList::iterator entry =
563  recovered.begin();
564  entry != recovered.end(); ++entry)
565  {
566  if ((*entry)->_val.empty() ||
567  !isWritableBookmark((*entry)->_val.len()))
568  {
569  continue;
570  }
571  // Check we'll be in the current boundaries
572  size_t blockLen = subIdLen + 2 * sizeof(size_t) +
573  (*entry)->_val.len() + 1;
574  if (bytesWritten + blockLen >= sz)
575  {
576 #ifdef _WIN32
577  sz = _setFileSize(sz * 2, &log, file, &mapFile);
578 #else
579  sz = _setFileSize(sz * 2, &log, file, sz);
580 #endif
581  }
582  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK,
583  (*entry)->_val);
584  if (!(*entry)->_active)
585  {
586  // Check we'll be in the current boundaries
587  if (bytesWritten + blockLen >= sz)
588  {
589 #ifdef _WIN32
590  sz = _setFileSize(sz * 2, &log, file, &mapFile);
591 #else
592  sz = _setFileSize(sz * 2, &log, file, sz);
593 #endif
594  }
595  write(&log, &bytesWritten, subId, ENTRY_DISCARD,
596  (*entry)->_val);
597  }
598  }
599  }
600  }
601  catch (StoreException& ex)
602  {
603 #ifdef _WIN32
604  UnmapViewOfFile(log);
605  CloseHandle(mapFile);
606  CloseHandle(file);
607 #else
608  ::close(file);
609  ::unlink(tmpFileName_.c_str());
610 #endif
611  std::ostringstream os;
612  os << "Exception during prune: " << ex.what();
613  throw StoreException(os.str());
614  }
615 #ifdef _WIN32
616  BOOL success = FlushViewOfFile(_log, 0);
617  success |= UnmapViewOfFile(_log);
618  _log = NULL;
619  success |= CloseHandle(_mapFile);
620  success |= CloseHandle(_file);
621  if (!success)
622  {
623  DWORD err = getErrorNo();
624  std::ostringstream os;
625  os << "Failed to flush, unmap, and close current file "
626  << _fileName
627  << " in prune in MMapBookmarkStore. ";
628  error(os.str(), err);
629  }
630  _mapFile = INVALID_HANDLE_VALUE;
631  _file = INVALID_HANDLE_VALUE;
632  success = FlushViewOfFile(log, 0);
633  success |= UnmapViewOfFile(log);
634  log = NULL;
635  success |= CloseHandle(mapFile);
636  success |= CloseHandle(file);
637  if (!success)
638  {
639  DWORD err = getErrorNo();
640  std::ostringstream os;
641  os << "Failed to flush, unmap and close completed temp file "
642  << tmpFileName_
643  << " in prune in MMapBookmarkStore. ";
644  error(os.str(), err);
645  }
646  mapFile = INVALID_HANDLE_VALUE;
647  file = INVALID_HANDLE_VALUE;
648  // Replace current file with pruned file
649  int retryCount = 3;
650  while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
651  MOVEFILE_COPY_ALLOWED | MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH))
652  {
653  DWORD err = getErrorNo();
654  if (--retryCount > 0)
655  {
656  continue;
657  }
658  // Try to set _file to the tmp file that won't move then throw
659  std::string desiredFileName = _fileName;
660  _fileName = tmpFileName_;
661  init();
662  std::ostringstream os;
663  os << "Failed to move completed temp file " << tmpFileName_
664  << " to " << desiredFileName
665  << " in prune in MMapBookmarkStore. Continuing by using "
666  << tmpFileName_ << " as the MMapBookmarkStore file.";
667  error(os.str(), err);
668  }
669  // Call init to set up file again
670  init();
671 #else
672  munmap(_log, _fileSize);
673  _log = NULL;
674  ::close(_file);
675  munmap(log, sz);
676  ::close(file);
677  if (-1 == ::unlink(_fileName.c_str()))
678  {
679  int err = getErrorNo();
680  // Try to set _file to the tmp file that won't move then throw
681  std::string desiredFileName = _fileName;
682  _fileName = tmpFileName_;
683  init();
684  std::ostringstream os;
685  os << "Failed to delete file " << desiredFileName
686  << " after creating temporary file " << tmpFileName_
687  << " in prune in MMapBookmarkStore. Continuing by using "
688  << tmpFileName_ << " as the MMapBookmarkStore file.";
689  error(os.str(), err);
690  }
691  if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
692  {
693  int err = getErrorNo();
694  // Try to set _file to the tmp file that won't move then throw
695  std::string desiredFileName = _fileName;
696  _fileName = tmpFileName_;
697  init();
698  std::ostringstream os;
699  os << "Failed to move completed temp file " << tmpFileName_
700  << " to " << desiredFileName
701  << " in prune in MMapBookmarkStore. Continuing by using "
702  << tmpFileName_ << " as the MMapBookmarkStore file.";
703  error(os.str(), err);
704  }
705  // Call init to set up file again
706  init();
707 #endif
708  _logOffset = bytesWritten;
709  }
710 
711  virtual void _persisted(Subscription* subP_,
712  const Message::Field& bookmarkField_)
713  {
714  Lock<Mutex> l(_lock);
715  write(subP_->id(), ENTRY_PERSISTED, bookmarkField_);
716  MemoryBookmarkStore::_persisted(subP_, bookmarkField_);
717  }
718 
719  virtual Message::Field _persisted(Subscription* subP_, size_t bookmark_)
720  {
721  Lock<Mutex> l(_lock);
722  Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
723  if (!entryPtr || entryPtr->_val.empty())
724  {
725  return Message::Field();
726  }
727  Message::Field bookmarkField = entryPtr->_val;
728  write(subP_->id(), ENTRY_PERSISTED, bookmarkField);
729  MemoryBookmarkStore::_persisted(subP_, bookmarkField);
730  return bookmarkField;
731  }
732 
733  // Returns true if file exists and is larger than 0 bytes and therefore
734  // should be used for recovery.
735  bool init(bool useLastModifiedTime_ = false)
736  {
737  bool retVal = true;
738 #ifdef _WIN32
739  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
740  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
741  if ( _file == INVALID_HANDLE_VALUE )
742  {
743  DWORD err = getErrorNo();
744  std::ostringstream os;
745  os << "Failed to initialize file " << _fileName << " for MMapBookmarkStore";
746  error(os.str(), err);
747  }
748  LARGE_INTEGER liFileSize;
749  if (GetFileSizeEx(_file, &liFileSize) == 0)
750  {
751  DWORD err = getErrorNo();
752  CloseHandle(_file);
753  std::ostringstream os;
754  os << "Failure getting initial file size for MMapBookmarkStore " << _fileName;
755  error(os.str(), err);
756  return false;
757  }
758 #ifdef _WIN64
759  size_t fileSize = liFileSize.QuadPart;
760 #else
761  size_t fileSize = liFileSize.LowPart;
762 #endif
763  if (useLastModifiedTime_ && fileSize > 0)
764  {
765  FILETIME ftModifiedTime;
766  if (GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
767  {
768  DWORD err = getErrorNo();
769  CloseHandle(_file);
770  _recovering = false;
771  error("Failure getting file time while trying to recover.", err);
772  return false;
773  }
774  SYSTEMTIME st;
775  if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
776  {
777  DWORD err = getErrorNo();
778  CloseHandle(_file);
779  _recovering = false;
780  error("Failure converting file time while trying to recover.", err);
781  return false;
782  }
783  _fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
784  sprintf_s(_fileTimestamp, AMPS_TIMESTAMP_LEN,
785  "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
786  st.wDay, st.wHour, st.wMinute, st.wSecond);
787  _fileTimestamp[AMPS_TIMESTAMP_LEN - 1] = 'Z';
788  }
789  retVal = (fileSize != 0);
790  setFileSize( AMPS_INITIAL_LOG_SIZE > fileSize ?
791  AMPS_INITIAL_LOG_SIZE : fileSize);
792 #else
793  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
794  if (_file == -1)
795  {
796  int err = getErrorNo();
797  std::ostringstream os;
798  os << "Failed to initialize log file " << _fileName << " for MMapBookmarkStore";
799  error(os.str(), err);
800  }
801  struct stat statBuf;
802  if (fstat(_file, &statBuf) == -1)
803  {
804  int err = getErrorNo();
805  ::close(_file);
806  std::ostringstream os;
807  os << "Failed to stat log file " << _fileName << " for MMapBookmarkStore";
808  error(os.str(), err);
809  return false;
810  }
811  size_t fSize = (size_t)statBuf.st_size;
812  if (fSize == 0)
813  {
814  retVal = false;
815  if (::write(_file, "\0\0\0\0", 4) == -1)
816  {
817  int err = getErrorNo();
818  ::close(_file);
819  std::ostringstream os;
820  os << "Failed to write header to log file " << _fileName
821  << " for MMapBookmarkStore";
822  error(os.str(), err);
823  return false;
824  }
825  }
826  else if (useLastModifiedTime_)
827  {
828  _fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
829  struct tm timeInfo;
830  gmtime_r(&statBuf.st_mtime, &timeInfo);
831  strftime(_fileTimestamp, AMPS_TIMESTAMP_LEN,
832  "%Y%m%dT%H%M%S", &timeInfo);
833  _fileTimestamp[AMPS_TIMESTAMP_LEN - 1] = 'Z';
834  }
835 
836  setFileSize((fSize > AMPS_INITIAL_LOG_SIZE) ? fSize - 1 : AMPS_INITIAL_LOG_SIZE);
837 #endif
838  return retVal;
839  }
840 
841 #ifdef _WIN32
842  DWORD getErrorNo() const
843  {
844  return GetLastError();
845  }
846 
847  void error(const std::string& message_, DWORD err)
848  {
849  std::ostringstream os;
850  static const DWORD msgSize = 2048;
851  char pMsg[msgSize];
852  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
853  FORMAT_MESSAGE_ARGUMENT_ARRAY,
854  NULL, err, LANG_NEUTRAL,
855  pMsg, msgSize, NULL);
856  os << "File: " << _fileName << ". " << message_ << " with error " << pMsg;
857  throw StoreException(os.str());
858  }
859 #else
860  int getErrorNo() const
861  {
862  return errno;
863  }
864 
865  void error(const std::string& message_, int err)
866  {
867  std::ostringstream os;
868  os << message_ << ". Error is " << strerror(err);
869  throw StoreException(os.str());
870  }
871 #endif
// AMPS_WRITE8(p, v): stores the length/sequence value v at byte pointer p.
// AMPS_READ8(p, v):  loads the value stored at byte pointer p into v.
// Records in the log are packed back to back, so p is generally not aligned
// for size_t. The non-sparc variants therefore copy sizeof(size_t) bytes
// with memcpy instead of dereferencing a cast pointer; the bytes written to
// the file are the same as a direct size_t store would produce.
#if defined(sparc)
#define AMPS_WRITE8(p,v) { p[0] = (v>>56)&0xFF; p[1] = (v>>48)&0xFF; p[2] = (v>>40)&0xFF; p[3] = (v>>32)&0xFF; p[4] = (v>>24)&0xFF; p[5] = (v>>16)&0xFF; p[6] = (v>>8)&0xFF; p[7]=v&0xFF; }
#define AMPS_READ8(p, v) { memcpy(&v,p,8); }
#else
#define AMPS_WRITE8(p,v) { size_t ampsWrite8Tmp_ = (size_t)(v); memcpy((p), &ampsWrite8Tmp_, sizeof(size_t)); }
#define AMPS_READ8(p,v) { size_t ampsRead8Tmp_; memcpy(&ampsRead8Tmp_, (p), sizeof(size_t)); (v) = ampsRead8Tmp_; }
#endif
879 
880  // This implementation will use this when logging a bookmark or a persisted
881  void write(const Message::Field& subId_,
882  char type_, const Message::Field& bookmark_)
883  {
884  Lock<Mutex> guard(_fileLock);
885  write(&_log, &_logOffset, subId_, type_, bookmark_);
886  }
887 
888  void write(char** logPtr, size_t* logOffsetPtr, const Message::Field& subId_,
889  char type_, const Message::Field& bookmark_)
890  {
891  if (!_recovering && isWritableBookmark(bookmark_.len()))
892  {
893  size_t len = subId_.len();
894  // Check we'll be in the current boundaries
895  size_t blockLen = len + 2 * sizeof(size_t) + bookmark_.len() + 1;
896  if (*logOffsetPtr + blockLen >= _fileSize)
897  {
898  setFileSize(_fileSize * 2);
899  }
900  char* offset = *logPtr + *logOffsetPtr;
901  AMPS_WRITE8(offset, len);
902  offset += sizeof(size_t);
903  memcpy(offset, static_cast<const void*>(subId_.data()), len);
904  offset += len;
905  *offset++ = type_;
906  len = bookmark_.len();
907  AMPS_WRITE8(offset, len);
908  offset += sizeof(size_t);
909  memcpy(offset, static_cast<const void*>(bookmark_.data()), len);
910  *logOffsetPtr += blockLen;
911  }
912  }
913 
914  // This implementation will only ever use this when discarding a bookmark
915  // Could be used to add a feature where generated bookmarks are logged in
916  // addition to the bookmark field.
917  void write(const Message::Field& subId_, char type_, size_t bookmark_)
918  {
919  Lock<Mutex> guard(_fileLock);
920  write(&_log, &_logOffset, subId_, type_, bookmark_);
921  }
922 
923  void write(char** logPtr, size_t* logOffsetPtr, const Message::Field& subId_,
924  char type_, size_t bookmark_)
925  {
926  if (!_recovering)
927  {
928  size_t len = subId_.len();
929  size_t blockLen = len + 2 * sizeof(size_t) + 1;
930  // Check we'll be in the current boundaries
931  if (*logOffsetPtr + blockLen >= _fileSize)
932  {
933  setFileSize(_fileSize * 2);
934  }
935  char* offset = *logPtr + *logOffsetPtr;
936  *(reinterpret_cast<size_t*>(offset)) = len;
937  offset += sizeof(size_t);
938  memcpy(offset, static_cast<const void*>(subId_.data()), len);
939  offset += len;
940  *offset++ = type_;
941  *(reinterpret_cast<size_t*>(offset)) = bookmark_;
942  *logOffsetPtr += blockLen;
943  }
944  }
945 
946  void setFileSize(size_t newSize_)
947  {
948  if (_log && newSize_ <= _fileSize) // Improper resize attempt
949  {
950  return;
951  }
952 #ifdef _WIN32
953  _fileSize = _setFileSize(newSize_, &_log, _file, &_mapFile);
954 #else
955  _fileSize = _setFileSize(newSize_, &_log, _file, _fileSize);
956 #endif
957  }
958 
959  // Returns new file size, 0 if there is a failure
960  size_t _setFileSize(size_t newSize_, char** log_, FileType file_,
961 #ifdef WIN32
962  HANDLE* mapFile_
963 #else
964  size_t fileSize_
965 #endif
966  )
967  {
968  // Make sure we're using a multiple of page size
969  size_t sz = newSize_ & (size_t)(~(getPageSize() - 1));
970  if (sz < newSize_ || sz == 0)
971  {
972  sz += getPageSize();
973  }
974 #ifdef _WIN32
975  if (*mapFile_ && *mapFile_ != INVALID_HANDLE_VALUE)
976  {
977  if (*log_)
978  {
979  FlushViewOfFile(*log_, 0);
980  UnmapViewOfFile(*log_);
981  }
982  CloseHandle(*mapFile_);
983  }
984 #ifdef _WIN64
985  *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, (DWORD)((sz >> 32) & 0xffffffff), (DWORD)sz, NULL);
986 #else
987  *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, 0, (DWORD)sz, NULL);
988 #endif
989  if (*mapFile_ == NULL || *mapFile_ == INVALID_HANDLE_VALUE)
990  {
991  DWORD errNo = getErrorNo();
992  CloseHandle(file_);
993  std::ostringstream os;
994  os << "Failed to create map of MMapBookmarkStore file " << _fileName
995  << " during resize.";
996  error(os.str(), errNo);
997  *log_ = 0;
998  return 0;
999  }
1000  else
1001  {
1002  *log_ = (char*)MapViewOfFile(*mapFile_, FILE_MAP_ALL_ACCESS, 0, 0, sz);
1003  if (*log_ == NULL)
1004  {
1005  DWORD errNo = getErrorNo();
1006  CloseHandle(*mapFile_);
1007  CloseHandle(file_);
1008  std::ostringstream os;
1009  os << "Failed to map MMapBookmarkStore file " << _fileName
1010  << " to memory during resize.";
1011  error(os.str(), errNo);
1012  *log_ = 0;
1013  return 0;
1014  }
1015  }
1016 #else
1017  // Extend the underlying file
1018  if (lseek(file_, (off_t)sz, SEEK_SET) == -1)
1019  {
1020  int err = getErrorNo();
1021  ::close(file_);
1022  std::ostringstream os;
1023  os << "Failed to seek in MMapBookmarkStore file " << _fileName
1024  << " during resize.";
1025  error(os.str(), err);
1026  }
1027  if (::write(file_, "", 1) == -1)
1028  {
1029  int err = getErrorNo();
1030  ::close(file_);
1031  std::ostringstream os;
1032  os << "Failed to grow MMapBookmarkStore file " << _fileName
1033  << " during resize.";
1034  error(os.str(), err);
1035  }
1036  if (*log_)
1037  {
1038 #if defined(linux)
1039  *log_ = static_cast<char*>(mremap(*log_, fileSize_, sz,
1040  MREMAP_MAYMOVE));
1041 #else
1042  munmap(*log_, fileSize_);
1043  *log_ = static_cast<char*>(mmap(0, sz, PROT_READ | PROT_WRITE,
1044  MAP_SHARED, file_, 0));
1045 #endif
1046  }
1047  else // New mapping
1048  {
1049  // New mapping, map the full file size for recovery or else it std size
1050  *log_ = static_cast<char*>(mmap(0, sz, PROT_READ | PROT_WRITE,
1051  MAP_SHARED, file_, 0));
1052  }
1053 
1054  if ((void*)(*log_) == MAP_FAILED)
1055  {
1056  int err = getErrorNo();
1057  ::close(file_);
1058  *log_ = 0;
1059  std::ostringstream os;
1060  os << "Failed to map MMapBookmarkStore file " << _fileName
1061  << " to memory during resize.";
1062  error(os.str(), err);
1063  return 0;
1064  }
1065 #endif
1066  return sz;
1067  }
1068 
1069  void recover(bool useLastModifiedTime_ = false,
1070  bool hasAdapter_ = false)
1071  {
1072  Message::Field sub;
1073  Message::Field bookmarkField;
1074  size_t bookmarkLen = 0;
1075  size_t lastGoodOffset = 0;
1076  bool inError = false;
1077  Lock<Mutex> guard(_lock);
1078  Lock<Mutex> fileGuard(_fileLock);
1079  _recovering = true;
1080  // Map of bookmark to sequence number
1081  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> BookmarkMap;
1082  typedef std::map<Message::Field, size_t,
1083  Message::Field::FieldHash>::iterator BookmarkMapIter;
1084  // Map of subId to set of recovered bookmarks
1085  typedef std::map<Message::Field, BookmarkMap*,
1086  Message::Field::FieldHash> ReadMap;
1087  typedef std::map<Message::Field, BookmarkMap*,
1088  Message::Field::FieldHash>::iterator ReadMapIter;
1089  ReadMap recovered;
1090  size_t subLen = *(reinterpret_cast<size_t*>(_log));
1091  while (!inError && subLen > 0)
1092  {
1093  // If we recover something, remove anything adapter recovered
1094  if (_logOffset == 0 && hasAdapter_)
1095  {
1096  MemoryBookmarkStore::__purge();
1097  }
1098  _logOffset += sizeof(size_t);
1099  sub.assign(_log + _logOffset, subLen);
1100  _logOffset += subLen;
1101  switch (_log[_logOffset++])
1102  {
1103  case -1:
1104  return;
1105  case ENTRY_BOOKMARK:
1106  {
1107  AMPS_READ8((_log + _logOffset), bookmarkLen);
1108  _logOffset += sizeof(size_t);
1109  bookmarkField.assign(_log + _logOffset, bookmarkLen);
1110  _logOffset += bookmarkLen;
1111  Subscription* subP = find(sub);
1112  BookmarkMap* bookmarks = NULL;
1113  ReadMapIter iter = recovered.find(sub);
1114  if (iter == recovered.end())
1115  {
1116  Message::Field subKey;
1117  subKey.deepCopy(sub);
1118  bookmarks = new BookmarkMap();
1119  recovered[subKey] = bookmarks;
1120  }
1121  else
1122  {
1123  bookmarks = iter->second;
1124  }
1125  if (bookmarks->find(bookmarkField) != bookmarks->end())
1126  {
1127  std::for_each(bookmarks->begin(), bookmarks->end(),
1128  _clearBookmark);
1129  bookmarks->clear();
1130  subP->getMostRecent(true);
1131  }
1132  if (BookmarkRange::isRange(bookmarkField))
1133  {
1134  subP->discard(subP->log(bookmarkField));
1135  }
1136  else if (!subP->isDiscarded(bookmarkField))
1137  {
1138  size_t sequence = subP->log(bookmarkField);
1139  Message::Field copy;
1140  copy.deepCopy(bookmarkField);
1141  bookmarks->insert(std::make_pair(copy, sequence));
1142  }
1143  else
1144  {
1145  // We know it's discarded, but there may still be a
1146  // discard entry in the log, so avoid a search.
1147  Message::Field copy;
1148  copy.deepCopy(bookmarkField);
1149  bookmarks->insert(std::make_pair(copy, 0));
1150  }
1151  }
1152  break;
1153  case ENTRY_DISCARD:
1154  {
1155  AMPS_READ8((_log + _logOffset), bookmarkLen);
1156  _logOffset += sizeof(size_t);
1157  bookmarkField.assign(_log + _logOffset, bookmarkLen);
1158  _logOffset += bookmarkLen;
1159  size_t sequence = AMPS_UNSET_INDEX;
1160  ReadMapIter iter = recovered.find(sub);
1161  if (iter != recovered.end())
1162  {
1163  BookmarkMap* bookmarks = iter->second;
1164  BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1165  if (bookmarkIter != bookmarks->end())
1166  {
1167  sequence = bookmarkIter->second;
1168  Message::Field bookmarkToClear(bookmarkIter->first);
1169  bookmarkToClear.clear();
1170  bookmarks->erase(bookmarkIter);
1171  }
1172  }
1173  if (!BookmarkRange::isRange(bookmarkField))
1174  {
1175  Subscription* subP = find(sub);
1176  if (sequence != AMPS_UNSET_INDEX)
1177  {
1178  // A sequence of 0 means it was already discarded
1179  if (sequence)
1180  {
1181  subP->discard(sequence);
1182  }
1183  }
1184  else // Shouldn't end up here, but just in case we'll search
1185  {
1186  subP->discard(bookmarkField);
1187  }
1188  }
1189  }
1190  break;
1191  case ENTRY_PERSISTED:
1192  {
1193  AMPS_READ8((_log + _logOffset), bookmarkLen);
1194  _logOffset += sizeof(size_t);
1195  bookmarkField.assign(_log + _logOffset, bookmarkLen);
1196  _logOffset += bookmarkLen;
1197  MemoryBookmarkStore::_persisted(find(sub), bookmarkField);
1198  }
1199  break;
1200  default:
1201  if (lastGoodOffset == 0)
1202  {
1203  error("Error while recovering MMapBookmarkStore file.", getErrorNo());
1204  }
1205  else
1206  {
1207  _logOffset = lastGoodOffset;
1208  inError = true;
1209  }
1210  }
1211  lastGoodOffset = _logOffset;
1212  if (!inError)
1213  {
1214  subLen = *(reinterpret_cast<size_t*>(_log + _logOffset));
1215  }
1216  }
1217  for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
1218  {
1219  if (recovered.count(i->first) && !recovered[i->first]->empty())
1220  {
1221  if (i->second->getMostRecent(false).len() > 1)
1222  {
1223  i->second->justRecovered();
1224  }
1225  else
1226  {
1227  // Unlikely, but we may have recovered only undiscarded
1228  // bookmarks so just restart as a new subscription.
1229  delete i->second;
1230  _subs[i->first] = new Subscription(this, i->first);
1231  }
1232  }
1233  if (useLastModifiedTime_ && _fileTimestamp)
1234  {
1235  _subs[i->first]->setRecoveryTimestamp(_fileTimestamp);
1236  }
1237  }
1238  if (_fileTimestamp)
1239  {
1240  delete[] _fileTimestamp;
1241  _fileTimestamp = 0;
1242  }
1243  for (ReadMapIter i = recovered.begin(), e = recovered.end(); i != e; ++i)
1244  {
1245  std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1246  delete i->second;
1247  Message::Field f = i->first;
1248  f.clear();
1249  }
1250  _recovering = false;
1251  }
1252 
1253  Mutex _fileLock; // NOTE(review): presumably serializes access to the file/mapping; usage not visible here - confirm
1254  std::string _fileName; // presumably the fileName_ given to the constructor - confirm
1255  size_t _fileSize; // presumably the current size in bytes of the mapped file - confirm
1256  size_t _logOffset; // Byte offset into _log; recovery advances it past every field it reads
1257  char* _log; // Base of the log bytes; entries are read at _log + _logOffset
1258  char* _fileTimestamp; // Passed to setRecoveryTimestamp() during recovery, then released with delete[] and zeroed
1259  FileType _file; // File handle; FileType is HANDLE when _WIN32 is defined
1260  // Each log entry is a length-prefixed subscription id, then a single byte giving
1261  // the entry type (bookmark, discard or persisted), then a length-prefixed bookmark.
/// Returns the memory page size reported by the operating system.
///
/// The value is looked up on the first call and kept in a function-local
/// static, so later calls return the stored value without asking the
/// system again. On Windows the size comes from GetSystemInfo(); on other
/// platforms it comes from sysconf(_SC_PAGESIZE), whose result is
/// converted to size_t without an error check.
///
/// \return The page size in bytes.
static size_t getPageSize()
{
  static size_t cachedPageSize;
  // Already looked up by an earlier call: nothing more to do.
  if (cachedPageSize != 0)
  {
    return cachedPageSize;
  }
#ifdef _WIN32
  SYSTEM_INFO systemInfo;
  GetSystemInfo(&systemInfo);
  cachedPageSize = systemInfo.dwPageSize;
#else
  cachedPageSize = static_cast<size_t>(sysconf(_SC_PAGESIZE));
#endif
  return cachedPageSize;
}
1277 
1278  };
1279 
1280 } // end namespace AMPS
1281 
1282 
1283 #endif // _MMAPBOOKMARKSTORE_H_
1284 
virtual void purge()
Called to purge the contents of this store.
Definition: MMapBookmarkStore.hpp:305
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
A BookmarkStoreImpl implementation that uses a memory mapped file for storage of the bookmarks...
Definition: MMapBookmarkStore.hpp:56
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: MMapBookmarkStore.hpp:232
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MMapBookmarkStore.hpp:269
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:328
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MMapBookmarkStore.hpp:252
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1374
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:127
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
MMapBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:82
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MMapBookmarkStore.hpp:319
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1346
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:170
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:334
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MMapBookmarkStore.hpp:209
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:137
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
Definition: ampsplusplus.hpp:103
MMapBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:107
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1140
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MMapBookmarkStore.hpp:283