AMPS C/C++ Client Class Reference
AMPS C/C++ Client Version 5.3.0.5
BlockStore.hpp
Go to the documentation of this file.
1 //
3 // Copyright (c) 2010-2020 60East Technologies Inc., All Rights Reserved.
4 //
5 // This computer software is owned by 60East Technologies Inc. and is
6 // protected by U.S. copyright laws and other laws and by international
7 // treaties. This computer software is furnished by 60East Technologies
8 // Inc. pursuant to a written license agreement and may be used, copied,
9 // transmitted, and stored only in accordance with the terms of such
10 // license agreement and with the inclusion of the above copyright notice.
11 // This computer software or any other copies thereof may not be provided
12 // or otherwise made available to any other person.
13 //
14 // U.S. Government Restricted Rights. This computer software: (a) was
15 // developed at private expense and is in all respects the proprietary
16 // information of 60East Technologies Inc.; (b) was not developed with
17 // government funds; (c) is a trade secret of 60East Technologies Inc.
18 // for all purposes of the Freedom of Information Act; and (d) is a
19 // commercial item and thus, pursuant to Section 12.212 of the Federal
20 // Acquisition Regulations (FAR) and DFAR Supplement Section 227.7202,
21 // Government's use, duplication or disclosure of the computer software
22 // is subject to the restrictions set forth by 60East Technologies Inc..
23 //
25 
26 #ifndef _BLOCKSTORE_H_
27 #define _BLOCKSTORE_H_
28 #include <ampsplusplus.hpp>
29 #include <Buffer.hpp>
30 #include <sstream>
31 #include <string>
32 #include <map>
33 #include <ampscrc.hpp>
34 
35 #ifdef _WIN32
36 #include <intrin.h>
37 #include <sys/timeb.h>
38 #else
39 #include <sys/time.h>
40 #if AMPS_SSE_42
41 #include <cpuid.h>
42 #endif
43 #endif
44 #include <iostream>
45 
50 
51 namespace AMPS
52 {
59 {
60 public:
63  enum Constants : amps_uint32_t
64  {
65  DEFAULT_BLOCK_HEADER_SIZE = 32,
66  DEFAULT_BLOCKS_PER_REALLOC = 1000,
67  DEFAULT_BLOCK_SIZE = 2048
68  };
69 
72  class Block
73  {
74  public:
75  // The offset of the Block's data in the buffer.
76  size_t _offset;
77  // The sequence number assoicated with the Block.
78  amps_uint64_t _sequence;
79  // The next Block in the chain when data is in multiple Blocks.
80  Block* _nextInChain;
81  // The next Block in list of available or free Blocks.
82  Block* _nextInList;
83 
84  // Create Block with given offset
85  Block(size_t offset_) : _offset(offset_), _sequence(0)
86  , _nextInChain(0), _nextInList(0)
87  { ; }
88 
89  // Create Block with _nextInList at an address one Block farther
90  // than self. Convenient for creating arrays of Blocks.
91  Block() : _offset(0), _sequence(0)
92  , _nextInChain(0), _nextInList((Block*)(this+1))
93  { ; }
94 
95  // Init Block to an offset at index_ * blockSize_
96  Block* init(size_t index_, amps_uint32_t blockSize_)
97  {
98  _offset = index_*blockSize_;
99  return this;
100  }
101 
102  // Set Block to given offset and return pointer to self
103  Block* setOffset(size_t offset_)
104  {
105  _offset = offset_;
106  return this;
107  }
108 
109  };
110 
111 private:
112  // Typedefs
113  typedef Lock<Mutex> BufferLock;
114  typedef Unlock<Mutex> BufferUnlock;
115  typedef bool (*ResizeHandler)(size_t,void*);
116  typedef std::vector<Block*> BlockList;
117 
118 public:
134  BlockStore(Buffer* buffer_,
135  amps_uint32_t blocksPerRealloc_ = DEFAULT_BLOCKS_PER_REALLOC,
136  amps_uint32_t blockHeaderSize_ = DEFAULT_BLOCK_HEADER_SIZE,
137  amps_uint32_t blockSize_ = DEFAULT_BLOCK_SIZE)
138  : _buffer(buffer_), _freeList(0), _usedList(0)
139  , _endOfUsedList(0), _blocksPerRealloc(blocksPerRealloc_)
140  , _blockSize(blockSize_), _blockHeaderSize(blockHeaderSize_)
141  , _blocksAvailable(0), _resizing(false)
142  {
143  }
144 
148  {
149  for (BlockList::iterator i = _blockList.begin();
150  i != _blockList.end(); ++i)
151  {
152  delete[] *i;
153  }
154  delete _buffer;
155  }
156 
159  amps_uint32_t getBlockSize() const
160  {
161  return _blockSize;
162  }
163 
166  amps_uint32_t getBlockHeaderSize() const
167  {
168  return _blockHeaderSize;
169  }
170 
175  void acquireRead() const
176  {
177  _lock.acquireRead();
178  }
179 
182  void releaseRead() const
183  {
184  _lock.releaseRead();
185  }
186 
189  void signalAll()
190  {
191  _lock.signalAll();
192  }
193 
196  void wait()
197  {
198  _lock.wait();
199  }
200 
205  bool wait(long timeout_)
206  {
207  return _lock.wait(timeout_);
208  }
209 
219  void setResizeHandler(ResizeHandler resizeHandler_, void* userData_)
220  {
221  _resizeHandler = resizeHandler_;
222  _resizeUserData = userData_;
223  }
224 
228  // Lock should already be acquired
229  Block* front() const
230  {
231  return _usedList;
232  }
233 
238  // Lock should already be acquired
239  Block* back() const
240  {
241  return _endOfUsedList;
242  }
243 
247  // Lock should already be acquired
248  void setFreeList(Block* block_, amps_uint32_t freeCount_)
249  {
250  _freeList = block_;
251  _blocksAvailable = freeCount_;
252  }
253 
256  // Lock should already be acquired
257  void setUsedList(Block* block_)
258  {
259  _usedList = block_;
260  }
261 
264  // Lock should already be acquired
265  void setEndOfUsedList(Block* block_)
266  {
267  _endOfUsedList = block_;
268  }
269 
273  // Lock should already be acquired
274  void addBlocks(Block* blockArray_)
275  {
276  _blockList.push_back(blockArray_);
277  }
278 
283  // Lock should already be acquired
284  Block* get(amps_uint32_t numBlocksInChain_)
285  {
286  // Check that we have enough blocks
287  // Do this in a loop since resize can possibly return without resizing
288  // and may still leave us needing more space.
289  while (_blocksAvailable < numBlocksInChain_)
290  {
291  // Resize by required multiple of blockPerRealloc
292  unsigned int blocksNeeded = numBlocksInChain_ - _blocksAvailable;
293  amps_uint32_t addedBlocks = (blocksNeeded/_blocksPerRealloc + 1)
294  * _blocksPerRealloc;
295  size_t size = _buffer->getSize() + (addedBlocks * _blockSize);
296  resize(size);
297  }
298  // Return first free block with others as _nextInChain
299  Block* first = 0;
300  Block* last = 0;
301  Block* next = 0;
302  for (unsigned int i=0; i<numBlocksInChain_; ++i)
303  {
304  // Take from free list and advance
305  next = _freeList;
306  _freeList = _freeList->_nextInList;
307  next->_nextInList = 0;
308  if (!first)
309  {
310  // First, set it up
311  first = next;
312  last = next;
313  }
314  else
315  {
316  // Not first, add it to chain
317  last->_nextInChain = next;
318  last = next;
319  }
320  }
321  assert(first);
322  // Set _usedList or add it to the end of the used list
323  if (!_usedList)
324  {
325  _usedList = first;
326  }
327  else
328  {
329  _endOfUsedList->_nextInList = first;
330  }
331  _endOfUsedList = first;
332  _blocksAvailable -= numBlocksInChain_;
333  return first;
334  }
335 
339  // Lock should already be acquired
340  void put(Block* block_)
341  {
342  assert(_usedList);
343  assert(_endOfUsedList);
344  // Remove from used list
345  if (_usedList == block_)
346  {
347  // Easy
348  _usedList = _usedList->_nextInList;
349  if (!_usedList)
350  {
351  _endOfUsedList = 0;
352  }
353  }
354  else
355  {
356  // Search and remove the block
357  Block* used=_usedList;
358  while (used)
359  {
360  if (used->_nextInList == block_)
361  {
362  used->_nextInList = block_->_nextInList;
363  break;
364  }
365  used = used->_nextInList;
366  if (!_usedList)
367  {
368  _endOfUsedList = 0;
369  }
370  }
371  }
372  // Add to free list
373  _flattenToFreeList(block_);
374  }
375 
379  // Lock should already be acquired
380  ATOMIC_TYPE put(amps_uint64_t sequence_)
381  {
382  assert(_usedList);
383  assert(_endOfUsedList);
384  Block* used=_usedList;
385  ATOMIC_TYPE removalCount = 0;
386  while (used && used->_sequence <= sequence_)
387  {
388  Block* next = used->_nextInList;
389  // Add to free list
390  _flattenToFreeList(used);
391  used = next;
392  ++removalCount;
393  }
394  _usedList = used;
395  if (!used)
396  {
397  _endOfUsedList = 0;
398  }
399  return removalCount;
400  }
401 
405  // Lock should already be acquired
406  void putAll(Block* block_)
407  {
408  // Remove from used list
409  Block* newEndOfUsedList = 0;
410  for (Block* used = _usedList; used; used = used->_nextInList)
411  {
412  if (used == block_)
413  {
414  if (newEndOfUsedList)
415  {
416  newEndOfUsedList->_nextInList = 0;
417  }
418  else
419  {
420  _usedList = 0;
421  }
422  _endOfUsedList = newEndOfUsedList;
423  }
424  newEndOfUsedList = used;
425  }
426  // Add all remaining to free list
427  Block* next = 0;
428  for (Block* block = block_; block; block = next)
429  {
430  next = block->_nextInList;
431  _flattenToFreeList(block);
432  }
433  }
434 
437  // Lock should already be held
438  void init()
439  {
440  size_t startSize = _buffer->getSize();
441  if (!startSize)
442  {
444  startSize = _buffer->getSize();
445  }
446  // How many blocks are we resizing
447  amps_uint32_t numBlocks = (amps_uint32_t)(startSize) / getBlockSize();
448  _freeList = new Block[numBlocks];
449  _blockList.push_back(_freeList);
450  for (size_t i = 0; i < numBlocks; ++i)
451  {
452  _freeList[i].init(i, getBlockSize());
453  }
454  _freeList[numBlocks-1]._nextInList = 0;
455  _blocksAvailable += numBlocks;
456  assert(_freeList);
457  assert(_blocksAvailable);
458  }
459 
462  size_t getDefaultResizeSize() const
463  {
464  return _blocksPerRealloc * _blockSize;
465  }
466 
469  amps_uint32_t getDefaultResizeBlocks() const
470  {
471  return _blocksPerRealloc;
472  }
473 
481  // Lock should already be held
482  Block* resizeBuffer(size_t size_, amps_uint32_t* pNewBlocks_)
483  {
484  Block* freeList = 0;
485  while (_resizing)
486  {
487  if (_buffer->getSize() >= size_) return freeList;
488  BufferUnlock guard(_lock);
489  AMPS_YIELD();
490  }
491  FlagFlip flip(&_resizing);
492  bool okToResize = false;
493  if (true)
494  {
495  BufferUnlock u(_lock);
496  // Don't do anything if resizeHandler says no
497  okToResize = _canResize(size_);
498  }
499  if (!okToResize)
500  {
501  return freeList;
502  }
503  try
504  {
505  size_t oldSize = _buffer->getSize();
506  amps_uint32_t oldBlocks = (amps_uint32_t)(oldSize / getBlockSize());
507  if (oldSize >= size_)
508  {
509  *pNewBlocks_ = 0;
510  return freeList;
511  }
512  _buffer->setSize(size_);
513  _buffer->zero(oldSize, size_-oldSize);
514  // How many blocks are we resizing
515  *pNewBlocks_ = (amps_uint32_t)((size_-oldSize) / getBlockSize());
516  freeList = new Block[*pNewBlocks_];
517  for (size_t i = 0; i < *pNewBlocks_; ++i)
518  {
519  freeList[i].init(oldBlocks+i, getBlockSize());
520  }
521  freeList[*pNewBlocks_-1]._nextInList = 0;
522  }
523 #ifdef _WIN32
524  catch(const std::bad_alloc&)
525 #else
526  catch(const std::bad_alloc& e)
527 #endif
528  {
529  std::ostringstream os;
530  os << "BlockStore failed to allocate " << size_
531  << " bytes for resize of store from " << _buffer->getSize()
532  << " bytes.";
533  throw StoreException(os.str());
534  }
535  return freeList;
536  }
537 
541  // Lock should already be held
542  void resize(size_t size_)
543  {
544  amps_uint32_t newBlocks = 0;
545  Block* addedBlockList = resizeBuffer(size_, &newBlocks);
546  if (!addedBlockList || !newBlocks)
547  {
548  // Maybe we didn't have to allocate in this thread
549  return;
550  if (_buffer->getSize() >= size_) return;
551  // This is bad.
552  std::ostringstream os;
553  os << "BlockStore failed to allocate " << size_
554  << " bytes for resize of buffer from " << _buffer->getSize()
555  << " bytes.";
556  throw StoreException(os.str());
557  }
558  _blockList.push_back(addedBlockList);
559  addedBlockList[newBlocks-1]._nextInList = _freeList;
560  _freeList = addedBlockList;
561  _blocksAvailable += newBlocks;
562  assert(_freeList);
563  assert(_blocksAvailable);
564  }
565 
568  // Lock should be held, no blocks should be used or allocated
569  amps_uint32_t setBlockSize(amps_uint32_t blockSize_)
570  {
571  if (_usedList || _freeList)
572  {
573  return 0;
574  }
575  amps_uint32_t oldSize = _blockSize;
576  _blockSize = blockSize_;
577  return oldSize;
578  }
579 
583  // Lock should be held, no blocks should be used or allocated
584  amps_uint32_t setBlockHeaderSize(amps_uint32_t blockHeaderSize_)
585  {
586  if (_usedList || _freeList)
587  {
588  return 0;
589  }
590  amps_uint32_t oldSize = _blockHeaderSize;
591  _blockHeaderSize = blockHeaderSize_;
592  return oldSize;
593  }
594 
597  // Lock should already be held
599  {
600  return _buffer;
601  }
602 
603 private:
605  bool _canResize(size_t requestedSize_)
606  {
607  if (_resizeHandler)
608  {
609  return _resizeHandler(requestedSize_, _resizeUserData);
610  }
611  else
612  {
613  return true;
614  }
615  }
616 
617  // Lock should already be acquired
618  void _flattenToFreeList(Block* block_)
619  {
620  // Flatten chain to front of free list
621  Block* current = block_;
622  while (current)
623  {
624  Block* chain = current->_nextInChain;
625  // Clear the header
626  _buffer->zero(current->_offset, _blockHeaderSize);
627  // Prepend to the free list and clear other values
628  current->_nextInList = _freeList;
629  _freeList = current;
630  ++_blocksAvailable;
631  current->_sequence = (amps_uint64_t)0;
632  current->_nextInChain = 0;
633  current = chain;
634  }
635  assert(_freeList);
636  assert(_blocksAvailable);
637  }
638 
639  // Member variables
640  // Buffer to use for storage
641  Buffer* _buffer;
642 
643  // The Block accounting
644  Block* _freeList;
645  Block* _usedList;
646  Block* _endOfUsedList;
647  // How much to resize buffer when needed
648  amps_uint32_t _blocksPerRealloc;
649  // How big is each Block, and what part is header
650  amps_uint32_t _blockSize;
651  amps_uint32_t _blockHeaderSize;
652  // How many blocks are free
653  amps_uint32_t _blocksAvailable;
654  // ResizeHandler to call before resizing
655  ResizeHandler _resizeHandler;
656  // ResizeHandler data
657  void* _resizeUserData;
658  // List of every allocated slab of Blocks
659  BlockList _blockList;
660  // Flag to control resizing
661  volatile bool _resizing;
662 
663  // Lock for _buffer
664  mutable Mutex _lock;
665 
666 };
667 
668 }
669 
670 #endif
671 
void wait()
Wait for a signal.
Definition: BlockStore.hpp:196
void setResizeHandler(ResizeHandler resizeHandler_, void *userData_)
Set a resize handler that is called with the new total size of the Buffer.
Definition: BlockStore.hpp:219
amps_uint32_t setBlockSize(amps_uint32_t blockSize_)
Set the size to use for all Blocks.
Definition: BlockStore.hpp:569
void acquireRead() const
Acquire the lock for this object.
Definition: BlockStore.hpp:175
Constants
Default constant values for BlockStore.
Definition: BlockStore.hpp:63
Buffer * getBuffer()
Return the buffer underlying the store for direct write/read.
Definition: BlockStore.hpp:598
void put(Block *block_)
Return the given chain of Blocks to the free list for reuse.
Definition: BlockStore.hpp:340
~BlockStore()
Destructor that cleans up the buffer and other associated memory.
Definition: BlockStore.hpp:147
void releaseRead() const
Release the lock for this object. Used by RAII templates.
Definition: BlockStore.hpp:182
Block * front() const
Get the first used block in the store.
Definition: BlockStore.hpp:229
amps_uint32_t getDefaultResizeBlocks() const
Return the default number of blocks for each resize.
Definition: BlockStore.hpp:469
void addBlocks(Block *blockArray_)
Allow users to create Block arrays during recovery that are tracked for cleanup here with all other B...
Definition: BlockStore.hpp:274
bool wait(long timeout_)
Wait timeout_ ms for a signal.
Definition: BlockStore.hpp:205
Block * back() const
Get the last used block in the store.
Definition: BlockStore.hpp:239
void resize(size_t size_)
Resize the buffer to the requested size, adding all new space as unused Blocks for the free list...
Definition: BlockStore.hpp:542
ATOMIC_TYPE put(amps_uint64_t sequence_)
Return all Blocks with sequence <= sequence_ for reuse.
Definition: BlockStore.hpp:380
Used as a base class for other stores in the AMPS C++ client, this is an implementation that breaks a...
Definition: BlockStore.hpp:58
void init()
Initialize, assuming that _buffer has no existing information.
Definition: BlockStore.hpp:438
void setEndOfUsedList(Block *block_)
Allow containing classes to initialize the used list in recovery.
Definition: BlockStore.hpp:265
size_t getDefaultResizeSize() const
Return the default number of bytes for each resize.
Definition: BlockStore.hpp:462
void setUsedList(Block *block_)
Allow containing classes to initialize the used list in recovery.
Definition: BlockStore.hpp:257
Core type, function, and class declarations for the AMPS C++ client.
Provides AMPS::Buffer, an abstract base class used by the store implementations in the AMPS client...
amps_uint32_t setBlockHeaderSize(amps_uint32_t blockHeaderSize_)
Set the size to use for the header for all Blocks.
Definition: BlockStore.hpp:584
void setFreeList(Block *block_, amps_uint32_t freeCount_)
Allow containing classes to initialize the free list in recovery.
Definition: BlockStore.hpp:248
amps_uint32_t getBlockHeaderSize() const
Get the size of a header within each Block, as set in the constructor.
Definition: BlockStore.hpp:166
Abstract base class for implementing a buffer to be used by a StoreImpl for storage of publish messag...
Definition: Buffer.hpp:40
BlockStore(Buffer *buffer_, amps_uint32_t blocksPerRealloc_=DEFAULT_BLOCKS_PER_REALLOC, amps_uint32_t blockHeaderSize_=DEFAULT_BLOCK_HEADER_SIZE, amps_uint32_t blockSize_=DEFAULT_BLOCK_SIZE)
Create a BlockStore using buffer_ and default block size, that grows by blocksPerRealloc_ blocks when...
Definition: BlockStore.hpp:134
void putAll(Block *block_)
Return all Blocks starting with the given Block to the free list.
Definition: BlockStore.hpp:406
Used as metadata for each block in a Buffer.
Definition: BlockStore.hpp:72
Block * resizeBuffer(size_t size_, amps_uint32_t *pNewBlocks_)
Resize the buffer to the requested size, returning all new space.
Definition: BlockStore.hpp:482
void signalAll()
Signal lock waiters.
Definition: BlockStore.hpp:189
amps_uint32_t getBlockSize() const
Get the size of each Block, as set in the constructor.
Definition: BlockStore.hpp:159
Definition: ampsplusplus.hpp:136