26 #ifndef _HYBRIDPUBLISHSTORE_H_ 27 #define _HYBRIDPUBLISHSTORE_H_ 57 : _store(NULL), _handler(NULL), _data(NULL)
70 volatile size_t _entries, _errorCount;
71 volatile amps_uint64_t _lastIndex;
73 SwappingOutReplayer(
PublishStore* pStore_,
size_t entries_)
74 : _pStore(pStore_), _entries(entries_)
75 , _errorCount(0), _lastIndex(0)
// Accessor: returns the current value of the _errorCount member
// (declared volatile size_t and initialized to 0 by the constructor).
// HybridPublishStore::store() only advances its in-memory low index and
// discards from the memory store when this returns 0.
78 size_t getErrors() {
return _errorCount; }
// Accessor: returns the current value of the _lastIndex member
// (declared volatile amps_uint64_t and initialized to 0 by the constructor).
// HybridPublishStore::store() uses this value as the index up to which the
// memory store is discarded after a swap-out with no errors.
80 amps_uint64_t lastIndex() {
return _lastIndex; }
84 if(_entries > 0 && _errorCount == 0)
89 _pStore->
store(message_,
false);
112 :
StoreImpl(), _memStore(maxMemoryCapacity_), _fileStore(fileName_),
113 _cap(maxMemoryCapacity_),
114 _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5)),
115 _lowestIndexInMemory(0), _holdSwapping(false)
117 _handlerData._store =
this;
130 :
StoreImpl(), _memStore(maxMemoryCapacity_), _fileStore(fileName_),
131 _cap(maxMemoryCapacity_),
132 _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5)),
133 _lowestIndexInMemory(0), _holdSwapping(false)
135 _handlerData._store =
this;
148 Lock<Mutex> guard(_lock);
149 _lowWatermark = lowWatermark_;
160 Lock<Mutex> guard(_lock);
161 return _lowWatermark;
169 Lock<Mutex> guard(_lock);
170 while (_holdSwapping)
172 if (!_lock.wait(1000))
174 Unlock<Mutex> u(_lock);
175 amps_invoke_waiting_function();
179 FlagFlip flip(&_holdSwapping);
181 Unlock<Mutex> u(_lock);
184 _memStore.discardUpTo(_fileStore.getLastPersisted());
185 Lock<Mutex> l(_lock);
189 _fileStore.discardUpTo(index_);
190 if (_lowestIndexInMemory <= index_)
192 _memStore.discardUpTo(index_);
193 _lowestIndexInMemory = index_ + 1;
205 Lock<Mutex> guard(_lock);
206 while (_holdSwapping)
208 if (!_lock.wait(1000))
210 amps_invoke_waiting_function();
214 FlagFlip flip(&_holdSwapping);
216 Unlock<Mutex> u(_lock);
217 _fileStore.replay(replayer_);
218 _memStore.replay(replayer_);
227 return _fileStore.unpersistedCount() + _memStore.unpersistedCount();
240 Lock<Mutex> guard(_lock);
241 amps_uint64_t waitFor = _getHybridHighestUnpersisted();
244 if (waitFor == unset)
return;
247 bool timedOut =
false;
248 long waitTime = (timeout_ < 1000) ? timeout_ : 1000;
249 AMPS_START_TIMER(timeout_)
251 while (!timedOut && waitFor >= _getHybridLowestUnpersisted() &&
252 _getHybridLowestUnpersisted() != unset)
254 if (!_lock.wait(waitTime))
257 AMPS_RESET_TIMER(timedOut, timeout_);
258 waitTime = (timeout_ < 1000) ? timeout_ : 1000;
259 Unlock<Mutex> unlck(_lock);
260 amps_invoke_waiting_function();
264 if (timedOut && waitFor >= _getHybridLowestUnpersisted() &&
265 _getHybridLowestUnpersisted() != unset)
267 throw TimedOutException(
"Timed out waiting to flush publish store.");
272 while (waitFor >= _getHybridLowestUnpersisted() &&
273 _getHybridLowestUnpersisted() != unset)
277 Unlock<Mutex> unlck(_lock);
278 amps_invoke_waiting_function();
285 amps_uint64_t lowestIndexInMemory;
287 Lock<Mutex> guard(_lock);
288 lowestIndexInMemory = _lowestIndexInMemory;
290 if(index_ < lowestIndexInMemory)
292 return _fileStore.replaySingle(replayer_, index_);
296 return _memStore.replaySingle(replayer_, index_);
305 Lock<Mutex> guard(_lock);
307 while (_holdSwapping)
309 if (!_lock.wait(1000))
311 Unlock<Mutex> u(_lock);
312 amps_invoke_waiting_function();
315 if(_memStore.unpersistedCount() >= _cap && !_holdSwapping)
318 FlagFlip flip(&_holdSwapping);
319 SwappingOutReplayer swapper(&_fileStore,
320 _memStore.unpersistedCount() - _lowWatermark);
322 Unlock<Mutex> u(_lock);
323 _memStore.replay(swapper);
326 if(swapper.getErrors() == 0)
328 _lowestIndexInMemory = swapper.lastIndex();
329 _memStore.discardUpTo(_lowestIndexInMemory++);
332 return _memStore.store(message_);
337 _handlerData.init(handler_, data_);
338 _fileStore.setResizeHandler(HybridPublishStore::resizeHandler,
339 (
void*)&_handlerData);
344 return _handlerData._handler;
349 Lock<Mutex> guard(_lock);
350 return _getHybridLowestUnpersisted();
353 amps_uint64_t getHighestUnpersisted()
const 355 Lock<Mutex> guard(_lock);
356 return _getHybridHighestUnpersisted();
361 Lock<Mutex> guard(_lock);
362 return _getHybridLastPersisted();
367 static bool resizeHandler(
Store store_,
size_t size_,
void* data_)
369 HandlerData* handlerData = (HandlerData*)data_;
371 return handlerData->_handler(store_, size_, handlerData->_data);
375 amps_uint64_t _getHybridLowestUnpersisted()
const 377 amps_uint64_t filemin = _fileStore.getLowestUnpersisted();
378 amps_uint64_t memmin = _memStore.getLowestUnpersisted();
379 if (filemin == AMPS_UNSET_SEQUENCE)
381 if (memmin == AMPS_UNSET_SEQUENCE || memmin > filemin)
388 amps_uint64_t _getHybridHighestUnpersisted()
const 390 amps_uint64_t filemax = _fileStore.getHighestUnpersisted();
391 amps_uint64_t memmax = _memStore.getHighestUnpersisted();
392 if (filemax == AMPS_UNSET_SEQUENCE)
394 if (memmax == AMPS_UNSET_SEQUENCE || memmax < filemax)
400 amps_uint64_t _getHybridLastPersisted()
403 if (!_lowestIndexInMemory &&
404 _fileStore.unpersistedCount() == 0)
406 _fileStore.discardUpTo(_memStore.getLastPersisted());
407 return _fileStore.getLastPersisted();
409 amps_uint64_t memLast = _memStore.getLastPersisted();
410 amps_uint64_t fileLast = _fileStore.getLastPersisted();
411 return (memLast < fileLast) ? memLast : fileLast;
417 size_t _lowWatermark;
418 amps_uint64_t _lowestIndexInMemory;
420 HandlerData _handlerData;
421 volatile bool _holdSwapping;
427 #endif //_HYBRIDPUBLISHSTORE_H_
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:761
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:733
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
Provides AMPS::PublishStore, a publish store that uses memory-mapped files to provide a publish store...
void replay(StoreReplayer &replayer_)
Used internally by Client to replay messages in the store to AMPS after a successful connection...
Definition: HybridPublishStore.hpp:203
amps_uint64_t store(const Message &message_)
Used internally by Client to put messages into the Store.
Definition: HybridPublishStore.hpp:303
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: HybridPublishStore.hpp:238
size_t unpersistedCount() const
The number of messages in the Store that have not been discarded.
Definition: HybridPublishStore.hpp:225
A StoreImpl implementation that uses a memory-mapped file to provide a publish store that persists ac...
Definition: PublishStore.hpp:46
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: HybridPublishStore.hpp:347
Core type, function, and class declarations for the AMPS C++ client.
HybridPublishStore(const char *fileName_, size_t maxMemoryCapacity_)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:111
size_t getLowWatermark()
Get how many messages remain in memory after messages are offlined.
Definition: HybridPublishStore.hpp:158
void setLowWatermark(size_t lowWatermark_)
Set how many messages remain in memory after messages are offlined.
Definition: HybridPublishStore.hpp:146
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:755
Provides AMPS::MemoryPublishStore, a publish store that holds messages in memory. ...
void setResizeHandler(PublishStoreResizeHandler handler_, void *data_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: HybridPublishStore.hpp:335
void discardUpTo(amps_uint64_t index_)
Discard all messages in the store up to and including index_.
Definition: HybridPublishStore.hpp:167
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:859
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: HybridPublishStore.hpp:359
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:247
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: HybridPublishStore.hpp:283
HybridPublishStore(const std::string &fileName_, size_t maxMemoryCapacity_)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:129
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:817
A StoreImpl implementation that uses MemoryStoreBuffer as its buffer to hold published messages in me...
Definition: MemoryPublishStore.hpp:44
An implementation of StoreImpl for publication.
Definition: HybridPublishStore.hpp:47