AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.3
HybridPublishStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _HYBRIDPUBLISHSTORE_H_
27 #define _HYBRIDPUBLISHSTORE_H_
28 
29 #include <ampsplusplus.hpp>
30 #include <MemoryPublishStore.hpp>
31 #include <PublishStore.hpp>
32 
37 
38 namespace AMPS
39 {
47 class HybridPublishStore : public StoreImpl
48 {
49  class HandlerData
50  {
51  public:
52  HybridPublishStore* _store;
54  void* _data;
55 
56  HandlerData()
57  : _store(NULL), _handler(NULL), _data(NULL)
58  { ; }
59 
60  void init(PublishStoreResizeHandler handler_, void* data_)
61  {
62  _handler = handler_;
63  _data = data_;
64  }
65  };
66 
67  class SwappingOutReplayer : public StoreReplayer
68  {
69  PublishStore* _pStore;
70  volatile size_t _entries, _errorCount;
71  volatile amps_uint64_t _lastIndex;
72  public:
73  SwappingOutReplayer(PublishStore* pStore_, size_t entries_)
74  : _pStore(pStore_), _entries(entries_)
75  , _errorCount(0), _lastIndex(0)
76  { }
77 
78  size_t getErrors() { return _errorCount; }
79 
80  amps_uint64_t lastIndex() { return _lastIndex; }
81 
82  void execute(Message& message_)
83  {
84  if(_entries > 0 && _errorCount == 0)
85  {
86  try
87  {
88  {
89  _pStore->store(message_, false);
90  }
91  _lastIndex = amps_message_get_field_uint64(message_.getMessage(),
92  AMPS_Sequence);
93  }
94  catch(...)
95  {
96  ++_errorCount;
97  }
98  --_entries;
99  }
100  }
101  };
102 
103 public:
111  HybridPublishStore(const char* fileName_, size_t maxMemoryCapacity_)
112  : StoreImpl(), _memStore(maxMemoryCapacity_), _fileStore(fileName_),
113  _cap(maxMemoryCapacity_),
114  _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5)),
115  _lowestIndexInMemory(0), _holdSwapping(false)
116  {
117  _handlerData._store = this;
118  _memStore.addRef();
119  _fileStore.addRef();
120  }
121 
129  HybridPublishStore(const std::string& fileName_, size_t maxMemoryCapacity_)
130  : StoreImpl(), _memStore(maxMemoryCapacity_), _fileStore(fileName_),
131  _cap(maxMemoryCapacity_),
132  _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5)),
133  _lowestIndexInMemory(0), _holdSwapping(false)
134  {
135  _handlerData._store = this;
136  _memStore.addRef();
137  _fileStore.addRef();
138  }
139 
146  void setLowWatermark(size_t lowWatermark_)
147  {
148  Lock<Mutex> guard(_lock);
149  _lowWatermark = lowWatermark_;
150  }
151 
159  {
160  Lock<Mutex> guard(_lock);
161  return _lowWatermark;
162  }
163 
167  void discardUpTo(amps_uint64_t index_)
168  {
169  Lock<Mutex> guard(_lock);
170  while (_holdSwapping)
171  {
172  if (!_lock.wait(1000))
173  {
174  Unlock<Mutex> u(_lock);
175  amps_invoke_waiting_function();
176  }
177  }
178  // Set _holdSwapping true to end of function
179  FlagFlip flip(&_holdSwapping);
180  {
181  Unlock<Mutex> u(_lock);
182  if (!index_)
183  {
184  _memStore.discardUpTo(_fileStore.getLastPersisted());
185  Lock<Mutex> l(_lock);
186  _lock.signalAll();
187  return;
188  }
189  _fileStore.discardUpTo(index_);
190  if (_lowestIndexInMemory <= index_)
191  {
192  _memStore.discardUpTo(index_);
193  _lowestIndexInMemory = index_ + 1;
194  }
195  }
196  _lock.signalAll();
197  }
198 
203  void replay(StoreReplayer& replayer_)
204  {
205  Lock<Mutex> guard(_lock);
206  while (_holdSwapping)
207  {
208  if (!_lock.wait(1000))
209  {
210  amps_invoke_waiting_function();
211  }
212  }
213  // Set _holdSwapping true to end of function
214  FlagFlip flip(&_holdSwapping);
215  {
216  Unlock<Mutex> u(_lock);
217  _fileStore.replay(replayer_);
218  _memStore.replay(replayer_);
219  }
220  _lock.signalAll();
221  }
222 
225  size_t unpersistedCount() const
226  {
227  return _fileStore.unpersistedCount() + _memStore.unpersistedCount();
228  }
229 
238  virtual void flush(long timeout_)
239  {
240  Lock<Mutex> guard(_lock);
241  amps_uint64_t waitFor = _getHybridHighestUnpersisted();
242  amps_uint64_t unset = getUnsetSequence();
243  // Check that we aren't already empty
244  if (waitFor == unset) return;
245  if (timeout_ > 0)
246  {
247  bool timedOut = false;
248  long waitTime = (timeout_ < 1000) ? timeout_ : 1000;
249  AMPS_START_TIMER(timeout_)
250  // While timeout hasn't expired and we haven't had everything acked
251  while (!timedOut && waitFor >= _getHybridLowestUnpersisted() &&
252  _getHybridLowestUnpersisted() != unset)
253  {
254  if (!_lock.wait(waitTime))
255  {
256  // May have woken up early, check real time
257  AMPS_RESET_TIMER(timedOut, timeout_);
258  waitTime = (timeout_ < 1000) ? timeout_ : 1000;
259  Unlock<Mutex> unlck(_lock);
260  amps_invoke_waiting_function();
261  }
262  }
263  // If we timed out and still haven't caught up with the acks
264  if (timedOut && waitFor >= _getHybridLowestUnpersisted() &&
265  _getHybridLowestUnpersisted() != unset)
266  {
267  throw TimedOutException("Timed out waiting to flush publish store.");
268  }
269  }
270  else
271  {
272  while (waitFor >= _getHybridLowestUnpersisted() &&
273  _getHybridLowestUnpersisted() != unset)
274  {
275  // Use timeout version so python can interrupt
276  _lock.wait(1000);
277  Unlock<Mutex> unlck(_lock);
278  amps_invoke_waiting_function();
279  }
280  }
281  }
282 
283  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
284  {
285  amps_uint64_t lowestIndexInMemory;
286  {
287  Lock<Mutex> guard(_lock);
288  lowestIndexInMemory = _lowestIndexInMemory;
289  }
290  if(index_ < lowestIndexInMemory)
291  {
292  return _fileStore.replaySingle(replayer_, index_);
293  }
294  else
295  {
296  return _memStore.replaySingle(replayer_, index_);
297  }
298  }
299 
303  amps_uint64_t store(const Message& message_)
304  {
305  Lock<Mutex> guard(_lock);
306  //while (_holdSwapping && _memStore.unpersistedCount() >= _cap)
307  while (_holdSwapping)
308  {
309  if (!_lock.wait(1000))
310  {
311  Unlock<Mutex> u(_lock);
312  amps_invoke_waiting_function();
313  }
314  }
315  if(_memStore.unpersistedCount() >= _cap && !_holdSwapping)
316  {
317  // Set _holdSwapping true to end of function
318  FlagFlip flip(&_holdSwapping);
319  SwappingOutReplayer swapper(&_fileStore,
320  _memStore.unpersistedCount() - _lowWatermark);
321  {
322  Unlock<Mutex> u(_lock);
323  _memStore.replay(swapper);
324  }
325  _lock.signalAll();
326  if(swapper.getErrors() == 0)
327  {
328  _lowestIndexInMemory = swapper.lastIndex();
329  _memStore.discardUpTo(_lowestIndexInMemory++);
330  }
331  }
332  return _memStore.store(message_);
333  }
334 
335  void setResizeHandler(PublishStoreResizeHandler handler_, void* data_)
336  {
337  _handlerData.init(handler_, data_);
338  _fileStore.setResizeHandler(HybridPublishStore::resizeHandler,
339  (void*)&_handlerData);
340  }
341 
342  inline virtual PublishStoreResizeHandler getResizeHandler() const
343  {
344  return _handlerData._handler;
345  }
346 
347  amps_uint64_t getLowestUnpersisted() const
348  {
349  Lock<Mutex> guard(_lock);
350  return _getHybridLowestUnpersisted();
351  }
352 
353  amps_uint64_t getHighestUnpersisted() const
354  {
355  Lock<Mutex> guard(_lock);
356  return _getHybridHighestUnpersisted();
357  }
358 
359  amps_uint64_t getLastPersisted(void)
360  {
361  Lock<Mutex> guard(_lock);
362  return _getHybridLastPersisted();
363  }
364 
365 private:
366 
367  static bool resizeHandler(Store store_, size_t size_, void* data_)
368  {
369  HandlerData* handlerData = (HandlerData*)data_;
370  //Unlock<Mutex> hybridUnlock(handlerData->_store->_lock);
371  return handlerData->_handler(store_, size_, handlerData->_data);
372  }
373 
374  // Lock should be held
375  amps_uint64_t _getHybridLowestUnpersisted() const
376  {
377  amps_uint64_t filemin = _fileStore.getLowestUnpersisted();
378  amps_uint64_t memmin = _memStore.getLowestUnpersisted();
379  if (filemin == AMPS_UNSET_SEQUENCE)
380  return memmin;
381  if (memmin == AMPS_UNSET_SEQUENCE || memmin > filemin)
382  return filemin;
383  // Only left with memmin <= filemin
384  return memmin;
385  }
386 
387  // Lock should be held
388  amps_uint64_t _getHybridHighestUnpersisted() const
389  {
390  amps_uint64_t filemax = _fileStore.getHighestUnpersisted();
391  amps_uint64_t memmax = _memStore.getHighestUnpersisted();
392  if (filemax == AMPS_UNSET_SEQUENCE)
393  return memmax;
394  if (memmax == AMPS_UNSET_SEQUENCE || memmax < filemax)
395  return filemax;
396  // Only left with memmax >= filemax
397  return memmax;
398  }
399 
400  amps_uint64_t _getHybridLastPersisted()
401  {
402  // If we've never swapped and nothing is in file
403  if (!_lowestIndexInMemory &&
404  _fileStore.unpersistedCount() == 0)
405  {
406  _fileStore.discardUpTo(_memStore.getLastPersisted());
407  return _fileStore.getLastPersisted();
408  }
409  amps_uint64_t memLast = _memStore.getLastPersisted();
410  amps_uint64_t fileLast = _fileStore.getLastPersisted();
411  return (memLast < fileLast) ? memLast : fileLast;
412  }
413 
414  MemoryPublishStore _memStore;
415  PublishStore _fileStore;
416  size_t _cap;
417  size_t _lowWatermark;
418  amps_uint64_t _lowestIndexInMemory;
419  mutable Mutex _lock;
420  HandlerData _handlerData;
421  volatile bool _holdSwapping;
422 
423 };//end HybridPublishStore
424 
425 }//end namespace AMPS
426 
427 #endif //_HYBRIDPUBLISHSTORE_H_
428 
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:761
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:733
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
Provides AMPS::PublishStore, a publish store that uses memory-mapped files to provide a publish store...
void replay(StoreReplayer &replayer_)
Used internally by Client to replay messages in the store to AMPS after a successful connection...
Definition: HybridPublishStore.hpp:203
amps_uint64_t store(const Message &message_)
Used internally by Client to put messages into the Store.
Definition: HybridPublishStore.hpp:303
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: HybridPublishStore.hpp:238
size_t unpersistedCount() const
The number of messages in the Store that have not been discarded.
Definition: HybridPublishStore.hpp:225
A StoreImpl implementation that uses a memory-mapped file to provide a publish store that persists ac...
Definition: PublishStore.hpp:46
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: HybridPublishStore.hpp:347
Core type, function, and class declarations for the AMPS C++ client.
HybridPublishStore(const char *fileName_, size_t maxMemoryCapacity_)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:111
size_t getLowWatermark()
Get how many messages remain in memory after messages get offlined.
Definition: HybridPublishStore.hpp:158
void setLowWatermark(size_t lowWatermark_)
Set how many messages remain in memory after messages get offlined.
Definition: HybridPublishStore.hpp:146
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events. The store_ param is the store which is resizing.
Definition: ampsplusplus.hpp:755
Provides AMPS::MemoryPublishStore, a publish store that holds messages in memory. ...
void setResizeHandler(PublishStoreResizeHandler handler_, void *data_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: HybridPublishStore.hpp:335
void discardUpTo(amps_uint64_t index_)
Discard all messages in the store up to and including index_.
Definition: HybridPublishStore.hpp:167
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:859
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: HybridPublishStore.hpp:359
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:247
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: HybridPublishStore.hpp:283
HybridPublishStore(const std::string &fileName_, size_t maxMemoryCapacity_)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:129
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:817
A StoreImpl implementation that uses MemoryStoreBuffer as its buffer to hold published messages in me...
Definition: MemoryPublishStore.hpp:44
An implementation of StoreImpl for publication.
Definition: HybridPublishStore.hpp:47