26 #ifndef _BLOCKPUBLISHSTORE_H_ 27 #define _BLOCKPUBLISHSTORE_H_ 35 #include <ampscrc.hpp> 39 #include <sys/timeb.h> 64 typedef Lock<BlockStore> BufferLock;
65 typedef Unlock<BlockStore> BufferUnlock;
// Values written to BlockChainHeader::_flag when a SOWDelete command is
// stored. On replay the flag decides which message field receives the stored
// payload: data, filter, SOW keys, or bookmark (BOOKMARK_CANCEL additionally
// re-applies the cancel option). Each value occupies a distinct bit.
// NOTE(review): the enclosing enum declaration is not visible in this extract.
69 SOW_DELETE_DATA = 0x01,
70 SOW_DELETE_FILTER = 0x02,
71 SOW_DELETE_KEYS = 0x04,
72 SOW_DELETE_BOOKMARK = 0x08,
73 SOW_DELETE_BOOKMARK_CANCEL = 0x10,
74 SOW_DELETE_UNKNOWN = 0x80
// Default sizing constants for the store (presumably byte counts for the
// three *_SIZE values -- TODO confirm against BlockStore).
// NOTE(review): the enclosing enum declaration is not visible in this extract.
// Passed to the BlockStore as its block header size; the constructor also
// uses twice this value as the minimum accepted block size.
81 DEFAULT_BLOCK_HEADER_SIZE = 32,
// Value returned by the chain-header-size accessor.
82 DEFAULT_BLOCK_CHAIN_HEADER_SIZE = 64,
// Same number as the constructor's default for blocksPerRealloc_ (1000).
83 DEFAULT_BLOCKS_PER_REALLOC = 1000,
// Default for the constructor's blockSize_ parameter.
84 DEFAULT_BLOCK_SIZE = 2048
// Fields of the per-block header that is copied to and from the buffer as raw
// bytes (sizeof(BlockHeader)).
// NOTE(review): the struct declaration and its sequence field (_seq, used
// throughout the file) are not visible in this extract.
// Number of blocks requested from the BlockStore for one stored message.
// During recovery the header read at the start of the buffer reuses this
// field as the store's block size.
124 amps_uint32_t _blocksToWrite;
// Size total computed at store time from the command id, correlation id,
// expiration, SOW key, topic and data lengths; the replay code subtracts each
// field length from it as the field is consumed. During recovery the header
// read at the start of the buffer reuses this field as the block header size.
125 amps_uint32_t _totalRemaining;
// CRC accumulated, in order, over command id, correlation id, expiration,
// SOW key, topic and data. The store path resets it to 0 after writing a
// block header.
127 amps_uint64_t _crcVal;
// Buffer offset of the next block in this message's chain; 0 when there is
// no further block.
128 amps_uint64_t _nextInChain;
// Describes one stored message: its command type plus the byte length of each
// variable-length field. The store path writes this struct to the buffer as
// raw bytes after a BlockHeader and the replay path reads it back the same
// way, so member order and sizes are part of the stored format.
// NOTE(review): the declaration of _flag (initialised below) falls on a line
// that is missing from this extract.
131 struct BlockChainHeader
// Message::Command value of the stored message, cast to 32 bits.
133 amps_uint32_t _operation;
// Byte lengths of the corresponding message fields as stored.
134 amps_uint32_t _commandIdLen;
135 amps_uint32_t _correlationIdLen;
136 amps_uint32_t _expirationLen;
137 amps_uint32_t _sowKeyLen;
138 amps_uint32_t _topicLen;
// Ack types of the stored message; the replay path ORs in
// Message::AckType::Persisted before applying them.
140 amps_uint32_t _ackTypes;
// No use of this array is visible in this extract (presumably padding that
// fixes the struct size -- TODO confirm).
141 amps_uint32_t _unused[8];
// Default state: every length zero and _flag = -1. The store path also sets
// -1 before applying any SOWDelete flag, and replay treats -1 the same as
// SOW_DELETE_DATA (payload assigned to the message data).
143 : _operation(0), _commandIdLen(0), _correlationIdLen(0)
144 , _expirationLen(0), _sowKeyLen(0), _topicLen(0), _flag(-1)
155 return DEFAULT_BLOCK_HEADER_SIZE;
165 return DEFAULT_BLOCK_CHAIN_HEADER_SIZE;
173 return _blockStore.getBlockSize();
// Constructor fragment. NOTE(review): the constructor name, its first
// parameter (buffer_, used below) and several initialiser/body lines are
// missing from this extract; the name is presumed from the include guard.
197 amps_uint32_t blocksPerRealloc_ = 1000,
198 bool isFile_ =
false,
199 amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
// The BlockStore is created with DEFAULT_BLOCK_HEADER_SIZE as its header size
// and a block size of at least twice that; smaller requests are raised to
// the minimum rather than rejected.
201 , _blockStore(buffer_, blocksPerRealloc_,
202 DEFAULT_BLOCK_HEADER_SIZE,
203 (blockSize_ > DEFAULT_BLOCK_HEADER_SIZE * 2
204 ? blockSize_ : DEFAULT_BLOCK_HEADER_SIZE * 2))
206 , _maxDiscarded((amps_uint64_t)0), _lastSequence((amps_uint64_t)1)
// Initial set-up performed under the store lock.
// NOTE(review): any condition guarding these statements (isFile_ is otherwise
// unused in the visible lines) is not visible here -- confirm in the full file.
214 BufferLock bufferGuard(_blockStore);
// One block is taken from the store to serve as the metadata block.
216 _metadataBlock = _blockStore.get(1);
// Start with an empty used list.
218 _blockStore.setUsedList(0);
219 _blockStore.setEndOfUsedList(0);
222 _metadataBlock->_sequence = (amps_uint64_t)0;
223 Buffer* pBuffer = _blockStore.getBuffer();
// Record the parsed client version in the buffer (the preceding position and
// size writes are on lines missing from this extract).
229 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
249 return store(message_,
true);
264 const char* commandId, *correlationId, *expiration, *sowKey,
267 BlockHeader blockHeader;
268 BlockChainHeader chainHeader;
270 chainHeader._commandIdLen = (amps_uint32_t)dataLen;
272 chainHeader._correlationIdLen = (amps_uint32_t)dataLen;
274 chainHeader._expirationLen = (amps_uint32_t)dataLen;
276 chainHeader._sowKeyLen = (amps_uint32_t)dataLen;
278 chainHeader._topicLen = (amps_uint32_t)dataLen;
279 message_.getRawData(&data, &dataLen);
280 chainHeader._flag = -1;
282 chainHeader._operation = (amps_uint32_t)operation;
283 if (operation == Message::Command::SOWDelete)
287 chainHeader._flag = SOW_DELETE_DATA;
294 chainHeader._flag = SOW_DELETE_FILTER;
301 chainHeader._flag = SOW_DELETE_KEYS;
306 chainHeader._flag = SOW_DELETE_BOOKMARK;
309 size_t remaining = options.
len();
310 const void* next = NULL;
311 const void* start = (
const void*)(options.
data());
313 while (remaining >= 6 &&
314 (next = memchr(start, (
int)
'c', remaining)) != NULL)
316 remaining = (size_t)next - (
size_t)start;
317 if (remaining >= 6 && strncmp((
const char*)start,
320 chainHeader._flag = SOW_DELETE_BOOKMARK_CANCEL;
328 blockHeader._totalRemaining = (
329 (chainHeader._operation == Message::Command::Unknown)
332 + chainHeader._commandIdLen
333 + chainHeader._correlationIdLen
334 + chainHeader._expirationLen
335 + chainHeader._sowKeyLen
336 + chainHeader._topicLen
337 + (amps_uint32_t)dataLen));
338 size_t lastBlockLength = ((operation == Message::Command::Unknown) ? 0 :
340 blockHeader._blocksToWrite = ((operation == Message::Command::Unknown)
342 : ((amps_uint32_t)(blockHeader._totalRemaining
344 + ((lastBlockLength > 0) ? 1 : 0)));
345 blockHeader._crcVal = (amps_uint64_t)0ULL;
346 blockHeader._crcVal = _crc(commandId,
347 chainHeader._commandIdLen,
348 blockHeader._crcVal);
349 blockHeader._crcVal = _crc(correlationId,
350 chainHeader._correlationIdLen,
351 blockHeader._crcVal);
352 blockHeader._crcVal = _crc(expiration,
353 chainHeader._expirationLen,
354 blockHeader._crcVal);
355 blockHeader._crcVal = _crc(sowKey,
356 chainHeader._sowKeyLen,
357 blockHeader._crcVal);
358 blockHeader._crcVal = _crc(topic,
359 chainHeader._topicLen,
360 blockHeader._crcVal);
361 blockHeader._crcVal = _crc(data, dataLen, blockHeader._crcVal);
364 BufferLock bufferGuard(_blockStore);
365 Block* first = _blockStore.get(blockHeader._blocksToWrite);
368 if (_lastSequence <= 2)
372 blockHeader._seq = ++_lastSequence;
377 message_.getMessage(),
381 _maxDiscarded = blockHeader._seq - 1;
383 if (blockHeader._seq >= _lastSequence)
385 _lastSequence = blockHeader._seq;
391 size_t topicWritten = 0UL;
392 size_t dataWritten = 0UL;
393 size_t commandWritten = 0UL;
394 size_t correlationWritten = 0UL;
395 size_t expirationWritten = 0UL;
396 size_t sowKeyWritten = 0UL;
397 Buffer* pBuffer = _blockStore.getBuffer();
398 for (Block* next = first; next; next = next->_nextInChain)
400 next->_sequence = blockHeader._seq;
401 if (next->_nextInChain)
403 blockHeader._nextInChain = next->_nextInChain->_offset;
407 blockHeader._nextInChain = (amps_uint64_t)0;
411 pBuffer->
putBytes((
const char*)&blockHeader,
sizeof(BlockHeader));
413 blockHeader._crcVal = (amps_uint64_t)0;
419 pBuffer->
putBytes((
const char*)&chainHeader,
420 sizeof(BlockChainHeader));
429 if (commandWritten < chainHeader._commandIdLen)
431 size_t commandWrite = (chainHeader._commandIdLen - commandWritten < bytesRemaining) ? chainHeader._commandIdLen - commandWritten : bytesRemaining;
432 pBuffer->
putBytes(commandId + commandWritten,
434 bytesRemaining -= commandWrite;
435 commandWritten += commandWrite;
437 if (correlationWritten < chainHeader._correlationIdLen)
439 size_t correlationWrite = (chainHeader._correlationIdLen - correlationWritten < bytesRemaining) ? chainHeader._correlationIdLen - correlationWritten : bytesRemaining;
440 pBuffer->
putBytes(correlationId + correlationWritten,
442 bytesRemaining -= correlationWrite;
443 correlationWritten += correlationWrite;
445 if (bytesRemaining > 0 && expirationWritten < chainHeader._expirationLen)
447 size_t expWrite = (chainHeader._expirationLen - expirationWritten < bytesRemaining) ? chainHeader._expirationLen - expirationWritten : bytesRemaining;
448 pBuffer->
putBytes(expiration + expirationWritten, expWrite);
449 bytesRemaining -= expWrite;
450 expirationWritten += expWrite;
452 if (bytesRemaining > 0 && sowKeyWritten < chainHeader._sowKeyLen)
454 size_t sowKeyWrite = (chainHeader._sowKeyLen - sowKeyWritten < bytesRemaining) ? chainHeader._sowKeyLen - sowKeyWritten : bytesRemaining;
455 pBuffer->
putBytes(sowKey + sowKeyWritten, sowKeyWrite);
456 bytesRemaining -= sowKeyWrite;
457 sowKeyWritten += sowKeyWrite;
459 if (bytesRemaining > 0 && topicWritten < chainHeader._topicLen)
461 size_t topicWrite = (chainHeader._topicLen - topicWritten
463 ? chainHeader._topicLen - topicWritten
465 pBuffer->
putBytes(topic + topicWritten, topicWrite);
466 bytesRemaining -= topicWrite;
467 topicWritten += topicWrite;
469 if (bytesRemaining > 0 && dataWritten < dataLen)
471 size_t dataWrite = (dataLen - dataWritten < bytesRemaining) ?
472 dataLen - dataWritten : bytesRemaining;
473 pBuffer->
putBytes(data + dataWritten, dataWrite);
474 bytesRemaining -= dataWrite;
475 dataWritten += dataWrite;
479 catch (
const AMPSException& ex)
481 _blockStore.put(first);
484 AMPS_FETCH_ADD(&_stored, 1);
485 return blockHeader._seq;
// Body of the discard-up-to-index operation (index_ is the sequence being
// acknowledged). NOTE(review): the signature, braces, else-branches, returns
// and the buffer writes are on lines missing from this extract; the function
// name is presumed.
495 BufferLock bufferGuard(_blockStore);
496 Buffer* pBuffer = _blockStore.getBuffer();
// Read the persisted sequence straight from the metadata block instead of
// calling _getLastPersisted(), which can assign a new value as a side effect.
499 amps_uint64_t lastPersisted = _metadataBlock->_sequence;
// Nothing-to-discard case: index_ is 0, the used list is empty, or index_ has
// already been discarded.
501 if (index_ == (amps_uint64_t)0 || !_blockStore.front() || index_ <= _maxDiscarded)
// Even so, a newer index_ still advances the recorded sequence and pulls
// _maxDiscarded / _lastSequence up to it.
505 if (lastPersisted < index_)
509 _metadataBlock->_sequence = index_;
510 if (_maxDiscarded < index_)
512 _maxDiscarded = index_;
514 if (_lastSequence <= index_)
516 _lastSequence = index_;
// Wake any thread waiting on the store (flush waits on it).
523 _blockStore.signalAll();
// Normal case: hand blocks up to index_ back to the BlockStore and subtract
// the value put() returns from the stored-message counter.
527 _maxDiscarded = index_;
528 AMPS_FETCH_SUB(&_stored, _blockStore.put(index_));
529 _blockStore.signalAll();
// Only advance the recorded sequence; never move it backwards.
// NOTE(review): the action taken when this condition holds is not visible.
530 if (lastPersisted >= index_)
536 _metadataBlock->_sequence = index_;
537 if (_lastSequence < index_)
539 _lastSequence = index_;
// Body of the replay-everything operation: walks the used list from the front
// and hands each block to replayOnto() together with replayer_.
// NOTE(review): the signature, braces and the try keyword are missing from
// this extract; the function name is presumed.
550 BufferLock bufferGuard(_blockStore);
// NOTE(review): the action for an empty store is on a missing line
// (presumably an early return).
552 if (!_blockStore.front())
556 Block* next = _blockStore.front();
559 for (Block* block = _blockStore.front(); block; block = next)
562 replayOnto(block, replayer_);
// next is advanced only after a successful replay, so if replayOnto() throws
// it still refers to the block that failed.
563 next = block->_nextInList;
566 catch (
const StoreException& e)
// Pass the failing block to putAll() (presumably releasing it and every
// block after it -- TODO confirm BlockStore::putAll semantics).
568 _blockStore.putAll(next);
// Body of the replay-one-message operation: finds the block whose sequence
// equals index_ and replays it through replayOnto().
// NOTE(review): the signature, braces, return statements and part of a
// buffer-position statement are missing from this extract; the function name
// is presumed.
581 BufferLock bufferGuard(_blockStore);
// NOTE(review): the action for an empty store is on a missing line.
583 if (!_blockStore.front())
// Sequence range currently held: front of the used list is the lowest, back
// is the highest.
588 amps_uint64_t lastIdx = _blockStore.back()->_sequence;
590 amps_uint64_t leastIdx = _blockStore.front()->_sequence;
591 if (index_ >= leastIdx && index_ <= lastIdx)
// Linear search of the used list for the requested sequence.
593 Block* block = _blockStore.front();
594 while (block && block->_sequence != index_)
596 block = block->_nextInList;
603 Buffer* pBuffer = _blockStore.getBuffer();
605 sizeof(amps_uint32_t));
610 replayOnto(block, replayer_);
// NOTE(review): this appears to belong to the out-of-range path, which
// reports the lowest stored sequence via _message -- confirm in the full file.
617 _message.setSequence(leastIdx);
630 size_t count = (size_t)_stored;
// Body of the flush operation: blocks until every message that was in the
// store when the call began has been discarded, or throws TimedOutException.
// NOTE(review): the signature, braces and the branch that chooses between
// the timed and untimed loops are missing from this extract.
644 BufferLock bufferGuard(_blockStore);
// Snapshot the highest sequence now in the store; messages stored later are
// not waited for.
645 amps_uint64_t waitFor = _getHighestUnpersisted();
653 bool timedOut =
false;
654 AMPS_START_TIMER(timeout_);
// Timed wait: loop while messages remain and the lowest stored sequence has
// not yet passed the snapshot.
656 while (!timedOut && _stored != 0
657 && waitFor >= _getLowestUnpersisted())
659 if (!_blockStore.wait(timeout_))
662 AMPS_RESET_TIMER(timedOut, timeout_);
// Re-check the condition after a timeout so that a store which drained at
// the last moment is not reported as a failure.
666 if (timedOut && _stored != 0
667 && waitFor >= _getLowestUnpersisted())
669 throw TimedOutException(
"Timed out waiting to flush publish store.");
// Untimed wait (presumably taken when no timeout was requested): wake up
// every 1000 units of BlockStore::wait() to run the waiting function.
674 while (_stored != 0 && waitFor >= _getLowestUnpersisted())
677 _blockStore.wait(1000);
// Release the store lock while the user-supplied waiting function runs.
679 BufferUnlock unlck(_blockStore);
680 amps_invoke_waiting_function();
687 BufferLock bufferGuard(_blockStore);
688 return _getLowestUnpersisted();
// Locking wrapper: takes the store lock and returns the result of
// _getHighestUnpersisted().
691 amps_uint64_t getHighestUnpersisted()
const 693 BufferLock bufferGuard(_blockStore);
694 return _getHighestUnpersisted();
699 BufferLock bufferGuard(_blockStore);
700 return _getLastPersisted();
704 static bool canResize(
size_t requestedSize_,
void* vpThis_)
// Returns the sequence of the block at the front of the used list. It takes
// no lock itself; the visible callers hold a BufferLock when calling it.
// NOTE(review): the value returned when the used list is empty is on a line
// missing from this extract.
709 amps_uint64_t _getLowestUnpersisted()
const 713 if (!_blockStore.front())
717 return _blockStore.front()->_sequence;
// Returns the sequence of the block at the back of the used list. It takes
// no lock itself; the visible callers hold a BufferLock when calling it.
// NOTE(review): the value returned when the used list is empty is on a line
// missing from this extract.
720 amps_uint64_t _getHighestUnpersisted()
const 724 if (!_blockStore.back())
728 return _blockStore.back()->_sequence;
// Returns the last persisted sequence number and, when none is recorded yet,
// derives one and records it in the metadata block. It takes no lock itself.
// NOTE(review): braces, else-branches, the buffer reads/writes and the
// conditions selecting each assignment below are missing from this extract,
// so the comments describe the visible statements only.
731 amps_uint64_t _getLastPersisted(
void)
734 amps_uint64_t lastPersisted = (amps_uint64_t)0;
735 Buffer* pBuffer = _blockStore.getBuffer();
// Early-return path: keep _lastSequence from lagging behind the value that
// is about to be returned.
740 if (_lastSequence < lastPersisted)
742 _lastSequence = lastPersisted;
744 return lastPersisted;
// Candidate taken from the highest discarded sequence.
748 lastPersisted = _maxDiscarded;
// Time-based seed: wall-clock milliseconds multiplied by 1,000,000. The two
// forms below use different clock sources (a timeb-style struct and
// gettimeofday) -- presumably selected per platform; TODO confirm.
755 lastPersisted = (t.time * 1000 + t.millitm) * (amps_uint64_t)1000000;
758 gettimeofday(&tv, NULL);
759 lastPersisted = (amps_uint64_t)((tv.tv_sec * 1000) + (tv.tv_usec / 1000))
760 * (amps_uint64_t)1000000;
// When sequences have already been assigned, reconcile with what is actually
// in the store: last persisted is one below the lowest stored sequence, and
// _lastSequence follows the highest stored sequence.
763 if (_lastSequence > 2)
765 amps_uint64_t low = _getLowestUnpersisted();
766 amps_uint64_t high = _getHighestUnpersisted();
769 lastPersisted = low - 1;
773 _lastSequence = high;
// Keep the returned value below _lastSequence.
775 if (_lastSequence < lastPersisted)
777 lastPersisted = _lastSequence - 1;
782 _lastSequence = lastPersisted;
785 +
sizeof(amps_uint32_t)
786 +
sizeof(amps_uint32_t));
// Cache the result in the in-memory metadata block before returning it.
788 _metadataBlock->_sequence = lastPersisted;
789 return lastPersisted;
794 BufferLock bufferGuard(_blockStore);
796 Buffer* pBuffer = _blockStore.getBuffer();
797 size_t size = pBuffer->
getSize();
802 _metadataBlock = _blockStore.get(1);
803 _metadataBlock->_sequence = (amps_uint64_t)0;
807 pBuffer->
putUint32((amps_uint32_t)blockSize);
811 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
816 size_t numBlocks = size / blockSize;
817 if (size % blockSize > 0)
822 numBlocks = size / blockSize;
824 amps_uint32_t blockCount = 0;
826 delete[] _blockStore.resizeBuffer(numBlocks * blockSize, &blockCount);
829 if (size > pBuffer->
getSize() || numBlocks != (size_t)blockCount)
831 throw StoreException(
"Publish Store could not resize correctly during recovery, possibly due to resizeHandler refusing the request.");
836 amps_uint64_t maxIdx = 0;
837 amps_uint64_t minIdx = 0;
839 BlockHeader blockHeader;
841 Block* blocks =
new Block[numBlocks];
842 blocks[numBlocks - 1]._nextInList = 0;
844 _blockStore.addBlocks(blocks);
845 _metadataBlock = blocks;
846 _metadataBlock->_nextInList = 0;
848 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
856 if (blockHeader._blocksToWrite == 1)
866 if (blockHeader._totalRemaining >= 5000000)
870 recoverOldFormat(blocks);
874 throw StoreException(
"Unrecognized format for Store. Can't recover.");
876 if (blockHeader._blocksToWrite == 0)
879 pBuffer->
putUint32((amps_uint32_t)blockSize);
883 blockSize = blockHeader._blocksToWrite;
884 _blockStore.setBlockSize(blockSize);
886 if (blockHeader._totalRemaining == 0)
893 _blockStore.setBlockHeaderSize(blockHeader._totalRemaining);
895 _metadataBlock->_sequence = blockHeader._seq;
896 if (_metadataBlock->_sequence
897 && _metadataBlock->_sequence < (amps_uint64_t)1000000)
900 +
sizeof(amps_uint32_t)
901 +
sizeof(amps_uint32_t));
903 _metadataBlock->_sequence = 0;
908 _maxDiscarded = _metadataBlock->_sequence;
909 _lastSequence = _maxDiscarded;
913 location += blockSize;
914 amps_uint32_t freeCount = 0;
915 Block* firstFree = NULL;
916 Block* endOfFreeList = NULL;
918 typedef std::map<amps_uint64_t, Block*> RecoverMap;
919 RecoverMap recoveredBlocks;
920 while (location < size)
924 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
925 if ((blockHeader._seq > 0 && blockHeader._totalRemaining < size) &&
926 (!blockHeader._crcVal || recoveredBlocks.count(blockHeader._seq)))
929 location += blockSize;
932 Block* block = blocks[++blockNum].setOffset(location);
933 bool recovered =
false;
934 if (blockHeader._seq > 0 && blockHeader._totalRemaining < size)
937 block->_sequence = blockHeader._seq;
939 if (maxIdx < blockHeader._seq)
941 maxIdx = blockHeader._seq;
943 if (minIdx > blockHeader._seq)
945 minIdx = blockHeader._seq;
948 recoveredBlocks[blockHeader._seq] = block;
950 while (blockHeader._nextInChain != (amps_uint64_t)0)
952 Block* chain = blocks[++blockNum]
953 .setOffset((
size_t)blockHeader._nextInChain);
954 chain->_nextInList = 0;
955 pBuffer->
setPosition((
size_t)blockHeader._nextInChain
956 +
sizeof(amps_uint32_t)
957 +
sizeof(amps_uint32_t)
958 +
sizeof(amps_uint64_t)
959 +
sizeof(amps_uint64_t));
960 blockHeader._nextInChain = pBuffer->
getUint64();
961 block->_nextInChain = chain;
963 block->_sequence = blockHeader._seq;
972 endOfFreeList->_nextInList = block;
978 endOfFreeList = block;
981 location += blockSize;
985 endOfFreeList->_nextInList = 0;
987 _blockStore.setFreeList(firstFree, freeCount);
988 if (maxIdx > _lastSequence)
990 _lastSequence = maxIdx;
992 if (minIdx > _maxDiscarded + 1)
994 _maxDiscarded = minIdx - 1;
996 if (_maxDiscarded > _metadataBlock->_sequence)
998 _metadataBlock->_sequence = _maxDiscarded;
1003 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
1004 for (RecoverMap::iterator i = recoveredBlocks.begin();
1005 i != recoveredBlocks.end(); ++i)
1009 end->_nextInList = i->second;
1013 _blockStore.setUsedList(i->second);
1019 end->_nextInList = 0;
1021 _blockStore.setEndOfUsedList(end);
1029 size_t start = block_->_offset;
1030 size_t position = start;
1031 Buffer* pBuffer = _blockStore.getBuffer();
1033 BlockHeader blockHeader;
1034 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
1035 if (blockHeader._totalRemaining == 0)
1041 BlockChainHeader blockChainHeader;
1042 pBuffer->
copyBytes((
char*)&blockChainHeader,
sizeof(blockChainHeader));
1043 if (blockChainHeader._operation == Message::Command::Unknown)
1048 blockChainHeader._ackTypes |= Message::AckType::Persisted;
1053 if (blockHeader._totalRemaining
1054 < blockChainHeader._commandIdLen
1055 + blockChainHeader._correlationIdLen
1056 + blockChainHeader._expirationLen
1057 + blockChainHeader._sowKeyLen
1058 + blockChainHeader._topicLen)
1060 std::ostringstream os;
1061 os <<
"Corrupted message found with invalid lengths. " 1062 <<
"Attempting to replay " << block_->_sequence
1063 <<
". Block sequence " << blockHeader._seq
1064 <<
", topic length " << blockChainHeader._topicLen
1065 <<
", data length " << blockHeader._totalRemaining
1066 <<
", command ID length " << blockChainHeader._commandIdLen
1067 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1068 <<
", expiration length " << blockChainHeader._expirationLen
1069 <<
", sow key length " << blockChainHeader._sowKeyLen
1070 <<
", start " << start
1071 <<
", position " << position
1072 <<
", buffer size " << pBuffer->
getSize();
1073 throw StoreException(os.str());
1078 _message.setCommandEnum((Message::Command::Type)blockChainHeader._operation);
1079 _message.setAckTypeEnum((
unsigned)blockChainHeader._ackTypes
1080 | Message::AckType::Persisted);
1081 _message.setSequence(blockHeader._seq);
1083 Block* current = block_;
1085 amps_uint64_t crcCalc = (amps_uint64_t)0ULL;
1087 char** tmpBuffers = (blockHeader._blocksToWrite > 1) ?
new char* [blockHeader._blocksToWrite - 1] : 0;
1088 size_t blockNum = 0;
1089 if (blockChainHeader._commandIdLen > 0)
1091 if (blockChainHeader._commandIdLen <= blockBytesRemaining)
1093 _message.assignCommandId(pBuffer->
getBytes(blockChainHeader._commandIdLen)._data,
1094 blockChainHeader._commandIdLen);
1095 blockBytesRemaining -= blockChainHeader._commandIdLen;
1099 tmpBuffers[blockNum] =
new char[blockChainHeader._commandIdLen];
1100 size_t totalLeft = blockChainHeader._commandIdLen;
1101 size_t totalRead = 0;
1105 readLen = blockBytesRemaining < totalLeft ?
1106 blockBytesRemaining : totalLeft;
1107 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1108 if (!(totalLeft -= readLen))
1112 if (!(current = current->_nextInChain))
1116 totalRead += readLen;
1121 blockBytesRemaining -= readLen;
1122 _message.assignCommandId(tmpBuffers[blockNum++], blockChainHeader._commandIdLen);
1124 blockHeader._totalRemaining -= blockChainHeader._commandIdLen;
1125 crcCalc = _crc(_message.getCommandId().data(),
1126 blockChainHeader._commandIdLen, crcCalc);
1128 if (blockChainHeader._correlationIdLen > 0)
1130 if (blockChainHeader._correlationIdLen <= blockBytesRemaining)
1132 _message.assignCorrelationId(
1133 pBuffer->
getBytes(blockChainHeader._correlationIdLen)._data,
1134 blockChainHeader._correlationIdLen);
1135 blockBytesRemaining -= blockChainHeader._correlationIdLen;
1139 tmpBuffers[blockNum] =
new char[blockChainHeader._correlationIdLen];
1140 size_t totalLeft = blockChainHeader._correlationIdLen;
1141 size_t totalRead = 0;
1145 readLen = blockBytesRemaining < totalLeft ?
1146 blockBytesRemaining : totalLeft;
1147 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1148 if (!(totalLeft -= readLen))
1152 if (!(current = current->_nextInChain))
1156 totalRead += readLen;
1161 blockBytesRemaining -= readLen;
1162 _message.assignCorrelationId(tmpBuffers[blockNum++], blockChainHeader._correlationIdLen);
1164 blockHeader._totalRemaining -= blockChainHeader._correlationIdLen;
1165 crcCalc = _crc(_message.getCorrelationId().data(),
1166 blockChainHeader._correlationIdLen, crcCalc);
1168 if (blockChainHeader._expirationLen > 0)
1170 if (blockChainHeader._expirationLen <= blockBytesRemaining)
1172 _message.assignExpiration(
1173 pBuffer->
getBytes(blockChainHeader._expirationLen)._data,
1174 blockChainHeader._expirationLen);
1175 blockBytesRemaining -= blockChainHeader._expirationLen;
1179 tmpBuffers[blockNum] =
new char[blockChainHeader._expirationLen];
1180 size_t totalLeft = blockChainHeader._expirationLen;
1181 size_t totalRead = 0;
1185 readLen = blockBytesRemaining < totalLeft ?
1186 blockBytesRemaining : totalLeft;
1187 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1188 if (!(totalLeft -= readLen))
1192 if (!(current = current->_nextInChain))
1196 totalRead += readLen;
1201 blockBytesRemaining -= readLen;
1202 _message.assignExpiration(tmpBuffers[blockNum++], blockChainHeader._expirationLen);
1204 blockHeader._totalRemaining -= blockChainHeader._expirationLen;
1205 crcCalc = _crc(_message.getExpiration().data(),
1206 blockChainHeader._expirationLen, crcCalc);
1208 if (blockChainHeader._sowKeyLen > 0)
1210 if (blockChainHeader._sowKeyLen <= blockBytesRemaining)
1212 _message.assignSowKey(pBuffer->
getBytes(blockChainHeader._sowKeyLen)._data,
1213 blockChainHeader._sowKeyLen);
1214 blockBytesRemaining -= blockChainHeader._sowKeyLen;
1218 tmpBuffers[blockNum] =
new char[blockChainHeader._sowKeyLen];
1219 size_t totalLeft = blockChainHeader._sowKeyLen;
1220 size_t totalRead = 0;
1224 readLen = blockBytesRemaining < totalLeft ?
1225 blockBytesRemaining : totalLeft;
1226 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1227 if (!(totalLeft -= readLen))
1231 if (!(current = current->_nextInChain))
1235 totalRead += readLen;
1240 blockBytesRemaining -= readLen;
1241 _message.assignSowKey(tmpBuffers[blockNum++], blockChainHeader._sowKeyLen);
1243 blockHeader._totalRemaining -= blockChainHeader._sowKeyLen;
1244 crcCalc = _crc(_message.getSowKey().data(), blockChainHeader._sowKeyLen, crcCalc);
1246 if (blockChainHeader._topicLen > 0)
1248 if (blockChainHeader._topicLen <= blockBytesRemaining)
1250 _message.assignTopic(pBuffer->
getBytes(blockChainHeader._topicLen)._data,
1251 blockChainHeader._topicLen);
1252 blockBytesRemaining -= blockChainHeader._topicLen;
1256 tmpBuffers[blockNum] =
new char[blockChainHeader._topicLen];
1257 size_t totalLeft = blockChainHeader._topicLen;
1258 size_t totalRead = 0;
1262 readLen = blockBytesRemaining < totalLeft ?
1263 blockBytesRemaining : totalLeft;
1264 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1265 if (!(totalLeft -= readLen))
1269 if (!(current = current->_nextInChain))
1273 totalRead += readLen;
1278 blockBytesRemaining -= readLen;
1279 _message.assignTopic(tmpBuffers[blockNum++], blockChainHeader._topicLen);
1281 blockHeader._totalRemaining -= blockChainHeader._topicLen;
1282 crcCalc = _crc(_message.getTopic().data(), blockChainHeader._topicLen, crcCalc);
1284 if (blockHeader._totalRemaining > 0)
1286 if (blockHeader._totalRemaining <= blockBytesRemaining)
1288 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1290 _message.assignData(
1291 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1292 blockHeader._totalRemaining);
1293 crcCalc = _crc(_message.getData().data(),
1294 blockHeader._totalRemaining, crcCalc);
1296 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1298 _message.assignFilter(
1299 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1300 blockHeader._totalRemaining);
1301 crcCalc = _crc(_message.getFilter().data(),
1302 blockHeader._totalRemaining, crcCalc);
1304 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1306 _message.assignSowKeys(
1307 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1308 blockHeader._totalRemaining);
1309 crcCalc = _crc(_message.getSowKeys().data(),
1310 blockHeader._totalRemaining, crcCalc);
1312 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1314 _message.assignBookmark(
1315 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1316 blockHeader._totalRemaining);
1317 crcCalc = _crc(_message.getBookmark().data(),
1318 blockHeader._totalRemaining, crcCalc);
1320 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1322 _message.assignBookmark(
1323 pBuffer->
getBytes(blockHeader._totalRemaining)._data,
1324 blockHeader._totalRemaining);
1325 crcCalc = _crc(_message.getBookmark().data(),
1326 blockHeader._totalRemaining, crcCalc);
1327 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1332 tmpBuffers[blockNum] =
new char[blockHeader._totalRemaining];
1333 size_t totalLeft = blockHeader._totalRemaining;
1334 size_t totalRead = 0;
1338 readLen = blockBytesRemaining < totalLeft ?
1339 blockBytesRemaining : totalLeft;
1340 pBuffer->
copyBytes(tmpBuffers[blockNum] + totalRead, readLen);
1341 if (!(totalLeft -= readLen))
1345 if (!(current = current->_nextInChain))
1349 totalRead += readLen;
1354 position += readLen;
1355 if (blockChainHeader._flag == -1 || blockChainHeader._flag == SOW_DELETE_DATA)
1357 _message.assignData(tmpBuffers[blockNum], blockHeader._totalRemaining);
1359 else if (blockChainHeader._flag == SOW_DELETE_FILTER)
1361 _message.assignFilter(tmpBuffers[blockNum], blockHeader._totalRemaining);
1363 else if (blockChainHeader._flag == SOW_DELETE_KEYS)
1365 _message.assignSowKeys(tmpBuffers[blockNum], blockHeader._totalRemaining);
1367 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK)
1369 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1371 else if (blockChainHeader._flag == SOW_DELETE_BOOKMARK_CANCEL)
1373 _message.assignBookmark(tmpBuffers[blockNum], blockHeader._totalRemaining);
1374 _message.assignOptions(AMPS_OPTIONS_CANCEL, 6);
1376 crcCalc = _crc(tmpBuffers[blockNum++], blockHeader._totalRemaining, crcCalc);
1381 if (crcCalc != blockHeader._crcVal || blockHeader._seq != block_->_sequence)
1383 std::ostringstream os;
1384 os <<
"Corrupted message found by CRC or sequence " 1385 <<
"Attempting to replay " << block_->_sequence
1386 <<
". Block sequence " << blockHeader._seq
1387 <<
", expiration length " << blockChainHeader._expirationLen
1388 <<
", sowKey length " << blockChainHeader._sowKeyLen
1389 <<
", topic length " << blockChainHeader._topicLen
1390 <<
", data length " << blockHeader._totalRemaining
1391 <<
", command ID length " << blockChainHeader._commandIdLen
1392 <<
", correlation ID length " << blockChainHeader._correlationIdLen
1393 <<
", flag " << blockChainHeader._flag
1394 <<
", expected CRC " << blockHeader._crcVal
1395 <<
", actual CRC " << crcCalc
1396 <<
", start " << start
1397 <<
", position " << position
1398 <<
", buffer size " << pBuffer->
getSize();
1399 for (Block* block = block_; block; block = block->_nextInChain)
1401 os <<
"\n BLOCK " << block->_offset;
1405 for (amps_uint32_t i = 0; i < blockNum; ++i)
1407 delete[] tmpBuffers[i];
1409 delete[] tmpBuffers;
1411 throw StoreException(os.str());
1418 for (amps_uint32_t i = 0; i < blockNum; ++i)
1420 delete[] tmpBuffers[i];
1422 delete[] tmpBuffers;
1428 void recoverOldFormat(Block* blocks)
1430 Buffer* pBuffer = _blockStore.getBuffer();
1431 amps_uint64_t maxIdx = 0;
1432 amps_uint64_t minIdx = 0;
1433 size_t size = pBuffer->
getSize();
1434 size_t location = 0;
1437 pBuffer->
putUint32((amps_uint32_t)_blockStore.getBlockHeaderSize());
1438 _metadataBlock->_sequence = pBuffer->
getUint64();
1439 if (_metadataBlock->_sequence < (amps_uint64_t)1000000)
1441 pBuffer->
setPosition(_metadataBlock->_offset + 8);
1443 _metadataBlock->_sequence = 0;
1448 _maxDiscarded = _metadataBlock->_sequence;
1449 _lastSequence = _maxDiscarded;
1452 pBuffer->
putUint64((amps_uint64_t)VersionInfo::parseVersion(AMPS_CLIENT_VERSION));
1457 amps_uint32_t freeCount = 0;
1458 Block* firstFree = NULL;
1459 Block* endOfFreeList = NULL;
1461 size_t numBlocks = size / blockSize;
1462 size_t blockNum = 0;
1464 typedef std::map<amps_uint64_t, Block*> RecoverMap;
1465 RecoverMap recoveredBlocks;
1466 RecoverMap growingBlocks;
1467 amps_uint32_t growthBlocksNeeded = 0;
1468 while (location < size)
1472 BlockHeader blockHeader;
1473 pBuffer->
copyBytes((
char*)&blockHeader,
sizeof(BlockHeader));
1474 size_t blockCount = (size_t)blockHeader._blocksToWrite;
1475 if (blockHeader._totalRemaining > 0 && blockHeader._seq > 0
1476 && blockHeader._totalRemaining < size
1477 && blockHeader._blocksToWrite < numBlocks
1478 && (blockHeader._blocksToWrite * blockSize)
1479 >= blockHeader._totalRemaining)
1481 size_t oldFormatSize = blockHeader._totalRemaining;
1484 blockHeader._totalRemaining -= 64;
1488 BlockChainHeader chainHeader;
1492 pBuffer->
setPosition(location + (
sizeof(amps_uint32_t) * 2)
1493 + (
sizeof(amps_uint64_t) * 2) );
1499 sizeof(amps_uint32_t) * 8);
1501 if ((chainHeader._commandIdLen + chainHeader._correlationIdLen
1502 + chainHeader._expirationLen + chainHeader._sowKeyLen
1503 + chainHeader._topicLen) > blockHeader._totalRemaining)
1510 amps_uint32_t blocksNeeded = (blockHeader._totalRemaining
1512 + (blockHeader._totalRemaining
1515 if (blocksNeeded == blockHeader._blocksToWrite)
1517 Block* first = blocks[++blockNum].setOffset(location);
1518 first->_nextInList = 0;
1519 first->_sequence = blockHeader._seq;
1520 if (blockHeader._blocksToWrite > 1)
1523 amps_uint64_t crcVal = blockHeader._crcVal;
1524 blockHeader._crcVal = 0;
1529 size_t currentBlockNum = blockNum
1530 + blockHeader._blocksToWrite
1534 if (currentBlockNum >= numBlocks)
1536 currentBlockNum = currentBlockNum - numBlocks + 1;
1538 if (currentBlockNum < blockNum)
1540 Block* last = blocks[currentBlockNum]
1542 if ((current = firstFree) == last)
1544 firstFree = firstFree->_nextInList;
1555 if (current->_nextInList == last)
1557 current->_nextInList = last->_nextInList;
1562 current = current->_nextInList;
1568 current = blocks[currentBlockNum]
1573 while (current != first)
1575 current->_nextInList = 0;
1576 current->_sequence = blockHeader._seq;
1578 if (--currentBlockNum < 1
1579 || currentBlockNum > numBlocks)
1581 currentBlockNum = numBlocks - 1;
1583 Block* previous = blocks[currentBlockNum]
1584 .init(currentBlockNum,
1586 previous->_nextInChain = current;
1592 size_t bytesToMove = --blockCount
1596 pBuffer->
copyBytes(current->_offset + bytesToMove,
1603 dataBytes -= bytesToMove;
1611 blockHeader._nextInChain = (current->_nextInChain
1612 ? current->_nextInChain->_offset
1613 : (amps_uint64_t)0);
1616 pBuffer->
putBytes((
const char*)&blockHeader,
1617 sizeof(BlockHeader));
1618 if (firstFree == previous)
1620 firstFree = firstFree->_nextInList;
1629 current = firstFree;
1632 if (current->_nextInList == previous)
1634 current->_nextInList = previous->_nextInList;
1638 current = current->_nextInList;
1643 blockNum += blockHeader._blocksToWrite - 1;
1644 blockHeader._crcVal = crcVal;
1655 blockHeader._nextInChain = (first->_nextInChain
1656 ? first->_nextInChain->_offset
1657 : (amps_uint64_t)0);
1658 pBuffer->
putBytes((
const char*)&blockHeader,
1659 sizeof(BlockHeader));
1660 pBuffer->
putBytes((
const char*)&chainHeader,
1661 sizeof(BlockChainHeader));
1664 recoveredBlocks[blockHeader._seq] = first;
1670 growingBlocks[blockHeader._seq] = blocks[++blockNum].setOffset(location);
1671 growthBlocksNeeded += (blocksNeeded - blockHeader._blocksToWrite);
1672 blockNum += blockHeader._blocksToWrite - 1;
1675 if (maxIdx < blockHeader._seq)
1677 maxIdx = blockHeader._seq;
1679 if (minIdx > blockHeader._seq)
1681 minIdx = blockHeader._seq;
1684 location += blockHeader._blocksToWrite *
getBlockSize();
1686 assert(location >= size || blockNum < numBlocks);
1691 Block* block = blocks[++blockNum].setOffset(location);
1694 endOfFreeList->_nextInList = block;
1700 endOfFreeList = block;
1702 location += blockSize;
1705 for (RecoverMap::iterator i = growingBlocks.begin();
1706 i != growingBlocks.end(); ++i)
1708 Block* first = i->second;
1710 BlockHeader blockHeader;
1713 pBuffer->
copyBytes((
char*)&blockHeader, 24);
1716 blockHeader._totalRemaining -= 64;
1719 if (freeCount < growthBlocksNeeded)
1722 amps_uint32_t minBlocksRequired = growthBlocksNeeded - freeCount;
1723 amps_uint32_t growthBlocks = _blockStore.getDefaultResizeBlocks();
1724 if (growthBlocks < minBlocksRequired)
1726 amps_uint32_t defaultBlocks = _blockStore.getDefaultResizeBlocks();
1727 if (minBlocksRequired % defaultBlocks)
1728 minBlocksRequired = (minBlocksRequired / defaultBlocks + 1)
1730 growthBlocks = minBlocksRequired;
1732 amps_uint32_t newBlocks = 0;
1733 Block* addedBlocks = _blockStore.resizeBuffer(
1735 + growthBlocks * blockSize,
1739 throw StoreException(
"Failed to grow store buffer during recovery");
1741 _blockStore.addBlocks(addedBlocks);
1742 freeCount += newBlocks;
1743 growthBlocksNeeded = (growthBlocksNeeded > freeCount)
1744 ? growthBlocksNeeded - freeCount : 0;
1747 endOfFreeList->_nextInList = addedBlocks;
1751 firstFree = addedBlocks;
1753 endOfFreeList = &(addedBlocks[newBlocks - 1]);
1754 endOfFreeList->_nextInList = 0;
1756 expandBlocks(blocks, first->_offset, first, blockHeader,
1757 &firstFree, &freeCount, pBuffer);
1759 recoveredBlocks[blockHeader._seq] = first;
1767 endOfFreeList->_nextInList = 0;
1769 _blockStore.setFreeList(firstFree, freeCount);
1770 if (maxIdx > _lastSequence)
1772 _lastSequence = maxIdx;
1774 if (minIdx > _maxDiscarded + 1)
1776 _maxDiscarded = minIdx - 1;
1778 if (_maxDiscarded > _metadataBlock->_sequence)
1780 _metadataBlock->_sequence = _maxDiscarded;
1781 pBuffer->
setPosition(_metadataBlock->_offset + 8);
1785 AMPS_FETCH_ADD(&_stored, (
long)(recoveredBlocks.size()));
1786 for (RecoverMap::iterator i = recoveredBlocks.begin();
1787 i != recoveredBlocks.end(); ++i)
1789 if (_blockStore.front())
1791 end->_nextInList = i->second;
1795 _blockStore.setUsedList(i->second);
1801 end->_nextInList = 0;
1803 _blockStore.setEndOfUsedList(end);
// Rewrites one stored message's block chain so that it spans blocksNeeded
// blocks, pulling extra blocks off the free list when the chain has to grow,
// then rewrites the block headers and the chain header in the buffer.
//
// blocks_      array of Block metadata; index 1 is used as the wrap-around block.
// location_    buffer offset of the chain's first block (the caller passes
//              first->_offset).
// first_       first block of the chain being expanded.
// blockHeader_ by-value copy of the header read for this chain; it is edited
//              locally and written back out through pBuffer_.
// pFreeList_   in/out head of the free list; advanced as blocks are consumed.
// pFreeCount_  in/out free-block count (not touched in the lines visible here).
// NOTE(review): the remaining parameter (pBuffer_, used below) and several
// statements are missing from this listing, so the notes below describe only
// what the visible lines do.
1808 void expandBlocks(Block* blocks_,
size_t location_, Block* first_,
1809 BlockHeader blockHeader_,
1810 Block** pFreeList_, amps_uint32_t* pFreeCount_,
1814 Block* current = first_;
// Keep the stored total, then shrink it by 64 bytes. 64 equals
// DEFAULT_BLOCK_CHAIN_HEADER_SIZE; presumably the stored total counted
// that many extra bytes -- TODO confirm against the on-disk format history.
1817 amps_uint32_t oldTotalRemaining = blockHeader_._totalRemaining;
1818 blockHeader_._totalRemaining -= 64;
// Block count derived from the adjusted total (rest of the expression is
// not visible in this listing).
1822 amps_uint32_t blocksNeeded = blockHeader_._totalRemaining
1824 + (blockHeader_._totalRemaining
// Bytes occupied in the last block of the old layout; the fallback to a full
// blockSize is presumably taken when the remainder is zero (guard not shown).
1831 size_t endBlockSize = oldTotalRemaining % blockSize;
1834 endBlockSize = blockSize;
1836 size_t endOfData = 0;
// Save the CRC and zero it in the working header; the saved value is put
// back just before the final header write near the end of the function.
1838 amps_uint64_t crcVal = blockHeader_._crcVal;
1839 blockHeader_._crcVal = 0;
// Blocks are pushed in chain order so they can be revisited from the top
// of the stack later.
1841 std::stack<Block*> blocksUsed;
1842 for (amps_uint32_t i = 1; i < blocksNeeded; ++i)
1844 blocksUsed.push(current);
1845 current->_sequence = blockHeader_._seq;
// Past the blocks the chain already owned: link in the head of the free list.
1846 if (i >= blockHeader_._blocksToWrite)
// At the last originally-owned block, record where the old data ends.
1848 if (i == blockHeader_._blocksToWrite)
1850 endOfData = current->_offset + endBlockSize;
1852 current->_nextInChain = *pFreeList_;
1854 *pFreeList_ = (*pFreeList_)->_nextInList;
// Otherwise reuse the block's list successor as its chain successor and
// assign that successor an offset.
1858 current->_nextInChain = current->_nextInList;
1859 if (current->_nextInChain)
1861 if (current->_offset + blockSize < pBuffer_->getSize())
1863 current->_nextInChain->setOffset(current->_offset
1868 current->_nextInChain->setOffset(blockSize);
// No list successor: continue the chain at block index 1, offset blockSize.
1873 current->_nextInChain = blocks_[1].init(1, blockSize);
// Unlink the chosen successor from the free list, whether it is the head
// or somewhere further along.
1875 if (current->_nextInChain == *pFreeList_)
1877 *pFreeList_ = (*pFreeList_)->_nextInList;
1882 for (Block* free = *pFreeList_; free;
1883 free = free->_nextInList)
1885 if (free->_nextInList == current->_nextInChain)
1887 free->_nextInList = free->_nextInList->_nextInList;
1894 current->_nextInList = 0;
1895 current = current->_nextInChain;
// The header now describes the expanded chain length.
1898 blockHeader_._blocksToWrite = blocksNeeded;
1900 current->_nextInList = 0;
1901 current->_sequence = blockHeader_._seq;
// Work back toward first_, tracking how much old data lies before endOfData
// and writing a header for each block. NOTE(review): the declaration of
// dataBytes and the actual byte-move calls are not visible here.
1911 while (current != first_)
1913 size_t chunkBytesAvail = endOfData > location_
1914 ? endOfData - location_
1916 if (chunkBytesAvail < dataBytes)
// Not enough bytes before endOfData: the shortfall is measured back from
// the end of the buffer instead.
1928 chunkBytesAvail = dataBytes - chunkBytesAvail;
1929 endOfData = pBuffer_->
getSize() - chunkBytesAvail;
1936 endOfData -= dataBytes;
// Store the successor's offset in the header, or 0 when there is none.
1942 blockHeader_._nextInChain = (current->_nextInChain
1943 ? current->_nextInChain->_offset
1944 : (amps_uint64_t)0);
1947 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1948 current = blocksUsed.top();
// Restore the saved CRC before writing the header with it.
1959 blockHeader_._crcVal = crcVal;
1960 blockHeader_._nextInChain = first_->_nextInChain->_offset;
// Position 24 bytes past location_ (two 32-bit plus two 64-bit fields) and
// read 32 bytes (eight 32-bit fields) into a fresh BlockChainHeader.
1964 pBuffer_->
setPosition(location_ + (
sizeof(amps_uint32_t) * 2)
1965 + (
sizeof(amps_uint64_t) * 2) );
1970 BlockChainHeader chainHeader;
1971 pBuffer_->
copyBytes((
char*)&chainHeader,
1972 sizeof(amps_uint32_t) * 8);
// Write the full-size block header followed by the full-size chain header.
// NOTE(review): any repositioning between the read above and these writes
// is not visible in this listing.
1975 pBuffer_->
putBytes((
const char*)&blockHeader_,
sizeof(BlockHeader));
1976 pBuffer_->
putBytes((
const char*)&chainHeader,
sizeof(BlockChainHeader));
// Picks the function stored in _crc.
// Visible choices: AMPS::CRC<0>::crc when AMPS::CRC<0>::isSSE42Enabled()
// reports true, and AMPS::CRC<0>::crcNoSSE otherwise.
// NOTE(review): the lines that use isFile, and the condition guarding the
// first crcNoSSE assignment, are missing from this listing -- presumably a
// non-file store selects noOpCRC and a build-time switch covers the first
// assignment; confirm against the full source.
1979 void chooseCRC(
bool isFile)
1988 _crc = AMPS::CRC<0>::crcNoSSE;
1990 if (AMPS::CRC<0>::isSSE42Enabled())
1992 _crc = AMPS::CRC<0>::crc;
1996 _crc = AMPS::CRC<0>::crcNoSSE;
// Stand-in with the same parameter list and return type as CRCFunction.
// All three parameters are unnamed, so the body cannot read them.
// NOTE(review): the body is not visible in this listing; presumably it
// returns a constant so that no checksum work is done -- confirm.
2001 static amps_uint64_t noOpCRC(
const char*,
size_t, amps_uint64_t)
// Block whose _sequence field mirrors _maxDiscarded; recovery raises it
// whenever _maxDiscarded exceeds it.
2011 Block* _metadataBlock;
// Highest sequence treated as discarded; recovery sets it to one less than
// the lowest recovered sequence when that is larger.
2013 amps_uint64_t _maxDiscarded;
// Highest sequence seen so far; recovery raises it to the largest recovered
// sequence.
2015 volatile amps_uint64_t _lastSequence;
// Count of stored messages; recovery adds the number of recovered chains via
// AMPS_FETCH_ADD.
2017 ATOMIC_TYPE _stored;
// Signature shared by the CRC implementations that can be assigned to _crc.
// NOTE(review): parameter meanings (data, length, seed/previous CRC) are
// presumed from the types -- confirm against ampscrc.hpp.
2022 typedef amps_uint64_t (*CRCFunction)(
const char*, size_t, amps_uint64_t);
virtual void putUint64(amps_uint64_t ui_)=0
Put an amps_uint64_t value into the buffer at the current position and advance past it...
Abstract base class for storing published messages for an HA publisher client.
Definition: ampsplusplus.hpp:927
void getRawCorrelationId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CorrelationId header of self as a Field that references the underlying buf...
Definition: Message.hpp:1250
Command::Type getCommandEnum() const
Decode self's "command" field and return one of the values from Command.
Definition: Message.hpp:1174
Constants
Default constant values for BlockPublishStore.
Definition: BlockPublishStore.hpp:79
void getRawCommandId(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the CommandId header of self as a Field that references the underlying buffer ...
Definition: Message.hpp:1248
Abstract base class for replaying a publish message.
Definition: ampsplusplus.hpp:899
virtual void flush(long timeout_)
Method to wait for the Store to discard everything that has been stored up to the point in time when ...
Definition: BlockPublishStore.hpp:642
void replay(StoreReplayer &replayer_)
Replay all messages in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:547
void getRawFilter(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Filter header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1252
AMPSDLL amps_uint64_t amps_message_get_field_uint64(amps_handle message, FieldId field)
Gets the unsigned 64-bit int value of a header field in an AMPS message.
Message encapsulates a single message sent to or received from an AMPS server, and provides methods f...
Definition: Message.hpp:511
virtual void setPosition(size_t position_)=0
Set the buffer position to a location.
Field getOptions() const
Retrieves the value of the Options header of the Message as a new Field.
Definition: Message.hpp:1262
static amps_uint32_t getBlockHeaderSize()
Block header is number of blocks, total length, sequence number, crc, next in chain offset...
Definition: BlockPublishStore.hpp:153
const char * data() const
Returns the (non-null-terminated) data underlying this field.
Definition: Field.hpp:259
amps_uint32_t getBlockDataSize()
Return the size left in a block for data when it has a header in it.
Definition: BlockPublishStore.hpp:179
virtual size_t getSize() const =0
Get the current size of the Buffer in bytes.
virtual void putBytes(const char *data_, size_t dataLength_)=0
Put the given length of bytes in data into the buffer at the current position and advance past them...
virtual void execute(Message &message_)=0
Called by implementations of Store to replay a message from the store.
void getRawBookmark(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Bookmark header of self as a Field that references the underlying buffer m...
Definition: Message.hpp:1140
bool replaySingle(StoreReplayer &replayer_, amps_uint64_t index_)
Replay one message in the Store onto the given StoreReplayer.
Definition: BlockPublishStore.hpp:579
amps_uint64_t store(const Message &message_, bool assignSequence_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:262
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
amps_uint64_t getLastPersisted(void)
Get the last persisted sequence number.
Definition: BlockPublishStore.hpp:697
Core type, function, and class declarations for the AMPS C++ client.
size_t unpersistedCount() const
Method to return the count of messages that are currently in the Store because they have not been discard...
Definition: BlockPublishStore.hpp:628
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
size_t len() const
Returns the length of the data underlying this field.
Definition: Field.hpp:266
virtual void setResizeHandler(PublishStoreResizeHandler handler_, void *userData_)
Set a handler to be called if the Store needs to resize in order to keep storing messages.
Definition: ampsplusplus.hpp:1013
virtual void putUint32(amps_uint32_t i_)=0
Put an unsigned 32-bit int value into the buffer at the current position and advance past the end of ...
void getRawSowKeys(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKeys header of self as a Field that references the underlying buffer ma...
Definition: Message.hpp:1371
Used as a base class for other stores in the AMPS C++ client, this is an implementation of StoreImpl ...
Definition: BlockPublishStore.hpp:60
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
virtual ~BlockPublishStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockPublishStore.hpp:237
void getRawExpiration(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Expiration header of self as a Field that references the underlying buffer...
Definition: Message.hpp:1251
BlockPublishStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=1000, bool isFile_=false, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockPublishStore using buffer_, that grows by blocksPerRealloc_ blocks when it must grow...
Definition: BlockPublishStore.hpp:196
static amps_uint32_t getBlockChainHeaderSize()
Block chain header is operation, command id length, correlation id length, expiration length...
Definition: BlockPublishStore.hpp:163
virtual amps_uint64_t getUint64()=0
Get an unsigned 64-bit int value at the current buffer position and advance past it.
void getRawTopic(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the Topic header of self as a Field that references the underlying buffer mana...
Definition: Message.hpp:1395
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Field represents the value of a single field in a Message.
Definition: Field.hpp:85
virtual amps_uint32_t getUint32()=0
Get the unsigned 32-bit int value at the current buffer position and advance past it...
virtual amps_uint64_t store(const Message &message_)
Store a given message that will be delivered to AMPS.
Definition: BlockPublishStore.hpp:247
amps_uint32_t getBlockSize()
Return the full size of a block, as reported by the underlying BlockStore.
Definition: BlockPublishStore.hpp:171
void getRawSowKey(const char **dataptr, size_t *sizeptr) const
Retrieves the value of the SowKey header of self as a Field that references the underlying buffer man...
Definition: Message.hpp:1370
unsigned getAckTypeEnum() const
Decode self's "ack type" field and return the corresponding bitmask of values from AckType...
Definition: Message.hpp:1106
amps_uint64_t getLowestUnpersisted() const
Get the oldest unpersisted message sequence in the store.
Definition: BlockPublishStore.hpp:685
Definition: ampsplusplus.hpp:103
static amps_uint64_t getUnsetSequence()
Method to return the value used to represent no such sequence.
Definition: ampsplusplus.hpp:989
virtual void copyBytes(char *buffer_, size_t numBytes_)=0
Copy the given number of bytes from this buffer to the given buffer.
virtual void discardUpTo(amps_uint64_t index_)
Remove all messages with an index up to and including index_.
Definition: BlockPublishStore.hpp:492
Provides AMPS::BlockStore, a class for storing Blocks of a fixed size into a Buffer implementation...
virtual ByteArray getBytes(size_t numBytes_)=0
Get the given number of bytes from the buffer.