26 #ifndef _BLOCKPUBLISHSTORE_H_ 27 #define _BLOCKPUBLISHSTORE_H_ 34 #include <ampscrc.hpp> 38 #include <sys/timeb.h> 63 typedef Lock<BlockStore> BufferLock;
68 SOW_DELETE_FILTER=0x02,
70 SOW_DELETE_BOOKMARK=0x08,
71 SOW_DELETE_BOOKMARK_CANCEL=0x10,
72 SOW_DELETE_UNKNOWN=0x80
79 DEFAULT_BLOCK_HEADER_SIZE = 32,
80 DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
81 DEFAULT_BLOCKS_PER_REALLOC = 1000,
82 DEFAULT_BLOCK_SIZE = 2048
122 amps_uint32_t _blocksToWrite;
123 amps_uint32_t _totalRemaining;
125 amps_uint64_t _crcVal;
126 amps_uint64_t _nextInChain;
129 struct BlockChainHeader
131 amps_uint32_t _operation;
132 amps_uint32_t _commandIdLen;
133 amps_uint32_t _correlationIdLen;
134 amps_uint32_t _expirationLen;
135 amps_uint32_t _sowKeyLen;
136 amps_uint32_t _topicLen;
138 amps_uint32_t _ackTypes;
139 amps_uint32_t _unused[8];
141 : _operation(0), _commandIdLen(0), _correlationIdLen(0)
142 , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
153 return DEFAULT_BLOCK_HEADER_SIZE;
163 return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
171 return _blockStore.getBlockSize();
195 amps_uint32_t blocksPerRealloc_ = 1000,
196 bool isFile_ =
false,
197 amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
199 , _blockStore(buffer_, blocksPerRealloc_,
200 DEFAULT_BLOCK_HEADER_SIZE,
201 (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE*2
202 ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE*2))
204 , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
212 BufferLock bufferGuard(_blockStore);
214 _metadataBlock = _blockStore.get(1);
216 _blockStore.setUsedList(0);
217 _blockStore.setEndOfUsedList(0);
220 _metadataBlock->_sequence = (amps_uint64_t)0;
221 Buffer* pBuffer = _blockStore.getBuffer();
227 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
247 return store(message_,
true);
262 const char *commandId, *correlationId, *expiration, *sowKey,
265 BlockHeader blockHeader;
266 BlockChainHeader chainHeader;
268 chainHeader._commandIdLen = (amps_uint32_t)dataLen;
270 chainHeader._commandIdLen = (amps_uint32_t)dataLen;
272 chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
274 chainHeader._expirationLen = (amps_uint32_t)dataLen;
276 chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
278 chainHeader._topicLen = (amps_uint32_t)dataLen;
279 message_.getRawData(&data, &dataLen);
280 chainHeader._flag = -1;
282 chainHeader._operation = (amps_uint32_t)operation;
283 if (operation == Message::Command::SOWDelete)
287 chainHeader._flag = SOW_DELETE_DATA;
294 chainHeader._flag = SOW_DELETE_FILTER;
301 chainHeader._flag = SOW_DELETE_KEYS;
306 chainHeader._flag = SOW_DELETE_BOOKMARK;
309 size_t remaining = options.
len();
310 const void* next = NULL;
311 const void* start = (
const void*)(options.
data());
313 while (remaining >= 6 &&
314 (next = memchr(start,(
int)
'c',remaining)) != NULL)
316 remaining = (size_t)next-(
size_t)start;
317 if (remaining >= 6 && strncmp((
const char*)start,
320 chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
328 blockHeader._totalRemaining = (
329 (chainHeader._operation == Message::Command::Unknown)
332 + chainHeader._commandIdLen
333 + chainHeader._correlationIdLen
334 + chainHeader._expirationLen
335 + chainHeader._sowKeyLen
336 + chainHeader._topicLen
337 + (amps_uint32_t)dataLen));
338 size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
340 blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
342 : ((amps_uint32_t)(blockHeader._totalRemaining
344 + ((lastBlockLength > 0) ? 1 : 0)));
345 blockHeader._crcVal = (amps_uint64_t)0ULL;
346 blockHeader._crcVal = _crc(commandId,
347 chainHeader._commandIdLen,
348 blockHeader._crcVal);
349 blockHeader._crcVal = _crc(correlationId,
350 chainHeader._correlationIdLen,
351 blockHeader._crcVal);
352 blockHeader._crcVal = _crc(expiration,
353 chainHeader._expirationLen,
354 blockHeader._crcVal);
355 blockHeader._crcVal = _crc(sowKey,
356 chainHeader._sowKeyLen,
357 blockHeader._crcVal);
358 blockHeader._crcVal = _crc(topic,
359 chainHeader._topicLen,
360 blockHeader._crcVal);
361 blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
364 BufferLock bufferGuard(_blockStore);
365 Block* first = _blockStore.get(blockHeader._blocksToWrite);
368 if (_lastSequence <= 2)
372 blockHeader._seq = ++_lastSequence;
380 _maxDiscarded = blockHeader._seq - 1;
382 if (blockHeader._seq >= _lastSequence)
384 _lastSequence = blockHeader._seq;
390 size_t topicWritten = 0UL;
391 size_t dataWritten = 0UL;
392 size_t commandWritten = 0UL;
393 size_t correlationWritten = 0UL;
394 size_t expirationWritten = 0UL;
395 size_t sowKeyWritten = 0UL;
396 Buffer* pBuffer = _blockStore.getBuffer();
397 for(Block* next = first; next; next = next->_nextInChain)
399 next->_sequence = blockHeader._seq;
400 if (next->_nextInChain)
401 blockHeader._nextInChain = next->_nextInChain->_offset;
403 blockHeader._nextInChain = (amps_uint64_t)0;
406 pBuffer->
putBytes((
const char*)&blockHeader,
sizeof(BlockHeader));
408 blockHeader._crcVal = (amps_uint64_t)0;
414 pBuffer->
putBytes((
const char*)&chainHeader,
415 sizeof(BlockChainHeader));
424 if(commandWritten < chainHeader._commandIdLen)
426 size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
427 pBuffer->
putBytes(commandId + commandWritten,
429 bytesRemaining -= commandWrite;
430 commandWritten += commandWrite;
432 if(correlationWritten < chainHeader._correlationIdLen)
434 size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
435 pBuffer->
putBytes(correlationId + correlationWritten,
437 bytesRemaining -= correlationWrite;
438 correlationWritten += correlationWrite;
440 if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
442 size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
443 pBuffer->
putBytes(expiration + expirationWritten, expWrite);
444 bytesRemaining -= expWrite;
445 expirationWritten += expWrite;
447 if(bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
449 size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
450 pBuffer->
putBytes(sowKey + sowKeyWritten, sowKeyWrite);
451 bytesRemaining -= sowKeyWrite;
452 sowKeyWritten += sowKeyWrite;
454 if(bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
456 size_t topicWrite = (chainHeader._topicLen - topicWritten
458 ? chainHeader._topicLen - topicWritten
460 pBuffer->
putBytes(topic + topicWritten, topicWrite);
461 bytesRemaining -= topicWrite;
462 topicWritten += topicWrite;
464 if(bytesRemaining > 0 && dataWritten < dataLen)
466 size_t dataWrite = (dataLen-dataWritten < bytesRemaining) ?
467 dataLen-dataWritten : bytesRemaining;
468 pBuffer->
putBytes(data + dataWritten, dataWrite);
469 bytesRemaining -= dataWrite;
470 dataWritten += dataWrite;
474 catch (
const AMPSException& ex)
476 _blockStore.put(first);
479 AMPS_FETCH_ADD(&_stored, 1);
480 return blockHeader._seq;
490 BufferLock bufferGuard(_blockStore);
491 Buffer* pBuffer = _blockStore.getBuffer();
494 amps_uint64_t lastPersisted = _metadataBlock->_sequence;
496 if(index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
500 if (lastPersisted < index_)
504 _metadataBlock->_sequence = index_;
505 if (_maxDiscarded < index_)
507 _maxDiscarded = index_;
509 if (_lastSequence <= index_)
511 _lastSequence = index_;
518 _blockStore.signalAll();
521 if (_maxDiscarded < index_)
523 _maxDiscarded = index_;
525 AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
526 _blockStore.signalAll();
527 if (lastPersisted >= index_)
533 _metadataBlock->_sequence = index_;
534 if (_lastSequence < index_)
536 _lastSequence = index_;
547 BufferLock bufferGuard(_blockStore);
549 if(!_blockStore.front())
return;
550 Block* next = _blockStore.front();
553 for (Block* block = _blockStore.front(); block; block = next)
556 replayOnto(block, replayer_);
557 next = block->_nextInList;
560 catch (
const StoreException& e)
562 _blockStore.putAll(next);
575 BufferLock bufferGuard(_blockStore);
577 if(!_blockStore.front())
return false;
579 amps_uint64_t lastIdx = _blockStore.back()->_sequence;
581 amps_uint64_t leastIdx = _blockStore.front()->_sequence;
582 if(index_>=leastIdx && index_ <=lastIdx)
584 Block* block = _blockStore.front();
585 while(block && block->_sequence != index_)
587 block = block->_nextInList;
594 Buffer* pBuffer = _blockStore.getBuffer();
596 sizeof(amps_uint32_t));
597 if (pBuffer->
getUint32() == 0)
return false;
598 replayOnto(block, replayer_);
604 if (_blockStore.front()) leastIdx -= 1;
606 _message.setSequence(leastIdx);
619 size_t count = (size_t)_stored;
633 BufferLock bufferGuard(_blockStore);
634 amps_uint64_t waitFor = _getHighestUnpersisted();
639 bool timedOut =
false;
640 AMPS_START_TIMER(timeout_);
642 while (!timedOut && _stored != 0
643 && waitFor >= _getLowestUnpersisted())
645 if (!_blockStore.wait(timeout_))
648 AMPS_RESET_TIMER(timedOut, timeout_);
652 if (timedOut && _stored != 0
653 && waitFor >= _getLowestUnpersisted())
655 throw TimedOutException(
"Timed out waiting to flush publish store.");
660 while (_stored != 0 && waitFor >= _getLowestUnpersisted())
663 _blockStore.wait(1000);
664 amps_invoke_waiting_function();
671 BufferLock bufferGuard(_blockStore);
672 return _getLowestUnpersisted();
675 amps_uint64_t getHighestUnpersisted()
const 677 BufferLock bufferGuard(_blockStore);
678 return _getHighestUnpersisted();
683 BufferLock bufferGuard(_blockStore);
684 return _getLastPersisted();
688 static bool canResize(
size_t requestedSize_,
void* vpThis_)
693 amps_uint64_t _getLowestUnpersisted()
const 698 return _blockStore.front()->_sequence;
701 amps_uint64_t _getHighestUnpersisted()
const 706 return _blockStore.back()->_sequence;
709 amps_uint64_t _getLastPersisted(
void)
712 amps_uint64_t lastPersisted = (amps_uint64_t)0;
713 Buffer* pBuffer = _blockStore.getBuffer();
718 if (_lastSequence < lastPersisted)
719 _lastSequence = lastPersisted;
720 return lastPersisted;
724 lastPersisted = _maxDiscarded;
731 lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
734 gettimeofday(&tv, NULL);
735 lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
736 * (amps_uint64_t)1000000;
739 if (_lastSequence > 2)
741 amps_uint64_t low = _getLowestUnpersisted();
742 amps_uint64_t high = _getHighestUnpersisted();
745 lastPersisted = low - 1;
749 _lastSequence = high;
751 if (_lastSequence < lastPersisted)
753 lastPersisted = _lastSequence - 1;
758 _lastSequence = lastPersisted;
761 +
sizeof(amps_uint32_t)
762 +
sizeof(amps_uint32_t));
764 _metadataBlock->_sequence = lastPersisted;
765 return lastPersisted;
770 BufferLock bufferGuard(_blockStore);
772 Buffer* pBuffer = _blockStore.getBuffer();
773 size_t size = pBuffer->
getSize();
778 _metadataBlock = _blockStore.get(1);
779 _metadataBlock->_sequence = (amps_uint64_t)0;
783 pBuffer->
putUint32((amps_uint32_t)blockSize);
787 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
792 size_t numBlocks = size / blockSize;
793 if(size % blockSize > 0)
798 numBlocks = size / blockSize;
800 amps_uint32_t blockCount = 0;
802 delete[] _blockStore.resizeBuffer(numBlocks*blockSize, &blockCount);
805 if (size > pBuffer->
getSize() || numBlocks != (size_t)blockCount)
807 throw StoreException(
"Publish Store could not resize correctly during recoery, possibly due to resizeHandler refusing the request.");
812 amps_uint64_t maxIdx = 0;
813 amps_uint64_t minIdx = 0;
815 BlockHeader blockHeader;
817 Block* blocks =
new Block[numBlocks];
818 blocks[numBlocks-1]._nextInList = 0;
820 _blockStore.addBlocks(blocks);
821 _metadataBlock = blocks;
822 _metadataBlock->_nextInList = 0;
824 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
832 if (blockHeader._blocksToWrite == 1)
842 if (blockHeader._totalRemaining >= 5000000)
846 recoverOldFormat(blocks);
850 throw StoreException(
"Unrecognized format for Store. Can't recover.");
852 if (blockHeader._blocksToWrite == 0)
855 pBuffer->
putUint32((amps_uint32_t)blockSize);
859 blockSize = blockHeader._blocksToWrite;
860 _blockStore.setBlockSize(blockSize);
862 if (blockHeader._totalRemaining == 0)
869 _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
871 _metadataBlock->_sequence = blockHeader._seq;
872 if (_metadataBlock->_sequence
873 && _metadataBlock->_sequence < (amps_uint64_t)1000000)
876 +
sizeof(amps_uint32_t)
877 +
sizeof(amps_uint32_t));
879 _metadataBlock->_sequence = 0;
884 _maxDiscarded = _metadataBlock->_sequence;
885 _lastSequence = _maxDiscarded;
889 location += blockSize;
890 amps_uint32_t freeCount = 0;
891 Block* firstFree = NULL;
892 Block* endOfFreeList = NULL;
894 typedef std::map<amps_uint64_t, Block*> RecoverMap;
895 RecoverMap recoveredBlocks;
896 while(location < size)
900 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
901 if((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
902 (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
905 location += blockSize;
908 Block* block = blocks[++blockNum].setOffset(location);
909 bool recovered =
false;
910 if(blockHeader._seq > 0 && blockHeader._totalRemaining < size)
913 block->_sequence = blockHeader._seq;
915 if(maxIdx < blockHeader._seq)
917 maxIdx = blockHeader._seq;
919 if(minIdx > blockHeader._seq)
921 minIdx = blockHeader._seq;
924 recoveredBlocks[blockHeader._seq] = block;
926 while (blockHeader._nextInChain != (amps_uint64_t)0)
928 Block* chain = blocks[++blockNum]
929 .setOffset((
size_t)blockHeader._nextInChain);
930 chain->_nextInList = 0;
931 pBuffer->
setPosition((
size_t)blockHeader._nextInChain
932 +
sizeof(amps_uint32_t)
933 +
sizeof(amps_uint32_t)
934 +
sizeof(amps_uint64_t)
935 +
sizeof(amps_uint64_t));
936 blockHeader._nextInChain = pBuffer->
getUint64();
937 block->_nextInChain = chain;
939 block->_sequence = blockHeader._seq;
948 endOfFreeList->_nextInList = block;
954 endOfFreeList = block;
957 location += blockSize;
961 endOfFreeList->_nextInList = 0;
963 _blockStore.setFreeList(firstFree, freeCount);
964 if (maxIdx > _lastSequence)
966 _lastSequence = maxIdx;
968 if (minIdx > _maxDiscarded + 1)
970 _maxDiscarded = minIdx - 1;
972 if (_maxDiscarded > _metadataBlock->_sequence)
974 _metadataBlock->_sequence = _maxDiscarded;
979 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
980 for (RecoverMap::iterator i = recoveredBlocks.begin();
981 i != recoveredBlocks.end(); ++i)
985 end->_nextInList = i->second;
989 _blockStore.setUsedList(i->second);
995 end->_nextInList = 0;
997 _blockStore.setEndOfUsedList(end);
1005 size_t start = block_->_offset;
1006 size_t position = start;
1007 Buffer* pBuffer = _blockStore.getBuffer();
1009 BlockHeader blockHeader;
1010 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
1011 if (blockHeader._totalRemaining == 0)
1017 BlockChainHeader blockChainHeader;
1018 pBuffer->
copyBytes((
char*)&blockChainHeader,
sizeof(blockChainHeader));
1019 if (blockChainHeader._operation == Message::Command::Unknown)
1024 blockChainHeader._ackTypes |= Message::AckType::Persisted;
1029 if (blockHeader._totalRemaining
1030 < blockChainHeader._commandIdLen
1031 + blockChainHeader._correlationIdLen
1032 + blockChainHeader._expirationLen
1033 + blockChainHeader._sowKeyLen
1034 + blockChainHeader._topicLen)
1036 std::ostringstream os;
1037 os <<
"Corrupted message found with invalid lengths. " 1038 <<
"Attempting to replay " << block_->_sequence
1039 <<
". Block sequence " << blockHeader._seq
1040 <<
", topic length " << blockChainHeader._topicLen
1041 <<
", data length " << blockHeader._totalRemaining
1042 <<
", command ID length " << blockChainHeader._commandIdLen
1043 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1044 <<
", expiration length " << blockChainHeader._expirationLen
1045 <<
", sow key length " << blockChainHeader._sowKeyLen
1046 <<
", start " << start
1047 <<
", position " << position
1048 <<
", buffer size " << pBuffer->
getSize();
1049 throw StoreException(os.str());
1054 _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1055 _message.setAckTypeEnum((
unsigned)blockChainHeader._ackTypes
1056 | Message::AckType::Persisted);
1057 _message.setSequence(blockHeader._seq);
1059 Block* current = block_;
1061 amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
1063 char** tmpBuffers = (blockHeader._blocksToWrite>1) ?
new char*[blockHeader._blocksToWrite-1] : 0;
1064 size_t blockNum = 0;
1065 if (blockChainHeader._commandIdLen > 0)
1067 if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1069 _message.assignCommandId(pBuffer->
getBytes(blockChainHeader._commandIdLen)._data,
1070 blockChainHeader._commandIdLen);
1071 blockBytesRemaining -= blockChainHeader._commandIdLen;
1075 tmpBuffers[blockNum] =
new char[blockChainHeader._commandIdLen];
1076 size_t totalLeft = blockChainHeader._commandIdLen;
1077 size_t totalRead = 0;
1081 readLen = blockBytesRemaining < totalLeft ?
1082 blockBytesRemaining : totalLeft;
1083 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1084 if (!(totalLeft -= readLen))
break;
1085 if (!(current = current->_nextInChain))
break;
1086 totalRead += readLen;
1091 blockBytesRemaining -= readLen;
1092 _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
1094 blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1095 crcCalc = _crc(_message.getCommandId().data(),
1096 blockChainHeader._commandIdLen, crcCalc);
1098 if (blockChainHeader._correlationIdLen > 0)
1100 if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1102 _message.assignCorrelationId(
1103 pBuffer->
getBytes(blockChainHeader._correlationIdLen)._data,
1104 blockChainHeader._correlationIdLen);
1105 blockBytesRemaining -= blockChainHeader._correlationIdLen;
1109 tmpBuffers[blockNum] =
new char[blockChainHeader._correlationIdLen];
1110 size_t totalLeft = blockChainHeader._correlationIdLen;
1111 size_t totalRead = 0;
1115 readLen = blockBytesRemaining < totalLeft ?
1116 blockBytesRemaining : totalLeft;
1117 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1118 if (!(totalLeft -= readLen))
break;
1119 if (!(current = current->_nextInChain))
break;
1120 totalRead += readLen;
1125 blockBytesRemaining -= readLen;
1126 _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1128 blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1129 crcCalc = _crc(_message.getCorrelationId().data(),
1130 blockChainHeader._correlationIdLen, crcCalc);
1132 if (blockChainHeader._expirationLen > 0)
1134 if (blockChainHeader._expirationLen <= blockBytesRemaining)
1136 _message.assignExpiration(
1137 pBuffer->
getBytes(blockChainHeader._expirationLen)._data,
1138 blockChainHeader._expirationLen);
1139 blockBytesRemaining -= blockChainHeader._expirationLen;
1143 tmpBuffers[blockNum] =
new char[blockChainHeader._expirationLen];
1144 size_t totalLeft = blockChainHeader._expirationLen;
1145 size_t totalRead = 0;
1149 readLen = blockBytesRemaining < totalLeft ?
1150 blockBytesRemaining : totalLeft;
1151 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1152 if (!(totalLeft -= readLen))
break;
1153 if (!(current = current->_nextInChain))
break;
1154 totalRead += readLen;
1159 blockBytesRemaining -= readLen;
1160 _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1162 blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1163 crcCalc = _crc(_message.getExpiration().data(),
1164 blockChainHeader._expirationLen, crcCalc);
1166 if (blockChainHeader._sowKeyLen > 0)
1168 if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1170 _message.assignSowKey(pBuffer->
getBytes(blockChainHeader._sowKeyLen)._data,
1171 blockChainHeader._sowKeyLen);
1172 blockBytesRemaining -= blockChainHeader._sowKeyLen;
1176 tmpBuffers[blockNum] =
new char[blockChainHeader._sowKeyLen];
1177 size_t totalLeft = blockChainHeader._sowKeyLen;
1178 size_t totalRead = 0;
1182 readLen = blockBytesRemaining < totalLeft ?
1183 blockBytesRemaining : totalLeft;
1184 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1185 if (!(totalLeft -= readLen))
break;
1186 if (!(current = current->_nextInChain))
break;
1187 totalRead += readLen;
1192 blockBytesRemaining -= readLen;
1193 _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1195 blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1196 crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
1198 if (blockChainHeader._topicLen > 0)
1200 if (blockChainHeader._topicLen <= blockBytesRemaining)
1202 _message.assignTopic(pBuffer->
getBytes(blockChainHeader._topicLen)._data,
1203 blockChainHeader._topicLen);
1204 blockBytesRemaining -= blockChainHeader._topicLen;
1208 tmpBuffers[blockNum] =
new char[blockChainHeader._topicLen];
1209 size_t totalLeft = blockChainHeader._topicLen;
1210 size_t totalRead = 0;
1214 readLen = blockBytesRemaining < totalLeft ?
1215 blockBytesRemaining : totalLeft;
1216 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1217 if (!(totalLeft -= readLen))
break;
1218 if (!(current = current->_nextInChain))
break;
1219 totalRead += readLen;
1224 blockBytesRemaining -= readLen;
1225 _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1227 blockHeader._totalRemaining -= blockChainHeader._topicLen;
1228 crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
1230 if (blockHeader._totalRemaining > 0)
1232 if (blockHeader._totalRemaining <= blockBytesRemaining)
1234 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1236 _message.assignData(
1237 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1238 blockHeader._totalRemaining);
1239 crcCalc = _crc(_message.getData().data(),
1240 blockHeader._totalRemaining, crcCalc);
1242 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1244 _message.assignFilter(
1245 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1246 blockHeader._totalRemaining);
1247 crcCalc = _crc(_message.getFilter().data(),
1248 blockHeader._totalRemaining, crcCalc);
1250 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1252 _message.assignSowKeys(
1253 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1254 blockHeader._totalRemaining);
1255 crcCalc = _crc(_message.getSowKeys().data(),
1256 blockHeader._totalRemaining, crcCalc);
1258 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1260 _message.assignBookmark(
1261 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1262 blockHeader._totalRemaining);
1263 crcCalc = _crc(_message.getBookmark().data(),
1264 blockHeader._totalRemaining, crcCalc);
1266 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1268 _message.assignBookmark(
1269 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1270 blockHeader._totalRemaining);
1271 crcCalc = _crc(_message.getBookmark().data(),
1272 blockHeader._totalRemaining, crcCalc);
1273 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1278 tmpBuffers[blockNum] =
new char[blockHeader._totalRemaining];
1279 size_t totalLeft = blockHeader._totalRemaining;
1280 size_t totalRead = 0;
1284 readLen = blockBytesRemaining < totalLeft ?
1285 blockBytesRemaining : totalLeft;
1286 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1287 if (!(totalLeft -= readLen))
break;
1288 if (!(current = current->_nextInChain))
break;
1289 totalRead += readLen;
1295 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1296 _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1297 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1298 _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1299 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1300 _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1301 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1302 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1303 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1305 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1306 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1308 crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc);
1313 if(crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1315 std::ostringstream os;
1316 os <<
"Corrupted message found by CRC or sequence " 1317 <<
"Attempting to replay " << block_->_sequence
1318 <<
". Block sequence " << blockHeader._seq
1319 <<
", expiration length " << blockChainHeader._expirationLen
1320 <<
", sowKey length " << blockChainHeader._sowKeyLen
1321 <<
", topic length " << blockChainHeader._topicLen
1322 <<
", data length " << blockHeader._totalRemaining
1323 <<
", command ID length " << blockChainHeader._commandIdLen
1324 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1325 <<
", flag " << blockChainHeader._flag
1326 <<
", expected CRC " << blockHeader._crcVal
1327 <<
", actual CRC " << crcCalc
1328 <<
", start " << start
1329 <<
", position " << position
1330 <<
", buffer size " << pBuffer->
getSize();
1331 for (Block* block = block_; block; block = block->_nextInChain)
1333 os <<
"\n BLOCK " << block->_offset;
1337 for (amps_uint32_t i=0; i<blockNum; ++i)
1338 delete[] tmpBuffers[i];
1339 delete[] tmpBuffers;
1341 throw StoreException(os.str());
1348 for (amps_uint32_t i=0; i<blockNum; ++i)
1349 delete[] tmpBuffers[i];
1350 delete[] tmpBuffers;
1356 void recoverOldFormat(Block* blocks)
1358 Buffer* pBuffer = _blockStore.getBuffer();
1359 amps_uint64_t maxIdx = 0;
1360 amps_uint64_t minIdx = 0;
1361 size_t size = pBuffer->
getSize();
1362 size_t location = 0;
1365 pBuffer->
putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1366 _metadataBlock->_sequence = pBuffer->
getUint64();
1367 if (_metadataBlock->_sequence < (amps_uint64_t)1000000)
1371 _metadataBlock->_sequence = 0;
1376 _maxDiscarded = _metadataBlock->_sequence;
1377 _lastSequence = _maxDiscarded;
1380 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1385 amps_uint32_t freeCount = 0;
1386 Block* firstFree = NULL;
1387 Block* endOfFreeList = NULL;
1389 size_t numBlocks = size / blockSize;
1390 size_t blockNum = 0;
1392 typedef std::map<amps_uint64_t, Block*> RecoverMap;
1393 RecoverMap recoveredBlocks;
1394 RecoverMap growingBlocks;
1395 amps_uint32_t growthBlocksNeeded = 0;
1396 while(location < size)
1400 BlockHeader blockHeader;
1401 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
1402 size_t blockCount = (size_t)blockHeader._blocksToWrite;
1403 if(blockHeader._totalRemaining > 0 && blockHeader._seq > 0
1404 && blockHeader._totalRemaining < size
1405 && blockHeader._blocksToWrite < numBlocks
1406 && (blockHeader._blocksToWrite*blockSize)
1407 >= blockHeader._totalRemaining)
1409 size_t oldFormatSize = blockHeader._totalRemaining;
1412 blockHeader._totalRemaining -= 64;
1416 BlockChainHeader chainHeader;
1420 pBuffer->
setPosition(location + (
sizeof(amps_uint32_t)*2)
1421 + (
sizeof(amps_uint64_t)*2) );
1427 sizeof(amps_uint32_t) * 8);
1429 if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1430 + chainHeader._expirationLen + chainHeader._sowKeyLen
1431 + chainHeader._topicLen) > blockHeader._totalRemaining)
1438 amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1440 + (blockHeader._totalRemaining
1443 if (blocksNeeded == blockHeader._blocksToWrite)
1445 Block* first = blocks[++blockNum].setOffset(location);
1446 first->_nextInList = 0;
1447 first->_sequence = blockHeader._seq;
1448 if (blockHeader._blocksToWrite > 1)
1451 amps_uint64_t crcVal = blockHeader._crcVal;
1452 blockHeader._crcVal = 0;
1457 size_t currentBlockNum = blockNum
1458 + blockHeader._blocksToWrite
1462 if (currentBlockNum >= numBlocks)
1464 currentBlockNum = currentBlockNum - numBlocks + 1;
1466 if (currentBlockNum < blockNum)
1468 Block* last = blocks[currentBlockNum]
1470 if ((current = firstFree) == last)
1472 firstFree = firstFree->_nextInList;
1473 if (!firstFree) endOfFreeList = 0;
1480 if (current->_nextInList == last)
1482 current->_nextInList = last->_nextInList;
1487 current=current->_nextInList;
1493 current = blocks[currentBlockNum]
1498 while (current != first)
1500 current->_nextInList = 0;
1501 current->_sequence = blockHeader._seq;
1503 if (--currentBlockNum < 1
1504 || currentBlockNum > numBlocks)
1506 currentBlockNum = numBlocks - 1;
1508 Block* previous = blocks[currentBlockNum]
1509 .init(currentBlockNum,
1511 previous->_nextInChain = current;
1517 size_t bytesToMove = --blockCount
1521 pBuffer->
copyBytes(current->_offset + bytesToMove,
1528 dataBytes -= bytesToMove;
1536 blockHeader._nextInChain = (current->_nextInChain
1537 ? current->_nextInChain->_offset
1538 : (amps_uint64_t)0);
1541 pBuffer->
putBytes((
const char*)&blockHeader,
1542 sizeof(BlockHeader));
1543 if (firstFree == previous)
1545 firstFree = firstFree->_nextInList;
1546 if (!firstFree) endOfFreeList = 0;
1551 current = firstFree;
1554 if (current->_nextInList == previous)
1556 current->_nextInList = previous->_nextInList;
1560 current=current->_nextInList;
1565 blockNum += blockHeader._blocksToWrite - 1;
1566 blockHeader._crcVal = crcVal;
1577 blockHeader._nextInChain = (first->_nextInChain
1578 ? first->_nextInChain->_offset
1579 : (amps_uint64_t)0);
1580 pBuffer->
putBytes((
const char*)&blockHeader,
1581 sizeof(BlockHeader));
1582 pBuffer->
putBytes((
const char*)&chainHeader,
1583 sizeof(BlockChainHeader));
1586 recoveredBlocks[blockHeader._seq] = first;
1592 growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1593 growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1594 blockNum += blockHeader._blocksToWrite - 1;
1597 if (maxIdx < blockHeader._seq)
1599 maxIdx = blockHeader._seq;
1601 if (minIdx > blockHeader._seq)
1603 minIdx = blockHeader._seq;
1606 location += blockHeader._blocksToWrite *
getBlockSize();
1608 assert(location >= size || blockNum < numBlocks);
1613 Block* block = blocks[++blockNum].setOffset(location);
1616 endOfFreeList->_nextInList = block;
1622 endOfFreeList = block;
1624 location += blockSize;
1627 for (RecoverMap::iterator i = growingBlocks.begin();
1628 i != growingBlocks.end(); ++i)
1630 Block* first = i->second;
1632 BlockHeader blockHeader;
1635 pBuffer->
copyBytes((
char*)&blockHeader, 24);
1638 blockHeader._totalRemaining -= 64;
1641 if (freeCount < growthBlocksNeeded)
1644 amps_uint32_t minBlocksRequired = growthBlocksNeeded-freeCount;
1645 amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1646 if (growthBlocks < minBlocksRequired)
1648 amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1649 if (minBlocksRequired%defaultBlocks)
1650 minBlocksRequired = (minBlocksRequired/defaultBlocks+1)
1652 growthBlocks = minBlocksRequired;
1654 amps_uint32_t newBlocks = 0;
1655 Block* addedBlocks = _blockStore.resizeBuffer(
1657 + growthBlocks * blockSize,
1659 _blockStore.addBlocks(addedBlocks);
1660 freeCount += newBlocks;
1661 growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1662 ? growthBlocksNeeded - freeCount : 0;
1665 endOfFreeList->_nextInList = addedBlocks;
1669 firstFree = addedBlocks;
1671 endOfFreeList = &(addedBlocks[newBlocks-1]);
1672 endOfFreeList->_nextInList = 0;
1674 expandBlocks(blocks, first->_offset, first, blockHeader,
1675 &firstFree, &freeCount, pBuffer);
1677 recoveredBlocks[blockHeader._seq] = first;
1678 if (!firstFree) endOfFreeList = 0;
1680 if (endOfFreeList) endOfFreeList->_nextInList = 0;
1681 _blockStore.setFreeList(firstFree, freeCount);
1682 if (maxIdx > _lastSequence) _lastSequence = maxIdx;
1683 if (minIdx > _maxDiscarded + 1) _maxDiscarded = minIdx - 1;
1684 if (_maxDiscarded > _metadataBlock->_sequence)
1686 _metadataBlock->_sequence = _maxDiscarded;
1691 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
1692 for (RecoverMap::iterator i = recoveredBlocks.begin();
1693 i != recoveredBlocks.end(); ++i)
1695 if (_blockStore.front())
1697 end->_nextInList = i->second;
1701 _blockStore.setUsedList(i->second);
1707 end->_nextInList = 0;
1709 _blockStore.setEndOfUsedList(end);
1714 void expandBlocks(Block* blocks_,
size_t location_, Block* first_,
1715 BlockHeader blockHeader_,
1716 Block** pFreeList_, amps_uint32_t* pFreeCount_,
1720 Block* current = first_;
1723 amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining;
1724 blockHeader_._totalRemaining -= 64;
1728 amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1730 + (blockHeader_._totalRemaining
1737 size_t endBlockSize = oldTotalRemaining % blockSize;
1738 if (!endBlockSize) endBlockSize = blockSize;
1739 size_t endOfData = 0;
1741 amps_uint64_t crcVal = blockHeader_._crcVal;
1742 blockHeader_._crcVal = 0;
1744 std::list<Block*> blocksUsed;
1745 for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1747 blocksUsed.push_back(current);
1748 current->_sequence = blockHeader_._seq;
1749 if (i >= blockHeader_._blocksToWrite)
1751 if (i == blockHeader_._blocksToWrite)
1752 endOfData = current->_offset + endBlockSize;
1753 current->_nextInChain = *pFreeList_;
1755 *pFreeList_ = (*pFreeList_)->_nextInList;
1759 current->_nextInChain = current->_nextInList;
1760 if (current->_nextInChain)
1762 if (current->_offset + blockSize < pBuffer_->getSize())
1764 current->_nextInChain->setOffset(current->_offset
1769 current->_nextInChain->setOffset(blockSize);
1774 current->_nextInChain = blocks_[1].init(1, blockSize);
1776 if (current->_nextInChain == *pFreeList_)
1778 *pFreeList_ = (*pFreeList_)->_nextInList;
1783 for (Block* free = *pFreeList_; free;
1784 free = free->_nextInList)
1786 if (free->_nextInList == current->_nextInChain)
1788 free->_nextInList = free->_nextInList->_nextInList;
1795 current->_nextInList = 0;
1796 current = current->_nextInChain;
1799 blockHeader_._blocksToWrite = blocksNeeded;
1801 current->_nextInList = 0;
1802 current->_sequence = blockHeader_._seq;
1812 while (current != first_)
1814 size_t chunkBytesAvail = endOfData > location_
1815 ? endOfData - location_
1817 if (chunkBytesAvail < dataBytes)
1829 chunkBytesAvail = dataBytes - chunkBytesAvail;
1830 endOfData = pBuffer_->
getSize() - chunkBytesAvail;
1837 endOfData -= dataBytes;
1843 blockHeader_._nextInChain = (current->_nextInChain
1844 ? current->_nextInChain->_offset
1845 : (amps_uint64_t)0);
1848 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1849 current = blocksUsed.back();
1850 blocksUsed.pop_back();
1860 blockHeader_._crcVal = crcVal;
1861 blockHeader_._nextInChain = first_->_nextInChain->_offset;
1865 pBuffer_->
setPosition(location_ + (
sizeof(amps_uint32_t)*2)
1866 + (
sizeof(amps_uint64_t)*2) );
1871 BlockChainHeader chainHeader;
1872 pBuffer_->
copyBytes((
char*)&chainHeader,
1873 sizeof(amps_uint32_t) * 8);
1876 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1877 pBuffer_->
putBytes((
const char*)&chainHeader,
sizeof(BlockChainHeader));
1880 static bool isSSE42Enabled()
1884 __cpuid(cpuinfo, 1);
1885 return (cpuinfo[2] & (1<<20)) != 0;
1887 unsigned int eax, ebx, ecx=0, edx;
1888 __get_cpuid(1, &eax, &ebx, &ecx, &edx);
1889 return ecx & (1<<20);
1895 void chooseCRC(
bool isFile)
1904 _crc = AMPS::CRC<0>::crcNoSSE;
1906 if(isSSE42Enabled())
1908 _crc = AMPS::CRC<0>::crc;
1912 _crc = AMPS::CRC<0>::crcNoSSE;
1917 static amps_uint64_t noOpCRC(
const char*,
size_t, amps_uint64_t)
1927 Block* _metadataBlock;
1929 amps_uint64_t _maxDiscarded;
1931 volatile amps_uint64_t _lastSequence;
1933 ATOMIC_TYPE _stored;
1938 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:1143
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CorrelationId header of self as a Field that references the underlying buf...
Definition: Message.hpp:1062
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:885
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:77
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CommandId header of self as a Field that references the underlying buffer ...
Definition: Message.hpp:956
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:1115
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:631
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:544
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Filter header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:979
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:393
virtual void setPosition(size_t position_)=0
Set the buffer postion to a location.
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:151
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:206
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:177
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Bookmark header of self as a Field that references the underlying buffer m...
Definition: Message.hpp:1064
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:573
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:260
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:681
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:617
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:213
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1220
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKeys header of self as a Field that references the underlying buffer ma...
Definition: Message.hpp:1061
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:59
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:235
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Expiration header of self as a Field that references the underlying buffer...
Definition: Message.hpp:1052
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:194
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:161
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Topic header of self as a Field that references the underlying buffer mana...
Definition: Message.hpp:978
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Field represents the value of a single field in a Message.
Definition: Field.hpp:52
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:245
amps_uint32_t getBlockSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:169
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKey header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1072
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1021
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:669
Definition: ampsplusplus.hpp:136
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:1199
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:487
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.