AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
MemoryBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYBOOKMARKSTORE_H_
27 #define _MEMORYBOOKMARKSTORE_H_
28 
29 #include <BookmarkStore.hpp>
30 #include <Field.hpp>
31 #include <Message.hpp>
32 #include <RecoveryPoint.hpp>
33 #include <RecoveryPointAdapter.hpp>
34 #include <map>
35 #include <sstream>
36 #include <vector>
37 #include <stdlib.h>
38 #include <assert.h>
39 
40 #define AMPS_MIN_BOOKMARK_LEN 3
41 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL
42 
47 
48 namespace AMPS
49 {
50 
57  {
58  protected:
59  class Subscription
60  {
61  public:
62  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
63  typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
64  typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
65  RecoveryIterator;
66  typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
67 
68  // Start the sequence at 1 so that file-based subclasses can use 0 during
69  // recovery as an indicator that a message wasn't logged because
70  // isDiscarded() was true.
71  Subscription(MemoryBookmarkStore* store_, const Message::Field& id_)
72  : _current(1), _currentBase(0), _least(1), _leastBase(0)
73  , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
74  , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
75  , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
76  , _store(store_)
77  {
78  // Need our own memory for the sub id
79  _id.deepCopy(id_);
80  _store->resize(_id, (char**)&_entries,
81  sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE, false);
82  setLastPersistedToEpoch();
83  }
84 
85  ~Subscription()
86  {
87  Lock<Mutex> guard(_subLock);
88  if (_entries)
89  {
90  for (size_t i = 0; i < _entriesLength; ++i)
91  {
92  _entries[i]._val.clear();
93  }
94  // resize to 0 will free _entries
95  _store->resize(_id, (char**)&_entries, 0);
96  }
97  _id.clear();
98  _recent.clear();
99  _lastPersisted.clear();
100  _recentList.clear();
101  _range.clear();
102  _recoveryTimestamp.clear();
103  }
104 
105  size_t log(const Message::Field& bookmark_)
106  {
107  if (bookmark_ == AMPS_BOOKMARK_NOW)
108  {
109  return 0;
110  }
111  Lock<Mutex> guard(_subLock);
112  // Either relog the recovery or log it
113  size_t index = recover(bookmark_, true);
114  if (index == AMPS_UNSET_INDEX)
115  {
116  // Check for wrap
117  if (_current >= _entriesLength)
118  {
119  _current = 0;
120  _currentBase += _entriesLength;
121  }
122  // Check for resize
123  // If list is too small, double it
124  if ((_current == _least && _leastBase < _currentBase) ||
125  (_current == _recoveryMin && _recoveryBase < _currentBase))
126  {
127  if (!_store->resize(_id, (char**)&_entries,
128  sizeof(Entry) * _entriesLength * 2))
129  {
130  //Try again
131  return log(bookmark_);
132  }
133  // Length was doubled
134  _entriesLength *= 2;
135  }
136 
137  // Add this entry to the end of our list
138  /*
139  if (bookmark_ == AMPS_BOOKMARK_NOW)
140  {
141  // Save a now timestamp bookmark
142  char* nowTimestamp = new char[AMPS_TIMESTAMP_LEN];
143  struct tm timeInfo;
144  time_t now;
145  time(&now);
146  #ifdef _WIN32
147  gmtime_s(&timeInfo, &now);
148  #else
149  gmtime_r(&now, &timeInfo);
150  #endif
151  strftime(nowTimestamp, AMPS_TIMESTAMP_LEN,
152  "%Y%m%dT%H%M%S", &timeInfo);
153  nowTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
154  _entries[_current]._val.assign(nowTimestamp,
155  AMPS_TIMESTAMP_LEN);
156  _entries[_current]._active = false;
157  index = _current++;
158  return index + _currentBase;
159  }
160  else
161  */
162  {
163  // Is this an attempt at a range?
164  if (!BookmarkRange::isRange(bookmark_))
165  {
166  _entries[_current]._val.deepCopy(bookmark_);
167  }
168  else
169  {
170  // Deep copy of the range is saved
171  _range.set(bookmark_);
172  // Stricter check on range syntax
173  if (!_range.isValid())
174  {
175  throw CommandException("Invalid bookmark range specified.");
176  }
177  _store->updateAdapter(this);
178  if (!_range.isStartInclusive())
179  {
180  // Put it in our publishers map
181  amps_uint64_t publisher, sequence;
182  parseBookmark(_range.getStart(), publisher, sequence);
183  _publishers[publisher] = sequence;
184  }
185  // Don't actually log a range
186  return 0;
187  }
188  }
189  _entries[_current]._active = true;
190  index = _current++;
191  }
192  return index + _currentBase;
193  }
194 
195  bool discard(size_t index_)
196  {
197  Lock<Mutex> guard(_subLock);
198  return _discard(index_);
199  }
200 
201  bool discard(const Message::Field& bookmark_)
202  {
203  // These are discarded when logged or not logged
204  if (bookmark_ == AMPS_BOOKMARK_NOW)
205  {
206  return false;
207  }
208  Lock<Mutex> guard(_subLock);
209  size_t search = _least;
210  size_t searchBase = _leastBase;
211  size_t searchMax = _current;
212  size_t searchMaxBase = _currentBase;
213  if (_least + _leastBase == _current + _currentBase)
214  {
215  if (_recoveryMin != AMPS_UNSET_INDEX)
216  {
217  search = _recoveryMin;
218  searchBase = _recoveryBase;
219  searchMax = _recoveryMax;
220  searchMaxBase = _recoveryMaxBase;
221  }
222  else // Store is empty, so nothing to do
223  {
224  return false;
225  }
226  }
227  assert(searchMax != AMPS_UNSET_INDEX);
228  assert(searchMaxBase != AMPS_UNSET_INDEX);
229  assert(search != AMPS_UNSET_INDEX);
230  assert(searchBase != AMPS_UNSET_INDEX);
231  // Search while we don't find the provided bookmark and we're in valid range
232  while (search + searchBase < searchMax + searchMaxBase)
233  {
234  if (_entries[search]._val == bookmark_)
235  {
236  return _discard(search + searchBase);
237  }
238  if (++search == _entriesLength)
239  {
240  // Least has now loooped around
241  searchBase += _entriesLength;
242  search = 0;
243  }
244  }
245  return false;
246  }
247 
248  // Get sequence number from a Field that is a bookmark
249  static void parseBookmark(const Message::Field& field_,
250  amps_uint64_t& publisherId_,
251  amps_uint64_t& sequenceNumber_)
252  {
253  Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
254  }
255 
256  // Check to see if this message is older than the most recent one seen,
257  // and if it is, check whether it has been discarded.
258  bool isDiscarded(const Message::Field& bookmark_)
259  {
260  Lock<Mutex> guard(_subLock);
261  if (BookmarkRange::isRange(bookmark_))
262  {
263  return false;
264  }
265  // Check if we've already recovered this bookmark
266  size_t recoveredIdx = recover(bookmark_, false);
267 
268  amps_uint64_t publisher, sequence;
269  parseBookmark(bookmark_, publisher, sequence);
270  // Compare it to our publishers map
271  PublisherIterator pub = _publishers.find(publisher);
272  if (pub == _publishers.end() || pub->second < sequence)
273  {
274  _publishers[publisher] = sequence;
275  if (recoveredIdx == AMPS_UNSET_INDEX)
276  {
277  return false;
278  }
279  }
280  if (recoveredIdx != AMPS_UNSET_INDEX)
281  {
282  if (!_entries[recoveredIdx]._active)
283  {
284  _recovered.erase(bookmark_);
285  return true;
286  }
287  return false;
288  }
289  // During recovery, we don't really care if it's been discarded
290  // or not. We just want _publishers updated. No need for the
291  // costly linear search.
292  if (_store->_recovering)
293  {
294  return false;
295  }
296  // During failure and recovery scenarios, we'll see out of order
297  // bookmarks arrive, either because (a) we're replaying or (b)
298  // a publisher has cut over, and we've cut over to a new server.
299  // Scan the list to see if we have a match.
300  size_t base = _leastBase;
301  for (size_t i = _least; i + base < _current + _currentBase; i++)
302  {
303  if ( i >= _entriesLength )
304  {
305  i = 0;
306  base = _currentBase;
307  }
308  if (_entries[i]._val == bookmark_)
309  {
310  return !_entries[i]._active;
311  }
312  }
313 
314  return true; // message is totally discarded
315  }
316 
317  bool empty(void) const
318  {
319  if (_least == AMPS_UNSET_INDEX ||
320  ((_least + _leastBase) == (_current + _currentBase) &&
321  _recoveryMin == AMPS_UNSET_INDEX))
322  {
323  return true;
324  }
325  return false;
326  }
327 
328  void updateMostRecent()
329  {
330  Lock<Mutex> guard(_subLock);
331  _updateMostRecent();
332  }
333 
334  const BookmarkRange& getRange() const
335  {
336  return _range;
337  }
338 
// Builds and returns the bookmark value for this subscription from
// _recent, _lastPersisted, the publishers map and the recovery timestamp.
// When _range is valid, the range is returned instead (its start may be
// replaced or made exclusive first).
// usePublishersList_: when true, a list built from _publishers may be
// returned if neither _recent nor _lastPersisted is usable, or if
// _lastPersisted is EPOCH.
// Side effects: runs _updateMostRecent(), and may overwrite _recentList,
// _lastPersisted and _range.
339  Message::Field getMostRecentList(bool usePublishersList_ = true)
340  {
341  Lock<Mutex> guard(_subLock);
// A persisted bookmark only counts when it is longer than one character
342  bool useLastPersisted = !_lastPersisted.empty() &&
343  _lastPersisted.len() > 1;
344  // when this is called, we'll take a moment to update the list
345  // of things recovered,
346  // so we don't accidentally log anything we ought not to.
347  _updateMostRecent();
348  bool useRecent = !_recent.empty() && _recent.len() > 1;
349  amps_uint64_t lastPublisher = 0;
350  amps_uint64_t lastSeq = 0;
351  amps_uint64_t recentPublisher = 0;
352  amps_uint64_t recentSeq = 0;
353  if (useLastPersisted)
354  {
355  parseBookmark(_lastPersisted, lastPublisher, lastSeq);
356  }
357  if (useRecent)
358  {
359  parseBookmark(_recent, recentPublisher, recentSeq);
// With nothing stored, the persisted bookmark alone is used
360  if (empty() && useLastPersisted)
361  {
362  useRecent = false;
363  }
364  else
365  {
// Same publisher: drop _recent when lastSeq <= recentSeq, otherwise
// drop _lastPersisted
366  if (useLastPersisted && lastPublisher == recentPublisher)
367  {
368  if (lastSeq <= recentSeq)
369  {
370  useRecent = false;
371  }
372  else
373  {
374  useLastPersisted = false;
375  }
376  }
377  }
378  }
379  // Set size for all bookmarks that will be used
// (each bookmark is counted with one extra byte for its ',' separator)
380  size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
381  if (useRecent)
382  {
383  totalLen += _recent.len() + 1;
384  }
385  // If we don't have a non-EPOCH persisted ack and we don't have a
386  // non-EPOCH most recent bookmark, OR we have a range
387  // we can build a list based on all the publishers instead.
388  if (usePublishersList_
389  && ((!useLastPersisted && !useRecent)
390  || _lastPersisted == AMPS_BOOKMARK_EPOCH))
391  {
// Emit one "publisher|sequence|," item per publisher
392  std::ostringstream os;
393  for (PublisherIterator pub = _publishers.begin();
394  pub != _publishers.end(); ++pub)
395  {
396  if (pub->first == 0 && pub->second == 0)
397  {
398  continue;
399  }
// For _recent's publisher, use recentSeq when it is lower than the
// sequence stored in the map
400  if (pub->first == recentPublisher && recentSeq < pub->second)
401  {
402  os << recentPublisher << '|' << recentSeq << "|,";
403  }
404  else
405  {
406  os << pub->first << '|' << pub->second << "|,";
407  }
408  }
409  std::string recent = os.str();
410  totalLen = recent.length();
411  if (!recent.empty())
412  {
// The recovery timestamp, when present, follows the trailing ','
413  if (!_recoveryTimestamp.empty())
414  {
415  totalLen += _recoveryTimestamp.len();
416  recent += std::string(_recoveryTimestamp);
417  }
418  else
419  {
420  // Remove trailing ,
421  recent.erase(--totalLen);
422  }
423  // Reset _recentList to new value and return it
424  _recentList.clear();
425  _recentList = Message::Field(recent).deepCopy();
426  if (_range.isValid())
427  {
428  if (_range.getStart() != recent
429  && _recentList != AMPS_BOOKMARK_EPOCH)
430  {
431  _range.replaceStart(_recentList, true);
432  }
433  else if (_range.isStartInclusive())
434  {
// The start is unchanged: make it exclusive once the publishers map
// shows that the start sequence has already been seen
435  amps_uint64_t publisher, sequence;
436  parseBookmark(_range.getStart(), publisher,
437  sequence);
438  PublisherIterator pub = _publishers.find(publisher);
439  if (pub != _publishers.end()
440  && pub->second >= sequence)
441  {
442  _range.makeStartExclusive();
443  }
444  }
445  return _range;
446  }
447  return _recentList;
448  }
449  if (_range.isValid())
450  {
451  return _range;
452  }
453  }
454  if (!_recoveryTimestamp.empty() && !_range.isValid())
455  {
456  totalLen += _recoveryTimestamp.len() + 1;
457  }
458  // If we have nothing discarded, return EPOCH
459  if (totalLen == 0
460  || (_recent.len() < 2 && !empty()))
461  {
462  if (_range.isValid())
463  {
464  return _range;
465  }
466  if (!useRecent)
467  {
// NOTE(review): this listing skips source line 468, so the body of this
// branch is not visible here -- confirm against the original header.
469  }
470  _setLastPersistedToEpoch();
471  return _lastPersisted;
472  }
473  // Remove the trailing , from the length
474  totalLen -= 1;
// Raw buffer that is handed to _recentList.assign() below
475  char* field = new char[totalLen];
476  size_t len = 0;
477  if (useRecent)
478  {
479  len = _recent.len();
480  memcpy(field, _recent.data(), len);
481  if (len < totalLen)
482  {
483  field[len++] = ',';
484  }
485  }
486  if (useLastPersisted)
487  {
488  memcpy(field + len, _lastPersisted.data(), _lastPersisted.len());
489  len += _lastPersisted.len();
490  if (len < totalLen)
491  {
492  field[len++] = ',';
493  }
494  }
495  if (!_recoveryTimestamp.empty() && !_range.isValid())
496  {
497  memcpy(field + len, _recoveryTimestamp.data(),
498  _recoveryTimestamp.len());
499  // If more is to be written after this, uncomment the following
500  //len += _lastPersisted.len();
501  //if (len < totalLen) field[len++] = ',';
502  }
503  // _recentList clear will delete[] current buffer and assign will get cleared
504  _recentList.clear();
505  _recentList.assign(field, totalLen);
506  if (_range.isValid())
507  {
508  if (_recentList != AMPS_BOOKMARK_EPOCH)
509  {
510  if (_range.getStart() != _recentList)
511  {
512  _range.replaceStart(_recentList, true);
513  }
514  else if (_range.isStartInclusive())
515  {
// Same start-exclusive adjustment as in the publishers-list branch
516  amps_uint64_t publisher, sequence;
517  parseBookmark(_range.getStart(), publisher,
518  sequence);
519  PublisherIterator pub = _publishers.find(publisher);
520  if (pub != _publishers.end()
521  && pub->second >= sequence)
522  {
523  _range.makeStartExclusive();
524  }
525  }
526  }
527  return _range;
528  }
529  return _recentList;
530  }
531 
// Returns _recent, the most recent bookmark for this subscription.
// update_: when true and the store's _recentChanged flag is set,
// _updateMostRecent() is run first.
532  Message::Field getMostRecent(bool update_ = false)
533  {
534  Lock<Mutex> guard(_subLock);
535  // Return the same as last time if nothing's changed
536  // _recent is the most recent bookmark.
537  if (update_ && _store->_recentChanged)
538  {
539  _updateMostRecent();
540  }
541  if (_recent.empty())
542  {
// NOTE(review): this listing skips source line 543, so the value returned
// when _recent is empty is not visible here -- confirm against the
// original header.
544  }
545  else
546  {
547  return _recent;
548  }
549  }
550 
551  Message::Field getLastPersisted()
552  {
553  Lock<Mutex> guard(_subLock);
554  return _lastPersisted;
555  }
556 
557  void setMostRecent(const Message::Field& recent_)
558  {
559  _recent.clear();
560  _recent.deepCopy(recent_);
561  }
562 
563  void setRecoveryTimestamp(const char* recoveryTimestamp_,
564  size_t len_ = 0)
565  {
566  _recoveryTimestamp.clear();
567  size_t len = (len_ == 0) ? AMPS_TIMESTAMP_LEN : len_;
568  char* ts = new char[len];
569  memcpy((void*)ts, (const void*)recoveryTimestamp_, len);
570  _recoveryTimestamp.assign(ts, len);
571  }
572 
573  void moveEntries(char* old_, char* new_, size_t newSize_)
574  {
575  size_t least = _least;
576  size_t leastBase = _leastBase;
577  if (_recoveryMin != AMPS_UNSET_INDEX)
578  {
579  least = _recoveryMin;
580  leastBase = _recoveryBase;
581  }
582  // First check if we grew in place, if so, just move current after least
583  if (old_ == new_)
584  {
585  if (newSize_ - (sizeof(Entry)*_entriesLength) > sizeof(Entry)*least)
586  {
587  memcpy(new_ + (sizeof(Entry)*_entriesLength),
588  old_, (sizeof(Entry)*least));
589  // Clear the beginning where those entries were
590  memset(old_, 0, sizeof(Entry)*least);
591  }
592  else // We have to use an intermediate buffer
593  {
594  Entry* buffer = new Entry[least];
595  memcpy((void*)buffer, (void*)old_, sizeof(Entry)*least);
596  //Put the beginning entries at the start of the new buffer
597  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
598  (_entriesLength - least)*sizeof(Entry));
599  //Put the end entries after the beginning entries
600  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
601  (void*)buffer, least * sizeof(Entry));
602  // Least is now at 0 so base must be increased
603  leastBase += least;
604  least = 0;
605  delete [] buffer;
606  }
607  }
608  else
609  {
610  //Put the beginning entries at the start of the new buffer
611  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
612  (_entriesLength - least)*sizeof(Entry));
613  //Put the end entries after the beginning entries
614  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
615  (void*)old_, least * sizeof(Entry));
616  // Least is now at 0 so base must be increased
617  leastBase += least;
618  least = 0;
619  }
620  if (_recoveryMin != AMPS_UNSET_INDEX)
621  {
622  _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
623  _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
624  (_recoveryMin + _recoveryBase);
625  _recoveryMaxBase = leastBase;
626  _recoveryMin = least;
627  _recoveryBase = leastBase;
628  }
629  else
630  {
631  _least = least;
632  }
633  _leastBase = leastBase;
634  // Current is now after everything and using the same base
635  _currentBase = _leastBase;
636  _current = least + _entriesLength;
637  }
638 
639  inline size_t getOldestBookmarkSeq()
640  {
641  Lock<Mutex> guard(_subLock);
642  // If there is nothing in the store, return -1, otherwise return lowest
643  return ((_least + _leastBase) == (_current + _currentBase)) ? AMPS_UNSET_INDEX :
644  _least + _leastBase;
645  }
646 
647  bool lastPersisted(const Message::Field& bookmark_)
648  {
649  // These shouldn't be persisted
650  if (bookmark_ == AMPS_BOOKMARK_NOW
651  || BookmarkRange::isRange(bookmark_))
652  {
653  return false;
654  }
655  Lock<Mutex> guard(_subLock);
656  return _setLastPersisted(bookmark_);
657  }
658 
659  bool _setLastPersisted(const Message::Field& bookmark_)
660  {
661  if (!_lastPersisted.empty())
662  {
663  amps_uint64_t publisher, publisher_lastPersisted;
664  amps_uint64_t sequence, sequence_lastPersisted;
665  parseBookmark(bookmark_, publisher, sequence);
666  parseBookmark(_lastPersisted, publisher_lastPersisted,
667  sequence_lastPersisted);
668  if (publisher == publisher_lastPersisted &&
669  sequence <= sequence_lastPersisted)
670  {
671  return false;
672  }
673  }
674  // deepCopy will clear what's in _lastPersisted
675  _lastPersisted.deepCopy(bookmark_);
676  _store->_recentChanged = true;
677  _recoveryTimestamp.clear();
678  return true;
679  }
680 
681  Message::Field lastPersisted(size_t bookmark_)
682  {
683  Lock<Mutex> guard(_subLock);
684  Message::Field& bookmark = _entries[bookmark_]._val;
685  // These shouldn't be persisted
686  if (bookmark == AMPS_BOOKMARK_NOW
687  || BookmarkRange::isRange(bookmark))
688  {
689  return bookmark;
690  }
691  _setLastPersisted(bookmark);
692  return bookmark;
693  }
694 
695  // Returns the index of the recovered item, either the index where it
696  // was first stored prior to getMostRecent, or the new index if it is
697  // relogged either because this is called from log() or because it was
698  // not active but also not persisted.
699  size_t recover(const Message::Field& bookmark_, bool relogIfNotDiscarded)
700  {
701  size_t retVal = AMPS_UNSET_INDEX;
702  if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
703  {
704  return retVal;
705  }
706  // Check if this is a recovered bookmark.
707  // If so, copy the existing one to the new location
708  RecoveryIterator item = _recovered.find(bookmark_);
709  if (item != _recovered.end())
710  {
711  size_t seqNo = item->second;
712  size_t index = (seqNo - _recoveryBase) % _entriesLength;
713  // If we only have recovery entries and isDiscarded is
714  // checking on an already discarded entry, update recent.
715  if (_least + _leastBase == _current + _currentBase &&
716  !_entries[index]._active)
717  {
718  _store->_recentChanged = true;
719  _recent.clear();
720  _recent = _entries[index]._val.deepCopy();
721  retVal = moveEntry(index);
722  if (retVal == AMPS_UNSET_INDEX)
723  recover(bookmark_,
724  relogIfNotDiscarded);
725  _least = _current;
726  _leastBase = _currentBase;
727  }
728  else if (!_entries[index]._active || relogIfNotDiscarded)
729  {
730  retVal = moveEntry(index);
731  if (retVal == AMPS_UNSET_INDEX)
732  recover(bookmark_,
733  relogIfNotDiscarded);
734  }
735  else
736  {
737  return index;
738  }
739  _recovered.erase(item);
740  if (_recovered.empty())
741  {
742  _recoveryMin = AMPS_UNSET_INDEX;
743  _recoveryBase = AMPS_UNSET_INDEX;
744  _recoveryMax = AMPS_UNSET_INDEX;
745  _recoveryMaxBase = AMPS_UNSET_INDEX;
746  }
747  else if (index == _recoveryMin)
748  {
749  while (_entries[_recoveryMin]._val.empty() &&
750  (_recoveryMin + _recoveryBase) < (_recoveryMax + _recoveryMaxBase))
751  {
752  if (++_recoveryMin == _entriesLength)
753  {
754  _recoveryMin = 0;
755  _recoveryBase += _entriesLength;
756  }
757  }
758  }
759  }
760  return retVal;
761  }
762 
763  // Return the id of this Subscription
764  Message::Field id() const
765  {
766  return _id;
767  }
768 
// One slot of the bookmark list: the bookmark value and whether it is
// still active (not yet discarded).
// The _padding array is sized so that the declared members add up to
// 32 bytes, less one spare bool. The trailing numbers on each member
// look like running byte offsets -- NOTE(review): they presume
// sizeof(Message::Field) == 16; confirm.
// Entries are copied with memcpy and cleared with memset in moveEntries().
769  struct Entry
770  {
771  Message::Field _val; //16
772  bool _active; //17
773  char _padding[32 - sizeof(Message::Field) - (sizeof(bool) * 2)]; //32
774 
// A new entry starts inactive; only the first padding byte is initialized
775  Entry() : _active(false)
776  {
777  _padding[0] = '\0';
778  }
779  };
780 
781  struct EntryHash
782  {
783  Field::FieldHash _hasher;
784 
785  size_t operator()(const Entry* entryPtr_) const
786  {
787  return _hasher(entryPtr_->_val);
788  }
789 
790  bool operator()(const Entry* lhsPtr_, const Entry* rhsPtr_) const
791  {
792  return _hasher(lhsPtr_->_val, rhsPtr_->_val);
793  }
794  };
795 
796  //typedef std::set<Entry*, EntryHash> EntryPtrList;
797  typedef std::vector<Entry*> EntryPtrList;
798 
799  void getRecoveryEntries(EntryPtrList& list_)
800  {
801  if (_recoveryMin == AMPS_UNSET_INDEX ||
802  _recoveryMax == AMPS_UNSET_INDEX)
803  {
804  return;
805  }
806  size_t base = _recoveryBase;
807  size_t max = _recoveryMax + _recoveryMaxBase;
808  for (size_t i = _recoveryMin; i + base < max; ++i)
809  {
810  if (i == _entriesLength)
811  {
812  i = 0;
813  base = _recoveryMaxBase;
814  }
815  //list_.insert(&(_entries[i]));
816  list_.push_back(&(_entries[i]));
817  }
818  return;
819  }
820 
821  void getActiveEntries(EntryPtrList& list_)
822  {
823  size_t base = _leastBase;
824  for (size_t i = _least; i + base < _current + _currentBase; ++i)
825  {
826  if (i >= _entriesLength)
827  {
828  i = 0;
829  base = _currentBase;
830  }
831  //list_.insert(&(_entries[i]));
832  list_.push_back(&(_entries[i]));
833  }
834  return;
835  }
836 
837  Entry* getEntryByIndex(size_t index_)
838  {
839  Lock<Mutex> guard(_subLock);
840  size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
841  index_ >= _least + _leastBase)
842  ? _leastBase : _recoveryBase;
843  // Return NULL if not a valid index
844  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
845  _least + _leastBase :
846  _recoveryMin + _recoveryBase);
847  if (index_ >= _current + _currentBase || index_ < min)
848  {
849  return NULL;
850  }
851  return &(_entries[(index_ - base) % _entriesLength]);
852  }
853 
854  void justRecovered()
855  {
856  Lock<Mutex> guard(_subLock);
857  _updateMostRecent();
858  EntryPtrList list;
859  getRecoveryEntries(list);
860  setPublishersToDiscarded(&list, &_publishers);
861  }
862 
863  void setPublishersToDiscarded(EntryPtrList* recovered_,
864  PublisherMap* publishers_)
865  {
866  // Need to reset publishers to only have up to the last
867  // discarded sequence number. Messages that were in transit
868  // during previous run but not discarded should be considered
869  // new and not duplicate after a restart/recovery.
870  for (EntryPtrList::iterator i = recovered_->begin();
871  i != recovered_->end(); ++i)
872  {
873  amps_uint64_t publisher = (amps_uint64_t)0;
874  amps_uint64_t sequence = (amps_uint64_t)0;
875  parseBookmark((*i)->_val, publisher, sequence);
876  if (publisher && sequence && (*i)->_active &&
877  (*publishers_)[publisher] >= sequence)
878  {
879  (*publishers_)[publisher] = sequence - 1;
880  }
881  }
882  }
883 
884  void clearLastPersisted()
885  {
886  Lock<Mutex> guard(_subLock);
887  _lastPersisted.clear();
888  }
889 
890  void setLastPersistedToEpoch()
891  {
892  Lock<Mutex> guard(_subLock);
893  _setLastPersistedToEpoch();
894  }
895 
896  private:
897  Subscription(const Subscription&);
898  Subscription& operator=(const Subscription&);
899 
900  size_t moveEntry(size_t index_)
901  {
902  // Check for wrap
903  if (_current >= _entriesLength)
904  {
905  _current = 0;
906  _currentBase += _entriesLength;
907  }
908  // Check for resize
909  // If list is too small, double it
910  if ((_current == _least % _entriesLength &&
911  _leastBase < _currentBase) ||
912  (_current == _recoveryMin && _recoveryBase < _currentBase))
913  {
914  if (!_store->resize(_id, (char**)&_entries,
915  sizeof(Entry) * _entriesLength * 2))
916  {
917  return AMPS_UNSET_INDEX;
918  }
919  // Length was doubled
920  _entriesLength *= 2;
921  }
922  _entries[_current]._val = _entries[index_]._val;
923  _entries[_current]._active = _entries[index_]._active;
924  // No need to clear Field, just set it to empty
925  _entries[index_]._val.assign(NULL, 0);
926  _entries[index_]._active = false;
927  return _current++;
928  }
929 
930  void _setLastPersistedToEpoch()
931  {
932  size_t fieldLen = strlen(AMPS_BOOKMARK_EPOCH);
933  char* field = new char[fieldLen];
934  memcpy(field, AMPS_BOOKMARK_EPOCH, fieldLen);
935  _lastPersisted.clear();
936  _lastPersisted.assign(field, fieldLen);
937  }
938 
// Marks the entry with sequence index index_ as discarded (inactive).
// When that entry is the oldest in the recovery window or in the active
// window, the window start is advanced past every leading inactive
// entry and _recent is updated along the way.
// Returns true when _recent was changed, false otherwise (including when
// index_ is not in the log).
// The caller must already hold the lock.
939  bool _discard(size_t index_)
940  {
941  bool retVal = false;
942  // Lock should already be held
943  assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
944  (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
// Indexes at or after the oldest active entry use the active base;
// older ones belong to the recovery window
945  size_t base = (_recoveryBase == AMPS_UNSET_INDEX
946  || index_ >= _least + _leastBase)
947  ? _leastBase : _recoveryBase;
948  // discard of a record not in the log is a no-op
949  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
950  _recoveryMin + _recoveryBase);
951  if (index_ >= _current + _currentBase || index_ < min)
952  {
953  return retVal;
954  }
955 
956  // log that this one is discarded, then
957  // recalculate what the most recent entry is.
958  Entry& e = _entries[(index_ - base) % _entriesLength];
959  e._active = false;
960 
961  size_t index = index_;
// First pass: the discarded entry is the oldest recovery entry
962  if (_recoveryMin != AMPS_UNSET_INDEX &&
963  index_ == _recoveryMin + _recoveryBase)
964  {
965  // Find all to discard
966  size_t j = _recoveryMin;
967  while (j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
968  !_entries[j]._active)
969  {
970  // This index might be left-over from a slow discard and we
971  // may have reconnected. We have a few possibilities at this point.
972  // 1. If we re-logged this bookmark, this index will point at an
973  // empty bookmark. This could happen if the discard thread was slow
974  // and the reconnect was fast. We wouldn't report the
975  // re-arrival of the bookmark as a duplicate because it
976  // hadn't been marked as discarded. In this case, we have to
977  // simply move past this in the recovery area.
978  // 2. This bookmark should become _recent because we haven't
979  // yet received anything since our last call to getMostRecent.
980  // In this case, we need to take it out of recovered but not
981  // clear it. The publishers map should report it as duplicate.
982  // 3. This is the 'oldest' recovered, but we have received new
983  // bookmarks since we got this one. We can clear it because the
984  // publishers map should report it as a duplicate if/when it
985  // does arrive again. Move the _recoveryMin ahead and remove it
986  // from recovered.
987  Message::Field& bookmark = _entries[j]._val;
988  // Option 1 skips this and just moves on
989  if (!bookmark.empty())
990  {
991  _recovered.erase(bookmark);
992  if (_least + _leastBase == _current + _currentBase ||
993  ((_least + _leastBase) % _entriesLength) ==
994  ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
995  {
996  // Option 2, reset recent
997  retVal = true;
998  _store->_recentChanged = true;
999  _recoveryTimestamp.clear();
1000  _recent.clear();
// _recent takes over the bookmark; the slot is emptied without clear()
1001  _recent = bookmark;
1002  bookmark.assign(NULL, 0);
1003  }
1004  else
1005  {
1006  // Option 3, simply clear this one
1007  bookmark.clear();
1008  }
1009  }
1010  // If we reach the buffer end,
1011  // keep checking from the beginning
1012  if (++j == _entriesLength)
1013  {
1014  // The recovery position has now looped around
1015  _recoveryBase += _entriesLength;
1016  j = 0;
1017  }
1018  }
1019  assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
1020  _recovered.empty());
1021  if (_recovered.empty())
1022  {
1023  _recoveryMin = AMPS_UNSET_INDEX;
1024  _recoveryBase = AMPS_UNSET_INDEX;
1025  _recoveryMax = AMPS_UNSET_INDEX;
1026  _recoveryMaxBase = AMPS_UNSET_INDEX;
1027  // Cleared recovered, want to check onward
1028  index = _least + _leastBase;
1029  }
1030  else
1031  {
1032  _recoveryMin = j;
1033  }
1034  }
1035  // if this is the first item in the list, discard all inactive ones
1036  // as long as recovery also says it's okay
1037  if (index == _least + _leastBase)
1038  {
1039  // Find all to discard
1040  size_t j = _least;
1041  while (j + _leastBase < _current + _currentBase &&
1042  !_entries[j]._active)
1043  {
1044  //Must free associated memory
1045  _recent.clear();
// _recent takes over the bookmark; the slot is emptied without clear()
1046  _recent = _entries[j]._val;
1047  _entries[j]._val.assign(NULL, 0);
1048  _store->_recentChanged = true;
1049  retVal = true;
1050  _recoveryTimestamp.clear();
1051  // If we reach the buffer end,
1052  // keep checking from the beginning
1053  if (++j == _entriesLength)
1054  {
1055  // Least has now looped around
1056  _leastBase += _entriesLength;
1057  j = 0;
1058  }
1059  }
1060  _least = j;
1061  }
1062  return retVal;
1063  }
1064 
// Rebuilds the recovery bookkeeping from the entries currently held.
// Every non-empty entry from the old window start up to the write
// position is entered in _recovered under its sequence index, and the
// recovery window is set to span from the first such entry to the write
// position. The active window is then made empty by moving _least up to
// _current.
// The caller must already hold the lock.
1065  void _updateMostRecent()
1066  {
1067  // Lock is already held
1068  _recovered.clear();
1069  assert((_recoveryBase == AMPS_UNSET_INDEX && _recoveryMin == AMPS_UNSET_INDEX) ||
1070  (_recoveryBase != AMPS_UNSET_INDEX && _recoveryMin != AMPS_UNSET_INDEX));
// Start from the old recovery window when there is one, otherwise from
// the oldest active entry
1071  size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
1072  size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
1073  _recoveryMin = AMPS_UNSET_INDEX;
1074  _recoveryBase = AMPS_UNSET_INDEX;
1075  _recoveryMax = AMPS_UNSET_INDEX;
1076  _recoveryMaxBase = AMPS_UNSET_INDEX;
1077  for (size_t i = start; i + base < _current + _currentBase; i++)
1078  {
1079  if ( i >= _entriesLength )
1080  {
1081  i = 0;
1082  base = _currentBase;
1083  }
// NOTE(review): _recoveryMax and _recoveryBase were reset just above, so
// until the first non-empty entry is seen both operands of the sum hold
// AMPS_UNSET_INDEX; the test also compares the slot index i against
// index-plus-base sums -- confirm the intent of this skip.
1084  if (i >= _recoveryMax + _recoveryBase && i < _least + _leastBase)
1085  {
1086  continue;
1087  }
1088  Entry& entry = _entries[i];
1089  if (!entry._val.empty())
1090  {
1091  _recovered[entry._val] = i + base;
// The first non-empty entry opens the new recovery window
1092  if (_recoveryMin == AMPS_UNSET_INDEX)
1093  {
1094  _recoveryMin = i;
1095  _recoveryBase = base;
1096  _recoveryMax = _current;
1097  _recoveryMaxBase = _currentBase;
1098  }
1099  }
1100  }
// Wrap the write position if it sits exactly at the end of the buffer
1101  if (_current == _entriesLength)
1102  {
1103  _current = 0;
1104  _currentBase += _entriesLength;
1105  }
// The active window is now empty
1106  _least = _current;
1107  _leastBase = _currentBase;
1108  }
1109 
// Data members of Subscription. Comments below state only what the methods
// visible in this part of the file do with each member.
1110  Message::Field _id;
      // Overwritten with the value of each inactive entry as it is released
      // from the front of the entry buffer during discard.
