AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.1
BlockPublishStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2021 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _BLOCKPUBLISHSTORE_H_
27 #define _BLOCKPUBLISHSTORE_H_
28 #include <ampsplusplus.hpp>
29 #include <BlockStore.hpp>
30 #include <Buffer.hpp>
31 #include <sstream>
32 #include <stack>
33 #include <string>
34 #include <map>
35 #include <ampscrc.hpp>
36 
37 #ifdef _WIN32
38 #include <intrin.h>
39 #include <sys/timeb.h>
40 #else
41 #include <sys/time.h>
42 #if AMPS_SSE_42
43 #include <cpuid.h>
44 #endif
45 #endif
46 #include <iostream>
47 
52 
53 namespace AMPS
54 {
61 {
62 public:
63  typedef BlockStore::Block Block;
64  typedef Lock<BlockStore> BufferLock;
65  typedef Unlock<BlockStore> BufferUnlock;
66 
// Values stored in BlockChainHeader::_flag for a stored sow_delete command.
// They record which message field carried the delete criteria.
enum SowDeleteType
{
  SOW_DELETE_DATA            = 0x01, // criteria were in the message data
  SOW_DELETE_FILTER          = 0x02, // criteria were in the filter
  SOW_DELETE_KEYS            = 0x04, // criteria were in the sow keys
  SOW_DELETE_BOOKMARK        = 0x08, // criteria were in the bookmark
  SOW_DELETE_BOOKMARK_CANCEL = 0x10, // bookmark delete whose options contain "cancel"
  SOW_DELETE_UNKNOWN         = 0x80
};
76 
79  enum Constants : amps_uint32_t
80  {
81  DEFAULT_BLOCK_HEADER_SIZE = 32,
82  DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
83  DEFAULT_BLOCKS_PER_REALLOC = 1000,
84  DEFAULT_BLOCK_SIZE = 2048
85  };
86 
87  /**********************************************************************
88  * Storage format
89  *************************************************************************
90  * Field Description | Type | # Bytes
91  *************************************************************************
92  * HEADER as detailed below | | 32 TOTAL
93  * | |
94  * Total number of blocks used by the record | uint32_t | 4
95  * Total length of the saved record | uint32_t | 4
96  * HA Message sequence | uint64_t | 8
97  * CRC Value - only set in first Block | uint64_t | 8
98  * next in chain offset | uint64_t | 8
99  *************************************************************************
100  * CHAIN HEADER as detailed below | | 64 TOTAL
101  * | |
102  * operation | uint32_t | 4
103  * command id length | uint32_t | 4
104  * correlation id length | uint32_t | 4
105  * expiration length | uint32_t | 4
106  * sow key length | uint32_t | 4
107  * topic length | uint32_t | 4
108  * sow delete flag | int32_t | 4
109  * ack types | uint32_t | 4
110  * unused [8] | uint32_t | 4*8 = 32
111  *************************************************************************
112  * DATA SECTION - can be spread across multiple blocks
113  *
114  * command id | char[]
115  * correlation id | char[]
116  * expiration | char[]
117  * sow key | char[]
118  * topic | char[]
119  * data | char[]
120  *************************************************************************/
121 
122  struct BlockHeader
123  {
124  amps_uint32_t _blocksToWrite;
125  amps_uint32_t _totalRemaining;
126  amps_uint64_t _seq;
127  amps_uint64_t _crcVal;
128  amps_uint64_t _nextInChain;
129  };
130 
131  struct BlockChainHeader
132  {
133  amps_uint32_t _operation;
134  amps_uint32_t _commandIdLen;
135  amps_uint32_t _correlationIdLen;
136  amps_uint32_t _expirationLen;
137  amps_uint32_t _sowKeyLen;
138  amps_uint32_t _topicLen;
139  amps_int32_t _flag;
140  amps_uint32_t _ackTypes;
141  amps_uint32_t _unused[8];
142  BlockChainHeader() // -V730
143  : _operation(0), _commandIdLen(0), _correlationIdLen(0)
144  , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
145  , _ackTypes(0)
146  { ; }
147  };
148 
153  static inline amps_uint32_t getBlockHeaderSize()
154  {
155  return DEFAULT_BLOCK_HEADER_SIZE;
156  }
157 
163  static inline amps_uint32_t getBlockChainHeaderSize()
164  {
165  return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
166  }
167 
171  inline amps_uint32_t getBlockSize()
172  {
173  return _blockStore.getBlockSize();
174  }
175 
179  inline amps_uint32_t getBlockDataSize()
180  {
181  return _blockStore.getBlockSize() - getBlockHeaderSize();
182  }
183 
197  amps_uint32_t blocksPerRealloc_ = 1000,
198  bool isFile_ = false,
199  amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
200  : StoreImpl()
201  , _blockStore(buffer_, blocksPerRealloc_,
202  DEFAULT_BLOCK_HEADER_SIZE,
203  (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE*2
204  ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE*2))
205  , _metadataBlock(0)
206  , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
207  , _stored(0)
208  {
209  _blockStore.setResizeHandler(&BlockPublishStore::canResize, (void*)this);
210  chooseCRC(isFile_);
211  if (!isFile_)
212  {
213  // This gets set in recover in file-based stores
214  BufferLock bufferGuard(_blockStore);
215  _blockStore.init();
216  _metadataBlock = _blockStore.get(1);
217  // Remove metadata block from used list
218  _blockStore.setUsedList(0);
219  _blockStore.setEndOfUsedList(0);
220  // Metadata block holds block size, block header size,
221  // last discarded sequence, client version
222  _metadataBlock->_sequence = (amps_uint64_t)0;
223  Buffer* pBuffer = _blockStore.getBuffer();
224  pBuffer->setPosition(_metadataBlock->_offset);
225  pBuffer->putUint32((amps_uint32_t)getBlockSize());
226  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
227  pBuffer->putUint64((amps_uint64_t)0);
228  // Metadata blocks puts client version in CRC position
229  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
230  // No next in chain
231  pBuffer->putUint64((amps_uint64_t)0);
232  }
233  }
234 
238  {
239  }
240 
247  virtual amps_uint64_t store(const Message& message_)
248  {
249  return store(message_, true);
250  }
251 
262  amps_uint64_t store(const Message& message_, bool assignSequence_)
263  {
264  const char *commandId, *correlationId, *expiration, *sowKey,
265  *topic, *data;
266  size_t dataLen = 0;
267  BlockHeader blockHeader;
268  BlockChainHeader chainHeader;
269  message_.getRawCommandId(&commandId, &dataLen);
270  chainHeader._commandIdLen = (amps_uint32_t)dataLen;
271  message_.getRawCorrelationId(&correlationId, &dataLen);
272  chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
273  message_.getRawExpiration(&expiration, &dataLen);
274  chainHeader._expirationLen = (amps_uint32_t)dataLen;
275  message_.getRawSowKey(&sowKey, &dataLen);
276  chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
277  message_.getRawTopic(&topic, &dataLen);
278  chainHeader._topicLen = (amps_uint32_t)dataLen;
279  message_.getRawData(&data, &dataLen);
280  chainHeader._flag = -1;
281  Message::Command::Type operation = message_.getCommandEnum();
282  chainHeader._operation = (amps_uint32_t)operation;
283  if (operation == Message::Command::SOWDelete)
284  {
285  if (dataLen > 0)
286  {
287  chainHeader._flag = SOW_DELETE_DATA;
288  }
289  else
290  {
291  message_.getRawFilter(&data, &dataLen);
292  if (dataLen > 0)
293  {
294  chainHeader._flag = SOW_DELETE_FILTER;
295  }
296  else
297  {
298  message_.getRawSowKeys(&data, &dataLen);
299  if (dataLen > 0)
300  {
301  chainHeader._flag = SOW_DELETE_KEYS;
302  }
303  else
304  {
305  message_.getRawBookmark(&data, &dataLen);
306  chainHeader._flag = SOW_DELETE_BOOKMARK;
307  // Check options for cancel
308  Message::Field options = message_.getOptions();
309  size_t remaining = options.len();
310  const void* next = NULL;
311  const void* start = (const void*)(options.data());
312  // Not necessarily null-terminated so no strstr
313  while (remaining >= 6 &&
314  (next = memchr(start,(int)'c',remaining)) != NULL)
315  {
316  remaining = (size_t)next-(size_t)start;
317  if (remaining >= 6 && strncmp((const char*)start,
318  "cancel", 6) == 0)
319  {
320  chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
321  break;
322  }
323  }
324  }
325  }
326  }
327  }
328  blockHeader._totalRemaining = (
329  (chainHeader._operation == Message::Command::Unknown)
330  ? 0
332  + chainHeader._commandIdLen
333  + chainHeader._correlationIdLen
334  + chainHeader._expirationLen
335  + chainHeader._sowKeyLen
336  + chainHeader._topicLen
337  + (amps_uint32_t)dataLen));
338  size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
339  (blockHeader._totalRemaining % getBlockDataSize()));
340  blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
341  ? 1
342  : ((amps_uint32_t)(blockHeader._totalRemaining
343  / getBlockDataSize())
344  + ((lastBlockLength > 0) ? 1 : 0)));
345  blockHeader._crcVal = (amps_uint64_t)0ULL;
346  blockHeader._crcVal = _crc(commandId,
347  chainHeader._commandIdLen,
348  blockHeader._crcVal);
349  blockHeader._crcVal = _crc(correlationId,
350  chainHeader._correlationIdLen,
351  blockHeader._crcVal);
352  blockHeader._crcVal = _crc(expiration,
353  chainHeader._expirationLen,
354  blockHeader._crcVal);
355  blockHeader._crcVal = _crc(sowKey,
356  chainHeader._sowKeyLen,
357  blockHeader._crcVal);
358  blockHeader._crcVal = _crc(topic,
359  chainHeader._topicLen,
360  blockHeader._crcVal);
361  blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
362 
363  // Reserve slots for storage, growing if necessary
364  BufferLock bufferGuard(_blockStore);
365  Block* first = _blockStore.get(blockHeader._blocksToWrite);
366  if (assignSequence_)
367  {
368  if (_lastSequence <= 2)
369  {
370  _getLastPersisted();
371  }
372  blockHeader._seq = ++_lastSequence;
373  }
374  else
375  {
376  blockHeader._seq = amps_message_get_field_uint64(message_.getMessage(),
377  AMPS_Sequence);
378  if (!_maxDiscarded)
379  {
380  _maxDiscarded = blockHeader._seq - 1;
381  }
382  if (blockHeader._seq >= _lastSequence)
383  {
384  _lastSequence = blockHeader._seq;
385  }
386  }
387 
388  try
389  {
390  size_t topicWritten = 0UL;
391  size_t dataWritten = 0UL;
392  size_t commandWritten = 0UL;
393  size_t correlationWritten = 0UL;
394  size_t expirationWritten = 0UL;
395  size_t sowKeyWritten = 0UL;
396  Buffer* pBuffer = _blockStore.getBuffer();
397  for(Block* next = first; next; next = next->_nextInChain)
398  {
399  next->_sequence = blockHeader._seq;
400  if (next->_nextInChain)
401  blockHeader._nextInChain = next->_nextInChain->_offset;
402  else
403  blockHeader._nextInChain = (amps_uint64_t)0;
404  // Set buffer to start of Block and write the header
405  pBuffer->setPosition(next->_offset);
406  pBuffer->putBytes((const char*)&blockHeader, sizeof(BlockHeader));
407  // Clear crcVal, as it's only written in the first Block
408  blockHeader._crcVal = (amps_uint64_t)0;
409  size_t bytesRemaining = getBlockDataSize();
410  if (next == first)
411  {
412  // Write Block chain header
413  chainHeader._ackTypes = (amps_uint32_t)message_.getAckTypeEnum();
414  pBuffer->putBytes((const char*)&chainHeader,
415  sizeof(BlockChainHeader));
416  pBuffer->setPosition(next->_offset+getBlockHeaderSize()+getBlockChainHeaderSize());
417  bytesRemaining -= getBlockChainHeaderSize();
418  }
419  else
420  {
421  pBuffer->setPosition(next->_offset+getBlockHeaderSize());
422  }
423 
424  if(commandWritten < chainHeader._commandIdLen)
425  {
426  size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
427  pBuffer->putBytes(commandId + commandWritten,
428  commandWrite);
429  bytesRemaining -= commandWrite;
430  commandWritten += commandWrite;
431  }
432  if(correlationWritten < chainHeader._correlationIdLen)
433  {
434  size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
435  pBuffer->putBytes(correlationId + correlationWritten,
436  correlationWrite);
437  bytesRemaining -= correlationWrite;
438  correlationWritten += correlationWrite;
439  }
440  if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
441  {
442  size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
443  pBuffer->putBytes(expiration + expirationWritten, expWrite);
444  bytesRemaining -= expWrite;
445  expirationWritten += expWrite;
446  }
447  if(bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
448  {
449  size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
450  pBuffer->putBytes(sowKey + sowKeyWritten, sowKeyWrite);
451  bytesRemaining -= sowKeyWrite;
452  sowKeyWritten += sowKeyWrite;
453  }
454  if(bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
455  {
456  size_t topicWrite = (chainHeader._topicLen - topicWritten
457  < bytesRemaining)
458  ? chainHeader._topicLen - topicWritten
459  : bytesRemaining;
460  pBuffer->putBytes(topic + topicWritten, topicWrite);
461  bytesRemaining -= topicWrite;
462  topicWritten += topicWrite;
463  }
464  if(bytesRemaining > 0 && dataWritten < dataLen)
465  {
466  size_t dataWrite = (dataLen-dataWritten < bytesRemaining) ?
467  dataLen-dataWritten : bytesRemaining;
468  pBuffer->putBytes(data + dataWritten, dataWrite);
469  bytesRemaining -= dataWrite;
470  dataWritten += dataWrite;
471  }
472  }
473  }
474  catch (const AMPSException& ex)
475  {
476  _blockStore.put(first);
477  throw ex;
478  }
479  AMPS_FETCH_ADD(&_stored, 1);
480  return blockHeader._seq;
481  }
482 
487  virtual void discardUpTo(amps_uint64_t index_)
488  {
489  // Get the lock
490  BufferLock bufferGuard(_blockStore);
491  Buffer* pBuffer = _blockStore.getBuffer();
492  // Don't use _getLastPersisted() here, don't want to set it
493  // to something other than index_ if it's not already set
494  amps_uint64_t lastPersisted = _metadataBlock->_sequence;
495  // Make sure it's a real index and we have messages to discard
496  if(index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
497  {
498  // During logon it's very possible we don't have a last persisted
499  // but that the Client is calling discardUpTo with the ack value.
500  if (lastPersisted < index_)
501  {
502  pBuffer->setPosition(_metadataBlock->_offset+8);
503  pBuffer->putUint64(index_);
504  _metadataBlock->_sequence = index_;
505  if (_maxDiscarded < index_)
506  {
507  _maxDiscarded = index_;
508  }
509  if (_lastSequence <= index_)
510  {
511  _lastSequence = index_;
512  }
513  }
514  else if (!index_) // Fresh logon, no sequence history
515  {
516  _getLastPersisted();
517  }
518  _blockStore.signalAll();
519  return;
520  }
521 
522  _maxDiscarded = index_;
523  AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
524  _blockStore.signalAll();
525  if (lastPersisted >= index_)
526  {
527  return;
528  }
529  pBuffer->setPosition(_metadataBlock->_offset+8);
530  pBuffer->putUint64(index_);
531  _metadataBlock->_sequence = index_;
532  if (_lastSequence < index_)
533  {
534  _lastSequence = index_;
535  }
536  }
537 
542  void replay(StoreReplayer& replayer_)
543  {
544  // Get the lock
545  BufferLock bufferGuard(_blockStore);
546  // If we don't have anything yet, return
547  if(!_blockStore.front()) return;
548  Block* next = _blockStore.front();
549  try
550  {
551  for (Block* block = _blockStore.front(); block; block = next)
552  {
553  // Replay the message
554  replayOnto(block, replayer_);
555  next = block->_nextInList;
556  }
557  }
558  catch (const StoreException& e)
559  {
560  _blockStore.putAll(next);
561  throw e;
562  }
563  }
564 
571  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
572  {
573  BufferLock bufferGuard(_blockStore);
574  // If we don't have anything yet, return
575  if(!_blockStore.front()) return false;
576  // Get the end point
577  amps_uint64_t lastIdx = _blockStore.back()->_sequence;
578  // Get the start point
579  amps_uint64_t leastIdx = _blockStore.front()->_sequence;
580  if(index_>=leastIdx && index_ <=lastIdx)
581  {
582  Block* block = _blockStore.front();
583  while(block && block->_sequence != index_)
584  {
585  block = block->_nextInList;
586  }
587  if (!block)
588  {
589  return false;
590  }
591  // If total bytes is 0, it's a queue ack and gets skipped.
592  Buffer* pBuffer = _blockStore.getBuffer();
593  pBuffer->setPosition(block->_offset +
594  sizeof(amps_uint32_t));
595  if (pBuffer->getUint32() == 0) return false;
596  replayOnto(block, replayer_);
597  return true;
598  }
599  else // Get Store and Client back in sync
600  {
601  _message.reset();
602  leastIdx -= 1;
603  _message.setSequence(leastIdx);
604  replayer_.execute(_message);
605  return false;
606  }
607  }
608 
614  size_t unpersistedCount() const
615  {
616  size_t count = (size_t)_stored;
617  return count;
618  }
619 
628  virtual void flush(long timeout_)
629  {
630  BufferLock bufferGuard(_blockStore);
631  amps_uint64_t waitFor = _getHighestUnpersisted();
632  // Check that we aren't already empty
633  if (waitFor == getUnsetSequence()) return;
634  if (timeout_ > 0)
635  {
636  bool timedOut = false;
637  AMPS_START_TIMER(timeout_);
638  // While timeout hasn't expired and we haven't had everything acked
639  while (!timedOut && _stored != 0
640  && waitFor >= _getLowestUnpersisted())
641  {
642  if (!_blockStore.wait(timeout_))
643  {
644  // May have woken up early, check real time
645  AMPS_RESET_TIMER(timedOut, timeout_);
646  }
647  }
648  // If we timed out and still haven't caught up with the acks
649  if (timedOut && _stored != 0
650  && waitFor >= _getLowestUnpersisted())
651  {
652  throw TimedOutException("Timed out waiting to flush publish store.");
653  }
654  }
655  else
656  {
657  while (_stored != 0 && waitFor >= _getLowestUnpersisted())
658  {
659  // Still wake up every 1s so python can interrupt
660  _blockStore.wait(1000);
661  // Don't hold lock if possibly grabbing GIL
662  BufferUnlock unlck(_blockStore);
663  amps_invoke_waiting_function();
664  }
665  }
666  }
667 
668  amps_uint64_t getLowestUnpersisted() const
669  {
670  BufferLock bufferGuard(_blockStore);
671  return _getLowestUnpersisted();
672  }
673 
674  amps_uint64_t getHighestUnpersisted() const
675  {
676  BufferLock bufferGuard(_blockStore);
677  return _getHighestUnpersisted();
678  }
679 
680  amps_uint64_t getLastPersisted(void)
681  {
682  BufferLock bufferGuard(_blockStore);
683  return _getLastPersisted();
684  }
685 
686 protected:
687  static bool canResize(size_t requestedSize_, void* vpThis_)
688  {
689  return ((BlockPublishStore*)vpThis_)->callResizeHandler(requestedSize_);
690  }
691 
692  amps_uint64_t _getLowestUnpersisted() const
693  {
694  // Assume the lock is held
695  // If we don't have anything, return MAX
696  if(!_blockStore.front()) return getUnsetSequence();
697  return _blockStore.front()->_sequence;
698  }
699 
700  amps_uint64_t _getHighestUnpersisted() const
701  {
702  // Assume the lock is held
703  // If we don't have anything, return MAX
704  if(!_blockStore.back()) return getUnsetSequence();
705  return _blockStore.back()->_sequence;
706  }
707 
708  amps_uint64_t _getLastPersisted(void)
709  {
710  // Assume the lock is held
711  amps_uint64_t lastPersisted = (amps_uint64_t)0;
712  Buffer* pBuffer = _blockStore.getBuffer();
713  pBuffer->setPosition(_metadataBlock->_offset+8);
714  lastPersisted = pBuffer->getUint64();
715  if (lastPersisted)
716  {
717  if (_lastSequence < lastPersisted)
718  _lastSequence = lastPersisted;
719  return lastPersisted;
720  }
721  if (_maxDiscarded)
722  {
723  lastPersisted = _maxDiscarded;
724  }
725  else
726  {
727 #ifdef _WIN32
728  struct _timeb t;
729  _ftime_s(&t);
730  lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
731 #else // not _WIN32
732  struct timeval tv;
733  gettimeofday(&tv, NULL);
734  lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
735  * (amps_uint64_t)1000000;
736 #endif
737  }
738  if (_lastSequence > 2)
739  {
740  amps_uint64_t low = _getLowestUnpersisted();
741  amps_uint64_t high = _getHighestUnpersisted();
742  if (low != getUnsetSequence())
743  {
744  lastPersisted = low - 1;
745  }
746  if (high != getUnsetSequence() && _lastSequence <= high)
747  {
748  _lastSequence = high;
749  }
750  if (_lastSequence < lastPersisted)
751  {
752  lastPersisted = _lastSequence - 1;
753  }
754  }
755  else
756  {
757  _lastSequence = lastPersisted;
758  }
759  pBuffer->setPosition(_metadataBlock->_offset
760  + sizeof(amps_uint32_t) // blocks used
761  + sizeof(amps_uint32_t)); // record length
762  pBuffer->putUint64(lastPersisted);
763  _metadataBlock->_sequence = lastPersisted;
764  return lastPersisted;
765  }
766 
767  void recover(void)
768  {
769  BufferLock bufferGuard(_blockStore);
770  // Make sure the size isn't 0 and is a multiple of block size
771  Buffer* pBuffer = _blockStore.getBuffer();
772  size_t size = pBuffer->getSize();
773  amps_uint32_t blockSize = getBlockSize();
774  if(size == 0)
775  {
776  _blockStore.init();
777  _metadataBlock = _blockStore.get(1);
778  _metadataBlock->_sequence = (amps_uint64_t)0;
779  pBuffer->setPosition(_metadataBlock->_offset);
780  // Metadata block holds block size, block header size,
781  // last discarded sequence, client version
782  pBuffer->putUint32((amps_uint32_t)blockSize);
783  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
784  pBuffer->putUint64((amps_uint64_t)0);
785  // Metadata blocks puts client version in CRC position
786  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
787  // No next in chain
788  pBuffer->putUint64((amps_uint64_t)0);
789  return;
790  }
791  size_t numBlocks = size / blockSize;
792  if(size % blockSize > 0)
793  {
794  // We shouldn't ever be in here, since it requires starting with a
795  // file that is not an even multiple of block size and we always
796  // fix the size.
797  numBlocks = size / blockSize;
798  ++numBlocks;
799  amps_uint32_t blockCount = 0;
800  // We allocate all the Blocks at once below so delete allocated Block[]
801  delete[] _blockStore.resizeBuffer(numBlocks*blockSize, &blockCount);
802  // Resize can fail if resizeHandler is set and refuses the request
803  // Since this is recovery, we need to simply fail in that case
804  if (size > pBuffer->getSize() || numBlocks != (size_t)blockCount)
805  {
806  throw StoreException("Publish Store could not resize correctly during recoery, possibly due to resizeHandler refusing the request.");
807  }
808  size = pBuffer->getSize();
809  }
810 
811  amps_uint64_t maxIdx = 0;
812  amps_uint64_t minIdx = 0;
813  size_t location = 0;
814  BlockHeader blockHeader;
815  // The blocks we create here all get their offset set in below loop
816  Block* blocks = new Block[numBlocks];
817  blocks[numBlocks-1]._nextInList = 0;
818  size_t blockNum = 0;
819  _blockStore.addBlocks(blocks);
820  _metadataBlock = blocks; // The first Block is metadata
821  _metadataBlock->_nextInList = 0;
822  pBuffer->setPosition(0);
823  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
824  /* Metadata Block header fields
825  * amps_uint32_t _blocksToWrite = BlockSize
826  * amps_uint32_t _totalRemaining = BlockHeaderSize
827  * amps_uint64_t _seq = last persisted sequence number
828  * amps_uint64_t _crcVal = unused
829  * amps_uint64_t _nextInChain = unused
830  */
831  if (blockHeader._blocksToWrite == 1) // Old store format?
832  {
833  /* Old format metadata block header fields
834  * amps_uint32_t _blocksToWrite = 1
835  * amps_uint32_t _totalRemaining = client version
836  * amps_uint64_t _seq = last persisted sequence number
837  * amps_uint64_t _crcVal = unused
838  * amps_uint64_t _nextInChain = unused
839  */
840  // Readable old format starts with version 5.0.0.0
841  if (blockHeader._totalRemaining >= 5000000)
842  {
843  // All recovery needs to be based on old format
844  // so go do that instead.
845  recoverOldFormat(blocks);
846  return;
847  }
848  // Unreadable format, fail
849  throw StoreException("Unrecognized format for Store. Can't recover.");
850  }
851  if (blockHeader._blocksToWrite == 0)
852  {
853  pBuffer->setPosition(0);
854  pBuffer->putUint32((amps_uint32_t)blockSize);
855  }
856  else
857  {
858  blockSize = blockHeader._blocksToWrite;
859  _blockStore.setBlockSize(blockSize);
860  }
861  if (blockHeader._totalRemaining == 0)
862  {
863  pBuffer->setPosition(sizeof(amps_uint32_t));
864  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
865  }
866  else
867  {
868  _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
869  }
870  _metadataBlock->_sequence = blockHeader._seq;
871  if (_metadataBlock->_sequence
872  && _metadataBlock->_sequence < (amps_uint64_t)1000000)
873  {
874  pBuffer->setPosition(_metadataBlock->_offset
875  + sizeof(amps_uint32_t) // BlockSize
876  + sizeof(amps_uint32_t)); // BlockHeaderSize
877  pBuffer->putUint64((amps_uint64_t)0);
878  _metadataBlock->_sequence = 0;
879  }
880  else
881  {
882  // Set _maxDiscarded and _lastSequence
883  _maxDiscarded = _metadataBlock->_sequence;
884  _lastSequence = _maxDiscarded;
885  }
886  // This would be where to check the client version string
887  // No checks currently
888  location += blockSize;
889  amps_uint32_t freeCount = 0;
890  Block* firstFree = NULL;
891  Block* endOfFreeList = NULL;
892  // Used to create used list in order after recovery
893  typedef std::map<amps_uint64_t, Block*> RecoverMap;
894  RecoverMap recoveredBlocks;
895  while(location < size)
896  {
897  // Get index and check if non-zero
898  pBuffer->setPosition(location);
899  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
900  if((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
901  (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
902  {
903  // Block is part of a chain
904  location += blockSize;
905  continue;
906  }
907  Block* block = blocks[++blockNum].setOffset(location);
908  bool recovered = false;
909  if(blockHeader._seq > 0 && blockHeader._totalRemaining < size)
910  {
911  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
912  block->_sequence = blockHeader._seq;
913  // Track min and max
914  if(maxIdx < blockHeader._seq)
915  {
916  maxIdx = blockHeader._seq;
917  }
918  if(minIdx > blockHeader._seq)
919  {
920  minIdx = blockHeader._seq;
921  }
922  // Save it in recovered blocks
923  recoveredBlocks[blockHeader._seq] = block;
924  // Set up the chain
925  while (blockHeader._nextInChain != (amps_uint64_t)0)
926  {
927  Block* chain = blocks[++blockNum]
928  .setOffset((size_t)blockHeader._nextInChain);
929  chain->_nextInList = 0;
930  pBuffer->setPosition((size_t)blockHeader._nextInChain
931  + sizeof(amps_uint32_t) // blocks used
932  + sizeof(amps_uint32_t) // record length
933  + sizeof(amps_uint64_t) // seq
934  + sizeof(amps_uint64_t)); // crc
935  blockHeader._nextInChain = pBuffer->getUint64();
936  block->_nextInChain = chain;
937  block = chain;
938  block->_sequence = blockHeader._seq;
939  }
940  recovered = true;
941  }
942  if (!recovered)
943  {
944  // Put this Block on the free list
945  if (endOfFreeList)
946  {
947  endOfFreeList->_nextInList = block;
948  }
949  else
950  {
951  firstFree = block;
952  }
953  endOfFreeList = block;
954  ++freeCount;
955  }
956  location += blockSize;
957  }
958  if (endOfFreeList)
959  {
960  endOfFreeList->_nextInList = 0;
961  }
962  _blockStore.setFreeList(firstFree, freeCount);
963  if (maxIdx > _lastSequence)
964  {
965  _lastSequence = maxIdx;
966  }
967  if (minIdx > _maxDiscarded + 1)
968  {
969  _maxDiscarded = minIdx - 1;
970  }
971  if (_maxDiscarded > _metadataBlock->_sequence)
972  {
973  _metadataBlock->_sequence = _maxDiscarded;
974  pBuffer->setPosition(_metadataBlock->_offset+8);
975  pBuffer->putUint64(_maxDiscarded);
976  }
977  Block* end = NULL;
978  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size()));
979  for (RecoverMap::iterator i = recoveredBlocks.begin();
980  i != recoveredBlocks.end(); ++i)
981  {
982  if (end)
983  {
984  end->_nextInList = i->second;
985  }
986  else
987  {
988  _blockStore.setUsedList(i->second);
989  }
990  end = i->second;
991  }
992  if (end)
993  {
994  end->_nextInList = 0;
995  }
996  _blockStore.setEndOfUsedList(end);
997  }
998 
999 private:
1000  // Lock should already be held
1001  void replayOnto(Block* block_, StoreReplayer& replayer_)
1002  {
1003  // Read the header
1004  size_t start = block_->_offset;
1005  size_t position = start;
1006  Buffer* pBuffer = _blockStore.getBuffer();
1007  pBuffer->setPosition(position);
1008  BlockHeader blockHeader;
1009  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1010  if (blockHeader._totalRemaining == 0)
1011  {
1012  // Queue acking sow_delete
1013  return;
1014  }
1015  position += getBlockHeaderSize();
1016  BlockChainHeader blockChainHeader;
1017  pBuffer->copyBytes((char*)&blockChainHeader, sizeof(blockChainHeader));
1018  if (blockChainHeader._operation == Message::Command::Unknown)
1019  {
1020  // Queue acking sow_delete
1021  return;
1022  }
1023  blockChainHeader._ackTypes |= Message::AckType::Persisted;
1024  position += getBlockChainHeaderSize();
1025  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
1026  pBuffer->setPosition(position);
1027 
1028  if (blockHeader._totalRemaining
1029  < blockChainHeader._commandIdLen
1030  + blockChainHeader._correlationIdLen
1031  + blockChainHeader._expirationLen
1032  + blockChainHeader._sowKeyLen
1033  + blockChainHeader._topicLen)
1034  {
1035  std::ostringstream os;
1036  os << "Corrupted message found with invalid lengths. "
1037  << "Attempting to replay " << block_->_sequence
1038  << ". Block sequence " << blockHeader._seq
1039  << ", topic length " << blockChainHeader._topicLen
1040  << ", data length " << blockHeader._totalRemaining
1041  << ", command ID length " << blockChainHeader._commandIdLen
1042  << ", correlation ID length " << blockChainHeader._correlationIdLen
1043  << ", expiration length " << blockChainHeader._expirationLen
1044  << ", sow key length " << blockChainHeader._sowKeyLen
1045  << ", start " << start
1046  << ", position " << position
1047  << ", buffer size " << pBuffer->getSize();
1048  throw StoreException(os.str());
1049  }
1050 
1051  // Start prepping the message
1052  _message.reset();
1053  _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1054  _message.setAckTypeEnum((unsigned)blockChainHeader._ackTypes
1055  | Message::AckType::Persisted);
1056  _message.setSequence(blockHeader._seq);
1057  // Read the data and calculate the CRC
1058  Block* current = block_;
1059  size_t blockBytesRemaining = getBlockDataSize() - getBlockChainHeaderSize();
1060  amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
1061  // Use tmpBuffers for any fields split across Block boundaries
1062  char** tmpBuffers = (blockHeader._blocksToWrite>1) ? new char*[blockHeader._blocksToWrite-1] : 0;
1063  size_t blockNum = 0;
1064  if (blockChainHeader._commandIdLen > 0)
1065  {
1066  if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1067  {
1068  _message.assignCommandId(pBuffer->getBytes(blockChainHeader._commandIdLen)._data,
1069  blockChainHeader._commandIdLen);
1070  blockBytesRemaining -= blockChainHeader._commandIdLen;
1071  }
1072  else
1073  {
1074  tmpBuffers[blockNum] = new char[blockChainHeader._commandIdLen]; // -V522
1075  size_t totalLeft = blockChainHeader._commandIdLen;
1076  size_t totalRead = 0;
1077  size_t readLen = 0;
1078  while (totalLeft)
1079  {
1080  readLen = blockBytesRemaining < totalLeft ?
1081  blockBytesRemaining : totalLeft;
1082  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1083  if (!(totalLeft -= readLen)) break;
1084  if (!(current = current->_nextInChain)) break;
1085  totalRead += readLen;
1086  blockBytesRemaining = getBlockDataSize();
1087  position = current->_offset + getBlockHeaderSize();
1088  pBuffer->setPosition(position);
1089  }
1090  blockBytesRemaining -= readLen;
1091  _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
1092  }
1093  blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1094  crcCalc = _crc(_message.getCommandId().data(),
1095  blockChainHeader._commandIdLen, crcCalc);
1096  }
1097  if (blockChainHeader._correlationIdLen > 0)
1098  {
1099  if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1100  {
1101  _message.assignCorrelationId(
1102  pBuffer->getBytes(blockChainHeader._correlationIdLen)._data,
1103  blockChainHeader._correlationIdLen);
1104  blockBytesRemaining -= blockChainHeader._correlationIdLen;
1105  }
1106  else
1107  {
1108  tmpBuffers[blockNum] = new char[blockChainHeader._correlationIdLen]; // -V522
1109  size_t totalLeft = blockChainHeader._correlationIdLen;
1110  size_t totalRead = 0;
1111  size_t readLen = 0;
1112  while (totalLeft)
1113  {
1114  readLen = blockBytesRemaining < totalLeft ?
1115  blockBytesRemaining : totalLeft;
1116  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1117  if (!(totalLeft -= readLen)) break;
1118  if (!(current = current->_nextInChain)) break; // -V522
1119  totalRead += readLen;
1120  blockBytesRemaining = getBlockDataSize();
1121  position = current->_offset + getBlockHeaderSize();
1122  pBuffer->setPosition(position);
1123  }
1124  blockBytesRemaining -= readLen;
1125  _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1126  }
1127  blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1128  crcCalc = _crc(_message.getCorrelationId().data(),
1129  blockChainHeader._correlationIdLen, crcCalc);
1130  }
1131  if (blockChainHeader._expirationLen > 0)
1132  {
1133  if (blockChainHeader._expirationLen <= blockBytesRemaining)
1134  {
1135  _message.assignExpiration(
1136  pBuffer->getBytes(blockChainHeader._expirationLen)._data,
1137  blockChainHeader._expirationLen);
1138  blockBytesRemaining -= blockChainHeader._expirationLen;
1139  }
1140  else
1141  {
1142  tmpBuffers[blockNum] = new char[blockChainHeader._expirationLen]; // -V522
1143  size_t totalLeft = blockChainHeader._expirationLen;
1144  size_t totalRead = 0;
1145  size_t readLen = 0;
1146  while (totalLeft)
1147  {
1148  readLen = blockBytesRemaining < totalLeft ?
1149  blockBytesRemaining : totalLeft;
1150  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1151  if (!(totalLeft -= readLen)) break;
1152  if (!(current = current->_nextInChain)) break;
1153  totalRead += readLen;
1154  blockBytesRemaining = getBlockDataSize();
1155  position = current->_offset + getBlockHeaderSize();
1156  pBuffer->setPosition(position);
1157  }
1158  blockBytesRemaining -= readLen;
1159  _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1160  }
1161  blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1162  crcCalc = _crc(_message.getExpiration().data(),
1163  blockChainHeader._expirationLen, crcCalc);
1164  }
1165  if (blockChainHeader._sowKeyLen > 0)
1166  {
1167  if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1168  {
1169  _message.assignSowKey(pBuffer->getBytes(blockChainHeader._sowKeyLen)._data,
1170  blockChainHeader._sowKeyLen);
1171  blockBytesRemaining -= blockChainHeader._sowKeyLen;
1172  }
1173  else
1174  {
1175  tmpBuffers[blockNum] = new char[blockChainHeader._sowKeyLen]; // -V522
1176  size_t totalLeft = blockChainHeader._sowKeyLen;
1177  size_t totalRead = 0;
1178  size_t readLen = 0;
1179  while (totalLeft)
1180  {
1181  readLen = blockBytesRemaining < totalLeft ?
1182  blockBytesRemaining : totalLeft;
1183  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1184  if (!(totalLeft -= readLen)) break;
1185  if (!(current = current->_nextInChain)) break;
1186  totalRead += readLen;
1187  blockBytesRemaining = getBlockDataSize();
1188  position = current->_offset + getBlockHeaderSize();
1189  pBuffer->setPosition(position);
1190  }
1191  blockBytesRemaining -= readLen;
1192  _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1193  }
1194  blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1195  crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
1196  }
1197  if (blockChainHeader._topicLen > 0)
1198  {
1199  if (blockChainHeader._topicLen <= blockBytesRemaining)
1200  {
1201  _message.assignTopic(pBuffer->getBytes(blockChainHeader._topicLen)._data,
1202  blockChainHeader._topicLen);
1203  blockBytesRemaining -= blockChainHeader._topicLen;
1204  }
1205  else
1206  {
1207  tmpBuffers[blockNum] = new char[blockChainHeader._topicLen]; // -V522
1208  size_t totalLeft = blockChainHeader._topicLen;
1209  size_t totalRead = 0;
1210  size_t readLen = 0;
1211  while (totalLeft)
1212  {
1213  readLen = blockBytesRemaining < totalLeft ?
1214  blockBytesRemaining : totalLeft;
1215  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1216  if (!(totalLeft -= readLen)) break;
1217  if (!(current = current->_nextInChain)) break;
1218  totalRead += readLen;
1219  blockBytesRemaining = getBlockDataSize();
1220  position = current->_offset + getBlockHeaderSize();
1221  pBuffer->setPosition(position);
1222  }
1223  blockBytesRemaining -= readLen;
1224  _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1225  }
1226  blockHeader._totalRemaining -= blockChainHeader._topicLen;
1227  crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
1228  }
1229  if (blockHeader._totalRemaining > 0)
1230  {
1231  if (blockHeader._totalRemaining <= blockBytesRemaining)
1232  {
1233  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1234  {
1235  _message.assignData(
1236  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1237  blockHeader._totalRemaining);
1238  crcCalc = _crc(_message.getData().data(),
1239  blockHeader._totalRemaining, crcCalc);
1240  }
1241  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1242  {
1243  _message.assignFilter(
1244  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1245  blockHeader._totalRemaining);
1246  crcCalc = _crc(_message.getFilter().data(),
1247  blockHeader._totalRemaining, crcCalc);
1248  }
1249  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1250  {
1251  _message.assignSowKeys(
1252  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1253  blockHeader._totalRemaining);
1254  crcCalc = _crc(_message.getSowKeys().data(),
1255  blockHeader._totalRemaining, crcCalc);
1256  }
1257  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1258  {
1259  _message.assignBookmark(
1260  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1261  blockHeader._totalRemaining);
1262  crcCalc = _crc(_message.getBookmark().data(),
1263  blockHeader._totalRemaining, crcCalc);
1264  }
1265  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1266  {
1267  _message.assignBookmark(
1268  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1269  blockHeader._totalRemaining);
1270  crcCalc = _crc(_message.getBookmark().data(),
1271  blockHeader._totalRemaining, crcCalc);
1272  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1273  }
1274  }
1275  else
1276  {
1277  tmpBuffers[blockNum] = new char[blockHeader._totalRemaining]; // -V522
1278  size_t totalLeft = blockHeader._totalRemaining;
1279  size_t totalRead = 0;
1280  size_t readLen = 0;
1281  while (totalLeft)
1282  {
1283  readLen = blockBytesRemaining < totalLeft ?
1284  blockBytesRemaining : totalLeft;
1285  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1286  if (!(totalLeft -= readLen)) break;
1287  if (!(current = current->_nextInChain)) break;
1288  totalRead += readLen;
1289  blockBytesRemaining = getBlockDataSize();
1290  position = current->_offset + getBlockHeaderSize();
1291  pBuffer->setPosition(position);
1292  }
1293  position+=readLen;
1294  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1295  _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1296  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1297  _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1298  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1299  _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1300  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1301  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1302  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1303  {
1304  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1305  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1306  }
1307  crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc); // -V595
1308  }
1309  }
1310 
1311  // Validate the crc and seq
1312  if(crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1313  {
1314  std::ostringstream os;
1315  os << "Corrupted message found by CRC or sequence "
1316  << "Attempting to replay " << block_->_sequence
1317  << ". Block sequence " << blockHeader._seq
1318  << ", expiration length " << blockChainHeader._expirationLen
1319  << ", sowKey length " << blockChainHeader._sowKeyLen
1320  << ", topic length " << blockChainHeader._topicLen
1321  << ", data length " << blockHeader._totalRemaining
1322  << ", command ID length " << blockChainHeader._commandIdLen
1323  << ", correlation ID length " << blockChainHeader._correlationIdLen
1324  << ", flag " << blockChainHeader._flag
1325  << ", expected CRC " << blockHeader._crcVal
1326  << ", actual CRC " << crcCalc
1327  << ", start " << start
1328  << ", position " << position
1329  << ", buffer size " << pBuffer->getSize();
1330  for (Block* block = block_; block; block = block->_nextInChain)
1331  {
1332  os << "\n BLOCK " << block->_offset;
1333  }
1334  if (tmpBuffers)
1335  {
1336  for (amps_uint32_t i=0; i<blockNum; ++i)
1337  delete[] tmpBuffers[i]; // -V522
1338  delete[] tmpBuffers;
1339  }
1340  throw StoreException(os.str());
1341  }
1342  // Replay the message
1343  replayer_.execute(_message);
1344  // Free the buffer if allocated
1345  if (tmpBuffers)
1346  {
1347  for (amps_uint32_t i=0; i<blockNum; ++i)
1348  delete[] tmpBuffers[i]; // -V522
1349  delete[] tmpBuffers;
1350  }
1351  }
1352 
1353  // Lock should already be held
1354  // Read an older format file and update it.
      //
      // Converts the store buffer in place:
      //  1. Rewrites the metadata block at the start of the buffer (block
      //     size, block header size, last discarded sequence, client version,
      //     next-in-chain = 0).
      //  2. Scans every block after the first. A block whose header passes
      //     the sanity checks is treated as the start of a stored message;
      //     a block that fails them is appended to a local free list.
      //  3. Messages that still fit in the blocks they occupy are converted
      //     immediately; messages that need more blocks are collected in
      //     growingBlocks and converted afterwards through expandBlocks().
      //  4. Publishes the free list, updates _lastSequence/_maxDiscarded and
      //     _stored, and links the recovered first blocks into the used list
      //     in ascending sequence order (std::map iteration order).
      //
      // blocks: array of Block objects, indexed by block number. Indexing in
      //         this function starts at 1 (blocks[++blockNum]).
      // Throws StoreException if the buffer must grow and resizeBuffer()
      // returns no blocks.
      //
      // NOTE(review): this listing skips gutter numbers 1518 and 1571, so two
      // statements below are incomplete in this rendering; check the original
      // header before relying on them.
1355  void recoverOldFormat(Block* blocks)
1356  {
1357  Buffer* pBuffer = _blockStore.getBuffer();
1358  amps_uint64_t maxIdx = 0;
1359  amps_uint64_t minIdx = 0;
1360  size_t size = pBuffer->getSize();
1361  size_t location = 0;
      // Rewrite the metadata block: two 32-bit sizes first, then the value
      // that follows them is read back as the stored sequence.
1362  pBuffer->setPosition(location);
1363  pBuffer->putUint32((amps_uint32_t)getBlockSize());
1364  pBuffer->putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1365  _metadataBlock->_sequence = pBuffer->getUint64();
      // Values below 1000000 are replaced by 0, both in memory and in the
      // buffer. NOTE(review): presumably such values are not real sequence
      // numbers in the old format -- confirm the meaning of this threshold.
1366  if (_metadataBlock->_sequence < (amps_uint64_t)1000000)
1367  {
1368  pBuffer->setPosition(_metadataBlock->_offset+8);
1369  pBuffer->putUint64((amps_uint64_t)0);
1370  _metadataBlock->_sequence = 0;
1371  }
1372  else
1373  {
1374  // Set _maxDiscarded and _lastSequence
1375  _maxDiscarded = _metadataBlock->_sequence;
1376  _lastSequence = _maxDiscarded;
1377  }
1378  // Write the current client version
1379  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1380  // No next in chain
1381  pBuffer->putUint64((amps_uint64_t)0);
1382  // No checks currently
1383  location += getBlockSize();
      // The free list is built locally (firstFree .. endOfFreeList) and only
      // handed to _blockStore via setFreeList() after all conversions.
1384  amps_uint32_t freeCount = 0;
1385  Block* firstFree = NULL;
1386  Block* endOfFreeList = NULL;
1387  size_t blockSize = getBlockSize();
1388  size_t numBlocks = size / blockSize;
1389  size_t blockNum = 0;
1390  // Used to create used list in order after recovery
1391  typedef std::map<amps_uint64_t, Block*> RecoverMap;
1392  RecoverMap recoveredBlocks;
      // First blocks of messages that need more blocks in the new format,
      // keyed by sequence; growthBlocksNeeded is the total extra block count.
1393  RecoverMap growingBlocks;
1394  amps_uint32_t growthBlocksNeeded = 0;
      // Scan pass: one iteration per message start, free block or skipped
      // block.
1395  while(location < size)
1396  {
1397  // Get seq and check if non-zero
1398  pBuffer->setPosition(location);
1399  BlockHeader blockHeader;
1400  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1401  size_t blockCount = (size_t)blockHeader._blocksToWrite;
      // Sanity checks: non-zero length and sequence, length smaller than the
      // buffer, block count smaller than the number of blocks, and the
      // claimed blocks large enough to hold the claimed length.
1402  if(blockHeader._totalRemaining > 0 && blockHeader._seq > 0
1403  && blockHeader._totalRemaining < size
1404  && blockHeader._blocksToWrite < numBlocks
1405  && (blockHeader._blocksToWrite*blockSize)
1406  >= blockHeader._totalRemaining)
1407  {
1408  size_t oldFormatSize = blockHeader._totalRemaining;
1409  // Old format total was storage bytes plus 64 bytes for block
1410  // and chain headers.
1411  blockHeader._totalRemaining -= 64;
1412  // New format counts only chain header size
1413  blockHeader._totalRemaining += getBlockChainHeaderSize();
1414  // Get the rest of the header
1415  BlockChainHeader chainHeader;
1416  // Need to reset location to after OLD header:
1417  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1418  // amps_uint64_t seq, amps_uint64_t crc
1419  pBuffer->setPosition(location + (sizeof(amps_uint32_t)*2)
1420  + (sizeof(amps_uint64_t)*2) );
1421  // Read old chain header which uses same order, but not
1422  // as many bytes (everything is 32bit):
1423  // operation, commandIdLen, correlationIdLen,
1424  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1425  pBuffer->copyBytes((char*)&chainHeader,
1426  sizeof(amps_uint32_t) * 8);
1427  // Check for garbage, likely indicating this is part of a chain
1428  if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1429  + chainHeader._expirationLen + chainHeader._sowKeyLen
1430  + chainHeader._topicLen) > blockHeader._totalRemaining)
1431  {
1432  // Skip this block, can't be real data
      // NOTE(review): a skipped block is neither put on the free list nor
      // counted in blockNum, so blockNum no longer tracks location / blockSize
      // after this point -- confirm that is intended.
1433  location += getBlockSize();
1434  continue;
1435  }
1436  // Check if data fits in current number of blocks
1437  amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1438  / getBlockDataSize())
1439  + (blockHeader._totalRemaining
1440  % getBlockDataSize()
1441  ? 1 : 0);
1442  if (blocksNeeded == blockHeader._blocksToWrite)
1443  {
1444  Block* first = blocks[++blockNum].setOffset(location);
1445  first->_nextInList = 0;
1446  first->_sequence = blockHeader._seq;
1447  if (blockHeader._blocksToWrite > 1)
1448  {
1449  // CRC is only set on the first block
1450  amps_uint64_t crcVal = blockHeader._crcVal;
1451  blockHeader._crcVal = 0;
1452  Block* current = 0;
1453  // It fits, just need to adjust the block formats
1454  // and set up the chain. Start with the last block
1455  // and move data as needed starting at the end.
1456  size_t currentBlockNum = blockNum
1457  + blockHeader._blocksToWrite
1458  - 1;
1459  // Last item could wrap to beginning, but beginning is
1460  // block 1, not 0, which is the metadata block.
1461  if (currentBlockNum >= numBlocks)
1462  {
1463  currentBlockNum = currentBlockNum - numBlocks + 1;
1464  }
      // A last block with a lower number than the first means the message
      // wrapped; that block was already visited by this scan, so it may be
      // on the free list and has to be unlinked from it.
1465  if (currentBlockNum < blockNum)
1466  {
1467  Block* last = blocks[currentBlockNum]
1468  .init(currentBlockNum, getBlockSize());
1469  if ((current = firstFree) == last)
1470  {
1471  firstFree = firstFree->_nextInList; // -V522
1472  if (!firstFree) endOfFreeList = 0;
1473  --freeCount;
1474  }
1475  else
1476  {
1477  while (current)
1478  {
1479  if (current->_nextInList == last)
1480  {
1481  current->_nextInList = last->_nextInList;
1482  current = last;
1483  --freeCount;
1484  break;
1485  }
1486  current=current->_nextInList;
1487  }
1488  }
1489  }
      // current is null here when the last block was not found on the free
      // list (or no wrap occurred); initialize it from its block number.
1490  if (!current)
1491  {
1492  current = blocks[currentBlockNum]
1493  .init(currentBlockNum, getBlockSize());
1494  }
1495  // Initially, the number of bytes in last block
1496  size_t dataBytes = oldFormatSize % getBlockSize();
      // Walk the chain backwards from the last block to the first, shifting
      // data and writing a block header into every block but the first.
1497  while (current != first)
1498  {
1499  current->_nextInList = 0;
1500  current->_sequence = blockHeader._seq;
1501  // Set _nextInChain on previous Block, will include first
1502  if (--currentBlockNum < 1
1503  || currentBlockNum > numBlocks)
1504  {
1505  currentBlockNum = numBlocks - 1;
1506  }
1507  Block* previous = blocks[currentBlockNum]
1508  .init(currentBlockNum,
1509  getBlockSize());
1510  previous->_nextInChain = current;
1511  // Shift to make room for a header in every block
1512  // Not growing, so this won't write past the end.
1513  // Shift amount accounts for a header added to each
1514  // block after the first plus any change in the
1515  // chain header size from 32, which is the old size.
      // NOTE(review): the listing jumps from gutter line 1517 to 1519; the
      // expression below is missing a line in this rendering.
1516  size_t bytesToMove = --blockCount
1517  * getBlockHeaderSize()
1519  - 32);
1520  pBuffer->copyBytes(current->_offset + bytesToMove,
1521  current->_offset,
1522  dataBytes);
1523  dataBytes = getBlockSize();
1524  if (bytesToMove > getBlockHeaderSize())
1525  {
1526  bytesToMove -= getBlockHeaderSize();
1527  dataBytes -= bytesToMove;
1528  pBuffer->copyBytes(current->_offset
1529  + getBlockHeaderSize(),
1530  previous->_offset
1531  + dataBytes,
1532  bytesToMove);
1533  }
1534  // Set next in chain for this block's header
1535  blockHeader._nextInChain = (current->_nextInChain
1536  ? current->_nextInChain->_offset
1537  : (amps_uint64_t)0);
1538  // Write the header for this block
1539  pBuffer->setPosition(current->_offset);
1540  pBuffer->putBytes((const char*)&blockHeader,
1541  sizeof(BlockHeader));
      // previous is now part of this chain; take it off the free list if the
      // scan had already put it there.
1542  if (firstFree == previous)
1543  {
1544  firstFree = firstFree->_nextInList;
1545  if (!firstFree) endOfFreeList = 0;
1546  --freeCount;
1547  }
1548  else
1549  {
1550  current = firstFree;
1551  while (current)
1552  {
1553  if (current->_nextInList == previous)
1554  {
1555  current->_nextInList = previous->_nextInList;
1556  --freeCount;
1557  break;
1558  }
1559  current=current->_nextInList;
1560  }
1561  }
1562  current = previous;
1563  }
1564  blockNum += blockHeader._blocksToWrite - 1;
      // Restore the CRC so it is written with the first block's header.
1565  blockHeader._crcVal = crcVal;
1566  }
1567  // Move bytes for chain header expansion from 32 bytes
1568  size_t bytesToMove = getBlockDataSize() - 32
1569  - (getBlockChainHeaderSize() - 32);
      // NOTE(review): gutter line 1571 is missing from this listing, so the
      // destination argument of this copyBytes call is incomplete here.
1570  pBuffer->copyBytes(first->_offset + getBlockHeaderSize()
1572  first->_offset+getBlockHeaderSize()+32,
1573  bytesToMove);
1574  // Rewrite the header and chain header for first Block.
1575  pBuffer->setPosition(first->_offset);
1576  blockHeader._nextInChain = (first->_nextInChain
1577  ? first->_nextInChain->_offset
1578  : (amps_uint64_t)0);
1579  pBuffer->putBytes((const char*)&blockHeader,
1580  sizeof(BlockHeader));
1581  pBuffer->putBytes((const char*)&chainHeader,
1582  sizeof(BlockChainHeader));
1583  // Add first Block to recovered for building the used
1584  // list later
1585  recoveredBlocks[blockHeader._seq] = first;
1586  }
1587  else
1588  {
1589  // This will need at least one more Block due to a header in
1590  // every Block. Check how many and save for later.
1591  growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1592  growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1593  blockNum += blockHeader._blocksToWrite - 1;
1594  }
1595  // Track min and max
1596  if (maxIdx < blockHeader._seq)
1597  {
1598  maxIdx = blockHeader._seq;
1599  }
      // NOTE(review): minIdx starts at 0 and this branch is only reached with
      // blockHeader._seq > 0, so the comparison below can never be true and
      // minIdx stays 0. The _maxDiscarded adjustment near the end of this
      // function therefore never fires -- confirm whether minIdx was meant to
      // start at a maximum value.
1600  if (minIdx > blockHeader._seq)
1601  {
1602  minIdx = blockHeader._seq;
1603  }
1604  // Advance past read blocks
1605  location += blockHeader._blocksToWrite * getBlockSize();
1606  // Either we're exiting loop, or blockNum is in range
1607  assert(location >= size || blockNum < numBlocks);
1608  }
1609  else
1610  {
1611  // Put this Block on the free list
1612  Block* block = blocks[++blockNum].setOffset(location);
1613  if (endOfFreeList)
1614  {
1615  endOfFreeList->_nextInList = block;
1616  }
1617  else
1618  {
1619  firstFree = block;
1620  }
1621  endOfFreeList = block;
1622  ++freeCount;
1623  location += blockSize;
1624  }
1625  }
      // Second pass: convert the messages that need additional blocks.
