Implementation of StorageProvider events (#37)
Signed-off-by: Jimmy Moore <[email protected]>
jimmyaxod authored Oct 24, 2024 · 1 parent 3265747 · commit 4b1cc14
Showing 23 changed files with 492 additions and 0 deletions.
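
Nearly every file in this commit gets the same treatment: the wrapper type embeds storage.StorageProviderWithEvents and overrides SendEvent, so that an event delivered to the wrapper is handled by its own listeners first and then relayed to the provider(s) it wraps, with all returned EventReturnData concatenated. Below is a minimal sketch of that pattern, assuming only the storage API visible in the hunks that follow (the PassthroughModule name is illustrative, not a type added by this commit):

package modules

import (
	"github.com/loopholelabs/silo/pkg/storage"
)

// PassthroughModule is a hypothetical single-provider wrapper, shown only to
// illustrate the event-relay pattern applied throughout this commit.
type PassthroughModule struct {
	storage.StorageProviderWithEvents
	prov storage.StorageProvider
}

// SendEvent lets listeners attached to this wrapper handle the event first
// (via the embedded StorageProviderWithEvents), then relays it to the wrapped
// provider and appends whatever return data that produces.
func (m *PassthroughModule) SendEvent(eventType storage.EventType, eventData storage.EventData) []storage.EventReturnData {
	data := m.StorageProviderWithEvents.SendEvent(eventType, eventData)
	return append(data, storage.SendEvent(m.prov, eventType, eventData)...)
}

Modules that wrap several providers (CopyOnWrite, Raid, ShardedStorage) follow the same shape but relay the event to each wrapped provider in turn before returning the combined data.
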
14 changes: 14 additions & 0 deletions pkg/storage/dirtytracker/dirty_tracker.go
@@ -29,9 +29,16 @@ type DirtyTracker struct {
}

type DirtyTrackerLocal struct {
	storage.StorageProviderWithEvents
	dt *DirtyTracker
}

// Relay events to embedded StorageProvider
func (i *DirtyTrackerLocal) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.dt.prov, event_type, event_data)...)
}

func (dtl *DirtyTrackerLocal) ReadAt(buffer []byte, offset int64) (int, error) {
	return dtl.dt.localReadAt(buffer, offset)
}
@@ -57,9 +64,16 @@ func (dtl *DirtyTrackerLocal) CancelWrites(offset int64, length int64) {
}

type DirtyTrackerRemote struct {
	storage.StorageProviderWithEvents
	dt *DirtyTracker
}

// Relay events to embedded StorageProvider
func (i *DirtyTrackerRemote) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.dt.prov, event_type, event_data)...)
}

func (dtl *DirtyTrackerRemote) ReadAt(buffer []byte, offset int64) (int, error) {
	return dtl.dt.remoteReadAt(buffer, offset)
}
7 changes: 7 additions & 0 deletions pkg/storage/modules/artificial_latency.go
@@ -13,6 +13,7 @@ import (
 *
 */
type ArtificialLatency struct {
	storage.StorageProviderWithEvents
	lock sync.RWMutex
	prov storage.StorageProvider
	latency_read time.Duration
@@ -21,6 +22,12 @@ type ArtificialLatency struct {
	latency_write_per_byte time.Duration
}

// Relay events to embedded StorageProvider
func (i *ArtificialLatency) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.prov, event_type, event_data)...)
}

func NewArtificialLatency(prov storage.StorageProvider, latencyRead time.Duration, latencyReadPerByte time.Duration, latencyWrite time.Duration, latencyWritePerByte time.Duration) *ArtificialLatency {
	return &ArtificialLatency{
		prov: prov,
7 changes: 7 additions & 0 deletions pkg/storage/modules/binlog.go
@@ -13,6 +13,7 @@ import (
)

type BinLog struct {
	storage.StorageProviderWithEvents
	prov storage.StorageProvider
	filename string
	ctime time.Time
@@ -23,6 +24,12 @@ type BinLog struct {
	writesEnabled atomic.Bool
}

// Relay events to embedded StorageProvider
func (i *BinLog) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.prov, event_type, event_data)...)
}

func NewBinLog(prov storage.StorageProvider, filename string) (*BinLog, error) {
	fp, err := os.Create(filename)
	if err != nil {
7 changes: 7 additions & 0 deletions pkg/storage/modules/block_splitter.go
@@ -9,11 +9,18 @@ import (
 */

type BlockSplitter struct {
	storage.StorageProviderWithEvents
	prov storage.StorageProvider
	block_size int
	size uint64
}

// Relay events to embedded StorageProvider
func (i *BlockSplitter) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.prov, event_type, event_data)...)
}

func NewBlockSplitter(prov storage.StorageProvider, block_size int) *BlockSplitter {
	return &BlockSplitter{
		prov: prov,
8 changes: 8 additions & 0 deletions pkg/storage/modules/copy_on_write.go
@@ -8,6 +8,7 @@ import (
)

type CopyOnWrite struct {
	storage.StorageProviderWithEvents
	source storage.StorageProvider
	cache storage.StorageProvider
	exists *util.Bitfield
@@ -18,6 +19,13 @@ type CopyOnWrite struct {
	wg sync.WaitGroup
}

// Relay events to embedded StorageProvider
func (i *CopyOnWrite) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	data = append(data, storage.SendEvent(i.cache, event_type, event_data)...)
	return append(data, storage.SendEvent(i.source, event_type, event_data)...)
}

func NewCopyOnWrite(source storage.StorageProvider, cache storage.StorageProvider, blockSize int) *CopyOnWrite {
	numBlocks := (source.Size() + uint64(blockSize) - 1) / uint64(blockSize)
	return &CopyOnWrite{
7 changes: 7 additions & 0 deletions pkg/storage/modules/dummy_tracker.go
@@ -6,10 +6,17 @@ import (
)

type DummyTracker struct {
	storage.StorageProviderWithEvents
	prov storage.StorageProvider
	bf *util.Bitfield
}

// Relay events to embedded StorageProvider
func (i *DummyTracker) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.prov, event_type, event_data)...)
}

func NewDummyTracker(prov storage.StorageProvider, block_size int) *DummyTracker {
	num_blocks := (int(prov.Size()) + block_size - 1) / block_size
	l := &DummyTracker{
7 changes: 7 additions & 0 deletions pkg/storage/modules/filter_redundant_writes.go
@@ -12,11 +12,18 @@ import (
 */

type FilterRedundantWrites struct {
	storage.StorageProviderWithEvents
	prov storage.StorageProvider
	source io.ReaderAt
	no_change_allowance int
}

// Relay events to embedded StorageProvider
func (i *FilterRedundantWrites) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.prov, event_type, event_data)...)
}

func NewFilterRedundantWrites(prov storage.StorageProvider, source io.ReaderAt, allowance int) *FilterRedundantWrites {
	return &FilterRedundantWrites{
		prov: prov,
7 changes: 7 additions & 0 deletions pkg/storage/modules/hooks.go
@@ -5,13 +5,20 @@ import (
)

type Hooks struct {
	storage.StorageProviderWithEvents
	prov storage.StorageProvider
	Pre_read func(buffer []byte, offset int64) (bool, int, error)
	Post_read func(buffer []byte, offset int64, n int, err error) (int, error)
	Pre_write func(buffer []byte, offset int64) (bool, int, error)
	Post_write func(buffer []byte, offset int64, n int, err error) (int, error)
}

// Relay events to embedded StorageProvider
func (i *Hooks) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.prov, event_type, event_data)...)
}

func NewHooks(prov storage.StorageProvider) *Hooks {
	return &Hooks{
		prov: prov,
7 changes: 7 additions & 0 deletions pkg/storage/modules/lockable.go
@@ -12,11 +12,18 @@ import (
 */

type Lockable struct {
	storage.StorageProviderWithEvents
	prov storage.StorageProvider
	lock *sync.Cond
	locked bool
}

// Relay events to embedded StorageProvider
func (i *Lockable) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.prov, event_type, event_data)...)
}

func NewLockable(prov storage.StorageProvider) *Lockable {
	return &Lockable{
		prov: prov,
7 changes: 7 additions & 0 deletions pkg/storage/modules/logger.go
@@ -9,11 +9,18 @@ import (
)

type Logger struct {
	storage.StorageProviderWithEvents
	prov storage.StorageProvider
	prefix string
	enabled atomic.Bool
}

// Relay events to embedded StorageProvider
func (i *Logger) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.prov, event_type, event_data)...)
}

func NewLogger(prov storage.StorageProvider, prefix string) *Logger {
	l := &Logger{
		prov: prov,
7 changes: 7 additions & 0 deletions pkg/storage/modules/metrics.go
@@ -13,6 +13,7 @@ import (
 *
 */
type Metrics struct {
	storage.StorageProviderWithEvents
	prov storage.StorageProvider
	metric_read_ops uint64
	metric_read_bytes uint64
@@ -47,6 +48,12 @@ func NewMetrics(prov storage.StorageProvider) *Metrics {
	}
}

// Relay events to embedded StorageProvider
func (i *Metrics) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.prov, event_type, event_data)...)
}

func formatDuration(d time.Duration) string {
	if d < time.Millisecond {
		return fmt.Sprintf("%dns", d.Nanoseconds())
10 changes: 10 additions & 0 deletions pkg/storage/modules/raid.go
@@ -8,9 +8,19 @@ import (
)

type Raid struct {
	storage.StorageProviderWithEvents
	prov []storage.StorageProvider
}

// Relay events to embedded StorageProvider
func (i *Raid) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	for _, pr := range i.prov {
		data = append(data, storage.SendEvent(pr, event_type, event_data)...)
	}
	return data
}

func NewRaid(prov []storage.StorageProvider) (*Raid, error) {
	if len(prov) == 0 {
		return nil, errors.New("Need at least one provider")
7 changes: 7 additions & 0 deletions pkg/storage/modules/read_only_gate.go
@@ -11,11 +11,18 @@ import (
 */

type ReadOnlyGate struct {
	storage.StorageProviderWithEvents
	prov storage.StorageProvider
	lock *sync.Cond
	locked bool
}

// Relay events to embedded StorageProvider
func (i *ReadOnlyGate) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	return append(data, storage.SendEvent(i.prov, event_type, event_data)...)
}

func NewReadOnlyGate(prov storage.StorageProvider) *ReadOnlyGate {
	return &ReadOnlyGate{
		prov: prov,
10 changes: 10 additions & 0 deletions pkg/storage/modules/sharded_storage.go
@@ -13,11 +13,21 @@ import (
)

type ShardedStorage struct {
	storage.StorageProviderWithEvents
	blocks []storage.StorageProvider
	block_size int
	size int
}

// Relay events to embedded StorageProvider
func (i *ShardedStorage) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData {
	data := i.StorageProviderWithEvents.SendEvent(event_type, event_data)
	for _, pr := range i.blocks {
		data = append(data, storage.SendEvent(pr, event_type, event_data)...)
	}
	return data
}

func NewShardedStorage(size int, blocksize int, creator func(index int, size int) (storage.StorageProvider, error)) (*ShardedStorage, error) {
	if blocksize == 0 {
		return nil, fmt.Errorf("Invalid block size of 0")
3 changes: 3 additions & 0 deletions pkg/storage/sources/file_storage.go
@@ -4,13 +4,16 @@ import (
	"io"
	"os"
	"sync"

	"github.com/loopholelabs/silo/pkg/storage"
)

/**
 * Simple fixed size file storage provider
 *
 */
type FileStorage struct {
	storage.StorageProviderWithEvents
	fp *os.File
	size int64
	wg sync.WaitGroup
3 changes: 3 additions & 0 deletions pkg/storage/sources/file_storage_sparse.go
@@ -5,6 +5,8 @@ import (
	"errors"
	"os"
	"sync"

	"github.com/loopholelabs/silo/pkg/storage"
)

const BLOCK_HEADER_SIZE = 8
@@ -18,6 +20,7 @@ const BLOCK_HEADER_SIZE = 8
 *
 */
type FileStorageSparse struct {
	storage.StorageProviderWithEvents
	f string
	fp *os.File
	size uint64
3 changes: 3 additions & 0 deletions pkg/storage/sources/memory_storage.go
@@ -2,6 +2,8 @@ package sources

import (
	"sync"

	"github.com/loopholelabs/silo/pkg/storage"
)

/**
@@ -10,6 +12,7 @@ import (
 *
 */
type MemoryStorage struct {
	storage.StorageProviderWithEvents
	data []byte
	lock sync.RWMutex
}
2 changes: 2 additions & 0 deletions pkg/storage/sources/s3_storage.go
@@ -8,6 +8,7 @@ import (
	"io"
	"sync"

	"github.com/loopholelabs/silo/pkg/storage"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)
@@ -19,6 +20,7 @@ var (
 */

type S3Storage struct {
	storage.StorageProviderWithEvents
	client *minio.Client
	bucket string
	prefix string