AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.0.5
LoggedBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #include <ampsplusplus.hpp>
27 
28 #ifndef _LOGGEDBOOKMARKSTORE_H_
29 #define _LOGGEDBOOKMARKSTORE_H_
30 
31 #include <MemoryBookmarkStore.hpp>
32 #include <string>
33 #ifdef _WIN32
34 #include <windows.h>
35 #else
36 #include <sys/mman.h>
37 #include <unistd.h>
38 #include <sys/uio.h>
39 #endif
40 #include <sys/types.h>
41 #include <sys/stat.h>
42 #include <fcntl.h>
43 #include <map>
44 #include <set>
45 
46 #if defined(sun)
47 typedef char* amps_iovec_base_ptr;
48 #else
49 typedef void* amps_iovec_base_ptr;
50 #endif
51 
56 
57 namespace AMPS
58 {
63 {
64 private:
65  static void _clearBookmark(std::pair<const Message::Field, size_t>& pair)
66  {
67  Message::Field f(pair.first);
68  f.clear();
69  }
70 #ifdef _WIN32
71  typedef HANDLE FileType;
72 #else
73  typedef int FileType;
74 #endif
75 public:
82  LoggedBookmarkStore(const char* fileName_)
84 #ifdef _WIN32
85  , _file(INVALID_HANDLE_VALUE)
86 #else
87  , _file(0)
88 #endif
89  , _fileName(fileName_)
90  {
91  init();
92  recover();
93  }
94 
98  LoggedBookmarkStore(const std::string& fileName_)
100 #ifdef _WIN32
101  , _file(INVALID_HANDLE_VALUE)
102 #else
103  , _file(0)
104 #endif
105  , _fileName(fileName_)
106  {
107  init();
108  recover();
109  }
110 
111  virtual ~LoggedBookmarkStore()
112  {
113  close();
114  // In case _lock gets acquired by reader thread between end of this
115  // destructor and start of base class destructor, prevent write()
116  _recovering = true;
117  }
118 
119  void close()
120  {
121 #ifdef _WIN32
122  CloseHandle(_file);
123 #else
124  ::close(_file);
125 #endif
126  }
127 
133  virtual size_t log(Message& message_)
134  {
135  Message::Field bookmark = message_.getBookmark();
136  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
137  Lock<Mutex> guard(_lock);
138  if (!sub)
139  {
140  Message::Field subId = message_.getSubscriptionId();
141  if (subId.empty())
142  subId = message_.getSubscriptionIds();
143  sub = find(subId);
144  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
145  }
146  write(_file, sub->id(), ENTRY_BOOKMARK, bookmark);
147  return MemoryBookmarkStore::_log(message_);
148  }
149 
154  virtual void discard(const Message& message_)
155  {
156  Message::Field bookmark = message_.getBookmark();
157  Message::Field subId = message_.getSubscriptionId();
158  if (subId.empty())
159  subId = message_.getSubscriptionIds();
160  Lock<Mutex> guard(_lock);
161  write(_file, subId, ENTRY_DISCARD, bookmark);
162  MemoryBookmarkStore::_discard(message_);
163  }
164 
172  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
173  {
174  Lock<Mutex> l(_lock);
175  Subscription::Entry* entry = find(subId_)->getEntryByIndex(bookmarkSeqNo_);
176  if (!entry || entry->_val.empty()) return;
177  write(_file, subId_, ENTRY_DISCARD, entry->_val);
178  MemoryBookmarkStore::_discard(subId_, bookmarkSeqNo_);
179  }
180 
187  {
188  Lock<Mutex> l(_lock);
189  return MemoryBookmarkStore::getMostRecent(subId_);
190  }
191 
200  virtual bool isDiscarded(Message& message_)
201  {
202  Lock<Mutex> l(_lock);
203  bool retVal = MemoryBookmarkStore::_isDiscarded(message_);
204  if (retVal)
205  {
206  Message::Field subId = message_.getSubscriptionId();
207  if (subId.empty())
208  subId = message_.getSubscriptionIds();
209  write(_file, subId, ENTRY_BOOKMARK, message_.getBookmark());
210  write(_file, subId, ENTRY_DISCARD, message_.getBookmark());
211  }
212  return retVal;
213  }
214 
220  virtual void purge()
221  {
222  Lock<Mutex> guard(_lock);
223 #ifdef _WIN32
224  if(_file != INVALID_HANDLE_VALUE)
225  CloseHandle(_file);
226  DeleteFileA(_fileName.c_str());
227  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
228  NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
229  if( _file == INVALID_HANDLE_VALUE )
230  {
231  DWORD err = getErrorNo();
232  std::ostringstream os;
233  os << "Failed to create file " << _fileName << " for LoggedBookmarkStore";
234  error(os.str(), err);
235  return;
236  }
237 #else
238  ::close(_file);
239  ::unlink(_fileName.c_str());
240  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
241  if(_file == -1)
242  {
243  error("Failed to open log file for LoggedBookmarkStore", getErrorNo());
244  return;
245  }
246 #endif
247  MemoryBookmarkStore::_purge();
248  }
249 
255  virtual void purge(const Message::Field& subId_)
256  {
257  Lock<Mutex> guard(_lock);
258  MemoryBookmarkStore::_purge(subId_);
259  std::string tmpFileName = _fileName + ".tmp";
260  __prune(tmpFileName);
261  }
262 
263  virtual void noPersistedAcks(const Message::Field& subId_)
264  {
266  }
267 
268  void setServerVersion(const VersionInfo& version_)
269  {
271  }
272 
273  void setServerVersion(size_t version_)
274  {
276  }
277 
278  // Yes, the argument is a non-const copy of what is passed in
279  void _prune(std::string tmpFileName_)
280  {
281  if (tmpFileName_.empty())
282  {
283  tmpFileName_ = _fileName + ".tmp";
284  }
285  Lock<Mutex> guard(_lock);
286  // If nothing's changed with most recent, don't rewrite the file
287  if (!_recentChanged)
288  {
289  return;
290  }
291  __prune(tmpFileName_);
292  _recentChanged = false;
293  }
294 
295  void __prune(std::string tmpFileName_)
296  {
297 #ifdef _WIN32
298  HANDLE tmpFile;
299  tmpFile = CreateFileA(tmpFileName_.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
300  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
301  if(tmpFile == INVALID_HANDLE_VALUE )
302  {
303  DWORD err = getErrorNo();
304  std::ostringstream os;
305  os << "Failed to open log file " << tmpFileName_ <<
306  " for LoggedBookmarkStore";
307  error(os.str(), err);
308  return;
309  }
310 #else
311  int tmpFile;
312  tmpFile = open(tmpFileName_.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
313  if(tmpFile == -1)
314  {
315  int err = getErrorNo();
316  std::ostringstream os;
317  os << "Failed to open log file " << tmpFileName_ <<
318  " for LoggedBookmarkStore";
319  error(os.str(), err);
320  return;
321  }
322 #endif
323  try
324  {
325  for (SubscriptionMap::iterator i = _subs.begin();
326  i != _subs.end(); ++i)
327  {
328  Message::Field subId = i->first;
329  assert(!subId.empty());
330  Message::Field recent = i->second->getMostRecent(false);
331  amps_uint64_t recentPub, recentSeq;
332  Subscription::parseBookmark(recent, recentPub, recentSeq);
333  Subscription::PublisherMap publishersDiscarded =
334  i->second->_publishers;
335  MemoryBookmarkStore::EntryPtrList recovered;
336  i->second->getRecoveryEntries(recovered);
337  i->second->setPublishersToDiscarded(&recovered,
338  &publishersDiscarded);
339  char tmpBookmarkBuffer[128];
340  for (Subscription::PublisherIterator pub =
341  publishersDiscarded.begin(),
342  e = publishersDiscarded.end();
343  pub != e; ++pub)
344  {
345  // Don't log EPOCH if it got in the map
346  if (pub->first == 0 || pub->second == 0) continue;
347  // Don't log the most recent yet
348  if (pub->first == recentPub) continue;
349  int written = AMPS_snprintf_amps_uint64_t(
350  tmpBookmarkBuffer,
351  sizeof(tmpBookmarkBuffer),
352  pub->first);
353  *(tmpBookmarkBuffer+written++) = '|';
354  written += AMPS_snprintf_amps_uint64_t(
355  tmpBookmarkBuffer+written,
356  sizeof(tmpBookmarkBuffer)
357  - (size_t)written,
358  pub->second);
359  *(tmpBookmarkBuffer+written++) = '|';
360  Message::Field tmpBookmark(tmpBookmarkBuffer, (size_t)written);
361  write(tmpFile, subId, ENTRY_BOOKMARK, tmpBookmark);
362  write(tmpFile, subId, ENTRY_DISCARD, tmpBookmark);
363  }
364  if (isWritableBookmark(recent.len()))
365  {
366  write(tmpFile, subId, ENTRY_BOOKMARK, recent);
367  write(tmpFile, subId, ENTRY_DISCARD, recent);
368  }
369  else // set up _recentList
370  {
371  i->second->getMostRecentList();
372  }
373  if (isWritableBookmark(i->second->getLastPersisted().len()))
374  {
375  write(tmpFile, subId, ENTRY_PERSISTED,
376  i->second->getLastPersisted());
377  }
378  i->second->getActiveEntries(recovered);
379  for (MemoryBookmarkStore::EntryPtrList::iterator entry =
380  recovered.begin();
381  entry != recovered.end(); ++entry)
382  {
383  if ((*entry)->_val.empty() ||
384  !isWritableBookmark((*entry)->_val.len()))
385  continue;
386  write(tmpFile, subId, ENTRY_BOOKMARK, (*entry)->_val);
387  if (!(*entry)->_active)
388  write(tmpFile, subId, ENTRY_DISCARD, (*entry)->_val);
389  }
390  }
391  }
392  catch (StoreException& ex)
393  {
394 #ifdef _WIN32
395  CloseHandle(tmpFile);
396  DeleteFileA(tmpFileName_.c_str());
397 #else
398  ::close(tmpFile);
399  unlink(tmpFileName_.c_str());
400 #endif
401  std::ostringstream os;
402  os << "Exception during prune: " << ex.what();
403  throw StoreException(os.str());
404  }
405 #ifdef _WIN32
406  CloseHandle(_file);
407  CloseHandle(tmpFile);
408  _file = INVALID_HANDLE_VALUE;
409  tmpFile = INVALID_HANDLE_VALUE;
410  // Replace file with pruned file
411  int retryCount = 3;
412  while (!MoveFileExA(tmpFileName_.c_str(), _fileName.c_str(),
413  MOVEFILE_COPY_ALLOWED|MOVEFILE_REPLACE_EXISTING|MOVEFILE_WRITE_THROUGH))
414  {
415  DWORD err = getErrorNo();
416  if (--retryCount > 0) continue;
417  // Try to set _file to the tmp file that won't move then throw
418  std::string desiredFileName = _fileName;
419  _fileName = tmpFileName_;
420  init();
421  std::ostringstream os;
422  os << "Failed to move completed temp file " << tmpFileName_
423  << " to " << desiredFileName
424  << " in prune in LoggedBookmarkStore. Continuing by using "
425  << tmpFileName_ << " as the LoggedBookmarkStore file.";
426  error(os.str(), err);
427  return;
428  }
429  init();
430  SetFilePointer(_file, 0, NULL, FILE_END);
431 #else
432  ::close(tmpFile);
433  ::close(_file);
434  if (-1 == ::unlink(_fileName.c_str()))
435  {
436  int err = getErrorNo();
437  // Try to set _file to the tmp file then throw
438  std::string desiredFileName = _fileName;
439  _fileName = tmpFileName_;
440  init();
441  std::ostringstream os;
442  os << "Failed to delete file " << desiredFileName
443  << " after creating temporary file " << tmpFileName_
444  << " in prune in LoggedBookmarkStore. Continuing by using "
445  << tmpFileName_ << " as the LoggedBookmarkStore file.";
446  error(os.str(), err);
447  return;
448  }
449  if (-1 == ::rename(tmpFileName_.c_str(), _fileName.c_str()))
450  {
451  int err = getErrorNo();
452  // Try to set _file to the tmp file that won't move then throw
453  std::string desiredFileName = _fileName;
454  _fileName = tmpFileName_;
455  init();
456  std::ostringstream os;
457  os << "Failed to move completed temp file " << tmpFileName_
458  << " to " << desiredFileName
459  << " in prune in LoggedBookmarkStore. Continuing by using "
460  << tmpFileName_ << " as the LoggedBookmarkStore file.";
461  error(os.str(), err);
462  return;
463  }
464  init();
465  struct stat fst;
466  if (-1 == ::fstat(_file, &fst))
467  {
468  int err = getErrorNo();
469  std::ostringstream os;
470  os << "Failed to get size of pruned file " << _fileName
471  << " in prune in LoggedBookmarkStore. ";
472  error(os.str(), err);
473  return;
474  }
475  ::lseek(_file, (off_t)fst.st_size, SEEK_SET);
476 #endif
477  }
478 
479 private:
480  virtual void _persisted(Subscription* subP_,
481  const Message::Field& bookmark_)
482  {
483  Lock<Mutex> guard(_lock);
484  write(_file, subP_->id(), ENTRY_PERSISTED, bookmark_);
485  MemoryBookmarkStore::_persisted(subP_, bookmark_);
486  }
487 
488  virtual Message::Field _persisted(Subscription* subP_, size_t bookmark_)
489  {
490  Lock<Mutex> l(_lock);
491  Subscription::Entry* entryPtr = subP_->getEntryByIndex(bookmark_);
492  if (!entryPtr || entryPtr->_val.empty())
493  return Message::Field();
494  Message::Field bookmarkField = entryPtr->_val;
495  write(_file, subP_->id(), ENTRY_PERSISTED, bookmarkField);
496  MemoryBookmarkStore::_persisted(subP_, bookmarkField);
497  return bookmarkField;
498  }
499 
500 #ifdef _WIN32
501  typedef DWORD ERRTYPE ;
502  ERRTYPE getErrorNo() const
503  {
504  return GetLastError();
505  }
506 
507  void error(const std::string& message_, ERRTYPE err)
508  {
509  std::ostringstream os;
510  static const DWORD msgSize = 2048;
511  char pMsg[msgSize];
512  DWORD sz = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM|
513  FORMAT_MESSAGE_ARGUMENT_ARRAY,
514  NULL, err, LANG_NEUTRAL,
515  pMsg, msgSize, NULL);
516  os << "File: " << _fileName << ". " << message_;
517  if (err != 0)
518  {
519  os << " with error " << pMsg;
520  }
521  throw StoreException(os.str());
522  }
523 #else
524  typedef int ERRTYPE;
525  ERRTYPE getErrorNo() const
526  {
527  return errno;
528  }
529 
530  void error(const std::string& message_, ERRTYPE err)
531  {
532  std::ostringstream os;
533  os << "File: " << _fileName << ". " << message_;
534  if (err != 0)
535  {
536  os << " with error " << strerror(err);
537  }
538  close();
539  throw StoreException(os.str());
540  }
541 #endif
542 
543  void init()
544  {
545 #ifdef _WIN32
546  _file = CreateFileA(_fileName.c_str(), GENERIC_READ | GENERIC_WRITE, 0,
547  NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
548  if( _file == INVALID_HANDLE_VALUE )
549  {
550  DWORD err = getErrorNo();
551  std::ostringstream os;
552  os << "Failed to open log file " << _fileName << " for LoggedBookmarkStore";
553  error(os.str(), err);
554  return;
555  }
556 #else
557  _file = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
558  if(_file == -1)
559  {
560  int err = getErrorNo();
561  std::ostringstream os;
562  os << "Failed to open log file " << _fileName << " for LoggedBookmarkStore";
563  error(os.str(), err);
564  return;
565  }
566 #endif
567  }
568 
569  // This implementation will only ever use this when logging a bookmark
570  // Could be used to add a feature where discarded bookmark fields are logged in
571  // addition to the generated bookmark.
572  void write(FileType file_, const Message::Field& subId_, char type_,
573  const Message::Field& bookmark_)
574  {
575  Lock<Mutex> guard(_fileLock);
576  if(!_recovering && isWritableBookmark(bookmark_.len()))
577  {
578 #ifdef _WIN32
579  DWORD written;
580  size_t len = subId_.len();
581  BOOL ok =WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
582  ok |= WriteFile(file_, (LPVOID)subId_.data(), (DWORD)len, &written, NULL);
583  ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
584  len = bookmark_.len();
585  ok |= WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
586  ok |= WriteFile(file_, (LPVOID)bookmark_.data(), (DWORD)len,
587  &written, NULL);
588  if(!ok)
589  {
590  error("Failed to write bookmark to file", getErrorNo());
591  return;
592  }
593 
594 #else
595  if(file_ == -1)
596  {
597  file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
598  if(file_ == -1)
599  {
600  int err = getErrorNo();
601  std::ostringstream os;
602  os << "Failed to open file " << _fileName
603  << " for use in LoggedBookmarkStore. ";
604  error(os.str(), err);
605  return;
606  }
607  }
608  struct iovec data[5];
609  size_t len = subId_.len();
610  data[0].iov_base = (amps_iovec_base_ptr)(void*)&len;
611  data[0].iov_len = sizeof(size_t);
612  data[1].iov_base = (amps_iovec_base_ptr)(void*)subId_.data();
613  data[1].iov_len = len;
614  data[2].iov_base = (amps_iovec_base_ptr)(void*)&type_;
615  data[2].iov_len = 1;
616  size_t bookmarkLen = bookmark_.len();
617  data[3].iov_base = (amps_iovec_base_ptr)(void*)&bookmarkLen;
618  data[3].iov_len = sizeof(size_t);
619  data[4].iov_base = (amps_iovec_base_ptr)(void*)bookmark_.data();
620  data[4].iov_len = bookmarkLen;
621  ssize_t written = ::writev(file_, data, 5);
622  if(written == -1)
623  {
624  error("Failed to write to bookmark log.", getErrorNo());
625  return;
626  }
627 #endif
628  }
629  }
630 
631  // This implementation will only ever use this when discarding a bookmark
632  // Could be used to add a feature where generated bookmarks are logged in
633  // addition to the bookmark field.
634  void write(FileType file_, const Message::Field& subId_,
635  char type_, size_t bookmark_)
636  {
637  Lock<Mutex> guard(_fileLock);
638  if(!_recovering)
639  {
640 #ifdef _WIN32
641  DWORD written;
642  size_t len = subId_.len();
643  BOOL ok =WriteFile(file_, (LPVOID)&len, sizeof(size_t), &written, NULL);
644  ok |= WriteFile(file_, (LPVOID)subId_.data(), (DWORD)len, &written, NULL);
645  ok |= WriteFile(file_, (LPVOID)&type_, 1, &written, NULL);
646  ok |= WriteFile(file_, (LPVOID)&bookmark_, sizeof(size_t),
647  &written, NULL);
648  if(!ok)
649  {
650  error("Failed to write bookmark to file", getErrorNo());
651  return;
652  }
653 
654 #else
655  if(file_ == -1)
656  {
657  file_ = open(_fileName.c_str(), O_RDWR | O_CREAT, (mode_t)0644);
658  if(file_ == -1)
659  {
660  int err = getErrorNo();
661  std::ostringstream os;
662  os << "Failed to open file " << _fileName
663  << " for use in LoggedBookmarkStore. ";
664  error(os.str(), err);
665  return;
666  }
667  }
668  struct iovec data[4];
669  size_t len = subId_.len();
670  data[0].iov_base = (amps_iovec_base_ptr)(void*)&len;
671  data[0].iov_len = sizeof(size_t);
672  data[1].iov_base = (amps_iovec_base_ptr)(void*)subId_.data();
673  data[1].iov_len = len;
674  data[2].iov_base = (amps_iovec_base_ptr)(void*)&type_;
675  data[2].iov_len = 1;
676  data[3].iov_base = (amps_iovec_base_ptr)(void*)&bookmark_;
677  data[3].iov_len = sizeof(size_t);
678  ssize_t written = ::writev(file_, data, 4);
679  if(written == -1)
680  {
681  error("Failed to write to bookmark log.", getErrorNo());
682  return;
683  }
684 #endif
685  }
686  }
687 
688 #ifdef _WIN32
689 #define VOID_P(buf) (LPVOID)buf
690  bool readFileBytes(LPVOID buffer, size_t numBytes, DWORD *bytesRead)
691  {
692  return (ReadFile(_file, buffer, (DWORD)numBytes, bytesRead, NULL) == TRUE);
693  }
694 #else
695 #define VOID_P(buf) (void*)buf
696  bool readFileBytes(void* buffer, size_t numBytes, ssize_t *bytesRead)
697  {
698  *bytesRead = ::read(_file, buffer, numBytes);
699  return (*bytesRead >= 0);
700  }
701 #endif
702 
703  void recover()
704  {
705  size_t resizeLen = 128;
706  size_t bufferLen = 128;
707  char* buffer = new char[bufferLen];
708  size_t subIdBufferLen = 128;
709  char* subIdBuffer = new char[bufferLen];
710  Message::Field sub;
711  size_t subLen = 0;
712  Message::Field bookmarkField;
713  size_t bookmarkLen = 0;
714  Lock<Mutex> l(_lock);
715  Lock<Mutex> guard(_fileLock);
716  _recovering = true;
717 #ifdef _WIN32
718  size_t loc = 0;
719  SetFilePointer(_file, 0, NULL, FILE_BEGIN);
720  LARGE_INTEGER lifileSize;
721  if(GetFileSizeEx(_file, &lifileSize) == 0)
722  {
723  DWORD err = getErrorNo();
724  delete[] buffer;
725  delete[] subIdBuffer;
726  _recovering = false;
727  error("Failure getting file size while trying to recover.", err);
728  return;
729  }
730  DWORD readBytes = 0;
731 #ifdef _WIN64
732  size_t fileSize = lifileSize.QuadPart;
733 #else
734  size_t fileSize = lifileSize.LowPart;
735 #endif
736 #else
737  off_t loc = 0;
738  ::lseek(_file, loc, SEEK_SET);
739  struct stat fst;
740  ::fstat(_file, &fst);
741  ssize_t fileSize = fst.st_size;
742  ssize_t readBytes = 0;
743 #endif
744  if (!readFileBytes(VOID_P(&subLen), sizeof(size_t), &readBytes)
745  || subLen > getMaxSubIdLength())
746  {
747  delete[] buffer;
748  delete[] subIdBuffer;
749  _recovering = false;
750  error("Failure reading file while trying to recover.", getErrorNo());
751  return;
752  }
753 #ifdef _WIN32
754  size_t totalBytes = readBytes;
755 #else
756  ssize_t totalBytes = readBytes;
757 #endif
758  ERRTYPE err = 0; // 0 no error, -1 corruption, positive is errno file error
759  size_t tooManyBytes = 0;
760  typedef std::map<Message::Field, size_t,
761  Message::Field::FieldHash> BookmarkMap;
762  typedef std::map<Message::Field, size_t,
763  Message::Field::FieldHash>::iterator BookmarkMapIter;
764  // Map of subId to set of recovered bookmarks
765  typedef std::map<Message::Field, BookmarkMap*,
766  Message::Field::FieldHash> ReadMap;
767  typedef std::map<Message::Field, BookmarkMap*,
768  Message::Field::FieldHash>::iterator ReadMapIter;
769  ReadMap recovered;
770  while(subLen > 0 && (size_t)readBytes == sizeof(size_t) &&
771  (size_t)totalBytes <= (size_t)fileSize)
772  {
773  if (subLen >= ((size_t)fileSize - (size_t)totalBytes)
774  || subLen > getMaxSubIdLength())
775  {
776  tooManyBytes = subLen + 1;
777  err = (ERRTYPE)-1;
778  break;
779  }
780  else
781  {
782  while(subIdBufferLen < subLen)
783  {
784  char* old = subIdBuffer;
785  subIdBufferLen += resizeLen;
786  subIdBuffer = new char[subIdBufferLen];
787  delete[] old;
788  }
789  if (!readFileBytes(VOID_P(subIdBuffer), subLen, &readBytes))
790  {
791  err = getErrorNo();
792  tooManyBytes = subLen;
793  break;
794  }
795  totalBytes += readBytes;
796  sub.assign(subIdBuffer, subLen);
797  if (!readFileBytes(VOID_P(buffer), 1, &readBytes))
798  {
799  err = getErrorNo();
800  tooManyBytes = 1;
801  break;
802  }
803  totalBytes += readBytes;
804  switch(buffer[0])
805  {
806  case ENTRY_BOOKMARK:
807  {
808  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
809  {
810  // Corrupt final record is ok
811  err = (ERRTYPE)-1;
812  tooManyBytes = sizeof(size_t);
813  break;
814  }
815  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
816  {
817  err = getErrorNo();
818  tooManyBytes = sizeof(size_t);
819  break;
820  }
821  totalBytes += readBytes;
822  if (bookmarkLen > AMPS_MAX_BOOKMARK_LEN
823  || bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
824  {
825  // Corrupt final record is ok
826  err = (ERRTYPE)-1;
827  tooManyBytes = bookmarkLen;
828  break;
829  }
830  while(bufferLen < bookmarkLen)
831  {
832  char* old = buffer;
833  bufferLen *= 2;
834  buffer = new char[bufferLen];
835  delete[] old;
836  }
837  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
838  {
839  err = getErrorNo();
840  tooManyBytes = bookmarkLen;
841  break;
842  }
843  totalBytes += readBytes;
844  bookmarkField.assign(buffer, bookmarkLen);
845  Subscription* subP = find(sub);
846  BookmarkMap* bookmarks = NULL;
847  ReadMapIter iter = recovered.find(sub);
848  if (iter == recovered.end())
849  {
850  Message::Field subKey;
851  subKey.deepCopy(sub);
852  bookmarks = new BookmarkMap();
853  recovered[subKey] = bookmarks;
854  }
855  else
856  {
857  bookmarks = iter->second;
858  }
859  if (bookmarks->find(bookmarkField) != bookmarks->end())
860  {
861  std::for_each(bookmarks->begin(), bookmarks->end(),
862  _clearBookmark);
863  bookmarks->clear();
864  subP->getMostRecent();
865  }
866  if (!subP->isDiscarded(bookmarkField))
867  {
868  size_t sequence = subP->log(bookmarkField);
869  Message::Field copy;
870  copy.deepCopy(bookmarkField);
871  bookmarks->insert(std::make_pair(copy, sequence));
872  }
873  else
874  {
875  // We know it's discarded, but there may still be a
876  // discard entry in the log, so avoid a search.
877  Message::Field copy;
878  copy.deepCopy(bookmarkField);
879  bookmarks->insert(std::make_pair(copy,0));
880  }
881  }
882  break;
883 
884  case ENTRY_DISCARD:
885  {
886  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
887  {
888  // Corrupt final record is ok
889  err = (ERRTYPE)-1;
890  tooManyBytes = sizeof(size_t);
891  break;
892  }
893  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
894  {
895  err = getErrorNo();
896  tooManyBytes = sizeof(size_t);
897  break;
898  }
899  totalBytes += readBytes;
900  if (bookmarkLen > AMPS_MAX_BOOKMARK_LEN
901  || bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
902  {
903  // Corrupt final record is ok
904  err = (ERRTYPE)-1;
905  tooManyBytes = bookmarkLen;
906  break;
907  }
908  while(bufferLen < bookmarkLen)
909  {
910  char* old = buffer;
911  bufferLen *= 2;
912  buffer = new char[bufferLen];
913  delete[] old;
914  }
915  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
916  {
917  err = getErrorNo();
918  tooManyBytes = bookmarkLen;
919  break;
920  }
921  totalBytes += readBytes;
922  bookmarkField.assign(buffer, bookmarkLen);
923  size_t sequence = AMPS_UNSET_INDEX;
924  ReadMapIter iter = recovered.find(sub);
925  if (iter != recovered.end())
926  {
927  BookmarkMap* bookmarks = iter->second;
928  BookmarkMapIter bookmarkIter = bookmarks->find(bookmarkField);
929  if (bookmarkIter != bookmarks->end())
930  {
931  sequence = bookmarkIter->second;
932  Message::Field bookmarkToClear(bookmarkIter->first);
933  bookmarkToClear.clear();
934  bookmarks->erase(bookmarkIter);
935  }
936  }
937  Subscription* subP = find(sub);
938  if (sequence != AMPS_UNSET_INDEX)
939  {
940  // A sequence of 0 means it was already discarded
941  if (sequence) subP->discard(sequence);
942  }
943  else // Shouldn't end up here, but just in case we'll search
944  {
945  subP->discard(bookmarkField);
946  }
947  }
948  break;
949  case ENTRY_PERSISTED:
950  {
951  if ((size_t)totalBytes + sizeof(size_t) >= (size_t)fileSize)
952  {
953  // Corrupt final record is ok
954  err = (ERRTYPE)-1;
955  tooManyBytes = sizeof(size_t);
956  break;
957  }
958  if (!readFileBytes(VOID_P(&bookmarkLen), sizeof(size_t), &readBytes))
959  {
960  err = getErrorNo();
961  tooManyBytes = sizeof(size_t);
962  break;
963  }
964  totalBytes += readBytes;
965  if (bookmarkLen > AMPS_MAX_BOOKMARK_LEN
966  || bookmarkLen > (size_t)fileSize - (size_t)totalBytes)
967  {
968  // Corrupt final record is ok
969  err = (ERRTYPE)-1;
970  tooManyBytes = bookmarkLen;
971  break;
972  }
973  while(bufferLen < bookmarkLen)
974  {
975  char* old = buffer;
976  bufferLen *= 2;
977  buffer = new char[bufferLen];
978  delete[] old;
979  }
980  if (!readFileBytes(VOID_P(buffer), bookmarkLen, &readBytes))
981  {
982  err = getErrorNo();
983  tooManyBytes = bookmarkLen;
984  break;
985  }
986  totalBytes += readBytes;
987  bookmarkField.assign(buffer, bookmarkLen);
988  Subscription* subP = find(sub);
989  MemoryBookmarkStore::_persisted(subP, bookmarkField);
990  }
991  break;
992  default:
993  {
994  // Corrupt final record is ok
995  tooManyBytes = (size_t)fileSize - (size_t)totalBytes;
996  err = (ERRTYPE)-1;
997  }
998  break;
999  }
1000  }
1001  loc = totalBytes;
1002  if (!readFileBytes(VOID_P(&subLen), sizeof(size_t), &readBytes))
1003  {
1004  err = getErrorNo();
1005  tooManyBytes = sizeof(size_t);
1006  break;
1007  }
1008  totalBytes += readBytes;
1009  }
1010  delete[] buffer;
1011  delete[] subIdBuffer;
1012  if (err == 0)
1013  {
1014  for (SubscriptionMap::iterator i=_subs.begin(); i != _subs.end(); ++i)
1015  {
1016  if (recovered.count(i->first) && !recovered[i->first]->empty())
1017  {
1018  if (i->second->getMostRecent(false).len() > 1)
1019  {
1020  i->second->justRecovered();
1021  }
1022  else
1023  {
1024  // Unlikely, but we may have recovered only undiscarded bookmarks
1025  // so we should really just restart as a new subscription.
1026  delete i->second;
1027  _subs[i->first] = new Subscription(this, i->first);
1028  }
1029  }
1030  }
1031  }
1032  for (ReadMapIter i = recovered.begin(), e = recovered.end(); i!=e; ++i)
1033  {
1034  std::for_each(i->second->begin(), i->second->end(), _clearBookmark);
1035  delete i->second;
1036  Message::Field f = i->first;
1037  f.clear();
1038  }
1039  _recovering = false;
1040  if (err != 0)
1041  {
1042  // Arbitrary guess if we're on the last record
1043  // We set err to -1 if we read a corrupt value or
1044  // to errno/last error if a read failed.
1045  if (err != (ERRTYPE)-1 || loc == 0 || fileSize - loc > 128)
1046  {
1047  std::ostringstream os;
1048  os << "Error while recovering LoggedBookmarkStore from "
1049  << _fileName
1050  << ". Record starting at " << loc
1051  << " reading at " << totalBytes
1052  << " requested " << tooManyBytes
1053  << " and file size is " << fileSize;
1054  error(os.str(), (err != (ERRTYPE)-1 ? err : 0));
1055  }
1056  else
1057  {
1058 #ifdef _WIN32
1059 #ifdef _WIN64
1060  LONG low = (LONG)loc;
1061  LONG high = (LONG)((loc >> 32)&0xffffffff);
1062  SetFilePointer(_file, low, &high, FILE_BEGIN);
1063 #else
1064  SetFilePointer(_file, loc, NULL, FILE_BEGIN);
1065 #endif
1066 #else
1067  ::lseek(_file, loc, SEEK_SET);
1068 #endif
1069  }
1070  }
1071  }
1072 
1073 private:
1074  FileType _file;
1075  Mutex _fileLock;
1076  std::string _fileName;
1077  bool _recovering;
1078 };
1079 
1080 } // end namespace AMPS
1081 
1082 
1083 #endif // _LOGGEDBOOKMARKSTORE_H_
1084 
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: LoggedBookmarkStore.hpp:200
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:984
LoggedBookmarkStore(const char *fileName_)
Creates a LoggedBookmarkStore using fileName_ as its file storage.
Definition: LoggedBookmarkStore.hpp:82
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:268
Provides AMPS::MemoryBookmarkStore, a bookmark store that holds bookmarks in memory.
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:196
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: LoggedBookmarkStore.hpp:172
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: LoggedBookmarkStore.hpp:273
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1074
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:93
LoggedBookmarkStore(const std::string &fileName_)
Creates a LoggedBookmarkStore using a file name fileName_.
Definition: LoggedBookmarkStore.hpp:98
Core type, function, and class declarations for the AMPS C++ client.
A BookmarkStoreImpl implementation that logs all messages to a file.
Definition: LoggedBookmarkStore.hpp:62
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1076
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: LoggedBookmarkStore.hpp:255
virtual void noPersistedAcks(const Message::Field &subId_)
Flag the subscription as not requesting persisted acks No persisted acks will be sent for any bookmar...
Definition: LoggedBookmarkStore.hpp:263
virtual void noPersistedAcks(const Message::Field &subId_)
Flag the subscription as not requesting persisted acks No persisted acks will be sent for any bookmar...
Definition: MemoryBookmarkStore.hpp:1065
virtual void purge()
Called to purge the contents of this store.
Definition: LoggedBookmarkStore.hpp:220
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: LoggedBookmarkStore.hpp:186
A BookmarkStoreImpl implmentation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:50
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
size_t getMaxSubIdLength() const
Gets the maximum allowed length for a sub id when recovering a bookmark store from persistent storage...
Definition: ampsplusplus.hpp:811
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: LoggedBookmarkStore.hpp:133
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:167
Definition: ampsplusplus.hpp:136
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1064
virtual void discard(const Message &message_)
Log a Message as discarded from the store.
Definition: LoggedBookmarkStore.hpp:154