AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
HybridPublishStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _HYBRIDPUBLISHSTORE_H_
27 #define _HYBRIDPUBLISHSTORE_H_
28 
29 #include <ampsplusplus.hpp>
30 #include <MemoryPublishStore.hpp>
31 #include <PublishStore.hpp>
32 
37 
38 namespace AMPS
39 {
48  {
49  class HandlerData
50  {
51  public:
52  HybridPublishStore* _store;
54  void* _data;
55 
56  HandlerData()
57  : _store(NULL), _handler(NULL), _data(NULL)
58  { ; }
59 
60  void init(PublishStoreResizeHandler handler_, void* data_)
61  {
62  _handler = handler_;
63  _data = data_;
64  }
65  };
66 
67  class SwappingOutReplayer : public StoreReplayer
68  {
69  PublishStore* _pStore;
70  volatile size_t _entries, _errorCount;
71  volatile amps_uint64_t _lastIndex;
72  public:
73  SwappingOutReplayer(PublishStore* pStore_, size_t entries_)
74  : _pStore(pStore_), _entries(entries_)
75  , _errorCount(0), _lastIndex(0)
76  { }
77 
78  size_t getErrors()
79  {
80  return _errorCount;
81  }
82 
83  amps_uint64_t lastIndex()
84  {
85  return _lastIndex;
86  }
87 
88  void execute(Message& message_)
89  {
90  if (_entries > 0 && _errorCount == 0)
91  {
92  try
93  {
94  {
95  _pStore->store(message_, false);
96  }
97  _lastIndex = amps_message_get_field_uint64(
98  message_.getMessage(),
99  AMPS_Sequence);
100  }
101  catch (...)
102  {
103  ++_errorCount;
104  }
105  --_entries;
106  }
107  }
108  };
109 
110  public:
118  HybridPublishStore(const char* fileName_, size_t maxMemoryCapacity_)
119  : StoreImpl()
120  , _memStore(maxMemoryCapacity_)
121  , _fileStore(fileName_)
122  , _cap(maxMemoryCapacity_)
123  , _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5))
124  , _lowestIndexInMemory(0)
125  , _holdSwapping(false)
126  {
127  _handlerData._store = this;
128  _memStore.addRef();
129  _fileStore.addRef();
130  }
131 
139  HybridPublishStore(const std::string& fileName_, size_t maxMemoryCapacity_)
140  : StoreImpl()
141  , _memStore(maxMemoryCapacity_)
142  , _fileStore(fileName_)
143  , _cap(maxMemoryCapacity_)
144  , _lowWatermark((size_t)((double)maxMemoryCapacity_ * 0.5))
145  , _lowestIndexInMemory(0)
146  , _holdSwapping(false)
147  {
148  _handlerData._store = this;
149  _memStore.addRef();
150  _fileStore.addRef();
151  }
152 
159  void setLowWatermark(size_t lowWatermark_)
160  {
161  Lock<Mutex> guard(_lock);
162  _lowWatermark = lowWatermark_;
163  }
164 
172  {
173  Lock<Mutex> guard(_lock);
174  return _lowWatermark;
175  }
176 
180  void discardUpTo(amps_uint64_t index_)
181  {
182  Lock<Mutex> guard(_lock);
183  while (_holdSwapping)
184  {
185  if (!_lock.wait(1000))
186  {
187  Unlock<Mutex> u(_lock);
188  amps_invoke_waiting_function();
189  }
190  }
191  // Set _holdSwapping true to end of function
192  FlagFlip flip(&_holdSwapping);
193  {
194  Unlock<Mutex> u(_lock);
195  if (!index_)
196  {
197  _memStore.discardUpTo(_fileStore.getLastPersisted());
198  Lock<Mutex> l(_lock);
199  _lock.signalAll();
200  return;
201  }
202  _fileStore.discardUpTo(index_);
203  if (_lowestIndexInMemory <= index_)
204  {
205  _memStore.discardUpTo(index_);
206  _lowestIndexInMemory = index_ + 1;
207  }
208  }
209  _lock.signalAll();
210  }
211 
216  void replay(StoreReplayer& replayer_)
217  {
218  Lock<Mutex> guard(_lock);
219  while (_holdSwapping)
220  {
221  if (!_lock.wait(1000))
222  {
223  amps_invoke_waiting_function();
224  }
225  }
226  // Set _holdSwapping true to end of function
227  FlagFlip flip(&_holdSwapping);
228  {
229  Unlock<Mutex> u(_lock);
230  _fileStore.replay(replayer_);
231  _memStore.replay(replayer_);
232  }
233  _lock.signalAll();
234  }
235 
238  size_t unpersistedCount() const
239  {
240  return _fileStore.unpersistedCount() + _memStore.unpersistedCount();
241  }
242 
251  virtual void flush(long timeout_)
252  {
253  Lock<Mutex> guard(_lock);
254  amps_uint64_t waitFor = _getHybridHighestUnpersisted();
255  amps_uint64_t unset = getUnsetSequence();
256  // Check that we aren't already empty
257  if (waitFor == unset)
258  {
259  return;
260  }
261  if (timeout_ > 0)
262  {
263  bool timedOut = false;
264  long waitTime = (timeout_ < 1000) ? timeout_ : 1000;
265  AMPS_START_TIMER(timeout_)
266  // While timeout hasn't expired and we haven't had everything acked
267  while (!timedOut && waitFor >= _getHybridLowestUnpersisted() &&
268  _getHybridLowestUnpersisted() != unset)
269  {
270  if (!_lock.wait(waitTime))
271  {
272  // May have woken up early, check real time
273  AMPS_RESET_TIMER(timedOut, timeout_);
274  waitTime = (timeout_ < 1000) ? timeout_ : 1000;
275  Unlock<Mutex> unlck(_lock);
276  amps_invoke_waiting_function();
277  }
278  }
279  // If we timed out and still haven't caught up with the acks
280  if (timedOut && waitFor >= _getHybridLowestUnpersisted() &&
281  _getHybridLowestUnpersisted() != unset)
282  {
283  throw TimedOutException("Timed out waiting to flush publish store.");
284  }
285  }
286  else
287  {
288  while (waitFor >= _getHybridLowestUnpersisted() &&
289  _getHybridLowestUnpersisted() != unset)
290  {
291  // Use timeout version so python can interrupt
292  _lock.wait(1000);
293  Unlock<Mutex> unlck(_lock);
294  amps_invoke_waiting_function();
295  }
296  }
297  }
298 
299  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
300  {
301  amps_uint64_t lowestIndexInMemory;
302  {
303  Lock<Mutex> guard(_lock);
304  lowestIndexInMemory = _lowestIndexInMemory;
305  }
306  if (index_ < lowestIndexInMemory)
307  {
308  return _fileStore.replaySingle(replayer_, index_);
309  }
310  else
311  {
312  return _memStore.replaySingle(replayer_, index_);
313  }
314  }
315 
319  amps_uint64_t store(const Message& message_)
320  {
321  Lock<Mutex> guard(_lock);
322  //while (_holdSwapping && _memStore.unpersistedCount() >= _cap)
323  while (_holdSwapping)
324  {
325  if (!_lock.wait(1000))
326  {
327  Unlock<Mutex> u(_lock);
328  amps_invoke_waiting_function();
329  }
330  }
331  if (_memStore.unpersistedCount() >= _cap && !_holdSwapping)
332  {
333  // Set _holdSwapping true to end of function
334  FlagFlip flip(&_holdSwapping);
335  SwappingOutReplayer swapper(&_fileStore,
336  _memStore.unpersistedCount() - _lowWatermark);
337  {
338  Unlock<Mutex> u(_lock);
339  _memStore.replay(swapper);
340  }
341  _lock.signalAll();
342  if (swapper.getErrors() == 0)
343  {
344  _lowestIndexInMemory = swapper.lastIndex();
345  _memStore.discardUpTo(_lowestIndexInMemory++);
346  }
347  }
348  return _memStore.store(message_);
349  }
350 
351  void setResizeHandler(PublishStoreResizeHandler handler_, void* data_)
352  {
353  _handlerData.init(handler_, data_);
354  _fileStore.setResizeHandler(HybridPublishStore::resizeHandler,
355  (void*)&_handlerData);
356  }
357 
358  inline virtual PublishStoreResizeHandler getResizeHandler() const
359  {
360  return _handlerData._handler;
361  }
362 
363  amps_uint64_t getLowestUnpersisted() const
364  {
365  Lock<Mutex> guard(_lock);
366  return _getHybridLowestUnpersisted();
367  }
368 
369  amps_uint64_t getHighestUnpersisted() const
370  {
371  Lock<Mutex> guard(_lock);
372  return _getHybridHighestUnpersisted();
373  }
374 
375  amps_uint64_t getLastPersisted(void)
376  {
377  Lock<Mutex> guard(_lock);
378  return _getHybridLastPersisted();
379  }
380 
381  private:
382 
383  // Resize handlers are invoked with Store not const Store&
384  static bool resizeHandler(Store store_, size_t size_, void* data_) // -V813
385  {
386  HandlerData* handlerData = (HandlerData*)data_;
387  //Unlock<Mutex> hybridUnlock(handlerData->_store->_lock);
388  return handlerData->_handler(store_, size_, handlerData->_data);
389  }
390 
391  // Lock should be held
392  amps_uint64_t _getHybridLowestUnpersisted() const
393  {
394  amps_uint64_t filemin = _fileStore.getLowestUnpersisted();
395  amps_uint64_t memmin = _memStore.getLowestUnpersisted();
396  if (filemin == AMPS_UNSET_SEQUENCE)
397  {
398  return memmin;
399  }
400  if (memmin == AMPS_UNSET_SEQUENCE || memmin > filemin)
401  {
402  return filemin;
403  }
404  // Only left with memmin <= filemin
405  return memmin;
406  }
407 
408  // Lock should be held
409  amps_uint64_t _getHybridHighestUnpersisted() const
410  {
411  amps_uint64_t filemax = _fileStore.getHighestUnpersisted();
412  amps_uint64_t memmax = _memStore.getHighestUnpersisted();
413  if (filemax == AMPS_UNSET_SEQUENCE)
414  {
415  return memmax;
416  }
417  if (memmax == AMPS_UNSET_SEQUENCE || memmax < filemax)
418  {
419  return filemax;
420  }
421  // Only left with memmax >= filemax
422  return memmax;
423  }
424 
425  amps_uint64_t _getHybridLastPersisted()
426  {
427  // If we've never swapped and nothing is in file
428  if (!_lowestIndexInMemory &&
429  _fileStore.unpersistedCount() == 0)
430  {
431  _fileStore.discardUpTo(_memStore.getLastPersisted());
432  return _fileStore.getLastPersisted();
433  }
434  amps_uint64_t memLast = _memStore.getLastPersisted();
435  amps_uint64_t fileLast = _fileStore.getLastPersisted();
436  return (memLast < fileLast) ? memLast : fileLast;
437  }
438 
439  MemoryPublishStore _memStore;
440  PublishStore _fileStore;
441  size_t _cap;
442  size_t _lowWatermark;
443  amps_uint64_t _lowestIndexInMemory;
444  mutable Mutex _lock;
445  HandlerData _handlerData;
446  volatile bool _holdSwapping;
447 
448  };//end HybridPublishStore
449 
450 }//end namespace AMPS
451 
452 #endif //_HYBRIDPUBLISHSTORE_H_
453 
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:927
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:899
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
Provides AMPS::PublishStore, a publish store that uses memory-mapped files to provide a publish store...
void replay(StoreReplayer &replayer_)
Used internally by Client to replay messages in the store to AMPS after a successful connection...
Definition: HybridPublishStore.hpp:216
amps_uint64_t store(const Message &message_)
Used internally by Client to put messages into the Store.
Definition: HybridPublishStore.hpp:319
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: HybridPublishStore.hpp:251
size_t unpersistedCount() const
The number of messages in the Store that have not been discarded.
Definition: HybridPublishStore.hpp:238
A StoreImpl implementation that uses a memory-mapped file to provide a publish store that persists ac...
Definition: PublishStore.hpp:46
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: HybridPublishStore.hpp:363
Core type, function, and class declarations for the AMPS C++ client.
HybridPublishStore(const char *fileName_, size_t maxMemoryCapacity_)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:118
size_t getLowWatermark()
Get how many messags remain in memory after messages get offlined.
Definition: HybridPublishStore.hpp:171
void setLowWatermark(size_t lowWatermark_)
Set how many messags remain in memory after messages get offlined.
Definition: HybridPublishStore.hpp:159
bool(* PublishStoreResizeHandler)(Store store_, size_t size_, void *userData_)
Function type for PublishStore resize events The store_ param is store which is resizing.
Definition: ampsplusplus.hpp:921
Provides AMPS::MemoryPublishStore, a publish store that holds messages in memory. ...
void setResizeHandler(PublishStoreResizeHandler handler_, void *data_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: HybridPublishStore.hpp:351
void discardUpTo(amps_uint64_t index_)
Discard all messages in the store up to and including index_.
Definition: HybridPublishStore.hpp:180
Handle class for StoreImpl classes that track publish messages.
Definition: ampsplusplus.hpp:1034
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: HybridPublishStore.hpp:375
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:247
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Called by Client to get a single message replayed by the store onto the StoreReplayer.
Definition: HybridPublishStore.hpp:299
HybridPublishStore(const std::string &fileName_, size_t maxMemoryCapacity_)
Create a HybridPublishStore that will use fileName_ as its file storage and stores at most maxMemoryC...
Definition: HybridPublishStore.hpp:139
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:989
A StoreImpl implementation that uses MemoryStoreBuffer as its buffer to hold published messages in me...
Definition: MemoryPublishStore.hpp:44
An implementation of StoreImpl for publication.
Definition: HybridPublishStore.hpp:47