AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.0.5
MemoryBookmarkStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _MEMORYBOOKMARKSTORE_H_
27 #define _MEMORYBOOKMARKSTORE_H_
28 
29 #include <ampsplusplus.hpp>
30 #include <Field.hpp>
31 #include <map>
32 #include <sstream>
33 #include <vector>
34 #include <stdlib.h>
35 #include <assert.h>
36 
37 #define AMPS_MIN_BOOKMARK_LEN 3
38 
43 namespace AMPS
44 {
51 {
52 protected:
53  class Subscription
54  {
55  public:
56  typedef std::map<Message::Field, size_t, Message::Field::FieldHash> RecoveryMap;
57  typedef std::map<amps_uint64_t, amps_uint64_t> PublisherMap;
58  typedef std::map<Message::Field, size_t, Message::Field::FieldHash>::iterator
59  RecoveryIterator;
60  typedef std::map<amps_uint64_t, amps_uint64_t>::iterator PublisherIterator;
61 
62  // Start sequence at 1 so that 0 can be used during file subclasses
63  // recovery as an indicator that a message wasn't logged because
64  // isDiscarded() was true.
65  Subscription(MemoryBookmarkStore* store_, const Message::Field& id_)
66  : _current(1), _currentBase(0), _least(1), _leastBase(0)
67  , _recoveryMin(AMPS_UNSET_INDEX), _recoveryBase(AMPS_UNSET_INDEX)
68  , _recoveryMax(AMPS_UNSET_INDEX), _recoveryMaxBase(AMPS_UNSET_INDEX)
69  , _entriesLength(AMPS_INITIAL_MEMORY_BOOKMARK_SIZE), _entries(NULL)
70  , _store(store_)
71  {
72  // Need our own memory for the sub id
73  _id.deepCopy(id_);
74  _store->resize(_id, (char**)&_entries,
75  sizeof(Entry)*AMPS_INITIAL_MEMORY_BOOKMARK_SIZE, false);
76  setLastPersistedToEpoch();
77  }
78 
79  ~Subscription()
80  {
81  Lock<Mutex> guard(_subLock);
82  if(_entries)
83  {
84  for(size_t i = 0; i < _entriesLength; ++i)
85  {
86  _entries[i]._val.clear();
87  }
88  // resize to 0 will free _entries
89  _store->resize(_id, (char**)&_entries, 0);
90  }
91  _id.clear();
92  _recent.clear();
93  _lastPersisted.clear();
94  _recentList.clear();
95  }
96 
97  size_t log(const Message::Field& bookmark_)
98  {
99  if (bookmark_ == AMPS_BOOKMARK_NOW)
100  return 0;
101  Lock<Mutex> guard(_subLock);
102  // Either relog the recovery or log it
103  size_t index = recover(bookmark_, true);
104  if (index == AMPS_UNSET_INDEX)
105  {
106  // Check for wrap
107  if(_current >= _entriesLength)
108  {
109  _current = 0;
110  _currentBase += _entriesLength;
111  }
112  // Check for resize
113  // If list is too small, double it
114  if((_current == _least && _leastBase < _currentBase) ||
115  (_current == _recoveryMin && _recoveryBase < _currentBase))
116  {
117  if (!_store->resize(_id, (char**)&_entries,
118  sizeof(Entry) * _entriesLength * 2))
119  {
120  //Try again
121  return log(bookmark_);
122  }
123  // Length was doubled
124  _entriesLength *= 2;
125  }
126 
127  // Add this entry to the end of our list
128  if (bookmark_ == AMPS_BOOKMARK_NOW)
129  {
130  // Save a now timestamp bookmark
131  char* nowTimestamp = new char[AMPS_TIMESTAMP_LEN];
132  struct tm timeInfo;
133  time_t now;
134  time(&now);
135 #ifdef _WIN32
136  gmtime_s(&timeInfo, &now);
137 #else
138  gmtime_r(&now, &timeInfo);
139 #endif
140  strftime(nowTimestamp, AMPS_TIMESTAMP_LEN,
141  "%Y%m%dT%H%M%S", &timeInfo);
142  nowTimestamp[AMPS_TIMESTAMP_LEN-1] = 'Z';
143  _entries[_current]._val.assign(nowTimestamp,
144  AMPS_TIMESTAMP_LEN);
145  }
146  else
147  {
148  _entries[_current]._val.deepCopy(bookmark_);
149  }
150  _entries[_current]._active = true;
151  // Mark entry as persisted if server doesn't support persisted acks
152  index = _current++;
153  }
154  return index + _currentBase;
155  }
156 
157  void discard(size_t index_)
158  {
159  Lock<Mutex> guard(_subLock);
160  _discard(index_);
161  return;
162  }
163 
164  void discard(const Message::Field& bookmark_)
165  {
166  if (bookmark_ == AMPS_BOOKMARK_NOW)
167  return;
168  Lock<Mutex> guard(_subLock);
169  size_t search = _least;
170  size_t searchBase = _leastBase;
171  size_t searchMax = _current;
172  size_t searchMaxBase = _currentBase;
173  if (_least + _leastBase == _current + _currentBase)
174  {
175  if (_recoveryMin != AMPS_UNSET_INDEX)
176  {
177  search = _recoveryMin;
178  searchBase = _recoveryBase;
179  searchMax = _recoveryMax;
180  searchMaxBase = _recoveryMaxBase;
181  }
182  else // Store is empty, so nothing to do
183  {
184  return;
185  }
186  }
187  assert(searchMax != AMPS_UNSET_INDEX);
188  assert(searchMaxBase != AMPS_UNSET_INDEX);
189  assert(search != AMPS_UNSET_INDEX);
190  assert(searchBase != AMPS_UNSET_INDEX);
191  // Search while we don't find the provided bookmark and we're in valid range
192  while (search + searchBase < searchMax + searchMaxBase)
193  {
194  if (_entries[search]._val == bookmark_)
195  {
196  _discard(search+searchBase);
197  return;
198  }
199  if(++search == _entriesLength)
200  {
201  // Least has now loooped around
202  searchBase += _entriesLength;
203  search = 0;
204  }
205  }
206  }
207 
208  // Get sequence number from a Field that is a bookmark
209  static void parseBookmark(const Message::Field& field_,
210  amps_uint64_t& publisherId_,
211  amps_uint64_t& sequenceNumber_)
212  {
213  Message::Field::parseBookmark(field_, publisherId_, sequenceNumber_);
214  }
215 
216  // Check to see if this message is older than the most recent one seen,
217  // and if it is, check if it discarded.
218  bool isDiscarded(const Message::Field& bookmark_)
219  {
220  Lock<Mutex> guard(_subLock);
221  // Check if we've already recovered this bookmark
222  size_t recoveredIdx = recover(bookmark_, false);
223  if (recoveredIdx != AMPS_UNSET_INDEX)
224  {
225  return !_entries[recoveredIdx]._active;
226  }
227 
228  amps_uint64_t publisher,sequence;
229  parseBookmark(bookmark_, publisher, sequence);
230  // Compare it to our publishers map
231  PublisherIterator pub = _publishers.find(publisher);
232  if(pub == _publishers.end() || pub->second < sequence)
233  {
234  _publishers[publisher] = sequence;
235  return false;
236  }
237  // During recovery, we don't really care if it's been discarded
238  // or not. We just want _publishers updated. No need for the
239  // costly linear search.
240  if (_store->_recovering) return false;
241  // During failure and recovery scenarios, we'll see out of order
242  // bookmarks arrive, either because (a) we're replaying or (b)
243  // a publisher has cut over, and we've cut over to a new server.
244  // Scan the list to see if we have a match.
245  size_t base = _leastBase;
246  for(size_t i = _least; i + base < _current + _currentBase; i++)
247  {
248  if( i >= _entriesLength )
249  {
250  i = 0;
251  base = _currentBase;
252  }
253  if(_entries[i]._val == bookmark_)
254  {
255  return !_entries[i]._active;
256  }
257  }
258 
259  return true; // message is totally discarded
260  }
261 
262  bool empty(void) const
263  {
264  if (_least == AMPS_UNSET_INDEX ||
265  ((_least+_leastBase)==(_current+_currentBase) &&
266  _recoveryMin == AMPS_UNSET_INDEX))
267  return true;
268  return false;
269  }
270 
271  void updateMostRecent()
272  {
273  Lock<Mutex> guard(_subLock);
274  _updateMostRecent();
275  }
276 
277  Message::Field getMostRecentList()
278  {
279  Lock<Mutex> guard(_subLock);
280  bool useLastPersisted = !_lastPersisted.empty() &&
281  _lastPersisted.len() > 1;
282  // when this is called, we'll take a moment to update the list
283  // of things recovered,
284  // so we don't accidentally log anything we ought not to.
285  _updateMostRecent();
286  bool useRecent = !_recent.empty() && _recent.len() > 1;
287  amps_uint64_t lastPublisher = 0;
288  amps_uint64_t lastSeq = 0;
289  amps_uint64_t recentPublisher = 0;
290  amps_uint64_t recentSeq = 0;
291  if (useLastPersisted)
292  {
293  parseBookmark(_lastPersisted,lastPublisher,lastSeq);
294  }
295  if (useRecent)
296  {
297  if (empty() && useLastPersisted)
298  {
299  useRecent = false;
300  }
301  else
302  {
303  parseBookmark(_recent,recentPublisher,recentSeq);
304  if (useLastPersisted && lastPublisher == recentPublisher)
305  {
306  if (lastSeq <= recentSeq) useRecent = false;
307  else useLastPersisted = false;
308  }
309  }
310  }
311  // Set size for all bookmarks that will be used
312  size_t totalLen = (useLastPersisted ? _lastPersisted.len() + 1 : 0);
313  if (useRecent) totalLen += _recent.len() + 1;
314  // If we don't have a non-EPOCH persisted ack and we don't have a
315  // non-EPOCH most recent bookmark, OR we're expecting persisted
316  // acks and don't have one yet, we can build a list based on
317  // all the publishers instead.
318  if (!useLastPersisted &&
319  (!useRecent || _lastPersisted == AMPS_BOOKMARK_EPOCH))
320  {
321  std::ostringstream os;
322  for (PublisherIterator pub = _publishers.begin();
323  pub != _publishers.end(); ++pub)
324  {
325  if (pub->first == 0 && pub->second == 0) continue;
326  if (pub->first == recentPublisher && recentSeq < pub->second)
327  os << recentPublisher << '|' << recentSeq << "|,";
328  else
329  os << pub->first << '|' << pub->second << "|,";
330  }
331  std::string recent = os.str();
332  totalLen = recent.length();
333  if (!recent.empty())
334  {
335  // Remove trailing ,
336  recent.erase(--totalLen);
337  // Reset _recentList to new value and return it
338  _recentList.clear();
339  _recentList = Message::Field(recent).deepCopy();
340  return _recentList;
341  }
342  }
343  // If we have nothing, return EPOCH
344  if (totalLen == 0)
345  {
346  _setLastPersistedToEpoch();
347  return _lastPersisted;
348  }
349  // Remove the trailing , from the length
350  totalLen -= 1;
351  char* field = new char[totalLen];
352  size_t len = 0;
353  if (useRecent)
354  {
355  len = _recent.len();
356  memcpy(field, _recent.data(), len);
357  if (len < totalLen) field[len++] = ',';
358  }
359  if (useLastPersisted)
360  {
361  memcpy(field+len, _lastPersisted.data(), _lastPersisted.len());
362  // If more is to be written after this, uncomment the following
363  //len += _lastPersisted.len();
364  }
365  // _recentList clear will delete[] current buffer and assign will get cleared
366  _recentList.clear();
367  _recentList.assign(field, totalLen);
368  return _recentList;
369  }
370 
371  Message::Field getMostRecent(bool update_ = true)
372  {
373  Lock<Mutex> guard(_subLock);
374  // Return the same as last time if nothing's changed
375  // _recent is the most recent bookmark.
376  if (update_ && _store->_recentChanged)
377  _updateMostRecent();
378  if(_recent.empty())
379  {
381  }
382  else
383  {
384  return _recent;
385  }
386  }
387 
388  Message::Field getLastPersisted()
389  {
390  Lock<Mutex> guard(_subLock);
391  return _lastPersisted;
392  }
393 
394  void setMostRecent(const Message::Field& recent_)
395  {
396  _recent.clear();
397  _recent.deepCopy(recent_);
398  }
399 
400  void moveEntries(char* old_, char* new_, size_t newSize_)
401  {
402  size_t least = _least;
403  size_t leastBase = _leastBase;
404  if (_recoveryMin != AMPS_UNSET_INDEX)
405  {
406  least = _recoveryMin;
407  leastBase = _recoveryBase;
408  }
409  // First check if we grew in place, if so, just move current after least
410  if (old_ == new_)
411  {
412  if (newSize_ - (sizeof(Entry)*_entriesLength) > sizeof(Entry)*least)
413  {
414  memcpy(new_ + (sizeof(Entry)*_entriesLength),
415  old_, (sizeof(Entry)*least));
416  // Clear the beginning where those entries were
417  memset(old_, 0, sizeof(Entry)*least);
418  }
419  else // We have to use an intermediate buffer
420  {
421  Entry* buffer = new Entry[least];
422  memcpy((void*)buffer, (void*)old_, sizeof(Entry)*least);
423  //Put the beginning entries at the start of the new buffer
424  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
425  (_entriesLength - least)*sizeof(Entry));
426  //Put the end entries after the beginning entries
427  memcpy((void*)((char*)new_+((_entriesLength - least)*sizeof(Entry))),
428  (void*)buffer, least*sizeof(Entry));
429  // Least is now at 0 so base must be increased
430  leastBase += least;
431  least = 0;
432  }
433  }
434  else
435  {
436  //Put the beginning entries at the start of the new buffer
437  memcpy((void*)new_, (void*)((char*)old_ + (sizeof(Entry)*least)),
438  (_entriesLength - least)*sizeof(Entry));
439  //Put the end entries after the beginning entries
440  memcpy((void*)((char*)new_ + ((_entriesLength - least)*sizeof(Entry))),
441  (void*)old_, least*sizeof(Entry));
442  // Least is now at 0 so base must be increased
443  leastBase += least;
444  least = 0;
445  }
446  if (_recoveryMin != AMPS_UNSET_INDEX)
447  {
448  _least = least + (_least + _leastBase) - (_recoveryMin + _recoveryBase);
449  _recoveryMax = least + (_recoveryMax + _recoveryMaxBase) -
450  (_recoveryMin + _recoveryBase);
451  _recoveryMaxBase = leastBase;
452  _recoveryMin = least;
453  _recoveryBase = leastBase;
454  }
455  else
456  {
457  _least = least;
458  }
459  _leastBase = leastBase;
460  // Current is now after everything and using the same base
461  _currentBase = _leastBase;
462  _current = least + _entriesLength;
463  }
464 
465  inline size_t getOldestBookmarkSeq()
466  {
467  Lock<Mutex> guard(_subLock);
468  // If there is nothing in the store, return -1, otherwise return lowest
469  return ((_least+_leastBase)==(_current+_currentBase)) ? AMPS_UNSET_INDEX :
470  _least + _leastBase;
471  }
472 
473  void lastPersisted(const Message::Field& bookmark_)
474  {
475  Lock<Mutex> guard(_subLock);
476  _setLastPersisted(bookmark_);
477  }
478 
479  void _setLastPersisted(const Message::Field& bookmark_)
480  {
481  if (!_lastPersisted.empty())
482  {
483  amps_uint64_t publisher, publisher_lastPersisted;
484  amps_uint64_t sequence, sequence_lastPersisted;
485  parseBookmark(bookmark_,publisher,sequence);
486  parseBookmark(_lastPersisted, publisher_lastPersisted,
487  sequence_lastPersisted);
488  if(publisher == publisher_lastPersisted &&
489  sequence <= sequence_lastPersisted)
490  {
491  return;
492  }
493  parseBookmark(_recent, publisher_lastPersisted,
494  sequence_lastPersisted);
495  // If persisted and recent are same publisher or store is
496  // empty, assume a change.
497  if(publisher == publisher_lastPersisted
498  || (_least + _leastBase == _current + _currentBase))
499  _store->_recentChanged = true;
500  }
501  // deepCopy will clear what's in _lastPersisted
502  _lastPersisted.deepCopy(bookmark_);
503  }
504 
505  Message::Field lastPersisted(size_t bookmark_)
506  {
507  Lock<Mutex> guard(_subLock);
508  Message::Field& bookmark = _entries[bookmark_]._val;
509  _setLastPersisted(bookmark);
510  return bookmark;
511  }
512 
513  // Returns the index of the recovered item, either the index where it was
514  // first stored prior to getMostRecent, or the new index if it is relogged
515  // either because this is called from log() or because it was not
516  // active but also not persisted.
517  size_t recover(const Message::Field& bookmark_, bool relogIfNotDiscarded)
518  {
519  size_t retVal = AMPS_UNSET_INDEX;
520  if (_recovered.empty() || _recoveryBase == AMPS_UNSET_INDEX)
521  return retVal;
522  // Check if this is a recovered bookmark.
523  // If so, copy the existing one to the new location
524  RecoveryIterator item = _recovered.find(bookmark_);
525  if(item != _recovered.end())
526  {
527  size_t seqNo = item->second;
528  size_t index = (seqNo - _recoveryBase) % _entriesLength;
529  // If we only have recovery entries and isDiscarded is
530  // checking on an already discarded entry, update recent.
531  if (_least+_leastBase == _current+_currentBase &&
532  !_entries[index]._active)
533  {
534  _recent.clear();
535  _recent = _entries[index]._val.deepCopy();
536  retVal = moveEntry(index);
537  if (retVal == AMPS_UNSET_INDEX) recover(bookmark_,
538  relogIfNotDiscarded);
539  _least = _current;
540  _leastBase = _currentBase;
541  }
542  else if (!_entries[index]._active || relogIfNotDiscarded)
543  {
544  retVal = moveEntry(index);
545  if (retVal == AMPS_UNSET_INDEX) recover(bookmark_,
546  relogIfNotDiscarded);
547  }
548  else
549  {
550  return index;
551  }
552  _recovered.erase(item);
553  if (_recovered.empty())
554  {
555  _recoveryMin = AMPS_UNSET_INDEX;
556  _recoveryBase = AMPS_UNSET_INDEX;
557  _recoveryMax = AMPS_UNSET_INDEX;
558  _recoveryMaxBase = AMPS_UNSET_INDEX;
559  }
560  else if (index == _recoveryMin)
561  {
562  while (_entries[_recoveryMin]._val.empty() &&
563  (_recoveryMin+_recoveryBase) < (_recoveryMax+_recoveryMaxBase))
564  {
565  if (++_recoveryMin == _entriesLength)
566  {
567  _recoveryMin = 0;
568  _recoveryBase += _entriesLength;
569  }
570  }
571  }
572  }
573  return retVal;
574  }
575 
576  // Return the id of this Subscription
577  Message::Field id() const
578  {
579  return _id;
580  }
581 
582  struct Entry
583  {
584  Message::Field _val; //16
585  bool _active; //17
586  char _padding[32-sizeof(Message::Field)-(sizeof(bool)*2)]; //32
587 
588  Entry() : _active(false) {}
589  };
590 
591  struct EntryHash
592  {
593  Field::FieldHash _hasher;
594 
595  size_t operator()(const Entry* entryPtr_) const
596  {
597  return _hasher(entryPtr_->_val);
598  }
599 
600  bool operator()(const Entry* lhsPtr_, const Entry* rhsPtr_) const
601  {
602  return _hasher(lhsPtr_->_val, rhsPtr_->_val);
603  }
604  };
605 
606  //typedef std::set<Entry*, EntryHash> EntryPtrList;
607  typedef std::vector<Entry*> EntryPtrList;
608 
609  void getRecoveryEntries(EntryPtrList& list_)
610  {
611  if (_recoveryMin == AMPS_UNSET_INDEX ||
612  _recoveryMax == AMPS_UNSET_INDEX)
613  return;
614  size_t base = _recoveryBase;
615  size_t max = _recoveryMax + _recoveryMaxBase;
616  for (size_t i=_recoveryMin; i+base<max; ++i)
617  {
618  if (i == _entriesLength)
619  {
620  i = 0;
621  base = _recoveryMaxBase;
622  }
623  //list_.insert(&(_entries[i]));
624  list_.push_back(&(_entries[i]));
625  }
626  return;
627  }
628 
629  void getActiveEntries(EntryPtrList& list_)
630  {
631  size_t base = _leastBase;
632  for (size_t i=_least; i+base < _current + _currentBase; ++i)
633  {
634  if (i >= _entriesLength)
635  {
636  i = 0;
637  base = _currentBase;
638  }
639  //list_.insert(&(_entries[i]));
640  list_.push_back(&(_entries[i]));
641  }
642  return;
643  }
644 
645  Entry* getEntryByIndex(size_t index_)
646  {
647  Lock<Mutex> guard(_subLock);
648  size_t base = (_recoveryBase == AMPS_UNSET_INDEX ||
649  index_ >= _least + _leastBase)
650  ? _leastBase : _recoveryBase;
651  // Return NULL if not a valid index
652  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ?
653  _least + _leastBase :
654  _recoveryMin + _recoveryBase);
655  if(index_ >= _current+_currentBase || index_ < min) return NULL;
656  return &(_entries[(index_ - base) % _entriesLength]);
657  }
658 
659  void justRecovered()
660  {
661  Lock<Mutex> guard(_subLock);
662  _updateMostRecent();
663  EntryPtrList list;
664  getRecoveryEntries(list);
665  setPublishersToDiscarded(&list, &_publishers);
666  }
667 
668  void setPublishersToDiscarded(EntryPtrList* recovered_,
669  PublisherMap* publishers_)
670  {
671  // Need to reset publishers to only have up to the last
672  // discarded sequence number. Messages that were in transit
673  // during previous run but not discarded should be considered
674  // new and not duplicate after a restart/recovery.
675  for (EntryPtrList::iterator i = recovered_->begin();
676  i != recovered_->end(); i++)
677  {
678  amps_uint64_t publisher = (amps_uint64_t)0;
679  amps_uint64_t sequence = (amps_uint64_t)0;
680  parseBookmark((*i)->_val,publisher,sequence);
681  if (publisher && sequence && (*i)->_active &&
682  (*publishers_)[publisher] >= sequence)
683  {
684  (*publishers_)[publisher] = sequence - 1;
685  }
686  }
687  }
688 
689  void clearLastPersisted()
690  {
691  Lock<Mutex> guard(_subLock);
692  _lastPersisted.clear();
693  }
694 
695  void setLastPersistedToEpoch()
696  {
697  Lock<Mutex> guard(_subLock);
698  _setLastPersistedToEpoch();
699  }
700 
701  private:
702  Subscription(const Subscription&);
703  Subscription& operator=(const Subscription&);
704 
705  size_t moveEntry(size_t index_)
706  {
707  // Check for wrap
708  if (_current >= _entriesLength)
709  {
710  _current = 0;
711  _currentBase += _entriesLength;
712  }
713  // Check for resize
714  // If list is too small, double it
715  if((_current == _least%_entriesLength &&
716  _leastBase < _currentBase) ||
717  (_current == _recoveryMin && _recoveryBase < _currentBase))
718  {
719  if (!_store->resize(_id, (char**)&_entries,
720  sizeof(Entry) * _entriesLength * 2))
721  {
722  return AMPS_UNSET_INDEX;
723  }
724  // Length was doubled
725  _entriesLength *= 2;
726  }
727  _entries[_current]._val = _entries[index_]._val;
728  _entries[_current]._active = _entries[index_]._active;
729  // No need to clear Field, just set it to empty
730  _entries[index_]._val.assign(NULL, 0);
731  _entries[index_]._active = false;
732  return _current++;
733  }
734 
735  void _setLastPersistedToEpoch()
736  {
737  size_t fieldLen = strlen(AMPS_BOOKMARK_EPOCH);
738  char* field = new char[fieldLen];
739  memcpy(field, AMPS_BOOKMARK_EPOCH, fieldLen);
740  _lastPersisted.clear();
741  _lastPersisted.assign(field, fieldLen);
742  }
743 
744  void _discard(size_t index_)
745  {
746  // Lock should already be held
747  assert((_recoveryBase==AMPS_UNSET_INDEX && _recoveryMin==AMPS_UNSET_INDEX) ||
748  (_recoveryBase!=AMPS_UNSET_INDEX && _recoveryMin!=AMPS_UNSET_INDEX));
749  size_t base = (_recoveryBase == AMPS_UNSET_INDEX
750  || index_ >= _least + _leastBase)
751  ? _leastBase : _recoveryBase;
752  // discard of a record not in the log is a no-op
753  size_t min = (_recoveryMin == AMPS_UNSET_INDEX ? _least + _leastBase :
754  _recoveryMin + _recoveryBase);
755  if(index_ >= _current+_currentBase || index_ < min)
756  {
757  return;
758  }
759 
760  // log that this one is discarded, then
761  // recalculate what the most recent entry is.
762  Entry& e = _entries[(index_ - base) % _entriesLength];
763  e._active = false;
764 
765  size_t index = index_;
766  if (_recoveryMin != AMPS_UNSET_INDEX &&
767  index_ == _recoveryMin + _recoveryBase)
768  {
769  // Find all to discard
770  size_t j = _recoveryMin;
771  while(j + _recoveryBase < _recoveryMax + _recoveryMaxBase &&
772  !_entries[j]._active)
773  {
774  // This index might be left-over from a slow discard and we
775  // may have reconnected. We have a few possibilites at this point.
776  // 1. If we re-logged this bookmark, this index will point at an
777  // empty bookmark. This could happen if the discard thread was slow
778  // and the reconnect was fast. We wouldn't report the
779  // the re-arrival of the bookmark as a duplicate because it
780  // hadn't been marked as discarded. In this case, we have to
781  // simply move past this in the recovery area.
782  // 2. This bookmark should become _recent because we haven't
783  // yet received anything since our last call to getMostRecent.
784  // In this case, we need to take it out of recovered but not
785  // clear it. The publishers map should report it as duplicate.
786  // 3. This is the 'oldest' recovered, but we have received new
787  // bookmarks since we got this one. We can clear it because the
788  // publishers map should report it as a duplicate if/when it
789  // does arrive again. Move the _recoveryMin ahead and remove it
790  // from recovered.
791  Message::Field& bookmark = _entries[j]._val;
792  // Option 1 skips this and just moves on
793  if (!bookmark.empty())
794  {
795  _recovered.erase(bookmark);
796  if (_least + _leastBase == _current + _currentBase ||
797  ((_least + _leastBase) % _entriesLength) ==
798  ((_recoveryMin + _recoveryBase + 1)) % _entriesLength)
799  {
800  // Option 2, reset recent
801  _store->_recentChanged = true;
802  _recent.clear();
803  _recent = bookmark;
804  bookmark.assign(NULL, 0);
805  }
806  else
807  {
808  // Option 3, simply clear this one
809  bookmark.clear();
810  }
811  }
812  // If we reach the buffer end,
813  // keep checking from the beginnning
814  if(++j == _entriesLength)
815  {
816  // Least has now loooped around
817  _recoveryBase += _entriesLength;
818  j = 0;
819  }
820  }
821  assert(j + _recoveryBase != _recoveryMax + _recoveryMaxBase ||
822  _recovered.empty());
823  if (_recovered.empty())
824  {
825  _recoveryMin = AMPS_UNSET_INDEX;
826  _recoveryBase = AMPS_UNSET_INDEX;
827  _recoveryMax = AMPS_UNSET_INDEX;
828  _recoveryMaxBase = AMPS_UNSET_INDEX;
829  // Cleared recovered, want to check onward
830  index = _least + _leastBase;
831  }
832  else
833  {
834  _recoveryMin = j;
835  }
836  }
837  // if this is the first item in the list, discard all inactive ones
838  // as long as recovery also says its okay
839  if(index == _least + _leastBase)
840  {
841  // Find all to discard
842  size_t j = _least;
843  while(j + _leastBase < _current + _currentBase &&
844  !_entries[j]._active)
845  {
846  //Must free associated memory
847  _recent.clear();
848  _recent = _entries[j]._val;
849  _entries[j]._val.assign(NULL, 0);
850  _store->_recentChanged = true;
851  // If we reach the buffer end,
852  // keep checking from the beginnning
853  if(++j == _entriesLength)
854  {
855  // Least has now loooped around
856  _leastBase += _entriesLength;
857  j = 0;
858  }
859  }
860  _least = j;
861  }
862  return;
863  }
864 
865  void _updateMostRecent()
866  {
867  // Lock is already held
868  _recovered.clear();
869  assert((_recoveryBase==AMPS_UNSET_INDEX && _recoveryMin==AMPS_UNSET_INDEX) ||
870  (_recoveryBase!=AMPS_UNSET_INDEX && _recoveryMin!=AMPS_UNSET_INDEX));
871  size_t base = (_recoveryMin == AMPS_UNSET_INDEX) ? _leastBase : _recoveryBase;
872  size_t start = (_recoveryMin == AMPS_UNSET_INDEX) ? _least : _recoveryMin;
873  _recoveryMin = AMPS_UNSET_INDEX;
874  _recoveryBase = AMPS_UNSET_INDEX;
875  _recoveryMax = AMPS_UNSET_INDEX;
876  _recoveryMaxBase = AMPS_UNSET_INDEX;
877  for(size_t i = start; i + base < _current + _currentBase; i++)
878  {
879  if( i >= _entriesLength )
880  {
881  i = 0;
882  base = _currentBase;
883  }
884  if (i >= _recoveryMax+_recoveryBase && i < _least+_leastBase)
885  continue;
886  Entry& entry = _entries[i];
887  if (!entry._val.empty())
888  {
889  _recovered[entry._val] = i+base;
890  if (_recoveryMin == AMPS_UNSET_INDEX)
891  {
892  _recoveryMin = i;
893  _recoveryBase = base;
894  _recoveryMax = _current;
895  _recoveryMaxBase = _currentBase;
896  }
897  }
898  }
899  if (_current == _entriesLength)
900  {
901  _current = 0;
902  _currentBase += _entriesLength;
903  }
904  _least = _current;
905  _leastBase = _currentBase;
906  }
907 
908  Message::Field _id;
909  Message::Field _recent;
910  Message::Field _lastPersisted;
911  Message::Field _recentList;
912  size_t _current;
913  size_t _currentBase;
914  size_t _least;
915  size_t _leastBase;
916  size_t _recoveryMin;
917  size_t _recoveryBase;
918  size_t _recoveryMax;
919  size_t _recoveryMaxBase;
920  size_t _entriesLength;
921  Entry* _entries;
922  MemoryBookmarkStore* _store;
923  Mutex _subLock;
924  RecoveryMap _recovered;
925  public:
926  PublisherMap _publishers;
927  };
928 
929 public:
931  _subsLock(),
932  _lock(),
933  _serverVersion(AMPS_DEFAULT_MIN_VERSION),
934  _recentChanged(true),
935  _recovering(false),
936  _supportsPersistedAcks(true)
937  { ; }
938 
939  virtual ~MemoryBookmarkStore()
940  {
941  _purge();
942  }
943 
949  virtual size_t log(Message& message_)
950  {
951  Lock<Mutex> guard(_lock);
952  return _log(message_);
953  }
954 
960  virtual void discard(const Message& message_)
961  {
962  Lock<Mutex> guard(_lock);
963  _discard(message_);
964  }
965 
973  virtual void discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
974  {
975  Lock<Mutex> guard(_lock);
976  _discard(subId_, bookmarkSeqNo_);
977  }
978 
985  {
986  Lock<Mutex> guard(_lock);
987  return _getMostRecent(subId_);
988  }
989 
998  virtual bool isDiscarded(Message& message_)
999  {
1000  Lock<Mutex> guard(_lock);
1001  return _isDiscarded(message_);
1002  }
1003 
1009  virtual void purge()
1010  {
1011  Lock<Mutex> guard(_lock);
1012  _purge();
1013  }
1014 
1020  virtual void purge(const Message::Field& subId_)
1021  {
1022  Lock<Mutex> guard(_lock);
1023  _purge(subId_);
1024  }
1025 
1030  virtual size_t getOldestBookmarkSeq(const Message::Field& subId_)
1031  {
1032  Lock<Mutex> guard(_lock);
1033  return _getOldestBookmarkSeq(subId_);
1034  }
1035 
1041  virtual void persisted(const Message::Field& subId_,
1042  const Message::Field& bookmark_)
1043  {
1044  Lock<Mutex> guard(_lock);
1045  _persisted(find(subId_), bookmark_);
1046  }
1047 
1054  virtual Message::Field persisted(const Message::Field& subId_,
1055  size_t bookmark_)
1056  {
1057  Lock<Mutex> guard(_lock);
1058  return _persisted(find(subId_), bookmark_);
1059  }
1060 
1065  virtual void noPersistedAcks(const Message::Field& subId_)
1066  {
1067  Lock<Mutex> guard(_lock);
1068  Subscription* sub = find(subId_);
1069  _noPersistedAcks.insert(sub);
1070  }
1071 
1076  void setServerVersion(const VersionInfo& version_)
1077  {
1078  setServerVersion(version_.getOldStyleVersion());
1079  }
1080 
1085  void setServerVersion(size_t version_)
1086  {
1087  Lock<Mutex> guard(_subsLock);
1088  _serverVersion = version_;
1089  }
1090 
1091  inline bool isWritableBookmark(size_t length)
1092  {
1093  return length >= AMPS_MIN_BOOKMARK_LEN;
1094  }
1095 
1096  typedef Subscription::EntryPtrList EntryPtrList;
1097 
1098 protected:
1099 
1100  // Called once lock is acquired
1101  size_t _log(Message& message_)
1102  {
1103  Message::Field bookmark = message_.getBookmark();
1104  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
1105  if (!sub)
1106  {
1107  Message::Field subId = message_.getSubscriptionId();
1108  if (subId.empty())
1109  subId = message_.getSubscriptionIds();
1110  sub = find(subId);
1111  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
1112  }
1113  size_t retVal = sub->log(bookmark);
1114  message_.setBookmarkSeqNo(retVal);
1115  return retVal;
1116  }
1117 
1118  // Called once lock is acquired
1119  void _discard(const Message& message_)
1120  {
1121  size_t bookmarkSeqNo = message_.getBookmarkSeqNo();
1122  Subscription* sub = (Subscription*)(message_.getSubscriptionHandle());
1123  if (!sub)
1124  {
1125  Message::Field subId = message_.getSubscriptionId();
1126  if (subId.empty())
1127  subId = message_.getSubscriptionIds();
1128  sub = find(subId);
1129  }
1130  sub->discard(bookmarkSeqNo);
1131  return;
1132  }
1133 
1134  // Called once lock is acquired
1135  void _discard(const Message::Field& subId_, size_t bookmarkSeqNo_)
1136  {
1137  Subscription* sub = find(subId_);
1138  sub->discard(bookmarkSeqNo_);
1139  return;
1140  }
1141 
1142  // Called once lock is acquired
1143  Message::Field _getMostRecent(const Message::Field& subId_)
1144  {
1145  Subscription* sub = find(subId_);
1146  if (!_supportsPersistedAcks || _noPersistedAcks.count(sub) > 0)
1147  {
1148  return sub->getMostRecent();
1149  }
1150  return sub->getMostRecentList();
1151  }
1152 
1153  // Called once lock is acquired
1154  bool _isDiscarded(Message& message_)
1155  {
1156  Message::Field bookmark = message_.getBookmark();
1157  Message::Field subId = message_.getSubscriptionId();
1158  if (subId.empty())
1159  subId = message_.getSubscriptionIds();
1160  Subscription* sub = find(subId);
1161  message_.setSubscriptionHandle(static_cast<amps_subscription_handle>(sub));
1162  return sub->isDiscarded(bookmark);
1163  }
1164 
1165  // Called once lock is acquired
1166  size_t _getOldestBookmarkSeq(const Message::Field& subId_)
1167  {
1168  Subscription* sub = find(subId_);
1169  return sub->getOldestBookmarkSeq();
1170  }
1171 
1172  // Called once lock is acquired
1173  virtual void _persisted(Subscription* subP_,
1174  const Message::Field& bookmark_)
1175  {
1176  subP_->lastPersisted(bookmark_);
1177  }
1178 
1179  // Called once lock is acquired
1180  virtual Message::Field _persisted(Subscription* subP_, size_t bookmark_)
1181  {
1182  return subP_->lastPersisted(bookmark_);
1183  }
1184 
1185  // Called once lock is acquired
1186  void _purge()
1187  {
1188  // Walk through list and clear Fields before calling clear
1189  while(!_subs.empty())
1190  {
1191  SubscriptionMap::iterator iter = _subs.begin();
1192  //The subId key is cleared when deleting the Subscription, which shares
1193  //the _data pointer in its id field.
1194  const_cast<Message::Field&>(iter->first).clear();
1195  delete (iter->second);
1196  _subs.erase(iter);
1197  }
1198  _subs.clear();
1199  _noPersistedAcks.clear();
1200  }
1201 
1202  // Called once lock is acquired
1203  virtual void _purge(const Message::Field& subId_)
1204  {
1205  Lock<Mutex> guard(_subsLock);
1206  SubscriptionMap::iterator iter = _subs.find(subId_);
1207  if (iter == _subs.end()) return;
1208  const_cast<Message::Field&>(iter->first).clear();
1209  delete (iter->second);
1210  _subs.erase(iter);
1211  }
1212 
1213  // Can be used by subclasses during recovery
1214  void setMostRecent(const Message::Field& subId_,
1215  const Message::Field& recent_)
1216  {
1217  find(subId_)->setMostRecent(recent_);
1218  }
1219 
1220  Mutex _subsLock;
1221  Mutex _lock;
1222  static const char ENTRY_BOOKMARK = 'b';
1223  static const char ENTRY_DISCARD = 'd';
1224  static const char ENTRY_PERSISTED = 'p';
1225 
1226  virtual Subscription* find(const Message::Field& subId_)
1227  {
1228  if(subId_.empty())
1229  {
1230  throw StoreException("A valid subscription ID must be provided to the Bookmark Store");
1231  }
1232  Lock<Mutex> guard(_subsLock);
1233  if(_subs.count(subId_) == 0)
1234  {
1235  // Subscription will be created
1236  Message::Field id;
1237  id.deepCopy(subId_);
1238  _subs[id] = new Subscription(this, id);
1239  if (!_supportsPersistedAcks)
1240  {
1241  _noPersistedAcks.insert(_subs[id]);
1242  }
1243  return _subs[id];
1244  }
1245  return _subs[subId_];
1246  }
1247 
1248  virtual bool resize(const Message::Field& subId_, char** newBuffer_, size_t size_,
1249  bool callResizeHandler_ = true)
1250  {
1251  assert(newBuffer_ != 0);
1252  if (size_ == 0) // Delete the buffer
1253  {
1254  if (*newBuffer_)
1255  {
1256  free(*newBuffer_);
1257  *newBuffer_ = NULL;
1258  }
1259  return true;
1260  }
1261  if (callResizeHandler_ && !callResizeHandler(subId_, size_))
1262  {
1263  return false;
1264  }
1265  char* oldBuffer = *newBuffer_ ? *newBuffer_ : NULL;
1266  *newBuffer_ = (char*)malloc(size_);
1267  memset(*newBuffer_, 0, size_);
1268  if (oldBuffer)
1269  {
1270  find(subId_)->moveEntries(oldBuffer, *newBuffer_, size_);
1271  free(oldBuffer);
1272  }
1273  return true;
1274  }
1275 
1276 protected:
1277  typedef std::map<Message::Field, Subscription*, Message::Field::FieldHash> SubscriptionMap;
1278  SubscriptionMap _subs;
1279  size_t _serverVersion;
1280  bool _recentChanged;
1281  bool _recovering;
1282  bool _supportsPersistedAcks;
1283  typedef std::set<Subscription*> SubscriptionSet;
1284  SubscriptionSet _noPersistedAcks;
1285 
1286  MemoryBookmarkStore(bool supportsPersistedAcks_) :
1288  _subsLock(),
1289  _lock(),
1290  _serverVersion(AMPS_DEFAULT_MIN_VERSION),
1291  _recentChanged(true),
1292  _recovering(false),
1293  _supportsPersistedAcks(supportsPersistedAcks_)
1294  { ; }
1295 
1296 };
1297 
1298 } // end namespace AMPS
1299 
1300 #endif //_MEMORYBOOKMARKSTORE_H_
1301 
Abstract base class for storing received bookmarks for HA clients.
Definition: ampsplusplus.hpp:679
Field getSubscriptionId() const
Retrieves the value of the SubscriptionId header of the Message as a new Field.
Definition: Message.hpp:1051
virtual void purge()
Called to purge the contents of this store.
Definition: MemoryBookmarkStore.hpp:1009
virtual void discard(const Message::Field &subId_, size_t bookmarkSeqNo_)
Definition: MemoryBookmarkStore.hpp:973
virtual Message::Field getMostRecent(const Message::Field &subId_)
Returns the most recent bookmark from the log that ought to be used for (re-)subscriptions.
Definition: MemoryBookmarkStore.hpp:984
virtual Message::Field persisted(const Message::Field &subId_, size_t bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1054
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:196
virtual void persisted(const Message::Field &subId_, const Message::Field &bookmark_)
Mark the bookmark provided as replicated to all sync replication destinations for the given subscript...
Definition: MemoryBookmarkStore.hpp:1041
void setServerVersion(size_t version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1085
Field getSubscriptionIds() const
Retrieves the value of the SubscriptionIds header of the Message as a new Field.
Definition: Message.hpp:1074
bool empty() const
Returns &#39;true&#39; if empty, &#39;false&#39; otherwise.
Definition: Field.hpp:93
Defines the AMPS::Field class, which represents the value of a field in a message.
Core type, function, and class declarations for the AMPS C++ client.
#define AMPS_BOOKMARK_EPOCH
Start the subscription at the beginning of the journal.
Definition: ampsplusplus.hpp:105
virtual bool isDiscarded(Message &message_)
Called for each arriving message to determine if the application has already seen this bookmark and s...
Definition: MemoryBookmarkStore.hpp:998
void setServerVersion(const VersionInfo &version_)
Internally used to set the server version so the store knows how to deal with persisted acks and call...
Definition: MemoryBookmarkStore.hpp:1076
virtual void discard(const Message &message_)
Log a discard-bookmark entry to the persistent log based on a bookmark sequence number.
Definition: MemoryBookmarkStore.hpp:960
#define AMPS_BOOKMARK_NOW
Start the subscription at the point in time when AMPS processes the subscription. ...
Definition: ampsplusplus.hpp:109
virtual void noPersistedAcks(const Message::Field &subId_)
Flag the subscription as not requesting persisted acks No persisted acks will be sent for any bookmar...
Definition: MemoryBookmarkStore.hpp:1065
A BookmarkStoreImpl implmentation that stores bookmarks in memory.
Definition: MemoryBookmarkStore.hpp:50
virtual size_t log(Message &message_)
Log a bookmark to the persistent log and return the corresponding sequence number for this bookmark...
Definition: MemoryBookmarkStore.hpp:949
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
virtual size_t getOldestBookmarkSeq(const Message::Field &subId_)
Called to find the oldest bookmark in the store.
Definition: MemoryBookmarkStore.hpp:1030
virtual void purge(const Message::Field &subId_)
Called to purge the contents of this store for particular subId.
Definition: MemoryBookmarkStore.hpp:1020
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:167
Definition: ampsplusplus.hpp:136
Field getBookmark() const
Retrieves the value of the Bookmark header of the Message as a new Field.
Definition: Message.hpp:1064