AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.1
MemoryBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYBOOKMARKSTORE_H_
27 #define _MEMORYBOOKMARKSTORE_H_
28 
29 #include <BookmarkStore.hpp>
30 #include <Field.hpp>
31 #include <Message.hpp>
32 #include <RecoveryPoint.hpp>
33 #include <RecoveryPointAdapter.hpp>
34 #include <map>
35 #include <sstream>
36 #include <vector>
37 #include <stdlib.h>
38 #include <assert.h>
39 
40 #define AMPS_MIN_BOOKMARK_LEN 3
41 #define AMPS_INITIAL_MEMORY_BOOKMARK_SIZE 16384UL
42 
47 
48 namespace AMPS
49 {
50 
57 {
58 protected:
59  class Subscription
60  {
61  public:
// Bookmark -> sequence index (slot + base) of the entry holding it; populated
// by _updateMostRecent().
// NOTE(review): FieldHash is passed as std::map's ordering predicate here --
// confirm its two-argument operator() is a strict weak ordering.
62  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
// Publisher id -> sequence number currently recorded for that publisher.
63  typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
64  typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
65  RecoveryIterator;
66  typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
67 
68  // Start sequence at 1 so that 0 can be used during file subclasses
69  // recovery as an indicator that a message wasn't logged because
70  // isDiscarded() was true.
71  Subscription(MemoryBookmarkStore* store_, const Message::Field& id_)
72  : _current(1), _currentBase(0), _least(1), _leastBase(0)
73  , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
74  , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
75  , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
76  , _store(store_)
77  {
78  // Need our own memory for the sub id
79  _id.deepCopy(id_);
80  _store->resize(_id, (char**)&_entries,
81  sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE, false);
82  setLastPersistedToEpoch();
83  }
84 
85  ~Subscription()
86  {
87  Lock<Mutex> guard(_subLock);
88  if(_entries)
89  {
90  for(size_t i = 0; i < _entriesLength; ++i)
91  {
92  _entries[i]._val.clear();
93  }
94  // resize to 0 will free _entries
95  _store->resize(_id, (char**)&_entries, 0);
96  }
97  _id.clear();
98  _recent.clear();
99  _lastPersisted.clear();
100  _recentList.clear();
101  _range.clear();
102  _recoveryTimestamp.clear();
103  }
104 
// Records bookmark_ for this subscription and returns the sequence index
// (slot + base) it was stored at.
// Returns 0 without storing anything for AMPS_BOOKMARK_NOW and for bookmark
// ranges; a range is kept in _range instead.
// Throws CommandException if bookmark_ looks like a range but fails
// _range.isValid().
105  size_t log(const Message::Field& bookmark_)
106  {
107  if (bookmark_ == AMPS_BOOKMARK_NOW)
108  return 0;
109  Lock<Mutex> guard(_subLock);
110  // Either relog the recovery or log it
111  size_t index = recover(bookmark_, true);
112  if (index == AMPS_UNSET_INDEX)
113  {
114  // Check for wrap
115  if(_current >= _entriesLength)
116  {
117  _current = 0;
118  _currentBase += _entriesLength;
119  }
120  // Check for resize
121  // If list is too small, double it
// The write position has lapped the oldest live slot (active or recovery),
// so there is no free slot left.
122  if((_current == _least && _leastBase < _currentBase) ||
123  (_current == _recoveryMin && _recoveryBase < _currentBase))
124  {
125  if (!_store->resize(_id, (char**)&_entries,
126  sizeof(Entry) * _entriesLength * 2))
127  {
128  //Try again
// NOTE(review): this re-enters log() while `guard` is still held, which
// relies on Mutex being re-entrant -- confirm.
129  return log(bookmark_);
130  }
131  // Length was doubled
132  _entriesLength *= 2;
133  }
134 
135  // Add this entry to the end of our list
136  /*
137  if (bookmark_ == AMPS_BOOKMARK_NOW)
138  {
139  // Save a now timestamp bookmark
140  char* nowTimestamp = new char[AMPS_TIMESTAMP_LEN];
141  struct tm timeInfo;
142  time_t now;
143  time(&now);
144 #ifdef _WIN32
145  gmtime_s(&timeInfo, &now);
146 #else
147  gmtime_r(&now, &timeInfo);
148 #endif
149  strftime(nowTimestamp, AMPS_TIMESTAMP_LEN,
150  "%Y%m%dT%H%M%S", &timeInfo);
151  nowTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
152  _entries[_current]._val.assign(nowTimestamp,
153  AMPS_TIMESTAMP_LEN);
154  _entries[_current]._active = false;
155  index = _current++;
156  return index + _currentBase;
157  }
158  else
159  */
160  {
161  // Is this an attempt at a range?
162  if (!BookmarkRange::isRange(bookmark_))
163  {
164  _entries[_current]._val.deepCopy(bookmark_);
165  }
166  else
167  {
168  // Deep copy of the range is saved
169  _range.set(bookmark_);
170  // Stricter check on range syntax
171  if (!_range.isValid())
172  {
173  throw CommandException("Invalid bookmark range specified.");
174  }
175  _store->updateAdapter(this);
176  if (!_range.isStartInclusive())
177  {
178  // Put it in our publishers map
179  amps_uint64_t publisher,sequence;
180  parseBookmark(_range.getStart(), publisher, sequence);
181  _publishers[publisher] = sequence;
182  }
183  // Don't actually log a range
184  return 0;
185  }
186  }
187  _entries[_current]._active = true;
188  index = _current++;
189  }
// Both the freshly written slot and a slot relogged by recover() are
// reported relative to the current base.
190  return index + _currentBase;
191  }
192 
193  bool discard(size_t index_)
194  {
195  Lock<Mutex> guard(_subLock);
196  return _discard(index_);
197  }
198 
199  bool discard(const Message::Field& bookmark_)
200  {
201  // These are discarded when logged or not logged
202  if (bookmark_ == AMPS_BOOKMARK_NOW)
203  {
204  return false;
205  }
206  Lock<Mutex> guard(_subLock);
207  size_t search = _least;
208  size_t searchBase = _leastBase;
209  size_t searchMax = _current;
210  size_t searchMaxBase = _currentBase;
211  if (_least + _leastBase == _current + _currentBase)
212  {
213  if (_recoveryMin != AMPS_UNSET_INDEX)
214  {
215  search = _recoveryMin;
216  searchBase = _recoveryBase;
217  searchMax = _recoveryMax;
218  searchMaxBase = _recoveryMaxBase;
219  }
220  else // Store is empty, so nothing to do
221  {
222  return false;
223  }
224  }
225  assert(searchMax != AMPS_UNSET_INDEX);
226  assert(searchMaxBase != AMPS_UNSET_INDEX);
227  assert(search != AMPS_UNSET_INDEX);
228  assert(searchBase != AMPS_UNSET_INDEX);
229  // Search while we don't find the provided bookmark and we're in valid range
230  while (search + searchBase < searchMax + searchMaxBase)
231  {
232  if (_entries[search]._val == bookmark_)
233  {
234  return _discard(search+searchBase);
235  }
236  if(++search == _entriesLength)
237  {
238  // Least has now loooped around
239  searchBase += _entriesLength;
240  search = 0;
241  }
242  }
243  return false;
244  }
245 
246  // Get sequence number from a Field that is a bookmark
247  static void parseBookmark(const Message::Field& field_,
248  amps_uint64_t& publisherId_,
249  amps_uint64_t& sequenceNumber_)
250  {
251  Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
252  }
253 
254  // Check to see if this message is older than the most recent one seen,
255  // and if it is, check if it discarded.
256  bool isDiscarded(const Message::Field& bookmark_)
257  {
258  Lock<Mutex> guard(_subLock);
259  if (BookmarkRange::isRange(bookmark_))
260  {
261  return false;
262  }
263  // Check if we've already recovered this bookmark
264  size_t recoveredIdx = recover(bookmark_, false);
265 
266  amps_uint64_t publisher,sequence;
267  parseBookmark(bookmark_, publisher, sequence);
268  // Compare it to our publishers map
269  PublisherIterator pub = _publishers.find(publisher);
270  if(pub == _publishers.end() || pub->second < sequence)
271  {
272  _publishers[publisher] = sequence;
273  if (recoveredIdx == AMPS_UNSET_INDEX)
274  {
275  return false;
276  }
277  }
278  if (recoveredIdx != AMPS_UNSET_INDEX)
279  {
280  if (!_entries[recoveredIdx]._active)
281  {
282  _recovered.erase(bookmark_);
283  return true;
284  }
285  return false;
286  }
287  // During recovery, we don't really care if it's been discarded
288  // or not. We just want _publishers updated. No need for the
289  // costly linear search.
290  if (_store->_recovering) return false;
291  // During failure and recovery scenarios, we'll see out of order
292  // bookmarks arrive, either because (a) we're replaying or (b)
293  // a publisher has cut over, and we've cut over to a new server.
294  // Scan the list to see if we have a match.
295  size_t base = _leastBase;
296  for(size_t i = _least; i + base < _current + _currentBase; i++)
297  {
298  if( i >= _entriesLength )
299  {
300  i = 0;
301  base = _currentBase;
302  }
303  if(_entries[i]._val == bookmark_)
304  {
305  return !_entries[i]._active;
306  }
307  }
308 
309  return true; // message is totally discarded
310  }
311 
312  bool empty(void) const
313  {
314  if (_least == AMPS_UNSET_INDEX ||
315  ((_least+_leastBase)==(_current+_currentBase) &&
316  _recoveryMin == AMPS_UNSET_INDEX))
317  return true;
318  return false;
319  }
320 
321  void updateMostRecent()
322  {
323  Lock<Mutex> guard(_subLock);
324  _updateMostRecent();
325  }
326 
327  const BookmarkRange& getRange() const
328  {
329  return _range;
330  }
331 
// Builds the bookmark to resume this subscription from and returns it.
// Depending on state the result is one of:
//  - a "publisher|sequence|" list built from _publishers (only when
//    usePublishersList_ is true), optionally followed by the recovery
//    timestamp;
//  - a comma-separated list of _recent, _lastPersisted and the recovery
//    timestamp;
//  - the epoch bookmark when nothing has been discarded;
//  - _range (with its start updated) whenever a valid range is set.
// Calls _updateMostRecent() as a side effect and replaces _recentList.
332  Message::Field getMostRecentList(bool usePublishersList_ = true)
333  {
334  Lock<Mutex> guard(_subLock);
335  bool useLastPersisted = !_lastPersisted.empty() &&
336  _lastPersisted.len() > 1;
337  // when this is called, we'll take a moment to update the list
338  // of things recovered,
339  // so we don't accidentally log anything we ought not to.
340  _updateMostRecent();
341  bool useRecent = !_recent.empty() && _recent.len() > 1;
342  amps_uint64_t lastPublisher = 0;
343  amps_uint64_t lastSeq = 0;
344  amps_uint64_t recentPublisher = 0;
345  amps_uint64_t recentSeq = 0;
346  if (useLastPersisted)
347  {
348  parseBookmark(_lastPersisted,lastPublisher,lastSeq);
349  }
350  if (useRecent)
351  {
352  parseBookmark(_recent,recentPublisher,recentSeq);
353  if (empty() && useLastPersisted)
354  {
355  useRecent = false;
356  }
357  else
358  {
// Same publisher for both: keep only one of the two bookmarks.
359  if (useLastPersisted && lastPublisher == recentPublisher)
360  {
361  if (lastSeq <= recentSeq) useRecent = false;
362  else useLastPersisted = false;
363  }
364  }
365  }
366  // Set size for all bookmarks that will be used
// Each length includes one extra byte for the separating comma.
367  size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
368  if (useRecent) totalLen += _recent.len() + 1;
369  // If we don't have a non-EPOCH persisted ack and we don't have a
370  // non-EPOCH most recent bookmark, OR we have a range
371  // we can build a list based on all the publishers instead.
372  if (usePublishersList_
373  && ((!useLastPersisted && !useRecent)
374  || _lastPersisted == AMPS_BOOKMARK_EPOCH))
375  {
376  std::ostringstream os;
377  for (PublisherIterator pub = _publishers.begin();
378  pub != _publishers.end(); ++pub)
379  {
380  if (pub->first == 0 && pub->second == 0) continue;
// Never report further than _recent for the publisher of _recent.
381  if (pub->first == recentPublisher && recentSeq < pub->second)
382  {
383  os << recentPublisher << '|' << recentSeq << "|,";
384  }
385  else
386  {
387  os << pub->first << '|' << pub->second << "|,";
388  }
389  }
390  std::string recent = os.str();
391  totalLen = recent.length();
392  if (!recent.empty())
393  {
394  if (!_recoveryTimestamp.empty())
395  {
396  totalLen += _recoveryTimestamp.len();
397  recent += std::string(_recoveryTimestamp);
398  }
399  else
400  {
401  // Remove trailing ,
402  recent.erase(--totalLen);
403  }
404  // Reset _recentList to new value and return it
405  _recentList.clear();
406  _recentList = Message::Field(recent).deepCopy();
407  if (_range.isValid())
408  {
409  if (_range.getStart() != recent
410  && _recentList != AMPS_BOOKMARK_EPOCH)
411  {
412  _range.replaceStart(_recentList, true);
413  }
414  else if (_range.isStartInclusive())
415  {
416  amps_uint64_t publisher,sequence;
417  parseBookmark(_range.getStart(), publisher,
418  sequence);
419  PublisherIterator pub = _publishers.find(publisher);
420  if(pub != _publishers.end()
421  && pub->second >= sequence)
422  {
423  _range.makeStartExclusive();
424  }
425  }
426  return _range;
427  }
428  return _recentList;
429  }
430  if (_range.isValid())
431  {
432  return _range;
433  }
434  }
435  if (!_recoveryTimestamp.empty() && !_range.isValid())
436  {
437  totalLen += _recoveryTimestamp.len()+1;
438  }
439  // If we have nothing discarded, return EPOCH
440  if (totalLen == 0
441  || (_recent.len() < 2 && !empty()))
442  {
443  if (_range.isValid())
444  {
445  return _range;
446  }
447  if (!useRecent)
448  {
// NOTE(review): the body of this branch is missing from this listing (one
// source line was lost between the braces); presumably it returned the
// epoch bookmark directly -- confirm against the real header.
450  }
451  _setLastPersistedToEpoch();
452  return _lastPersisted;
453  }
454  // Remove the trailing , from the length
455  totalLen -= 1;
// The buffer is handed to _recentList.assign() below.
456  char* field = new char[totalLen];
457  size_t len = 0;
458  if (useRecent)
459  {
460  len = _recent.len();
461  memcpy(field, _recent.data(), len);
462  if (len < totalLen) field[len++] = ',';
463  }
464  if (useLastPersisted)
465  {
466  memcpy(field+len, _lastPersisted.data(), _lastPersisted.len());
467  len += _lastPersisted.len();
468  if (len < totalLen) field[len++] = ',';
469  }
470  if (!_recoveryTimestamp.empty() && !_range.isValid())
471  {
472  memcpy(field+len, _recoveryTimestamp.data(),
473  _recoveryTimestamp.len());
474  // If more is to be written after this, uncomment the following
475  //len += _lastPersisted.len();
476  //if (len < totalLen) field[len++] = ',';
477  }
478  // _recentList clear will delete[] current buffer and assign will get cleared
479  _recentList.clear();
480  _recentList.assign(field, totalLen);
481  if (_range.isValid())
482  {
483  if (_recentList != AMPS_BOOKMARK_EPOCH)
484  {
485  if (_range.getStart() != _recentList)
486  {
487  _range.replaceStart(_recentList, true);
488  }
489  else if (_range.isStartInclusive())
490  {
491  amps_uint64_t publisher,sequence;
492  parseBookmark(_range.getStart(), publisher,
493  sequence);
494  PublisherIterator pub = _publishers.find(publisher);
495  if(pub != _publishers.end()
496  && pub->second >= sequence)
497  {
498  _range.makeStartExclusive();
499  }
500  }
501  }
502  return _range;
503  }
504  return _recentList;
505  }
506 
507  Message::Field getMostRecent(bool update_ = false)
508  {
509  Lock<Mutex> guard(_subLock);
510  // Return the same as last time if nothing's changed
511  // _recent is the most recent bookmark.
512  if (update_ && _store->_recentChanged)
513  _updateMostRecent();
514  if(_recent.empty())
515  {
517  }
518  else
519  {
520  return _recent;
521  }
522  }
523 
524  Message::Field getLastPersisted()
525  {
526  Lock<Mutex> guard(_subLock);
527  return _lastPersisted;
528  }
529 
530  void setMostRecent(const Message::Field& recent_)
531  {
532  _recent.clear();
533  _recent.deepCopy(recent_);
534  }
535 
536  void setRecoveryTimestamp(const char* recoveryTimestamp_,
537  size_t len_ = 0)
538  {
539  _recoveryTimestamp.clear();
540  size_t len = (len_==0)?AMPS_TIMESTAMP_LEN:len_;
541  char* ts = new char[len];
542  memcpy((void*)ts, (const void*)recoveryTimestamp_, len);
543  _recoveryTimestamp.assign(ts, len);
544  }
545 
546  void moveEntries(char* old_, char* new_, size_t newSize_)
547  {
548  size_t least = _least;
549  size_t leastBase = _leastBase;
550  if (_recoveryMin != AMPS_UNSET_INDEX)
551  {
552  least = _recoveryMin;
553  leastBase = _recoveryBase;
554  }
555  // First check if we grew in place, if so, just move current after least
556  if (old_ == new_)
557  {
558  if (newSize_ - (sizeof(Entry)*_entriesLength) > sizeof(Entry)*least)
559  {
560  memcpy(new_ + (sizeof(Entry)*_entriesLength),
561  old_, (sizeof(Entry)*least));
562  // Clear the beginning where those entries were
563  memset(old_, 0, sizeof(Entry)*least);
564  }
565  else // We have to use an intermediate buffer
566  {
567  Entry* buffer = new Entry[least];
568  memcpy((void*)buffer, (void*)old_, sizeof(Entry)*least);
569  //Put the beginning entries at the start of the new buffer
570  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
571  (_entriesLength - least)*sizeof(Entry));
572  //Put the end entries after the beginning entries
573  memcpy((void*)((char*)new_+((_entriesLength - least)*sizeof(Entry))),
574  (void*)buffer, least*sizeof(Entry));
575  // Least is now at 0 so base must be increased
576  leastBase += least;
577  least = 0;
578  delete [] buffer;
579  }
580  }
581  else
582  {
583  //Put the beginning entries at the start of the new buffer
584  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
585  (_entriesLength - least)*sizeof(Entry));
586  //Put the end entries after the beginning entries
587  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
588  (void*)old_, least*sizeof(Entry));
589  // Least is now at 0 so base must be increased
590  leastBase += least;
591  least = 0;
592  }
593  if (_recoveryMin != AMPS_UNSET_INDEX)
594  {
595  _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
596  _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
597  (_recoveryMin + _recoveryBase);
598  _recoveryMaxBase = leastBase;
599  _recoveryMin = least;
600  _recoveryBase = leastBase;
601  }
602  else
603  {
604  _least = least;
605  }
606  _leastBase = leastBase;
607  // Current is now after everything and using the same base
608  _currentBase = _leastBase;
609  _current = least + _entriesLength;
610  }
611 
612  inline size_t getOldestBookmarkSeq()
613  {
614  Lock<Mutex> guard(_subLock);
615  // If there is nothing in the store, return -1, otherwise return lowest
616  return ((_least+_leastBase)==(_current+_currentBase)) ? AMPS_UNSET_INDEX :
617  _least + _leastBase;
618  }
619 
620  bool lastPersisted(const Message::Field& bookmark_)
621  {
622  // These shouldn't be persisted
623  if (bookmark_ == AMPS_BOOKMARK_NOW
624  || BookmarkRange::isRange(bookmark_))
625  {
626  return false;
627  }
628  Lock<Mutex> guard(_subLock);
629  return _setLastPersisted(bookmark_);
630  }
631 
632  bool _setLastPersisted(const Message::Field& bookmark_)
633  {
634  if (!_lastPersisted.empty())
635  {
636  amps_uint64_t publisher, publisher_lastPersisted;
637  amps_uint64_t sequence, sequence_lastPersisted;
638  parseBookmark(bookmark_,publisher,sequence);
639  parseBookmark(_lastPersisted, publisher_lastPersisted,
640  sequence_lastPersisted);
641  if(publisher == publisher_lastPersisted &&
642  sequence <= sequence_lastPersisted)
643  {
644  return false;
645  }
646  }
647  // deepCopy will clear what's in _lastPersisted
648  _lastPersisted.deepCopy(bookmark_);
649  _store->_recentChanged = true;
650  _recoveryTimestamp.clear();
651  return true;
652  }
653 
654  Message::Field lastPersisted(size_t bookmark_)
655  {
656  Lock<Mutex> guard(_subLock);
657  Message::Field& bookmark = _entries[bookmark_]._val;
658  // These shouldn't be persisted
659  if (bookmark == AMPS_BOOKMARK_NOW
660  || BookmarkRange::isRange(bookmark))
661  {
662  return bookmark;
663  }
664  _setLastPersisted(bookmark);
665  return bookmark;
666  }
667 
668  // Returns the index of the recovered item, either the index where it
669  // was first stored prior to getMostRecent, or the new index if it is
670  // relogged either because this is called from log() or because it was
671  // not active but also not persisted.
672  size_t recover(const Message::Field& bookmark_, bool relogIfNotDiscarded)
673  {
674  size_t retVal = AMPS_UNSET_INDEX;
675  if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
676  return retVal;
677  // Check if this is a recovered bookmark.
678  // If so, copy the existing one to the new location
679  RecoveryIterator item = _recovered.find(bookmark_);
680  if(item != _recovered.end())
681  {
682  size_t seqNo = item->second;
683  size_t index = (seqNo - _recoveryBase) % _entriesLength;
684  // If we only have recovery entries and isDiscarded is
685  // checking on an already discarded entry, update recent.
686  if (_least+_leastBase == _current+_currentBase &&
687  !_entries[index]._active)
688  {
689  _store->_recentChanged = true;
690  _recent.clear();
691  _recent = _entries[index]._val.deepCopy();
692  retVal = moveEntry(index);
693  if (retVal == AMPS_UNSET_INDEX) recover(bookmark_,
694  relogIfNotDiscarded);
695  _least = _current;
696  _leastBase = _currentBase;
697  }
698  else if (!_entries[index]._active || relogIfNotDiscarded)
699  {
700  retVal = moveEntry(index);
701  if (retVal == AMPS_UNSET_INDEX) recover(bookmark_,
702  relogIfNotDiscarded);
703  }
704  else
705  {
706  return index;
707  }
708  _recovered.erase(item);
709  if (_recovered.empty())
710  {
711  _recoveryMin = AMPS_UNSET_INDEX;
712  _recoveryBase = AMPS_UNSET_INDEX;
713  _recoveryMax = AMPS_UNSET_INDEX;
714  _recoveryMaxBase = AMPS_UNSET_INDEX;
715  }
716  else if (index == _recoveryMin)
717  {
718  while (_entries[_recoveryMin]._val.empty() &&
719  (_recoveryMin+_recoveryBase) < (_recoveryMax+_recoveryMaxBase))
720  {
721  if (++_recoveryMin == _entriesLength)
722  {
723  _recoveryMin = 0;
724  _recoveryBase += _entriesLength;
725  }
726  }
727  }
728  }
729  return retVal;
730  }
731 
732  // Return the id of this Subscription
733  Message::Field id() const
734  {
735  return _id;
736  }
737 
738  struct Entry
739  {
740  Message::Field _val; //16
741  bool _active; //17
742  char _padding[32-sizeof(Message::Field)-(sizeof(bool)*2)]; //32
743 
744  Entry() : _active(false)
745  {
746  _padding[0] = '\0';
747  }
748  };
749 
750  struct EntryHash
751  {
752  Field::FieldHash _hasher;
753 
754  size_t operator()(const Entry* entryPtr_) const
755  {
756  return _hasher(entryPtr_->_val);
757  }
758 
759  bool operator()(const Entry* lhsPtr_, const Entry* rhsPtr_) const
760  {
761  return _hasher(lhsPtr_->_val, rhsPtr_->_val);
762  }
763  };
764 
765  //typedef std::set<Entry*, EntryHash> EntryPtrList;
766  typedef std::vector<Entry*> EntryPtrList;
767 
768  void getRecoveryEntries(EntryPtrList& list_)
769  {
770  if (_recoveryMin == AMPS_UNSET_INDEX ||
771  _recoveryMax == AMPS_UNSET_INDEX)
772  return;
773  size_t base = _recoveryBase;
774  size_t max = _recoveryMax + _recoveryMaxBase;
775  for (size_t i=_recoveryMin; i+base<max; ++i)
776  {
777  if (i == _entriesLength)
778  {
779  i = 0;
780  base = _recoveryMaxBase;
781  }
782  //list_.insert(&(_entries[i]));
783  list_.push_back(&(_entries[i]));
784  }
785  return;
786  }
787 
788  void getActiveEntries(EntryPtrList& list_)
789  {
790  size_t base = _leastBase;
791  for (size_t i=_least; i+base < _current + _currentBase; ++i)
792  {
793  if (i >= _entriesLength)
794  {
795  i = 0;
796  base = _currentBase;
797  }
798  //list_.insert(&(_entries[i]));
799  list_.push_back(&(_entries[i]));
800  }
801  return;
802  }
803 
804  Entry* getEntryByIndex(size_t index_)
805  {
806  Lock<Mutex> guard(_subLock);
807  size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
808  index_ >= _least + _leastBase)
809  ? _leastBase : _recoveryBase;
810  // Return NULL if not a valid index
811  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
812  _least + _leastBase :
813  _recoveryMin + _recoveryBase);
814  if(index_ >= _current+_currentBase || index_ < min) return NULL;
815  return &(_entries[(index_ - base) % _entriesLength]);
816  }
817 
818  void justRecovered()
819  {
820  Lock<Mutex> guard(_subLock);
821  _updateMostRecent();
822  EntryPtrList list;
823  getRecoveryEntries(list);
824  setPublishersToDiscarded(&list, &_publishers);
825  }
826 
827  void setPublishersToDiscarded(EntryPtrList* recovered_,
828  PublisherMap* publishers_)
829  {
830  // Need to reset publishers to only have up to the last
831  // discarded sequence number. Messages that were in transit
832  // during previous run but not discarded should be considered
833  // new and not duplicate after a restart/recovery.
834  for (EntryPtrList::iterator i = recovered_->begin();
835  i != recovered_->end(); ++i)
836  {
837  amps_uint64_t publisher = (amps_uint64_t)0;
838  amps_uint64_t sequence = (amps_uint64_t)0;
839  parseBookmark((*i)->_val,publisher,sequence);
840  if (publisher && sequence && (*i)->_active &&
841  (*publishers_)[publisher] >= sequence)
842  {
843  (*publishers_)[publisher] = sequence - 1;
844  }
845  }
846  }
847 
848  void clearLastPersisted()
849  {
850  Lock<Mutex> guard(_subLock);
851  _lastPersisted.clear();
852  }
853 
854  void setLastPersistedToEpoch()
855  {
856  Lock<Mutex> guard(_subLock);
857  _setLastPersistedToEpoch();
858  }
859 
860  private:
861  Subscription(const Subscription&);
862  Subscription& operator=(const Subscription&);
863 
864  size_t moveEntry(size_t index_)
865  {
866  // Check for wrap
867  if (_current >= _entriesLength)
868  {
869  _current = 0;
870  _currentBase += _entriesLength;
871  }
872  // Check for resize
873  // If list is too small, double it
874  if((_current == _least%_entriesLength &&
875  _leastBase < _currentBase) ||
876  (_current == _recoveryMin && _recoveryBase < _currentBase))
877  {
878  if (!_store->resize(_id, (char**)&_entries,
879  sizeof(Entry) * _entriesLength * 2))
880  {
881  return AMPS_UNSET_INDEX;
882  }
883  // Length was doubled
884  _entriesLength *= 2;
885  }
886  _entries[_current]._val = _entries[index_]._val;
887  _entries[_current]._active = _entries[index_]._active;
888  // No need to clear Field, just set it to empty
889  _entries[index_]._val.assign(NULL, 0);
890  _entries[index_]._active = false;
891  return _current++;
892  }
893 
894  void _setLastPersistedToEpoch()
895  {
896  size_t fieldLen = strlen(AMPS_BOOKMARK_EPOCH);
897  char* field = new char[fieldLen];
898  memcpy(field, AMPS_BOOKMARK_EPOCH, fieldLen);
899  _lastPersisted.clear();
900  _lastPersisted.assign(field, fieldLen);
901  }
902 
// Marks the entry at sequence index index_ as discarded and, when that
// entry was the oldest in its region, sweeps forward over every
// consecutive discarded entry, updating _recent as it goes.
// Returns true if _recent was changed, false otherwise (including when
// index_ is outside the stored range).
// The caller must already hold _subLock.
903  bool _discard(size_t index_)
904  {
905  bool retVal = false;
906  // Lock should already be held
907  assert((_recoveryBase==AMPS_UNSET_INDEX && _recoveryMin==AMPS_UNSET_INDEX) ||
908  (_recoveryBase!=AMPS_UNSET_INDEX && _recoveryMin!=AMPS_UNSET_INDEX));
// Indexes at or past the active region's start use the active base;
// older ones belong to the recovery region.
909  size_t base = (_recoveryBase == AMPS_UNSET_INDEX
910  || index_ >= _least + _leastBase)
911  ? _leastBase : _recoveryBase;
912  // discard of a record not in the log is a no-op
913  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
914  _recoveryMin + _recoveryBase);
915  if(index_ >= _current+_currentBase || index_ < min)
916  {
917  return retVal;
918  }
919 
920  // log that this one is discarded, then
921  // recalculate what the most recent entry is.
922  Entry& e = _entries[(index_ - base) % _entriesLength];
923  e._active = false;
924 
925  size_t index = index_;
926  if (_recoveryMin != AMPS_UNSET_INDEX &&
927  index_ == _recoveryMin + _recoveryBase)
928  {
929  // Find all to discard
930  size_t j = _recoveryMin;
931  while(j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
932  !_entries[j]._active)
933  {
934  // This index might be left-over from a slow discard and we
935  // may have reconnected. We have a few possibilities at this point.
936  // 1. If we re-logged this bookmark, this index will point at an
937  // empty bookmark. This could happen if the discard thread was slow
938  // and the reconnect was fast. We wouldn't report
939  // the re-arrival of the bookmark as a duplicate because it
940  // hadn't been marked as discarded. In this case, we have to
941  // simply move past this in the recovery area.
942  // 2. This bookmark should become _recent because we haven't
943  // yet received anything since our last call to getMostRecent.
944  // In this case, we need to take it out of recovered but not
945  // clear it. The publishers map should report it as duplicate.
946  // 3. This is the 'oldest' recovered, but we have received new
947  // bookmarks since we got this one. We can clear it because the
948  // publishers map should report it as a duplicate if/when it
949  // does arrive again. Move the _recoveryMin ahead and remove it
950  // from recovered.
951  Message::Field& bookmark = _entries[j]._val;
952  // Option 1 skips this and just moves on
953  if (!bookmark.empty())
954  {
955  _recovered.erase(bookmark);
956  if (_least + _leastBase == _current + _currentBase ||
957  ((_least + _leastBase) % _entriesLength) ==
958  ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
959  {
960  // Option 2, reset recent
961  retVal = true;
962  _store->_recentChanged = true;
963  _recoveryTimestamp.clear();
964  _recent.clear();
// _recent takes over the buffer; the slot is detached without freeing it.
965  _recent = bookmark;
966  bookmark.assign(NULL, 0);
967  }
968  else
969  {
970  // Option 3, simply clear this one
971  bookmark.clear();
972  }
973  }
974  // If we reach the buffer end,
975  // keep checking from the beginning
976  if(++j == _entriesLength)
977  {
978  // Least has now looped around
979  _recoveryBase += _entriesLength;
980  j = 0;
981  }
982  }
983  assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
984  _recovered.empty());
985  if (_recovered.empty())
986  {
987  _recoveryMin = AMPS_UNSET_INDEX;
988  _recoveryBase = AMPS_UNSET_INDEX;
989  _recoveryMax = AMPS_UNSET_INDEX;
990  _recoveryMaxBase = AMPS_UNSET_INDEX;
991  // Cleared recovered, want to check onward
992  index = _least + _leastBase;
993  }
994  else
995  {
996  _recoveryMin = j;
997  }
998  }
999  // if this is the first item in the list, discard all inactive ones
1000  // as long as recovery also says its okay
1001  if(index == _least + _leastBase)
1002  {
1003  // Find all to discard
1004  size_t j = _least;
1005  while(j + _leastBase < _current + _currentBase &&
1006  !_entries[j]._active)
1007  {
1008  //Must free associated memory
1009  _recent.clear();
// _recent takes over the slot's buffer; the slot is then detached.
1010  _recent = _entries[j]._val;
1011  _entries[j]._val.assign(NULL, 0);
1012  _store->_recentChanged = true;
1013  retVal = true;
1014  _recoveryTimestamp.clear();
1015  // If we reach the buffer end,
1016  // keep checking from the beginning
1017  if(++j == _entriesLength)
1018  {
1019  // Least has now looped around
1020  _leastBase += _entriesLength;
1021  j = 0;
1022  }
1023  }
1024  _least = j;
1025  }
1026  return retVal;
1027  }
1028 
// Turns everything currently stored into the recovery region.
// _recovered is rebuilt from every non-empty entry between the old start
// (recovery minimum if set, else _least) and _current, the recovery bounds
// are set to span from the first such entry to _current, and the active
// region is emptied by moving _least up to _current.
// The caller must already hold _subLock.
1029  void _updateMostRecent()
1030  {
1031  // Lock is already held
1032  _recovered.clear();
1033  assert((_recoveryBase==AMPS_UNSET_INDEX && _recoveryMin==AMPS_UNSET_INDEX) ||
1034  (_recoveryBase!=AMPS_UNSET_INDEX && _recoveryMin!=AMPS_UNSET_INDEX));
// Capture the scan start before the recovery bounds are reset below.
1035  size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
1036  size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
1037  _recoveryMin = AMPS_UNSET_INDEX;
1038  _recoveryBase = AMPS_UNSET_INDEX;
1039  _recoveryMax = AMPS_UNSET_INDEX;
1040  _recoveryMaxBase = AMPS_UNSET_INDEX;
1041  for(size_t i = start; i + base < _current + _currentBase; i++)
1042  {
1043  if( i >= _entriesLength )
1044  {
1045  i = 0;
1046  base = _currentBase;
1047  }
// NOTE(review): _recoveryMax and _recoveryBase were reset to
// AMPS_UNSET_INDEX just above, and `i` is a slot index compared against
// base-adjusted sums -- confirm this skip condition does what was intended.
1048  if (i >= _recoveryMax+_recoveryBase && i < _least+_leastBase)
1049  continue;
1050  Entry& entry = _entries[i];
1051  if (!entry._val.empty())
1052  {
1053  _recovered[entry._val] = i+base;
// The first non-empty entry found fixes the new recovery bounds.
1054  if (_recoveryMin == AMPS_UNSET_INDEX)
1055  {
1056  _recoveryMin = i;
1057  _recoveryBase = base;
1058  _recoveryMax = _current;
1059  _recoveryMaxBase = _currentBase;
1060  }
1061  }
1062  }
1063  if (_current == _entriesLength)
1064  {
1065  _current = 0;
1066  _currentBase += _entriesLength;
1067  }
// The active region is now empty.
1068  _least = _current;
1069  _leastBase = _currentBase;
1070  }
1071 
// Private deep copy of the subscription id (made in the constructor).
1072  Message::Field _id;
// Bookmark of the most recently discarded entry; updated by _discard().
1073  Message::Field _recent;
// Last persisted bookmark; the constructor initialises it to the epoch.
1074  Message::Field _lastPersisted;
// Buffer for the list most recently built by getMostRecentList().
1075  Message::Field _recentList;
// Bookmark range saved by log() when a range is supplied.
1076  BookmarkRange _range;
// Timestamp set by setRecoveryTimestamp(); cleared when a discard or
// persisted ack changes the resume point.
1077  Message::Field _recoveryTimestamp;
// Slot/base pairs: a sequence index is always slot + base, and the base
// grows by _entriesLength each time the slot wraps.
// _current: next slot to write.
1078  size_t _current;
1079  size_t _currentBase;
// _least: first slot of the active region.
1080  size_t _least;
1081  size_t _leastBase;
// Bounds of the recovery region; all four are AMPS_UNSET_INDEX when there
// is none.
1082  size_t _recoveryMin;
1083  size_t _recoveryBase;
1084  size_t _recoveryMax;
1085  size_t _recoveryMaxBase;
// Number of Entry slots in _entries.
1086  size_t _entriesLength;
// Entry array, allocated, grown and freed through _store->resize().
1087  Entry* _entries;
// Store that owns this subscription.
1088  MemoryBookmarkStore* _store;
// Guards the state of this subscription.
1089  Mutex _subLock;
// Bookmarks in the recovery region mapped to their sequence index.
1090  RecoveryMap _recovered;
1091  public:
1092  PublisherMap _publishers;
1093  };
1094 
1095 public:
// NOTE(review): the line that opens this constructor (listing line 1098) is
// not present in this extract; the reference index at the end of the listing
// names it `MemoryBookmarkStore()`. The lines below are the rest of its
// member initializer list and its empty body.
// The store starts with the default minimum server version, _recentChanged
// set, _recovering clear, and no recovery point adapter or factory.
1099  _subsLock(),
1100  _lock(),
1101  _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1102  _recentChanged(true),
1103  _recovering(false),
1104  _recoveryPointAdapter(NULL),
1105  _recoveryPointFactory(NULL)
1106  { ; }
1107 
1108  typedef RecoveryPointAdapter::iterator RecoveryIterator;
1109 
// NOTE(review): the first line of this constructor's signature (listing line
// 1116) is not present in this extract; the reference index at the end of the
// listing gives the full signature as
// `MemoryBookmarkStore(const RecoveryPointAdapter &adapter_, RecoveryPointFactory factory_=NULL)`.
// Builds the store from the recovery points supplied by adapter_: every
// recovery point is replayed into the in-memory state, with _recovering held
// true for the duration so that updateAdapter() returns without writing.
1117  RecoveryPointFactory factory_ = NULL)
1118  : BookmarkStoreImpl()
1119  , _subsLock()
1120  , _lock()
1121  , _serverVersion(AMPS_DEFAULT_MIN_VERSION)
1122  , _recentChanged(true)
1123  , _recovering(true)
1124  , _recoveryPointAdapter(adapter_)
1125  , _recoveryPointFactory(factory_)
1126  {
      // One Message is reused as the carrier for every replayed bookmark.
1127  Message msg;
1128  for (RecoveryIterator recoveryPoint = _recoveryPointAdapter.begin();
1129  recoveryPoint != _recoveryPointAdapter.end();
1130  ++recoveryPoint)
1131  {
1132  Field subId(recoveryPoint->getSubId());
      // Clear any handle left from the previous recovery point so the
      // subscription is looked up again by sub id.
1133  msg.setSubscriptionHandle(static_cast<amps_subscription_handle>(0));
1134  msg.setSubId(subId);
1135  Field bookmark = recoveryPoint->getBookmark();
      // A range bookmark is logged as a single unit.
1136  if (BookmarkRange::isRange(bookmark))
1137  {
1138  msg.setBookmark(bookmark);
1139  _log(msg);
1140  }
1141  else
1142  {
      // Otherwise the value is treated as a comma-separated list and each
      // element is handled on its own.
      // NOTE(review): memchr is used here but <string.h> is not among the
      // includes visible at the top of the file; presumably it arrives
      // through another header - confirm.
1143  const char* start = bookmark.data();
1144  size_t remain = bookmark.len();
1145  const char* comma = (const char*)memchr((const void*)start,
1146  (int)',', remain);
1147  while (comma)
1148  {
1149  size_t len = (size_t)(comma-start);
1150  msg.setBookmark(start, len);
      // A timestamp element is stored as the subscription's recovery
      // timestamp instead of being logged.
1151  if (Field::isTimestamp(msg.getBookmark()))
1152  {
1153  find(subId)->setRecoveryTimestamp(start,len);
1154  }
1155  else
1156  {
      // _isDiscarded() stores the subscription handle on msg (its result is
      // not used here); _log() then assigns the sequence number that
      // _discard() reads back from msg.
1157  _isDiscarded(msg);
1158  _log(msg);
1159  _discard(msg);
1160  }
      // Step past the comma and search the remainder of the list.
1161  start = ++comma;
1162  remain = bookmark.len() - (size_t)(start-bookmark.data());
1163  comma = (const char*)memchr((const void*)start,
1164  (int)',', remain);
1165  }
      // Handle the final element (or the only one when there was no comma),
      // using the same steps as inside the loop.
1166  msg.setBookmark(start, remain);
1167  if (Field::isTimestamp(msg.getBookmark()))
1168  {
1169  find(subId)->setRecoveryTimestamp(start,remain);
1170  }
1171  else
1172  {
1173  _isDiscarded(msg);
1174  _log(msg);
1175  _discard(msg);
1176  }
1177  }
1178  }
      // Replay finished: updateAdapter() may write to the adapter again.
1179  _recovering = false;
1180  }
1181 
1182  virtual ~MemoryBookmarkStore()
1183  {
1184  __purge();
1185  }
1186 
1192  virtual size_t log(Message& message_)
1193  {
1194  Lock<Mutex> guard(_lock);
1195  return _log(message_);
1196  }
1197 
1203  virtual void discard(const Message& message_)
1204  {
1205  Lock<Mutex> guard(_lock);
1206  (void)_discard(message_);
1207  }
1208 
1216  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1217  {
1218  Lock<Mutex> guard(_lock);
1219  (void)_discard(subId_, bookmarkSeqNo_);
1220  }
1221 
// NOTE(review): the signature line of this method (listing line 1227) is not
// present in this extract; the reference index at the end of the listing
// gives it as
// `virtual Message::Field getMostRecent(const Message::Field &subId_)`.
// Takes the store lock and returns the result of _getMostRecent() for subId_.
1228  {
1229  Lock<Mutex> guard(_lock);
1230  return _getMostRecent(subId_);
1231  }
1232 
1241  virtual bool isDiscarded(Message& message_)
1242  {
1243  Lock<Mutex> guard(_lock);
1244  return _isDiscarded(message_);
1245  }
1246 
1252  virtual void purge()
1253  {
1254  Lock<Mutex> guard(_lock);
1255  _purge();
1256  }
1257 
1263  virtual void purge(const Message::Field& subId_)
1264  {
1265  Lock<Mutex> guard(_lock);
1266  _purge(subId_);
1267  }
1268 
1273  virtual size_t getOldestBookmarkSeq(const Message::Field& subId_)
1274  {
1275  Lock<Mutex> guard(_lock);
1276  return _getOldestBookmarkSeq(subId_);
1277  }
1278 
1284  virtual void persisted(const Message::Field& subId_,
1285  const Message::Field& bookmark_)
1286  {
1287  Lock<Mutex> guard(_lock);
1288  _persisted(find(subId_), bookmark_);
1289  }
1290 
1297  virtual Message::Field persisted(const Message::Field& subId_,
1298  size_t bookmark_)
1299  {
1300  Lock<Mutex> guard(_lock);
1301  return _persisted(find(subId_), bookmark_);
1302  }
1303 
1308  void setServerVersion(const VersionInfo& version_)
1309  {
1310  setServerVersion(version_.getOldStyleVersion());
1311  }
1312 
1317  void setServerVersion(size_t version_)
1318  {
1319  Lock<Mutex> guard(_subsLock);
1320  _serverVersion = version_;
1321  }
1322 
1323  inline bool isWritableBookmark(size_t length)
1324  {
1325  return length >= AMPS_MIN_BOOKMARK_LEN;
1326  }
1327 
1328  typedef Subscription::EntryPtrList EntryPtrList;
1329 
1330 protected:
1331 
1332  // Called once lock is acquired
1333  size_t _log(Message& message_)
1334  {
1335  Message::Field bookmark = message_.getBookmark();
1336  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1337  if (!pSub)
1338  {
1339  Message::Field subId = message_.getSubscriptionId();
1340  if (subId.empty())
1341  subId = message_.getSubscriptionIds();
1342  pSub = find(subId);
1343  message_.setSubscriptionHandle(
1344  static_cast<amps_subscription_handle>(pSub));
1345  }
1346  size_t retVal = pSub->log(bookmark);
1347  message_.setBookmarkSeqNo(retVal);
1348  return retVal;
1349  }
1350 
1351  // Called once lock is acquired
1352  bool _discard(const Message& message_)
1353  {
1354  size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1355  Subscription* pSub = (Subscription*)(message_.getSubscriptionHandle());
1356  if (!pSub)
1357  {
1358  Message::Field subId = message_.getSubscriptionId();
1359  if (subId.empty())
1360  subId = message_.getSubscriptionIds();
1361  pSub = find(subId);
1362  }
1363  bool retVal = pSub->discard(bookmarkSeqNo);
1364  if (retVal)
1365  {
1366  updateAdapter(pSub);
1367  }
1368  return retVal;
1369  }
1370 
1371  // Called once lock is acquired
1372  bool _discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1373  {
1374  Subscription* pSub = find(subId_);
1375  bool retVal = pSub->discard(bookmarkSeqNo_);
1376  if (retVal)
1377  {
1378  updateAdapter(pSub);
1379  }
1380  return retVal;
1381  }
1382 
1383  // Called once lock is acquired
1384  Message::Field _getMostRecent(const Message::Field& subId_,
1385  bool usePublishersList_ = true)
1386  {
1387  Subscription* pSub = find(subId_);
1388  return pSub->getMostRecentList(usePublishersList_);
1389  }
1390 
1391  // Called once lock is acquired
1392  bool _isDiscarded(Message& message_)
1393  {
1394  Message::Field subId = message_.getSubscriptionId();
1395  if (subId.empty())
1396  subId = message_.getSubscriptionIds();
1397  Subscription* pSub = find(subId);
1398  message_.setSubscriptionHandle(
1399  static_cast<amps_subscription_handle>(pSub));
1400  return pSub->isDiscarded(message_.getBookmark());
1401  }
1402 
1403  // Called once lock is acquired
1404  size_t _getOldestBookmarkSeq(const Message::Field& subId_)
1405  {
1406  Subscription* pSub = find(subId_);
1407  return pSub->getOldestBookmarkSeq();
1408  }
1409 
1410  // Called once lock is acquired
1411  virtual void _persisted(Subscription* pSub_,
1412  const Message::Field& bookmark_)
1413  {
1414  if (pSub_->lastPersisted(bookmark_))
1415  {
1416  updateAdapter(pSub_);
1417  }
1418  }
1419 
1420  // Called once lock is acquired
1421  virtual Message::Field _persisted(Subscription* pSub_, size_t bookmark_)
1422  {
1423  return pSub_->lastPersisted(bookmark_);
1424  }
1425 
1426  // Called once lock is acquired
1427  void _purge()
1428  {
1429  if (_recoveryPointAdapter.isValid())
1430  {
1431  _recoveryPointAdapter.purge();
1432  }
1433  __purge();
1434  }
1435 
1436  // Called once lock is acquired
1437  void __purge()
1438  {
1439  // Walk through list and clear Fields before calling clear
1440  while(!_subs.empty())
1441  {
1442  SubscriptionMap::iterator iter = _subs.begin();
1443  //The subId key is cleared when deleting the Subscription, which shares
1444  //the _data pointer in its id field.
1445  const_cast<Message::Field&>(iter->first).clear();
1446  delete (iter->second);
1447  _subs.erase(iter);
1448  }
1449  _subs.clear();
1450  }
1451 
1452  // Called once lock is acquired
1453  virtual void _purge(const Message::Field& subId_)
1454  {
1455  if (_recoveryPointAdapter.isValid())
1456  {
1457  _recoveryPointAdapter.purge(subId_);
1458  }
1459  __purge(subId_);
1460  }
1461 
1462  // Called once lock is acquired
1463  virtual void __purge(const Message::Field& subId_)
1464  {
1465  Lock<Mutex> guard(_subsLock);
1466  SubscriptionMap::iterator iter = _subs.find(subId_);
1467  if (iter == _subs.end()) return;
1468  const_cast<Message::Field&>(iter->first).clear();
1469  delete (iter->second);
1470  _subs.erase(iter);
1471  }
1472 
1473  // Can be used by subclasses during recovery
1474  void setMostRecent(const Message::Field& subId_,
1475  const Message::Field& recent_)
1476  {
1477  find(subId_)->setMostRecent(recent_);
1478  }
1479 
// Held while the subscription map is searched or modified in find() and
// __purge(subId_), and while _serverVersion is assigned.
1480  Mutex _subsLock;
// Taken by the public entry points before they delegate to the
// underscore-prefixed helpers.
1481  Mutex _lock;
// Single-character tags. They are not referenced in the part of the file
// shown here; presumably they mark record types for stores that persist
// entries - TODO confirm.
1482  static const char ENTRY_BOOKMARK = 'b';
1483  static const char ENTRY_DISCARD = 'd';
1484  static const char ENTRY_PERSISTED = 'p';
1485 
1486  virtual Subscription* find(const Message::Field& subId_)
1487  {
1488  if(subId_.empty())
1489  {
1490  throw StoreException("A valid subscription ID must be provided to the Bookmark Store");
1491  }
1492  Lock<Mutex> guard(_subsLock);
1493  if(_subs.count(subId_) == 0)
1494  {
1495  // Subscription will be created
1496  Message::Field id;
1497  id.deepCopy(subId_);
1498  _subs[id] = new Subscription(this, id);
1499  return _subs[id];
1500  }
1501  return _subs[subId_];
1502  }
1503 
1504  virtual bool resize(const Message::Field& subId_, char** newBuffer_, size_t size_,
1505  bool callResizeHandler_ = true)
1506  {
1507  assert(newBuffer_ != 0);
1508  if (size_ == 0) // Delete the buffer
1509  {
1510  if (*newBuffer_)
1511  {
1512  free(*newBuffer_);
1513  *newBuffer_ = NULL;
1514  }
1515  return true;
1516  }
1517  if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1518  {
1519  return false;
1520  }
1521  char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1522  *newBuffer_ = (char*)malloc(size_);
1523  memset(*newBuffer_, 0, size_);
1524  if (oldBuffer)
1525  {
1526  find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1527  free(oldBuffer);
1528  }
1529  return true;
1530  }
1531 
1532 protected:
1533  void updateAdapter(Subscription* pSub_)
1534  {
1535  if (_recovering || !_recentChanged || !_recoveryPointAdapter.isValid())
1536  {
1537  return;
1538  }
1539  if (_recoveryPointFactory)
1540  {
1541  RecoveryPoint update(_recoveryPointFactory(pSub_->id(),
1542  pSub_->getMostRecentList(false)));
1543  _recoveryPointAdapter.update(update);
1544  }
1545  else
1546  {
1547  RecoveryPoint update(new FixedRecoveryPoint(pSub_->id(),
1548  pSub_->getMostRecentList(false)));
1549  _recoveryPointAdapter.update(update);
1550  }
1551  }
1552 
// Subscriptions owned by this store, keyed by subscription id.
// Message::Field::FieldHash is supplied as the map's ordering comparator
// (third template argument of std::map).
1553  typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap;
1554  SubscriptionMap _subs;
// Set by setServerVersion(); initialised to AMPS_DEFAULT_MIN_VERSION.
1555  size_t _serverVersion;
// updateAdapter() sends nothing while this is false.
1556  bool _recentChanged;
// True only while the recovery constructor replays recovery points;
// updateAdapter() sends nothing while it is set.
1557  bool _recovering;
// NOTE(review): std::set is used here but <set> is not among the includes
// visible at the top of the file; presumably it arrives through another
// header - confirm.
1558  typedef std::set<Subscription*> SubscriptionSet;
// Optional external storage for recovery points, and the optional factory
// used to build the RecoveryPoint objects sent to it.
1559  RecoveryPointAdapter _recoveryPointAdapter;
1560  RecoveryPointFactory _recoveryPointFactory;
1561 };
1562 
1563 } // end namespace AMPS
1564 
1565 #endif //_MEMORYBOOKMARKSTORE_H_
1566 
Defines the AMPS::Message class and related classes.
Abstract base class for storing received bookmarks for HA clients.
Definition: BookmarkStore.hpp:77
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1235
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1252
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: MemoryBookmarkStore.hpp:1216
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:1227
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1297
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
MemoryBookmarkStore(const RecoveryPointAdapter &adapter_, RecoveryPointFactory factory_=NULL)
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1116
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
MemoryBookmarkStore()
Creates a MemoryBookmarkStore.
Definition: MemoryBookmarkStore.hpp:1098
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1284
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1317
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1236
bool empty() const
Returns 'true' if empty, 'false' otherwise.
Definition: Field.hpp:126
Defines the AMPS::Field class, which represents the value of a field in a message.
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: BookmarkStore.hpp:51
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:1241
Provides AMPS::RecoveryPointAdapter, an interface for implementing external storage of bookmark subsc...
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1308
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:1203
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: BookmarkStore.hpp:55
Message & setSubId(const std::string &v)
Sets the value of the SubscriptionId header for this Message.
Definition: Message.hpp:1235
RecoveryPoint(* RecoveryPointFactory)(const Field &subId_, const Field &bookmark_)
RecoveryPointFactory is a function type for producing a RecoveryPoint that is sent to a RecoveryPoint...
Definition: RecoveryPoint.hpp:126
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
A BookmarkStoreImpl implementation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:56
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:1192
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
FixedRecoveryPoint is a RecoveryPoint implementation where subId and bookmark are set explicitly...
Definition: RecoveryPoint.hpp:133
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1273
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MemoryBookmarkStore.hpp:1263
Message & setBookmark(const std::string &v)
Sets the value of the Bookmark header for this Message.
Definition: Message.hpp:1034
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Definition: ampsplusplus.hpp:103
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1034