AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.1
LoggedBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #include <ampsplusplus.hpp>
27 
28 #ifndef _LOGGEDBOOKMARKSTORE_H_
29 #define _LOGGEDBOOKMARKSTORE_H_
30 
31 #include <MemoryBookmarkStore.hpp>
32 #include <RecoveryPoint.hpp>
33 #include <RecoveryPointAdapter.hpp>
34 #include <string>
35 #ifdef _WIN32
36 #include <windows.h>
37 #else
38 #include <sys/mman.h>
39 #include <unistd.h>
40 #include <sys/uio.h>
41 #endif
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <map>
46 #include <set>
47 
// Pointer type used for the casts that fill in iovec::iov_base before the
// writev() calls in this file. It is char* when the `sun` macro is defined and
// void* otherwise (presumably matching how each platform declares iov_base --
// confirm against the platform headers).
48 #if defined(sun)
49 typedef char* amps_iovec_base_ptr;
50 #else
51 typedef void* amps_iovec_base_ptr;
52 #endif
53 
58 
59 namespace AMPS
60 {
65 {
66 private:
67  static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
68  {
69  Message::Field f(pair.first);
70  f.clear();
71  }
// Native handle type of the log file: a HANDLE on Windows, a file descriptor
// (int) everywhere else.
72 #ifdef _WIN32
73  typedef HANDLE FileType;
74 #else
75  typedef int FileType;
76 #endif
77 public:
// Creates a LoggedBookmarkStore that uses fileName_ as its log file.
// init() opens (or creates) the file and recover() replays the records
// already stored in it. When useLastModifiedTime_ is true, recover() passes the
// file's last-modified time to each subscription as its recovery timestamp.
// Both init() and recover() report failures by throwing StoreException.
// NOTE(review): listing line 90 is absent from this extract; the member
// initializer list presumably starts there with the base-class initializer --
// confirm against the real header.
88  LoggedBookmarkStore(const char* fileName_,
89  bool useLastModifiedTime_ = false)
91 #ifdef _WIN32
92  , _file(INVALID_HANDLE_VALUE)
93 #else
94  , _file(0)
95 #endif
96  , _fileName(fileName_)
97  {
98  init();
99  recover(useLastModifiedTime_, false);
100  }
101 
// Creates a LoggedBookmarkStore using a file name fileName_ given as a
// std::string. init() opens (or creates) the file and recover() replays the
// records already stored in it. When useLastModifiedTime_ is true, recover()
// passes the file's last-modified time to each subscription as its recovery
// timestamp. Both init() and recover() report failures by throwing
// StoreException.
// NOTE(review): listing line 111 is absent from this extract; the member
// initializer list presumably starts there with the base-class initializer --
// confirm against the real header.
109  LoggedBookmarkStore(const std::string& fileName_,
110  bool useLastModifiedTime_ = false)
112 #ifdef _WIN32
113  , _file(INVALID_HANDLE_VALUE)
114 #else
115  , _file(0)
116 #endif
117  , _fileName(fileName_)
118  {
119  init();
120  recover(useLastModifiedTime_, false);
121  }
122 
// Creates a LoggedBookmarkStore using fileName_ as its file storage, with a
// RecoveryPointAdapter (and optional RecoveryPointFactory) handed to the
// MemoryBookmarkStore base. recover() is called with hasAdapter_ == true, in
// which case it purges what the base holds before replaying a non-empty file,
// because file recovery is trusted over adapter recovery.
// NOTE(review): listing line 137 (the first line of the signature) is absent
// from this extract; per the reference index at the end of the listing it is
// `LoggedBookmarkStore(const RecoveryPointAdapter& adapter_,`.
138  const char* fileName_,
139  RecoveryPointFactory factory_ = NULL,
140  bool useLastModifiedTime_ = false)
141  : MemoryBookmarkStore(adapter_, factory_)
142 #ifdef _WIN32
143  , _file(INVALID_HANDLE_VALUE)
144 #else
145  , _file(0)
146 #endif
147  , _fileName(fileName_)
148  {
149  init();
150  recover(useLastModifiedTime_, true);
151  }
152 
// Creates a LoggedBookmarkStore using a file name fileName_ given as a
// std::string, with a RecoveryPointAdapter (and optional RecoveryPointFactory)
// handed to the MemoryBookmarkStore base. recover() is called with
// hasAdapter_ == true, in which case it purges what the base holds before
// replaying a non-empty file, because file recovery is trusted over adapter
// recovery.
// NOTE(review): listing line 164 (the first line of the signature) is absent
// from this extract; per the reference index at the end of the listing it is
// `LoggedBookmarkStore(const RecoveryPointAdapter& adapter_,`.
165  const std::string& fileName_,
166  RecoveryPointFactory factory_ = NULL,
167  bool useLastModifiedTime_ = false)
168  : MemoryBookmarkStore(adapter_, factory_)
169 #ifdef _WIN32
170  , _file(INVALID_HANDLE_VALUE)
171 #else
172  , _file(0)
173 #endif
174  , _fileName(fileName_)
175  {
176  init();
177  recover(useLastModifiedTime_, true);
178  }
179 
180  virtual ~LoggedBookmarkStore()
181  {
182  close();
183  // In case _lock gets acquired by reader thread between end of this
184  // destructor and start of base class destructor, prevent write()
185  _recoveringFile = true;
186  }
187 
188  void close()
189  {
190 #ifdef _WIN32
191  CloseHandle(_file);
192 #else
193  ::close(_file);
194 #endif
195  }
196 
202  virtual size_t log(Message& message_)
203  {
204  Message::Field bookmark = message_.getBookmark();
205  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
206  Lock<Mutex> guard(_lock);
207  if (!sub)
208  {
209  Message::Field subId = message_.getSubscriptionId();
210  if (subId.empty())
211  subId = message_.getSubscriptionIds();
212  sub = find(subId);
213  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
214  }
215  write(_file, sub->id(), ENTRY_BOOKMARK, bookmark);
216  return MemoryBookmarkStore::_log(message_);
217  }
218 
223  virtual void discard(const Message& message_)
224  {
225  Message::Field bookmark = message_.getBookmark();
226  Message::Field subId = message_.getSubscriptionId();
227  if (subId.empty())
228  subId = message_.getSubscriptionIds();
229  Lock<Mutex> guard(_lock);
230  write(_file, subId, ENTRY_DISCARD, bookmark);
231  MemoryBookmarkStore::_discard(message_);
232  }
233 
241  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
242  {
243  Lock<Mutex> l(_lock);
244  Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
245  if (!entry || entry->_val.empty()) return;
246  write(_file, subId_, ENTRY_DISCARD, entry->_val);
247  MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
248  }
249 
// Returns the result of MemoryBookmarkStore::_getMostRecent(subId_), taken
// while holding _lock.
// NOTE(review): the signature line (listing line 255) is absent from this
// extract; given the call it forwards to, this is presumably the public
// getMostRecent override taking subId_ -- confirm against the real header.
256  {
257  Lock<Mutex> l(_lock);
258  return MemoryBookmarkStore::_getMostRecent(subId_);
259  }
260 
269  virtual bool isDiscarded(Message& message_)
270  {
271  Lock<Mutex> l(_lock);
272  bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
273  if (retVal)
274  {
275  Message::Field subId = message_.getSubscriptionId();
276  if (subId.empty())
277  subId = message_.getSubscriptionIds();
278  write(_file, subId, ENTRY_BOOKMARK, message_.getBookmark());
279  write(_file, subId, ENTRY_DISCARD, message_.getBookmark());
280  }
281  return retVal;
282  }
283 
289  virtual void purge()
290  {
291  Lock<Mutex> guard(_lock);
292 #ifdef _WIN32
293  if(_file != INVALID_HANDLE_VALUE)
294  CloseHandle(_file);
295  DeleteFileA(_fileName.c_str());
296  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
297  NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
298  if( _file == INVALID_HANDLE_VALUE )
299  {
300  DWORD err = getErrorNo();
301  std::ostringstream os;
302  os << "Failed to recreate log file after purge for LoggedBookmarkStore" << _fileName << " for LoggedBookmarkStore";
303  error(os.str(), err);
304  return;
305  }
306 #else
307  ::close(_file);
308  ::unlink(_fileName.c_str());
309  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
310  if(_file == -1)
311  {
312  error("Failed to recreate log file after purge for LoggedBookmarkStore", getErrorNo());
313  return;
314  }
315 #endif
316  MemoryBookmarkStore::_purge();
317  }
318 
324  virtual void purge(const Message::Field& subId_)
325  {
326  Lock<Mutex> guard(_lock);
327  MemoryBookmarkStore::_purge(subId_);
328  std::string tmpFileName = _fileName + ".tmp";
329  __prune(tmpFileName);
330  }
331 
// Internally used to set the server version so the store knows how to deal
// with persisted acks.
// NOTE(review): the body (listing line 334) is absent from this extract.
332  void setServerVersion(const VersionInfo& version_)
333  {
335  }
336 
// Internally used to set the server version so the store knows how to deal
// with persisted acks.
// NOTE(review): the body (listing line 339) is absent from this extract.
337  void setServerVersion(size_t version_)
338  {
340  }
341 
342  // Yes, the argument is a non-const copy of what is passed in
343  void _prune(const std::string& tmpFileName_)
344  {
345  Lock<Mutex> guard(_lock);
346  // If nothing's changed with most recent, don't rewrite the file
347  if (!_recentChanged)
348  {
349  return;
350  }
351  if (tmpFileName_.empty())
352  {
353  __prune(_fileName + ".tmp");
354  }
355  else
356  {
357  __prune(tmpFileName_);
358  }
359  _recentChanged = false;
360  }
361 
362  void __prune(const std::string& tmpFileName_)
363  {
364 #ifdef _WIN32
365  HANDLE tmpFile;
366  tmpFile = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
367  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
368  if(tmpFile == INVALID_HANDLE_VALUE )
369  {
370  DWORD err = getErrorNo();
371  std::ostringstream os;
372  os << "Failed to create temp log file " << tmpFileName_ <<
373  " to prune LoggedBookmarkStore " << _fileName;
374  error(os.str(), err);
375  return;
376  }
377 #else
378  int tmpFile;
379  tmpFile = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
380  if(tmpFile == -1)
381  {
382  int err = getErrorNo();
383  std::ostringstream os;
384  os << "Failed to create temp log file " << tmpFileName_ <<
385  " to prune LoggedBookmarkStore " << _fileName;
386  error(os.str(), err);
387  return;
388  }
389 #endif
390  try
391  {
392  for (SubscriptionMap::iterator i = _subs.begin();
393  i != _subs.end(); ++i)
394  {
395  Message::Field subId = i->first;
396  assert(!subId.empty());
397  Subscription* subPtr = i->second;
398  const BookmarkRange& range = subPtr->getRange();
399  if (range.isValid())
400  {
401  write(tmpFile, subId, ENTRY_BOOKMARK, range);
402  write(tmpFile, subId, ENTRY_DISCARD, range);
403  }
404  Message::Field recent = subPtr->getMostRecent(false);
405  amps_uint64_t recentPub, recentSeq;
406  Subscription::parseBookmark(recent, recentPub, recentSeq);
407  Subscription::PublisherMap publishersDiscarded =
408  subPtr->_publishers;
409  MemoryBookmarkStore::EntryPtrList recovered;
410  subPtr->getRecoveryEntries(recovered);
411  subPtr->setPublishersToDiscarded(&recovered,
412  &publishersDiscarded);
413  char tmpBookmarkBuffer[128];
414  for (Subscription::PublisherIterator pub =
415  publishersDiscarded.begin(),
416  e = publishersDiscarded.end();
417  pub != e; ++pub)
418  {
419  // Don't log EPOCH if it got in the map
420  if (pub->first == 0 || pub->second == 0) continue;
421  // Don't log the most recent yet
422  if (pub->first == recentPub) continue;
423  int written = AMPS_snprintf_amps_uint64_t(
424  tmpBookmarkBuffer,
425  sizeof(tmpBookmarkBuffer),
426  pub->first);
427  *(tmpBookmarkBuffer+written++) = '|';
428  written += AMPS_snprintf_amps_uint64_t(
429  tmpBookmarkBuffer+written,
430  sizeof(tmpBookmarkBuffer)
431  - (size_t)written,
432  pub->second);
433  *(tmpBookmarkBuffer+written++) = '|';
434  Message::Field tmpBookmark(tmpBookmarkBuffer, (size_t)written);
435  write(tmpFile, subId, ENTRY_BOOKMARK, tmpBookmark);
436  write(tmpFile, subId, ENTRY_DISCARD, tmpBookmark);
437  }
438  if (isWritableBookmark(recent.len()))
439  {
440  write(tmpFile, subId, ENTRY_BOOKMARK, recent);
441  write(tmpFile, subId, ENTRY_DISCARD, recent);
442  }
443  else // set up _recentList
444  {
445  subPtr->getMostRecentList();
446  }
447  if (isWritableBookmark(subPtr->getLastPersisted().len()))
448  {
449  write(tmpFile, subId, ENTRY_PERSISTED,
450  subPtr->getLastPersisted());
451  }
452  subPtr->getActiveEntries(recovered);
453  for (MemoryBookmarkStore::EntryPtrList::iterator entry =
454  recovered.begin();
455  entry != recovered.end(); ++entry)
456  {
457  if ((*entry)->_val.empty() ||
458  !isWritableBookmark((*entry)->_val.len()))
459  continue;
460  write(tmpFile, subId, ENTRY_BOOKMARK, (*entry)->_val);
461  if (!(*entry)->_active)
462  write(tmpFile, subId, ENTRY_DISCARD, (*entry)->_val);
463  }
464  }
465  }
466  catch (StoreException& ex)
467  {
468 #ifdef _WIN32
469  CloseHandle(tmpFile);
470  DeleteFileA(tmpFileName_.c_str());
471 #else
472  ::close(tmpFile);
473  unlink(tmpFileName_.c_str());
474 #endif
475  std::ostringstream os;
476  os << "Exception during prune: " << ex.what();
477  throw StoreException(os.str());
478  }
479 #ifdef _WIN32
480  CloseHandle(_file);
481  CloseHandle(tmpFile);
482  _file = INVALID_HANDLE_VALUE;
483  tmpFile = INVALID_HANDLE_VALUE;
484  // Replace file with pruned file
485  int retryCount = 3;
486  while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
487  MOVEFILE_COPY_ALLOWED|MOVEFILE_REPLACE_EXISTING|MOVEFILE_WRITE_THROUGH))
488  {
489  DWORD err = getErrorNo();
490  if (--retryCount > 0) continue;
491  // Try to set _file to the tmp file that won't move then throw
492  std::string desiredFileName = _fileName;
493  _fileName = tmpFileName_;
494  init();
495  std::ostringstream os;
496  os << "Failed to move completed temp file " << tmpFileName_
497  << " to " << desiredFileName
498  << " in prune in LoggedBookmarkStore. Continuing by using "
499  << tmpFileName_ << " as the LoggedBookmarkStore file.";
500  error(os.str(), err);
501  return;
502  }
503  init();
504  SetFilePointer(_file, 0, NULL, FILE_END);
505 #else
506  ::close(tmpFile);
507  ::close(_file);
508  if (-1 == ::unlink(_fileName.c_str()))
509  {
510  int err = getErrorNo();
511  // Try to set _file to the tmp file then throw
512  std::string desiredFileName = _fileName;
513  _fileName = tmpFileName_;
514  init();
515  std::ostringstream os;
516  os << "Failed to delete file " << desiredFileName
517  << " after creating temporary file " << tmpFileName_
518  << " in prune in LoggedBookmarkStore. Continuing by using "
519  << tmpFileName_ << " as the LoggedBookmarkStore file.";
520  error(os.str(), err);
521  return;
522  }
523  if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
524  {
525  int err = getErrorNo();
526  // Try to set _file to the tmp file that won't move then throw
527  std::string desiredFileName = _fileName;
528  _fileName = tmpFileName_;
529  init();
530  std::ostringstream os;
531  os << "Failed to move completed temp file " << tmpFileName_
532  << " to " << desiredFileName
533  << " in prune in LoggedBookmarkStore. Continuing by using "
534  << tmpFileName_ << " as the LoggedBookmarkStore file.";
535  error(os.str(), err);
536  return;
537  }
538  init();
539  struct stat fst;
540  if (-1 == ::fstat(_file, &fst))
541  {
542  int err = getErrorNo();
543  std::ostringstream os;
544  os << "Failed to get size of pruned file " << _fileName
545  << " in prune in LoggedBookmarkStore. ";
546  error(os.str(), err);
547  return;
548  }
549  ::lseek(_file, (off_t)fst.st_size, SEEK_SET);
550 #endif
551  }
552 
553 private:
554  virtual void _persisted(Subscription* subP_,
555  const Message::Field& bookmark_)
556  {
557  Lock<Mutex> guard(_lock);
558  write(_file, subP_->id(), ENTRY_PERSISTED, bookmark_);
559  MemoryBookmarkStore::_persisted(subP_, bookmark_);
560  }
561 
562  virtual Message::Field _persisted(Subscription* subP_, size_t bookmark_)
563  {
564  Lock<Mutex> l(_lock);
565  Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
566  if (!entryPtr || entryPtr->_val.empty())
567  return Message::Field();
568  Message::Field bookmarkField = entryPtr->_val;
569  write(_file, subP_->id(), ENTRY_PERSISTED, bookmarkField);
570  MemoryBookmarkStore::_persisted(subP_, bookmarkField);
571  return bookmarkField;
572  }
573 
574 #ifdef _WIN32
575  typedef DWORD ERRTYPE ;
576  ERRTYPE getErrorNo() const
577  {
578  return GetLastError();
579  }
580 
581  void error(const std::string& message_, ERRTYPE err)
582  {
583  std::ostringstream os;
584  static const DWORD msgSize = 2048;
585  char pMsg[msgSize];
586  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
587  FORMAT_MESSAGE_ARGUMENT_ARRAY,
588  NULL, err, LANG_NEUTRAL,
589  pMsg, msgSize, NULL);
590  os << "File: " << _fileName << ". " << message_;
591  if (err != 0)
592  {
593  os << " with error " << pMsg;
594  }
595  throw StoreException(os.str());
596  }
597 #else
598  typedef int ERRTYPE;
599  ERRTYPE getErrorNo() const
600  {
601  return errno;
602  }
603 
604  void error(const std::string& message_, ERRTYPE err)
605  {
606  std::ostringstream os;
607  os << "File: " << _fileName << ". " << message_;
608  if (err != 0)
609  {
610  os << " with error " << strerror(err);
611  }
612  close();
613  throw StoreException(os.str());
614  }
615 #endif
616 
617  void init()
618  {
619 #ifdef _WIN32
620  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
621  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
622  if( _file == INVALID_HANDLE_VALUE )
623  {
624  DWORD err = getErrorNo();
625  std::ostringstream os;
626  os << "Failed to initialize log file " << _fileName << " for LoggedBookmarkStore";
627  error(os.str(), err);
628  return;
629  }
630 #else
631  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
632  if(_file == -1)
633  {
634  int err = getErrorNo();
635  std::ostringstream os;
636  os << "Failed to initialize log file " << _fileName << " for LoggedBookmarkStore";
637  error(os.str(), err);
638  return;
639  }
640 #endif
641  }
642 
643  // This implementation will only ever use this when logging a bookmark
644  // Could be used to add a feature where discarded bookmark fields are logged in
645  // addition to the generated bookmark.
646  void write(FileType file_, const Message::Field& subId_, char type_,
647  const Message::Field& bookmark_)
648  {
649  Lock<Mutex> guard(_fileLock);
650  if(!_recoveringFile && isWritableBookmark(bookmark_.len()))
651  {
652 #ifdef _WIN32
653  DWORD written;
654  size_t len = subId_.len();
655  BOOL ok =WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
656  ok |= WriteFile(file_, (LPVOID)subId_.data(), (DWORD)len, &written, NULL);
657  ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
658  len = bookmark_.len();
659  ok |= WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
660  ok |= WriteFile(file_, (LPVOID)bookmark_.data(), (DWORD)len,
661  &written, NULL);
662  if(!ok)
663  {
664  error("Failed to write to bookmark log.", getErrorNo());
665  return;
666  }
667 
668 #else
669  if(file_ == -1)
670  {
671  file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
672  if(file_ == -1)
673  {
674  int err = getErrorNo();
675  std::ostringstream os;
676  os << "Failed to open file " << _fileName
677  << " for write in LoggedBookmarkStore. ";
678  error(os.str(), err);
679  return;
680  }
681  }
682  struct iovec data[5];
683  size_t len = subId_.len();
684  data[0].iov_base = (amps_iovec_base_ptr)(void*)&len;
685  data[0].iov_len = sizeof(size_t);
686  data[1].iov_base = (amps_iovec_base_ptr)(void*)subId_.data();
687  data[1].iov_len = len;
688  data[2].iov_base = (amps_iovec_base_ptr)(void*)&type_;
689  data[2].iov_len = 1;
690  size_t bookmarkLen = bookmark_.len();
691  data[3].iov_base = (amps_iovec_base_ptr)(void*)&bookmarkLen;
692  data[3].iov_len = sizeof(size_t);
693  data[4].iov_base = (amps_iovec_base_ptr)(void*)bookmark_.data();
694  data[4].iov_len = bookmarkLen;
695  ssize_t written = ::writev(file_, data, 5);
696  if(written == -1)
697  {
698  error("Failed to write to bookmark log.", getErrorNo());
699  return;
700  }
701 #endif
702  }
703  }
704 
705  // This implementation will only ever use this when discarding a bookmark
706  // Could be used to add a feature where generated bookmarks are logged in
707  // addition to the bookmark field.
708  void write(FileType file_, const Message::Field& subId_,
709  char type_, size_t bookmark_)
710  {
711  Lock<Mutex> guard(_fileLock);
712  if(!_recoveringFile)
713  {
714 #ifdef _WIN32
715  DWORD written;
716  size_t len = subId_.len();
717  BOOL ok =WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
718  ok |= WriteFile(file_, (LPVOID)subId_.data(), (DWORD)len, &written, NULL);
719  ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
720  ok |= WriteFile(file_, (LPVOID)&bookmark_, sizeof(size_t),
721  &written, NULL);
722  if(!ok)
723  {
724  error("Failed to write bookmark sequence to file.", getErrorNo());
725  return;
726  }
727 
728 #else
729  if(file_ == -1)
730  {
731  file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
732  if(file_ == -1)
733  {
734  int err = getErrorNo();
735  std::ostringstream os;
736  os << "Failed to open file " << _fileName
737  << " to write bookmark sequence in LoggedBookmarkStore. ";
738  error(os.str(), err);
739  return;
740  }
741  }
742  struct iovec data[4];
743  size_t len = subId_.len();
744  data[0].iov_base = (amps_iovec_base_ptr)(void*)&len;
745  data[0].iov_len = sizeof(size_t);
746  data[1].iov_base = (amps_iovec_base_ptr)(void*)subId_.data();
747  data[1].iov_len = len;
748  data[2].iov_base = (amps_iovec_base_ptr)(void*)&type_;
749  data[2].iov_len = 1;
750  data[3].iov_base = (amps_iovec_base_ptr)(void*)&bookmark_;
751  data[3].iov_len = sizeof(size_t);
752  ssize_t written = ::writev(file_, data, 4);
753  if(written == -1)
754  {
755  error("Failed to write bookmark sequence to file.", getErrorNo());
756  return;
757  }
758 #endif
759  }
760  }
761 
762 #ifdef _WIN32
763 #define VOID_P(buf) (LPVOID)buf
764  bool readFileBytes(LPVOID buffer, size_t numBytes, DWORD *bytesRead)
765  {
766  return (ReadFile(_file, buffer, (DWORD)numBytes, bytesRead, NULL) == TRUE);
767  }
768 #else
769 #define VOID_P(buf) (void*)buf
770  bool readFileBytes(void* buffer, size_t numBytes, ssize_t *bytesRead)
771  {
772  *bytesRead = ::read(_file, buffer, numBytes);
773  return (*bytesRead >= 0);
774  }
775 #endif
776 
777  void recover(bool useLastModifiedTime_, bool hasAdapter_)
778  {
779  size_t bufferLen = 128;
780  char* buffer = new char[bufferLen];
781  size_t subIdBufferLen = 128;
782  char* subIdBuffer = new char[bufferLen];
783  Message::Field sub;
784  size_t subLen = 0;
785  Message::Field bookmarkField;
786  size_t bookmarkLen = 0;
787  Lock<Mutex> l(_lock);
788  Lock<Mutex> guard(_fileLock);
789  _recoveringFile = true;
790  char* fileTimestamp = new char[AMPS_TIMESTAMP_LEN];
791  fileTimestamp[0] = '\0';
792 #ifdef _WIN32
793  LARGE_INTEGER lifileSize;
794  if(GetFileSizeEx(_file, &lifileSize) == 0)
795  {
796  DWORD err = getErrorNo();
797  delete[] buffer;
798  delete[] subIdBuffer;
799  _recoveringFile = false;
800  error("Failure getting file size while trying to recover.", err);
801  return;
802  }
803 #ifdef _WIN64
804  size_t fileSize = lifileSize.QuadPart;
805 #else
806  size_t fileSize = lifileSize.LowPart;
807 #endif
808  if (useLastModifiedTime_ && fileSize > 0)
809  {
810  FILETIME ftModifiedTime;
811  if(GetFileTime(_file, NULL, NULL, &ftModifiedTime) == 0)
812  {
813  DWORD err = getErrorNo();
814  delete[] buffer;
815  delete[] subIdBuffer;
816  _recoveringFile = false;
817  error("Failure getting file time while trying to recover.", err);
818  return;
819  }
820  SYSTEMTIME st;
821  if (FileTimeToSystemTime(&ftModifiedTime, &st) == 0)
822  {
823  DWORD err = getErrorNo();
824  delete[] buffer;
825  delete[] subIdBuffer;
826  _recoveringFile = false;
827  error("Failure converting file time while trying to recover.", err);
828  return;
829  }
830  sprintf_s(fileTimestamp, AMPS_TIMESTAMP_LEN,
831  "%04d%02d%02dT%02d%02d%02d", st.wYear, st.wMonth,
832  st.wDay, st.wHour, st.wMinute, st.wSecond);
833  fileTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
834  }
835  else if (fileSize == 0)
836  {
837  delete[] fileTimestamp;
838  delete[] buffer;
839  delete[] subIdBuffer;
840  _recoveringFile = false;
841  return;
842  }
843  DWORD readBytes = 0;
844  size_t loc = 0;
845  SetFilePointer(_file, 0, NULL, FILE_BEGIN);
846 #else
847  struct stat fst;
848  ::fstat(_file, &fst);
849  ssize_t fileSize = fst.st_size;
850  ssize_t readBytes = 0;
851  if (useLastModifiedTime_ && fileSize > 0)
852  {
853  struct tm timeInfo;
854  gmtime_r(&fst.st_mtime, &timeInfo);
855  strftime(fileTimestamp, AMPS_TIMESTAMP_LEN,
856  "%Y%m%dT%H%M%S", &timeInfo);
857  fileTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
858  }
859  else if (fileSize == 0)
860  {
861  delete[] fileTimestamp;
862  delete[] buffer;
863  delete[] subIdBuffer;
864  _recoveringFile = false;
865  return;
866  }
867  off_t loc = 0;
868  ::lseek(_file, loc, SEEK_SET);
869 #endif
870  // We trust file recovery over Adapter recovery
871  if (hasAdapter_)
872  {
873  MemoryBookmarkStore::__purge();
874  }
875  if (!readFileBytes(VOID_P(&subLen), sizeof(size_t), &readBytes)
876  || subLen > getMaxSubIdLength())
877  {
878  delete[] fileTimestamp;
879  delete[] buffer;
880  delete[] subIdBuffer;
881  _recoveringFile = false;
882  error("Failure reading file while trying to recover.", getErrorNo());
883  return;
884  }
885 #ifdef _WIN32
886  size_t totalBytes = readBytes;
887 #else
888  ssize_t totalBytes = readBytes;
889 #endif
890  ERRTYPE err = 0; // 0 no error, -1 corruption, positive is errno file error
891  size_t tooManyBytes = 0;
892  typedef std::map<Message::Field, size_t,
893  Message::Field::FieldHash> BookmarkMap;
894  typedef std::map<Message::Field, size_t,
895  Message::Field::FieldHash>::iterator BookmarkMapIter;
896  // Map of subId to set of recovered bookmarks
897  typedef std::map<Message::Field, BookmarkMap*,
898  Message::Field::FieldHash> ReadMap;
899  typedef std::map<Message::Field, BookmarkMap*,
900  Message::Field::FieldHash>::iterator ReadMapIter;
901  ReadMap recovered;
902  while(subLen > 0 && (size_t)readBytes == sizeof(size_t) &&
903  (size_t)totalBytes <= (size_t)fileSize)
904  {
905  if (subLen >= ((size_t)fileSize - (size_t)totalBytes)
906  || subLen > getMaxSubIdLength())
907  {
908  tooManyBytes = subLen + 1;
909  err = (ERRTYPE)-1;
910  break;
911  }
912  else
913  {
914  if (subIdBufferLen < subLen)
915  {
916  delete [] subIdBuffer;
917  subIdBufferLen = 2 * subLen;
918  subIdBuffer = new char[subIdBufferLen];
919  }
920  if (!readFileBytes(VOID_P(subIdBuffer), subLen, &readBytes))
921  {
922  err = getErrorNo();
923  tooManyBytes = subLen;
924  break;
925  }
926  totalBytes += readBytes;
927  sub.assign(subIdBuffer, subLen);
928  if (!readFileBytes(VOID_P(buffer), 1, &readBytes))
929  {
930  err = getErrorNo();
931  tooManyBytes = 1;
932  break;
933  }
934  totalBytes += readBytes;
935  switch(buffer[0])
936  {
937  case ENTRY_BOOKMARK:
938  {
939  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
940  {
941  // Corrupt final record is ok
942  err = (ERRTYPE)-1;
943  tooManyBytes = sizeof(size_t);
944  break;
945  }
946  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
947  {
948  err = getErrorNo();
949  tooManyBytes = sizeof(size_t);
950  break;
951  }
952  totalBytes += readBytes;
953  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
954  {
955  // Corrupt final record is ok
956  err = (ERRTYPE)-1;
957  tooManyBytes = bookmarkLen;
958  break;
959  }
960  if (bufferLen < bookmarkLen)
961  {
962  delete [] buffer;
963  bufferLen = 2 * bookmarkLen;
964  buffer = new char[bufferLen];
965  }
966  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
967  {
968  err = getErrorNo();
969  tooManyBytes = bookmarkLen;
970  break;
971  }
972  totalBytes += readBytes;
973  bookmarkField.assign(buffer, bookmarkLen);
974  Subscription* subP = find(sub);
975  BookmarkMap* bookmarks = NULL;
976  ReadMapIter iter = recovered.find(sub);
977  if (iter == recovered.end())
978  {
979  Message::Field subKey;
980  subKey.deepCopy(sub);
981  bookmarks = new BookmarkMap();
982  recovered[subKey] = bookmarks;
983  }
984  else
985  {
986  bookmarks = iter->second;
987  }
988  if (bookmarks->find(bookmarkField) != bookmarks->end())
989  {
990  std::for_each(bookmarks->begin(), bookmarks->end(),
991  _clearBookmark);
992  bookmarks->clear();
993  subP->getMostRecent(true);
994  }
995  if (BookmarkRange::isRange(bookmarkField))
996  {
997  subP->discard(subP->log(bookmarkField));
998  }
999  else if (!subP->isDiscarded(bookmarkField))
1000  {
1001  size_t sequence = subP->log(bookmarkField);
1002  Message::Field copy;
1003  copy.deepCopy(bookmarkField);
1004  bookmarks->insert(std::make_pair(copy, sequence));
1005  }
1006  else
1007  {
1008  // We know it's discarded, but there may still be a
1009  // discard entry in the log, so avoid a search.
1010  Message::Field copy;
1011  copy.deepCopy(bookmarkField);
1012  bookmarks->insert(std::make_pair(copy,0));
1013  }
1014  }
1015  break;
1016 
1017  case ENTRY_DISCARD:
1018  {
1019  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
1020  {
1021  // Corrupt final record is ok
1022  err = (ERRTYPE)-1;
1023  tooManyBytes = sizeof(size_t);
1024  break;
1025  }
1026  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
1027  {
1028  err = getErrorNo();
1029  tooManyBytes = sizeof(size_t);
1030  break;
1031  }
1032  totalBytes += readBytes;
1033  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
1034  {
1035  // Corrupt final record is ok
1036  err = (ERRTYPE)-1;
1037  tooManyBytes = bookmarkLen;
1038  break;
1039  }
1040  if (bufferLen < bookmarkLen)
1041  {
1042  delete [] buffer;
1043  bufferLen = 2 * bookmarkLen;
1044  buffer = new char[bufferLen];
1045  }
1046  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1047  {
1048  err = getErrorNo();
1049  tooManyBytes = bookmarkLen;
1050  break;
1051  }
1052  totalBytes += readBytes;
1053  bookmarkField.assign(buffer, bookmarkLen);
1054  size_t sequence = AMPS_UNSET_INDEX;
1055  ReadMapIter iter = recovered.find(sub);
1056  if (iter != recovered.end())
1057  {
1058  BookmarkMap* bookmarks = iter->second;
1059  BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
1060  if (bookmarkIter != bookmarks->end())
1061  {
1062  sequence = bookmarkIter->second;
1063  Message::Field bookmarkToClear(bookmarkIter->first);
1064  bookmarkToClear.clear();
1065  bookmarks->erase(bookmarkIter);
1066  }
1067  }
1068  Subscription* subP = find(sub);
1069  if (sequence != AMPS_UNSET_INDEX)
1070  {
1071  // A sequence of 0 means it was already discarded
1072  if (sequence) subP->discard(sequence);
1073  }
1074  else // Shouldn't end up here, but just in case we'll search
1075  {
1076  subP->discard(bookmarkField);
1077  }
1078  }
1079  break;
1080  case ENTRY_PERSISTED:
1081  {
1082  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
1083  {
1084  // Corrupt final record is ok
1085  err = (ERRTYPE)-1;
1086  tooManyBytes = sizeof(size_t);
1087  break;
1088  }
1089  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
1090  {
1091  err = getErrorNo();
1092  tooManyBytes = sizeof(size_t);
1093  break;
1094  }
1095  totalBytes += readBytes;
1096  if (bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
1097  {
1098  // Corrupt final record is ok
1099  err = (ERRTYPE)-1;
1100  tooManyBytes = bookmarkLen;
1101  break;
1102  }
1103  if (bufferLen < bookmarkLen)
1104  {
1105  delete [] buffer;
1106  bufferLen = 2 * bookmarkLen;
1107  buffer = new char[bufferLen];
1108  }
1109  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
1110  {
1111  err = getErrorNo();
1112  tooManyBytes = bookmarkLen;
1113  break;
1114  }
1115  totalBytes += readBytes;
1116  bookmarkField.assign(buffer, bookmarkLen);
1117  Subscription* subP = find(sub);
1118  MemoryBookmarkStore::_persisted(subP, bookmarkField);
1119  }
1120  break;
1121  default:
1122  {
1123  // Corrupt final record is ok
1124  tooManyBytes = (size_t)fileSize - (size_t)totalBytes;
1125  err = (ERRTYPE)-1;
1126  }
1127  break;
1128  }
1129  }
1130  loc = totalBytes;
1131  if (!readFileBytes(VOID_P(&subLen), sizeof(size_t), &readBytes))
1132  {
1133  err = getErrorNo();
1134  tooManyBytes = sizeof(size_t);
1135  break;
1136  }
1137  totalBytes += readBytes;
1138  }
1139  delete[] buffer;
1140  delete[] subIdBuffer;
1141  if (err == 0)
1142  {
1143  for (SubscriptionMap::iterator i=_subs.begin(); i != _subs.end(); ++i)
1144  {
1145  if (recovered.count(i->first) && !recovered[i->first]->empty())
1146  {
1147  Subscription* subPtr = i->second;
1148  if (subPtr->getMostRecent(false).len() > 1)
1149  {
1150  subPtr->justRecovered();
1151  }
1152  else
1153  {
1154  // Unlikely, but we may have recovered only undiscarded bookmarks
1155  // so we should really just restart as a new subscription.
1156  delete subPtr;
1157  _subs[i->first] = new Subscription(this, i->first);
1158  }
1159  }
1160  if (useLastModifiedTime_ && fileTimestamp[0] != '\0')
1161  {
1162  _subs[i->first]->setRecoveryTimestamp(fileTimestamp);
1163  }
1164  }
1165  }
1166  for (ReadMapIter i = recovered.begin(), e = recovered.end(); i!=e; ++i)
1167  {
1168  std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1169  delete i->second;
1170  Message::Field f = i->first;
1171  f.clear();
1172  }
1173  delete[] fileTimestamp;
1174  _recoveringFile = false;
1175  if (err != 0)
1176  {
1177  // Arbitrary guess if we're on the last record
1178  // We set err to -1 if we read a corrupt value or
1179  // to errno/last error if a read failed.
1180  if (err != (ERRTYPE)-1 || loc == 0 || fileSize - loc > 128)
1181  {
1182  std::ostringstream os;
1183  os << "Error while recovering LoggedBookmarkStore from "
1184  << _fileName
1185  << ". Record starting at " << loc
1186  << " reading at " << totalBytes
1187  << " requested " << tooManyBytes
1188  << " and file size is " << fileSize;
1189  error(os.str(), (err != (ERRTYPE)-1 ? err : 0));
1190  }
1191  else
1192  {
1193 #ifdef _WIN32
1194 #ifdef _WIN64
1195  LONG low = (LONG)loc;
1196  LONG high = (LONG)((loc >> 32)&0xffffffff);
1197  SetFilePointer(_file, low, &high, FILE_BEGIN);
1198 #else
1199  SetFilePointer(_file, loc, NULL, FILE_BEGIN);
1200 #endif
1201 #else
1202  ::lseek(_file, loc, SEEK_SET);
1203 #endif
1204  }
1205  }
1206  }
1207 
1208 private:
// Handle (Windows) or descriptor (elsewhere) of the open bookmark log file.
1209  FileType _file;
// Held by write() while a record is written and by recover() for the whole
// replay.
1210  Mutex _fileLock;
// Path of the bookmark log file. __prune re-points it at the temp file when
// the temp file cannot take the place of the log file.
1211  std::string _fileName;
// While true, write() writes nothing. Set during recover() and by the
// destructor.
1212  bool _recoveringFile;
1213 };
1214 
1215 } // end namespace AMPS
1216 
1217 
1218 #endif // _LOGGEDBOOKMARKSTORE_H_
1219 
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: LoggedBookmarkStore.hpp:269
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:332
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
LoggedBookmarkStore(const std::string &fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:109
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const char *fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:137
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: LoggedBookmarkStore.hpp:241
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
RecoveryPointAdapter is a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
LoggedBookmarkStore(const RecoveryPointAdapter &adapter_, const std::string &fileName_, RecoveryPointFactory factory_=NULL, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:164
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:337
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1236
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
Core type, function, and class declarations for the AMPS C++ client.
A BookmarkStoreImpl implementation that logs all messages to a file.
Definition: LoggedBookmarkStore.hpp:64
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subsc...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1308
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: LoggedBookmarkStore.hpp:324
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
virtual void purge()
Called to purge the contents of this store.
Definition: LoggedBookmarkStore.hpp:289
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: LoggedBookmarkStore.hpp:255
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: BookmarkStore.hpp:203
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: LoggedBookmarkStore.hpp:202
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
LoggedBookmarkStore(const char *fileName_, bool useLastModifiedTime_=false)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:88
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: LoggedBookmarkStore.hpp:223