1626  for (RecoverMap::iterator i = growingBlocks.begin();
1627  i != growingBlocks.end(); ++i)
1628  {
1629  Block* first = i->second;
1630  pBuffer->setPosition(first->_offset);
1631  BlockHeader blockHeader;
1632  // Read an old BlockHeader, which is only 24 bytes.
1633  // The bytes match current BlockHeader, and _nextInChain is 0.
1634  pBuffer->copyBytes((char*)&blockHeader, 24);
1635  // Old format total was storage bytes plus 64 bytes for block
1636  // and chain headers.
      // NOTE(review): blockHeader is passed by value to expandBlocks(), which
      // applies this same -64 / +chain-header adjustment to its copy. It looks
      // like the conversion is applied twice for grown messages -- confirm.
1637  blockHeader._totalRemaining -= 64;
1638  // New format counts only chain header size
1639  blockHeader._totalRemaining += getBlockChainHeaderSize();
1640  if (freeCount < growthBlocksNeeded)
1641  {
1642  // We have to resize, let's try to do it once.
1643  amps_uint32_t minBlocksRequired = growthBlocksNeeded-freeCount;
1644  amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1645  if (growthBlocks < minBlocksRequired)
1646  {
      // Round the requirement up to a multiple of the default resize amount.
1647  amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1648  if (minBlocksRequired%defaultBlocks)
1649  minBlocksRequired = (minBlocksRequired/defaultBlocks+1)
1650  * defaultBlocks;
1651  growthBlocks = minBlocksRequired;
1652  }
1653  amps_uint32_t newBlocks = 0;
1654  Block* addedBlocks = _blockStore.resizeBuffer(
1655  pBuffer->getSize()
1656  + growthBlocks * blockSize,
1657  &newBlocks);
1658  if (!addedBlocks)
1659  {
1660  throw StoreException("Failed to grow store buffer during recovery");
1661  }
1662  _blockStore.addBlocks(addedBlocks);
1663  freeCount += newBlocks;
1664  growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1665  ? growthBlocksNeeded - freeCount : 0;
      // Append the new blocks to the local free list. The code below treats
      // addedBlocks as an array of newBlocks entries (it indexes the last
      // one); NOTE(review): it also assumes they are already linked through
      // _nextInList -- confirm against resizeBuffer().
1666  if (endOfFreeList)
1667  {
1668  endOfFreeList->_nextInList = addedBlocks;
1669  }
1670  else
1671  {
1672  firstFree = addedBlocks;
1673  }
1674  endOfFreeList = &(addedBlocks[newBlocks-1]);
1675  endOfFreeList->_nextInList = 0;
1676  }
1677  expandBlocks(blocks, first->_offset, first, blockHeader,
1678  &firstFree, &freeCount, pBuffer);
1679  // Add first Block to recovered for building the used list later
1680  recoveredBlocks[blockHeader._seq] = first;
1681  if (!firstFree) endOfFreeList = 0;
1682  }
1683  if (endOfFreeList) endOfFreeList->_nextInList = 0;
1684  _blockStore.setFreeList(firstFree, freeCount);
1685  if (maxIdx > _lastSequence) _lastSequence = maxIdx;
1686  if (minIdx > _maxDiscarded + 1) _maxDiscarded = minIdx - 1;
      // Persist _maxDiscarded into the metadata block if it moved forward.
