26 #ifndef _RECOVERYPOINTADAPTER_H_ 27 #define _RECOVERYPOINTADAPTER_H_ 30 #include <RecoveryPoint.hpp> 35 #include <unordered_map> 40 class RecoveryPointAdapter;
57 virtual void purge() = 0;
62 virtual void close() = 0;
67 class RecoveryPointAdapter
76 if (!_pAdapter || !_pAdapter->
next(_current))
92 bool operator==(
const iterator& rhs)
94 return _pAdapter == rhs._pAdapter;
96 bool operator!=(
const iterator& rhs)
98 return _pAdapter != rhs._pAdapter;
100 void operator++(
void) { advance(); }
105 RecoveryPointAdapter() : _body() { }
107 : _body(body_, isRef_) { }
108 RecoveryPointAdapter(
const RecoveryPointAdapter& rhs_)
118 iterator begin() {
return iterator(&(_body.get())); }
122 iterator end() {
return iterator(); }
128 _body.get().update(recoveryPoint_);
132 void purge() { _body.get().purge(); }
136 void purge(
const Field& subId_) { _body.get().purge(subId_); }
139 void close() { _body.get().close(); }
142 void prune() { _body.get().prune(); }
147 return _body.isValid();
150 BorrowRefHandle<RecoveryPointAdapterImpl> _body;
174 const std::shared_ptr<RecoveryPointAdapterImpl>& delegate_,
175 unsigned updateThreshold_ = 10,
176 double timeoutMillis_ = 2000.0,
177 long updateIntervalMillis_ = 2000
180 , _delegate(delegate_)
181 , _updateThreshold(updateThreshold_)
182 , _timeoutMillis(timeoutMillis_)
183 , _updateIntervalMillis(updateIntervalMillis_)
187 _thread = std::thread(&ConflatingRecoveryPointAdapter::updateThread,
195 for (UpdateIter purged = _latestUpdates.begin();
196 purged != _latestUpdates.end(); ++purged)
198 Field clearableSubId = purged->first;
199 purged->second.
clear();
200 clearableSubId.
clear();
210 return _delegate->next(current_);
218 Lock<Mutex> lock(_lock);
219 UpdateIter lastUpdate = _latestUpdates.find(subId);
220 if (lastUpdate == _latestUpdates.end())
224 _latestUpdates[subId] = recoveryPoint_.
deepCopy();
226 if (_timeoutMillis != 0.0)
228 Timer timer(_timeoutMillis);
230 _timers[subId] = timer;
236 lastUpdate->second.deepCopy(recoveryPoint_);
238 if (++_counts[subId] >= _updateThreshold)
250 Lock<Mutex> lock(_lock);
253 for (UpdateIter purged = _latestUpdates.begin();
254 purged != _latestUpdates.end(); ++purged)
256 Field clearableSubId = purged->first;
257 purged->second.
clear();
258 clearableSubId.
clear();
260 _latestUpdates.clear();
267 _delegate->purge(subId_);
268 Lock<Mutex> lock(_lock);
269 UpdateIter purged = _latestUpdates.find(subId_);
270 if (purged != _latestUpdates.end())
272 Field clearableSubId = purged->first;
273 purged->second.
clear();
274 _latestUpdates.erase(purged);
275 _counts.erase(subId_);
276 _timers.erase(subId_);
277 clearableSubId.
clear();
287 Lock<Mutex> lock(_lock);
298 Lock<Mutex> lock(_lock);
306 while (!_counts.empty())
317 std::deque<Update> _queuedUpdates;
320 DeferLock<Mutex> lock(_lock);
323 _lock.wait(_updateIntervalMillis);
326 for (TimerMap::iterator timer = _timers.begin();
327 timer != _timers.end(); )
329 if (timer->second.check())
331 UpdateIter
update = _latestUpdates.find(timer->first);
332 if (update != _latestUpdates.end())
336 _queuedUpdates.push_back(*update);
337 _counts.erase(update->first);
338 timer = _timers.erase(timer);
339 _latestUpdates.erase(update);
354 volatile bool updateAll = _updateAll;
356 for (CountMap::iterator count = _counts.begin();
357 count != _counts.end(); )
359 if (updateAll || _timeoutMillis == 0.0
360 || count->second >= _updateThreshold)
362 UpdateIter
update = _latestUpdates.find(count->first);
363 if (update != _latestUpdates.end())
367 _queuedUpdates.push_back(*update);
368 count = _counts.erase(count);
369 _timers.erase(update->first);
370 _latestUpdates.erase(update);
389 for (std::deque<Update>::iterator
update = _queuedUpdates.begin();
392 _delegate->update(
update->second);
394 clearableSubId.
clear();
397 _queuedUpdates.clear();
407 volatile bool _closed;
410 std::shared_ptr<RecoveryPointAdapterImpl> _delegate;
416 typedef std::unordered_map<Field, RecoveryPoint, Field::FieldHash> UpdateMap;
417 typedef UpdateMap::value_type Update;
418 typedef UpdateMap::iterator UpdateIter;
419 typedef std::unordered_map<Field, Timer, Field::FieldHash> TimerMap;
420 typedef TimerMap::iterator TimerIter;
421 typedef std::unordered_map<Field, unsigned, Field::FieldHash> CountMap;
422 typedef CountMap::iterator CountIter;
425 UpdateMap _latestUpdates;
437 unsigned _updateThreshold;
440 double _timeoutMillis;
443 long _updateIntervalMillis;
446 volatile bool _updateAll;
451 #endif //_RECOVERYPOINTADAPTER_H_ virtual void close()
Take any necessary actions to close the associated storage.
Definition: RecoveryPointAdapter.hpp:282
virtual void purge(const Field &subId_)
Remove the specified subId_ from the storage.
Definition: RecoveryPointAdapter.hpp:265
virtual void purge()=0
Remove all data from the storage.
RecoveryPointAdapter implementation that delegates storage to another RecoveryPointAdapter but provid...
Definition: RecoveryPointAdapter.hpp:156
virtual void _runUpdateAll()
Lock is already held.
Definition: RecoveryPointAdapter.hpp:303
virtual void purge()
Remove all data from the storage.
Definition: RecoveryPointAdapter.hpp:247
void clear()
Deletes the data associated with this Field, should only be used on Fields that were created as deepC...
Definition: Field.hpp:241
virtual void prune()
Take any necessary actions to reduce associated storage size.
Definition: RecoveryPointAdapter.hpp:64
virtual void update(RecoveryPoint &recoveryPoint_)=0
Update the storage information with the given recovery point.
virtual bool next(RecoveryPoint ¤t_)=0
Recovery is done by iteration over elements in storage.
ConflatingRecoveryPointAdapter(const std::shared_ptr< RecoveryPointAdapterImpl > &delegate_, unsigned updateThreshold_=10, double timeoutMillis_=2000.0, long updateIntervalMillis_=2000)
Conflate updates to delegate_ where they will only be processed every updateIntervalMillis_ for subsc...
Definition: RecoveryPointAdapter.hpp:173
RecoveryPoint provides access to the subId and bookmark needed to restart a subscription.
Definition: RecoveryPoint.hpp:62
Defines the AMPS::Field class, which represents the value of a field in a message.
RecoveryPoint deepCopy()
Return a deep copy of self.
Definition: RecoveryPoint.hpp:92
virtual void close()=0
Take any necessary actions to close the associated storage.
virtual bool next(RecoveryPoint ¤t_)
Recovery is done by iteration over elements in storage.
Definition: RecoveryPointAdapter.hpp:208
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
virtual void updateAll()
Push all updates to underlying adapter.
Definition: RecoveryPointAdapter.hpp:296
RecoveryPointAdapter virtual base class for implementing external storage of subscription recovery po...
Definition: RecoveryPointAdapter.hpp:44
void deepCopy(const Field &orig_)
Makes self a deep copy of the original field.
Definition: Field.hpp:212
Definition: ampsplusplus.hpp:105
const Field & getSubId() const
Get the sub id for this recovery point.
Definition: RecoveryPoint.hpp:79
virtual void update(RecoveryPoint &recoveryPoint_)
Update the storage information with the given recovery point.
Definition: RecoveryPointAdapter.hpp:215