AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.3.4
BlockPublishStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2023 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _BLOCKPUBLISHSTORE_H_
27 #define _BLOCKPUBLISHSTORE_H_
28 #include <ampsplusplus.hpp>
29 #include <BlockStore.hpp>
30 #include <Buffer.hpp>
31 #include <sstream>
32 #include <stack>
33 #include <string>
34 #include <map>
35 #include <ampscrc.hpp>
36 
37 #ifdef _WIN32
38  #include <intrin.h>
39  #include <sys/timeb.h>
40 #else
41  #include <sys/time.h>
42  #if AMPS_SSE_42
43  #include <cpuid.h>
44  #endif
45 #endif
46 #include <iostream>
47 
52 
53 namespace AMPS
54 {
61  {
62  public:
63  typedef BlockStore::Block Block;
64  typedef Lock<BlockStore> BufferLock;
65  typedef Unlock<BlockStore> BufferUnlock;
66 
/// Identifies which field of a stored sow_delete command carries its
/// payload. store() records one of these values in the chain header's
/// flag field, so the numeric values must stay stable.
typedef enum
{
  SOW_DELETE_DATA            = 1 << 0, ///< 0x01: payload taken from the data field
  SOW_DELETE_FILTER          = 1 << 1, ///< 0x02: payload taken from the filter
  SOW_DELETE_KEYS            = 1 << 2, ///< 0x04: payload taken from the sow keys
  SOW_DELETE_BOOKMARK        = 1 << 3, ///< 0x08: payload taken from the bookmark
  SOW_DELETE_BOOKMARK_CANCEL = 1 << 4, ///< 0x10: bookmark payload with the cancel option
  SOW_DELETE_UNKNOWN         = 1 << 7  ///< 0x80
} SowDeleteType;
76 
79  enum Constants : amps_uint32_t
80  {
81  DEFAULT_BLOCK_HEADER_SIZE = 32,
82  DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
83  DEFAULT_BLOCKS_PER_REALLOC = 1000,
84  DEFAULT_BLOCK_SIZE = 2048
85  };
86 
87  /**********************************************************************
88  * Storage format
89  *************************************************************************
90  * Field Description | Type | # Bytes
91  *************************************************************************
92  * HEADER as detailed below | | 32 TOTAL
93  * | |
94  * Total number of blocks used by the record | uint32_t | 4
95  * Total length of the saved record | uint32_t | 4
96  * HA Message sequence | uint64_t | 8
97  * CRC Value - only set in first Block | uint64_t | 8
98  * next in chain offset | uint64_t | 8
99  *************************************************************************
100  * CHAIN HEADER as detailed below | | 64 TOTAL
101  * | |
102  * operation | uint32_t | 4
103  * command id length | uint32_t | 4
104  * correlation id length | uint32_t | 4
105  * expiration length | uint32_t | 4
106  * sow key length | uint32_t | 4
107  * topic length | uint32_t | 4
108  * sow delete flag | int32_t | 4
109  * ack types | uint32_t | 4
110  * unused [8] | uint32_t | 4*8 = 32
111  *************************************************************************
112  * DATA SECTION - can be spread across multiple blocks
113  *
114  * command id | char[]
115  * correlation id | char[]
116  * expiration | char[]
117  * sow key | char[]
118  * topic | char[]
119  * data | char[]
120  *************************************************************************/
121 
122  struct BlockHeader
123  {
124  amps_uint32_t _blocksToWrite;
125  amps_uint32_t _totalRemaining;
126  amps_uint64_t _seq;
127  amps_uint64_t _crcVal;
128  amps_uint64_t _nextInChain;
129  };
130 
131  struct BlockChainHeader
132  {
133  amps_uint32_t _operation;
134  amps_uint32_t _commandIdLen;
135  amps_uint32_t _correlationIdLen;
136  amps_uint32_t _expirationLen;
137  amps_uint32_t _sowKeyLen;
138  amps_uint32_t _topicLen;
139  amps_int32_t _flag;
140  amps_uint32_t _ackTypes;
141  amps_uint32_t _unused[8];
142  BlockChainHeader() // -V730
143  : _operation(0), _commandIdLen(0), _correlationIdLen(0)
144  , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
145  , _ackTypes(0)
146  { ; }
147  };
148 
153  static inline amps_uint32_t getBlockHeaderSize()
154  {
155  return DEFAULT_BLOCK_HEADER_SIZE;
156  }
157 
163  static inline amps_uint32_t getBlockChainHeaderSize()
164  {
165  return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
166  }
167 
171  inline amps_uint32_t getBlockSize()
172  {
173  return _blockStore.getBlockSize();
174  }
175 
179  inline amps_uint32_t getBlockDataSize()
180  {
181  return _blockStore.getBlockSize() - getBlockHeaderSize();
182  }
183 
197  amps_uint32_t blocksPerRealloc_ = 1000,
198  bool isFile_ = false,
199  amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
200  : StoreImpl()
201  , _blockStore(buffer_, blocksPerRealloc_,
202  DEFAULT_BLOCK_HEADER_SIZE,
203  (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE * 2
204  ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE * 2))
205  , _metadataBlock(0)
206  , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
207  , _stored(0)
208  {
209  _blockStore.setResizeHandler(&BlockPublishStore::canResize, (void*)this);
210  chooseCRC(isFile_);
211  if (!isFile_)
212  {
213  // This gets set in recover in file-based stores
214  BufferLock bufferGuard(_blockStore);
215  _blockStore.init();
216  _metadataBlock = _blockStore.get(1);
217  // Remove metadata block from used list
218  _blockStore.setUsedList(0);
219  _blockStore.setEndOfUsedList(0);
220  // Metadata block holds block size, block header size,
221  // last discarded sequence, client version
222  _metadataBlock->_sequence = (amps_uint64_t)0;
223  Buffer* pBuffer = _blockStore.getBuffer();
224  pBuffer->setPosition(_metadataBlock->_offset);
225  pBuffer->putUint32((amps_uint32_t)getBlockSize());
226  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
227  pBuffer->putUint64((amps_uint64_t)0);
228  // Metadata blocks puts client version in CRC position
229  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
230  // No next in chain
231  pBuffer->putUint64((amps_uint64_t)0);
232  }
233  }
234 
238  {
239  }
240 
247  virtual amps_uint64_t store(const Message& message_)
248  {
249  return store(message_, true);
250  }
251 
262  amps_uint64_t store(const Message& message_, bool assignSequence_)
263  {
264  const char* commandId, *correlationId, *expiration, *sowKey,
265  *topic, *data;
266  size_t dataLen = 0;
267  BlockHeader blockHeader;
268  BlockChainHeader chainHeader;
269  message_.getRawCommandId(&commandId, &dataLen);
270  chainHeader._commandIdLen = (amps_uint32_t)dataLen;
271  message_.getRawCorrelationId(&correlationId, &dataLen);
272  chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
273  message_.getRawExpiration(&expiration, &dataLen);
274  chainHeader._expirationLen = (amps_uint32_t)dataLen;
275  message_.getRawSowKey(&sowKey, &dataLen);
276  chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
277  message_.getRawTopic(&topic, &dataLen);
278  chainHeader._topicLen = (amps_uint32_t)dataLen;
279  message_.getRawData(&data, &dataLen);
280  chainHeader._flag = -1;
281  Message::Command::Type operation = message_.getCommandEnum();
282  chainHeader._operation = (amps_uint32_t)operation;
283  if (operation == Message::Command::SOWDelete)
284  {
285  if (dataLen > 0)
286  {
287  chainHeader._flag = SOW_DELETE_DATA;
288  }
289  else
290  {
291  message_.getRawFilter(&data, &dataLen);
292  if (dataLen > 0)
293  {
294  chainHeader._flag = SOW_DELETE_FILTER;
295  }
296  else
297  {
298  message_.getRawSowKeys(&data, &dataLen);
299  if (dataLen > 0)
300  {
301  chainHeader._flag = SOW_DELETE_KEYS;
302  }
303  else
304  {
305  message_.getRawBookmark(&data, &dataLen);
306  chainHeader._flag = SOW_DELETE_BOOKMARK;
307  // Check options for cancel
308  Message::Field options = message_.getOptions();
309  size_t remaining = options.len();
310  const void* next = NULL;
311  const void* start = (const void*)(options.data());
312  // Not necessarily null-terminated so no strstr
313  while (remaining >= 6 &&
314  (next = memchr(start, (int)'c', remaining)) != NULL)
315  {
316  remaining = (size_t)next - (size_t)start;
317  if (remaining >= 6 && strncmp((const char*)start,
318  "cancel", 6) == 0)
319  {
320  chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
321  break;
322  }
323  }
324  }
325  }
326  }
327  }
328  blockHeader._totalRemaining = (
329  (chainHeader._operation == Message::Command::Unknown)
330  ? 0
332  + chainHeader._commandIdLen
333  + chainHeader._correlationIdLen
334  + chainHeader._expirationLen
335  + chainHeader._sowKeyLen
336  + chainHeader._topicLen
337  + (amps_uint32_t)dataLen));
338  size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
339  (blockHeader._totalRemaining % getBlockDataSize()));
340  blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
341  ? 1
342  : ((amps_uint32_t)(blockHeader._totalRemaining
343  / getBlockDataSize())
344  + ((lastBlockLength > 0) ? 1 : 0)));
345  blockHeader._crcVal = (amps_uint64_t)0ULL;
346  blockHeader._crcVal = _crc(commandId,
347  chainHeader._commandIdLen,
348  blockHeader._crcVal);
349  blockHeader._crcVal = _crc(correlationId,
350  chainHeader._correlationIdLen,
351  blockHeader._crcVal);
352  blockHeader._crcVal = _crc(expiration,
353  chainHeader._expirationLen,
354  blockHeader._crcVal);
355  blockHeader._crcVal = _crc(sowKey,
356  chainHeader._sowKeyLen,
357  blockHeader._crcVal);
358  blockHeader._crcVal = _crc(topic,
359  chainHeader._topicLen,
360  blockHeader._crcVal);
361  blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
362 
363  // Reserve slots for storage, growing if necessary
364  BufferLock bufferGuard(_blockStore);
365  Block* first = _blockStore.get(blockHeader._blocksToWrite);
366  if (assignSequence_)
367  {
368  if (_lastSequence <= 2)
369  {
370  _getLastPersisted();
371  }
372  blockHeader._seq = ++_lastSequence;
373  }
374  else
375  {
376  blockHeader._seq = amps_message_get_field_uint64(
377  message_.getMessage(),
378  AMPS_Sequence);
379  if (!_maxDiscarded)
380  {
381  _maxDiscarded = blockHeader._seq - 1;
382  }
383  if (blockHeader._seq >= _lastSequence)
384  {
385  _lastSequence = blockHeader._seq;
386  }
387  }
388 
389  try
390  {
391  size_t topicWritten = 0UL;
392  size_t dataWritten = 0UL;
393  size_t commandWritten = 0UL;
394  size_t correlationWritten = 0UL;
395  size_t expirationWritten = 0UL;
396  size_t sowKeyWritten = 0UL;
397  Buffer* pBuffer = _blockStore.getBuffer();
398  for (Block* next = first; next; next = next->_nextInChain)
399  {
400  next->_sequence = blockHeader._seq;
401  if (next->_nextInChain)
402  {
403  blockHeader._nextInChain = next->_nextInChain->_offset;
404  }
405  else
406  {
407  blockHeader._nextInChain = (amps_uint64_t)0;
408  }
409  // Set buffer to start of Block and write the header
410  pBuffer->setPosition(next->_offset);
411  pBuffer->putBytes((const char*)&blockHeader, sizeof(BlockHeader));
412  // Clear crcVal, as it's only written in the first Block
413  blockHeader._crcVal = (amps_uint64_t)0;
414  size_t bytesRemaining = getBlockDataSize();
415  if (next == first)
416  {
417  // Write Block chain header
418  chainHeader._ackTypes = (amps_uint32_t)message_.getAckTypeEnum();
419  pBuffer->putBytes((const char*)&chainHeader,
420  sizeof(BlockChainHeader));
421  pBuffer->setPosition(next->_offset + getBlockHeaderSize() + getBlockChainHeaderSize());
422  bytesRemaining -= getBlockChainHeaderSize();
423  }
424  else
425  {
426  pBuffer->setPosition(next->_offset + getBlockHeaderSize());
427  }
428 
429  if (commandWritten < chainHeader._commandIdLen)
430  {
431  size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
432  pBuffer->putBytes(commandId + commandWritten,
433  commandWrite);
434  bytesRemaining -= commandWrite;
435  commandWritten += commandWrite;
436  }
437  if (correlationWritten < chainHeader._correlationIdLen)
438  {
439  size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
440  pBuffer->putBytes(correlationId + correlationWritten,
441  correlationWrite);
442  bytesRemaining -= correlationWrite;
443  correlationWritten += correlationWrite;
444  }
445  if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
446  {
447  size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
448  pBuffer->putBytes(expiration + expirationWritten, expWrite);
449  bytesRemaining -= expWrite;
450  expirationWritten += expWrite;
451  }
452  if (bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
453  {
454  size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
455  pBuffer->putBytes(sowKey + sowKeyWritten, sowKeyWrite);
456  bytesRemaining -= sowKeyWrite;
457  sowKeyWritten += sowKeyWrite;
458  }
459  if (bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
460  {
461  size_t topicWrite = (chainHeader._topicLen - topicWritten
462  < bytesRemaining)
463  ? chainHeader._topicLen - topicWritten
464  : bytesRemaining;
465  pBuffer->putBytes(topic + topicWritten, topicWrite);
466  bytesRemaining -= topicWrite;
467  topicWritten += topicWrite;
468  }
469  if (bytesRemaining > 0 && dataWritten < dataLen)
470  {
471  size_t dataWrite = (dataLen - dataWritten < bytesRemaining) ?
472  dataLen - dataWritten : bytesRemaining;
473  pBuffer->putBytes(data + dataWritten, dataWrite);
474  bytesRemaining -= dataWrite;
475  dataWritten += dataWrite;
476  }
477  }
478  }
479  catch (const AMPSException& ex)
480  {
481  _blockStore.put(first);
482  throw ex;
483  }
484  AMPS_FETCH_ADD(&_stored, 1);
485  return blockHeader._seq;
486  }
487 
492  virtual void discardUpTo(amps_uint64_t index_)
493  {
494  // Get the lock
495  BufferLock bufferGuard(_blockStore);
496  Buffer* pBuffer = _blockStore.getBuffer();
497  // Don't use _getLastPersisted() here, don't want to set it
498  // to something other than index_ if it's not already set
499  amps_uint64_t lastPersisted = _metadataBlock->_sequence;
500  // Make sure it's a real index and we have messages to discard
501  if (index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
502  {
503  // During logon it's very possible we don't have a last persisted
504  // but that the Client is calling discardUpTo with the ack value.
505  if (lastPersisted < index_)
506  {
507  pBuffer->setPosition(_metadataBlock->_offset + 8);
508  pBuffer->putUint64(index_);
509  _metadataBlock->_sequence = index_;
510  if (_maxDiscarded < index_)
511  {
512  _maxDiscarded = index_;
513  }
514  if (_lastSequence <= index_)
515  {
516  _lastSequence = index_;
517  }
518  }
519  else if (!index_) // Fresh logon, no sequence history
520  {
521  _getLastPersisted();
522  }
523  _blockStore.signalAll();
524  return;
525  }
526 
527  _maxDiscarded = index_;
528  AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
529  _blockStore.signalAll();
530  if (lastPersisted >= index_)
531  {
532  return;
533  }
534  pBuffer->setPosition(_metadataBlock->_offset + 8);
535  pBuffer->putUint64(index_);
536  _metadataBlock->_sequence = index_;
537  if (_lastSequence < index_)
538  {
539  _lastSequence = index_;
540  }
541  }
542 
547  void replay(StoreReplayer& replayer_)
548  {
549  // Get the lock
550  BufferLock bufferGuard(_blockStore);
551  // If we don't have anything yet, return
552  if (!_blockStore.front())
553  {
554  return;
555  }
556  Block* next = _blockStore.front();
557  try
558  {
559  for (Block* block = _blockStore.front(); block; block = next)
560  {
561  // Replay the message
562  replayOnto(block, replayer_);
563  next = block->_nextInList;
564  }
565  }
566  catch (const StoreException& e)
567  {
568  _blockStore.putAll(next);
569  throw e;
570  }
571  }
572 
579  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
580  {
581  BufferLock bufferGuard(_blockStore);
582  // If we don't have anything yet, return
583  if (!_blockStore.front())
584  {
585  return false;
586  }
587  // Get the end point
588  amps_uint64_t lastIdx = _blockStore.back()->_sequence;
589  // Get the start point
590  amps_uint64_t leastIdx = _blockStore.front()->_sequence;
591  if (index_ >= leastIdx && index_ <= lastIdx)
592  {
593  Block* block = _blockStore.front();
594  while (block && block->_sequence != index_)
595  {
596  block = block->_nextInList;
597  }
598  if (!block)
599  {
600  return false;
601  }
602  // If total bytes is 0, it's a queue ack and gets skipped.
603  Buffer* pBuffer = _blockStore.getBuffer();
604  pBuffer->setPosition(block->_offset +
605  sizeof(amps_uint32_t));
606  if (pBuffer->getUint32() == 0)
607  {
608  return false;
609  }
610  replayOnto(block, replayer_);
611  return true;
612  }
613  else // Get Store and Client back in sync
614  {
615  _message.reset();
616  leastIdx -= 1;
617  _message.setSequence(leastIdx);
618  replayer_.execute(_message);
619  return false;
620  }
621  }
622 
628  size_t unpersistedCount() const
629  {
630  size_t count = (size_t)_stored;
631  return count;
632  }
633 
642  virtual void flush(long timeout_)
643  {
644  BufferLock bufferGuard(_blockStore);
645  amps_uint64_t waitFor = _getHighestUnpersisted();
646  // Check that we aren't already empty
647  if (waitFor == getUnsetSequence())
648  {
649  return;
650  }
651  if (timeout_ > 0)
652  {
653  bool timedOut = false;
654  AMPS_START_TIMER(timeout_);
655  // While timeout hasn't expired and we haven't had everything acked
656  while (!timedOut && _stored != 0
657  && waitFor >= _getLowestUnpersisted())
658  {
659  if (!_blockStore.wait(timeout_))
660  {
661  // May have woken up early, check real time
662  AMPS_RESET_TIMER(timedOut, timeout_);
663  }
664  }
665  // If we timed out and still haven't caught up with the acks
666  if (timedOut && _stored != 0
667  && waitFor >= _getLowestUnpersisted())
668  {
669  throw TimedOutException("Timed out waiting to flush publish store.");
670  }
671  }
672  else
673  {
674  while (_stored != 0 && waitFor >= _getLowestUnpersisted())
675  {
676  // Still wake up every 1s so python can interrupt
677  _blockStore.wait(1000);
678  // Don't hold lock if possibly grabbing GIL
679  BufferUnlock unlck(_blockStore);
680  amps_invoke_waiting_function();
681  }
682  }
683  }
684 
685  amps_uint64_t getLowestUnpersisted() const
686  {
687  BufferLock bufferGuard(_blockStore);
688  return _getLowestUnpersisted();
689  }
690 
691  amps_uint64_t getHighestUnpersisted() const
692  {
693  BufferLock bufferGuard(_blockStore);
694  return _getHighestUnpersisted();
695  }
696 
697  amps_uint64_t getLastPersisted(void)
698  {
699  BufferLock bufferGuard(_blockStore);
700  return _getLastPersisted();
701  }
702 
703  protected:
704  static bool canResize(size_t requestedSize_, void* vpThis_)
705  {
706  return ((BlockPublishStore*)vpThis_)->callResizeHandler(requestedSize_);
707  }
708 
709  amps_uint64_t _getLowestUnpersisted() const
710  {
711  // Assume the lock is held
712  // If we don't have anything, return MAX
713  if (!_blockStore.front())
714  {
715  return getUnsetSequence();
716  }
717  return _blockStore.front()->_sequence;
718  }
719 
720  amps_uint64_t _getHighestUnpersisted() const
721  {
722  // Assume the lock is held
723  // If we don't have anything, return MAX
724  if (!_blockStore.back())
725  {
726  return getUnsetSequence();
727  }
728  return _blockStore.back()->_sequence;
729  }
730 
731  amps_uint64_t _getLastPersisted(void)
732  {
733  // Assume the lock is held
734  amps_uint64_t lastPersisted = (amps_uint64_t)0;
735  Buffer* pBuffer = _blockStore.getBuffer();
736  pBuffer->setPosition(_metadataBlock->_offset + 8);
737  lastPersisted = pBuffer->getUint64();
738  if (lastPersisted)
739  {
740  if (_lastSequence < lastPersisted)
741  {
742  _lastSequence = lastPersisted;
743  }
744  return lastPersisted;
745  }
746  if (_maxDiscarded)
747  {
748  lastPersisted = _maxDiscarded;
749  }
750  else
751  {
752 #ifdef _WIN32
753  struct _timeb t;
754  _ftime_s(&t);
755  lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
756 #else // not _WIN32
757  struct timeval tv;
758  gettimeofday(&tv, NULL);
759  lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
760  * (amps_uint64_t)1000000;
761 #endif
762  }
763  if (_lastSequence > 2)
764  {
765  amps_uint64_t low = _getLowestUnpersisted();
766  amps_uint64_t high = _getHighestUnpersisted();
767  if (low != getUnsetSequence())
768  {
769  lastPersisted = low - 1;
770  }
771  if (high != getUnsetSequence() && _lastSequence <= high)
772  {
773  _lastSequence = high;
774  }
775  if (_lastSequence < lastPersisted)
776  {
777  lastPersisted = _lastSequence - 1;
778  }
779  }
780  else
781  {
782  _lastSequence = lastPersisted;
783  }
784  pBuffer->setPosition(_metadataBlock->_offset
785  + sizeof(amps_uint32_t) // blocks used
786  + sizeof(amps_uint32_t)); // record length
787  pBuffer->putUint64(lastPersisted);
788  _metadataBlock->_sequence = lastPersisted;
789  return lastPersisted;
790  }
791 
792  void recover(void)
793  {
794  BufferLock bufferGuard(_blockStore);
795  // Make sure the size isn't 0 and is a multiple of block size
796  Buffer* pBuffer = _blockStore.getBuffer();
797  size_t size = pBuffer->getSize();
798  amps_uint32_t blockSize = getBlockSize();
799  if (size == 0)
800  {
801  _blockStore.init();
802  _metadataBlock = _blockStore.get(1);
803  _metadataBlock->_sequence = (amps_uint64_t)0;
804  pBuffer->setPosition(_metadataBlock->_offset);
805  // Metadata block holds block size, block header size,
806  // last discarded sequence, client version
807  pBuffer->putUint32((amps_uint32_t)blockSize);
808  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
809  pBuffer->putUint64((amps_uint64_t)0);
810  // Metadata blocks puts client version in CRC position
811  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
812  // No next in chain
813  pBuffer->putUint64((amps_uint64_t)0);
814  return;
815  }
816  size_t numBlocks = size / blockSize;
817  if (size % blockSize > 0)
818  {
819  // We shouldn't ever be in here, since it requires starting with a
820  // file that is not an even multiple of block size and we always
821  // fix the size.
822  numBlocks = size / blockSize;
823  ++numBlocks;
824  amps_uint32_t blockCount = 0;
825  // We allocate all the Blocks at once below so delete allocated Block[]
826  delete[] _blockStore.resizeBuffer(numBlocks * blockSize, &blockCount);
827  // Resize can fail if resizeHandler is set and refuses the request
828  // Since this is recovery, we need to simply fail in that case
829  if (size > pBuffer->getSize() || numBlocks != (size_t)blockCount)
830  {
831  throw StoreException("Publish Store could not resize correctly during recovery, possibly due to resizeHandler refusing the request.");
832  }
833  size = pBuffer->getSize();
834  }
835 
836  amps_uint64_t maxIdx = 0;
837  amps_uint64_t minIdx = 0;
838  size_t location = 0;
839  BlockHeader blockHeader;
840  // The blocks we create here all get their offset set in below loop
841  Block* blocks = new Block[numBlocks];
842  blocks[numBlocks - 1]._nextInList = 0;
843  size_t blockNum = 0;
844  _blockStore.addBlocks(blocks);
845  _metadataBlock = blocks; // The first Block is metadata
846  _metadataBlock->_nextInList = 0;
847  pBuffer->setPosition(0);
848  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
849  /* Metadata Block header fields
850  * amps_uint32_t _blocksToWrite = BlockSize
851  * amps_uint32_t _totalRemaining = BlockHeaderSize
852  * amps_uint64_t _seq = last persisted sequence number
853  * amps_uint64_t _crcVal = unused
854  * amps_uint64_t _nextInChain = unused
855  */
856  if (blockHeader._blocksToWrite == 1) // Old store format?
857  {
858  /* Old format metadata block header fields
859  * amps_uint32_t _blocksToWrite = 1
860  * amps_uint32_t _totalRemaining = client version
861  * amps_uint64_t _seq = last persisted sequence number
862  * amps_uint64_t _crcVal = unused
863  * amps_uint64_t _nextInChain = unused
864  */
865  // Readable old format starts with version 5.0.0.0
866  if (blockHeader._totalRemaining >= 5000000)
867  {
868  // All recovery needs to be based on old format
869  // so go do that instead.
870  recoverOldFormat(blocks);
871  return;
872  }
873  // Unreadable format, fail
874  throw StoreException("Unrecognized format for Store. Can't recover.");
875  }
876  if (blockHeader._blocksToWrite == 0)
877  {
878  pBuffer->setPosition(0);
879  pBuffer->putUint32((amps_uint32_t)blockSize);
880  }
881  else
882  {
883  blockSize = blockHeader._blocksToWrite;
884  _blockStore.setBlockSize(blockSize);
885  }
886  if (blockHeader._totalRemaining == 0)
887  {
888  pBuffer->setPosition(sizeof(amps_uint32_t));
889  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
890  }
891  else
892  {
893  _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
894  }
895  _metadataBlock->_sequence = blockHeader._seq;
896  if (_metadataBlock->_sequence
897  && _metadataBlock->_sequence < (amps_uint64_t)1000000)
898  {
899  pBuffer->setPosition(_metadataBlock->_offset
900  + sizeof(amps_uint32_t) // BlockSize
901  + sizeof(amps_uint32_t)); // BlockHeaderSize
902  pBuffer->putUint64((amps_uint64_t)0);
903  _metadataBlock->_sequence = 0;
904  }
905  else
906  {
907  // Set _maxDiscarded and _lastSequence
908  _maxDiscarded = _metadataBlock->_sequence;
909  _lastSequence = _maxDiscarded;
910  }
911  // This would be where to check the client version string
912  // No checks currently
913  location += blockSize;
914  amps_uint32_t freeCount = 0;
915  Block* firstFree = NULL;
916  Block* endOfFreeList = NULL;
917  // Used to create used list in order after recovery
918  typedef std::map<amps_uint64_t, Block*> RecoverMap;
919  RecoverMap recoveredBlocks;
920  while (location < size)
921  {
922  // Get index and check if non-zero
923  pBuffer->setPosition(location);
924  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
925  if ((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
926  (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
927  {
928  // Block is part of a chain
929  location += blockSize;
930  continue;
931  }
932  Block* block = blocks[++blockNum].setOffset(location);
933  bool recovered = false;
934  if (blockHeader._seq > 0 && blockHeader._totalRemaining < size)
935  {
936  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
937  block->_sequence = blockHeader._seq;
938  // Track min and max
939  if (maxIdx < blockHeader._seq)
940  {
941  maxIdx = blockHeader._seq;
942  }
943  if (minIdx > blockHeader._seq)
944  {
945  minIdx = blockHeader._seq;
946  }
947  // Save it in recovered blocks
948  recoveredBlocks[blockHeader._seq] = block;
949  // Set up the chain
950  while (blockHeader._nextInChain != (amps_uint64_t)0)
951  {
952  Block* chain = blocks[++blockNum]
953  .setOffset((size_t)blockHeader._nextInChain);
954  chain->_nextInList = 0;
955  pBuffer->setPosition((size_t)blockHeader._nextInChain
956  + sizeof(amps_uint32_t) // blocks used
957  + sizeof(amps_uint32_t) // record length
958  + sizeof(amps_uint64_t) // seq
959  + sizeof(amps_uint64_t)); // crc
960  blockHeader._nextInChain = pBuffer->getUint64();
961  block->_nextInChain = chain;
962  block = chain;
963  block->_sequence = blockHeader._seq;
964  }
965  recovered = true;
966  }
967  if (!recovered)
968  {
969  // Put this Block on the free list
970  if (endOfFreeList)
971  {
972  endOfFreeList->_nextInList = block;
973  }
974  else
975  {
976  firstFree = block;
977  }
978  endOfFreeList = block;
979  ++freeCount;
980  }
981  location += blockSize;
982  }
983  if (endOfFreeList)
984  {
985  endOfFreeList->_nextInList = 0;
986  }
987  _blockStore.setFreeList(firstFree, freeCount);
988  if (maxIdx > _lastSequence)
989  {
990  _lastSequence = maxIdx;
991  }
992  if (minIdx > _maxDiscarded + 1)
993  {
994  _maxDiscarded = minIdx - 1;
995  }
996  if (_maxDiscarded > _metadataBlock->_sequence)
997  {
998  _metadataBlock->_sequence = _maxDiscarded;
999  pBuffer->setPosition(_metadataBlock->_offset + 8);
1000  pBuffer->putUint64(_maxDiscarded);
1001  }
1002  Block* end = NULL;
1003  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size()));
1004  for (RecoverMap::iterator i = recoveredBlocks.begin();
1005  i != recoveredBlocks.end(); ++i)
1006  {
1007  if (end)
1008  {
1009  end->_nextInList = i->second;
1010  }
1011  else
1012  {
1013  _blockStore.setUsedList(i->second);
1014  }
1015  end = i->second;
1016  }
1017  if (end)
1018  {
1019  end->_nextInList = 0;
1020  }
1021  _blockStore.setEndOfUsedList(end);
1022  }
1023 
1024  private:
1025  // Lock should already be held
1026  void replayOnto(Block* block_, StoreReplayer& replayer_)
1027  {
1028  // Read the header
1029  size_t start = block_->_offset;
1030  size_t position = start;
1031  Buffer* pBuffer = _blockStore.getBuffer();
1032  pBuffer->setPosition(position);
1033  BlockHeader blockHeader;
1034  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1035  if (blockHeader._totalRemaining == 0)
1036  {
1037  // Queue acking sow_delete
1038  return;
1039  }
1040  position += getBlockHeaderSize();
1041  BlockChainHeader blockChainHeader;
1042  pBuffer->copyBytes((char*)&blockChainHeader, sizeof(blockChainHeader));
1043  if (blockChainHeader._operation == Message::Command::Unknown)
1044  {
1045  // Queue acking sow_delete
1046  return;
1047  }
1048  blockChainHeader._ackTypes |= Message::AckType::Persisted;
1049  position += getBlockChainHeaderSize();
1050  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
1051  pBuffer->setPosition(position);
1052 
1053  if (blockHeader._totalRemaining
1054  < blockChainHeader._commandIdLen
1055  + blockChainHeader._correlationIdLen
1056  + blockChainHeader._expirationLen
1057  + blockChainHeader._sowKeyLen
1058  + blockChainHeader._topicLen)
1059  {
1060  std::ostringstream os;
1061  os << "Corrupted message found with invalid lengths. "
1062  << "Attempting to replay " << block_->_sequence
1063  << ". Block sequence " << blockHeader._seq
1064  << ", topic length " << blockChainHeader._topicLen
1065  << ", data length " << blockHeader._totalRemaining
1066  << ", command ID length " << blockChainHeader._commandIdLen
1067  << ", correlation ID length " << blockChainHeader._correlationIdLen
1068  << ", expiration length " << blockChainHeader._expirationLen
1069  << ", sow key length " << blockChainHeader._sowKeyLen
1070  << ", start " << start
1071  << ", position " << position
1072  << ", buffer size " << pBuffer->getSize();
1073  throw StoreException(os.str());
1074  }
1075 
1076  // Start prepping the message
1077  _message.reset();
1078  _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1079  _message.setAckTypeEnum((unsigned)blockChainHeader._ackTypes
1080  | Message::AckType::Persisted);
1081  _message.setSequence(blockHeader._seq);
1082  // Read the data and calculate the CRC
1083  Block* current = block_;
1084  size_t blockBytesRemaining = getBlockDataSize() - getBlockChainHeaderSize();
1085  amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
1086  // Use tmpBuffers for any fields split across Block boundaries
1087  char** tmpBuffers = (blockHeader._blocksToWrite > 1) ? new char* [blockHeader._blocksToWrite - 1] : 0;
1088  size_t blockNum = 0;
1089  if (blockChainHeader._commandIdLen > 0)
1090  {
1091  if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1092  {
1093  _message.assignCommandId(pBuffer->getBytes(blockChainHeader._commandIdLen)._data,
1094  blockChainHeader._commandIdLen);
1095  blockBytesRemaining -= blockChainHeader._commandIdLen;
1096  }
1097  else
1098  {
1099  tmpBuffers[blockNum] = new char[blockChainHeader._commandIdLen]; // -V522
1100  size_t totalLeft = blockChainHeader._commandIdLen;
1101  size_t totalRead = 0;
1102  size_t readLen = 0;
1103  while (totalLeft)
1104  {
1105  readLen = blockBytesRemaining < totalLeft ?
1106  blockBytesRemaining : totalLeft;
1107  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1108  if (!(totalLeft -= readLen))
1109  {
1110  break;
1111  }
1112  if (!(current = current->_nextInChain))
1113  {
1114  break;
1115  }
1116  totalRead += readLen;
1117  blockBytesRemaining = getBlockDataSize();
1118  position = current->_offset + getBlockHeaderSize();
1119  pBuffer->setPosition(position);
1120  }
1121  blockBytesRemaining -= readLen;
1122  _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
1123  }
1124  blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1125  crcCalc = _crc(_message.getCommandId().data(),
1126  blockChainHeader._commandIdLen, crcCalc);
1127  }
1128  if (blockChainHeader._correlationIdLen > 0)
1129  {
1130  if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1131  {
1132  _message.assignCorrelationId(
1133  pBuffer->getBytes(blockChainHeader._correlationIdLen)._data,
1134  blockChainHeader._correlationIdLen);
1135  blockBytesRemaining -= blockChainHeader._correlationIdLen;
1136  }
1137  else
1138  {
1139  tmpBuffers[blockNum] = new char[blockChainHeader._correlationIdLen]; // -V522
1140  size_t totalLeft = blockChainHeader._correlationIdLen;
1141  size_t totalRead = 0;
1142  size_t readLen = 0;
1143  while (totalLeft)
1144  {
1145  readLen = blockBytesRemaining < totalLeft ?
1146  blockBytesRemaining : totalLeft;
1147  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1148  if (!(totalLeft -= readLen))
1149  {
1150  break;
1151  }
1152  if (!(current = current->_nextInChain))
1153  {
1154  break; // -V522
1155  }
1156  totalRead += readLen;
1157  blockBytesRemaining = getBlockDataSize();
1158  position = current->_offset + getBlockHeaderSize();
1159  pBuffer->setPosition(position);
1160  }
1161  blockBytesRemaining -= readLen;
1162  _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1163  }
1164  blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1165  crcCalc = _crc(_message.getCorrelationId().data(),
1166  blockChainHeader._correlationIdLen, crcCalc);
1167  }
1168  if (blockChainHeader._expirationLen > 0)
1169  {
1170  if (blockChainHeader._expirationLen <= blockBytesRemaining)
1171  {
1172  _message.assignExpiration(
1173  pBuffer->getBytes(blockChainHeader._expirationLen)._data,
1174  blockChainHeader._expirationLen);
1175  blockBytesRemaining -= blockChainHeader._expirationLen;
1176  }
1177  else
1178  {
1179  tmpBuffers[blockNum] = new char[blockChainHeader._expirationLen]; // -V522
1180  size_t totalLeft = blockChainHeader._expirationLen;
1181  size_t totalRead = 0;
1182  size_t readLen = 0;
1183  while (totalLeft)
1184  {
1185  readLen = blockBytesRemaining < totalLeft ?
1186  blockBytesRemaining : totalLeft;
1187  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1188  if (!(totalLeft -= readLen))
1189  {
1190  break;
1191  }
1192  if (!(current = current->_nextInChain))
1193  {
1194  break;
1195  }
1196  totalRead += readLen;
1197  blockBytesRemaining = getBlockDataSize();
1198  position = current->_offset + getBlockHeaderSize();
1199  pBuffer->setPosition(position);
1200  }
1201  blockBytesRemaining -= readLen;
1202  _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1203  }
1204  blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1205  crcCalc = _crc(_message.getExpiration().data(),
1206  blockChainHeader._expirationLen, crcCalc);
1207  }
1208  if (blockChainHeader._sowKeyLen > 0)
1209  {
1210  if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1211  {
1212  _message.assignSowKey(pBuffer->getBytes(blockChainHeader._sowKeyLen)._data,
1213  blockChainHeader._sowKeyLen);
1214  blockBytesRemaining -= blockChainHeader._sowKeyLen;
1215  }
1216  else
1217  {
1218  tmpBuffers[blockNum] = new char[blockChainHeader._sowKeyLen]; // -V522
1219  size_t totalLeft = blockChainHeader._sowKeyLen;
1220  size_t totalRead = 0;
1221  size_t readLen = 0;
1222  while (totalLeft)
1223  {
1224  readLen = blockBytesRemaining < totalLeft ?
1225  blockBytesRemaining : totalLeft;
1226  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1227  if (!(totalLeft -= readLen))
1228  {
1229  break;
1230  }
1231  if (!(current = current->_nextInChain))
1232  {
1233  break;
1234  }
1235  totalRead += readLen;
1236  blockBytesRemaining = getBlockDataSize();
1237  position = current->_offset + getBlockHeaderSize();
1238  pBuffer->setPosition(position);
1239  }
1240  blockBytesRemaining -= readLen;
1241  _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1242  }
1243  blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1244  crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
1245  }
1246  if (blockChainHeader._topicLen > 0)
1247  {
1248  if (blockChainHeader._topicLen <= blockBytesRemaining)
1249  {
1250  _message.assignTopic(pBuffer->getBytes(blockChainHeader._topicLen)._data,
1251  blockChainHeader._topicLen);
1252  blockBytesRemaining -= blockChainHeader._topicLen;
1253  }
1254  else
1255  {
1256  tmpBuffers[blockNum] = new char[blockChainHeader._topicLen]; // -V522
1257  size_t totalLeft = blockChainHeader._topicLen;
1258  size_t totalRead = 0;
1259  size_t readLen = 0;
1260  while (totalLeft)
1261  {
1262  readLen = blockBytesRemaining < totalLeft ?
1263  blockBytesRemaining : totalLeft;
1264  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1265  if (!(totalLeft -= readLen))
1266  {
1267  break;
1268  }
1269  if (!(current = current->_nextInChain))
1270  {
1271  break;
1272  }
1273  totalRead += readLen;
1274  blockBytesRemaining = getBlockDataSize();
1275  position = current->_offset + getBlockHeaderSize();
1276  pBuffer->setPosition(position);
1277  }
1278  blockBytesRemaining -= readLen;
1279  _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1280  }
1281  blockHeader._totalRemaining -= blockChainHeader._topicLen;
1282  crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
1283  }
1284  if (blockHeader._totalRemaining > 0)
1285  {
1286  if (blockHeader._totalRemaining <= blockBytesRemaining)
1287  {
1288  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1289  {
1290  _message.assignData(
1291  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1292  blockHeader._totalRemaining);
1293  crcCalc = _crc(_message.getData().data(),
1294  blockHeader._totalRemaining, crcCalc);
1295  }
1296  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1297  {
1298  _message.assignFilter(
1299  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1300  blockHeader._totalRemaining);
1301  crcCalc = _crc(_message.getFilter().data(),
1302  blockHeader._totalRemaining, crcCalc);
1303  }
1304  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1305  {
1306  _message.assignSowKeys(
1307  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1308  blockHeader._totalRemaining);
1309  crcCalc = _crc(_message.getSowKeys().data(),
1310  blockHeader._totalRemaining, crcCalc);
1311  }
1312  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1313  {
1314  _message.assignBookmark(
1315  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1316  blockHeader._totalRemaining);
1317  crcCalc = _crc(_message.getBookmark().data(),
1318  blockHeader._totalRemaining, crcCalc);
1319  }
1320  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1321  {
1322  _message.assignBookmark(
1323  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1324  blockHeader._totalRemaining);
1325  crcCalc = _crc(_message.getBookmark().data(),
1326  blockHeader._totalRemaining, crcCalc);
1327  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1328  }
1329  }
1330  else
1331  {
1332  tmpBuffers[blockNum] = new char[blockHeader._totalRemaining]; // -V522
1333  size_t totalLeft = blockHeader._totalRemaining;
1334  size_t totalRead = 0;
1335  size_t readLen = 0;
1336  while (totalLeft)
1337  {
1338  readLen = blockBytesRemaining < totalLeft ?
1339  blockBytesRemaining : totalLeft;
1340  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1341  if (!(totalLeft -= readLen))
1342  {
1343  break;
1344  }
1345  if (!(current = current->_nextInChain))
1346  {
1347  break;
1348  }
1349  totalRead += readLen;
1350  blockBytesRemaining = getBlockDataSize();
1351  position = current->_offset + getBlockHeaderSize();
1352  pBuffer->setPosition(position);
1353  }
1354  position += readLen;
1355  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1356  {
1357  _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1358  }
1359  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1360  {
1361  _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1362  }
1363  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1364  {
1365  _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1366  }
1367  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1368  {
1369  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1370  }
1371  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1372  {
1373  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1374  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1375  }
1376  crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc); // -V595
1377  }
1378  }
1379 
1380  // Validate the crc and seq
1381  if (crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1382  {
1383  std::ostringstream os;
1384  os << "Corrupted message found by CRC or sequence "
1385  << "Attempting to replay " << block_->_sequence
1386  << ". Block sequence " << blockHeader._seq
1387  << ", expiration length " << blockChainHeader._expirationLen
1388  << ", sowKey length " << blockChainHeader._sowKeyLen
1389  << ", topic length " << blockChainHeader._topicLen
1390  << ", data length " << blockHeader._totalRemaining
1391  << ", command ID length " << blockChainHeader._commandIdLen
1392  << ", correlation ID length " << blockChainHeader._correlationIdLen
1393  << ", flag " << blockChainHeader._flag
1394  << ", expected CRC " << blockHeader._crcVal
1395  << ", actual CRC " << crcCalc
1396  << ", start " << start
1397  << ", position " << position
1398  << ", buffer size " << pBuffer->getSize();
1399  for (Block* block = block_; block; block = block->_nextInChain)
1400  {
1401  os << "\n BLOCK " << block->_offset;
1402  }
1403  if (tmpBuffers)
1404  {
1405  for (amps_uint32_t i = 0; i < blockNum; ++i)
1406  {
1407  delete[] tmpBuffers[i]; // -V522
1408  }
1409  delete[] tmpBuffers;
1410  }
1411  throw StoreException(os.str());
1412  }
1413  // Replay the message
1414  replayer_.execute(_message);
1415  // Free the buffer if allocated
1416  if (tmpBuffers)
1417  {
1418  for (amps_uint32_t i = 0; i < blockNum; ++i)
1419  {
1420  delete[] tmpBuffers[i]; // -V522
1421  }
1422  delete[] tmpBuffers;
1423  }
1424  }
1425 
1426  // Lock should already be held by the caller (precondition; not checked here).
1427  // Read an older format file and update it in place: rewrites the metadata block, converts every stored message to the current per-Block header layout (growing the buffer when more Blocks are needed), then rebuilds the free and used lists. `blocks` is the Block array indexed by block number; index 0 is the metadata block. Throws StoreException if the buffer has to grow and cannot.
1428  void recoverOldFormat(Block* blocks)
1429  {
1430  Buffer* pBuffer = _blockStore.getBuffer();
1431  amps_uint64_t maxIdx = 0; // Highest message sequence seen during the scan
1432  amps_uint64_t minIdx = 0; // NOTE(review): initialized to 0 and only ever lowered, see the min check in the scan loop
1433  size_t size = pBuffer->getSize();
1434  size_t location = 0;
1435  pBuffer->setPosition(location); // Start by rewriting the metadata block at offset 0
1436  pBuffer->putUint32((amps_uint32_t)getBlockSize());
1437  pBuffer->putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1438  _metadataBlock->_sequence = pBuffer->getUint64(); // Third field: the sequence already stored in the file
1439  if (_metadataBlock->_sequence < (amps_uint64_t)1000000) // presumably smaller values cannot be real sequence numbers - TODO confirm the meaning of 1000000
1440  {
1441  pBuffer->setPosition(_metadataBlock->_offset + 8); // +8 skips the two 32-bit size fields
1442  pBuffer->putUint64((amps_uint64_t)0);
1443  _metadataBlock->_sequence = 0;
1444  }
1445  else
1446  {
1447  // Set _maxDiscarded and _lastSequence
1448  _maxDiscarded = _metadataBlock->_sequence;
1449  _lastSequence = _maxDiscarded;
1450  }
1451  // Write the current client version
1452  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1453  // No next in chain
1454  pBuffer->putUint64((amps_uint64_t)0);
1455  // No checks currently
1456  location += getBlockSize(); // First data block follows the metadata block
1457  amps_uint32_t freeCount = 0;
1458  Block* firstFree = NULL;
1459  Block* endOfFreeList = NULL;
1460  size_t blockSize = getBlockSize();
1461  size_t numBlocks = size / blockSize;
1462  size_t blockNum = 0;
1463  // Used to create used list in order after recovery
1464  typedef std::map<amps_uint64_t, Block*> RecoverMap;
1465  RecoverMap recoveredBlocks; // sequence -> first Block of each converted message
1466  RecoverMap growingBlocks; // sequence -> first Block of each message that needs extra Blocks
1467  amps_uint32_t growthBlocksNeeded = 0;
1468  while (location < size) // First pass: scan every block after the metadata block
1469  {
1470  // Get seq and check if non-zero
1471  pBuffer->setPosition(location);
1472  BlockHeader blockHeader;
1473  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1474  size_t blockCount = (size_t)blockHeader._blocksToWrite;
1475  if (blockHeader._totalRemaining > 0 && blockHeader._seq > 0 // Sanity-check the header before treating this as the first block of a message
1476  && blockHeader._totalRemaining < size
1477  && blockHeader._blocksToWrite < numBlocks
1478  && (blockHeader._blocksToWrite * blockSize)
1479  >= blockHeader._totalRemaining)
1480  {
1481  size_t oldFormatSize = blockHeader._totalRemaining;
1482  // Old format total was storage bytes plus 64 bytes for block
1483  // and chain headers.
1484  blockHeader._totalRemaining -= 64;
1485  // New format counts only chain header size
1486  blockHeader._totalRemaining += getBlockChainHeaderSize();
1487  // Get the rest of the header
1488  BlockChainHeader chainHeader;
1489  // Need to reset location to after OLD header:
1490  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1491  // amps_uint64_t seq, amps_uint64_t crc
1492  pBuffer->setPosition(location + (sizeof(amps_uint32_t) * 2)
1493  + (sizeof(amps_uint64_t) * 2) );
1494  // Read old chain header which uses same order, but not
1495  // as many bytes (everything is 32bit):
1496  // operation, commandIdLen, correlationIdLen,
1497  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1498  pBuffer->copyBytes((char*)&chainHeader,
1499  sizeof(amps_uint32_t) * 8);
1500  // Check for garbage, likely indicating this is part of a chain
1501  if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1502  + chainHeader._expirationLen + chainHeader._sowKeyLen
1503  + chainHeader._topicLen) > blockHeader._totalRemaining)
1504  {
1505  // Skip this block, can't be real data
1506  location += getBlockSize();
1507  continue;
1508  }
1509  // Check if data fits in current number of blocks
1510  amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1511  / getBlockDataSize())
1512  + (blockHeader._totalRemaining
1513  % getBlockDataSize()
1514  ? 1 : 0); // Ceiling division by the per-block data size
1515  if (blocksNeeded == blockHeader._blocksToWrite) // Same block count in the new format: convert in place
1516  {
1517  Block* first = blocks[++blockNum].setOffset(location);
1518  first->_nextInList = 0;
1519  first->_sequence = blockHeader._seq;
1520  if (blockHeader._blocksToWrite > 1)
1521  {
1522  // CRC is only set on the first block
1523  amps_uint64_t crcVal = blockHeader._crcVal;
1524  blockHeader._crcVal = 0;
1525  Block* current = 0;
1526  // It fits, just need to adjust the block formats
1527  // and set up the chain. Start with the last block
1528  // and move data as needed starting at the end.
1529  size_t currentBlockNum = blockNum
1530  + blockHeader._blocksToWrite
1531  - 1;
1532  // Last item could wrap to beginning, but beginning is
1533  // block 1, not 0, which is the metadata block.
1534  if (currentBlockNum >= numBlocks)
1535  {
1536  currentBlockNum = currentBlockNum - numBlocks + 1;
1537  }
1538  if (currentBlockNum < blockNum) // Wrapped: the last block has a lower index than the first, so unlink it from the free list if it is there
1539  {
1540  Block* last = blocks[currentBlockNum]
1541  .init(currentBlockNum, getBlockSize());
1542  if ((current = firstFree) == last)
1543  {
1544  firstFree = firstFree->_nextInList; // -V522
1545  if (!firstFree)
1546  {
1547  endOfFreeList = 0;
1548  }
1549  --freeCount;
1550  }
1551  else
1552  {
1553  while (current)
1554  {
1555  if (current->_nextInList == last)
1556  {
1557  current->_nextInList = last->_nextInList;
1558  current = last;
1559  --freeCount;
1560  break;
1561  }
1562  current = current->_nextInList;
1563  }
1564  }
1565  }
1566  if (!current) // Not found on the free list (or no wrap): initialize the last block directly
1567  {
1568  current = blocks[currentBlockNum]
1569  .init(currentBlockNum, getBlockSize());
1570  }
1571  // Initially, the number of bytes in last block
1572  size_t dataBytes = oldFormatSize % getBlockSize();
1573  while (current != first) // Walk the chain backwards from the last block to the first
1574  {
1575  current->_nextInList = 0;
1576  current->_sequence = blockHeader._seq;
1577  // Set _nextInChain on previous Block, will include first
1578  if (--currentBlockNum < 1
1579  || currentBlockNum > numBlocks)
1580  {
1581  currentBlockNum = numBlocks - 1;
1582  }
1583  Block* previous = blocks[currentBlockNum]
1584  .init(currentBlockNum,
1585  getBlockSize());
1586  previous->_nextInChain = current;
1587  // Shift to make room for a header in every block
1588  // Not growing, so this won't write past the end.
1589  // Shift amount accounts for a header added to each
1590  // block after the first plus any change in the
1591  // chain header size from 32, which is the old size.
1592  size_t bytesToMove = --blockCount
1593  * getBlockHeaderSize()
1595  - 32); // NOTE(review): the listing jumps from 1593 to 1595 and the parentheses do not balance, so a source line appears to be missing from this rendering
1596  pBuffer->copyBytes(current->_offset + bytesToMove,
1597  current->_offset,
1598  dataBytes);
1599  dataBytes = getBlockSize();
1600  if (bytesToMove > getBlockHeaderSize())
1601  {
1602  bytesToMove -= getBlockHeaderSize();
1603  dataBytes -= bytesToMove;
1604  pBuffer->copyBytes(current->_offset
1605  + getBlockHeaderSize(),
1606  previous->_offset
1607  + dataBytes,
1608  bytesToMove);
1609  }
1610  // Set next in chain for this block's header
1611  blockHeader._nextInChain = (current->_nextInChain
1612  ? current->_nextInChain->_offset
1613  : (amps_uint64_t)0);
1614  // Write the header for this block
1615  pBuffer->setPosition(current->_offset);
1616  pBuffer->putBytes((const char*)&blockHeader,
1617  sizeof(BlockHeader));
1618  if (firstFree == previous) // Unlink `previous` from the free list if it is there
1619  {
1620  firstFree = firstFree->_nextInList;
1621  if (!firstFree)
1622  {
1623  endOfFreeList = 0;
1624  }
1625  --freeCount;
1626  }
1627  else
1628  {
1629  current = firstFree;
1630  while (current)
1631  {
1632  if (current->_nextInList == previous)
1633  {
1634  current->_nextInList = previous->_nextInList;
1635  --freeCount;
1636  break;
1637  }
1638  current = current->_nextInList;
1639  }
1640  }
1641  current = previous;
1642  }
1643  blockNum += blockHeader._blocksToWrite - 1; // Account for the remaining blocks of this chain
1644  blockHeader._crcVal = crcVal; // Restore the CRC for the first block's header
1645  }
1646  // Move bytes for chain header expansion from 32 bytes
1647  size_t bytesToMove = getBlockDataSize() - 32
1648  - (getBlockChainHeaderSize() - 32);
1649  pBuffer->copyBytes(first->_offset + getBlockHeaderSize()
1651  first->_offset + getBlockHeaderSize() + 32, // NOTE(review): listing line 1650 is absent from this rendering, so the first argument of this call is incomplete as shown
1652  bytesToMove);
1653  // Rewrite the header and chain header for first Block.
1654  pBuffer->setPosition(first->_offset);
1655  blockHeader._nextInChain = (first->_nextInChain
1656  ? first->_nextInChain->_offset
1657  : (amps_uint64_t)0);
1658  pBuffer->putBytes((const char*)&blockHeader,
1659  sizeof(BlockHeader));
1660  pBuffer->putBytes((const char*)&chainHeader,
1661  sizeof(BlockChainHeader));
1662  // Add first Block to recovered for building the used
1663  // list later
1664  recoveredBlocks[blockHeader._seq] = first;
1665  }
1666  else
1667  {
1668  // This will need at least one more Block due to a header in
1669  // every Block. Check how many and save for later.
1670  growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1671  growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1672  blockNum += blockHeader._blocksToWrite - 1;
1673  }
1674  // Track min and max
1675  if (maxIdx < blockHeader._seq)
1676  {
1677  maxIdx = blockHeader._seq;
1678  }
1679  if (minIdx > blockHeader._seq) // NOTE(review): minIdx starts at 0 and _seq > 0 on this path, so this never fires and minIdx stays 0 - confirm intent
1680  {
1681  minIdx = blockHeader._seq;
1682  }
1683  // Advance past read blocks
1684  location += blockHeader._blocksToWrite * getBlockSize();
1685  // Either we're exiting loop, or blockNum is in range
1686  assert(location >= size || blockNum < numBlocks);
1687  }
1688  else
1689  {
1690  // Put this Block on the free list
1691  Block* block = blocks[++blockNum].setOffset(location);
1692  if (endOfFreeList)
1693  {
1694  endOfFreeList->_nextInList = block;
1695  }
1696  else
1697  {
1698  firstFree = block;
1699  }
1700  endOfFreeList = block;
1701  ++freeCount;
1702  location += blockSize;
1703  }
1704  }
1705  for (RecoverMap::iterator i = growingBlocks.begin(); // Second pass: convert the messages that need more Blocks than they occupied
1706  i != growingBlocks.end(); ++i)
1707  {
1708  Block* first = i->second;
1709  pBuffer->setPosition(first->_offset);
1710  BlockHeader blockHeader;
1711  // Read an old BlockHeader, which is only 24 bytes.
1712  // The bytes match current BlockHeader, and _nextInChain is 0.
1713  pBuffer->copyBytes((char*)&blockHeader, 24);
1714  // Old format total was storage bytes plus 64 bytes for block
1715  // and chain headers.
1716  blockHeader._totalRemaining -= 64;
1717  // New format counts only chain header size
1718  blockHeader._totalRemaining += getBlockChainHeaderSize();
1719  if (freeCount < growthBlocksNeeded)
1720  {
1721  // We have to resize, let's try to do it once.
1722  amps_uint32_t minBlocksRequired = growthBlocksNeeded - freeCount;
1723  amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1724  if (growthBlocks < minBlocksRequired)
1725  {
1726  amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1727  if (minBlocksRequired % defaultBlocks) // Round up to a multiple of the default resize increment
1728  minBlocksRequired = (minBlocksRequired / defaultBlocks + 1)
1729  * defaultBlocks;
1730  growthBlocks = minBlocksRequired;
1731  }
1732  amps_uint32_t newBlocks = 0;
1733  Block* addedBlocks = _blockStore.resizeBuffer(
1734  pBuffer->getSize()
1735  + growthBlocks * blockSize,
1736  &newBlocks);
1737  if (!addedBlocks)
1738  {
1739  throw StoreException("Failed to grow store buffer during recovery");
1740  }
1741  _blockStore.addBlocks(addedBlocks);
1742  freeCount += newBlocks;
1743  growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1744  ? growthBlocksNeeded - freeCount : 0;
1745  if (endOfFreeList) // Append the newly added Blocks to the local free list
1746  {
1747  endOfFreeList->_nextInList = addedBlocks;
1748  }
1749  else
1750  {
1751  firstFree = addedBlocks;
1752  }
1753  endOfFreeList = &(addedBlocks[newBlocks - 1]);
1754  endOfFreeList->_nextInList = 0;
1755  }
1756  expandBlocks(blocks, first->_offset, first, blockHeader,
1757  &firstFree, &freeCount, pBuffer);
1758  // Add first Block to recovered for building the used list later
1759  recoveredBlocks[blockHeader._seq] = first;
1760  if (!firstFree)
1761  {
1762  endOfFreeList = 0;
1763  }
1764  }
1765  if (endOfFreeList)
1766  {
1767  endOfFreeList->_nextInList = 0;
1768  }
1769  _blockStore.setFreeList(firstFree, freeCount);
1770  if (maxIdx > _lastSequence)
1771  {
1772  _lastSequence = maxIdx;
1773  }
1774  if (minIdx > _maxDiscarded + 1) // NOTE(review): minIdx is never raised above 0 in the scan loop, so this branch looks unreachable as written
1775  {
1776  _maxDiscarded = minIdx - 1;
1777  }
1778  if (_maxDiscarded > _metadataBlock->_sequence)
1779  {
1780  _metadataBlock->_sequence = _maxDiscarded;
1781  pBuffer->setPosition(_metadataBlock->_offset + 8);
1782  pBuffer->putUint64(_maxDiscarded);
1783  }
1784  Block* end = NULL;
1785  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size())); // Count every recovered message as stored
1786  for (RecoverMap::iterator i = recoveredBlocks.begin(); // std::map iterates in ascending key (sequence) order, so the used list is linked lowest sequence first
1787  i != recoveredBlocks.end(); ++i)
1788  {
1789  if (_blockStore.front())
1790  {
1791  end->_nextInList = i->second; // -V522
1792  }
1793  else
1794  {
1795  _blockStore.setUsedList(i->second);
1796  }
1797  end = i->second;
1798  }
1799  if (end)
1800  {
1801  end->_nextInList = 0;
1802  }
1803  _blockStore.setEndOfUsedList(end);
1804  }
1805 
1806  // For recovering an old format store to current format when more Blocks
1807  // are needed with the new format. Chains extra Blocks taken from *pFreeList_ (decrementing *pFreeCount_), shifts the message data so each Block has room for its own header, and rewrites the headers in pBuffer_. blockHeader_ is a by-value copy of the first Block's header; first_ is the first Block and location_ its buffer offset (recoverOldFormat passes first->_offset).
1808  void expandBlocks(Block* blocks_, size_t location_, Block* first_,
1809  BlockHeader blockHeader_,
1810  Block** pFreeList_, amps_uint32_t* pFreeCount_,
1811  Buffer* pBuffer_)
1812  {
1813  // First create the chain, then we'll fill in reverse
1814  Block* current = first_;
1815  // Old format total was storage bytes plus 64 bytes for block
1816  // and chain headers.
1817  amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining; // NOTE(review): the visible caller, recoverOldFormat, already applies the -64/+chain-header adjustment before the call - confirm this is still the old-format total and that adjusting again below is intended
1818  blockHeader_._totalRemaining -= 64;
1819  // New format counts only chain header size
1820  blockHeader_._totalRemaining += getBlockChainHeaderSize();
1821  // Check how many Blocks needed and if we have enough free.
1822  amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1823  / getBlockDataSize()
1824  + (blockHeader_._totalRemaining
1825  % getBlockDataSize()
1826  ? 1 : 0); // Ceiling division by the per-block data size
1827  // Last data block size, remove bytes saved in first block
1828  // then mod by block size.
1829  const amps_uint32_t blockSize = getBlockSize();
1830  // Old total remaining had all header included
1831  size_t endBlockSize = oldTotalRemaining % blockSize;
1832  if (!endBlockSize)
1833  {
1834  endBlockSize = blockSize;
1835  }
1836  size_t endOfData = 0; // Set in the loop below to the offset just past the old data's final byte
1837  // Hang on to CRC until first block is written
1838  amps_uint64_t crcVal = blockHeader_._crcVal;
1839  blockHeader_._crcVal = 0;
1840 
1841  std::stack<Block*> blocksUsed; // Predecessors in chain order, popped later to walk the chain backwards
1842  for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1843  {
1844  blocksUsed.push(current);
1845  current->_sequence = blockHeader_._seq;
1846  if (i >= blockHeader_._blocksToWrite) // Past the Blocks the message originally occupied: take the next one from the free list
1847  {
1848  if (i == blockHeader_._blocksToWrite)
1849  {
1850  endOfData = current->_offset + endBlockSize;
1851  }
1852  current->_nextInChain = *pFreeList_;
1853  --(*pFreeCount_);
1854  *pFreeList_ = (*pFreeList_)->_nextInList; // assumes the free list is non-empty; recoverOldFormat grows the buffer beforehand when free Blocks are short
1855  }
1856  else
1857  {
1858  current->_nextInChain = current->_nextInList;
1859  if (current->_nextInChain)
1860  {
1861  if (current->_offset + blockSize < pBuffer_->getSize())
1862  {
1863  current->_nextInChain->setOffset(current->_offset
1864  + blockSize);
1865  }
1866  else
1867  {
1868  current->_nextInChain->setOffset(blockSize); // Wrap to the first data block, which starts one block size into the buffer
1869  }
1870  }
1871  else
1872  {
1873  current->_nextInChain = blocks_[1].init(1, blockSize);
1874  }
1875  if (current->_nextInChain == *pFreeList_) // Unlink the chained Block from the free list if it is there
1876  {
1877  *pFreeList_ = (*pFreeList_)->_nextInList;
1878  --(*pFreeCount_);
1879  }
1880  else
1881  {
1882  for (Block* free = *pFreeList_; free;
1883  free = free->_nextInList)
1884  {
1885  if (free->_nextInList == current->_nextInChain)
1886  {
1887  free->_nextInList = free->_nextInList->_nextInList;
1888  --(*pFreeCount_);
1889  break;
1890  }
1891  }
1892  }
1893  }
1894  current->_nextInList = 0;
1895  current = current->_nextInChain;
1896  }
1897  // Make sure we write the correct number of blocks to write
1898  blockHeader_._blocksToWrite = blocksNeeded;
1899  // Finish setting up current
1900  current->_nextInList = 0;
1901  current->_sequence = blockHeader_._seq;
1902  // Now shift data, starting at the last Block
1903  // The total shift is for number of Blocks beyond the first
1904  // times Block header size, since previous format only wrote
1905  // the header in the first Block and had contiguous data,
1906  // with only wrap from end to beginning of buffer possible.
1907 
1908  // First time through, this is bytes in last block. After,
1909  // it will be block data size.
1910  size_t dataBytes = blockHeader_._totalRemaining % getBlockDataSize();
1911  while (current != first_)
1912  {
1913  size_t chunkBytesAvail = endOfData > location_
1914  ? endOfData - location_
1915  : endOfData - 2048; // NOTE(review): literal 2048 is unexplained here; presumably the offset of the first data block (one block size) - confirm
1916  if (chunkBytesAvail < dataBytes)
1917  {
1918  // Original was wrapped from end to start of buffer
1919  // Need to copy what's left at start to end of Block,
1920  // then start working from the end.
1921  // This can ONLY occur during wrap because the first
1922  // Block doesn't get moved in this loop.
1923  pBuffer_->copyBytes(current->_offset
1924  + getBlockSize()
1925  - chunkBytesAvail,
1926  getBlockSize(),
1927  chunkBytesAvail);
1928  chunkBytesAvail = dataBytes - chunkBytesAvail;
1929  endOfData = pBuffer_->getSize() - chunkBytesAvail;
1930  pBuffer_->copyBytes(current->_offset + getBlockHeaderSize(),
1931  endOfData,
1932  chunkBytesAvail);
1933  }
1934  else
1935  {
1936  endOfData -= dataBytes;
1937  pBuffer_->copyBytes(current->_offset + getBlockHeaderSize(),
1938  endOfData,
1939  dataBytes);
1940  }
1941  // Set next in chain in block header
1942  blockHeader_._nextInChain = (current->_nextInChain
1943  ? current->_nextInChain->_offset
1944  : (amps_uint64_t)0);
1945  // Write the header for this block
1946  pBuffer_->setPosition(current->_offset);
1947  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
1948  current = blocksUsed.top(); // Step back to the predecessor in the chain
1949  blocksUsed.pop();
1950  dataBytes = getBlockDataSize();
1951  }
1952  // Move bytes for chain header expansion from 32 bytes
1953  pBuffer_->copyBytes(first_->_offset
1954  + getBlockHeaderSize()
1956  first_->_offset + getBlockHeaderSize() + 32, // NOTE(review): listing lines 1955 and 1957 are absent from this rendering, so this copyBytes call is incomplete as shown
1958  // Set the CRC to indicate first block and set nextInChain
1959  blockHeader_._crcVal = crcVal;
1960  blockHeader_._nextInChain = first_->_nextInChain->_offset; // assumes first_ has a successor, i.e. blocksNeeded > 1 - not checked here
1961  // Need to reset location to after OLD header:
1962  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1963  // amps_uint64_t seq, amps_uint64_t crc
1964  pBuffer_->setPosition(location_ + (sizeof(amps_uint32_t) * 2)
1965  + (sizeof(amps_uint64_t) * 2) );
1966  // Read old chain header which uses same order, but not
1967  // as many bytes (everything is 32bit):
1968  // operation, commandIdLen, correlationIdLen,
1969  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1970  BlockChainHeader chainHeader;
1971  pBuffer_->copyBytes((char*)&chainHeader,
1972  sizeof(amps_uint32_t) * 8);
1973  // Rewrite the header and chain header for first Block.
1974  pBuffer_->setPosition(location_);
1975  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
1976  pBuffer_->putBytes((const char*)&chainHeader, sizeof(BlockChainHeader));
1977  }
1978 
1979  void chooseCRC(bool isFile) // Selects the function stored in _crc: the no-op for non-file stores, otherwise a real CRC implementation
1980  {
1981  if (!isFile)
1982  {
1983  _crc = noOpCRC; // noOpCRC always returns 0
1984  return;
1985  }
1986 
1987 #ifndef AMPS_SSE_42 // NOTE(review): tests whether the macro is defined, while the include guard at the top of the file uses `#if AMPS_SSE_42` (its value) - confirm both agree when it is defined as 0
1988  _crc = AMPS::CRC<0>::crcNoSSE; // Built without SSE 4.2 support: always use the non-SSE implementation
1989 #else
1990  if (AMPS::CRC<0>::isSSE42Enabled()) // Built with SSE 4.2 support: still decide at run time
1991  {
1992  _crc = AMPS::CRC<0>::crc;
1993  }
1994  else
1995  {
1996  _crc = AMPS::CRC<0>::crcNoSSE;
1997  }
1998 #endif
1999  }
2000 
2001  static amps_uint64_t noOpCRC(const char*, size_t, amps_uint64_t) // CRCFunction-compatible stand-in that ignores all three arguments; chooseCRC installs it for non-file stores
2002  {
2003  return 0; // Constant result regardless of input
2004  }
2005 
2006  protected:
2007  mutable BlockStore _blockStore; // Supplies the Buffer and the free/used Block lists used throughout this class
2008  private:
2009  // Block used to hold metadata, currently:
2010  // the last persisted (highest discarded) sequence; recoverOldFormat also writes the block size, block header size and client version into this block
2011  Block* _metadataBlock;
2012  // Highest sequence that has been discarded
2013  amps_uint64_t _maxDiscarded;
2014  // Track the assigned sequence numbers
2015  volatile amps_uint64_t _lastSequence;
2016  // Track how many messages are stored
2017  ATOMIC_TYPE _stored;
2018 
2019  // Message used for doing replay
2020  Message _message;
2021 
2022  typedef amps_uint64_t (*CRCFunction)(const char*, size_t, amps_uint64_t); // Arguments as used at the call sites: data pointer, byte length, running CRC value
2023 
2024  // Function used to calculate the CRC if one is used; assigned by chooseCRC
2025  CRCFunction _crc;
2026 
2027  };
2028 
2029 }
2030 
2031 #endif
2032 
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:927
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CorrelationId header of self as a Field that references the underlying buf...
Definition: Message.hpp:1250
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1174
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:79
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CommandId header of self as a Field that references the underlying buffer ...
Definition: Message.hpp:1248
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:899
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:642
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:547
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Filter header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1252
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
virtual void setPosition(size_t position_)=0
Set the buffer position to a location.
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1262
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:153
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:179
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Bookmark header of self as a Field that references the underlying buffer m...
Definition: Message.hpp:1140
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:579
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:262
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:697
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that are currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:628
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1013
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKeys header of self as a Field that references the underlying buffer ma...
Definition: Message.hpp:1371
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:60
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:237
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Expiration header of self as a Field that references the underlying buffer...
Definition: Message.hpp:1251
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:196
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:163
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Topic header of self as a Field that references the underlying buffer mana...
Definition: Message.hpp:1395
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:247
amps_uint32_t getBlockSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:171
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKey header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1370
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1106
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:685
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:989
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:492
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.