AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.2.0
RecoveryPointAdapter.hpp
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _RECOVERYPOINTADAPTER_H_
27 #define _RECOVERYPOINTADAPTER_H_
28 
29 #include <Field.hpp>
30 #include <RecoveryPoint.hpp>
31 #include <deque>
32 #include <iterator>
33 #include <memory>
34 #include <thread>
35 #include <unordered_map>
36 
37 namespace AMPS
38 {
39 
40 class RecoveryPointAdapter;
41 
/// Virtual base class for implementing external storage of subscription
/// recovery points. Derives from RefBody; next(), update(), both purge()
/// overloads and close() are pure virtual, prune() has an empty default.
44 class RecoveryPointAdapterImpl : public RefBody
45 {
46 public:
47  virtual ~RecoveryPointAdapterImpl() { }
/// Recovery is done by iteration over elements in storage.
/// \param current_ The RecoveryPoint handed in by the caller for this step.
/// \return RecoveryPointAdapter::iterator treats a false return as the end
/// of the iteration.
52  virtual bool next(RecoveryPoint& current_) = 0;
/// Update the storage information with the given recovery point.
/// \param recoveryPoint_ The recovery point to store.
55  virtual void update(RecoveryPoint& recoveryPoint_) = 0;
/// Remove all data from the storage.
57  virtual void purge() = 0;
/// Remove the specified subId_ from the storage.
/// \param subId_ The sub id to remove.
60  virtual void purge(const Field& subId_) = 0;
/// Take any necessary actions to close the associated storage.
62  virtual void close() = 0;
/// Take any necessary actions to reduce associated storage size.
/// The default implementation does nothing.
64  virtual void prune() { ; }
65 };
66 
/// Handle class that forwards every operation to a RecoveryPointAdapterImpl
/// held in a BorrowRefHandle, and exposes the stored recovery points through
/// begin()/end().
67 class RecoveryPointAdapter
68 {
69 public:
     /// Single-pass iterator over the recovery points produced by
     /// RecoveryPointAdapterImpl::next(). An iterator whose adapter pointer is
     /// NULL is the end iterator.
70  class iterator
71  {
72  RecoveryPointAdapterImpl* _pAdapter;
73  RecoveryPoint _current;
     // Fetch the next recovery point into _current. When there is no adapter
     // or next() returns false, drop the adapter pointer so that this
     // iterator compares equal to end().
74  inline void advance()
75  {
76  if (!_pAdapter || !_pAdapter->next(_current))
77  {
78  _pAdapter = NULL;
79  }
80  }
81 
82  public:
     /// Construct the end iterator.
83  iterator() // end
84  :_pAdapter(NULL)
85  {;}
     /// Construct an iterator over pAdapter_ and immediately advance to the
     /// first element, so constructing it already calls next() once.
86  iterator(RecoveryPointAdapterImpl* pAdapter_)
87  :_pAdapter(pAdapter_)
88  {
89  advance();
90  }
91 
     // Equality only compares the adapter pointers; the position within the
     // iteration is not compared.
     // NOTE(review): operator== and operator!= are not const-qualified, so
     // they cannot be invoked on a const iterator.
92  bool operator==(const iterator& rhs)
93  {
94  return _pAdapter == rhs._pAdapter;
95  }
96  bool operator!=(const iterator& rhs)
97  {
98  return _pAdapter != rhs._pAdapter;
99  }
     // Pre-increment only, and it returns void rather than iterator&.
100  void operator++(void) { advance(); }
     // Returns the current RecoveryPoint by value.
101  RecoveryPoint operator*(void) { return _current; }
102  RecoveryPoint* operator->(void) { return &_current; }
103  };
104 
     /// Construct a handle with a default-constructed body; see isValid().
105  RecoveryPointAdapter() : _body() { }
     /// Construct a handle around body_. isRef_ is passed straight through to
     /// BorrowRefHandle — NOTE(review): its ownership meaning is defined by
     /// BorrowRefHandle, which is not visible here.
106  RecoveryPointAdapter(RecoveryPointAdapterImpl* body_, bool isRef_ = true)
107  : _body(body_, isRef_) { }
     /// Copy-construct from rhs_ by copying its body handle.
108  RecoveryPointAdapter(const RecoveryPointAdapter& rhs_)
109  : _body(rhs_._body)
110  { }
111 
     /// Begin iterating the stored recovery points. Constructing the returned
     /// iterator calls next() on the implementation once.
118  iterator begin() { return iterator(&(_body.get())); }
119 
     /// The end iterator (an iterator with no adapter).
122  iterator end() { return iterator(); }
123 
     /// Update the storage information with the given recovery point.
126  void update(RecoveryPoint& recoveryPoint_)
127  {
128  _body.get().update(recoveryPoint_);
129  }
130 
     /// Remove all data from the storage.
132  void purge() { _body.get().purge(); }
133 
     /// Remove the specified subId_ from the storage.
136  void purge(const Field& subId_) { _body.get().purge(subId_); }
137 
     /// Take any necessary actions to close the associated storage.
139  void close() { _body.get().close(); }
140 
     /// Take any necessary actions to reduce associated storage size.
142  void prune() { _body.get().prune(); }
143 
     /// Forwards to the body handle's isValid().
145  bool isValid() const
146  {
147  return _body.isValid();
148  }
149 private:
150  BorrowRefHandle<RecoveryPointAdapterImpl> _body;
151 };
152 
// NOTE(review): listing line 156, which the member index at the end of this
// page gives as the definition site of this class ("Definition:
// RecoveryPointAdapter.hpp:156"), is absent from this extract, so the class
// declarator itself is not shown. The constructor's std::thread call names
// the class ConflatingRecoveryPointAdapter; the base class is presumably
// RecoveryPointAdapterImpl — confirm against the original header.
157 {
158 public:
/// Conflate updates to delegate_: updates are cached per sub id and handed to
/// the delegate by a background thread that is started here.
/// \param delegate_ The adapter doing the real saves.
/// \param updateThreshold_ How many updates for one sub id before a save is
/// forced.
/// \param timeoutMillis_ How long between getting the first cached update for
/// a sub id and saving it. When 0.0 no timers are created and the update
/// thread saves every cached sub id each time it wakes.
/// \param updateIntervalMillis_ How long the update thread waits between
/// automatic checks of the timers.
173  ConflatingRecoveryPointAdapter(
174  const std::shared_ptr<RecoveryPointAdapterImpl>& delegate_,
175  unsigned updateThreshold_ = 10,
176  double timeoutMillis_ = 2000.0,
177  long updateIntervalMillis_ = 2000
178  )
179  : _closed(false)
180  , _delegate(delegate_)
181  , _updateThreshold(updateThreshold_)
182  , _timeoutMillis(timeoutMillis_)
183  , _updateIntervalMillis(updateIntervalMillis_)
184  , _updateAll(false)
185  {
186  // Start the update thread
187  _thread = std::thread(&ConflatingRecoveryPointAdapter::updateThread,
188  this);
189  }
190 
// NOTE(review): listing line 191 is absent from this extract; the body below
// is presumably the class destructor — confirm against the original header.
192  {
     // close() pushes all cached updates to the delegate and sets _closed,
     // which is the update thread's loop exit condition; then wait for the
     // thread to finish.
193  close();
194  _thread.join();
     // Release anything still cached. The keys and values were created with
     // deepCopy() in update(), so their data is deleted explicitly with
     // clear(). The key is const inside the map, hence the local copy.
195  for (UpdateIter purged = _latestUpdates.begin();
196  purged != _latestUpdates.end(); ++purged)
197  {
198  Field clearableSubId = purged->first;
199  purged->second.clear();
200  clearableSubId.clear();
201  }
202  }
203 
/// Recovery is done by iteration over elements in storage. Forwards directly
/// to the delegate; cached updates are not consulted and _lock is not taken.
/// \param current_ Passed through to the delegate's next().
/// \return Whatever the delegate's next() returns.
208  virtual bool next(RecoveryPoint& current_)
209  {
210  return _delegate->next(current_);
211  }
212 
/// Update the storage information with the given recovery point. The update
/// is cached under _lock rather than sent to the delegate; the update thread
/// delivers it later.
/// \param recoveryPoint_ The recovery point to cache; its sub id selects the
/// cache entry.
215  virtual void update(RecoveryPoint& recoveryPoint_)
216  {
217  Field subId = recoveryPoint_.getSubId();
218  Lock<Mutex> lock(_lock);
219  UpdateIter lastUpdate = _latestUpdates.find(subId);
220  if (lastUpdate == _latestUpdates.end())
221  {
222  // New sub id, use deep copies and a new Timer.
     // The same deep-copied subId is used as the key in _latestUpdates,
     // _counts and _timers.
     // NOTE(review): this branch never signals the update thread, so the
     // first update for a sub id waits for the thread's next wake-up even
     // when _updateThreshold is 1 or _timeoutMillis is 0.0.
223  subId = subId.deepCopy();
224  _latestUpdates[subId] = recoveryPoint_.deepCopy();
225  _counts[subId] = 1;
226  if (_timeoutMillis != 0.0)
227  {
228  Timer timer(_timeoutMillis);
229  timer.start();
230  _timers[subId] = timer;
231  }
232  }
233  else
234  {
235  // SubId already exists, set to new recovery point.
236  lastUpdate->second.deepCopy(recoveryPoint_);
237  // Increment and check the count.
238  if (++_counts[subId] >= _updateThreshold)
239  {
240  // Time to update, make sure update thread wakes up.
241  _lock.signalAll();
242  }
243  }
244  }
245 
/// Remove all data from the storage: purge the delegate, then discard every
/// cached update, count and timer.
247  virtual void purge()
248  {
     // The delegate is purged before _lock is taken.
249  _delegate->purge();
250  Lock<Mutex> lock(_lock);
251  _counts.clear();
252  _timers.clear();
     // Keys and values were created with deepCopy() in update(), so delete
     // their data with clear() before emptying the map. The key is const
     // inside the map, hence the local copy.
253  for (UpdateIter purged = _latestUpdates.begin();
254  purged != _latestUpdates.end(); ++purged)
255  {
256  Field clearableSubId = purged->first;
257  purged->second.clear();
258  clearableSubId.clear();
259  }
260  _latestUpdates.clear();
261  }
262 
/// Remove the specified subId_ from the storage: purge it from the delegate,
/// then discard any cached update, count and timer for it.
/// \param subId_ The sub id to remove.
265  virtual void purge(const Field& subId_)
266  {
     // The delegate is purged before _lock is taken.
267  _delegate->purge(subId_);
268  Lock<Mutex> lock(_lock);
269  UpdateIter purged = _latestUpdates.find(subId_);
270  if (purged != _latestUpdates.end())
271  {
     // Copy the key first: its data is only deleted (clear()) after the
     // entries have been erased from all three maps, which update() keyed
     // with the same deep-copied sub id.
272  Field clearableSubId = purged->first;
273  purged->second.clear();
274  _latestUpdates.erase(purged);
275  _counts.erase(subId_);
276  _timers.erase(subId_);
277  clearableSubId.clear();
278  }
279  }
280 
/// Take any necessary actions to close the associated storage: on the first
/// call, push all cached updates to the delegate and tell the update thread
/// to stop; then close the delegate.
282  virtual void close()
283  {
284  // Save all cached updates before shutting down update thread.
     // NOTE(review): _closed is tested before _lock is taken, so two
     // concurrent callers can both enter this branch.
285  if (!_closed)
286  {
287  Lock<Mutex> lock(_lock);
288  _runUpdateAll();
289  _closed = true;
     // Wake the update thread so it can observe _closed and exit its loop.
290  _lock.signalAll();
291  }
     // The delegate's close() is invoked on every call, including calls made
     // after this adapter is already closed.
292  _delegate->close();
293  }
294 
/// Push all updates to underlying adapter. Takes _lock and then runs
/// _runUpdateAll(), which returns once _counts is empty.
296  virtual void updateAll()
297  {
298  Lock<Mutex> lock(_lock);
299  _runUpdateAll();
300  }
301 
/// Lock is already held. Sets the _updateAll flag and then repeatedly signals
/// the update thread and waits on _lock until _counts is empty. The update
/// thread resets _updateAll once it has delivered the updates.
303  virtual void _runUpdateAll()
304  {
305  _updateAll = true;
306  while (!_counts.empty())
307  {
308  _lock.signalAll();
     // Bounded wait (250, presumably milliseconds) so the condition is
     // re-checked even if a signal is missed.
309  _lock.wait(250);
310  }
311  }
312 
313 protected:
/// Body of the background thread started by the constructor. Until _closed is
/// set it repeatedly waits on _lock for a signal or for _updateIntervalMillis,
/// moves every cached update that is due (its timer fired, its count reached
/// _updateThreshold, _timeoutMillis is 0.0, or an update-all was requested)
/// into a local queue, and then passes the queued updates to the delegate.
314  void updateThread()
315  {
316  // A place to hold updates to save
317  std::deque<Update> _queuedUpdates;
     // NOTE(review): _closed is read here without holding _lock.
318  while (!_closed)
319  {
320  DeferLock<Mutex> lock(_lock);
321  lock.lock();
322  // Wait for a signal or update interval
323  _lock.wait(_updateIntervalMillis);
324 
325  // Check for timeouts
     // The iterator is only advanced by hand: erase() yields the next
     // position when an entry is removed, otherwise it is incremented.
326  for (TimerMap::iterator timer = _timers.begin();
327  timer != _timers.end(); )
328  {
     // check() presumably reports that the timer has expired — confirm
     // against Timer.
329  if (timer->second.check())
330  {
331  UpdateIter update = _latestUpdates.find(timer->first);
332  if (update != _latestUpdates.end())
333  {
334  // Remove subId from all, clear of subId will
335  // occur after save.
336  _queuedUpdates.push_back(*update);
337  _counts.erase(update->first);
338  timer = _timers.erase(timer);
339  _latestUpdates.erase(update);
340  }
341  else
342  {
343  timer++;
344  }
345  }
346  else
347  {
348  timer++;
349  }
350  }
351 
352  // Need a local version so it doesn't change after we unlock to
353  // deliver updates.
354  volatile bool updateAll = _updateAll;
355  // Check for update counts
356  for (CountMap::iterator count = _counts.begin();
357  count != _counts.end(); )
358  {
     // With an update-all request or a zero timeout every counted sub id
     // is due; otherwise only those that reached the threshold.
359  if (updateAll || _timeoutMillis == 0.0
360  || count->second >= _updateThreshold)
361  {
362  UpdateIter update = _latestUpdates.find(count->first);
363  if (update != _latestUpdates.end())
364  {
365  // Remove subId from all, clear of subId will
366  // occur after save.
367  _queuedUpdates.push_back(*update);
368  count = _counts.erase(count);
369  _timers.erase(update->first);
370  _latestUpdates.erase(update);
371  }
372  else
373  {
374  count++;
375  }
376  }
377  else
378  {
379  count++;
380  }
381  }
382  // Release the lock unless we're doing an update all, then we
383  // hold it until updates are completed and signal when done.
384  if (!updateAll)
385  {
386  lock.unlock();
387  }
388  // Shouldn't need the lock to send the updates
389  for (std::deque<Update>::iterator update = _queuedUpdates.begin();
390  update != _queuedUpdates.end(); ++update)
391  {
392  _delegate->update(update->second);
     // The queued key and value still refer to the deep copies made in
     // update(); delete their data now that the delegate has seen them.
393  Field clearableSubId(update->first);
394  clearableSubId.clear();
395  update->second.clear();
396  }
397  _queuedUpdates.clear();
398  if (updateAll)
399  {
     // Tell the thread waiting in _runUpdateAll() that the flush is done.
     // There is no explicit unlock on this path; presumably DeferLock
     // releases _lock when it goes out of scope at the end of the
     // iteration — confirm against DeferLock.
400  _updateAll = false;
401  _lock.signalAll();
402  }
403  }
404  }
405 
406  // The update thread runs until this is true.
     // NOTE(review): written in close() and read by updateThread(); volatile
     // is not a synchronization primitive in C++ — consider std::atomic<bool>.
407  volatile bool _closed;
408 
409  // The adapter doing the real saves
410  std::shared_ptr<RecoveryPointAdapterImpl> _delegate;
411 
412  // Lock used to protect _latestUpdates, _timers, and _counts.
     // Also used as the wait/signal object between callers and the update
     // thread.
413  Mutex _lock;
414 
415  // Types for our maps
416  typedef std::unordered_map<Field, RecoveryPoint, Field::FieldHash> UpdateMap;
417  typedef UpdateMap::value_type Update;
418  typedef UpdateMap::iterator UpdateIter;
419  typedef std::unordered_map<Field, Timer, Field::FieldHash> TimerMap;
420  typedef TimerMap::iterator TimerIter;
421  typedef std::unordered_map<Field, unsigned, Field::FieldHash> CountMap;
422  typedef CountMap::iterator CountIter;
423 
424  // Saves the most recent update for each sub id.
425  UpdateMap _latestUpdates;
426 
427  // Saves a timer for each sub id that is reset each time we save.
428  TimerMap _timers;
429 
430  // Saves a count of how many updates have come in since last save.
431  CountMap _counts;
432 
433  // The thread doing the saves.
434  std::thread _thread;
435 
436  // How many updates before we force a save.
437  unsigned _updateThreshold;
438 
439  // How long between getting first cached update and save.
440  double _timeoutMillis;
441 
442  // How long between automatic checks of the timers.
443  long _updateIntervalMillis;
444 
445  // Flag to tell update thread to save everything.
     // Set in _runUpdateAll() and reset by updateThread().
446  volatile bool _updateAll;
447 };
448 
449 }
450 
451 #endif //_RECOVERYPOINTADAPTER_H_
452 
virtual void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:282
virtual void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:265
virtual void purge()=0
Remove all data from the storage.
RecoveryPointAdapter implementation that delegates storage to another RecoveryPointAdapter but provid...
Definition: RecoveryPointAdapter.hpp:156
virtual void _runUpdateAll()
Lock is already held.
Definition: RecoveryPointAdapter.hpp:303
virtual void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:247
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
virtual void prune()
Take any necessary actions to reduce associated storage size.
Definition: RecoveryPointAdapter.hpp:64
virtual void update(RecoveryPoint &recoveryPoint_)=0
Update the storage information with the given recovery point.
virtual bool next(RecoveryPoint &current_)=0
Recovery is done by iteration over elements in storage.
ConflatingRecoveryPointAdapter(const std::shared_ptr< RecoveryPointAdapterImpl > &delegate_, unsigned updateThreshold_=10, double timeoutMillis_=2000.0, long updateIntervalMillis_=2000)
Conflate updates to delegate_ where they will only be processed every updateIntervalMillis_ for subsc...
Definition: RecoveryPointAdapter.hpp:173
RecoveryPoint provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:62
Defines the AMPS::Field class, which represents the value of a field in a message.
RecoveryPoint deepCopy()
Return a deep copy of self.
Definition: RecoveryPoint.hpp:92
virtual void close()=0
Take any necessary actions to close the associated storage.
virtual bool next(RecoveryPoint &current_)
Recovery is done by iteration over elements in storage.
Definition: RecoveryPointAdapter.hpp:208
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
virtual void updateAll()
Push all updates to underlying adapter.
Definition: RecoveryPointAdapter.hpp:296
RecoveryPointAdapter virtual base class for implementing external storage of subscription recovery po...
Definition: RecoveryPointAdapter.hpp:44
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Definition: ampsplusplus.hpp:105
const Field & getSubId() const
Get the sub id for this recovery point.
Definition: RecoveryPoint.hpp:79
virtual void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:215