AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
LoggedBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #include <ampsplusplus.hpp>
27 
28 #ifndef _LOGGEDBOOKMARKSTORE_H_
29 #define _LOGGEDBOOKMARKSTORE_H_
30 
31 #include <MemoryBookmarkStore.hpp>
32 #include <RecoveryPoint.hpp>
33 #include <RecoveryPointAdapter.hpp>
34 #include <string>
35 #ifdef _WIN32
36  #include <windows.h>
37 #else
38  #include <sys/mman.h>
39  #include <unistd.h>
40  #include <sys/uio.h>
41 #endif
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <map>
46 #include <set>
47 
48 #if defined(sun)
49  typedef char* amps_iovec_base_ptr;
50 #else
51  typedef void* amps_iovec_base_ptr;
52 #endif
53 
58 
59 namespace AMPS
60 {
65  {
66  private:
67  static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
68  {
69  Message::Field f(pair.first);
70  f.clear();
71  }
72 #ifdef _WIN32
73  typedef HANDLE FileType;
74 #else
75  typedef int FileType;
76 #endif
77  public:
88  LoggedBookmarkStore(const char* fileName_,
89  bool useLastModifiedTime_ = false)
91 #ifdef _WIN32
92  , _file(INVALID_HANDLE_VALUE)
93 #else
94  , _file(0)
95 #endif
96  , _fileName(fileName_)
97  {
98  init();
99  recover(useLastModifiedTime_, false);
100  }
101 
109  LoggedBookmarkStore(const std::string& fileName_,
110  bool useLastModifiedTime_ = false)
112 #ifdef _WIN32
113  , _file(INVALID_HANDLE_VALUE)
114 #else
115  , _file(0)
116 #endif
117  , _fileName(fileName_)
118  {
119  init();
120  recover(useLastModifiedTime_, false);
121  }
122 
138  const char* fileName_,
139  RecoveryPointFactory factory_ = NULL,
140  bool useLastModifiedTime_ = false)
141  : MemoryBookmarkStore(adapter_, factory_)
142 #ifdef _WIN32
143  , _file(INVALID_HANDLE_VALUE)
144 #else
145  , _file(0)
146 #endif
147  , _fileName(fileName_)
148  {
149  init();
150  recover(useLastModifiedTime_, true);
151  }
152 
165  const std::string& fileName_,
166  RecoveryPointFactory factory_ = NULL,
167  bool useLastModifiedTime_ = false)
168  : MemoryBookmarkStore(adapter_, factory_)
169 #ifdef _WIN32
170  , _file(INVALID_HANDLE_VALUE)
171 #else
172  , _file(0)
173 #endif
174  , _fileName(fileName_)
175  {
176  init();
177  recover(useLastModifiedTime_, true);
178  }
179 
180  virtual ~LoggedBookmarkStore()
181  {
182  close();
183  // In case _lock gets acquired by reader thread between end of this
184  // destructor and start of base class destructor, prevent write()
185  _recoveringFile = true;
186  }
187 
188  void close()
189  {
190 #ifdef _WIN32
191  CloseHandle(_file);
192 #else
193  ::close(_file);
194 #endif
195  }
196 
202  virtual size_t log(Message& message_)
203  {
204  Message::Field bookmark = message_.getBookmark();
205  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
206  Lock<Mutex> guard(_lock);
207  if (!sub)
208  {
209  Message::Field subId = message_.getSubscriptionId();
210  if (subId.empty())
211  {
212  subId = message_.getSubscriptionIds();
213  }
214  sub = find(subId);
215  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
216  }
217  write(_file, sub->id(), ENTRY_BOOKMARK, bookmark);
218  return MemoryBookmarkStore::_log(message_);
219  }
220 
225  virtual void discard(const Message& message_)
226  {
227  Message::Field bookmark = message_.getBookmark();
228  Message::Field subId = message_.getSubscriptionId();
229  if (subId.empty())
230  {
231  subId = message_.getSubscriptionIds();
232  }
233  Lock<Mutex> guard(_lock);
234  write(_file, subId, ENTRY_DISCARD, bookmark);
235  MemoryBookmarkStore::_discard(message_);
236  }
237 
245  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
246  {
247  Lock<Mutex> l(_lock);
248  Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
249  if (!entry || entry->_val.empty())
250  {
251  return;
252  }
253  write(_file, subId_, ENTRY_DISCARD, entry->_val);
254  MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
255  }
256 
263  {
264  Lock<Mutex> l(_lock);
265  return MemoryBookmarkStore::_getMostRecent(subId_);
266  }
267 
276  virtual bool isDiscarded(Message& message_)
277  {
278  Lock<Mutex> l(_lock);
279  bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
280  if (retVal)
281  {
282  Message::Field subId = message_.getSubscriptionId();
283  if (subId.empty())
284  {
285  subId = message_.getSubscriptionIds();
286  }
287  write(_file, subId, ENTRY_BOOKMARK, message_.getBookmark());
288  write(_file, subId, ENTRY_DISCARD, message_.getBookmark());
289  }
290  return retVal;
291  }
292 
298  virtual void purge()
299  {
300  Lock<Mutex> guard(_lock);
301 #ifdef _WIN32
302  if (_file != INVALID_HANDLE_VALUE)
303  {
304  CloseHandle(_file);
305  }
306  DeleteFileA(_fileName.c_str());
307  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
308  NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
309  if ( _file == INVALID_HANDLE_VALUE )
310  {
311  DWORD err = getErrorNo();
312  std::ostringstream os;
313  os << "Failed to recreate log file after purge for LoggedBookmarkStore" << _fileName << " for LoggedBookmarkStore";
314  error(os.str(), err);
315  return;
316  }
317 #else
318  ::close(_file);
319  ::unlink(_fileName.c_str());
320  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
321  if (_file == -1)
322  {
323  error("Failed to recreate log file after purge for LoggedBookmarkStore", getErrorNo());
324  return;
325  }
326 #endif
327  MemoryBookmarkStore::_purge();
328  }
329 
335  virtual void purge(const Message::Field& subId_)
336  {
337  Lock<Mutex> guard(_lock);
338  MemoryBookmarkStore::_purge(subId_);
339  std::string tmpFileName = _fileName + ".tmp";
340  __prune(tmpFileName);
341  }
342 
343  void setServerVersion(const VersionInfo& version_)
344  {
346  }
347 
348  void setServerVersion(size_t version_)
349  {
351  }
352 
353  // Yes, the argument is a non-const copy of what is passed in
354  void _prune(const std::string& tmpFileName_)
355  {
356  Lock<Mutex> guard(_lock);
357  // If nothing's changed with most recent, don't rewrite the file
358  if (!_recentChanged)
359  {
360  return;
361  }
362  if (tmpFileName_.empty())
363  {
364  __prune(_fileName + ".tmp");
365  }
366  else
367  {
368  __prune(tmpFileName_);
369  }
370  _recentChanged = false;
371  }
372 
373  void __prune(const std::string& tmpFileName_)
374  {
375 #ifdef _WIN32
376  HANDLE tmpFile;
377  tmpFile = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
378  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
379  if (tmpFile == INVALID_HANDLE_VALUE )
380  {
381  DWORD err = getErrorNo();
382  std::ostringstream os;
383  os << "Failed to create temp log file " << tmpFileName_ <<
384  " to prune LoggedBookmarkStore " << _fileName;
385  error(os.str(), err);
386  return;
387  }
388 #else
389  int tmpFile;
390  tmpFile = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
391  if (tmpFile == -1)
392  {
393  int err = getErrorNo();
394  std::ostringstream os;
395  os << "Failed to create temp log file " << tmpFileName_ <<
396  " to prune LoggedBookmarkStore " << _fileName;
397  error(os.str(), err);
398  return;
399  }
400 #endif
401  try
402  {
403  for (SubscriptionMap::iterator i = _subs.begin();
404  i != _subs.end(); ++i)
405  {
406  Message::Field subId = i->first;
407  assert(!subId.empty());
408  Subscription* subPtr = i->second;
409  const BookmarkRange& range = subPtr->getRange();
410  if (range.isValid())
411  {
412  write(tmpFile, subId, ENTRY_BOOKMARK, range);
413  write(tmpFile, subId, ENTRY_DISCARD, range);
414  }
415  Message::Field recent = subPtr->getMostRecent(false);
416  amps_uint64_t recentPub, recentSeq;
417  Subscription::parseBookmark(recent, recentPub, recentSeq);
418  Subscription::PublisherMap publishersDiscarded =
419  subPtr->_publishers;
420  MemoryBookmarkStore::EntryPtrList recovered;
421  subPtr->getRecoveryEntries(recovered);
422  subPtr->setPublishersToDiscarded(&recovered,
423  &publishersDiscarded);
424  char tmpBookmarkBuffer[128];
425  for (Subscription::PublisherIterator pub =
426  publishersDiscarded.begin(),
427  e = publishersDiscarded.end();
428  pub != e; ++pub)
429  {
430  // Don't log EPOCH if it got in the map
431  if (pub->first == 0 || pub->second == 0)
432  {
433  continue;
434  }
435  // Don't log the most recent yet
436  if (pub->first == recentPub)
437  {
438  continue;
439  }
440  int written = AMPS_snprintf_amps_uint64_t(
441  tmpBookmarkBuffer,
442  sizeof(tmpBookmarkBuffer),
443  pub->first);
444  *(tmpBookmarkBuffer + written++) = '|';
445  written += AMPS_snprintf_amps_uint64_t(
446  tmpBookmarkBuffer + written,
447  sizeof(tmpBookmarkBuffer)
448  - (size_t)written,
449  pub->second);
450  *(tmpBookmarkBuffer + written++) = '|';
451  Message::Field tmpBookmark(tmpBookmarkBuffer, (size_t)written);
452  write(tmpFile, subId, ENTRY_BOOKMARK, tmpBookmark);
453  write(tmpFile, subId, ENTRY_DISCARD, tmpBookmark);
454  }
455  if (isWritableBookmark(recent.len()))
456  {
457  write(tmpFile, subId, ENTRY_BOOKMARK, recent);
458  write(tmpFile, subId, ENTRY_DISCARD, recent);
459  }
460  else // set up _recentList
461  {
462  subPtr->getMostRecentList();
463  }
464  if (isWritableBookmark(subPtr->getLastPersisted().len()))
465  {
466  write(tmpFile, subId, ENTRY_PERSISTED,
467  subPtr->getLastPersisted());
468  }
469  subPtr->getActiveEntries(recovered);
470  for (MemoryBookmarkStore::EntryPtrList::iterator entry =
471  recovered.begin();
472  entry != recovered.end(); ++entry)
473  {
474  if ((*entry)->_val.empty() ||
475  !isWritableBookmark((*entry)->_val.len()))
476  {
477  continue;
478  }
479  write(tmpFile, subId, ENTRY_BOOKMARK, (*entry)->_val);
480  if (!(*entry)->_active)
481  {
482  write(tmpFile, subId, ENTRY_DISCARD, (*entry)->_val);
483  }
484  }
485  }
486  }
487  catch (StoreException& ex)
488  {
489 #ifdef _WIN32
490  CloseHandle(tmpFile);
491  DeleteFileA(tmpFileName_.c_str());
492 #else
493  ::close(tmpFile);
494  unlink(tmpFileName_.c_str());
495 #endif
496  std::ostringstream os;
497  os << "Exception during prune: " << ex.what();
498  throw StoreException(os.str());
499  }
500 #ifdef _WIN32
501  CloseHandle(_file);
502  CloseHandle(tmpFile);
503  _file = INVALID_HANDLE_VALUE;
504  tmpFile = INVALID_HANDLE_VALUE;
505  // Replace file with pruned file
506  int retryCount = 3;
507  while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
508  MOVEFILE_COPY_ALLOWED | MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH))
509  {
510  DWORD err = getErrorNo();
511  if (--retryCount > 0)
512  {
513  continue;
514  }
515  // Try to set _file to the tmp file that won't move then throw
516  std::string desiredFileName = _fileName;
517  _fileName = tmpFileName_;
518  init();
519  std::ostringstream os;
520  os << "Failed to move completed temp file " << tmpFileName_
521  << " to " << desiredFileName
522  << " in prune in LoggedBookmarkStore. Continuing by using "
523  << tmpFileName_ << " as the LoggedBookmarkStore file.";
524  error(os.str(), err);
525  return;
526  }
527  init();
528  SetFilePointer(_file, 0, NULL, FILE_END);
529 #else
530  ::close(tmpFile);
531  ::close(_file);
532  if (-1 == ::unlink(_fileName.c_str()))
533  {
534  int err = getErrorNo();
535  // Try to set _file to the tmp file then throw
536  std::string desiredFileName = _fileName;
537  _fileName = tmpFileName_;
538  init();
539  std::ostringstream os;
540  os << "Failed to delete file " << desiredFileName
541  << " after creating temporary file " << tmpFileName_
542  << " in prune in LoggedBookmarkStore. Continuing by using "
543  << tmpFileName_ << " as the LoggedBookmarkStore file.";
544  error(os.str(), err);
545  return;
546  }
547  if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
548  {
549  int err = getErrorNo();
550  // Try to set _file to the tmp file that won't move then throw
551  std::string desiredFileName = _fileName;
552  _fileName = tmpFileName_;
553  init();
554  std::ostringstream os;
555  os << "Failed to move completed temp file " << tmpFileName_
556  << " to " << desiredFileName
557  << " in prune in LoggedBookmarkStore. Continuing by using "
558  << tmpFileName_ << " as the LoggedBookmarkStore file.";
559  error(os.str(), err);
560  return;
561  }
562  init();
563  struct stat fst;
564  if (-1 == ::fstat(_file, &fst))
565  {
566  int err = getErrorNo();
567  std::ostringstream os;
568  os << "Failed to get size of pruned file " << _fileName
569  << " in prune in LoggedBookmarkStore. ";
570  error(os.str(), err);
571  return;
572  }
573  ::lseek(_file, (off_t)fst.st_size, SEEK_SET);
574 #endif
575  }
576 
577  private:
578  virtual void _persisted(Subscription* subP_,
579  const Message::Field& bookmark_)
580  {
581  Lock<Mutex> guard(_lock);
582  write(_file, subP_->id(), ENTRY_PERSISTED, bookmark_);
583  MemoryBookmarkStore::_persisted(subP_, bookmark_);
584  }
585 
586  virtual Message::Field _persisted(Subscription* subP_, size_t bookmark_)
587  {
588  Lock<Mutex> l(_lock);
589  Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
590  if (!entryPtr || entryPtr->_val.empty())
591  {
592  return Message::Field();
593  }
594  Message::Field bookmarkField = entryPtr->_val;
595  write(_file, subP_->id(), ENTRY_PERSISTED, bookmarkField);
596  MemoryBookmarkStore::_persisted(subP_, bookmarkField);
597  return bookmarkField;
598  }
599 
600 #ifdef _WIN32
601  typedef DWORD ERRTYPE ;
602  ERRTYPE getErrorNo() const
603  {
604  return GetLastError();
605  }
606 
607  void error(const std::string& message_, ERRTYPE err)
608  {
609  std::ostringstream os;
610  static const DWORD msgSize = 2048;
611  char pMsg[msgSize];
612  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM |
613  FORMAT_MESSAGE_ARGUMENT_ARRAY,
614  NULL, err, LANG_NEUTRAL,
615  pMsg, msgSize, NULL);
616  os << "File: " << _fileName << ". " << message_;
617  if (err != 0)
618  {
619  os << " with error " << pMsg;
620  }
621  throw StoreException(os.str());
622  }
623 #else
624  typedef int ERRTYPE;
625  ERRTYPE getErrorNo() const
626  {
627  return errno;
628  }
629 
630  void error(const std::string& message_, ERRTYPE err)
631  {
632  std::ostringstream os;
633  os << "File: " << _fileName << ". " << message_;
634  if (err != 0)
635  {
636  os << " with error " << strerror(err);
637  }
638  close();
639  throw StoreException(os.str());
640  }
641 #endif
642 
643  void init()
644  {
645 #ifdef _WIN32
646  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
647  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
648  if ( _file == INVALID_HANDLE_VALUE )
649  {
650  DWORD err = getErrorNo();
651  std::ostringstream os;
652  os << "Failed to initialize log file " << _fileName << " for LoggedBookmarkStore";
653  error(os.str(), err);
654  return;
655  }
656 #else
657  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
658  if (_file == -1)
659  {
660  int err = getErrorNo();
661  std::ostringstream os;
662  os << "Failed to initialize log file " << _fileName << " for LoggedBookmarkStore";
663  error(os.str(), err);
664  return;
665  }
666 #endif
667  }
668 
669  // This implementation will only ever use this when logging a bookmark
670  // Could be used to add a feature where discarded bookmark fields are logged in
671  // addition to the generated bookmark.
672  void write(FileType file_, const Message::Field& subId_, char type_,
673  const Message::Field& bookmark_)
674  {
675  Lock<Mutex> guard(_fileLock);
676  if (!_recoveringFile && isWritableBookmark(bookmark_.len()))
677  {
678 #ifdef _WIN32
679  DWORD written;
680  size_t len = subId_.len();
681  BOOL ok = WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
682  ok |= WriteFile(file_, (LPVOID)subId_.data(), (DWORD)len, &written, NULL);
683  ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
684  len = bookmark_.len();
685  ok |= WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
686  ok |= WriteFile(file_, (LPVOID)bookmark_.data(), (DWORD)len,
687  &written, NULL);
688  if (!ok)
689  {
690  error("Failed to write to bookmark log.", getErrorNo());
691  return;
692  }
693 
694 #else
695  if (file_ == -1)
696  {
697  file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
698  if (file_ == -1)
699  {
700  int err = getErrorNo();
701  std::ostringstream os;
702  os << "Failed to open file " << _fileName
703  << " for write in LoggedBookmarkStore. ";
704  error(os.str(), err);
705  return;
706  }
707  }
708  struct iovec data[5];
709  size_t len = subId_.len();
710  data[0].iov_base = (amps_iovec_base_ptr)(void*)&len;
711  data[0].iov_len = sizeof(size_t);
712  data[1].iov_base = (amps_iovec_base_ptr)(void*)subId_.data();
713  data[1].iov_len = len;
714  data[2].iov_base = (amps_iovec_base_ptr)(void*)&type_;
715  data[2].iov_len = 1;
716  size_t bookmarkLen = bookmark_.len();
717  data[3].iov_base = (amps_iovec_base_ptr)(void*)&bookmarkLen;
718  data[3].iov_len = sizeof(size_t);
719  data[4].iov_base = (amps_iovec_base_ptr)(void*)bookmark_.data();
720  data[4].iov_len = bookmarkLen;
721  ssize_t written = ::writev(file_, data, 5);
722  if (written == -1)
723  {
724  error("Failed to write to bookmark log.", getErrorNo());
725  return;
726  }
727 #endif
728  }
729  }
730 
731  // This implementation will only ever use this when discarding a bookmark
732  // Could be used to add a feature where generated bookmarks are logged in
733  // addition to the bookmark field.
734  void write(FileType file_, const Message::Field& subId_,
735  char type_, size_t bookmark_)
736  {
737  Lock<Mutex> guard(_fileLock);
738  if (!_recoveringFile)
739  {
740 #ifdef _WIN32
741  DWORD written;
742  size_t len = subId_.len();
743  BOOL ok = WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
744  ok |= WriteFile(file_, (LPVOID)subId_.data(), (DWORD)len, &written, NULL);
745  ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
746  ok |= WriteFile(file_, (LPVOID)&bookmark_, sizeof(size_t),
747  &written, NULL);
748  if (!ok)
749  {
750  error("Failed to write bookmark sequence to file.", getErrorNo());
751  return;
752  }
753 
754 #else
755  if (file_ == -1)
756  {
757  file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
758  if (file_ == -1)
759  {
760  int err = getErrorNo();
761  std::ostringstream os;
762  os << "Failed to open file " << _fileName
763  << " to write bookmark sequence in LoggedBookmarkStore. ";
764  error(os.str(), err);
765  return;
766  }
767  }
768  struct iovec data[4];
769  size_t len = subId_.len();
770  data[0].iov_base = (amps_iovec_base_ptr)(void*)&len;
771  data[0].iov_len = sizeof(size_t);
772  data[1].iov_base = (amps_iovec_base_ptr)(void*)subId_.data();
773  data[1].iov_len = len;
774  data[2].iov_base = (amps_iovec_base_ptr)(void*)&type_;
775  data[2].iov_len = 1;
776  data[3].iov_base = (amps_iovec_base_ptr)(void*)&bookmark_;
777  data[3].iov_len = sizeof(size_t);
778  ssize_t written = ::writev(file_, data, 4);
779  if (written == -1)
780  {
781  error("Failed to write bookmark sequence to file.", getErrorNo());
782  return;
783  }
784 #endif
785  }
786  }
787 
788 #ifdef _WIN32
789 #define VOID_P(buf) (LPVOID)buf
790  bool readFileBytes(LPVOID buffer, size_t numBytes, DWORD* bytesRead)
791  {
792  return (ReadFile(_file, buffer, (DWORD)numBytes, bytesRead, NULL) == TRUE);
793  }
794 #else
795 #define VOID_P(buf) (void*)buf
796  bool readFileBytes(void* buffer, size_t numBytes, ssize_t* bytesRead)
797  {
798  *bytesRead = ::read(_file, buffer, numBytes);
799  return (*bytesRead >= 0);
800  }
801 #endif
802 
803  void recover(bool useLastModifiedTime_, bool hasAdapter_)
804  {
805  size_t bufferLen = 128;
806  char* buffer = new char[bufferLen];
807  size_t subIdBufferLen = 128;
808  char* subIdBuffer = new char[bufferLen];
809  Message::Field sub;
810  size_t subLen = 0;
811  Message::Field bookmarkField;
812  size_t bookmarkLen = 0;
813  Lock<Mutex> l(_lock);
814  Lock<Mutex> guard(_fileLock);
815  _recoveringFile = true;
816  char* fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
817  fileTimestamp[0] = '\0';
818 #ifdef _WIN32
819  LARGE_INTEGER lifileSize;
820  if (GetFileSizeEx(_file, &lifileSize) == 0)
821  {
822  DWORD err = getErrorNo();
823  delete[] buffer;
824  delete[] subIdBuffer;
825  _recoveringFile = false;
826  error("Failure getting file size while trying to recover.", err);
827  return;
828  }
829 #ifdef _WIN64
830  size_t fileSize = lifileSize.QuadPart;
831 #else
832  size_t fileSize = lifileSize.LowPart;
833 #endif
834  if (useLastModifiedTime_ && fileSize > 0)
835  {
836  FILETIME ftModifiedTime;
837  if (GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
838  {
839  DWORD err = getErrorNo();
840  delete[] buffer;
841  delete[] subIdBuffer;
842  _recoveringFile = false;
843  error("Failure getting file time while trying to recover.", err);
844  return;
845  }
846  SYSTEMTIME st;
847  if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
848  {
849  DWORD err = getErrorNo();
850  delete[] buffer;
851  delete[] subIdBuffer;
852  _recoveringFile = false;
853  error("Failure converting file time while trying to recover.", err);
854  return;
855  }
856  sprintf_s(fileTimestamp, AMPS_TIMESTAMP_LEN,
857  "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
858  st.wDay, st.wHour, st.wMinute, st.wSecond);
859  fileTimestamp[AMPS_TIMESTAMP_LEN - 1] = 'Z';
860  }
861  else if (fileSize == 0)
862  {
863  delete[] fileTimestamp;
864  delete[] buffer;
865  delete[] subIdBuffer;
866  _recoveringFile = false;
867  return;
868  }
869  DWORD readBytes = 0;
870  size_t loc = 0;
871  SetFilePointer(_file, 0, NULL, FILE_BEGIN);
872 #else
873  struct stat fst;
874  ::fstat(_file, &fst);
875  ssize_t fileSize = fst.st_size;
876  ssize_t readBytes = 0;
877  if (useLastModifiedTime_ && fileSize > 0)
878  {
879  struct tm timeInfo;
880  gmtime_r(&fst.st_mtime, &timeInfo);
881  strftime(fileTimestamp, AMPS_TIMESTAMP_LEN,
882  "%Y%m%dT%H%M%S", &timeInfo);
883  fileTimestamp[AMPS_TIMESTAMP_LEN - 1] = 'Z';
884  }
885  else if (fileSize == 0)
886  {
887  delete[] fileTimestamp;
888  delete[] buffer;
889  delete[] subIdBuffer;
890  _recoveringFile = false;
891  return;
892  }
893  off_t loc = 0;
894  ::lseek(_file, loc, SEEK_SET);
895 #endif
896  // We trust file recovery over Adapter recovery
897  if (hasAdapter_)
898  {
899  MemoryBookmarkStore::__purge();
900  }
901  if (!readFileBytes(VOID_P(&subLen), sizeof(size_t), &readBytes)
902  || subLen > getMaxSubIdLength())
903  {
904  delete[] fileTimestamp;
905  delete[] buffer;
906  delete[] subIdBuffer;
907  _recoveringFile = false;
908  error("Failure reading file while trying to recover.", getErrorNo());
909  return;
910  }
911 #ifdef _WIN32
912  size_t totalBytes = readBytes;
913 #else
914  ssize_t totalBytes = readBytes;
915 #endif
916  ERRTYPE err = 0; // 0 no error, -1 corruption, positive is errno file error
917  size_t tooManyBytes = 0;
918  typedef std::map<Message::Field, size_t,
919  Message::Field::FieldHash> BookmarkMap;
920  typedef std::map<Message::Field, size_t,
921  Message::Field::FieldHash>::iterator BookmarkMapIter;
922  // Map of subId to set of recovered bookmarks
923  typedef std::map<Message::Field, BookmarkMap*,
924  Message::Field::FieldHash> ReadMap;
925  typedef std::map<Message::Field, BookmarkMap*,
926  Message::Field::FieldHash>::iterator ReadMapIter;
927  ReadMap recovered;
928  while (subLen > 0 && (size_t)readBytes == sizeof(size_t) &&
929  (size_t)totalBytes <= (size_t)fileSize)
930  {
931  if (subLen >= ((size_t)fileSize - (size_t)totalBytes)
932  || subLen > getMaxSubIdLength())
933  {
934  tooManyBytes = subLen + 1;
935  err = (ERRTYPE) - 1;
936  break;
937  }
938  else
939  {
940  if (subIdBufferLen < subLen)
941  {
942  delete [] subIdBuffer;
943  subIdBufferLen = 2 * subLen;
944  subIdBuffer = new char[subIdBufferLen];
945  }
946  if (!readFileBytes(VOID_P(subIdBuffer), subLen, &readBytes))
947  {
948  err = getErrorNo();
949  tooManyBytes = subLen;
950  break;
951  }
952  totalBytes += readBytes;
953  sub.assign(subIdBuffer, subLen);
954  if (!readFileBytes(VOID_P(buffer), 1, &readBytes))
955  {
956  err = getErrorNo();
957  tooManyBytes = 1;
958  break;
959  }
960  totalBytes += readBytes;
961  switch (buffer[0])
962  {
963  case ENTRY_BOOKMARK:
964  {
965  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
966  {
967  // Corrupt final record is ok
968  err = (ERRTYPE) - 1;
969  tooManyBytes = sizeof(size_t);
970  break;
971  }
972  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
973  {
974  err = getErrorNo();
975  tooManyBytes = sizeof(size_t);
976  break;
977  }
978  totalBytes += readBytes;
979  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
980  {
981  // Corrupt final record is ok
982  err = (ERRTYPE) - 1;
983  tooManyBytes = bookmarkLen;
984  break;
985  }
986  if (bufferLen < bookmarkLen)
987  {
988  delete [] buffer;
989  bufferLen = 2 * bookmarkLen;
990  buffer = new char[bufferLen];
991  }
992  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
993  {
994  err = getErrorNo();
995  tooManyBytes = bookmarkLen;
996  break;
997  }
998  totalBytes += readBytes;
999  bookmarkField.assign(buffer, bookmarkLen);
1000  Subscription* subP = find(sub);
1001  BookmarkMap* bookmarks = NULL;
1002  ReadMapIter iter = recovered.find(sub);
1003  if (iter == recovered.end())
1004  {
1005  Message::Field subKey;
1006  subKey.deepCopy(sub);
1007  bookmarks = new BookmarkMap();
1008  recovered[subKey] = bookmarks;
1009  }
1010  else
1011  {
1012  bookmarks = iter->second;
1013  }
1014  if (bookmarks->find(bookmarkField) != bookmarks->end())
1015  {
1016  std::for_each(bookmarks->begin(), bookmarks->end(),
1017  _clearBookmark);
1018  bookmarks->clear();
1019  subP->getMostRecent(true);
1020  }
1021  if (BookmarkRange::isRange(bookmarkField))
1022  {
1023  subP->discard(subP->log(bookmarkField));
1024  }
1025  else if (!subP->isDiscarded(bookmarkField))
1026  {
1027  size_t sequence = subP->log(bookmarkField);
1028  Message::Field copy;
1029  copy.deepCopy(bookmarkField);
1030  bookmarks->insert(std::make_pair(copy, sequence));
1031  }
1032  else
1033  {
1034  // We know it's discarded, but there may still be a
1035  // discard entry in the log, so avoid a search.
1036  Message::Field copy;
1037  copy.deepCopy(bookmarkField);
1038  bookmarks->insert(std::make_pair(copy, 0));
1039  }
1040  }
1041  break;
1042 
1043  case ENTRY_DISCARD:
1044  {
1045  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
1046  {
1047  // Corrupt final record is ok
1048  err = (ERRTYPE) - 1;
1049  tooManyBytes = sizeof(size_t);
1050  break;
1051  }
1052  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
1053  {
1054  err = getErrorNo();
1055  tooManyBytes = sizeof(size_t);
1056  break;
1057  }
1058  totalBytes += readBytes;
1059  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
1060  {
1061  // Corrupt final record is ok
1062  err = (ERRTYPE) - 1;
1063  tooManyBytes = bookmarkLen;
1064  break;
1065  }
1066  if (bufferLen < bookmarkLen)
1067  {
1068  delete [] buffer;
1069  bufferLen = 2 * bookmarkLen;
1070  buffer = new char[bufferLen];
1071  }
1072  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1073  {
1074  err = getErrorNo();
1075  tooManyBytes = bookmarkLen;
1076  break;
1077  }
1078  totalBytes += readBytes;
1079  bookmarkField.assign(buffer, bookmarkLen);
1080  size_t sequence = AMPS_UNSET_INDEX;
1081  ReadMapIter iter = recovered.find(sub);
1082  if (iter != recovered.end())
1083  {
1084  BookmarkMap* bookmarks = iter->second;
1085  BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1086  if (bookmarkIter != bookmarks->end())
1087  {
1088  sequence = bookmarkIter->second;
1089  Message::Field bookmarkToClear(bookmarkIter->first);
1090  bookmarkToClear.clear();
1091  bookmarks->erase(bookmarkIter);
1092  }
1093  }
1094  Subscription* subP = find(sub);
1095  if (sequence != AMPS_UNSET_INDEX)
1096  {
1097  // A sequence of 0 means it was already discarded
1098  if (sequence)
1099  {
1100  subP->discard(sequence);
1101  }
1102  }
1103  else // Shouldn't end up here, but just in case we'll search
1104  {
1105  subP->discard(bookmarkField);
1106  }
1107  }
1108  break;
1109  case ENTRY_PERSISTED:
1110  {
1111  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
1112  {
1113  // Corrupt final record is ok
1114  err = (ERRTYPE) - 1;
1115  tooManyBytes = sizeof(size_t);
1116  break;
1117  }
1118  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
1119  {
1120  err = getErrorNo();
1121  tooManyBytes = sizeof(size_t);
1122  break;
1123  }
1124  totalBytes += readBytes;
1125  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
1126  {
1127  // Corrupt final record is ok
1128  err = (ERRTYPE) - 1;
1129  tooManyBytes = bookmarkLen;
1130  break;
1131  }
1132  if (bufferLen < bookmarkLen)
1133  {
1134  delete [] buffer;
1135  bufferLen = 2 * bookmarkLen;
1136  buffer = new char[bufferLen];
1137  }
1138  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1139  {
1140  err = getErrorNo();
1141  tooManyBytes = bookmarkLen;
1142  break;
1143  }
1144  totalBytes += readBytes;
1145  bookmarkField.assign(buffer, bookmarkLen);
1146  Subscription* subP = find(sub);
1147  MemoryBookmarkStore::_persisted(subP, bookmarkField);
1148  }
1149  break;
1150  default:
1151  {
1152  // Corrupt final record is ok
1153  tooManyBytes = (size_t)fileSize - (size_t)totalBytes;
1154  err = (ERRTYPE) - 1;
1155  }
1156  break;
1157  }
1158  }
1159  loc = totalBytes;
1160  if (!readFileBytes(VOID_P(&subLen), sizeof(size_t), &readBytes))
1161  {
1162  err = getErrorNo();
1163  tooManyBytes = sizeof(size_t);
1164  break;
1165  }
1166  totalBytes += readBytes;
1167  }
1168  delete[] buffer;
1169  delete[] subIdBuffer;
1170  if (err == 0)
1171  {
1172  for (SubscriptionMap::iterator i = _subs.begin(); i != _subs.end(); ++i)
1173  {
1174  if (recovered.count(i->first) && !recovered[i->first]->empty())
1175  {
1176  Subscription* subPtr = i->second;
1177  if (subPtr->getMostRecent(false).len() > 1)
1178  {
1179  subPtr->justRecovered();
1180  }
1181  else
1182  {
1183  // Unlikely, but we may have recovered only undiscarded bookmarks
1184  // so we should really just restart as a new subscription.
1185  delete subPtr;
1186  _subs[i->first] = new Subscription(this, i->first);
1187  }
1188  }
1189  if (useLastModifiedTime_ && fileTimestamp[0] != '\0')
1190  {
1191  _subs[i->first]->setRecoveryTimestamp(fileTimestamp);
1192  }
1193  }
1194  }
1195  for (ReadMapIter i = recovered.begin(), e = recovered.end(); i != e; ++i)
1196  {
1197  std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1198  delete i->second;
1199  Message::Field f = i->first;
1200  f.clear();
1201  }
1202  delete[] fileTimestamp;
1203  _recoveringFile = false;
1204  if (err != 0)
1205  {
1206  // Arbitrary guess if we're on the last record
1207  // We set err to -1 if we read a corrupt value or
1208  // to errno/last error if a read failed.
1209  if (err != (ERRTYPE) - 1 || loc == 0 || fileSize - loc > 128)
1210  {
1211  std::ostringstream os;
1212  os << "Error while recovering LoggedBookmarkStore from "
1213  << _fileName
1214  << ". Record starting at " << loc
1215  << " reading at " << totalBytes
1216  << " requested " << tooManyBytes
1217  << " and file size is " << fileSize;
1218  error(os.str(), (err != (ERRTYPE) - 1 ? err : 0));
1219  }
1220  else
1221  {
1222 #ifdef _WIN32
1223 #ifdef _WIN64
1224  LONG low = (LONG)loc;
1225  LONG high = (LONG)((loc >> 32) & 0xffffffff);
1226  SetFilePointer(_file, low, &high, FILE_BEGIN);
1227 #else
1228  SetFilePointer(_file, loc, NULL, FILE_BEGIN);
1229 #endif
1230 #else
1231  ::lseek(_file, loc, SEEK_SET);
1232 #endif
1233  }
1234  }
1235  }
1236 
1237  private:
1238  FileType _file;
1239  Mutex _fileLock;
1240  std::string _fileName;
1241  bool _recoveringFile;
1242  };
1243 
1244 } // end namespace AMPS
1245 
1246 
1247 #endif // _LOGGEDBOOKMARKSTORE_H_
1248 
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: LoggedBookmarkStore.hpp:276
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:343
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
LoggedBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:109
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:137
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: LoggedBookmarkStore.hpp:245
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
RecoveryPointAdapter is a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:164
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:348
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1374
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:127
Core type, function, and class declarations for the AMPS C++ client.
A BookmarkStoreImpl implementation that logs all messages to a file.
Definition: LoggedBookmarkStore.hpp:64
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1346
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: LoggedBookmarkStore.hpp:335
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
virtual void purge()
Called to purge the contents of this store.
Definition: LoggedBookmarkStore.hpp:298
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: LoggedBookmarkStore.hpp:262
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: BookmarkStore.hpp:206
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: LoggedBookmarkStore.hpp:202
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
LoggedBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:88
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1140
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: LoggedBookmarkStore.hpp:225