AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.1
RecoveryPointAdapter.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _RECOVERYPOINTADAPTER_H_
27 #define _RECOVERYPOINTADAPTER_H_
28 
29 #include <Field.hpp>
30 #include <RecoveryPoint.hpp>
31 #include <vector>
32 #include <iterator>
33 #include <memory>
34 #include <thread>
35 #include <unordered_map>
36 
41 
42 namespace AMPS
43 {
44 
45 class RecoveryPointAdapter;
46 
49 class RecoveryPointAdapterImpl : public RefBody
50 {
51 public:
52  virtual ~RecoveryPointAdapterImpl() { }
57  virtual bool next(RecoveryPoint& current_) = 0;
60  virtual void update(RecoveryPoint& recoveryPoint_) = 0;
62  virtual void purge() = 0;
65  virtual void purge(const Field& subId_) = 0;
67  virtual void close() = 0;
69  virtual void prune() { ; }
70 };
71 
74 class RecoveryPointAdapter // -V690
75 {
76 public:
77  class iterator
78  {
79  RecoveryPointAdapterImpl* _pAdapter;
80  RecoveryPoint _current;
81  inline void advance()
82  {
83  if (!_pAdapter || !_pAdapter->next(_current))
84  {
85  _pAdapter = NULL;
86  }
87  }
88 
89  public:
90  iterator() // end
91  :_pAdapter(NULL)
92  {;}
93  iterator(RecoveryPointAdapterImpl* pAdapter_)
94  :_pAdapter(pAdapter_)
95  {
96  advance();
97  }
98 
99  bool operator==(const iterator& rhs)
100  {
101  return _pAdapter == rhs._pAdapter;
102  }
103  bool operator!=(const iterator& rhs)
104  {
105  return _pAdapter != rhs._pAdapter;
106  }
107  void operator++(void) { advance(); }
108  RecoveryPoint operator*(void) { return _current; }
109  RecoveryPoint* operator->(void) { return &_current; }
110  };
111 
112  RecoveryPointAdapter() : _body() { }
113  RecoveryPointAdapter(RecoveryPointAdapterImpl* body_, bool isRef_ = true)
114  : _body(body_, isRef_) { }
116  : _body(rhs_._body)
117  { }
118 
125  iterator begin() { return iterator(&(_body.get())); }
126 
129  iterator end() { return iterator(); }
130 
133  void update(RecoveryPoint& recoveryPoint_)
134  {
135  _body.get().update(recoveryPoint_);
136  }
137 
139  void purge() { _body.get().purge(); }
140 
143  void purge(const Field& subId_) { _body.get().purge(subId_); }
144 
146  void close() { _body.get().close(); }
147 
149  void prune() { _body.get().prune(); }
150 
152  bool isValid() const
153  {
154  return _body.isValid();
155  }
156 private:
157  BorrowRefHandle<RecoveryPointAdapterImpl> _body;
158 };
159 
164 {
165 public:
181  const std::shared_ptr<RecoveryPointAdapterImpl>& delegate_,
182  unsigned updateThreshold_ = 10,
183  double timeoutMillis_ = 2000.0,
184  long updateIntervalMillis_ = 2000
185  )
186  : _closed(false)
187  , _delegate(delegate_)
188  , _updateThreshold(updateThreshold_)
189  , _timeoutMillis(timeoutMillis_)
190  , _updateIntervalMillis(updateIntervalMillis_)
191  , _updateAll(false)
192  {
193  // Start the update thread
194  _thread = std::thread(&ConflatingRecoveryPointAdapter::updateThread,
195  this);
196  }
197 
199  {
200  _close();
201  _thread.join();
202  for (UpdateIter purged = _latestUpdates.begin();
203  purged != _latestUpdates.end(); ++purged)
204  {
205  Field clearableSubId = purged->first;
206  purged->second.clear();
207  clearableSubId.clear();
208  }
209  }
210 
215  virtual bool next(RecoveryPoint& current_)
216  {
217  return _delegate->next(current_);
218  }
219 
222  virtual void update(RecoveryPoint& recoveryPoint_)
223  {
224  Field subId = recoveryPoint_.getSubId();
225  Lock<Mutex> lock(_lock);
226  UpdateIter lastUpdate = _latestUpdates.find(subId);
227  if (lastUpdate == _latestUpdates.end())
228  {
229  // New sub id, use deep copies and a new Timer.
230  subId = subId.deepCopy();
231  _latestUpdates[subId] = recoveryPoint_.deepCopy();
232  _counts[subId] = 1;
233  if (_timeoutMillis != 0.0) // -V550
234  {
235  Timer timer(_timeoutMillis);
236  timer.start();
237  _timers[subId] = timer;
238  }
239  }
240  else
241  {
242  // SubId already exists, set to new recovery point.
243  lastUpdate->second.deepCopy(recoveryPoint_);
244  // Increment and check the count.
245  if (++_counts[subId] >= _updateThreshold)
246  {
247  // Time to update, make sure update thread wakes up.
248  _lock.signalAll();
249  }
250  }
251  }
252 
254  virtual void purge()
255  {
256  _delegate->purge();
257  Lock<Mutex> lock(_lock);
258  _counts.clear();
259  _timers.clear();
260  for (UpdateIter purged = _latestUpdates.begin();
261  purged != _latestUpdates.end(); ++purged)
262  {
263  Field clearableSubId = purged->first;
264  purged->second.clear();
265  clearableSubId.clear();
266  }
267  _latestUpdates.clear();
268  }
269 
272  virtual void purge(const Field& subId_)
273  {
274  _delegate->purge(subId_);
275  Lock<Mutex> lock(_lock);
276  UpdateIter purged = _latestUpdates.find(subId_);
277  if (purged != _latestUpdates.end())
278  {
279  Field clearableSubId = purged->first;
280  purged->second.clear();
281  _latestUpdates.erase(purged);
282  _counts.erase(subId_);
283  _timers.erase(subId_);
284  clearableSubId.clear();
285  }
286  }
287 
289  virtual void close()
290  {
291  _close();
292  }
293 
295  virtual void updateAll()
296  {
297  Lock<Mutex> lock(_lock);
298  _runUpdateAll();
299  }
300 
302  virtual void _runUpdateAll()
303  {
304  _updateAll = true;
305  while (!_counts.empty())
306  {
307  _lock.signalAll();
308  _lock.wait(250);
309  }
310  }
311 
312 protected:
313  void _close()
314  {
315  // Save all cached updates before shutting down update thread.
316  if (!_closed)
317  {
318  Lock<Mutex> lock(_lock);
319  _runUpdateAll();
320  _closed = true;
321  _lock.signalAll();
322  }
323  _delegate->close();
324  }
325  void updateThread()
326  {
327  // A place to hold updates to save
328  std::vector<SavedUpdate> _queuedUpdates;
329  while (!_closed)
330  {
331  DeferLock<Mutex> lock(_lock);
332  lock.lock();
333  // Wait for a signal or update interval
334  _lock.wait(_updateIntervalMillis);
335 
336  // Check for timeouts
337  for (TimerMap::iterator timer = _timers.begin();
338  timer != _timers.end(); )
339  {
340  if (timer->second.check())
341  {
342  UpdateIter update = _latestUpdates.find(timer->first);
343  if (update != _latestUpdates.end())
344  {
345  // Remove subId from all, clear of subId will
346  // occur after save.
347  _queuedUpdates.push_back(*update);
348  _counts.erase(update->first);
349  timer = _timers.erase(timer);
350  _latestUpdates.erase(update);
351  }
352  else
353  {
354  ++timer;
355  }
356  }
357  else
358  {
359  ++timer;
360  }
361  }
362 
363  // Need a local version so it doesn't change after we unlock to
364  // deliver updates.
365  volatile bool updateAll = _updateAll;
366  // Check for update counts
367  for (CountMap::iterator count = _counts.begin();
368  count != _counts.end(); )
369  {
370  if (updateAll || _timeoutMillis == 0.0 // -V550
371  || count->second >= _updateThreshold)
372  {
373  UpdateIter update = _latestUpdates.find(count->first);
374  if (update != _latestUpdates.end())
375  {
376  // Remove subId from all, clear of subId will
377  // occur after save.
378  _queuedUpdates.push_back(*update);
379  count = _counts.erase(count);
380  _timers.erase(update->first);
381  _latestUpdates.erase(update);
382  }
383  else
384  {
385  ++count;
386  }
387  }
388  else
389  {
390  ++count;
391  }
392  }
393  // Release the lock unless we're doing an update all, then we
394  // hold it until updates are completed and signal when done.
395  if (!updateAll)
396  {
397  lock.unlock();
398  }
399  // Shouldn't need the lock to send the updates
400  for (std::vector<SavedUpdate>::iterator update = _queuedUpdates.begin(), end = _queuedUpdates.end(); update != end; ++update)
401  {
402  _delegate->update(update->second);
403  Field clearableSubId(update->first);
404  clearableSubId.clear();
405  update->second.clear();
406  }
407  _queuedUpdates.clear();
408  if (updateAll)
409  {
410  _updateAll = false;
411  _lock.signalAll();
412  }
413  } // -V1020
414  }
415 
416  // The update thread runs until this is true.
417  volatile bool _closed;
418 
419  // The adapter doing the real saves
420  std::shared_ptr<RecoveryPointAdapterImpl> _delegate;
421 
422  // Lock used to protect _latestUpdates, _timers, and _counts.
423  Mutex _lock;
424 
425  // Types for our maps
426  typedef std::unordered_map<Field, RecoveryPoint, Field::FieldHash> UpdateMap;
427  typedef std::pair<Field, RecoveryPoint> SavedUpdate;
428  typedef UpdateMap::value_type Update;
429  typedef UpdateMap::iterator UpdateIter;
430  typedef std::unordered_map<Field, Timer, Field::FieldHash> TimerMap;
431  typedef TimerMap::iterator TimerIter;
432  typedef std::unordered_map<Field, unsigned, Field::FieldHash> CountMap;
433  typedef CountMap::iterator CountIter;
434 
435  // Saves the most recent update for each sub id.
436  UpdateMap _latestUpdates;
437 
438  // Saves a timer for each sub id that is reset each time we save.
439  TimerMap _timers;
440 
441  // Saves a count of how many updates have come in since last save.
442  CountMap _counts;
443 
444  // The thread doind the saves.
445  std::thread _thread;
446 
447  // How many updates before we force a save.
448  unsigned _updateThreshold;
449 
450  // How long between getting first cached update and save.
451  double _timeoutMillis;
452 
453  // How long between automatic checks of the timers.
454  long _updateIntervalMillis;
455 
456  // Flag to tell update thread to save everything.
457  volatile bool _updateAll;
458 };
459 
460 }
461 
462 #endif //_RECOVERYPOINTADAPTER_H_
463 
virtual void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:289
void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:139
virtual void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:272
virtual void purge()=0
Remove all data from the storage.
void prune()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:149
RecoveryPointAdapter implementation that delegates storage to another RecoveryPointAdapter but provid...
Definition: RecoveryPointAdapter.hpp:163
void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:143
virtual void _runUpdateAll()
Lock is already held.
Definition: RecoveryPointAdapter.hpp:302
virtual void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:254
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
virtual void prune()
Take any necessary actions to reduce associated storage size.
Definition: RecoveryPointAdapter.hpp:69
RecoveryPointAdapter a handle class for implementing external storage of subscription recovery points...
Definition: RecoveryPointAdapter.hpp:74
virtual void update(RecoveryPoint &recoveryPoint_)=0
Update the storage information with the given recovery point.
virtual bool next(RecoveryPoint &current_)=0
Recovery is done by iteration over elements in storage.
bool isValid() const
Return if this has a valid implementation.
Definition: RecoveryPointAdapter.hpp:152
ConflatingRecoveryPointAdapter(const std::shared_ptr< RecoveryPointAdapterImpl > &delegate_, unsigned updateThreshold_=10, double timeoutMillis_=2000.0, long updateIntervalMillis_=2000)
Conflate updates to delegate_ where they will only be processed every updateIntervalMillis_ for subsc...
Definition: RecoveryPointAdapter.hpp:180
Provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:67
Defines the AMPS::Field class, which represents the value of a field in a message.
RecoveryPoint deepCopy()
Return a deep copy of self.
Definition: RecoveryPoint.hpp:97
Provides AMPS::RecoveryPoint, AMPS::RecoveryPointFactory, AMPS::FixedRecoveryPoint, and AMPS::DynamicRecoveryPoint.
void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:133
virtual void close()=0
Take any necessary actions to close the associated storage.
virtual bool next(RecoveryPoint &current_)
Recovery is done by iteration over elements in storage.
Definition: RecoveryPointAdapter.hpp:215
iterator begin()
To recover from an adapter, iterate over the adapter from begin() to end() with a RecoveryPointIterat...
Definition: RecoveryPointAdapter.hpp:125
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
virtual void updateAll()
Push all updates to underlying adapter.
Definition: RecoveryPointAdapter.hpp:295
void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:146
RecoveryPointAdapterImpl virtual base class for implementing external storage of subscription recover...
Definition: RecoveryPointAdapter.hpp:49
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Definition: ampsplusplus.hpp:103
iterator end()
Return the end of recovery marker.
Definition: RecoveryPointAdapter.hpp:129
const Field & getSubId() const
Get the sub id for this recovery point.
Definition: RecoveryPoint.hpp:84
virtual void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:222