AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.0.5
BlockPublishStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _BLOCKPUBLISHSTORE_H_
27 #define _BLOCKPUBLISHSTORE_H_
28 #include <ampsplusplus.hpp>
29 #include <BlockStore.hpp>
30 #include <Buffer.hpp>
31 #include <sstream>
32 #include <string>
33 #include <map>
34 #include <ampscrc.hpp>
35 
36 #ifdef _WIN32
37 #include <intrin.h>
38 #include <sys/timeb.h>
39 #else
40 #include <sys/time.h>
41 #if AMPS_SSE_42
42 #include <cpuid.h>
43 #endif
44 #endif
45 #include <iostream>
46 
51 
52 namespace AMPS
53 {
60 {
61 public:
62  typedef BlockStore::Block Block;
63  typedef Lock<BlockStore> BufferLock;
64 
65  typedef enum
66  {
67  SOW_DELETE_DATA=0x01,
68  SOW_DELETE_FILTER=0x02,
69  SOW_DELETE_KEYS=0x04,
70  SOW_DELETE_BOOKMARK=0x08,
71  SOW_DELETE_BOOKMARK_CANCEL=0x10,
72  SOW_DELETE_UNKNOWN=0x80
73  } SowDeleteType;
74 
77  enum Constants : amps_uint32_t
78  {
79  DEFAULT_BLOCK_HEADER_SIZE = 32,
80  DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
81  DEFAULT_BLOCKS_PER_REALLOC = 1000,
82  DEFAULT_BLOCK_SIZE = 2048
83  };
84 
85  /**********************************************************************
86  * Storage format
87  *************************************************************************
88  * Field Description | Type | # Bytes
89  *************************************************************************
90  * HEADER as detailed below | | 32 TOTAL
91  * | |
92  * Total number of blocks used by the record | uint32_t | 4
93  * Total length of the saved record | uint32_t | 4
94  * HA Message sequence | uint64_t | 8
95  * CRC Value - only set in first Block | uint64_t | 8
96  * next in chain offset | uint64_t | 8
97  *************************************************************************
98  * CHAIN HEADER as detailed below | | 64 TOTAL
99  * | |
100  * operation | uint32_t | 4
101  * command id length | uint32_t | 4
102  * correltation id length | uint32_t | 4
103  * expiration length | uint32_t | 4
104  * sow key length | uint32_t | 4
105  * topic length | uint32_t | 4
106  * sow delete flag | int32_t | 4
107  * ack types | uint32_t | 4
108  * unused [8] | uint32_t | 4*8 = 32
109  *************************************************************************
110  * DATA SECTION - can be spread across multiple blocks
111  *
112  * command id | char[]
113  * correlation id | char[]
114  * expiration | char[]
115  * sow key | char[]
116  * topic | char[]
117  * data | char[]
118  *************************************************************************/
119 
120  struct BlockHeader
121  {
122  amps_uint32_t _blocksToWrite;
123  amps_uint32_t _totalRemaining;
124  amps_uint64_t _seq;
125  amps_uint64_t _crcVal;
126  amps_uint64_t _nextInChain;
127  };
128 
129  struct BlockChainHeader
130  {
131  amps_uint32_t _operation;
132  amps_uint32_t _commandIdLen;
133  amps_uint32_t _correlationIdLen;
134  amps_uint32_t _expirationLen;
135  amps_uint32_t _sowKeyLen;
136  amps_uint32_t _topicLen;
137  amps_int32_t _flag;
138  amps_uint32_t _ackTypes;
139  amps_uint32_t _unused[8];
140  BlockChainHeader()
141  : _operation(0), _commandIdLen(0), _correlationIdLen(0)
142  , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
143  , _ackTypes(0)
144  { ; }
145  };
146 
151  static inline amps_uint32_t getBlockHeaderSize()
152  {
153  return DEFAULT_BLOCK_HEADER_SIZE;
154  }
155 
161  static inline amps_uint32_t getBlockChainHeaderSize()
162  {
163  return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
164  }
165 
169  inline amps_uint32_t getBlockSize()
170  {
171  return _blockStore.getBlockSize();
172  }
173 
177  inline amps_uint32_t getBlockDataSize()
178  {
179  return _blockStore.getBlockSize() - getBlockHeaderSize();
180  }
181 
195  amps_uint32_t blocksPerRealloc_ = 1000,
196  bool isFile_ = false,
197  amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
198  : StoreImpl()
199  , _blockStore(buffer_, blocksPerRealloc_,
200  DEFAULT_BLOCK_HEADER_SIZE,
201  (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE*2
202  ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE*2))
203  , _metadataBlock(0)
204  , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
205  , _stored(0)
206  {
207  _blockStore.setResizeHandler(&BlockPublishStore::canResize, (void*)this);
208  chooseCRC(isFile_);
209  if (!isFile_)
210  {
211  // This gets set in recover in file-based stores
212  BufferLock bufferGuard(_blockStore);
213  _blockStore.init();
214  _metadataBlock = _blockStore.get(1);
215  // Remove metadata block from used list
216  _blockStore.setUsedList(0);
217  _blockStore.setEndOfUsedList(0);
218  // Metadata block holds block size, block header size,
219  // last discarded sequence, client version
220  _metadataBlock->_sequence = (amps_uint64_t)0;
221  Buffer* pBuffer = _blockStore.getBuffer();
222  pBuffer->setPosition(_metadataBlock->_offset);
223  pBuffer->putUint32((amps_uint32_t)getBlockSize());
224  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
225  pBuffer->putUint64((amps_uint64_t)0);
226  // Metadata blocks puts client version in CRC position
227  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
228  // No next in chain
229  pBuffer->putUint64((amps_uint64_t)0);
230  }
231  }
232 
236  {
237  }
238 
245  virtual amps_uint64_t store(const Message& message_)
246  {
247  return store(message_, true);
248  }
249 
260  amps_uint64_t store(const Message& message_, bool assignSequence_)
261  {
262  const char *commandId, *correlationId, *expiration, *sowKey,
263  *topic, *data;
264  size_t dataLen = 0;
265  BlockHeader blockHeader;
266  BlockChainHeader chainHeader;
267  message_.getRawCommandId(&commandId, &dataLen);
268  chainHeader._commandIdLen = (amps_uint32_t)dataLen;
269  message_.getRawCommandId(&commandId, &dataLen);
270  chainHeader._commandIdLen = (amps_uint32_t)dataLen;
271  message_.getRawCorrelationId(&correlationId, &dataLen);
272  chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
273  message_.getRawExpiration(&expiration, &dataLen);
274  chainHeader._expirationLen = (amps_uint32_t)dataLen;
275  message_.getRawSowKey(&sowKey, &dataLen);
276  chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
277  message_.getRawTopic(&topic, &dataLen);
278  chainHeader._topicLen = (amps_uint32_t)dataLen;
279  message_.getRawData(&data, &dataLen);
280  chainHeader._flag = -1;
281  Message::Command::Type operation = message_.getCommandEnum();
282  chainHeader._operation = (amps_uint32_t)operation;
283  if (operation == Message::Command::SOWDelete)
284  {
285  if (dataLen > 0)
286  {
287  chainHeader._flag = SOW_DELETE_DATA;
288  }
289  else
290  {
291  message_.getRawFilter(&data, &dataLen);
292  if (dataLen > 0)
293  {
294  chainHeader._flag = SOW_DELETE_FILTER;
295  }
296  else
297  {
298  message_.getRawSowKeys(&data, &dataLen);
299  if (dataLen > 0)
300  {
301  chainHeader._flag = SOW_DELETE_KEYS;
302  }
303  else
304  {
305  message_.getRawBookmark(&data, &dataLen);
306  chainHeader._flag = SOW_DELETE_BOOKMARK;
307  // Check options for cancel
308  Message::Field options = message_.getOptions();
309  size_t remaining = options.len();
310  const void* next = NULL;
311  const void* start = (const void*)(options.data());
312  // Not necessarily null-terminated so no strstr
313  while (remaining >= 6 &&
314  (next = memchr(start,(int)'c',remaining)) != NULL)
315  {
316  remaining = (size_t)next-(size_t)start;
317  if (remaining >= 6 && strncmp((const char*)start,
318  "cancel", 6) == 0)
319  {
320  chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
321  break;
322  }
323  }
324  }
325  }
326  }
327  }
328  blockHeader._totalRemaining = (
329  (chainHeader._operation == Message::Command::Unknown)
330  ? 0
332  + chainHeader._commandIdLen
333  + chainHeader._correlationIdLen
334  + chainHeader._expirationLen
335  + chainHeader._sowKeyLen
336  + chainHeader._topicLen
337  + (amps_uint32_t)dataLen));
338  size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
339  (blockHeader._totalRemaining % getBlockDataSize()));
340  blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
341  ? 1
342  : ((amps_uint32_t)(blockHeader._totalRemaining
343  / getBlockDataSize())
344  + ((lastBlockLength > 0) ? 1 : 0)));
345  blockHeader._crcVal = (amps_uint64_t)0ULL;
346  blockHeader._crcVal = _crc(commandId,
347  chainHeader._commandIdLen,
348  blockHeader._crcVal);
349  blockHeader._crcVal = _crc(correlationId,
350  chainHeader._correlationIdLen,
351  blockHeader._crcVal);
352  blockHeader._crcVal = _crc(expiration,
353  chainHeader._expirationLen,
354  blockHeader._crcVal);
355  blockHeader._crcVal = _crc(sowKey,
356  chainHeader._sowKeyLen,
357  blockHeader._crcVal);
358  blockHeader._crcVal = _crc(topic,
359  chainHeader._topicLen,
360  blockHeader._crcVal);
361  blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
362 
363  // Reserve slots for storage, growing if necessary
364  BufferLock bufferGuard(_blockStore);
365  Block* first = _blockStore.get(blockHeader._blocksToWrite);
366  if (assignSequence_)
367  {
368  if (_lastSequence <= 2)
369  {
370  _getLastPersisted();
371  }
372  blockHeader._seq = ++_lastSequence;
373  }
374  else
375  {
376  blockHeader._seq = amps_message_get_field_uint64(message_.getMessage(),
377  AMPS_Sequence);
378  if (!_maxDiscarded)
379  {
380  _maxDiscarded = blockHeader._seq - 1;
381  }
382  if (blockHeader._seq >= _lastSequence)
383  {
384  _lastSequence = blockHeader._seq;
385  }
386  }
387 
388  try
389  {
390  size_t topicWritten = 0UL;
391  size_t dataWritten = 0UL;
392  size_t commandWritten = 0UL;
393  size_t correlationWritten = 0UL;
394  size_t expirationWritten = 0UL;
395  size_t sowKeyWritten = 0UL;
396  Buffer* pBuffer = _blockStore.getBuffer();
397  for(Block* next = first; next; next = next->_nextInChain)
398  {
399  next->_sequence = blockHeader._seq;
400  if (next->_nextInChain)
401  blockHeader._nextInChain = next->_nextInChain->_offset;
402  else
403  blockHeader._nextInChain = (amps_uint64_t)0;
404  // Set buffer to start of Block and write the header
405  pBuffer->setPosition(next->_offset);
406  pBuffer->putBytes((const char*)&blockHeader, sizeof(BlockHeader));
407  // Clear crcVal, as it's only written in the first Block
408  blockHeader._crcVal = (amps_uint64_t)0;
409  size_t bytesRemaining = getBlockDataSize();
410  if (next == first)
411  {
412  // Write Block chain header
413  chainHeader._ackTypes = (amps_uint32_t)message_.getAckTypeEnum();
414  pBuffer->putBytes((const char*)&chainHeader,
415  sizeof(BlockChainHeader));
416  pBuffer->setPosition(next->_offset+getBlockHeaderSize()+getBlockChainHeaderSize());
417  bytesRemaining -= getBlockChainHeaderSize();
418  }
419  else
420  {
421  pBuffer->setPosition(next->_offset+getBlockHeaderSize());
422  }
423 
424  if(commandWritten < chainHeader._commandIdLen)
425  {
426  size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
427  pBuffer->putBytes(commandId + commandWritten,
428  commandWrite);
429  bytesRemaining -= commandWrite;
430  commandWritten += commandWrite;
431  }
432  if(correlationWritten < chainHeader._correlationIdLen)
433  {
434  size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
435  pBuffer->putBytes(correlationId + correlationWritten,
436  correlationWrite);
437  bytesRemaining -= correlationWrite;
438  correlationWritten += correlationWrite;
439  }
440  if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
441  {
442  size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
443  pBuffer->putBytes(expiration + expirationWritten, expWrite);
444  bytesRemaining -= expWrite;
445  expirationWritten += expWrite;
446  }
447  if(bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
448  {
449  size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
450  pBuffer->putBytes(sowKey + sowKeyWritten, sowKeyWrite);
451  bytesRemaining -= sowKeyWrite;
452  sowKeyWritten += sowKeyWrite;
453  }
454  if(bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
455  {
456  size_t topicWrite = (chainHeader._topicLen - topicWritten
457  < bytesRemaining)
458  ? chainHeader._topicLen - topicWritten
459  : bytesRemaining;
460  pBuffer->putBytes(topic + topicWritten, topicWrite);
461  bytesRemaining -= topicWrite;
462  topicWritten += topicWrite;
463  }
464  if(bytesRemaining > 0 && dataWritten < dataLen)
465  {
466  size_t dataWrite = (dataLen-dataWritten < bytesRemaining) ?
467  dataLen-dataWritten : bytesRemaining;
468  pBuffer->putBytes(data + dataWritten, dataWrite);
469  bytesRemaining -= dataWrite;
470  dataWritten += dataWrite;
471  }
472  }
473  }
474  catch (const AMPSException& ex)
475  {
476  _blockStore.put(first);
477  throw ex;
478  }
479  AMPS_FETCH_ADD(&_stored, 1);
480  return blockHeader._seq;
481  }
482 
487  virtual void discardUpTo(amps_uint64_t index_)
488  {
489  // Get the lock
490  BufferLock bufferGuard(_blockStore);
491  Buffer* pBuffer = _blockStore.getBuffer();
492  // Don't use _getLastPersisted() here, don't want to set it
493  // to something other than index_ if it's not already set
494  amps_uint64_t lastPersisted = _metadataBlock->_sequence;
495  // Make sure it's a real index and we have messages to discard
496  if(index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
497  {
498  // During logon it's very possible we don't have a last persisted
499  // but that the Client is calling discardUpTo with the ack value.
500  if (lastPersisted < index_)
501  {
502  pBuffer->setPosition(_metadataBlock->_offset+8);
503  pBuffer->putUint64(index_);
504  _metadataBlock->_sequence = index_;
505  if (_maxDiscarded < index_)
506  {
507  _maxDiscarded = index_;
508  }
509  if (_lastSequence <= index_)
510  {
511  _lastSequence = index_;
512  }
513  }
514  else if (!index_) // Fresh logon, no sequence history
515  {
516  _getLastPersisted();
517  }
518  _blockStore.signalAll();
519  return;
520  }
521  if (_maxDiscarded < index_)
522  {
523  _maxDiscarded = index_;
524  }
525  AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
526  _blockStore.signalAll();
527  if (lastPersisted >= index_)
528  {
529  return;
530  }
531  pBuffer->setPosition(_metadataBlock->_offset+8);
532  pBuffer->putUint64(index_);
533  _metadataBlock->_sequence = index_;
534  if (_lastSequence < index_)
535  {
536  _lastSequence = index_;
537  }
538  }
539 
544  void replay(StoreReplayer& replayer_)
545  {
546  // Get the lock
547  BufferLock bufferGuard(_blockStore);
548  // If we don't have anything yet, return
549  if(!_blockStore.front()) return;
550  Block* next = _blockStore.front();
551  try
552  {
553  for (Block* block = _blockStore.front(); block; block = next)
554  {
555  // Replay the message
556  replayOnto(block, replayer_);
557  next = block->_nextInList;
558  }
559  }
560  catch (const StoreException& e)
561  {
562  _blockStore.putAll(next);
563  throw e;
564  }
565  }
566 
573  bool replaySingle(StoreReplayer& replayer_, amps_uint64_t index_)
574  {
575  BufferLock bufferGuard(_blockStore);
576  // If we don't have anything yet, return
577  if(!_blockStore.front()) return false;
578  // Get the end point
579  amps_uint64_t lastIdx = _blockStore.back()->_sequence;
580  // Get the start point
581  amps_uint64_t leastIdx = _blockStore.front()->_sequence;
582  if(index_>=leastIdx && index_ <=lastIdx)
583  {
584  Block* block = _blockStore.front();
585  while(block && block->_sequence != index_)
586  {
587  block = block->_nextInList;
588  }
589  if (!block)
590  {
591  return false;
592  }
593  // If total bytes is 0, it's a queue ack and gets skipped.
594  Buffer* pBuffer = _blockStore.getBuffer();
595  pBuffer->setPosition(block->_offset +
596  sizeof(amps_uint32_t));
597  if (pBuffer->getUint32() == 0) return false;
598  replayOnto(block, replayer_);
599  return true;
600  }
601  else // Get Store and Client back in sync
602  {
603  _message.reset();
604  if (_blockStore.front()) leastIdx -= 1;
605  else leastIdx = getLastPersisted();
606  _message.setSequence(leastIdx);
607  replayer_.execute(_message);
608  return false;
609  }
610  }
611 
617  size_t unpersistedCount() const
618  {
619  size_t count = (size_t)_stored;
620  return count;
621  }
622 
631  virtual void flush(long timeout_)
632  {
633  BufferLock bufferGuard(_blockStore);
634  amps_uint64_t waitFor = _getHighestUnpersisted();
635  // Check that we aren't already empty
636  if (waitFor == getUnsetSequence()) return;
637  if (timeout_ > 0)
638  {
639  bool timedOut = false;
640  AMPS_START_TIMER(timeout_);
641  // While timeout hasn't expired and we haven't had everything acked
642  while (!timedOut && _stored != 0
643  && waitFor >= _getLowestUnpersisted())
644  {
645  if (!_blockStore.wait(timeout_))
646  {
647  // May have woken up early, check real time
648  AMPS_RESET_TIMER(timedOut, timeout_);
649  }
650  }
651  // If we timed out and still haven't caught up with the acks
652  if (timedOut && _stored != 0
653  && waitFor >= _getLowestUnpersisted())
654  {
655  throw TimedOutException("Timed out waiting to flush publish store.");
656  }
657  }
658  else
659  {
660  while (_stored != 0 && waitFor >= _getLowestUnpersisted())
661  {
662  // Still wake up every 1s so python can interrupt
663  _blockStore.wait(1000);
664  amps_invoke_waiting_function();
665  }
666  }
667  }
668 
669  amps_uint64_t getLowestUnpersisted() const
670  {
671  BufferLock bufferGuard(_blockStore);
672  return _getLowestUnpersisted();
673  }
674 
675  amps_uint64_t getHighestUnpersisted() const
676  {
677  BufferLock bufferGuard(_blockStore);
678  return _getHighestUnpersisted();
679  }
680 
681  amps_uint64_t getLastPersisted(void)
682  {
683  BufferLock bufferGuard(_blockStore);
684  return _getLastPersisted();
685  }
686 
687 protected:
688  static bool canResize(size_t requestedSize_, void* vpThis_)
689  {
690  return ((BlockPublishStore*)vpThis_)->callResizeHandler(requestedSize_);
691  }
692 
693  amps_uint64_t _getLowestUnpersisted() const
694  {
695  // Assume the lock is held
696  // If we don't have anything, return MAX
697  if(!_blockStore.front()) return getUnsetSequence();
698  return _blockStore.front()->_sequence;
699  }
700 
701  amps_uint64_t _getHighestUnpersisted() const
702  {
703  // Assume the lock is held
704  // If we don't have anything, return MAX
705  if(!_blockStore.back()) return getUnsetSequence();
706  return _blockStore.back()->_sequence;
707  }
708 
709  amps_uint64_t _getLastPersisted(void)
710  {
711  // Assume the lock is held
712  amps_uint64_t lastPersisted = (amps_uint64_t)0;
713  Buffer* pBuffer = _blockStore.getBuffer();
714  pBuffer->setPosition(_metadataBlock->_offset+8);
715  lastPersisted = pBuffer->getUint64();
716  if (lastPersisted)
717  {
718  if (_lastSequence < lastPersisted)
719  _lastSequence = lastPersisted;
720  return lastPersisted;
721  }
722  if (_maxDiscarded)
723  {
724  lastPersisted = _maxDiscarded;
725  }
726  else
727  {
728 #ifdef _WIN32
729  struct _timeb t;
730  _ftime_s(&t);
731  lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
732 #else // not _WIN32
733  struct timeval tv;
734  gettimeofday(&tv, NULL);
735  lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
736  * (amps_uint64_t)1000000;
737 #endif
738  }
739  if (_lastSequence > 2)
740  {
741  amps_uint64_t low = _getLowestUnpersisted();
742  amps_uint64_t high = _getHighestUnpersisted();
743  if (low != getUnsetSequence())
744  {
745  lastPersisted = low - 1;
746  }
747  if (high != getUnsetSequence() && _lastSequence <= high)
748  {
749  _lastSequence = high;
750  }
751  if (_lastSequence < lastPersisted)
752  {
753  lastPersisted = _lastSequence - 1;
754  }
755  }
756  else
757  {
758  _lastSequence = lastPersisted;
759  }
760  pBuffer->setPosition(_metadataBlock->_offset
761  + sizeof(amps_uint32_t) // blocks used
762  + sizeof(amps_uint32_t)); // record length
763  pBuffer->putUint64(lastPersisted);
764  _metadataBlock->_sequence = lastPersisted;
765  return lastPersisted;
766  }
767 
768  void recover(void)
769  {
770  BufferLock bufferGuard(_blockStore);
771  // Make sure the size isn't 0 and is a multiple of block size
772  Buffer* pBuffer = _blockStore.getBuffer();
773  size_t size = pBuffer->getSize();
774  amps_uint32_t blockSize = getBlockSize();
775  if(size == 0)
776  {
777  _blockStore.init();
778  _metadataBlock = _blockStore.get(1);
779  _metadataBlock->_sequence = (amps_uint64_t)0;
780  pBuffer->setPosition(_metadataBlock->_offset);
781  // Metadata block holds block size, block header size,
782  // last discarded sequence, client version
783  pBuffer->putUint32((amps_uint32_t)blockSize);
784  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
785  pBuffer->putUint64((amps_uint64_t)0);
786  // Metadata blocks puts client version in CRC position
787  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
788  // No next in chain
789  pBuffer->putUint64((amps_uint64_t)0);
790  return;
791  }
792  size_t numBlocks = size / blockSize;
793  if(size % blockSize > 0)
794  {
795  // We shouldn't ever be in here, since it requires starting with a
796  // file that is not an even multiple of block size and we always
797  // fix the size.
798  numBlocks = size / blockSize;
799  ++numBlocks;
800  amps_uint32_t blockCount = 0;
801  // We allocate all the Blocks at once below so delete allocated Block[]
802  delete[] _blockStore.resizeBuffer(numBlocks*blockSize, &blockCount);
803  // Resize can fail if resizeHandler is set and refuses the request
804  // Since this is recovery, we need to simply fail in that case
805  if (size > pBuffer->getSize() || numBlocks != (size_t)blockCount)
806  {
807  throw StoreException("Publish Store could not resize correctly during recoery, possibly due to resizeHandler refusing the request.");
808  }
809  size = pBuffer->getSize();
810  }
811 
812  amps_uint64_t maxIdx = 0;
813  amps_uint64_t minIdx = 0;
814  size_t location = 0;
815  BlockHeader blockHeader;
816  // The blocks we create here all get their offset set in below loop
817  Block* blocks = new Block[numBlocks];
818  blocks[numBlocks-1]._nextInList = 0;
819  size_t blockNum = 0;
820  _blockStore.addBlocks(blocks);
821  _metadataBlock = blocks; // The first Block is metadata
822  _metadataBlock->_nextInList = 0;
823  pBuffer->setPosition(0);
824  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
825  /* Metadata Block header fields
826  * amps_uint32_t _blocksToWrite = BlockSize
827  * amps_uint32_t _totalRemaining = BlockHeaderSize
828  * amps_uint64_t _seq = last persisted sequence number
829  * amps_uint64_t _crcVal = unused
830  * amps_uint64_t _nextInChain = unused
831  */
832  if (blockHeader._blocksToWrite == 1) // Old store format?
833  {
834  /* Old format metadata block header fields
835  * amps_uint32_t _blocksToWrite = 1
836  * amps_uint32_t _totalRemaining = client version
837  * amps_uint64_t _seq = last persisted sequence number
838  * amps_uint64_t _crcVal = unused
839  * amps_uint64_t _nextInChain = unused
840  */
841  // Readable old format starts with version 5.0.0.0
842  if (blockHeader._totalRemaining >= 5000000)
843  {
844  // All recovery needs to be based on old format
845  // so go do that instead.
846  recoverOldFormat(blocks);
847  return;
848  }
849  // Unreadable format, fail
850  throw StoreException("Unrecognized format for Store. Can't recover.");
851  }
852  if (blockHeader._blocksToWrite == 0)
853  {
854  pBuffer->setPosition(0);
855  pBuffer->putUint32((amps_uint32_t)blockSize);
856  }
857  else
858  {
859  blockSize = blockHeader._blocksToWrite;
860  _blockStore.setBlockSize(blockSize);
861  }
862  if (blockHeader._totalRemaining == 0)
863  {
864  pBuffer->setPosition(sizeof(amps_uint32_t));
865  pBuffer->putUint32((amps_uint32_t)getBlockHeaderSize());
866  }
867  else
868  {
869  _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
870  }
871  _metadataBlock->_sequence = blockHeader._seq;
872  if (_metadataBlock->_sequence
873  && _metadataBlock->_sequence < (amps_uint64_t)1000000)
874  {
875  pBuffer->setPosition(_metadataBlock->_offset
876  + sizeof(amps_uint32_t) // BlockSize
877  + sizeof(amps_uint32_t)); // BlockHeaderSize
878  pBuffer->putUint64((amps_uint64_t)0);
879  _metadataBlock->_sequence = 0;
880  }
881  else
882  {
883  // Set _maxDiscarded and _lastSequence
884  _maxDiscarded = _metadataBlock->_sequence;
885  _lastSequence = _maxDiscarded;
886  }
887  // This would be where to check the client version string
888  // No checks currently
889  location += blockSize;
890  amps_uint32_t freeCount = 0;
891  Block* firstFree = NULL;
892  Block* endOfFreeList = NULL;
893  // Used to create used list in order after recovery
894  typedef std::map<amps_uint64_t, Block*> RecoverMap;
895  RecoverMap recoveredBlocks;
896  while(location < size)
897  {
898  // Get index and check if non-zero
899  pBuffer->setPosition(location);
900  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
901  if((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
902  (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
903  {
904  // Block is part of a chain
905  location += blockSize;
906  continue;
907  }
908  Block* block = blocks[++blockNum].setOffset(location);
909  bool recovered = false;
910  if(blockHeader._seq > 0 && blockHeader._totalRemaining < size)
911  {
912  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
913  block->_sequence = blockHeader._seq;
914  // Track min and max
915  if(maxIdx < blockHeader._seq)
916  {
917  maxIdx = blockHeader._seq;
918  }
919  if(minIdx > blockHeader._seq)
920  {
921  minIdx = blockHeader._seq;
922  }
923  // Save it in recovered blocks
924  recoveredBlocks[blockHeader._seq] = block;
925  // Set up the chain
926  while (blockHeader._nextInChain != (amps_uint64_t)0)
927  {
928  Block* chain = blocks[++blockNum]
929  .setOffset((size_t)blockHeader._nextInChain);
930  chain->_nextInList = 0;
931  pBuffer->setPosition((size_t)blockHeader._nextInChain
932  + sizeof(amps_uint32_t) // blocks used
933  + sizeof(amps_uint32_t) // record length
934  + sizeof(amps_uint64_t) // seq
935  + sizeof(amps_uint64_t)); // crc
936  blockHeader._nextInChain = pBuffer->getUint64();
937  block->_nextInChain = chain;
938  block = chain;
939  block->_sequence = blockHeader._seq;
940  }
941  recovered = true;
942  }
943  if (!recovered)
944  {
945  // Put this Block on the free list
946  if (endOfFreeList)
947  {
948  endOfFreeList->_nextInList = block;
949  }
950  else
951  {
952  firstFree = block;
953  }
954  endOfFreeList = block;
955  ++freeCount;
956  }
957  location += blockSize;
958  }
959  if (endOfFreeList)
960  {
961  endOfFreeList->_nextInList = 0;
962  }
963  _blockStore.setFreeList(firstFree, freeCount);
964  if (maxIdx > _lastSequence)
965  {
966  _lastSequence = maxIdx;
967  }
968  if (minIdx > _maxDiscarded + 1)
969  {
970  _maxDiscarded = minIdx - 1;
971  }
972  if (_maxDiscarded > _metadataBlock->_sequence)
973  {
974  _metadataBlock->_sequence = _maxDiscarded;
975  pBuffer->setPosition(_metadataBlock->_offset+8);
976  pBuffer->putUint64(_maxDiscarded);
977  }
978  Block* end = NULL;
979  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size()));
980  for (RecoverMap::iterator i = recoveredBlocks.begin();
981  i != recoveredBlocks.end(); ++i)
982  {
983  if (end)
984  {
985  end->_nextInList = i->second;
986  }
987  else
988  {
989  _blockStore.setUsedList(i->second);
990  }
991  end = i->second;
992  }
993  if (end)
994  {
995  end->_nextInList = 0;
996  }
997  _blockStore.setEndOfUsedList(end);
998  }
999 
1000 private:
1001  // Lock should already be held
1002  void replayOnto(Block* block_, StoreReplayer& replayer_)
1003  {
1004  // Read the header
1005  size_t start = block_->_offset;
1006  size_t position = start;
1007  Buffer* pBuffer = _blockStore.getBuffer();
1008  pBuffer->setPosition(position);
1009  BlockHeader blockHeader;
1010  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1011  if (blockHeader._totalRemaining == 0)
1012  {
1013  // Queue acking sow_delete
1014  return;
1015  }
1016  position += getBlockHeaderSize();
1017  BlockChainHeader blockChainHeader;
1018  pBuffer->copyBytes((char*)&blockChainHeader, sizeof(blockChainHeader));
1019  if (blockChainHeader._operation == Message::Command::Unknown)
1020  {
1021  // Queue acking sow_delete
1022  return;
1023  }
1024  blockChainHeader._ackTypes |= Message::AckType::Persisted;
1025  position += getBlockChainHeaderSize();
1026  blockHeader._totalRemaining -= (amps_uint32_t)getBlockChainHeaderSize();
1027  pBuffer->setPosition(position);
1028 
1029  if (blockHeader._totalRemaining
1030  < blockChainHeader._commandIdLen
1031  + blockChainHeader._correlationIdLen
1032  + blockChainHeader._expirationLen
1033  + blockChainHeader._sowKeyLen
1034  + blockChainHeader._topicLen)
1035  {
1036  std::ostringstream os;
1037  os << "Corrupted message found with invalid lengths. "
1038  << "Attempting to replay " << block_->_sequence
1039  << ". Block sequence " << blockHeader._seq
1040  << ", topic length " << blockChainHeader._topicLen
1041  << ", data length " << blockHeader._totalRemaining
1042  << ", command ID length " << blockChainHeader._commandIdLen
1043  << ", correlation ID length " << blockChainHeader._correlationIdLen
1044  << ", expiration length " << blockChainHeader._expirationLen
1045  << ", sow key length " << blockChainHeader._sowKeyLen
1046  << ", start " << start
1047  << ", position " << position
1048  << ", buffer size " << pBuffer->getSize();
1049  throw StoreException(os.str());
1050  }
1051 
1052  // Start prepping the message
1053  _message.reset();
1054  _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1055  _message.setAckTypeEnum((unsigned)blockChainHeader._ackTypes
1056  | Message::AckType::Persisted);
1057  _message.setSequence(blockHeader._seq);
1058  // Read the data and calculate the CRC
1059  Block* current = block_;
1060  size_t blockBytesRemaining = getBlockDataSize() - getBlockChainHeaderSize();
1061  amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
1062  // Use tmpBuffers for any fields split across Block boundaries
1063  char** tmpBuffers = (blockHeader._blocksToWrite>1) ? new char*[blockHeader._blocksToWrite-1] : 0;
1064  size_t blockNum = 0;
1065  if (blockChainHeader._commandIdLen > 0)
1066  {
1067  if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1068  {
1069  _message.assignCommandId(pBuffer->getBytes(blockChainHeader._commandIdLen)._data,
1070  blockChainHeader._commandIdLen);
1071  blockBytesRemaining -= blockChainHeader._commandIdLen;
1072  }
1073  else
1074  {
1075  tmpBuffers[blockNum] = new char[blockChainHeader._commandIdLen];
1076  size_t totalLeft = blockChainHeader._commandIdLen;
1077  size_t totalRead = 0;
1078  size_t readLen = 0;
1079  while (totalLeft)
1080  {
1081  readLen = blockBytesRemaining < totalLeft ?
1082  blockBytesRemaining : totalLeft;
1083  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1084  if (!(totalLeft -= readLen)) break;
1085  if (!(current = current->_nextInChain)) break;
1086  totalRead += readLen;
1087  blockBytesRemaining = getBlockDataSize();
1088  position = current->_offset + getBlockHeaderSize();
1089  pBuffer->setPosition(position);
1090  }
1091  blockBytesRemaining -= readLen;
1092  _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
1093  }
1094  blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1095  crcCalc = _crc(_message.getCommandId().data(),
1096  blockChainHeader._commandIdLen, crcCalc);
1097  }
1098  if (blockChainHeader._correlationIdLen > 0)
1099  {
1100  if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1101  {
1102  _message.assignCorrelationId(
1103  pBuffer->getBytes(blockChainHeader._correlationIdLen)._data,
1104  blockChainHeader._correlationIdLen);
1105  blockBytesRemaining -= blockChainHeader._correlationIdLen;
1106  }
1107  else
1108  {
1109  tmpBuffers[blockNum] = new char[blockChainHeader._correlationIdLen];
1110  size_t totalLeft = blockChainHeader._correlationIdLen;
1111  size_t totalRead = 0;
1112  size_t readLen = 0;
1113  while (totalLeft)
1114  {
1115  readLen = blockBytesRemaining < totalLeft ?
1116  blockBytesRemaining : totalLeft;
1117  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1118  if (!(totalLeft -= readLen)) break;
1119  if (!(current = current->_nextInChain)) break;
1120  totalRead += readLen;
1121  blockBytesRemaining = getBlockDataSize();
1122  position = current->_offset + getBlockHeaderSize();
1123  pBuffer->setPosition(position);
1124  }
1125  blockBytesRemaining -= readLen;
1126  _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1127  }
1128  blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1129  crcCalc = _crc(_message.getCorrelationId().data(),
1130  blockChainHeader._correlationIdLen, crcCalc);
1131  }
1132  if (blockChainHeader._expirationLen > 0)
1133  {
1134  if (blockChainHeader._expirationLen <= blockBytesRemaining)
1135  {
1136  _message.assignExpiration(
1137  pBuffer->getBytes(blockChainHeader._expirationLen)._data,
1138  blockChainHeader._expirationLen);
1139  blockBytesRemaining -= blockChainHeader._expirationLen;
1140  }
1141  else
1142  {
1143  tmpBuffers[blockNum] = new char[blockChainHeader._expirationLen];
1144  size_t totalLeft = blockChainHeader._expirationLen;
1145  size_t totalRead = 0;
1146  size_t readLen = 0;
1147  while (totalLeft)
1148  {
1149  readLen = blockBytesRemaining < totalLeft ?
1150  blockBytesRemaining : totalLeft;
1151  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1152  if (!(totalLeft -= readLen)) break;
1153  if (!(current = current->_nextInChain)) break;
1154  totalRead += readLen;
1155  blockBytesRemaining = getBlockDataSize();
1156  position = current->_offset + getBlockHeaderSize();
1157  pBuffer->setPosition(position);
1158  }
1159  blockBytesRemaining -= readLen;
1160  _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1161  }
1162  blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1163  crcCalc = _crc(_message.getExpiration().data(),
1164  blockChainHeader._expirationLen, crcCalc);
1165  }
1166  if (blockChainHeader._sowKeyLen > 0)
1167  {
1168  if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1169  {
1170  _message.assignSowKey(pBuffer->getBytes(blockChainHeader._sowKeyLen)._data,
1171  blockChainHeader._sowKeyLen);
1172  blockBytesRemaining -= blockChainHeader._sowKeyLen;
1173  }
1174  else
1175  {
1176  tmpBuffers[blockNum] = new char[blockChainHeader._sowKeyLen];
1177  size_t totalLeft = blockChainHeader._sowKeyLen;
1178  size_t totalRead = 0;
1179  size_t readLen = 0;
1180  while (totalLeft)
1181  {
1182  readLen = blockBytesRemaining < totalLeft ?
1183  blockBytesRemaining : totalLeft;
1184  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1185  if (!(totalLeft -= readLen)) break;
1186  if (!(current = current->_nextInChain)) break;
1187  totalRead += readLen;
1188  blockBytesRemaining = getBlockDataSize();
1189  position = current->_offset + getBlockHeaderSize();
1190  pBuffer->setPosition(position);
1191  }
1192  blockBytesRemaining -= readLen;
1193  _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1194  }
1195  blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1196  crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
1197  }
1198  if (blockChainHeader._topicLen > 0)
1199  {
1200  if (blockChainHeader._topicLen <= blockBytesRemaining)
1201  {
1202  _message.assignTopic(pBuffer->getBytes(blockChainHeader._topicLen)._data,
1203  blockChainHeader._topicLen);
1204  blockBytesRemaining -= blockChainHeader._topicLen;
1205  }
1206  else
1207  {
1208  tmpBuffers[blockNum] = new char[blockChainHeader._topicLen];
1209  size_t totalLeft = blockChainHeader._topicLen;
1210  size_t totalRead = 0;
1211  size_t readLen = 0;
1212  while (totalLeft)
1213  {
1214  readLen = blockBytesRemaining < totalLeft ?
1215  blockBytesRemaining : totalLeft;
1216  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1217  if (!(totalLeft -= readLen)) break;
1218  if (!(current = current->_nextInChain)) break;
1219  totalRead += readLen;
1220  blockBytesRemaining = getBlockDataSize();
1221  position = current->_offset + getBlockHeaderSize();
1222  pBuffer->setPosition(position);
1223  }
1224  blockBytesRemaining -= readLen;
1225  _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1226  }
1227  blockHeader._totalRemaining -= blockChainHeader._topicLen;
1228  crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
1229  }
1230  if (blockHeader._totalRemaining > 0)
1231  {
1232  if (blockHeader._totalRemaining <= blockBytesRemaining)
1233  {
1234  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1235  {
1236  _message.assignData(
1237  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1238  blockHeader._totalRemaining);
1239  crcCalc = _crc(_message.getData().data(),
1240  blockHeader._totalRemaining, crcCalc);
1241  }
1242  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1243  {
1244  _message.assignFilter(
1245  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1246  blockHeader._totalRemaining);
1247  crcCalc = _crc(_message.getFilter().data(),
1248  blockHeader._totalRemaining, crcCalc);
1249  }
1250  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1251  {
1252  _message.assignSowKeys(
1253  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1254  blockHeader._totalRemaining);
1255  crcCalc = _crc(_message.getSowKeys().data(),
1256  blockHeader._totalRemaining, crcCalc);
1257  }
1258  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1259  {
1260  _message.assignBookmark(
1261  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1262  blockHeader._totalRemaining);
1263  crcCalc = _crc(_message.getBookmark().data(),
1264  blockHeader._totalRemaining, crcCalc);
1265  }
1266  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1267  {
1268  _message.assignBookmark(
1269  pBuffer->getBytes(blockHeader._totalRemaining)._data,
1270  blockHeader._totalRemaining);
1271  crcCalc = _crc(_message.getBookmark().data(),
1272  blockHeader._totalRemaining, crcCalc);
1273  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1274  }
1275  }
1276  else
1277  {
1278  tmpBuffers[blockNum] = new char[blockHeader._totalRemaining];
1279  size_t totalLeft = blockHeader._totalRemaining;
1280  size_t totalRead = 0;
1281  size_t readLen = 0;
1282  while (totalLeft)
1283  {
1284  readLen = blockBytesRemaining < totalLeft ?
1285  blockBytesRemaining : totalLeft;
1286  pBuffer->copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1287  if (!(totalLeft -= readLen)) break;
1288  if (!(current = current->_nextInChain)) break;
1289  totalRead += readLen;
1290  blockBytesRemaining = getBlockDataSize();
1291  position = current->_offset + getBlockHeaderSize();
1292  pBuffer->setPosition(position);
1293  }
1294  position+=readLen;
1295  if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1296  _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1297  else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1298  _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1299  else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1300  _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1301  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1302  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1303  else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1304  {
1305  _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1306  _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1307  }
1308  crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc);
1309  }
1310  }
1311 
1312  // Validate the crc and seq
1313  if(crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1314  {
1315  std::ostringstream os;
1316  os << "Corrupted message found by CRC or sequence "
1317  << "Attempting to replay " << block_->_sequence
1318  << ". Block sequence " << blockHeader._seq
1319  << ", expiration length " << blockChainHeader._expirationLen
1320  << ", sowKey length " << blockChainHeader._sowKeyLen
1321  << ", topic length " << blockChainHeader._topicLen
1322  << ", data length " << blockHeader._totalRemaining
1323  << ", command ID length " << blockChainHeader._commandIdLen
1324  << ", correlation ID length " << blockChainHeader._correlationIdLen
1325  << ", flag " << blockChainHeader._flag
1326  << ", expected CRC " << blockHeader._crcVal
1327  << ", actual CRC " << crcCalc
1328  << ", start " << start
1329  << ", position " << position
1330  << ", buffer size " << pBuffer->getSize();
1331  for (Block* block = block_; block; block = block->_nextInChain)
1332  {
1333  os << "\n BLOCK " << block->_offset;
1334  }
1335  if (tmpBuffers)
1336  {
1337  for (amps_uint32_t i=0; i<blockNum; ++i)
1338  delete[] tmpBuffers[i];
1339  delete[] tmpBuffers;
1340  }
1341  throw StoreException(os.str());
1342  }
1343  // Replay the message
1344  replayer_.execute(_message);
1345  // Free the buffer if allocated
1346  if (tmpBuffers)
1347  {
1348  for (amps_uint32_t i=0; i<blockNum; ++i)
1349  delete[] tmpBuffers[i];
1350  delete[] tmpBuffers;
1351  }
1352  }
1353 
1354  // Lock should already be held
1355  // Read an older format file and update it.
1356  void recoverOldFormat(Block* blocks)
1357  {
1358  Buffer* pBuffer = _blockStore.getBuffer();
1359  amps_uint64_t maxIdx = 0;
1360  amps_uint64_t minIdx = 0;
1361  size_t size = pBuffer->getSize();
1362  size_t location = 0;
1363  pBuffer->setPosition(location);
1364  pBuffer->putUint32((amps_uint32_t)getBlockSize());
1365  pBuffer->putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1366  _metadataBlock->_sequence = pBuffer->getUint64();
1367  if (_metadataBlock->_sequence < (amps_uint64_t)1000000)
1368  {
1369  pBuffer->setPosition(_metadataBlock->_offset+8);
1370  pBuffer->putUint64((amps_uint64_t)0);
1371  _metadataBlock->_sequence = 0;
1372  }
1373  else
1374  {
1375  // Set _maxDiscarded and _lastSequence
1376  _maxDiscarded = _metadataBlock->_sequence;
1377  _lastSequence = _maxDiscarded;
1378  }
1379  // Write the current client version
1380  pBuffer->putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1381  // No next in chain
1382  pBuffer->putUint64((amps_uint64_t)0);
1383  // No checks currently
1384  location += getBlockSize();
1385  amps_uint32_t freeCount = 0;
1386  Block* firstFree = NULL;
1387  Block* endOfFreeList = NULL;
1388  size_t blockSize = getBlockSize();
1389  size_t numBlocks = size / blockSize;
1390  size_t blockNum = 0;
1391  // Used to create used list in order after recovery
1392  typedef std::map<amps_uint64_t, Block*> RecoverMap;
1393  RecoverMap recoveredBlocks;
1394  RecoverMap growingBlocks;
1395  amps_uint32_t growthBlocksNeeded = 0;
1396  while(location < size)
1397  {
1398  // Get seq and check if non-zero
1399  pBuffer->setPosition(location);
1400  BlockHeader blockHeader;
1401  pBuffer->copyBytes((char*)&blockHeader, sizeof(BlockHeader));
1402  size_t blockCount = (size_t)blockHeader._blocksToWrite;
1403  if(blockHeader._totalRemaining > 0 && blockHeader._seq > 0
1404  && blockHeader._totalRemaining < size
1405  && blockHeader._blocksToWrite < numBlocks
1406  && (blockHeader._blocksToWrite*blockSize)
1407  >= blockHeader._totalRemaining)
1408  {
1409  size_t oldFormatSize = blockHeader._totalRemaining;
1410  // Old format total was storage bytes plus 64 bytes for block
1411  // and chain headers.
1412  blockHeader._totalRemaining -= 64;
1413  // New format counts only chain header size
1414  blockHeader._totalRemaining += getBlockChainHeaderSize();
1415  // Get the rest of the header
1416  BlockChainHeader chainHeader;
1417  // Need to reset location to after OLD header:
1418  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1419  // amps_uint64_t seq, amps_uint64_t crc
1420  pBuffer->setPosition(location + (sizeof(amps_uint32_t)*2)
1421  + (sizeof(amps_uint64_t)*2) );
1422  // Read old chain header which uses same order, but not
1423  // as many bytes (everything is 32bit):
1424  // operation, commandIdLen, correlationIdLen,
1425  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1426  pBuffer->copyBytes((char*)&chainHeader,
1427  sizeof(amps_uint32_t) * 8);
1428  // Check for garbage, likely indicating this is part of a chain
1429  if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1430  + chainHeader._expirationLen + chainHeader._sowKeyLen
1431  + chainHeader._topicLen) > blockHeader._totalRemaining)
1432  {
1433  // Skip this block, can't be real data
1434  location += getBlockSize();
1435  continue;
1436  }
1437  // Check if data fits in current number of blocks
1438  amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1439  / getBlockDataSize())
1440  + (blockHeader._totalRemaining
1441  % getBlockDataSize()
1442  ? 1 : 0);
1443  if (blocksNeeded == blockHeader._blocksToWrite)
1444  {
1445  Block* first = blocks[++blockNum].setOffset(location);
1446  first->_nextInList = 0;
1447  first->_sequence = blockHeader._seq;
1448  if (blockHeader._blocksToWrite > 1)
1449  {
1450  // CRC is only set on the first block
1451  amps_uint64_t crcVal = blockHeader._crcVal;
1452  blockHeader._crcVal = 0;
1453  Block* current = 0;
1454  // It fits, just need to adjust the block formats
1455  // and set up the chain. Start with the last block
1456  // and move data as needed starting at the end.
1457  size_t currentBlockNum = blockNum
1458  + blockHeader._blocksToWrite
1459  - 1;
1460  // Last item could wrap to beginning, but beginning is
1461  // block 1, not 0, which is the metadata block.
1462  if (currentBlockNum >= numBlocks)
1463  {
1464  currentBlockNum = currentBlockNum - numBlocks + 1;
1465  }
1466  if (currentBlockNum < blockNum)
1467  {
1468  Block* last = blocks[currentBlockNum]
1469  .init(currentBlockNum, getBlockSize());
1470  if ((current = firstFree) == last)
1471  {
1472  firstFree = firstFree->_nextInList;
1473  if (!firstFree) endOfFreeList = 0;
1474  --freeCount;
1475  }
1476  else
1477  {
1478  while (current)
1479  {
1480  if (current->_nextInList == last)
1481  {
1482  current->_nextInList = last->_nextInList;
1483  current = last;
1484  --freeCount;
1485  break;
1486  }
1487  current=current->_nextInList;
1488  }
1489  }
1490  }
1491  if (!current)
1492  {
1493  current = blocks[currentBlockNum]
1494  .init(currentBlockNum, getBlockSize());
1495  }
1496  // Initially, the number of bytes in last block
1497  size_t dataBytes = oldFormatSize % getBlockSize();
1498  while (current != first)
1499  {
1500  current->_nextInList = 0;
1501  current->_sequence = blockHeader._seq;
1502  // Set _nextInChain on previous Block, will include first
1503  if (--currentBlockNum < 1
1504  || currentBlockNum > numBlocks)
1505  {
1506  currentBlockNum = numBlocks - 1;
1507  }
1508  Block* previous = blocks[currentBlockNum]
1509  .init(currentBlockNum,
1510  getBlockSize());
1511  previous->_nextInChain = current;
1512  // Shift to make room for a header in every block
1513  // Not growing, so this won't write past the end.
1514  // Shift amount accounts for a header added to each
1515  // block after the first plus any change in the
1516  // chain header size from 32, which is the old size.
1517  size_t bytesToMove = --blockCount
1518  * getBlockHeaderSize()
1520  - 32);
1521  pBuffer->copyBytes(current->_offset + bytesToMove,
1522  current->_offset,
1523  dataBytes);
1524  dataBytes = getBlockSize();
1525  if (bytesToMove > getBlockHeaderSize())
1526  {
1527  bytesToMove -= getBlockHeaderSize();
1528  dataBytes -= bytesToMove;
1529  pBuffer->copyBytes(current->_offset
1530  + getBlockHeaderSize(),
1531  previous->_offset
1532  + dataBytes,
1533  bytesToMove);
1534  }
1535  // Set next in chain for this block's header
1536  blockHeader._nextInChain = (current->_nextInChain
1537  ? current->_nextInChain->_offset
1538  : (amps_uint64_t)0);
1539  // Write the header for this block
1540  pBuffer->setPosition(current->_offset);
1541  pBuffer->putBytes((const char*)&blockHeader,
1542  sizeof(BlockHeader));
1543  if (firstFree == previous)
1544  {
1545  firstFree = firstFree->_nextInList;
1546  if (!firstFree) endOfFreeList = 0;
1547  --freeCount;
1548  }
1549  else
1550  {
1551  current = firstFree;
1552  while (current)
1553  {
1554  if (current->_nextInList == previous)
1555  {
1556  current->_nextInList = previous->_nextInList;
1557  --freeCount;
1558  break;
1559  }
1560  current=current->_nextInList;
1561  }
1562  }
1563  current = previous;
1564  }
1565  blockNum += blockHeader._blocksToWrite - 1;
1566  blockHeader._crcVal = crcVal;
1567  }
1568  // Move bytes for chain header expansion from 32 bytes
1569  size_t bytesToMove = getBlockDataSize() - 32
1570  - (getBlockChainHeaderSize() - 32);
1571  pBuffer->copyBytes(first->_offset + getBlockHeaderSize()
1573  first->_offset+getBlockHeaderSize()+32,
1574  bytesToMove);
1575  // Rewrite the header and chain header for first Block.
1576  pBuffer->setPosition(first->_offset);
1577  blockHeader._nextInChain = (first->_nextInChain
1578  ? first->_nextInChain->_offset
1579  : (amps_uint64_t)0);
1580  pBuffer->putBytes((const char*)&blockHeader,
1581  sizeof(BlockHeader));
1582  pBuffer->putBytes((const char*)&chainHeader,
1583  sizeof(BlockChainHeader));
1584  // Add first Block to recovered for building the used
1585  // list later
1586  recoveredBlocks[blockHeader._seq] = first;
1587  }
1588  else
1589  {
1590  // This will need at least one more Block due to a header in
1591  // every Block. Check how many and save for later.
1592  growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1593  growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1594  blockNum += blockHeader._blocksToWrite - 1;
1595  }
1596  // Track min and max
1597  if (maxIdx < blockHeader._seq)
1598  {
1599  maxIdx = blockHeader._seq;
1600  }
1601  if (minIdx > blockHeader._seq)
1602  {
1603  minIdx = blockHeader._seq;
1604  }
1605  // Advance past read blocks
1606  location += blockHeader._blocksToWrite * getBlockSize();
1607  // Either we're exiting loop, or blockNum is in range
1608  assert(location >= size || blockNum < numBlocks);
1609  }
1610  else
1611  {
1612  // Put this Block on the free list
1613  Block* block = blocks[++blockNum].setOffset(location);
1614  if (endOfFreeList)
1615  {
1616  endOfFreeList->_nextInList = block;
1617  }
1618  else
1619  {
1620  firstFree = block;
1621  }
1622  endOfFreeList = block;
1623  ++freeCount;
1624  location += blockSize;
1625  }
1626  }
1627  for (RecoverMap::iterator i = growingBlocks.begin();
1628  i != growingBlocks.end(); ++i)
1629  {
1630  Block* first = i->second;
1631  pBuffer->setPosition(first->_offset);
1632  BlockHeader blockHeader;
1633  // Read an old BlockHeader, which is only 24 bytes.
1634  // The bytes match current BlockHeader, and _nextInChain is 0.
1635  pBuffer->copyBytes((char*)&blockHeader, 24);
1636  // Old format total was storage bytes plus 64 bytes for block
1637  // and chain headers.
1638  blockHeader._totalRemaining -= 64;
1639  // New format counts only chain header size
1640  blockHeader._totalRemaining += getBlockChainHeaderSize();
1641  if (freeCount < growthBlocksNeeded)
1642  {
1643  // We have to resize, let's try to do it once.
1644  amps_uint32_t minBlocksRequired = growthBlocksNeeded-freeCount;
1645  amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1646  if (growthBlocks < minBlocksRequired)
1647  {
1648  amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1649  if (minBlocksRequired%defaultBlocks)
1650  minBlocksRequired = (minBlocksRequired/defaultBlocks+1)
1651  * defaultBlocks;
1652  growthBlocks = minBlocksRequired;
1653  }
1654  amps_uint32_t newBlocks = 0;
1655  Block* addedBlocks = _blockStore.resizeBuffer(
1656  pBuffer->getSize()
1657  + growthBlocks * blockSize,
1658  &newBlocks);
1659  _blockStore.addBlocks(addedBlocks);
1660  freeCount += newBlocks;
1661  growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1662  ? growthBlocksNeeded - freeCount : 0;
1663  if (endOfFreeList)
1664  {
1665  endOfFreeList->_nextInList = addedBlocks;
1666  }
1667  else
1668  {
1669  firstFree = addedBlocks;
1670  }
1671  endOfFreeList = &(addedBlocks[newBlocks-1]);
1672  endOfFreeList->_nextInList = 0;
1673  }
1674  expandBlocks(blocks, first->_offset, first, blockHeader,
1675  &firstFree, &freeCount, pBuffer);
1676  // Add first Block to recovered for building the used list later
1677  recoveredBlocks[blockHeader._seq] = first;
1678  if (!firstFree) endOfFreeList = 0;
1679  }
1680  if (endOfFreeList) endOfFreeList->_nextInList = 0;
1681  _blockStore.setFreeList(firstFree, freeCount);
1682  if (maxIdx > _lastSequence) _lastSequence = maxIdx;
1683  if (minIdx > _maxDiscarded + 1) _maxDiscarded = minIdx - 1;
1684  if (_maxDiscarded > _metadataBlock->_sequence)
1685  {
1686  _metadataBlock->_sequence = _maxDiscarded;
1687  pBuffer->setPosition(_metadataBlock->_offset+8);
1688  pBuffer->putUint64(_maxDiscarded);
1689  }
1690  Block* end = NULL;
1691  AMPS_FETCH_ADD(&_stored, (long)(recoveredBlocks.size()));
1692  for (RecoverMap::iterator i = recoveredBlocks.begin();
1693  i != recoveredBlocks.end(); ++i)
1694  {
1695  if (_blockStore.front())
1696  {
1697  end->_nextInList = i->second;
1698  }
1699  else
1700  {
1701  _blockStore.setUsedList(i->second);
1702  }
1703  end = i->second;
1704  }
1705  if (end)
1706  {
1707  end->_nextInList = 0;
1708  }
1709  _blockStore.setEndOfUsedList(end);
1710  }
1711 
1712  // For recovering an old format store to current format when more Blocks
1713  // are needed with the new format.
1714  void expandBlocks(Block* blocks_, size_t location_, Block* first_,
1715  BlockHeader blockHeader_,
1716  Block** pFreeList_, amps_uint32_t* pFreeCount_,
1717  Buffer* pBuffer_)
1718  {
1719  // First create the chain, then we'll fill in reverse
1720  Block* current = first_;
1721  // Old format total was storage bytes plus 64 bytes for block
1722  // and chain headers.
1723  amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining;
1724  blockHeader_._totalRemaining -= 64;
1725  // New format counts only chain header size
1726  blockHeader_._totalRemaining += getBlockChainHeaderSize();
1727  // Check how many Blocks needed and if we have enough free.
1728  amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1729  / getBlockDataSize()
1730  + (blockHeader_._totalRemaining
1731  % getBlockDataSize()
1732  ? 1 : 0);
1733  // Last data block size, remove bytes saved in first block
1734  // then mod by block size.
1735  const amps_uint32_t blockSize = getBlockSize();
1736  // Old total remaining had all header included
1737  size_t endBlockSize = oldTotalRemaining % blockSize;
1738  if (!endBlockSize) endBlockSize = blockSize;
1739  size_t endOfData = 0;
1740  // Hang on to CRC until first block is written
1741  amps_uint64_t crcVal = blockHeader_._crcVal;
1742  blockHeader_._crcVal = 0;
1743 
1744  std::list<Block*> blocksUsed;
1745  for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1746  {
1747  blocksUsed.push_back(current);
1748  current->_sequence = blockHeader_._seq;
1749  if (i >= blockHeader_._blocksToWrite)
1750  {
1751  if (i == blockHeader_._blocksToWrite)
1752  endOfData = current->_offset + endBlockSize;
1753  current->_nextInChain = *pFreeList_;
1754  --(*pFreeCount_);
1755  *pFreeList_ = (*pFreeList_)->_nextInList;
1756  }
1757  else
1758  {
1759  current->_nextInChain = current->_nextInList;
1760  if (current->_nextInChain)
1761  {
1762  if (current->_offset + blockSize < pBuffer_->getSize())
1763  {
1764  current->_nextInChain->setOffset(current->_offset
1765  + blockSize);
1766  }
1767  else
1768  {
1769  current->_nextInChain->setOffset(blockSize);
1770  }
1771  }
1772  else
1773  {
1774  current->_nextInChain = blocks_[1].init(1, blockSize);
1775  }
1776  if (current->_nextInChain == *pFreeList_)
1777  {
1778  *pFreeList_ = (*pFreeList_)->_nextInList;
1779  --(*pFreeCount_);
1780  }
1781  else
1782  {
1783  for (Block* free = *pFreeList_; free;
1784  free = free->_nextInList)
1785  {
1786  if (free->_nextInList == current->_nextInChain)
1787  {
1788  free->_nextInList = free->_nextInList->_nextInList;
1789  --(*pFreeCount_);
1790  break;
1791  }
1792  }
1793  }
1794  }
1795  current->_nextInList = 0;
1796  current = current->_nextInChain;
1797  }
1798  // Make sure we write the correct number of blocks to write
1799  blockHeader_._blocksToWrite = blocksNeeded;
1800  // Finish setting up current
1801  current->_nextInList = 0;
1802  current->_sequence = blockHeader_._seq;
1803  // Now shift data, starting at the last Block
1804  // The total shift is for number of Blocks beyond the first
1805  // times Block header size, since previous format only wrote
1806  // the header in the first Block and had contiguous data,
1807  // with only wrap from end to beginning of buffer possible.
1808 
1809  // First time through, this is bytes in last block. After,
1810  // it will be block data size.
1811  size_t dataBytes = blockHeader_._totalRemaining % getBlockDataSize();
1812  while (current != first_)
1813  {
1814  size_t chunkBytesAvail = endOfData > location_
1815  ? endOfData - location_
1816  : endOfData - 2048;
1817  if (chunkBytesAvail < dataBytes)
1818  {
1819  // Original was wrapped from end to start of buffer
1820  // Need to copy what's left at start to end of Block,
1821  // then start working from the end.
1822  // This can ONLY occur during wrap because the first
1823  // Block doesn't get moved in this loop.
1824  pBuffer_->copyBytes(current->_offset
1825  + getBlockSize()
1826  - chunkBytesAvail,
1827  getBlockSize(),
1828  chunkBytesAvail);
1829  chunkBytesAvail = dataBytes - chunkBytesAvail;
1830  endOfData = pBuffer_->getSize() - chunkBytesAvail;
1831  pBuffer_->copyBytes(current->_offset+getBlockHeaderSize(),
1832  endOfData,
1833  chunkBytesAvail);
1834  }
1835  else
1836  {
1837  endOfData -= dataBytes;
1838  pBuffer_->copyBytes(current->_offset+getBlockHeaderSize(),
1839  endOfData,
1840  dataBytes);
1841  }
1842  // Set next in chain in block header
1843  blockHeader_._nextInChain = (current->_nextInChain
1844  ? current->_nextInChain->_offset
1845  : (amps_uint64_t)0);
1846  // Write the header for this block
1847  pBuffer_->setPosition(current->_offset);
1848  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
1849  current = blocksUsed.back();
1850  blocksUsed.pop_back();
1851  dataBytes = getBlockDataSize();
1852  }
1853  // Move bytes for chain header expansion from 32 bytes
1854  pBuffer_->copyBytes(first_->_offset
1855  + getBlockHeaderSize()
1857  first_->_offset + getBlockHeaderSize() + 32,
1859  // Set the CRC to indicate first block and set nextInChain
1860  blockHeader_._crcVal = crcVal;
1861  blockHeader_._nextInChain = first_->_nextInChain->_offset;
1862  // Need to reset location to after OLD header:
1863  // amps_uint32_t blocks, amps_uint32_t totalRemaining,
1864  // amps_uint64_t seq, amps_uint64_t crc
1865  pBuffer_->setPosition(location_ + (sizeof(amps_uint32_t)*2)
1866  + (sizeof(amps_uint64_t)*2) );
1867  // Read old chain header which uses same order, but not
1868  // as many bytes (everything is 32bit):
1869  // operation, commandIdLen, correlationIdLen,
1870  // expirationLen, sowKeyLen, topicLen, flag, ackTypes
1871  BlockChainHeader chainHeader;
1872  pBuffer_->copyBytes((char*)&chainHeader,
1873  sizeof(amps_uint32_t) * 8);
1874  // Rewrite the header and chain header for first Block.
1875  pBuffer_->setPosition(location_);
1876  pBuffer_->putBytes((const char*)&blockHeader_, sizeof(BlockHeader));
1877  pBuffer_->putBytes((const char*)&chainHeader, sizeof(BlockChainHeader));
1878  }
1879 
1880  static bool isSSE42Enabled()
1881  {
1882 #ifdef _WIN32
1883  int cpuinfo[4];
1884  __cpuid(cpuinfo, 1);
1885  return (cpuinfo[2] & (1<<20)) != 0;
1886 #elif AMPS_SSE_42
1887  unsigned int eax, ebx, ecx=0, edx;
1888  __get_cpuid(1, &eax, &ebx, &ecx, &edx);
1889  return ecx & (1<<20);
1890 #else
1891  return false;
1892 #endif
1893  }
1894 
1895  void chooseCRC(bool isFile)
1896  {
1897  if(!isFile)
1898  {
1899  _crc = noOpCRC;
1900  return;
1901  }
1902 
1903 #ifndef AMPS_SSE_42
1904  _crc = AMPS::CRC<0>::crcNoSSE;
1905 #else
1906  if(isSSE42Enabled())
1907  {
1908  _crc = AMPS::CRC<0>::crc;
1909  }
1910  else
1911  {
1912  _crc = AMPS::CRC<0>::crcNoSSE;
1913  }
1914 #endif
1915  }
1916 
1917  static amps_uint64_t noOpCRC(const char*, size_t, amps_uint64_t)
1918  {
1919  return 0;
1920  }
1921 
1922 protected:
1923  mutable BlockStore _blockStore;
1924 private:
1925  // Block used to hold metadata, currently:
1926  // the last persisted
1927  Block* _metadataBlock;
1928  // Highest sequence that has been discarded
1929  amps_uint64_t _maxDiscarded;
1930  // Track the assigned sequence numbers
1931  volatile amps_uint64_t _lastSequence;
1932  // Track how many messages are stored
1933  ATOMIC_TYPE _stored;
1934 
1935  // Message used for doing replay
1936  Message _message;
1937 
1938  typedef amps_uint64_t (*CRCFunction)(const char*, size_t, amps_uint64_t);
1939 
1940  // Function used to calculate the CRC if one is used
1941  CRCFunction _crc;
1942 
1943 };
1944 
1945 }
1946 
1947 #endif
1948 
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1143
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CorrelationId header of self as a Field that references the underlying buf...
Definition: Message.hpp:1062
Command::Type getCommandEnum() const
Decode self&#39;s "command" field and return one of the values from Command.
Definition: Message.hpp:885
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:77
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CommandId header of self as a Field that references the underlying buffer ...
Definition: Message.hpp:956
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1115
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:631
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:544
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Filter header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:979
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
virtual void setPosition(size_t position_)=0
Set the buffer postion to a location.
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:151
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:177
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Bookmark header of self as a Field that references the underlying buffer m...
Definition: Message.hpp:1064
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:573
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:260
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:681
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:617
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1220
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKeys header of self as a Field that references the underlying buffer ma...
Definition: Message.hpp:1061
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:59
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:235
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Expiration header of self as a Field that references the underlying buffer...
Definition: Message.hpp:1052
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:194
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:161
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Topic header of self as a Field that references the underlying buffer mana...
Definition: Message.hpp:978
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:245
amps_uint32_t getBlockSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:169
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKey header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1072
unsigned getAckTypeEnum() const
Decode self&#39;s "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1021
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:669
Definition: ampsplusplus.hpp:136
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1199
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:487
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.