AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.0
MMapBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MMAPBOOKMARKSTORE_H_
27 #define _MMAPBOOKMARKSTORE_H_
28 
29 #include <MemoryBookmarkStore.hpp>
30 #include <RecoveryPoint.hpp>
31 #include <RecoveryPointAdapter.hpp>
32 #ifdef _WIN32
33 #include <windows.h>
34 #else
35 #include <sys/mman.h>
36 #include <unistd.h>
37 #endif
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 #include <fcntl.h>
41 #include <set>
42 
43 #define AMPS_INITIAL_LOG_SIZE 40960UL
44 
49 
50 namespace AMPS
51 {
57 {
58 private:
59 #ifdef _WIN32
60  typedef HANDLE FileType;
61  HANDLE _mapFile;
62 #else
63  typedef int FileType;
64 #endif
65  static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
66  {
67  Message::Field f(pair.first);
68  f.clear();
69  }
70 
71 public:
82  MMapBookmarkStore(const char* fileName_, bool useLastModifiedTime_ = false)
83  : MemoryBookmarkStore(), _fileName(fileName_), _fileSize(0)
84  , _logOffset(0) , _log(0), _fileTimestamp(0)
85 #ifdef _WIN32
86  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
87 #else
88  , _file(0)
89 #endif
90  {
91  if (init(useLastModifiedTime_))
92  recover(useLastModifiedTime_, false);
93  }
94 
105  MMapBookmarkStore(const std::string& fileName_,
106  bool useLastModifiedTime_ = false)
107  : MemoryBookmarkStore(), _fileName(fileName_), _fileSize(0)
108  , _logOffset(0) , _log(0), _fileTimestamp(0)
109 #ifdef _WIN32
110  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
111 #else
112  , _file(0)
113 #endif
114  {
115  if (init(useLastModifiedTime_))
116  recover(useLastModifiedTime_, false);
117  }
118 
134  const char* fileName_,
135  RecoveryPointFactory factory_ = NULL,
136  bool useLastModifiedTime_ = false)
137  : MemoryBookmarkStore(adapter_, factory_)
138  , _fileName(fileName_), _fileSize(0)
139  , _logOffset(0) , _log(0), _fileTimestamp(0)
140 #ifdef _WIN32
141  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
142 #else
143  , _file(0)
144 #endif
145  {
146  if (init(useLastModifiedTime_))
147  recover(useLastModifiedTime_, true);
148  }
149 
165  const std::string& fileName_,
166  RecoveryPointFactory factory_ = NULL,
167  bool useLastModifiedTime_ = false)
168  : MemoryBookmarkStore(adapter_, factory_)
169  , _fileName(fileName_), _fileSize(0)
170  , _logOffset(0) , _log(0), _fileTimestamp(0)
171 #ifdef _WIN32
172  , _file(INVALID_HANDLE_VALUE), _mapFile(INVALID_HANDLE_VALUE)
173 #else
174  , _file(0)
175 #endif
176  {
177  if (init(useLastModifiedTime_))
178  recover(useLastModifiedTime_, true);
179  }
180 
181  virtual ~MMapBookmarkStore()
182  {
183 #ifdef _WIN32
184  UnmapViewOfFile(_log);
185  CloseHandle(_mapFile);
186  CloseHandle(_file);
187 #else
188  munmap(_log, _fileSize);
189  ::close(_file);
190 #endif
191  // In case _lock gets acquired by reader thread between end of this
192  // destructor and start of base class destructor, prevent write()
193  _recovering = true;
194  }
195 
201  virtual size_t log(Message& message_)
202  {
203  Message::Field bookmark = message_.getBookmark();
204  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
205  Lock<Mutex> guard(_lock);
206  if (!sub)
207  {
208  Message::Field subId = message_.getSubscriptionId();
209  if (subId.empty())
210  subId = message_.getSubscriptionIds();
211  sub = find(subId);
212  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
213  }
214  write(sub->id(), ENTRY_BOOKMARK, bookmark);
215  return MemoryBookmarkStore::_log(message_);
216  }
217 
222  virtual void discard(const Message& message_)
223  {
224  Message::Field bookmark = message_.getBookmark();
225  Message::Field subId = message_.getSubscriptionId();
226  if (subId.empty())
227  subId = message_.getSubscriptionIds();
228  Lock<Mutex> guard(_lock);
229  write(subId, ENTRY_DISCARD, bookmark);
230  MemoryBookmarkStore::_discard(message_);
231  }
232 
240  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
241  {
242  Lock<Mutex> guard(_lock);
243  Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
244  if (!entry || entry->_val.empty()) return;
245  write(subId_, ENTRY_DISCARD, entry->_val);
246  MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
247  }
248 
255  {
256  Lock<Mutex> guard(_lock);
257  return MemoryBookmarkStore::_getMostRecent(subId_);
258  }
259 
268  virtual bool isDiscarded(Message& message_)
269  {
270  Lock<Mutex> l(_lock);
271  bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
272  if (retVal)
273  {
274  Message::Field subId = message_.getSubscriptionId();
275  if (subId.empty())
276  subId = message_.getSubscriptionIds();
277  write(subId, ENTRY_BOOKMARK, message_.getBookmark());
278  write(subId, ENTRY_DISCARD, message_.getBookmark());
279  }
280  return retVal;
281  }
282 
288  virtual void purge()
289  {
290  Lock<Mutex> guard(_lock);
291  Lock<Mutex> fileGuard(_fileLock);
292  memset(_log, 0, _logOffset);
293  _logOffset = 0;
294  MemoryBookmarkStore::_purge();
295  }
296 
302  virtual void purge(const Message::Field& subId_)
303  {
304  Lock<Mutex> guard(_lock);
305  Lock<Mutex> fileGuard(_fileLock);
306  MemoryBookmarkStore::_purge(subId_);
307  std::string tmpFileName = _fileName + ".tmp";
308  __prune(tmpFileName);
309  }
310 
311  void setServerVersion(const VersionInfo& version_)
312  {
313  Lock<Mutex> guard(_lock);
315  }
316 
317  void setServerVersion(size_t version_)
318  {
319  Lock<Mutex> guard(_lock);
321  }
322 
323  // Yes, the argument is a non-const copy of what is passed in
324  void _prune(const std::string& tmpFileName_)
325  {
326  Lock<Mutex> guard(_lock);
327  Lock<Mutex> fileGuard(_fileLock);
328  // If nothing's changed with most recent, don't rewrite the file
329  if (!_recentChanged)
330  {
331  return;
332  }
333  if (tmpFileName_.empty())
334  {
335  __prune(_fileName + ".tmp");
336  }
337  else
338  {
339  __prune(tmpFileName_);
340  }
341  _recentChanged = false;
342  }
343 
344 private:
345  void __prune(const std::string& tmpFileName_)
346  {
347  size_t sz = AMPS_INITIAL_LOG_SIZE;
348  FileType file;
349  char* log = NULL;
350  size_t bytesWritten = 0;
351 #ifdef _WIN32
352  file = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
353  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
354  if( file == INVALID_HANDLE_VALUE )
355  {
356  DWORD err = getErrorNo();
357  std::ostringstream os;
358  os << "Failed to create temp store file " << tmpFileName_ <<
359  " to prune MMapBookmarkStore " << _fileName;
360  error(os.str(), err);
361  }
362  HANDLE mapFile = NULL;
363  try
364  {
365  sz = _setFileSize(sz, &log, file, &mapFile);
366  }
367  catch (StoreException& ex)
368  {
369  if(mapFile == NULL || mapFile == INVALID_HANDLE_VALUE)
370  {
371  CloseHandle(file);
372  std::ostringstream os;
373  os << "Failed to create map of temp file " << tmpFileName_
374  << " while resizing it to prune MMapBookmarkStore " << _fileName
375  << ": " << ex.what();
376  throw StoreException(os.str());
377  return;
378  }
379  if(log == NULL)
380  {
381  CloseHandle(mapFile);
382  CloseHandle(file);
383  std::ostringstream os;
384  os << "Failed to map temp file " << tmpFileName_
385  << " to memory while resizing it to prune MMapBookmarkStore "
386  << _fileName << ": " << ex.what();
387  throw StoreException(os.str());
388  return;
389  }
390  }
391  if (sz == 0)
392  {
393  DWORD err = getErrorNo();
394  UnmapViewOfFile(log);
395  CloseHandle(mapFile);
396  CloseHandle(file);
397  std::ostringstream os;
398  os << "Failed to grow tmp file " << tmpFileName_
399  << " to prune MMapBookmarkStore " << _fileName;
400  error(os.str(), err);
401  }
402 #else
403  file = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
404  if(file == -1)
405  {
406  int err = getErrorNo();
407  std::ostringstream os;
408  os << "Failed to create temp store file " << tmpFileName_ <<
409  " to prune MMapBookmarkStore " << _fileName;
410  error(os.str(), err);
411  return;
412  }
413  if(::write(file, "\0\0\0\0", 4) == -1)
414  {
415  int err = getErrorNo();
416  std::ostringstream os;
417  os << "Failed to write header to temp file " << tmpFileName_
418  << " to prune MMapBookmarkStore " << _fileName;
419  error(os.str(), err);
420  return;
421  }
422  try
423  {
424  sz = _setFileSize(sz, &log, file, 0);
425  }
426  catch (StoreException& ex)
427  {
428  std::ostringstream os;
429  os << "Failed to grow tmp file " << tmpFileName_
430  << " to prune MMapBookmarkStore " << _fileName << ex.what();
431  throw StoreException(os.str());
432  }
433  if (sz == 0)
434  {
435  int err = getErrorNo();
436  log = NULL;
437  ::close(file);
438  std::ostringstream os;
439  os << "Failed to grow tmp file " << tmpFileName_
440  << " to prune MMapBookmarkStore " << _fileName;
441  error(os.str(), err);
442  }
443 #endif
444  try
445  {
446  for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
447  {
448  Message::Field subId = i->first;
449  assert(!subId.empty());
450  size_t subIdLen = subId.len();
451  Subscription* mapSubPtr = i->second;
452  const BookmarkRange& range = mapSubPtr->getRange();
453  if (range.isValid())
454  {
455  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, range);
456  write(&log, &bytesWritten, subId, ENTRY_DISCARD, range);
457  }
458  Message::Field recent = mapSubPtr->getMostRecent(false);
459  amps_uint64_t recentPub, recentSeq;
460  Subscription::parseBookmark(recent, recentPub, recentSeq);
461  Subscription::PublisherMap publishersDiscarded =
462  mapSubPtr->_publishers;
463  MemoryBookmarkStore::EntryPtrList recovered;
464  mapSubPtr->getRecoveryEntries(recovered);
465  mapSubPtr->setPublishersToDiscarded(&recovered,
466  &publishersDiscarded);
467  char tmpBookmarkBuffer[128];
468  for (Subscription::PublisherIterator pub =
469  publishersDiscarded.begin(),
470  e = publishersDiscarded.end();
471  pub != e; ++pub)
472  {
473  // Don't log EPOCH if it got in the map
474  if (pub->first == 0 || pub->second == 0) continue;
475  // Don't log the most recent yet
476  if (pub->first == recentPub) continue;
477  int written = AMPS_snprintf_amps_uint64_t(
478  tmpBookmarkBuffer,
479  sizeof(tmpBookmarkBuffer),
480  pub->first);
481  *(tmpBookmarkBuffer+written++) = '|';
482  written += AMPS_snprintf_amps_uint64_t(
483  tmpBookmarkBuffer+written,
484  sizeof(tmpBookmarkBuffer)
485  - (size_t)written,
486  pub->second);
487  *(tmpBookmarkBuffer+written++) = '|';
488  Message::Field tmpBookmark(tmpBookmarkBuffer, (size_t)written);
489  // Check we'll be in the current boundaries
490  size_t blockLen = subIdLen + 2*sizeof(size_t) + tmpBookmark.len() + 1;
491  if(bytesWritten + blockLen + blockLen >= sz)
492  {
493 #ifdef _WIN32
494  sz = _setFileSize(sz*2, &log, file, &mapFile);
495 #else
496  sz = _setFileSize(sz*2, &log, file, sz);
497 #endif
498  }
499  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, tmpBookmark);
500  write(&log, &bytesWritten, subId, ENTRY_DISCARD, tmpBookmark);
501  }
502  if (isWritableBookmark(recent.len()))
503  {
504  // Check we'll be in the current boundaries
505  size_t blockLen = subIdLen + 2*sizeof(size_t) + recent.len() + 1;
506  if(bytesWritten + blockLen + blockLen >= sz)
507  {
508 #ifdef _WIN32
509  sz = _setFileSize(sz*2, &log, file, &mapFile);
510 #else
511  sz = _setFileSize(sz*2, &log, file, sz);
512 #endif
513  }
514  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK, recent);
515  write(&log, &bytesWritten, subId, ENTRY_DISCARD, recent);
516  }
517  else // set up _recentList
518  {
519  mapSubPtr->getMostRecentList();
520  }
521  Message::Field bookmark = mapSubPtr->getLastPersisted();
522  if (isWritableBookmark(bookmark.len()))
523  {
524  // Check we'll be in the current boundaries
525  size_t blockLen = subIdLen + 2*sizeof(size_t) +
526  bookmark.len() + 1;
527  if(bytesWritten + blockLen >= sz)
528  {
529 #ifdef _WIN32
530  sz = _setFileSize(sz*2, &log, file, &mapFile);
531 #else
532  sz = _setFileSize(sz*2, &log, file, sz);
533 #endif
534  }
535  write(&log, &bytesWritten, subId, ENTRY_PERSISTED,
536  mapSubPtr->getLastPersisted());
537  }
538  mapSubPtr->getActiveEntries(recovered);
539  for (MemoryBookmarkStore::EntryPtrList::iterator entry =
540  recovered.begin();
541  entry != recovered.end(); ++entry)
542  {
543  if ((*entry)->_val.empty() ||
544  !isWritableBookmark((*entry)->_val.len()))
545  continue;
546  // Check we'll be in the current boundaries
547  size_t blockLen = subIdLen + 2*sizeof(size_t) +
548  (*entry)->_val.len() + 1;
549  if(bytesWritten + blockLen >= sz)
550  {
551 #ifdef _WIN32
552  sz = _setFileSize(sz*2, &log, file, &mapFile);
553 #else
554  sz = _setFileSize(sz*2, &log, file, sz);
555 #endif
556  }
557  write(&log, &bytesWritten, subId, ENTRY_BOOKMARK,
558  (*entry)->_val);
559  if (!(*entry)->_active)
560  {
561  // Check we'll be in the current boundaries
562  if(bytesWritten + blockLen >= sz)
563  {
564 #ifdef _WIN32
565  sz = _setFileSize(sz*2, &log, file, &mapFile);
566 #else
567  sz = _setFileSize(sz*2, &log, file, sz);
568 #endif
569  }
570  write(&log, &bytesWritten, subId, ENTRY_DISCARD,
571  (*entry)->_val);
572  }
573  }
574  }
575  }
576  catch (StoreException& ex)
577  {
578 #ifdef _WIN32
579  UnmapViewOfFile(log);
580  CloseHandle(mapFile);
581  CloseHandle(file);
582 #else
583  ::close(file);
584  ::unlink(tmpFileName_.c_str());
585 #endif
586  std::ostringstream os;
587  os << "Exception during prune: " << ex.what();
588  throw StoreException(os.str());
589  }
590 #ifdef _WIN32
591  BOOL success = FlushViewOfFile(_log,0);
592  success |= UnmapViewOfFile(_log);
593  _log = NULL;
594  success |= CloseHandle(_mapFile);
595  success |= CloseHandle(_file);
596  if (!success)
597  {
598  DWORD err = getErrorNo();
599  std::ostringstream os;
600  os << "Failed to flush, unmap, and close current file "
601  << _fileName
602  << " in prune in MMapBookmarkStore. ";
603  error(os.str(), err);
604  }
605  _mapFile = INVALID_HANDLE_VALUE;
606  _file = INVALID_HANDLE_VALUE;
607  success = FlushViewOfFile(log,0);
608  success |= UnmapViewOfFile(log);
609  log = NULL;
610  success |= CloseHandle(mapFile);
611  success |= CloseHandle(file);
612  if (!success)
613  {
614  DWORD err = getErrorNo();
615  std::ostringstream os;
616  os << "Failed to flush, unmap and close completed temp file "
617  << tmpFileName_
618  << " in prune in MMapBookmarkStore. ";
619  error(os.str(), err);
620  }
621  mapFile = INVALID_HANDLE_VALUE;
622  file = INVALID_HANDLE_VALUE;
623  // Replace current file with pruned file
624  int retryCount = 3;
625  while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
626  MOVEFILE_COPY_ALLOWED|MOVEFILE_REPLACE_EXISTING|MOVEFILE_WRITE_THROUGH))
627  {
628  DWORD err = getErrorNo();
629  if (--retryCount > 0) continue;
630  // Try to set _file to the tmp file that won't move then throw
631  std::string desiredFileName = _fileName;
632  _fileName = tmpFileName_;
633  init();
634  std::ostringstream os;
635  os << "Failed to move completed temp file " << tmpFileName_
636  << " to " << desiredFileName
637  << " in prune in MMapBookmarkStore. Continuing by using "
638  << tmpFileName_ << " as the MMapBookmarkStore file.";
639  error(os.str(), err);
640  }
641  // Call init to set up file again
642  init();
643 #else
644  munmap(_log, _fileSize);
645  _log = NULL;
646  ::close(_file);
647  munmap(log, sz);
648  ::close(file);
649  if (-1 == ::unlink(_fileName.c_str()))
650  {
651  int err = getErrorNo();
652  // Try to set _file to the tmp file that won't move then throw
653  std::string desiredFileName = _fileName;
654  _fileName = tmpFileName_;
655  init();
656  std::ostringstream os;
657  os << "Failed to delete file " << desiredFileName
658  << " after creating temporary file " << tmpFileName_
659  << " in prune in MMapBookmarkStore. Continuing by using "
660  << tmpFileName_ << " as the MMapBookmarkStore file.";
661  error(os.str(), err);
662  }
663  if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
664  {
665  int err = getErrorNo();
666  // Try to set _file to the tmp file that won't move then throw
667  std::string desiredFileName = _fileName;
668  _fileName = tmpFileName_;
669  init();
670  std::ostringstream os;
671  os << "Failed to move completed temp file " << tmpFileName_
672  << " to " << desiredFileName
673  << " in prune in MMapBookmarkStore. Continuing by using "
674  << tmpFileName_ << " as the MMapBookmarkStore file.";
675  error(os.str(), err);
676  }
677  // Call init to set up file again
678  init();
679 #endif
680  _logOffset = bytesWritten;
681  }
682 
683  virtual void _persisted(Subscription* subP_,
684  const Message::Field& bookmarkField_)
685  {
686  Lock<Mutex> l(_lock);
687  write(subP_->id(), ENTRY_PERSISTED, bookmarkField_);
688  MemoryBookmarkStore::_persisted(subP_, bookmarkField_);
689  }
690 
691  virtual Message::Field _persisted(Subscription* subP_, size_t bookmark_)
692  {
693  Lock<Mutex> l(_lock);
694  Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
695  if (!entryPtr || entryPtr->_val.empty())
696  return Message::Field();
697  Message::Field bookmarkField = entryPtr->_val;
698  write(subP_->id(), ENTRY_PERSISTED, bookmarkField);
699  MemoryBookmarkStore::_persisted(subP_, bookmarkField);
700  return bookmarkField;
701  }
702 
703  // Returns true if file exists and is larger than 0 bytes and therefore
704  // should be used for recovery.
705  bool init(bool useLastModifiedTime_ = false)
706  {
707  bool retVal = true;
708 #ifdef _WIN32
709  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
710  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
711  if( _file == INVALID_HANDLE_VALUE )
712  {
713  DWORD err = getErrorNo();
714  std::ostringstream os;
715  os << "Failed to initialize file " << _fileName << " for MMapBookmarkStore";
716  error(os.str(), err);
717  }
718  LARGE_INTEGER liFileSize;
719  if(GetFileSizeEx(_file, &liFileSize) == 0)
720  {
721  DWORD err = getErrorNo();
722  CloseHandle(_file);
723  std::ostringstream os;
724  os << "Failure getting initial file size for MMapBookmarkStore " << _fileName;
725  error(os.str(), err);
726  return false;
727  }
728 #ifdef _WIN64
729  size_t fileSize = liFileSize.QuadPart;
730 #else
731  size_t fileSize = liFileSize.LowPart;
732 #endif
733  if (useLastModifiedTime_ && fileSize > 0)
734  {
735  FILETIME ftModifiedTime;
736  if(GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
737  {
738  DWORD err = getErrorNo();
739  CloseHandle(_file);
740  _recovering = false;
741  error("Failure getting file time while trying to recover.", err);
742  return false;
743  }
744  SYSTEMTIME st;
745  if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
746  {
747  DWORD err = getErrorNo();
748  CloseHandle(_file);
749  _recovering = false;
750  error("Failure converting file time while trying to recover.", err);
751  return false;
752  }
753  _fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
754  sprintf_s(_fileTimestamp, AMPS_TIMESTAMP_LEN,
755  "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
756  st.wDay, st.wHour, st.wMinute, st.wSecond);
757  _fileTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
758  }
759  retVal = (fileSize != 0);
760  setFileSize( AMPS_INITIAL_LOG_SIZE > fileSize ?
761  AMPS_INITIAL_LOG_SIZE : fileSize);
762 #else
763  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
764  if(_file == -1)
765  {
766  int err = getErrorNo();
767  std::ostringstream os;
768  os << "Failed to initilize log file " << _fileName << " for MMapBookmarkStore";
769  error(os.str(), err);
770  }
771  struct stat statBuf;
772  if(fstat(_file, &statBuf) == -1)
773  {
774  int err = getErrorNo();
775  ::close(_file);
776  std::ostringstream os;
777  os << "Failed to stat log file " << _fileName << " for MMapBookmarkStore";
778  error(os.str(), err);
779  return false;
780  }
781  size_t fSize = (size_t)statBuf.st_size;
782  if(fSize == 0)
783  {
784  retVal = false;
785  if(::write(_file, "\0\0\0\0", 4) == -1)
786  {
787  int err = getErrorNo();
788  ::close(_file);
789  std::ostringstream os;
790  os << "Failed to write header to log file " << _fileName
791  << " for MMapBookmarkStore";
792  error(os.str(), err);
793  return false;
794  }
795  }
796  else if (useLastModifiedTime_)
797  {
798  _fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
799  struct tm timeInfo;
800  gmtime_r(&statBuf.st_mtime, &timeInfo);
801  strftime(_fileTimestamp, AMPS_TIMESTAMP_LEN,
802  "%Y%m%dT%H%M%S", &timeInfo);
803  _fileTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
804  }
805 
806  setFileSize((fSize > AMPS_INITIAL_LOG_SIZE) ? fSize-1 : AMPS_INITIAL_LOG_SIZE);
807 #endif
808  return retVal;
809  }
810 
811 #ifdef _WIN32
812  DWORD getErrorNo() const
813  {
814  return GetLastError();
815  }
816 
817  void error(const std::string& message_, DWORD err)
818  {
819  std::ostringstream os;
820  static const DWORD msgSize = 2048;
821  char pMsg[msgSize];
822  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
823  FORMAT_MESSAGE_ARGUMENT_ARRAY,
824  NULL, err, LANG_NEUTRAL,
825  pMsg, msgSize, NULL);
826  os << "File: " << _fileName << ". " << message_ << " with error " << pMsg;
827  throw StoreException(os.str());
828  }
829 #else
830  int getErrorNo() const
831  {
832  return errno;
833  }
834 
835  void error(const std::string& message_, int err)
836  {
837  std::ostringstream os;
838  os << message_ << ". Error is " << strerror(err);
839  throw StoreException(os.str());
840  }
841 #endif
842 #if defined(sparc)
843 #define AMPS_WRITE8(p,v) { p[0] = (v>>56)&0xFF; p[1] = (v>>48)&0xFF; p[2] = (v>>40)&0xFF; p[3] = (v>>32)&0xFF; p[4] = (v>>24)&0xFF; p[5] = (v>>16)&0xFF; p[6] = (v>>8)&0xFF; p[7]=v&0xFF; }
844 #define AMPS_READ8(p, v) { memcpy(&v,p,8); }
845 #else
846 #define AMPS_WRITE8(p,v) { *(size_t*)p = (size_t)v; }
847 #define AMPS_READ8(p,v) { v = *(const size_t*)p; }
848 #endif
849 
850  // This implementation will use this when logging a bookmark or a persisted
851  void write(const Message::Field& subId_,
852  char type_, const Message::Field& bookmark_)
853  {
854  Lock<Mutex> guard(_fileLock);
855  write(&_log, &_logOffset, subId_, type_, bookmark_);
856  }
857 
858  void write(char** logPtr, size_t* logOffsetPtr, const Message::Field& subId_,
859  char type_, const Message::Field& bookmark_)
860  {
861  if(!_recovering && isWritableBookmark(bookmark_.len()))
862  {
863  size_t len = subId_.len();
864  // Check we'll be in the current boundaries
865  size_t blockLen = len + 2*sizeof(size_t) + bookmark_.len() + 1;
866  if(*logOffsetPtr + blockLen >= _fileSize)
867  {
868  setFileSize(_fileSize*2);
869  }
870  char* offset = *logPtr+*logOffsetPtr;
871  AMPS_WRITE8(offset,len);
872  offset += sizeof(size_t);
873  memcpy(offset, static_cast<const void*>(subId_.data()), len);
874  offset += len;
875  *offset++ = type_;
876  len = bookmark_.len();
877  AMPS_WRITE8(offset,len);
878  offset += sizeof(size_t);
879  memcpy(offset, static_cast<const void*>(bookmark_.data()), len);
880  *logOffsetPtr += blockLen;
881  }
882  }
883 
884  // This implementation will only ever use this when discarding a bookmark
885  // Could be used to add a feature where generated bookmarks are logged in
886  // addition to the bookmark field.
887  void write(const Message::Field& subId_, char type_, size_t bookmark_)
888  {
889  Lock<Mutex> guard(_fileLock);
890  write(&_log, &_logOffset, subId_, type_, bookmark_);
891  }
892 
893  void write(char** logPtr, size_t* logOffsetPtr, const Message::Field& subId_,
894  char type_, size_t bookmark_)
895  {
896  if(!_recovering)
897  {
898  size_t len = subId_.len();
899  size_t blockLen = len + 2*sizeof(size_t) + 1;
900  // Check we'll be in the current boundaries
901  if(*logOffsetPtr + blockLen >= _fileSize)
902  {
903  setFileSize(_fileSize*2);
904  }
905  char* offset = *logPtr+*logOffsetPtr;
906  *(reinterpret_cast<size_t*>(offset)) = len;
907  offset += sizeof(size_t);
908  memcpy(offset, static_cast<const void*>(subId_.data()), len);
909  offset += len;
910  *offset++ = type_;
911  *(reinterpret_cast<size_t*>(offset)) = bookmark_;
912  *logOffsetPtr += blockLen;
913  }
914  }
915 
916  void setFileSize(size_t newSize_)
917  {
918  if(_log && newSize_ <= _fileSize) // Improper resize attempt
919  return;
920 #ifdef _WIN32
921  _fileSize = _setFileSize(newSize_, &_log, _file, &_mapFile);
922 #else
923  _fileSize = _setFileSize(newSize_, &_log, _file, _fileSize);
924 #endif
925  }
926 
927  // Returns new file size, 0 if there is a failure
928  size_t _setFileSize(size_t newSize_, char** log_, FileType file_,
929 #ifdef WIN32
930  HANDLE* mapFile_
931 #else
932  size_t fileSize_
933 #endif
934  )
935  {
936  // Make sure we're using a multiple of page size
937  size_t sz = newSize_ & (size_t)(~(getPageSize()-1));
938  if(sz < newSize_ || sz == 0)
939  {
940  sz += getPageSize();
941  }
942 #ifdef _WIN32
943  if(*mapFile_ && *mapFile_ != INVALID_HANDLE_VALUE)
944  {
945  if(*log_)
946  {
947  FlushViewOfFile(*log_, 0);
948  UnmapViewOfFile(*log_);
949  }
950  CloseHandle(*mapFile_);
951  }
952 #ifdef _WIN64
953  *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, (DWORD)((sz >> 32) & 0xffffffff), (DWORD)sz, NULL);
954 #else
955  *mapFile_ = CreateFileMapping( file_, NULL, PAGE_READWRITE, 0, (DWORD)sz, NULL);
956 #endif
957  if(*mapFile_ == NULL || *mapFile_ == INVALID_HANDLE_VALUE)
958  {
959  DWORD errNo = getErrorNo();
960  CloseHandle(file_);
961  std::ostringstream os;
962  os << "Failed to create map of MMapBookmarkStore file " << _fileName
963  << " during resize.";
964  error(os.str(), errNo);
965  *log_ = 0;
966  return 0;
967  }
968  else
969  {
970  *log_ = (char*)MapViewOfFile(*mapFile_, FILE_MAP_ALL_ACCESS, 0, 0, sz);
971  if(*log_ == NULL)
972  {
973  DWORD errNo = getErrorNo();
974  CloseHandle(*mapFile_);
975  CloseHandle(file_);
976  std::ostringstream os;
977  os << "Failed to map MMapBookmarkStore file " << _fileName
978  << " to memory during resize.";
979  error(os.str(), errNo);
980  *log_ = 0;
981  return 0;
982  }
983  }
984 #else
985  // Extend the underlying file
986  if(lseek(file_, (off_t)sz, SEEK_SET) == -1)
987  {
988  int err = getErrorNo();
989  ::close(file_);
990  std::ostringstream os;
991  os << "Failed to seek in MMapBookmarkStore file " << _fileName
992  << " during resize.";
993  error(os.str(), err);
994  }
995  if(::write(file_, "", 1) == -1)
996  {
997  int err = getErrorNo();
998  ::close(file_);
999  std::ostringstream os;
1000  os << "Failed to grow MMapBookmarkStore file " << _fileName
1001  << " during resize.";
1002  error(os.str(), err);
1003  }
1004  if(*log_)
1005  {
1006 #if defined(linux)
1007  *log_ = static_cast<char*>(mremap(*log_, fileSize_, sz,
1008  MREMAP_MAYMOVE));
1009 #else
1010  munmap(*log_,fileSize_);
1011  *log_ = static_cast<char*>(mmap(0, sz, PROT_READ | PROT_WRITE,
1012  MAP_SHARED, file_, 0));
1013 #endif
1014  }
1015  else // New mapping
1016  {
1017  // New mapping, map the full file size for recovery or else it std size
1018  *log_ = static_cast<char*>(mmap(0, sz, PROT_READ | PROT_WRITE,
1019  MAP_SHARED, file_, 0));
1020  }
1021 
1022  if((void*)(*log_) == MAP_FAILED)
1023  {
1024  int err = getErrorNo();
1025  ::close(file_);
1026  *log_ = 0;
1027  std::ostringstream os;
1028  os << "Failed to map MMapBookmarkStore file " << _fileName
1029  << " to memory during resize.";
1030  error(os.str(), err);
1031  return 0;
1032  }
1033 #endif
1034  return sz;
1035  }
1036 
1037  void recover(bool useLastModifiedTime_ = false,
1038  bool hasAdapter_ = false)
1039  {
1040  Message::Field sub;
1041  Message::Field bookmarkField;
1042  size_t bookmarkLen = 0;
1043  size_t lastGoodOffset = 0;
1044  bool inError = false;
1045  Lock<Mutex> guard(_lock);
1046  Lock<Mutex> fileGuard(_fileLock);
1047  _recovering = true;
1048  // Map of bookmark to sequence number
1049  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> BookmarkMap;
1050  typedef std::map<Message::Field, size_t,
1051  Message::Field::FieldHash>::iterator BookmarkMapIter;
1052  // Map of subId to set of recovered bookmarks
1053  typedef std::map<Message::Field, BookmarkMap*,
1054  Message::Field::FieldHash> ReadMap;
1055  typedef std::map<Message::Field, BookmarkMap*,
1056  Message::Field::FieldHash>::iterator ReadMapIter;
1057  ReadMap recovered;
1058  size_t subLen = *(reinterpret_cast<size_t*>(_log));
1059  while(!inError && subLen > 0)
1060  {
1061  // If we recover something, remove anything adapter recovered
1062  if (_logOffset == 0 && hasAdapter_)
1063  MemoryBookmarkStore::__purge();
1064  _logOffset += sizeof(size_t);
1065  sub.assign(_log+_logOffset, subLen);
1066  _logOffset += subLen;
1067  switch(_log[_logOffset++])
1068  {
1069  case -1:
1070  return;
1071  case ENTRY_BOOKMARK:
1072  {
1073  AMPS_READ8((_log+_logOffset),bookmarkLen);
1074  _logOffset += sizeof(size_t);
1075  bookmarkField.assign(_log+_logOffset, bookmarkLen);
1076  _logOffset += bookmarkLen;
1077  Subscription* subP = find(sub);
1078  BookmarkMap* bookmarks = NULL;
1079  ReadMapIter iter = recovered.find(sub);
1080  if (iter == recovered.end())
1081  {
1082  Message::Field subKey;
1083  subKey.deepCopy(sub);
1084  bookmarks = new BookmarkMap();
1085  recovered[subKey] = bookmarks;
1086  }
1087  else
1088  {
1089  bookmarks = iter->second;
1090  }
1091  if (bookmarks->find(bookmarkField) != bookmarks->end())
1092  {
1093  std::for_each(bookmarks->begin(), bookmarks->end(),
1094  _clearBookmark);
1095  bookmarks->clear();
1096  subP->getMostRecent(true);
1097  }
1098  if (BookmarkRange::isRange(bookmarkField))
1099  {
1100  subP->discard(subP->log(bookmarkField));
1101  }
1102  else if (!subP->isDiscarded(bookmarkField))
1103  {
1104  size_t sequence = subP->log(bookmarkField);
1105  Message::Field copy;
1106  copy.deepCopy(bookmarkField);
1107  bookmarks->insert(std::make_pair(copy, sequence));
1108  }
1109  else
1110  {
1111  // We know it's discarded, but there may still be a
1112  // discard entry in the log, so avoid a search.
1113  Message::Field copy;
1114  copy.deepCopy(bookmarkField);
1115  bookmarks->insert(std::make_pair(copy,0));
1116  }
1117  }
1118  break;
1119  case ENTRY_DISCARD:
1120  {
1121  AMPS_READ8((_log+_logOffset),bookmarkLen);
1122  _logOffset += sizeof(size_t);
1123  bookmarkField.assign(_log+_logOffset, bookmarkLen);
1124  _logOffset += bookmarkLen;
1125  size_t sequence = AMPS_UNSET_INDEX;
1126  ReadMapIter iter = recovered.find(sub);
1127  if (iter != recovered.end())
1128  {
1129  BookmarkMap* bookmarks = iter->second;
1130  BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1131  if (bookmarkIter != bookmarks->end())
1132  {
1133  sequence = bookmarkIter->second;
1134  Message::Field bookmarkToClear(bookmarkIter->first);
1135  bookmarkToClear.clear();
1136  bookmarks->erase(bookmarkIter);
1137  }
1138  }
1139  if (!BookmarkRange::isRange(bookmarkField))
1140  {
1141  Subscription* subP = find(sub);
1142  if (sequence != AMPS_UNSET_INDEX)
1143  {
1144  // A sequence of 0 means it was already discarded
1145  if (sequence) subP->discard(sequence);
1146  }
1147  else // Shouldn't end up here, but just in case we'll search
1148  {
1149  subP->discard(bookmarkField);
1150  }
1151  }
1152  }
1153  break;
1154  case ENTRY_PERSISTED:
1155  {
1156  AMPS_READ8((_log+_logOffset),bookmarkLen);
1157  _logOffset += sizeof(size_t);
1158  bookmarkField.assign(_log+_logOffset, bookmarkLen);
1159  _logOffset += bookmarkLen;
1160  MemoryBookmarkStore::_persisted(find(sub), bookmarkField);
1161  }
1162  break;
1163  default:
1164  if (lastGoodOffset == 0)
1165  {
1166  error("Error while recovering MMapBookmarkStore file.", getErrorNo());
1167  }
1168  else
1169  {
1170  _logOffset = lastGoodOffset;
1171  inError = true;
1172  }
1173  }
1174  lastGoodOffset = _logOffset;
1175  if (!inError) subLen = *(reinterpret_cast<size_t*>(_log + _logOffset));
1176  }
1177  for (SubscriptionMap::iterator i=_subs.begin(); i != _subs.end(); ++i)
1178  {
1179  if (recovered.count(i->first) && !recovered[i->first]->empty())
1180  {
1181  if (i->second->getMostRecent(false).len() > 1)
1182  {
1183  i->second->justRecovered();
1184  }
1185  else
1186  {
1187  // Unlikely, but we may have recovered only undiscarded
1188  // bookmarks so just restart as a new subscription.
1189  delete i->second;
1190  _subs[i->first] = new Subscription(this, i->first);
1191  }
1192  }
1193  if (useLastModifiedTime_ && _fileTimestamp)
1194  {
1195  _subs[i->first]->setRecoveryTimestamp(_fileTimestamp);
1196  }
1197  }
1198  if (_fileTimestamp)
1199  {
1200  delete[] _fileTimestamp;
1201  _fileTimestamp = 0;
1202  }
1203  for (ReadMapIter i = recovered.begin(), e = recovered.end(); i!=e; ++i)
1204  {
1205  std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1206  delete i->second;
1207  Message::Field f = i->first;
1208  f.clear();
1209  }
1210  _recovering = false;
1211  }
1212 
1213  Mutex _fileLock;
1214  std::string _fileName;
1215  size_t _fileSize;
1216  size_t _logOffset;
1217  char* _log;
1218  char* _fileTimestamp;
1219  FileType _file;
1220  // Each entry begins with a single byte indicating the type of entry:
1221  // a new bookmark, or a discard of a previous one.
1222  static size_t getPageSize()
1223  {
1224  static size_t pageSize;
1225  if(pageSize == 0)
1226  {
1227 #ifdef _WIN32
1228  SYSTEM_INFO SYS_INFO;
1229  GetSystemInfo(&SYS_INFO);
1230  pageSize = SYS_INFO.dwPageSize;
1231 #else
1232  pageSize = (size_t)sysconf(_SC_PAGESIZE);
1233 #endif
1234  }
1235  return pageSize;
1236  }
1237 
1238 };
1239 
1240 } // end namespace AMPS
1241 
1242 
1243 #endif // _MMAPBOOKMARKSTORE_H_
1244 
virtual void purge()
Called to purge the contents of this store.
Definition: MMapBookmarkStore.hpp:288
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1107
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:447
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
A BookmarkStoreImpl implementation that uses a memory mapped file for storage of the bookmarks...
Definition: MMapBookmarkStore.hpp:56
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: MMapBookmarkStore.hpp:222
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MMapBookmarkStore.hpp:254
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:311
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: MMapBookmarkStore.hpp:240
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1132
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:126
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
MMapBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:82
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MMapBookmarkStore.hpp:302
Provides AMPS::RecoveryPointAdapter, an iterface for impplementing external storage of bookmark subsc...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1308
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:164
A BookmarkStoreImpl implmentation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MMapBookmarkStore.hpp:317
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MMapBookmarkStore.hpp:201
MMapBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:133
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Definition: ampsplusplus.hpp:103
MMapBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Create an MMapBookmarkStore that uses fileName_ as its file storage.
Definition: MMapBookmarkStore.hpp:105
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1122
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MMapBookmarkStore.hpp:268