From aab8ad1c5b660c4a280a51a7801bdcd61079f2da Mon Sep 17 00:00:00 2001 From: vadlakondaswetha Date: Wed, 16 Oct 2024 06:27:55 +0000 Subject: [PATCH] Added block interface and memory block implementations. --- internal/block/block.go | 64 +++++++++++++++++++++ internal/block/block_test.go | 107 +++++++++++++++++++++++++++++++++++ 2 files changed, 171 insertions(+) create mode 100644 internal/block/block.go create mode 100644 internal/block/block_test.go diff --git a/internal/block/block.go b/internal/block/block.go new file mode 100644 index 0000000000..5e4d4100f7 --- /dev/null +++ b/internal/block/block.go @@ -0,0 +1,64 @@ +package block + +import ( + "bytes" + "fmt" + "io" +) + +// Block represents the buffer which holds the data. +type Block interface { + // Reuse resets the blocks for reuse. + Reuse() + + // Size provides the current data size of the block. The capacity of the block + // can be >= data_size. + Size() int64 + + // Write writes the given data to block. + Write(bytes []byte) error + + // Reader interface helps in copying the data directly to storage.writer + // while uploading to GCS. + Reader() io.Reader +} + +// TODO: check if we need offset or just storing end is sufficient. We might need +// for handling ordered writes. It will be decided after ordered writes design. +type offset struct { + start, end int64 +} + +type memoryBlock struct { + Block + buffer []byte + offset offset +} + +func (m *memoryBlock) Reuse() { + clear(m.buffer) + + m.offset.end = 0 + m.offset.start = 0 +} + +func (m *memoryBlock) Size() int64 { + return m.offset.end - m.offset.start +} +func (m *memoryBlock) Write(bytes []byte) error { + if m.Size()+int64(len(bytes)) > int64(cap(m.buffer)) { + return fmt.Errorf("received data more than capacity of the block") + } + + n := copy(m.buffer[m.offset.end:], bytes) + if n != len(bytes) { + return fmt.Errorf("error in copying the data to block. Expected %d, got %d", len(bytes), n) + } + + m.offset.end += int64(len(bytes)) + return nil +} + +func (m *memoryBlock) Reader() io.Reader { + return bytes.NewReader(m.buffer[0:m.offset.end]) +} diff --git a/internal/block/block_test.go b/internal/block/block_test.go new file mode 100644 index 0000000000..9de60adf91 --- /dev/null +++ b/internal/block/block_test.go @@ -0,0 +1,107 @@ +package block + +import ( + "fmt" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +type MemoryBlockTest struct { + suite.Suite +} + +func TestMemoryBlockTestSuite(t *testing.T) { + suite.Run(t, new(MemoryBlockTest)) +} + +func createBlock(size uint32) Block { + mb := memoryBlock{ + buffer: make([]byte, size), + offset: offset{0, 0}, + } + + return &mb +} + +func (testSuite *MemoryBlockTest) TestMemoryBlockWrite() { + mb := createBlock(12) + content := []byte("hi") + err := mb.Write(content) + + assert.Nil(testSuite.T(), err) + output, err := io.ReadAll(mb.Reader()) + assert.Nil(testSuite.T(), err) + assert.Equal(testSuite.T(), content, output) + assert.Equal(testSuite.T(), int64(2), mb.Size()) +} + +func (testSuite *MemoryBlockTest) TestMemoryBlockWriteWithDataGreaterThanCapacity() { + mb := createBlock(1) + content := []byte("hi") + err := mb.Write(content) + + assert.NotNil(testSuite.T(), err) + assert.EqualError(testSuite.T(), err, fmt.Sprintf("received data more than capacity of the block")) +} + +func (testSuite *MemoryBlockTest) TestMemoryBlockWriteWithMultipleWrites() { + mb := createBlock(12) + err := mb.Write([]byte("hi")) + assert.Nil(testSuite.T(), err) + err = mb.Write([]byte("hello")) + assert.Nil(testSuite.T(), err) + + output, err := io.ReadAll(mb.Reader()) + assert.Nil(testSuite.T(), err) + assert.Equal(testSuite.T(), []byte("hihello"), output) + assert.Equal(testSuite.T(), int64(7), mb.Size()) +} + +func (testSuite *MemoryBlockTest) TestMemoryBlockWriteWith2ndWriteBeyondCapacity() { + mb := createBlock(2) + content := []byte("hi") + err := mb.Write(content) + assert.Nil(testSuite.T(), err) + err = mb.Write(content) + + assert.NotNil(testSuite.T(), err) + assert.EqualError(testSuite.T(), err, fmt.Sprintf("received data more than capacity of the block")) +} + +func (testSuite *MemoryBlockTest) TestMemoryBlockReuse() { + mb := createBlock(12) + content := []byte("hi") + err := mb.Write(content) + assert.Nil(testSuite.T(), err) + output, err := io.ReadAll(mb.Reader()) + assert.Nil(testSuite.T(), err) + assert.Equal(testSuite.T(), content, output) + assert.Equal(testSuite.T(), int64(2), mb.Size()) + + mb.Reuse() + + output, err = io.ReadAll(mb.Reader()) + assert.Nil(testSuite.T(), err) + assert.Empty(testSuite.T(), output) + assert.Equal(testSuite.T(), int64(0), mb.Size()) +} + +// Other cases for Size are covered as part of write tests. +func (testSuite *MemoryBlockTest) TestMemoryBlockSizeForEmptyBlock() { + mb := createBlock(12) + + assert.Equal(testSuite.T(), int64(0), mb.Size()) +} + +// Other cases for reader are covered as part of write tests. +func (testSuite *MemoryBlockTest) TestMemoryBlockReaderForEmptyBlock() { + mb := createBlock(12) + + output, err := io.ReadAll(mb.Reader()) + assert.Nil(testSuite.T(), err) + assert.Empty(testSuite.T(), output) + assert.Equal(testSuite.T(), int64(0), mb.Size()) +}