1687  if (_maxDiscarded > _metadataBlock->_sequence)
1688  {
1689  _metadataBlock->_sequence = _maxDiscarded;
1690  pBuffer->setPosition(_metadataBlock->_offset+8);
1691  pBuffer->putUint64(_maxDiscarded);
1692  }
      // Build the used list from the recovered first blocks in ascending
      // sequence order.
1693  Block* end = NULL;
1694  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size()));
1695  for (RecoverMap::iterator i = recoveredBlocks.begin();
1696  i != recoveredBlocks.end(); ++i)
1697  {
      // NOTE(review): end is still NULL on the first iteration, so this is
      // only safe if front() is null on entry and setUsedList() makes it
      // non-null -- confirm against BlockStore.
1698  if (_blockStore.front())
1699  {
1700  end->_nextInList = i->second; // -V522
1701  }
1702  else
1703  {
1704  _blockStore.setUsedList(i->second);
1705  }
1706  end = i->second;
1707  }
1708  if (end)
1709  {
1710  end->_nextInList = 0;
1711  }
1712  _blockStore.setEndOfUsedList(end);
1713  }
1714 
1715  // For recovering an old format store to current format when more Blocks
1716  // are needed with the new format.
      //
      // Works in two phases: first the chain of Blocks for the message is
      // linked front to back (taking extra Blocks from the free list once the
      // originally used ones are exhausted), then the data is copied back to
      // front so that every Block has room for a block header, and the
      // headers are written.
      //
      // blocks_     : Block array; only blocks_[1] is used here, as the
      //               wrap-around target when a Block has no _nextInList.
      // location_   : buffer offset at which the first Block's old chain
      //               header is read and its new headers are written.
      // first_      : first Block of the message.
      // blockHeader_: header read by the caller; taken by value, modified
      //               locally and written into every Block of the chain.
      // pFreeList_  : in/out head of the free list; Blocks used are removed.
      // pFreeCount_ : in/out number of Blocks on the free list.
      // pBuffer_    : buffer holding the store.
      //
      // NOTE(review): *pFreeList_ and first_->_nextInChain are dereferenced
      // without null checks; this relies on the caller having made enough free
      // Blocks available and on the message needing at least two Blocks.
      // NOTE(review): this listing skips gutter numbers 1859 and 1861, so the
      // final copyBytes call is incomplete in this rendering.
