26 #ifndef _BLOCKPUBLISHSTORE_H_ 27 #define _BLOCKPUBLISHSTORE_H_ 35 #include <ampscrc.hpp> 39 #include <sys/timeb.h> 64 typedef Lock<BlockStore> BufferLock;
65 typedef Unlock<BlockStore> BufferUnlock;
70 SOW_DELETE_FILTER=0x02,
72 SOW_DELETE_BOOKMARK=0x08,
73 SOW_DELETE_BOOKMARK_CANCEL=0x10,
74 SOW_DELETE_UNKNOWN=0x80
81 DEFAULT_BLOCK_HEADER_SIZE = 32,
82 DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
83 DEFAULT_BLOCKS_PER_REALLOC = 1000,
84 DEFAULT_BLOCK_SIZE = 2048
124 amps_uint32_t _blocksToWrite;
125 amps_uint32_t _totalRemaining;
127 amps_uint64_t _crcVal;
128 amps_uint64_t _nextInChain;
131 struct BlockChainHeader
133 amps_uint32_t _operation;
134 amps_uint32_t _commandIdLen;
135 amps_uint32_t _correlationIdLen;
136 amps_uint32_t _expirationLen;
137 amps_uint32_t _sowKeyLen;
138 amps_uint32_t _topicLen;
140 amps_uint32_t _ackTypes;
141 amps_uint32_t _unused[8];
143 : _operation(0), _commandIdLen(0), _correlationIdLen(0)
144 , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
155 return DEFAULT_BLOCK_HEADER_SIZE;
165 return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
173 return _blockStore.getBlockSize();
197 amps_uint32_t blocksPerRealloc_ = 1000,
198 bool isFile_ =
false,
199 amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
201 , _blockStore(buffer_, blocksPerRealloc_,
202 DEFAULT_BLOCK_HEADER_SIZE,
203 (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE*2
204 ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE*2))
206 , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
214 BufferLock bufferGuard(_blockStore);
216 _metadataBlock = _blockStore.get(1);
218 _blockStore.setUsedList(0);
219 _blockStore.setEndOfUsedList(0);
222 _metadataBlock->_sequence = (amps_uint64_t)0;
223 Buffer* pBuffer = _blockStore.getBuffer();
229 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
249 return store(message_,
true);
264 const char *commandId, *correlationId, *expiration, *sowKey,
267 BlockHeader blockHeader;
268 BlockChainHeader chainHeader;
270 chainHeader._commandIdLen = (amps_uint32_t)dataLen;
272 chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
274 chainHeader._expirationLen = (amps_uint32_t)dataLen;
276 chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
278 chainHeader._topicLen = (amps_uint32_t)dataLen;
279 message_.getRawData(&data, &dataLen);
280 chainHeader._flag = -1;
282 chainHeader._operation = (amps_uint32_t)operation;
283 if (operation == Message::Command::SOWDelete)
287 chainHeader._flag = SOW_DELETE_DATA;
294 chainHeader._flag = SOW_DELETE_FILTER;
301 chainHeader._flag = SOW_DELETE_KEYS;
306 chainHeader._flag = SOW_DELETE_BOOKMARK;
309 size_t remaining = options.
len();
310 const void* next = NULL;
311 const void* start = (
const void*)(options.
data());
313 while (remaining >= 6 &&
314 (next = memchr(start,(
int)
'c',remaining)) != NULL)
316 remaining = (size_t)next-(
size_t)start;
317 if (remaining >= 6 && strncmp((
const char*)start,
320 chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
328 blockHeader._totalRemaining = (
329 (chainHeader._operation == Message::Command::Unknown)
332 + chainHeader._commandIdLen
333 + chainHeader._correlationIdLen
334 + chainHeader._expirationLen
335 + chainHeader._sowKeyLen
336 + chainHeader._topicLen
337 + (amps_uint32_t)dataLen));
338 size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
340 blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
342 : ((amps_uint32_t)(blockHeader._totalRemaining
344 + ((lastBlockLength > 0) ? 1 : 0)));
345 blockHeader._crcVal = (amps_uint64_t)0ULL;
346 blockHeader._crcVal = _crc(commandId,
347 chainHeader._commandIdLen,
348 blockHeader._crcVal);
349 blockHeader._crcVal = _crc(correlationId,
350 chainHeader._correlationIdLen,
351 blockHeader._crcVal);
352 blockHeader._crcVal = _crc(expiration,
353 chainHeader._expirationLen,
354 blockHeader._crcVal);
355 blockHeader._crcVal = _crc(sowKey,
356 chainHeader._sowKeyLen,
357 blockHeader._crcVal);
358 blockHeader._crcVal = _crc(topic,
359 chainHeader._topicLen,
360 blockHeader._crcVal);
361 blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
364 BufferLock bufferGuard(_blockStore);
365 Block* first = _blockStore.get(blockHeader._blocksToWrite);
368 if (_lastSequence <= 2)
372 blockHeader._seq = ++_lastSequence;
380 _maxDiscarded = blockHeader._seq - 1;
382 if (blockHeader._seq >= _lastSequence)
384 _lastSequence = blockHeader._seq;
390 size_t topicWritten = 0UL;
391 size_t dataWritten = 0UL;
392 size_t commandWritten = 0UL;
393 size_t correlationWritten = 0UL;
394 size_t expirationWritten = 0UL;
395 size_t sowKeyWritten = 0UL;
396 Buffer* pBuffer = _blockStore.getBuffer();
397 for(Block* next = first; next; next = next->_nextInChain)
399 next->_sequence = blockHeader._seq;
400 if (next->_nextInChain)
401 blockHeader._nextInChain = next->_nextInChain->_offset;
403 blockHeader._nextInChain = (amps_uint64_t)0;
406 pBuffer->
putBytes((
const char*)&blockHeader,
sizeof(BlockHeader));
408 blockHeader._crcVal = (amps_uint64_t)0;
414 pBuffer->
putBytes((
const char*)&chainHeader,
415 sizeof(BlockChainHeader));
424 if(commandWritten < chainHeader._commandIdLen)
426 size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
427 pBuffer->
putBytes(commandId + commandWritten,
429 bytesRemaining -= commandWrite;
430 commandWritten += commandWrite;
432 if(correlationWritten < chainHeader._correlationIdLen)
434 size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
435 pBuffer->
putBytes(correlationId + correlationWritten,
437 bytesRemaining -= correlationWrite;
438 correlationWritten += correlationWrite;
440 if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
442 size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
443 pBuffer->
putBytes(expiration + expirationWritten, expWrite);
444 bytesRemaining -= expWrite;
445 expirationWritten += expWrite;
447 if(bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
449 size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
450 pBuffer->
putBytes(sowKey + sowKeyWritten, sowKeyWrite);
451 bytesRemaining -= sowKeyWrite;
452 sowKeyWritten += sowKeyWrite;
454 if(bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
456 size_t topicWrite = (chainHeader._topicLen - topicWritten
458 ? chainHeader._topicLen - topicWritten
460 pBuffer->
putBytes(topic + topicWritten, topicWrite);
461 bytesRemaining -= topicWrite;
462 topicWritten += topicWrite;
464 if(bytesRemaining > 0 && dataWritten < dataLen)
466 size_t dataWrite = (dataLen-dataWritten < bytesRemaining) ?
467 dataLen-dataWritten : bytesRemaining;
468 pBuffer->
putBytes(data + dataWritten, dataWrite);
469 bytesRemaining -= dataWrite;
470 dataWritten += dataWrite;
474 catch (
const AMPSException& ex)
476 _blockStore.put(first);
479 AMPS_FETCH_ADD(&_stored, 1);
480 return blockHeader._seq;
490 BufferLock bufferGuard(_blockStore);
491 Buffer* pBuffer = _blockStore.getBuffer();
494 amps_uint64_t lastPersisted = _metadataBlock->_sequence;
496 if(index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
500 if (lastPersisted < index_)
504 _metadataBlock->_sequence = index_;
505 if (_maxDiscarded < index_)
507 _maxDiscarded = index_;
509 if (_lastSequence <= index_)
511 _lastSequence = index_;
518 _blockStore.signalAll();
522 _maxDiscarded = index_;
523 AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
524 _blockStore.signalAll();
525 if (lastPersisted >= index_)
531 _metadataBlock->_sequence = index_;
532 if (_lastSequence < index_)
534 _lastSequence = index_;
545 BufferLock bufferGuard(_blockStore);
547 if(!_blockStore.front())
return;
548 Block* next = _blockStore.front();
551 for (Block* block = _blockStore.front(); block; block = next)
554 replayOnto(block, replayer_);
555 next = block->_nextInList;
558 catch (
const StoreException& e)
560 _blockStore.putAll(next);
573 BufferLock bufferGuard(_blockStore);
575 if(!_blockStore.front())
return false;
577 amps_uint64_t lastIdx = _blockStore.back()->_sequence;
579 amps_uint64_t leastIdx = _blockStore.front()->_sequence;
580 if(index_>=leastIdx && index_ <=lastIdx)
582 Block* block = _blockStore.front();
583 while(block && block->_sequence != index_)
585 block = block->_nextInList;
592 Buffer* pBuffer = _blockStore.getBuffer();
594 sizeof(amps_uint32_t));
595 if (pBuffer->
getUint32() == 0)
return false;
596 replayOnto(block, replayer_);
603 _message.setSequence(leastIdx);
616 size_t count = (size_t)_stored;
630 BufferLock bufferGuard(_blockStore);
631 amps_uint64_t waitFor = _getHighestUnpersisted();
636 bool timedOut =
false;
637 AMPS_START_TIMER(timeout_);
639 while (!timedOut && _stored != 0
640 && waitFor >= _getLowestUnpersisted())
642 if (!_blockStore.wait(timeout_))
645 AMPS_RESET_TIMER(timedOut, timeout_);
649 if (timedOut && _stored != 0
650 && waitFor >= _getLowestUnpersisted())
652 throw TimedOutException(
"Timed out waiting to flush publish store.");
657 while (_stored != 0 && waitFor >= _getLowestUnpersisted())
660 _blockStore.wait(1000);
662 BufferUnlock unlck(_blockStore);
663 amps_invoke_waiting_function();
670 BufferLock bufferGuard(_blockStore);
671 return _getLowestUnpersisted();
674 amps_uint64_t getHighestUnpersisted()
const 676 BufferLock bufferGuard(_blockStore);
677 return _getHighestUnpersisted();
682 BufferLock bufferGuard(_blockStore);
683 return _getLastPersisted();
687 static bool canResize(
size_t requestedSize_,
void* vpThis_)
692 amps_uint64_t _getLowestUnpersisted()
const 697 return _blockStore.front()->_sequence;
700 amps_uint64_t _getHighestUnpersisted()
const 705 return _blockStore.back()->_sequence;
708 amps_uint64_t _getLastPersisted(
void)
711 amps_uint64_t lastPersisted = (amps_uint64_t)0;
712 Buffer* pBuffer = _blockStore.getBuffer();
717 if (_lastSequence < lastPersisted)
718 _lastSequence = lastPersisted;
719 return lastPersisted;
723 lastPersisted = _maxDiscarded;
730 lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
733 gettimeofday(&tv, NULL);
734 lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
735 * (amps_uint64_t)1000000;
738 if (_lastSequence > 2)
740 amps_uint64_t low = _getLowestUnpersisted();
741 amps_uint64_t high = _getHighestUnpersisted();
744 lastPersisted = low - 1;
748 _lastSequence = high;
750 if (_lastSequence < lastPersisted)
752 lastPersisted = _lastSequence - 1;
757 _lastSequence = lastPersisted;
760 +
sizeof(amps_uint32_t)
761 +
sizeof(amps_uint32_t));
763 _metadataBlock->_sequence = lastPersisted;
764 return lastPersisted;
769 BufferLock bufferGuard(_blockStore);
771 Buffer* pBuffer = _blockStore.getBuffer();
772 size_t size = pBuffer->
getSize();
777 _metadataBlock = _blockStore.get(1);
778 _metadataBlock->_sequence = (amps_uint64_t)0;
782 pBuffer->
putUint32((amps_uint32_t)blockSize);
786 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
791 size_t numBlocks = size / blockSize;
792 if(size % blockSize > 0)
797 numBlocks = size / blockSize;
799 amps_uint32_t blockCount = 0;
801 delete[] _blockStore.resizeBuffer(numBlocks*blockSize, &blockCount);
804 if (size > pBuffer->
getSize() || numBlocks != (size_t)blockCount)
806 throw StoreException(
"Publish Store could not resize correctly during recoery, possibly due to resizeHandler refusing the request.");
811 amps_uint64_t maxIdx = 0;
812 amps_uint64_t minIdx = 0;
814 BlockHeader blockHeader;
816 Block* blocks =
new Block[numBlocks];
817 blocks[numBlocks-1]._nextInList = 0;
819 _blockStore.addBlocks(blocks);
820 _metadataBlock = blocks;
821 _metadataBlock->_nextInList = 0;
823 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
831 if (blockHeader._blocksToWrite == 1)
841 if (blockHeader._totalRemaining >= 5000000)
845 recoverOldFormat(blocks);
849 throw StoreException(
"Unrecognized format for Store. Can't recover.");
851 if (blockHeader._blocksToWrite == 0)
854 pBuffer->
putUint32((amps_uint32_t)blockSize);
858 blockSize = blockHeader._blocksToWrite;
859 _blockStore.setBlockSize(blockSize);
861 if (blockHeader._totalRemaining == 0)
868 _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
870 _metadataBlock->_sequence = blockHeader._seq;
871 if (_metadataBlock->_sequence
872 && _metadataBlock->_sequence < (amps_uint64_t)1000000)
875 +
sizeof(amps_uint32_t)
876 +
sizeof(amps_uint32_t));
878 _metadataBlock->_sequence = 0;
883 _maxDiscarded = _metadataBlock->_sequence;
884 _lastSequence = _maxDiscarded;
888 location += blockSize;
889 amps_uint32_t freeCount = 0;
890 Block* firstFree = NULL;
891 Block* endOfFreeList = NULL;
893 typedef std::map<amps_uint64_t, Block*> RecoverMap;
894 RecoverMap recoveredBlocks;
895 while(location < size)
899 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
900 if((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
901 (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
904 location += blockSize;
907 Block* block = blocks[++blockNum].setOffset(location);
908 bool recovered =
false;
909 if(blockHeader._seq > 0 && blockHeader._totalRemaining < size)
912 block->_sequence = blockHeader._seq;
914 if(maxIdx < blockHeader._seq)
916 maxIdx = blockHeader._seq;
918 if(minIdx > blockHeader._seq)
920 minIdx = blockHeader._seq;
923 recoveredBlocks[blockHeader._seq] = block;
925 while (blockHeader._nextInChain != (amps_uint64_t)0)
927 Block* chain = blocks[++blockNum]
928 .setOffset((
size_t)blockHeader._nextInChain);
929 chain->_nextInList = 0;
930 pBuffer->
setPosition((
size_t)blockHeader._nextInChain
931 +
sizeof(amps_uint32_t)
932 +
sizeof(amps_uint32_t)
933 +
sizeof(amps_uint64_t)
934 +
sizeof(amps_uint64_t));
935 blockHeader._nextInChain = pBuffer->
getUint64();
936 block->_nextInChain = chain;
938 block->_sequence = blockHeader._seq;
947 endOfFreeList->_nextInList = block;
953 endOfFreeList = block;
956 location += blockSize;
960 endOfFreeList->_nextInList = 0;
962 _blockStore.setFreeList(firstFree, freeCount);
963 if (maxIdx > _lastSequence)
965 _lastSequence = maxIdx;
967 if (minIdx > _maxDiscarded + 1)
969 _maxDiscarded = minIdx - 1;
971 if (_maxDiscarded > _metadataBlock->_sequence)
973 _metadataBlock->_sequence = _maxDiscarded;
978 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
979 for (RecoverMap::iterator i = recoveredBlocks.begin();
980 i != recoveredBlocks.end(); ++i)
984 end->_nextInList = i->second;
988 _blockStore.setUsedList(i->second);
994 end->_nextInList = 0;
996 _blockStore.setEndOfUsedList(end);
1004 size_t start = block_->_offset;
1005 size_t position = start;
1006 Buffer* pBuffer = _blockStore.getBuffer();
1008 BlockHeader blockHeader;
1009 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
1010 if (blockHeader._totalRemaining == 0)
1016 BlockChainHeader blockChainHeader;
1017 pBuffer->
copyBytes((
char*)&blockChainHeader,
sizeof(blockChainHeader));
1018 if (blockChainHeader._operation == Message::Command::Unknown)
1023 blockChainHeader._ackTypes |= Message::AckType::Persisted;
1028 if (blockHeader._totalRemaining
1029 < blockChainHeader._commandIdLen
1030 + blockChainHeader._correlationIdLen
1031 + blockChainHeader._expirationLen
1032 + blockChainHeader._sowKeyLen
1033 + blockChainHeader._topicLen)
1035 std::ostringstream os;
1036 os <<
"Corrupted message found with invalid lengths. " 1037 <<
"Attempting to replay " << block_->_sequence
1038 <<
". Block sequence " << blockHeader._seq
1039 <<
", topic length " << blockChainHeader._topicLen
1040 <<
", data length " << blockHeader._totalRemaining
1041 <<
", command ID length " << blockChainHeader._commandIdLen
1042 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1043 <<
", expiration length " << blockChainHeader._expirationLen
1044 <<
", sow key length " << blockChainHeader._sowKeyLen
1045 <<
", start " << start
1046 <<
", position " << position
1047 <<
", buffer size " << pBuffer->
getSize();
1048 throw StoreException(os.str());
1053 _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1054 _message.setAckTypeEnum((
unsigned)blockChainHeader._ackTypes
1055 | Message::AckType::Persisted);
1056 _message.setSequence(blockHeader._seq);
1058 Block* current = block_;
1060 amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
1062 char** tmpBuffers = (blockHeader._blocksToWrite>1) ?
new char*[blockHeader._blocksToWrite-1] : 0;
1063 size_t blockNum = 0;
1064 if (blockChainHeader._commandIdLen > 0)
1066 if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1068 _message.assignCommandId(pBuffer->
getBytes(blockChainHeader._commandIdLen)._data,
1069 blockChainHeader._commandIdLen);
1070 blockBytesRemaining -= blockChainHeader._commandIdLen;
1074 tmpBuffers[blockNum] =
new char[blockChainHeader._commandIdLen];
1075 size_t totalLeft = blockChainHeader._commandIdLen;
1076 size_t totalRead = 0;
1080 readLen = blockBytesRemaining < totalLeft ?
1081 blockBytesRemaining : totalLeft;
1082 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1083 if (!(totalLeft -= readLen))
break;
1084 if (!(current = current->_nextInChain))
break;
1085 totalRead += readLen;
1090 blockBytesRemaining -= readLen;
1091 _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
1093 blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1094 crcCalc = _crc(_message.getCommandId().data(),
1095 blockChainHeader._commandIdLen, crcCalc);
1097 if (blockChainHeader._correlationIdLen > 0)
1099 if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1101 _message.assignCorrelationId(
1102 pBuffer->
getBytes(blockChainHeader._correlationIdLen)._data,
1103 blockChainHeader._correlationIdLen);
1104 blockBytesRemaining -= blockChainHeader._correlationIdLen;
1108 tmpBuffers[blockNum] =
new char[blockChainHeader._correlationIdLen];
1109 size_t totalLeft = blockChainHeader._correlationIdLen;
1110 size_t totalRead = 0;
1114 readLen = blockBytesRemaining < totalLeft ?
1115 blockBytesRemaining : totalLeft;
1116 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1117 if (!(totalLeft -= readLen))
break;
1118 if (!(current = current->_nextInChain))
break;
1119 totalRead += readLen;
1124 blockBytesRemaining -= readLen;
1125 _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1127 blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1128 crcCalc = _crc(_message.getCorrelationId().data(),
1129 blockChainHeader._correlationIdLen, crcCalc);
1131 if (blockChainHeader._expirationLen > 0)
1133 if (blockChainHeader._expirationLen <= blockBytesRemaining)
1135 _message.assignExpiration(
1136 pBuffer->
getBytes(blockChainHeader._expirationLen)._data,
1137 blockChainHeader._expirationLen);
1138 blockBytesRemaining -= blockChainHeader._expirationLen;
1142 tmpBuffers[blockNum] =
new char[blockChainHeader._expirationLen];
1143 size_t totalLeft = blockChainHeader._expirationLen;
1144 size_t totalRead = 0;
1148 readLen = blockBytesRemaining < totalLeft ?
1149 blockBytesRemaining : totalLeft;
1150 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1151 if (!(totalLeft -= readLen))
break;
1152 if (!(current = current->_nextInChain))
break;
1153 totalRead += readLen;
1158 blockBytesRemaining -= readLen;
1159 _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1161 blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1162 crcCalc = _crc(_message.getExpiration().data(),
1163 blockChainHeader._expirationLen, crcCalc);
1165 if (blockChainHeader._sowKeyLen > 0)
1167 if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1169 _message.assignSowKey(pBuffer->
getBytes(blockChainHeader._sowKeyLen)._data,
1170 blockChainHeader._sowKeyLen);
1171 blockBytesRemaining -= blockChainHeader._sowKeyLen;
1175 tmpBuffers[blockNum] =
new char[blockChainHeader._sowKeyLen];
1176 size_t totalLeft = blockChainHeader._sowKeyLen;
1177 size_t totalRead = 0;
1181 readLen = blockBytesRemaining < totalLeft ?
1182 blockBytesRemaining : totalLeft;
1183 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1184 if (!(totalLeft -= readLen))
break;
1185 if (!(current = current->_nextInChain))
break;
1186 totalRead += readLen;
1191 blockBytesRemaining -= readLen;
1192 _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1194 blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1195 crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
1197 if (blockChainHeader._topicLen > 0)
1199 if (blockChainHeader._topicLen <= blockBytesRemaining)
1201 _message.assignTopic(pBuffer->
getBytes(blockChainHeader._topicLen)._data,
1202 blockChainHeader._topicLen);
1203 blockBytesRemaining -= blockChainHeader._topicLen;
1207 tmpBuffers[blockNum] =
new char[blockChainHeader._topicLen];
1208 size_t totalLeft = blockChainHeader._topicLen;
1209 size_t totalRead = 0;
1213 readLen = blockBytesRemaining < totalLeft ?
1214 blockBytesRemaining : totalLeft;
1215 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1216 if (!(totalLeft -= readLen))
break;
1217 if (!(current = current->_nextInChain))
break;
1218 totalRead += readLen;
1223 blockBytesRemaining -= readLen;
1224 _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1226 blockHeader._totalRemaining -= blockChainHeader._topicLen;
1227 crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
1229 if (blockHeader._totalRemaining > 0)
1231 if (blockHeader._totalRemaining <= blockBytesRemaining)
1233 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1235 _message.assignData(
1236 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1237 blockHeader._totalRemaining);
1238 crcCalc = _crc(_message.getData().data(),
1239 blockHeader._totalRemaining, crcCalc);
1241 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1243 _message.assignFilter(
1244 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1245 blockHeader._totalRemaining);
1246 crcCalc = _crc(_message.getFilter().data(),
1247 blockHeader._totalRemaining, crcCalc);
1249 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1251 _message.assignSowKeys(
1252 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1253 blockHeader._totalRemaining);
1254 crcCalc = _crc(_message.getSowKeys().data(),
1255 blockHeader._totalRemaining, crcCalc);
1257 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1259 _message.assignBookmark(
1260 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1261 blockHeader._totalRemaining);
1262 crcCalc = _crc(_message.getBookmark().data(),
1263 blockHeader._totalRemaining, crcCalc);
1265 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1267 _message.assignBookmark(
1268 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1269 blockHeader._totalRemaining);
1270 crcCalc = _crc(_message.getBookmark().data(),
1271 blockHeader._totalRemaining, crcCalc);
1272 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1277 tmpBuffers[blockNum] =
new char[blockHeader._totalRemaining];
1278 size_t totalLeft = blockHeader._totalRemaining;
1279 size_t totalRead = 0;
1283 readLen = blockBytesRemaining < totalLeft ?
1284 blockBytesRemaining : totalLeft;
1285 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1286 if (!(totalLeft -= readLen))
break;
1287 if (!(current = current->_nextInChain))
break;
1288 totalRead += readLen;
1294 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1295 _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1296 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1297 _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1298 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1299 _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1300 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1301 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1302 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1304 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1305 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1307 crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc);
1312 if(crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1314 std::ostringstream os;
1315 os <<
"Corrupted message found by CRC or sequence " 1316 <<
"Attempting to replay " << block_->_sequence
1317 <<
". Block sequence " << blockHeader._seq
1318 <<
", expiration length " << blockChainHeader._expirationLen
1319 <<
", sowKey length " << blockChainHeader._sowKeyLen
1320 <<
", topic length " << blockChainHeader._topicLen
1321 <<
", data length " << blockHeader._totalRemaining
1322 <<
", command ID length " << blockChainHeader._commandIdLen
1323 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1324 <<
", flag " << blockChainHeader._flag
1325 <<
", expected CRC " << blockHeader._crcVal
1326 <<
", actual CRC " << crcCalc
1327 <<
", start " << start
1328 <<
", position " << position
1329 <<
", buffer size " << pBuffer->
getSize();
1330 for (Block* block = block_; block; block = block->_nextInChain)
1332 os <<
"\n BLOCK " << block->_offset;
1336 for (amps_uint32_t i=0; i<blockNum; ++i)
1337 delete[] tmpBuffers[i];
1338 delete[] tmpBuffers;
1340 throw StoreException(os.str());
1347 for (amps_uint32_t i=0; i<blockNum; ++i)
1348 delete[] tmpBuffers[i];
1349 delete[] tmpBuffers;
1355 void recoverOldFormat(Block* blocks)
1357 Buffer* pBuffer = _blockStore.getBuffer();
1358 amps_uint64_t maxIdx = 0;
1359 amps_uint64_t minIdx = 0;
1360 size_t size = pBuffer->
getSize();
1361 size_t location = 0;
1364 pBuffer->
putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1365 _metadataBlock->_sequence = pBuffer->
getUint64();
1366 if (_metadataBlock->_sequence < (amps_uint64_t)1000000)
1370 _metadataBlock->_sequence = 0;
1375 _maxDiscarded = _metadataBlock->_sequence;
1376 _lastSequence = _maxDiscarded;
1379 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1384 amps_uint32_t freeCount = 0;
1385 Block* firstFree = NULL;
1386 Block* endOfFreeList = NULL;
1388 size_t numBlocks = size / blockSize;
1389 size_t blockNum = 0;
1391 typedef std::map<amps_uint64_t, Block*> RecoverMap;
1392 RecoverMap recoveredBlocks;
1393 RecoverMap growingBlocks;
1394 amps_uint32_t growthBlocksNeeded = 0;
1395 while(location < size)
1399 BlockHeader blockHeader;
1400 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
1401 size_t blockCount = (size_t)blockHeader._blocksToWrite;
1402 if(blockHeader._totalRemaining > 0 && blockHeader._seq > 0
1403 && blockHeader._totalRemaining < size
1404 && blockHeader._blocksToWrite < numBlocks
1405 && (blockHeader._blocksToWrite*blockSize)
1406 >= blockHeader._totalRemaining)
1408 size_t oldFormatSize = blockHeader._totalRemaining;
1411 blockHeader._totalRemaining -= 64;
1415 BlockChainHeader chainHeader;
1419 pBuffer->
setPosition(location + (
sizeof(amps_uint32_t)*2)
1420 + (
sizeof(amps_uint64_t)*2) );
1426 sizeof(amps_uint32_t) * 8);
1428 if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1429 + chainHeader._expirationLen + chainHeader._sowKeyLen
1430 + chainHeader._topicLen) > blockHeader._totalRemaining)
1437 amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1439 + (blockHeader._totalRemaining
1442 if (blocksNeeded == blockHeader._blocksToWrite)
1444 Block* first = blocks[++blockNum].setOffset(location);
1445 first->_nextInList = 0;
1446 first->_sequence = blockHeader._seq;
1447 if (blockHeader._blocksToWrite > 1)
1450 amps_uint64_t crcVal = blockHeader._crcVal;
1451 blockHeader._crcVal = 0;
1456 size_t currentBlockNum = blockNum
1457 + blockHeader._blocksToWrite
1461 if (currentBlockNum >= numBlocks)
1463 currentBlockNum = currentBlockNum - numBlocks + 1;
1465 if (currentBlockNum < blockNum)
1467 Block* last = blocks[currentBlockNum]
1469 if ((current = firstFree) == last)
1471 firstFree = firstFree->_nextInList;
1472 if (!firstFree) endOfFreeList = 0;
1479 if (current->_nextInList == last)
1481 current->_nextInList = last->_nextInList;
1486 current=current->_nextInList;
1492 current = blocks[currentBlockNum]
1497 while (current != first)
1499 current->_nextInList = 0;
1500 current->_sequence = blockHeader._seq;
1502 if (--currentBlockNum < 1
1503 || currentBlockNum > numBlocks)
1505 currentBlockNum = numBlocks - 1;
1507 Block* previous = blocks[currentBlockNum]
1508 .init(currentBlockNum,
1510 previous->_nextInChain = current;
1516 size_t bytesToMove = --blockCount
1520 pBuffer->
copyBytes(current->_offset + bytesToMove,
1527 dataBytes -= bytesToMove;
1535 blockHeader._nextInChain = (current->_nextInChain
1536 ? current->_nextInChain->_offset
1537 : (amps_uint64_t)0);
1540 pBuffer->
putBytes((
const char*)&blockHeader,
1541 sizeof(BlockHeader));
1542 if (firstFree == previous)
1544 firstFree = firstFree->_nextInList;
1545 if (!firstFree) endOfFreeList = 0;
1550 current = firstFree;
1553 if (current->_nextInList == previous)
1555 current->_nextInList = previous->_nextInList;
1559 current=current->_nextInList;
1564 blockNum += blockHeader._blocksToWrite - 1;
1565 blockHeader._crcVal = crcVal;
1576 blockHeader._nextInChain = (first->_nextInChain
1577 ? first->_nextInChain->_offset
1578 : (amps_uint64_t)0);
1579 pBuffer->
putBytes((
const char*)&blockHeader,
1580 sizeof(BlockHeader));
1581 pBuffer->
putBytes((
const char*)&chainHeader,
1582 sizeof(BlockChainHeader));
1585 recoveredBlocks[blockHeader._seq] = first;
1591 growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1592 growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1593 blockNum += blockHeader._blocksToWrite - 1;
1596 if (maxIdx < blockHeader._seq)
1598 maxIdx = blockHeader._seq;
1600 if (minIdx > blockHeader._seq)
1602 minIdx = blockHeader._seq;
1605 location += blockHeader._blocksToWrite *
getBlockSize();
1607 assert(location >= size || blockNum < numBlocks);
1612 Block* block = blocks[++blockNum].setOffset(location);
1615 endOfFreeList->_nextInList = block;
1621 endOfFreeList = block;
1623 location += blockSize;
1626 for (RecoverMap::iterator i = growingBlocks.begin();
1627 i != growingBlocks.end(); ++i)
1629 Block* first = i->second;
1631 BlockHeader blockHeader;
1634 pBuffer->
copyBytes((
char*)&blockHeader, 24);
1637 blockHeader._totalRemaining -= 64;
1640 if (freeCount < growthBlocksNeeded)
1643 amps_uint32_t minBlocksRequired = growthBlocksNeeded-freeCount;
1644 amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1645 if (growthBlocks < minBlocksRequired)
1647 amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1648 if (minBlocksRequired%defaultBlocks)
1649 minBlocksRequired = (minBlocksRequired/defaultBlocks+1)
1651 growthBlocks = minBlocksRequired;
1653 amps_uint32_t newBlocks = 0;
1654 Block* addedBlocks = _blockStore.resizeBuffer(
1656 + growthBlocks * blockSize,
1660 throw StoreException(
"Failed to grow store buffer during recovery");
1662 _blockStore.addBlocks(addedBlocks);
1663 freeCount += newBlocks;
1664 growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1665 ? growthBlocksNeeded - freeCount : 0;
1668 endOfFreeList->_nextInList = addedBlocks;
1672 firstFree = addedBlocks;
1674 endOfFreeList = &(addedBlocks[newBlocks-1]);
1675 endOfFreeList->_nextInList = 0;
1677 expandBlocks(blocks, first->_offset, first, blockHeader,
1678 &firstFree, &freeCount, pBuffer);
1680 recoveredBlocks[blockHeader._seq] = first;
1681 if (!firstFree) endOfFreeList = 0;
1683 if (endOfFreeList) endOfFreeList->_nextInList = 0;
1684 _blockStore.setFreeList(firstFree, freeCount);
1685 if (maxIdx > _lastSequence) _lastSequence = maxIdx;
1686 if (minIdx > _maxDiscarded + 1) _maxDiscarded = minIdx - 1;
1687 if (_maxDiscarded > _metadataBlock->_sequence)
1689 _metadataBlock->_sequence = _maxDiscarded;
1694 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
1695 for (RecoverMap::iterator i = recoveredBlocks.begin();
1696 i != recoveredBlocks.end(); ++i)
1698 if (_blockStore.front())
1700 end->_nextInList = i->second;
1704 _blockStore.setUsedList(i->second);
1710 end->_nextInList = 0;
1712 _blockStore.setEndOfUsedList(end);
1717 void expandBlocks(Block* blocks_,
size_t location_, Block* first_,
1718 BlockHeader blockHeader_,
1719 Block** pFreeList_, amps_uint32_t* pFreeCount_,
1723 Block* current = first_;
1726 amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining;
1727 blockHeader_._totalRemaining -= 64;
1731 amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1733 + (blockHeader_._totalRemaining
1740 size_t endBlockSize = oldTotalRemaining % blockSize;
1741 if (!endBlockSize) endBlockSize = blockSize;
1742 size_t endOfData = 0;
1744 amps_uint64_t crcVal = blockHeader_._crcVal;
1745 blockHeader_._crcVal = 0;
1747 std::stack<Block*> blocksUsed;
1748 for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1750 blocksUsed.push(current);
1751 current->_sequence = blockHeader_._seq;
1752 if (i >= blockHeader_._blocksToWrite)
1754 if (i == blockHeader_._blocksToWrite)
1755 endOfData = current->_offset + endBlockSize;
1756 current->_nextInChain = *pFreeList_;
1758 *pFreeList_ = (*pFreeList_)->_nextInList;
1762 current->_nextInChain = current->_nextInList;
1763 if (current->_nextInChain)
1765 if (current->_offset + blockSize < pBuffer_->getSize())
1767 current->_nextInChain->setOffset(current->_offset
1772 current->_nextInChain->setOffset(blockSize);
1777 current->_nextInChain = blocks_[1].init(1, blockSize);
1779 if (current->_nextInChain == *pFreeList_)
1781 *pFreeList_ = (*pFreeList_)->_nextInList;
1786 for (Block* free = *pFreeList_; free;
1787 free = free->_nextInList)
1789 if (free->_nextInList == current->_nextInChain)
1791 free->_nextInList = free->_nextInList->_nextInList;
1798 current->_nextInList = 0;
1799 current = current->_nextInChain;
1802 blockHeader_._blocksToWrite = blocksNeeded;
1804 current->_nextInList = 0;
1805 current->_sequence = blockHeader_._seq;
1815 while (current != first_)
1817 size_t chunkBytesAvail = endOfData > location_
1818 ? endOfData - location_
1820 if (chunkBytesAvail < dataBytes)
1832 chunkBytesAvail = dataBytes - chunkBytesAvail;
1833 endOfData = pBuffer_->
getSize() - chunkBytesAvail;
1840 endOfData -= dataBytes;
1846 blockHeader_._nextInChain = (current->_nextInChain
1847 ? current->_nextInChain->_offset
1848 : (amps_uint64_t)0);
1851 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1852 current = blocksUsed.top();
1863 blockHeader_._crcVal = crcVal;
1864 blockHeader_._nextInChain = first_->_nextInChain->_offset;
1868 pBuffer_->
setPosition(location_ + (
sizeof(amps_uint32_t)*2)
1869 + (
sizeof(amps_uint64_t)*2) );
1874 BlockChainHeader chainHeader;
1875 pBuffer_->
copyBytes((
char*)&chainHeader,
1876 sizeof(amps_uint32_t) * 8);
1879 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1880 pBuffer_->
putBytes((
const char*)&chainHeader,
sizeof(BlockChainHeader));
1883 void chooseCRC(
bool isFile)
1892 _crc = AMPS::CRC<0>::crcNoSSE;
1894 if(AMPS::CRC<0>::isSSE42Enabled())
1896 _crc = AMPS::CRC<0>::crc;
1900 _crc = AMPS::CRC<0>::crcNoSSE;
1905 static amps_uint64_t noOpCRC(
const char*,
size_t, amps_uint64_t)
1915 Block* _metadataBlock;
1917 amps_uint64_t _maxDiscarded;
1919 volatile amps_uint64_t _lastSequence;
1921 ATOMIC_TYPE _stored;
1926 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:759
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CorrelationId header of self as a Field that references the underlying buf...
Definition: Message.hpp:1120
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:941
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:79
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CommandId header of self as a Field that references the underlying buffer ...
Definition: Message.hpp:1012
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:731
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:628
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:542
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Filter header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1035
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:447
virtual void setPosition(size_t position_)=0
Set the buffer postion to a location.
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:153
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:251
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:179
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Bookmark header of self as a Field that references the underlying buffer m...
Definition: Message.hpp:1122
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:571
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:262
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:680
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:614
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:258
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:836
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKeys header of self as a Field that references the underlying buffer ma...
Definition: Message.hpp:1119
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:60
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:237
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Expiration header of self as a Field that references the underlying buffer...
Definition: Message.hpp:1109
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:196
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:163
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Topic header of self as a Field that references the underlying buffer mana...
Definition: Message.hpp:1034
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Field represents the value of a single field in a Message.
Definition: Field.hpp:84
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:247
amps_uint32_t getBlockSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:171
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKey header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1130
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1077
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:668
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:815
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:487
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.