1111  Message::Field _recent;
1112  Message::Field _lastPersisted;
1113  Message::Field _recentList;
1114  BookmarkRange _range;
      // Cleared whenever discard releases entries.
1115  Message::Field _recoveryTimestamp;
      // Positions are kept as a slot index plus a base; the code adds the two
      // when comparing positions, and bumps the base by _entriesLength each
      // time the slot index wraps to 0.
1116  size_t _current;
1117  size_t _currentBase;
1118  size_t _least;
1119  size_t _leastBase;
      // Recovery window; AMPS_UNSET_INDEX marks "not set". _recoveryMin and
      // _recoveryBase are asserted to be set or unset together.
1120  size_t _recoveryMin;
1121  size_t _recoveryBase;
1122  size_t _recoveryMax;
1123  size_t _recoveryMaxBase;
      // Number of slots in _entries; slot indexes wrap to 0 on reaching it.
1124  size_t _entriesLength;
1125  Entry* _entries;
      // Owning store; discard sets _store->_recentChanged through it.
1126  MemoryBookmarkStore* _store;
1127  Mutex _subLock;
      // Maps an entry's bookmark value to its position (slot index + base);
      // rebuilt by _updateMostRecent().
1128  RecoveryMap _recovered;
1129  public:
1130  PublisherMap _publishers;
1131  };
1132 
1133  public:
1137  _subsLock(),
1138  _lock(),
1139  _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1140  _recentChanged(true),
1141  _recovering(false),
1142  _recoveryPointAdapter(NULL),
1143  _recoveryPointFactory(NULL)
1144  { ; }
1145 
1146  typedef RecoveryPointAdapter::iterator RecoveryIterator;
1147 
1155  RecoveryPointFactory factory_ = NULL)
1156  : BookmarkStoreImpl()
1157  , _subsLock()
1158  , _lock()
1159  , _serverVersion(AMPS_DEFAULT_MIN_VERSION)
1160  , _recentChanged(true)
1161  , _recovering(true)
1162  , _recoveryPointAdapter(adapter_)
1163  , _recoveryPointFactory(factory_)
1164  {
1165  Message msg;
1166  for (RecoveryIterator recoveryPoint = _recoveryPointAdapter.begin();
1167  recoveryPoint != _recoveryPointAdapter.end();
1168  ++recoveryPoint)
1169  {
1170  Field subId(recoveryPoint->getSubId());
1171  msg.setSubscriptionHandle(static_cast<amps_subscription_handle>(0));
1172  msg.setSubId(subId);
1173  Field bookmark = recoveryPoint->getBookmark();
1174  if (BookmarkRange::isRange(bookmark))
1175  {
1176  msg.setBookmark(bookmark);
1177  _log(msg);
1178  }
1179  else
1180  {
1181  const char* start = bookmark.data();
1182  size_t remain = bookmark.len();
1183  const char* comma = (const char*)memchr((const void*)start,
1184  (int)',', remain);
1185  while (comma)
1186  {
1187  size_t len = (size_t)(comma - start);
1188  msg.setBookmark(start, len);
1189  if (Field::isTimestamp(msg.getBookmark()))
1190  {
1191  find(subId)->setRecoveryTimestamp(start, len);
1192  }
1193  else
1194  {
1195  _isDiscarded(msg);
1196  _log(msg);
1197  _discard(msg);
1198  }
1199  start = ++comma;
1200  remain = bookmark.len() - (size_t)(start - bookmark.data());
1201  comma = (const char*)memchr((const void*)start,
1202  (int)',', remain);
1203  }
1204  msg.setBookmark(start, remain);
1205  if (Field::isTimestamp(msg.getBookmark()))
1206  {
1207  find(subId)->setRecoveryTimestamp(start, remain);
1208  }
1209  else
1210  {
1211  _isDiscarded(msg);
1212  _log(msg);
1213  _discard(msg);
1214  }
1215  }
1216  }
1217  _recovering = false;
1218  }
1219 
1220  virtual ~MemoryBookmarkStore()
1221  {
1222  __purge();
1223  }
1224 
1230  virtual size_t log(Message& message_)
1231  {
1232  Lock<Mutex> guard(_lock);
1233  return _log(message_);
1234  }
1235 
1241  virtual void discard(const Message& message_)
1242  {
1243  Lock<Mutex> guard(_lock);
1244  (void)_discard(message_);
1245  }
1246 
1254  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1255  {
1256  Lock<Mutex> guard(_lock);
1257  (void)_discard(subId_, bookmarkSeqNo_);
1258  }
1259 
1266  {
1267  Lock<Mutex> guard(_lock);
1268  return _getMostRecent(subId_);
1269  }
1270 
1279  virtual bool isDiscarded(Message& message_)
1280  {
1281  Lock<Mutex> guard(_lock);
1282  return _isDiscarded(message_);
1283  }
1284 
1290  virtual void purge()
1291  {
1292  Lock<Mutex> guard(_lock);
1293  _purge();
1294  }
1295 
1301  virtual void purge(const Message::Field& subId_)
1302  {
1303  Lock<Mutex> guard(_lock);
1304  _purge(subId_);
1305  }
1306 
1311  virtual size_t getOldestBookmarkSeq(const Message::Field& subId_)
1312  {
1313  Lock<Mutex> guard(_lock);
1314  return _getOldestBookmarkSeq(subId_);
1315  }
1316 
1322  virtual void persisted(const Message::Field& subId_,
1323  const Message::Field& bookmark_)
1324  {
1325  Lock<Mutex> guard(_lock);
1326  _persisted(find(subId_), bookmark_);
1327  }
1328 
1335  virtual Message::Field persisted(const Message::Field& subId_,
1336  size_t bookmark_)
1337  {
1338  Lock<Mutex> guard(_lock);
1339  return _persisted(find(subId_), bookmark_);
1340  }
1341 
1346  void setServerVersion(const VersionInfo& version_)
1347  {
1348  setServerVersion(version_.getOldStyleVersion());
1349  }
1350 
1355  void setServerVersion(size_t version_)
1356  {
1357  Lock<Mutex> guard(_subsLock);
1358  _serverVersion = version_;
1359  }
1360 
1361  inline bool isWritableBookmark(size_t length)
1362  {
1363  return length >= AMPS_MIN_BOOKMARK_LEN;
1364  }
1365 
1366  typedef Subscription::EntryPtrList EntryPtrList;
1367 
1368  protected:
1369 
1370  // Called once lock is acquired
1371  size_t _log(Message& message_)
1372  {
1373  Message::Field bookmark = message_.getBookmark();
1374  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1375  if (!pSub)
1376  {
1377  Message::Field subId = message_.getSubscriptionId();
1378  if (subId.empty())
1379  {
1380  subId = message_.getSubscriptionIds();
1381  }
1382  pSub = find(subId);
1383  message_.setSubscriptionHandle(
1384  static_cast<amps_subscription_handle>(pSub));
1385  }
1386  size_t retVal = pSub->log(bookmark);
1387  message_.setBookmarkSeqNo(retVal);
1388  return retVal;
1389  }
1390 
1391  // Called once lock is acquired
1392  bool _discard(const Message& message_)
1393  {
1394  size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1395  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1396  if (!pSub)
1397  {
1398  Message::Field subId = message_.getSubscriptionId();
1399  if (subId.empty())
1400  {
1401  subId = message_.getSubscriptionIds();
1402  }
1403  pSub = find(subId);
1404  }
1405  bool retVal = pSub->discard(bookmarkSeqNo);
1406  if (retVal)
1407  {
1408  updateAdapter(pSub);
1409  }
1410  return retVal;
1411  }
1412 
1413  // Called once lock is acquired
1414  bool _discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1415  {
1416  Subscription* pSub = find(subId_);
1417  bool retVal = pSub->discard(bookmarkSeqNo_);
1418  if (retVal)
1419  {
1420  updateAdapter(pSub);
1421  }
1422  return retVal;
1423  }
1424 
1425  // Called once lock is acquired
1426  Message::Field _getMostRecent(const Message::Field& subId_,
1427  bool usePublishersList_ = true)
1428  {
1429  Subscription* pSub = find(subId_);
1430  return pSub->getMostRecentList(usePublishersList_);
1431  }
1432 
1433  // Called once lock is acquired
1434  bool _isDiscarded(Message& message_)
1435  {
1436  Message::Field subId = message_.getSubscriptionId();
1437  if (subId.empty())
1438  {
1439  subId = message_.getSubscriptionIds();
1440  }
1441  Subscription* pSub = find(subId);
1442  message_.setSubscriptionHandle(
1443  static_cast<amps_subscription_handle>(pSub));
1444  return pSub->isDiscarded(message_.getBookmark());
1445  }
1446 
1447  // Called once lock is acquired
1448  size_t _getOldestBookmarkSeq(const Message::Field& subId_)
1449  {
1450  Subscription* pSub = find(subId_);
1451  return pSub->getOldestBookmarkSeq();
1452  }
1453 
1454  // Called once lock is acquired
1455  virtual void _persisted(Subscription* pSub_,
1456  const Message::Field& bookmark_)
1457  {
1458  if (pSub_->lastPersisted(bookmark_))
1459  {
1460  updateAdapter(pSub_);
1461  }
1462  }
1463 
1464  // Called once lock is acquired
1465  virtual Message::Field _persisted(Subscription* pSub_, size_t bookmark_)
1466  {
1467  return pSub_->lastPersisted(bookmark_);
1468  }
1469 
1470  // Called once lock is acquired
1471  void _purge()
1472  {
1473  if (_recoveryPointAdapter.isValid())
1474  {
1475  _recoveryPointAdapter.purge();
1476  }
1477  __purge();
1478  }
1479 
1480  // Called once lock is acquired
1481  void __purge()
1482  {
1483  // Walk through list and clear Fields before calling clear
1484  while (!_subs.empty())
1485  {
1486  SubscriptionMap::iterator iter = _subs.begin();
1487  //The subId key is cleared when deleting the Subscription, which shares
1488  //the _data pointer in its id field.
1489  const_cast<Message::Field&>(iter->first).clear();
1490  delete (iter->second);
1491  _subs.erase(iter);
1492  }
1493  _subs.clear();
1494  }
1495 
1496  // Called once lock is acquired
1497  virtual void _purge(const Message::Field& subId_)
1498  {
1499  if (_recoveryPointAdapter.isValid())
1500  {
1501  _recoveryPointAdapter.purge(subId_);
1502  }
1503  __purge(subId_);
1504  }
1505 
1506  // Called once lock is acquired
1507  virtual void __purge(const Message::Field& subId_)
1508  {
1509  Lock<Mutex> guard(_subsLock);
1510  SubscriptionMap::iterator iter = _subs.find(subId_);
1511  if (iter == _subs.end())
1512  {
1513  return;
1514  }
1515  const_cast<Message::Field&>(iter->first).clear();
1516  delete (iter->second);
1517  _subs.erase(iter);
1518  }
1519 
1520  // Can be used by subclasses during recovery
1521  void setMostRecent(const Message::Field& subId_,
1522  const Message::Field& recent_)
1523  {
1524  find(subId_)->setMostRecent(recent_);
1525  }
1526 
// Held by find() and __purge(subId_) while they access _subs, and by
// setServerVersion(size_t) while it writes _serverVersion.
1527  Mutex _subsLock;
// Held by the public entry points (log, discard, purge, persisted, ...)
// before they delegate to the underscore-prefixed workers.
1528  Mutex _lock;
// Single-character entry tags. They are not referenced in the visible part
// of this class; presumably used by subclasses - confirm.
1529  static const char ENTRY_BOOKMARK = 'b';
1530  static const char ENTRY_DISCARD = 'd';
1531  static const char ENTRY_PERSISTED = 'p';
1532 
1533  virtual Subscription* find(const Message::Field& subId_)
1534  {
1535  if (subId_.empty())
1536  {
1537  throw StoreException("A valid subscription ID must be provided to the Bookmark Store");
1538  }
1539  Lock<Mutex> guard(_subsLock);
1540  if (_subs.count(subId_) == 0)
1541  {
1542  // Subscription will be created
1543  Message::Field id;
1544  id.deepCopy(subId_);
1545  _subs[id] = new Subscription(this, id);
1546  return _subs[id];
1547  }
1548  return _subs[subId_];
1549  }
1550 
1551  virtual bool resize(const Message::Field& subId_, char** newBuffer_, size_t size_,
1552  bool callResizeHandler_ = true)
1553  {
1554  assert(newBuffer_ != 0);
1555  if (size_ == 0) // Delete the buffer
1556  {
1557  if (*newBuffer_)
1558  {
1559  free(*newBuffer_);
1560  *newBuffer_ = NULL;
1561  }
1562  return true;
1563  }
1564  if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1565  {
1566  return false;
1567  }
1568  char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1569  *newBuffer_ = (char*)malloc(size_);
1570  memset(*newBuffer_, 0, size_);
1571  if (oldBuffer)
1572  {
1573  find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1574  free(oldBuffer);
1575  }
1576  return true;
1577  }
1578 
1579  protected:
1580  void updateAdapter(Subscription* pSub_)
1581  {
1582  if (_recovering || !_recentChanged || !_recoveryPointAdapter.isValid())
1583  {
1584  return;
1585  }
1586  if (_recoveryPointFactory)
1587  {
1588  RecoveryPoint update(_recoveryPointFactory(pSub_->id(),
1589  pSub_->getMostRecentList(false)));
1590  _recoveryPointAdapter.update(update);
1591  }
1592  else
1593  {
1594  RecoveryPoint update(new FixedRecoveryPoint(pSub_->id(),
1595  pSub_->getMostRecentList(false)));
1596  _recoveryPointAdapter.update(update);
1597  }
1598  }
1599 
// Subscriptions keyed by subscription id. find() inserts a deep copy of the
// id as the key; the purge methods clear the key and delete the
// Subscription before erasing the node.
1600  typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap;
1601  SubscriptionMap _subs;
// Written by setServerVersion(); initialised to AMPS_DEFAULT_MIN_VERSION.
1602  size_t _serverVersion;
// updateAdapter() sends nothing to the adapter while this is false.
1603  bool _recentChanged;
// True while the adapter constructor replays recovery points;
// updateAdapter() sends nothing to the adapter while it is set.
1604  bool _recovering;
// NOTE(review): std::set is used here but <set> is not among this file's
// direct includes; presumably it arrives through another header - confirm.
1605  typedef std::set<Subscription*> SubscriptionSet;
// Optional external storage for recovery points; checked with isValid()
// before every use.
1606  RecoveryPointAdapter _recoveryPointAdapter;
// Optional factory for the RecoveryPoint sent to the adapter; when NULL a
// FixedRecoveryPoint is used instead.
1607  RecoveryPointFactory _recoveryPointFactory;
1608  };
1609 
1610 } // end namespace AMPS
1611 
1612 #endif //_MEMORYBOOKMARKSTORE_H_
1613 
Defines the AMPS::Message class and related classes.
Abstract base class for storing received bookmarks for HA clients.
Definition: BookmarkStore.hpp:77
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1373
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1290
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1254
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:1265
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1335
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
MemoryBookmarkStore(const RecoveryPointAdapter &adapter_, RecoveryPointFactory factory_=NULL)
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1154
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1136
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1322
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1355
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1374
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:127
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:1279
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subscr...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1346
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1241
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
Message & setSubId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1373
RecoveryPoint(* RecoveryPointFactory)(const Field &subId_, const Field &bookmark_)
RecoveryPointFactory is a function type for producing a RecoveryPoint that is sent to a RecoveryPoint...
Definition: RecoveryPoint.hpp:126
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:1230
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
FixedRecoveryPoint is a RecoveryPoint implementation where subId and bookmark are set explicitly...
Definition: RecoveryPoint.hpp:133
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1311
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MemoryBookmarkStore.hpp:1301
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1140
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1140