1717  void expandBlocks(Block* blocks_, size_t location_, Block* first_,
1718  BlockHeader blockHeader_,
1719  Block** pFreeList_, amps_uint32_t* pFreeCount_,
1720  Buffer* pBuffer_)
1721  {
1722  // First create the chain, then we'll fill in reverse
1723  Block* current = first_;
1724  // Old format total was storage bytes plus 64 bytes for block
1725  // and chain headers.
1726  amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining;
1727  blockHeader_._totalRemaining -= 64;
1728  // New format counts only chain header size
1729  blockHeader_._totalRemaining += getBlockChainHeaderSize();
1730  // Check how many Blocks needed and if we have enough free.
1731  amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1732  / getBlockDataSize()
1733  + (blockHeader_._totalRemaining
1734  % getBlockDataSize()
1735  ? 1 : 0);
1736  // Last data block size, remove bytes saved in first block
1737  // then mod by block size.
1738  const amps_uint32_t blockSize = getBlockSize();
1739  // Old total remaining had all header included
1740  size_t endBlockSize = oldTotalRemaining % blockSize;
1741  if (!endBlockSize) endBlockSize = blockSize;
      // Buffer offset just past the last byte of old-format data; set while
      // linking the chain, when the last originally used Block is reached.
1742  size_t endOfData = 0;
1743  // Hang on to CRC until first block is written
1744  amps_uint64_t crcVal = blockHeader_._crcVal;
1745  blockHeader_._crcVal = 0;
1746 
      // Blocks are pushed in chain order so the copy loop below can walk the
      // chain backwards by popping.
1747  std::stack<Block*> blocksUsed;
1748  for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1749  {
1750  blocksUsed.push(current);
1751  current->_sequence = blockHeader_._seq;
1752  if (i >= blockHeader_._blocksToWrite)
1753  {
      // Past the originally used Blocks: take the next one from the head of
      // the free list.
1754  if (i == blockHeader_._blocksToWrite)
1755  endOfData = current->_offset + endBlockSize;
1756  current->_nextInChain = *pFreeList_;
1757  --(*pFreeCount_);
1758  *pFreeList_ = (*pFreeList_)->_nextInList;
1759  }
1760  else
1761  {
      // Still within the originally used Blocks: the next Block in the chain
      // is the one physically following this one, wrapping to the Block at
      // offset blockSize when the end of the buffer is reached.
1762  current->_nextInChain = current->_nextInList;
1763  if (current->_nextInChain)
1764  {
1765  if (current->_offset + blockSize < pBuffer_->getSize())
1766  {
1767  current->_nextInChain->setOffset(current->_offset
1768  + blockSize);
1769  }
1770  else
1771  {
1772  current->_nextInChain->setOffset(blockSize);
1773  }
1774  }
1775  else
1776  {
1777  current->_nextInChain = blocks_[1].init(1, blockSize);
1778  }
      // Unlink the chosen Block from the free list if it is on it.
1779  if (current->_nextInChain == *pFreeList_)
1780  {
1781  *pFreeList_ = (*pFreeList_)->_nextInList;
1782  --(*pFreeCount_);
1783  }
1784  else
1785  {
1786  for (Block* free = *pFreeList_; free;
1787  free = free->_nextInList)
1788  {
1789  if (free->_nextInList == current->_nextInChain)
1790  {
1791  free->_nextInList = free->_nextInList->_nextInList;
1792  --(*pFreeCount_);
1793  break;
1794  }
1795  }
1796  }
1797  }
1798  current->_nextInList = 0;
1799  current = current->_nextInChain;
1800  }
1801  // Make sure we write the correct number of blocks to write
1802  blockHeader_._blocksToWrite = blocksNeeded;
1803  // Finish setting up current
1804  current->_nextInList = 0;
1805  current->_sequence = blockHeader_._seq;
1806  // Now shift data, starting at the last Block
1807  // The total shift is for number of Blocks beyond the first
1808  // times Block header size, since previous format only wrote
1809  // the header in the first Block and had contiguous data,
1810  // with only wrap from end to beginning of buffer possible.
1811 
1812  // First time through, this is bytes in last block. After,
1813  // it will be block data size.
      // NOTE(review): unlike endBlockSize above, a remainder of 0 is not
      // mapped to a full block here -- confirm the exact-multiple case.
1814  size_t dataBytes = blockHeader_._totalRemaining % getBlockDataSize();
1815  while (current != first_)
1816  {
      // NOTE(review): 2048 is hard-coded here while every other size in this
      // function comes from getBlockSize()/getBlockDataSize(); presumably it
      // stands for the block size -- confirm.
1817  size_t chunkBytesAvail = endOfData > location_
1818  ? endOfData - location_
1819  : endOfData - 2048;
1820  if (chunkBytesAvail < dataBytes)
1821  {
1822  // Original was wrapped from end to start of buffer
1823  // Need to copy what's left at start to end of Block,
1824  // then start working from the end.
1825  // This can ONLY occur during wrap because the first
1826  // Block doesn't get moved in this loop.
1827  pBuffer_->copyBytes(current->_offset
1828  + getBlockSize()
1829  - chunkBytesAvail,
1830  getBlockSize(),
1831  chunkBytesAvail);
1832  chunkBytesAvail = dataBytes - chunkBytesAvail;
1833  endOfData = pBuffer_->getSize() - chunkBytesAvail;
1834  pBuffer_->copyBytes(current->_offset+getBlockHeaderSize(),
1835  endOfData,
1836  chunkBytesAvail);
1837  }
1838  else
1839  {
1840  endOfData -= dataBytes;
1841  pBuffer_->copyBytes(current->_offset+getBlockHeaderSize(),
1842  endOfData,
1843  dataBytes);
1844  }
1845  // Set next in chain in block header
1846  blockHeader_._nextInChain = (current->_nextInChain
1847  ? current->_nextInChain->_offset
1848  : (amps_uint64_t)0);
1849  // Write the header for this block
1850  pBuffer_->setPosition(current->_offset);
1851  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
      // Step back to the previous Block in the chain.
1852  current = blocksUsed.top();
1853  blocksUsed.pop();
1854  dataBytes = getBlockDataSize();
1855  }
1856  // Move bytes for chain header expansion from 32 bytes
      // NOTE(review): gutter lines 1859 and 1861 are missing from this
      // listing; the call below lacks part of its destination and its length
      // argument in this rendering.
1857  pBuffer_->copyBytes(first_->_offset
1858  + getBlockHeaderSize()
1860  first_->_offset + getBlockHeaderSize() + 32,
1862  // Set the CRC to indicate first block and set nextInChain
1863  blockHeader_._crcVal = crcVal;
1864  blockHeader_._nextInChain = first_->_nextInChain->_offset;
1865  // Need to reset location to after OLD header:
1866  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1867  // amps_uint64_t seq, amps_uint64_t crc
1868  pBuffer_->setPosition(location_ + (sizeof(amps_uint32_t)*2)
1869  + (sizeof(amps_uint64_t)*2) );
1870  // Read old chain header which uses same order, but not
1871  // as many bytes (everything is 32bit):
1872  // operation, commandIdLen, correlationIdLen,
1873  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1874  BlockChainHeader chainHeader;
1875  pBuffer_->copyBytes((char*)&chainHeader,
1876  sizeof(amps_uint32_t) * 8);
1877  // Rewrite the header and chain header for first Block.
1878  pBuffer_->setPosition(location_);
1879  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
1880  pBuffer_->putBytes((const char*)&chainHeader, sizeof(BlockChainHeader));
1881  }
1882 
      // Selects the function stored in _crc.
      //
      // isFile: when false, _crc is set to noOpCRC and nothing else is done.
      //         When true, a real CRC routine from AMPS::CRC<0> is chosen:
      //         crcNoSSE if AMPS_SSE_42 is not defined at compile time;
      //         otherwise crc when isSSE42Enabled() returns true at run time,
      //         and crcNoSSE when it does not.
      //
      // NOTE(review): this function tests the macro with #ifndef, while the
      // include block at the top of the file uses "#if AMPS_SSE_42". The two
      // disagree if the macro is defined as 0 -- confirm which is intended.
1883  void chooseCRC(bool isFile)
1884  {
1885  if(!isFile)
1886  {
1887  _crc = noOpCRC;
1888  return;
1889  }
1890 
1891 #ifndef AMPS_SSE_42
1892  _crc = AMPS::CRC<0>::crcNoSSE;
1893 #else
1894  if(AMPS::CRC<0>::isSSE42Enabled())
1895  {
1896  _crc = AMPS::CRC<0>::crc;
1897  }
1898  else
1899  {
1900  _crc = AMPS::CRC<0>::crcNoSSE;
1901  }
1902 #endif
1903  }
1904 
      // CRC stand-in that ignores all three arguments and always returns 0.
      // chooseCRC() installs it in _crc when its isFile argument is false.
1905  static amps_uint64_t noOpCRC(const char*, size_t, amps_uint64_t)
1906  {
1907  return 0;
1908  }
1909 
1910 protected:
      // Supplies the Buffer (getBuffer()) and receives the free and used Block
      // lists built during recovery (setFreeList(), setUsedList(),
      // setEndOfUsedList()).
1911  mutable BlockStore _blockStore;
1912 private:
1913  // Block used to hold metadata, currently:
1914  // the last persisted
      // recoverOldFormat() reads a sequence value from this block into
      // _sequence and writes _maxDiscarded back at _offset + 8.
1915  Block* _metadataBlock;
1916  // Highest sequence that has been discarded
1917  amps_uint64_t _maxDiscarded;
1918  // Track the assigned sequence numbers
1919  volatile amps_uint64_t _lastSequence;
1920  // Track how many messages are stored
      // Updated with AMPS_FETCH_ADD during recovery.
1921  ATOMIC_TYPE _stored;
1922 
1923  // Message used for doing replay
1924  Message _message;
1925 
      // Signature shared by all CRC routines. Callers in this file pass a data
      // pointer, its length and the CRC accumulated so far, and store the
      // return value as the new accumulated CRC.
1926  typedef amps_uint64_t (*CRCFunction)(const char*, size_t, amps_uint64_t);
1927 
1928  // Function used to calculate the CRC if one is used
      // Assigned by chooseCRC().
1929  CRCFunction _crc;
1930 
1931 };
1932 
1933 }
1934 
1935 #endif
1936 
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:759
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CorrelationId header of self as a Field that references the underlying buf...
Definition: Message.hpp:1140
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1068
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:79
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CommandId header of self as a Field that references the underlying buffer ...
Definition: Message.hpp:1138
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:731
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:628
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:542
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Filter header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1142
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:501
virtual void setPosition(size_t position_)=0
Set the buffer position to a location.
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1152
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:153
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:179
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Bookmark header of self as a Field that references the underlying buffer m...
Definition: Message.hpp:1034
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:571
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:262
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:680
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that are currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:614
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:836
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKeys header of self as a Field that references the underlying buffer ma...
Definition: Message.hpp:1233
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:60
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:237
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Expiration header of self as a Field that references the underlying buffer...
Definition: Message.hpp:1141
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:196
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:163
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Topic header of self as a Field that references the underlying buffer mana...
Definition: Message.hpp:1257
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:247
amps_uint32_t getBlockSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:171
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKey header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1232
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1003
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:668
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:815
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:487
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.