AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
RecoveryPointAdapter.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _RECOVERYPOINTADAPTER_H_
27 #define _RECOVERYPOINTADAPTER_H_
28 
29 #include <Field.hpp>
30 #include <RecoveryPoint.hpp>
31 #include <vector>
32 #include <iterator>
33 #include <memory>
34 #include <thread>
35 #include <unordered_map>
36 
41 
42 namespace AMPS
43 {
44 
45  class RecoveryPointAdapter;
46 
49  class RecoveryPointAdapterImpl : public RefBody
50  {
51  public:
52  virtual ~RecoveryPointAdapterImpl() { }
57  virtual bool next(RecoveryPoint& current_) = 0;
60  virtual void update(RecoveryPoint& recoveryPoint_) = 0;
62  virtual void purge() = 0;
65  virtual void purge(const Field& subId_) = 0;
67  virtual void close() = 0;
69  virtual void prune() { ; }
70  };
71 
74  class RecoveryPointAdapter // -V690
75  {
76  public:
77  class iterator
78  {
79  RecoveryPointAdapterImpl* _pAdapter;
80  RecoveryPoint _current;
81  inline void advance()
82  {
83  if (!_pAdapter || !_pAdapter->next(_current))
84  {
85  _pAdapter = NULL;
86  }
87  }
88 
89  public:
90  iterator() // end
91  : _pAdapter(NULL)
92  {;}
93  iterator(RecoveryPointAdapterImpl* pAdapter_)
94  : _pAdapter(pAdapter_)
95  {
96  advance();
97  }
98 
99  bool operator==(const iterator& rhs)
100  {
101  return _pAdapter == rhs._pAdapter;
102  }
103  bool operator!=(const iterator& rhs)
104  {
105  return _pAdapter != rhs._pAdapter;
106  }
107  void operator++(void)
108  {
109  advance();
110  }
111  RecoveryPoint operator*(void)
112  {
113  return _current;
114  }
115  RecoveryPoint* operator->(void)
116  {
117  return &_current;
118  }
119  };
120 
121  RecoveryPointAdapter() : _body() { }
122  RecoveryPointAdapter(RecoveryPointAdapterImpl* body_, bool isRef_ = true)
123  : _body(body_, isRef_) { }
125  : _body(rhs_._body)
126  { }
127 
134  iterator begin()
135  {
136  return iterator(&(_body.get()));
137  }
138 
141  iterator end()
142  {
143  return iterator();
144  }
145 
148  void update(RecoveryPoint& recoveryPoint_)
149  {
150  _body.get().update(recoveryPoint_);
151  }
152 
154  void purge()
155  {
156  _body.get().purge();
157  }
158 
161  void purge(const Field& subId_)
162  {
163  _body.get().purge(subId_);
164  }
165 
167  void close()
168  {
169  _body.get().close();
170  }
171 
173  void prune()
174  {
175  _body.get().prune();
176  }
177 
179  bool isValid() const
180  {
181  return _body.isValid();
182  }
183  private:
184  BorrowRefHandle<RecoveryPointAdapterImpl> _body;
185  };
186 
191  {
192  public:
208  const std::shared_ptr<RecoveryPointAdapterImpl>& delegate_,
209  unsigned updateThreshold_ = 10,
210  double timeoutMillis_ = 2000.0,
211  long updateIntervalMillis_ = 2000
212  )
213  : _closed(false)
214  , _delegate(delegate_)
215  , _updateThreshold(updateThreshold_)
216  , _timeoutMillis(timeoutMillis_)
217  , _updateIntervalMillis(updateIntervalMillis_)
218  , _updateAll(false)
219  {
220  // Start the update thread
221  _thread = std::thread(&ConflatingRecoveryPointAdapter::updateThread,
222  this);
223  }
224 
226  {
227  _close();
228  _thread.join();
229  for (UpdateIter purged = _latestUpdates.begin();
230  purged != _latestUpdates.end(); ++purged)
231  {
232  Field clearableSubId = purged->first;
233  purged->second.clear();
234  clearableSubId.clear();
235  }
236  }
237 
242  virtual bool next(RecoveryPoint& current_)
243  {
244  return _delegate->next(current_);
245  }
246 
249  virtual void update(RecoveryPoint& recoveryPoint_)
250  {
251  Field subId = recoveryPoint_.getSubId();
252  Lock<Mutex> lock(_lock);
253  UpdateIter lastUpdate = _latestUpdates.find(subId);
254  if (lastUpdate == _latestUpdates.end())
255  {
256  // New sub id, use deep copies and a new Timer.
257  subId = subId.deepCopy();
258  _latestUpdates[subId] = recoveryPoint_.deepCopy();
259  _counts[subId] = 1;
260  if (_timeoutMillis != 0.0) // -V550
261  {
262  Timer timer(_timeoutMillis);
263  timer.start();
264  _timers[subId] = timer;
265  }
266  }
267  else
268  {
269  // SubId already exists, set to new recovery point.
270  lastUpdate->second.deepCopy(recoveryPoint_);
271  // Increment and check the count.
272  if (++_counts[subId] >= _updateThreshold)
273  {
274  // Time to update, make sure update thread wakes up.
275  _lock.signalAll();
276  }
277  }
278  }
279 
281  virtual void purge()
282  {
283  _delegate->purge();
284  Lock<Mutex> lock(_lock);
285  _counts.clear();
286  _timers.clear();
287  for (UpdateIter purged = _latestUpdates.begin();
288  purged != _latestUpdates.end(); ++purged)
289  {
290  Field clearableSubId = purged->first;
291  purged->second.clear();
292  clearableSubId.clear();
293  }
294  _latestUpdates.clear();
295  }
296 
299  virtual void purge(const Field& subId_)
300  {
301  _delegate->purge(subId_);
302  Lock<Mutex> lock(_lock);
303  UpdateIter purged = _latestUpdates.find(subId_);
304  if (purged != _latestUpdates.end())
305  {
306  Field clearableSubId = purged->first;
307  purged->second.clear();
308  _latestUpdates.erase(purged);
309  _counts.erase(subId_);
310  _timers.erase(subId_);
311  clearableSubId.clear();
312  }
313  }
314 
316  virtual void close()
317  {
318  _close();
319  }
320 
322  virtual void updateAll()
323  {
324  Lock<Mutex> lock(_lock);
325  _runUpdateAll();
326  }
327 
329  virtual void _runUpdateAll()
330  {
331  _updateAll = true;
332  while (!_counts.empty())
333  {
334  _lock.signalAll();
335  _lock.wait(250);
336  }
337  }
338 
339  protected:
340  void _close()
341  {
342  // Save all cached updates before shutting down update thread.
343  if (!_closed)
344  {
345  Lock<Mutex> lock(_lock);
346  _runUpdateAll();
347  _closed = true;
348  _lock.signalAll();
349  }
350  _delegate->close();
351  }
352  void updateThread()
353  {
354  // A place to hold updates to save
355  std::vector<SavedUpdate> _queuedUpdates;
356  while (!_closed)
357  {
358  DeferLock<Mutex> lock(_lock);
359  lock.lock();
360  // Wait for a signal or update interval
361  _lock.wait(_updateIntervalMillis);
362 
363  // Check for timeouts
364  for (TimerMap::iterator timer = _timers.begin();
365  timer != _timers.end(); )
366  {
367  if (timer->second.check())
368  {
369  UpdateIter update = _latestUpdates.find(timer->first);
370  if (update != _latestUpdates.end())
371  {
372  // Remove subId from all, clear of subId will
373  // occur after save.
374  _queuedUpdates.push_back(*update);
375  _counts.erase(update->first);
376  timer = _timers.erase(timer);
377  _latestUpdates.erase(update);
378  }
379  else
380  {
381  ++timer;
382  }
383  }
384  else
385  {
386  ++timer;
387  }
388  }
389 
390  // Need a local version so it doesn't change after we unlock to
391  // deliver updates.
392  volatile bool updateAll = _updateAll;
393  // Check for update counts
394  for (CountMap::iterator count = _counts.begin();
395  count != _counts.end(); )
396  {
397  if (updateAll || _timeoutMillis == 0.0 // -V550
398  || count->second >= _updateThreshold)
399  {
400  UpdateIter update = _latestUpdates.find(count->first);
401  if (update != _latestUpdates.end())
402  {
403  // Remove subId from all, clear of subId will
404  // occur after save.
405  _queuedUpdates.push_back(*update);
406  count = _counts.erase(count);
407  _timers.erase(update->first);
408  _latestUpdates.erase(update);
409  }
410  else
411  {
412  ++count;
413  }
414  }
415  else
416  {
417  ++count;
418  }
419  }
420  // Release the lock unless we're doing an update all, then we
421  // hold it until updates are completed and signal when done.
422  if (!updateAll)
423  {
424  lock.unlock();
425  }
426  // Shouldn't need the lock to send the updates
427  for (std::vector<SavedUpdate>::iterator update = _queuedUpdates.begin(), end = _queuedUpdates.end(); update != end; ++update)
428  {
429  _delegate->update(update->second);
430  Field clearableSubId(update->first);
431  clearableSubId.clear();
432  update->second.clear();
433  }
434  _queuedUpdates.clear();
435  if (updateAll)
436  {
437  _updateAll = false;
438  _lock.signalAll();
439  }
440  } // -V1020
441  }
442 
443  // The update thread runs until this is true.
444  volatile bool _closed;
445 
446  // The adapter doing the real saves
447  std::shared_ptr<RecoveryPointAdapterImpl> _delegate;
448 
449  // Lock used to protect _latestUpdates, _timers, and _counts.
450  Mutex _lock;
451 
452  // Types for our maps
453  typedef std::unordered_map<Field, RecoveryPoint, Field::FieldHash> UpdateMap;
454  typedef std::pair<Field, RecoveryPoint> SavedUpdate;
455  typedef UpdateMap::value_type Update;
456  typedef UpdateMap::iterator UpdateIter;
457  typedef std::unordered_map<Field, Timer, Field::FieldHash> TimerMap;
458  typedef TimerMap::iterator TimerIter;
459  typedef std::unordered_map<Field, unsigned, Field::FieldHash> CountMap;
460  typedef CountMap::iterator CountIter;
461 
462  // Saves the most recent update for each sub id.
463  UpdateMap _latestUpdates;
464 
465  // Saves a timer for each sub id that is reset each time we save.
466  TimerMap _timers;
467 
468  // Saves a count of how many updates have come in since last save.
469  CountMap _counts;
470 
471  // The thread doing the saves.
472  std::thread _thread;
473 
474  // How many updates before we force a save.
475  unsigned _updateThreshold;
476 
477  // How long between getting first cached update and save.
478  double _timeoutMillis;
479 
480  // How long between automatic checks of the timers.
481  long _updateIntervalMillis;
482 
483  // Flag to tell update thread to save everything.
484  volatile bool _updateAll;
485  };
486 
487 }
488 
489 #endif //_RECOVERYPOINTADAPTER_H_
490 
virtual void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:316
void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:154
virtual void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:299
virtual void purge()=0
Remove all data from the storage.
void prune()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:173
RecoveryPointAdapter implementation that delegates storage to another RecoveryPointAdapter but provid...
Definition: RecoveryPointAdapter.hpp:190
void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:161
virtual void _runUpdateAll()
Lock is already held.
Definition: RecoveryPointAdapter.hpp:329
virtual void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:281
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:246
virtual void prune()
Take any necessary actions to reduce associated storage size.
Definition: RecoveryPointAdapter.hpp:69
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
virtual void update(RecoveryPoint &recoveryPoint_)=0
Update the storage information with the given recovery point.
virtual bool next(RecoveryPoint &current_)=0
Recovery is done by iteration over elements in storage.
bool isValid() const
Return if this has a valid implementation.
Definition: RecoveryPointAdapter.hpp:179
ConflatingRecoveryPointAdapter(const std::shared_ptr< RecoveryPointAdapterImpl > &delegate_, unsigned updateThreshold_=10, double timeoutMillis_=2000.0, long updateIntervalMillis_=2000)
Conflate updates to delegate_ where they will only be processed every updateIntervalMillis_ for subsc...
Definition: RecoveryPointAdapter.hpp:207
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
Defines the AMPS::Field class, which represents the value of a field in a message.
RecoveryPoint deepCopy()
Return a deep copy of self.
Definition: RecoveryPoint.hpp:97
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:148
virtual void close()=0
Take any necessary actions to close the associated storage.
virtual bool next(RecoveryPoint &current_)
Recovery is done by iteration over elements in storage.
Definition: RecoveryPointAdapter.hpp:242
iterator begin()
To recover from an adapter, iterate over the adapter from begin() to end() with a RecoveryPointIterat...
Definition: RecoveryPointAdapter.hpp:134
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
virtual void updateAll()
Push all updates to underlying adapter.
Definition: RecoveryPointAdapter.hpp:322
void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:167
RecoveryPointAdapterImpl virtual base class for implementing external storage of subscription recover...
Definition: RecoveryPointAdapter.hpp:49
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:217
Definition: ampsplusplus.hpp:103
iterator end()
Return the end of recovery marker.
Definition: RecoveryPointAdapter.hpp:141
const Field & getSubId() const
Get the sub id for this recovery point.
Definition: RecoveryPoint.hpp:84
virtual void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:249