Skip to content

Commit

Permalink
Added block interface and memory block implementations.
Browse files Browse the repository at this point in the history
  • Loading branch information
vadlakondaswetha committed Oct 16, 2024
1 parent 24a3262 commit aab8ad1
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 0 deletions.
64 changes: 64 additions & 0 deletions internal/block/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package block

import (
"bytes"
"fmt"
"io"
)

// Block represents the buffer which holds the data.
type Block interface {
// Reuse resets the blocks for reuse.
Reuse()

// Size provides the current data size of the block. The capacity of the block
// can be >= data_size.
Size() int64

// Write writes the given data to block.
Write(bytes []byte) error

// Reader interface helps in copying the data directly to storage.writer
// while uploading to GCS.
Reader() io.Reader
}

// TODO: check if we need offset or just storing end is sufficient. We might need
// for handling ordered writes. It will be decided after ordered writes design.
type offset struct {
start, end int64
}

type memoryBlock struct {
Block
buffer []byte
offset offset
}

func (m *memoryBlock) Reuse() {
clear(m.buffer)

m.offset.end = 0
m.offset.start = 0
}

func (m *memoryBlock) Size() int64 {
return m.offset.end - m.offset.start
}
func (m *memoryBlock) Write(bytes []byte) error {
if m.Size()+int64(len(bytes)) > int64(cap(m.buffer)) {
return fmt.Errorf("received data more than capacity of the block")
}

n := copy(m.buffer[m.offset.end:], bytes)
if n != len(bytes) {
return fmt.Errorf("error in copying the data to block. Expected %d, got %d", len(bytes), n)
}

m.offset.end += int64(len(bytes))
return nil
}

func (m *memoryBlock) Reader() io.Reader {
return bytes.NewReader(m.buffer[0:m.offset.end])
}
107 changes: 107 additions & 0 deletions internal/block/block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package block

import (
"fmt"
"io"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

type MemoryBlockTest struct {
suite.Suite
}

func TestMemoryBlockTestSuite(t *testing.T) {
suite.Run(t, new(MemoryBlockTest))
}

func createBlock(size uint32) Block {
mb := memoryBlock{
buffer: make([]byte, size),
offset: offset{0, 0},
}

return &mb
}

func (testSuite *MemoryBlockTest) TestMemoryBlockWrite() {
mb := createBlock(12)
content := []byte("hi")
err := mb.Write(content)

assert.Nil(testSuite.T(), err)
output, err := io.ReadAll(mb.Reader())
assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), content, output)
assert.Equal(testSuite.T(), int64(2), mb.Size())
}

func (testSuite *MemoryBlockTest) TestMemoryBlockWriteWithDataGreaterThanCapacity() {
mb := createBlock(1)
content := []byte("hi")
err := mb.Write(content)

assert.NotNil(testSuite.T(), err)
assert.EqualError(testSuite.T(), err, fmt.Sprintf("received data more than capacity of the block"))

Check failure on line 47 in internal/block/block_test.go

View workflow job for this annotation

GitHub Actions / Lint

S1039: unnecessary use of fmt.Sprintf (gosimple)
}

func (testSuite *MemoryBlockTest) TestMemoryBlockWriteWithMultipleWrites() {
mb := createBlock(12)
err := mb.Write([]byte("hi"))
assert.Nil(testSuite.T(), err)
err = mb.Write([]byte("hello"))
assert.Nil(testSuite.T(), err)

output, err := io.ReadAll(mb.Reader())
assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), []byte("hihello"), output)
assert.Equal(testSuite.T(), int64(7), mb.Size())
}

func (testSuite *MemoryBlockTest) TestMemoryBlockWriteWith2ndWriteBeyondCapacity() {
mb := createBlock(2)
content := []byte("hi")
err := mb.Write(content)
assert.Nil(testSuite.T(), err)
err = mb.Write(content)

assert.NotNil(testSuite.T(), err)
assert.EqualError(testSuite.T(), err, fmt.Sprintf("received data more than capacity of the block"))

Check failure on line 71 in internal/block/block_test.go

View workflow job for this annotation

GitHub Actions / Lint

S1039: unnecessary use of fmt.Sprintf (gosimple)
}

func (testSuite *MemoryBlockTest) TestMemoryBlockReuse() {
mb := createBlock(12)
content := []byte("hi")
err := mb.Write(content)
assert.Nil(testSuite.T(), err)
output, err := io.ReadAll(mb.Reader())
assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), content, output)
assert.Equal(testSuite.T(), int64(2), mb.Size())

mb.Reuse()

output, err = io.ReadAll(mb.Reader())
assert.Nil(testSuite.T(), err)
assert.Empty(testSuite.T(), output)
assert.Equal(testSuite.T(), int64(0), mb.Size())
}

// Other cases for Size are covered as part of write tests.
func (testSuite *MemoryBlockTest) TestMemoryBlockSizeForEmptyBlock() {
mb := createBlock(12)

assert.Equal(testSuite.T(), int64(0), mb.Size())
}

// Other cases for reader are covered as part of write tests.
func (testSuite *MemoryBlockTest) TestMemoryBlockReaderForEmptyBlock() {
mb := createBlock(12)

output, err := io.ReadAll(mb.Reader())
assert.Nil(testSuite.T(), err)
assert.Empty(testSuite.T(), output)
assert.Equal(testSuite.T(), int64(0), mb.Size())
}

0 comments on commit aab8ad1

Please sign in to comment.