diff --git a/pkg/storage/dirtytracker/dirty_tracker.go b/pkg/storage/dirtytracker/dirty_tracker.go index 1220dbd..29e7769 100644 --- a/pkg/storage/dirtytracker/dirty_tracker.go +++ b/pkg/storage/dirtytracker/dirty_tracker.go @@ -29,9 +29,16 @@ type DirtyTracker struct { } type DirtyTrackerLocal struct { + storage.StorageProviderWithEvents dt *DirtyTracker } +// Relay events to embedded StorageProvider +func (i *DirtyTrackerLocal) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.dt.prov, event_type, event_data)...) +} + func (dtl *DirtyTrackerLocal) ReadAt(buffer []byte, offset int64) (int, error) { return dtl.dt.localReadAt(buffer, offset) } @@ -57,9 +64,16 @@ func (dtl *DirtyTrackerLocal) CancelWrites(offset int64, length int64) { } type DirtyTrackerRemote struct { + storage.StorageProviderWithEvents dt *DirtyTracker } +// Relay events to embedded StorageProvider +func (i *DirtyTrackerRemote) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.dt.prov, event_type, event_data)...) +} + func (dtl *DirtyTrackerRemote) ReadAt(buffer []byte, offset int64) (int, error) { return dtl.dt.remoteReadAt(buffer, offset) } diff --git a/pkg/storage/modules/artificial_latency.go b/pkg/storage/modules/artificial_latency.go index cf9ccc8..ea73810 100644 --- a/pkg/storage/modules/artificial_latency.go +++ b/pkg/storage/modules/artificial_latency.go @@ -13,6 +13,7 @@ import ( * */ type ArtificialLatency struct { + storage.StorageProviderWithEvents lock sync.RWMutex prov storage.StorageProvider latency_read time.Duration @@ -21,6 +22,12 @@ type ArtificialLatency struct { latency_write_per_byte time.Duration } +// Relay events to embedded StorageProvider +func (i *ArtificialLatency) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.prov, event_type, event_data)...) +} + func NewArtificialLatency(prov storage.StorageProvider, latencyRead time.Duration, latencyReadPerByte time.Duration, latencyWrite time.Duration, latencyWritePerByte time.Duration) *ArtificialLatency { return &ArtificialLatency{ prov: prov, diff --git a/pkg/storage/modules/binlog.go b/pkg/storage/modules/binlog.go index 2558d10..baf308c 100644 --- a/pkg/storage/modules/binlog.go +++ b/pkg/storage/modules/binlog.go @@ -13,6 +13,7 @@ import ( ) type BinLog struct { + storage.StorageProviderWithEvents prov storage.StorageProvider filename string ctime time.Time @@ -23,6 +24,12 @@ type BinLog struct { writesEnabled atomic.Bool } +// Relay events to embedded StorageProvider +func (i *BinLog) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.prov, event_type, event_data)...) +} + func NewBinLog(prov storage.StorageProvider, filename string) (*BinLog, error) { fp, err := os.Create(filename) if err != nil { diff --git a/pkg/storage/modules/block_splitter.go b/pkg/storage/modules/block_splitter.go index 9a6a847..e315242 100644 --- a/pkg/storage/modules/block_splitter.go +++ b/pkg/storage/modules/block_splitter.go @@ -9,11 +9,18 @@ import ( */ type BlockSplitter struct { + storage.StorageProviderWithEvents prov storage.StorageProvider block_size int size uint64 } +// Relay events to embedded StorageProvider +func (i *BlockSplitter) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.prov, event_type, event_data)...) +} + func NewBlockSplitter(prov storage.StorageProvider, block_size int) *BlockSplitter { return &BlockSplitter{ prov: prov, diff --git a/pkg/storage/modules/copy_on_write.go b/pkg/storage/modules/copy_on_write.go index 1d136dd..dcdf5ac 100644 --- a/pkg/storage/modules/copy_on_write.go +++ b/pkg/storage/modules/copy_on_write.go @@ -8,6 +8,7 @@ import ( ) type CopyOnWrite struct { + storage.StorageProviderWithEvents source storage.StorageProvider cache storage.StorageProvider exists *util.Bitfield @@ -18,6 +19,13 @@ type CopyOnWrite struct { wg sync.WaitGroup } +// Relay events to embedded StorageProvider +func (i *CopyOnWrite) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + data = append(data, storage.SendEvent(i.cache, event_type, event_data)...) + return append(data, storage.SendEvent(i.source, event_type, event_data)...) +} + func NewCopyOnWrite(source storage.StorageProvider, cache storage.StorageProvider, blockSize int) *CopyOnWrite { numBlocks := (source.Size() + uint64(blockSize) - 1) / uint64(blockSize) return &CopyOnWrite{ diff --git a/pkg/storage/modules/dummy_tracker.go b/pkg/storage/modules/dummy_tracker.go index 6d0d544..e57d2c8 100644 --- a/pkg/storage/modules/dummy_tracker.go +++ b/pkg/storage/modules/dummy_tracker.go @@ -6,10 +6,17 @@ import ( ) type DummyTracker struct { + storage.StorageProviderWithEvents prov storage.StorageProvider bf *util.Bitfield } +// Relay events to embedded StorageProvider +func (i *DummyTracker) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.prov, event_type, event_data)...) +} + func NewDummyTracker(prov storage.StorageProvider, block_size int) *DummyTracker { num_blocks := (int(prov.Size()) + block_size - 1) / block_size l := &DummyTracker{ diff --git a/pkg/storage/modules/filter_redundant_writes.go b/pkg/storage/modules/filter_redundant_writes.go index 4127413..a519588 100644 --- a/pkg/storage/modules/filter_redundant_writes.go +++ b/pkg/storage/modules/filter_redundant_writes.go @@ -12,11 +12,18 @@ import ( */ type FilterRedundantWrites struct { + storage.StorageProviderWithEvents prov storage.StorageProvider source io.ReaderAt no_change_allowance int } +// Relay events to embedded StorageProvider +func (i *FilterRedundantWrites) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.prov, event_type, event_data)...) +} + func NewFilterRedundantWrites(prov storage.StorageProvider, source io.ReaderAt, allowance int) *FilterRedundantWrites { return &FilterRedundantWrites{ prov: prov, diff --git a/pkg/storage/modules/hooks.go b/pkg/storage/modules/hooks.go index 7ebf9f5..3145609 100644 --- a/pkg/storage/modules/hooks.go +++ b/pkg/storage/modules/hooks.go @@ -5,6 +5,7 @@ import ( ) type Hooks struct { + storage.StorageProviderWithEvents prov storage.StorageProvider Pre_read func(buffer []byte, offset int64) (bool, int, error) Post_read func(buffer []byte, offset int64, n int, err error) (int, error) @@ -12,6 +13,12 @@ type Hooks struct { Post_write func(buffer []byte, offset int64, n int, err error) (int, error) } +// Relay events to embedded StorageProvider +func (i *Hooks) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.prov, event_type, event_data)...) +} + func NewHooks(prov storage.StorageProvider) *Hooks { return &Hooks{ prov: prov, diff --git a/pkg/storage/modules/lockable.go b/pkg/storage/modules/lockable.go index 0398586..908d554 100644 --- a/pkg/storage/modules/lockable.go +++ b/pkg/storage/modules/lockable.go @@ -12,11 +12,18 @@ import ( */ type Lockable struct { + storage.StorageProviderWithEvents prov storage.StorageProvider lock *sync.Cond locked bool } +// Relay events to embedded StorageProvider +func (i *Lockable) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.prov, event_type, event_data)...) +} + func NewLockable(prov storage.StorageProvider) *Lockable { return &Lockable{ prov: prov, diff --git a/pkg/storage/modules/logger.go b/pkg/storage/modules/logger.go index 8079af9..044ba9d 100644 --- a/pkg/storage/modules/logger.go +++ b/pkg/storage/modules/logger.go @@ -9,11 +9,18 @@ import ( ) type Logger struct { + storage.StorageProviderWithEvents prov storage.StorageProvider prefix string enabled atomic.Bool } +// Relay events to embedded StorageProvider +func (i *Logger) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.prov, event_type, event_data)...) +} + func NewLogger(prov storage.StorageProvider, prefix string) *Logger { l := &Logger{ prov: prov, diff --git a/pkg/storage/modules/metrics.go b/pkg/storage/modules/metrics.go index 227f220..3817376 100644 --- a/pkg/storage/modules/metrics.go +++ b/pkg/storage/modules/metrics.go @@ -13,6 +13,7 @@ import ( * */ type Metrics struct { + storage.StorageProviderWithEvents prov storage.StorageProvider metric_read_ops uint64 metric_read_bytes uint64 @@ -47,6 +48,12 @@ func NewMetrics(prov storage.StorageProvider) *Metrics { } } +// Relay events to embedded StorageProvider +func (i *Metrics) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.prov, event_type, event_data)...) +} + func formatDuration(d time.Duration) string { if d < time.Millisecond { return fmt.Sprintf("%dns", d.Nanoseconds()) diff --git a/pkg/storage/modules/raid.go b/pkg/storage/modules/raid.go index 13d68fc..62d4a71 100644 --- a/pkg/storage/modules/raid.go +++ b/pkg/storage/modules/raid.go @@ -8,9 +8,19 @@ import ( ) type Raid struct { + storage.StorageProviderWithEvents prov []storage.StorageProvider } +// Relay events to embedded StorageProvider +func (i *Raid) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + for _, pr := range i.prov { + data = append(data, storage.SendEvent(pr, event_type, event_data)...) + } + return data +} + func NewRaid(prov []storage.StorageProvider) (*Raid, error) { if len(prov) == 0 { return nil, errors.New("Need at least one provider") diff --git a/pkg/storage/modules/read_only_gate.go b/pkg/storage/modules/read_only_gate.go index 5bcf8b8..aa2cef4 100644 --- a/pkg/storage/modules/read_only_gate.go +++ b/pkg/storage/modules/read_only_gate.go @@ -11,11 +11,18 @@ import ( */ type ReadOnlyGate struct { + storage.StorageProviderWithEvents prov storage.StorageProvider lock *sync.Cond locked bool } +// Relay events to embedded StorageProvider +func (i *ReadOnlyGate) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.prov, event_type, event_data)...) +} + func NewReadOnlyGate(prov storage.StorageProvider) *ReadOnlyGate { return &ReadOnlyGate{ prov: prov, diff --git a/pkg/storage/modules/sharded_storage.go b/pkg/storage/modules/sharded_storage.go index 9e84e25..7be6121 100644 --- a/pkg/storage/modules/sharded_storage.go +++ b/pkg/storage/modules/sharded_storage.go @@ -13,11 +13,21 @@ import ( ) type ShardedStorage struct { + storage.StorageProviderWithEvents blocks []storage.StorageProvider block_size int size int } +// Relay events to embedded StorageProvider +func (i *ShardedStorage) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + for _, pr := range i.blocks { + data = append(data, storage.SendEvent(pr, event_type, event_data)...) + } + return data +} + func NewShardedStorage(size int, blocksize int, creator func(index int, size int) (storage.StorageProvider, error)) (*ShardedStorage, error) { if blocksize == 0 { return nil, fmt.Errorf("Invalid block size of 0") diff --git a/pkg/storage/sources/file_storage.go b/pkg/storage/sources/file_storage.go index 26b91c4..c314b86 100644 --- a/pkg/storage/sources/file_storage.go +++ b/pkg/storage/sources/file_storage.go @@ -4,6 +4,8 @@ import ( "io" "os" "sync" + + "github.com/loopholelabs/silo/pkg/storage" ) /** @@ -11,6 +13,7 @@ import ( * */ type FileStorage struct { + storage.StorageProviderWithEvents fp *os.File size int64 wg sync.WaitGroup diff --git a/pkg/storage/sources/file_storage_sparse.go b/pkg/storage/sources/file_storage_sparse.go index be2e6c4..42953a9 100644 --- a/pkg/storage/sources/file_storage_sparse.go +++ b/pkg/storage/sources/file_storage_sparse.go @@ -5,6 +5,8 @@ import ( "errors" "os" "sync" + + "github.com/loopholelabs/silo/pkg/storage" ) const BLOCK_HEADER_SIZE = 8 @@ -18,6 +20,7 @@ const BLOCK_HEADER_SIZE = 8 * */ type FileStorageSparse struct { + storage.StorageProviderWithEvents f string fp *os.File size uint64 diff --git a/pkg/storage/sources/memory_storage.go b/pkg/storage/sources/memory_storage.go index 6d44866..c3f0edb 100644 --- a/pkg/storage/sources/memory_storage.go +++ b/pkg/storage/sources/memory_storage.go @@ -2,6 +2,8 @@ package sources import ( "sync" + + "github.com/loopholelabs/silo/pkg/storage" ) /** @@ -10,6 +12,7 @@ import ( * */ type MemoryStorage struct { + storage.StorageProviderWithEvents data []byte lock sync.RWMutex } diff --git a/pkg/storage/sources/s3_storage.go b/pkg/storage/sources/s3_storage.go index 1eb947d..b0355c2 100644 --- a/pkg/storage/sources/s3_storage.go +++ b/pkg/storage/sources/s3_storage.go @@ -8,6 +8,7 @@ import ( "io" "sync" + "github.com/loopholelabs/silo/pkg/storage" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" ) @@ -19,6 +20,7 @@ var ( */ type S3Storage struct { + storage.StorageProviderWithEvents client *minio.Client bucket string prefix string diff --git a/pkg/storage/storage_events.go b/pkg/storage/storage_events.go new file mode 100644 index 0000000..018064d --- /dev/null +++ b/pkg/storage/storage_events.go @@ -0,0 +1,83 @@ +package storage + +import "sync" + +/** + * Events are an optional addition to StorageProvider. + * + * To support events, a StorageProvider can simply embed StorageProviderWithEvents, or do it's own impl of StorageProviderWithEventsIfc + * + * StorageProviders should also relay events to any StorageProviders they wrap. + */ + +type StorageProviderWithEventsIfc interface { + StorageProvider + SendEvent(EventType, EventData) []EventReturnData + AddEventNotification(EventType, EventCallback) +} + +// Try to send an event for a given StorageProvider +func SendEvent(s StorageProvider, event_type EventType, event_data EventData) []EventReturnData { + lcsp, ok := s.(StorageProviderWithEventsIfc) + if ok { + return lcsp.SendEvent(event_type, event_data) + } + return nil +} + +// Try to add an event notification on a StorageProvider +func AddEventNotification(s StorageProvider, state EventType, callback EventCallback) bool { + lcsp, ok := s.(StorageProviderWithEventsIfc) + if ok { + lcsp.AddEventNotification(state, callback) + } + return ok +} + +/** + * A StorageProvider can simply embed StorageProviderWithEvents to support events + * + */ +type EventType string +type EventData interface{} +type EventReturnData interface{} + +type EventCallback func(event EventType, data EventData) EventReturnData + +type StorageProviderWithEvents struct { + lock sync.Mutex + callbacks map[EventType][]EventCallback +} + +// Send an event, and notify any callbacks +func (spl *StorageProviderWithEvents) SendEvent(event_type EventType, event_data EventData) []EventReturnData { + spl.lock.Lock() + defer spl.lock.Unlock() + if spl.callbacks == nil { + return nil + } + cbs, ok := spl.callbacks[event_type] + if ok { + rets := make([]EventReturnData, 0) + for _, cb := range cbs { + rets = append(rets, cb(event_type, event_data)) + } + return rets + } + return nil +} + +// Add a new callback for the given state. +func (spl *StorageProviderWithEvents) AddEventNotification(event_type EventType, callback EventCallback) { + spl.lock.Lock() + defer spl.lock.Unlock() + if spl.callbacks == nil { + spl.callbacks = make(map[EventType][]EventCallback) + } + _, ok := spl.callbacks[event_type] + if ok { + spl.callbacks[event_type] = append(spl.callbacks[event_type], callback) + } else { + spl.callbacks[event_type] = []EventCallback{callback} + } +} diff --git a/pkg/storage/storage_events_test.go b/pkg/storage/storage_events_test.go new file mode 100644 index 0000000..8038d34 --- /dev/null +++ b/pkg/storage/storage_events_test.go @@ -0,0 +1,263 @@ +package storage_test + +import ( + "fmt" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/loopholelabs/silo/pkg/storage" + "github.com/loopholelabs/silo/pkg/storage/dirtytracker" + "github.com/loopholelabs/silo/pkg/storage/modules" + "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/loopholelabs/silo/pkg/storage/volatilitymonitor" + "github.com/loopholelabs/silo/pkg/storage/waitingcache" + "github.com/stretchr/testify/assert" + + "github.com/google/uuid" +) + +type SomeStorage struct { + storage.StorageProviderWithEvents +} + +func NewSomeStorage() *SomeStorage { + return &SomeStorage{} +} + +func (ss *SomeStorage) ReadAt([]byte, int64) (int, error) { + return 0, nil +} + +func (ss *SomeStorage) WriteAt([]byte, int64) (int, error) { + return 0, nil +} + +func (ss *SomeStorage) Size() uint64 { + return 0 +} + +func (ss *SomeStorage) Flush() error { + return nil +} + +func (ss *SomeStorage) Close() error { + return nil +} + +func (ss *SomeStorage) CancelWrites(int64, int64) { +} + +func (ss *SomeStorage) UUID() []uuid.UUID { + return nil +} + +// Only exists in SomeStorage +func (ss *SomeStorage) SomeName() string { + return "SomeStorage" +} + +type SomeStorageNoEvents struct { +} + +func NewSomeStorageNoEvents() *SomeStorageNoEvents { + return &SomeStorageNoEvents{} +} + +func (ss *SomeStorageNoEvents) ReadAt([]byte, int64) (int, error) { + return 0, nil +} + +func (ss *SomeStorageNoEvents) WriteAt([]byte, int64) (int, error) { + return 0, nil +} + +func (ss *SomeStorageNoEvents) Size() uint64 { + return 0 +} + +func (ss *SomeStorageNoEvents) Flush() error { + return nil +} + +func (ss *SomeStorageNoEvents) Close() error { + return nil +} + +func (ss *SomeStorageNoEvents) CancelWrites(int64, int64) { +} + +func (ss *SomeStorageNoEvents) UUID() []uuid.UUID { + return nil +} + +func TestStorageEvents(t *testing.T) { + ss := NewSomeStorage() + + var wg sync.WaitGroup + wg.Add(1) + ok := storage.AddEventNotification(ss, "testing", func(event storage.EventType, data storage.EventData) storage.EventReturnData { + // Do something here + assert.Equal(t, storage.EventType("testing"), event) + assert.Equal(t, "HELLO WORLD", data.(string)) + wg.Done() + + return "SOMETHING" + }) + assert.True(t, ok) + + data := storage.SendEvent(ss, "testing", storage.EventData("HELLO WORLD")) + assert.Equal(t, 1, len(data)) + assert.Equal(t, "SOMETHING", data[0].(string)) + + wg.Wait() + + // Try doing it on something that doesn't support events + + ssnl := NewSomeStorageNoEvents() + + ok = storage.AddEventNotification(ssnl, "testing", func(from storage.EventType, to storage.EventData) storage.EventReturnData { + assert.Fail(t, "shouldn't happen") + return nil + }) + assert.False(t, ok) + data = storage.SendEvent(ssnl, "testing", nil) + assert.Nil(t, data) + +} + +/** + * Make sure events work as we expect when storage providers are chained for migration + * + */ + +type module_data struct { + prov storage.StorageProvider + events_received uint64 +} + +type test_data struct { + name string + insert_handler map[int]bool + expected_returns map[int]int + expected_counts map[int]uint64 +} + +func TestStorageEventsForModules(tt *testing.T) { + test_cases := []test_data{ + { + name: "all handlers", + insert_handler: map[int]bool{0: true, 1: true, 2: true, 3: true, 4: true, 5: true, 6: true, 7: true, 8: true, + 9: true, 10: true, 11: true, 12: true, 13: true, 14: true, 15: true, 16: true, 17: true, 18: true}, + expected_returns: map[int]int{ + 0: 1, 1: 2, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9, 10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 17: 17, 18: 17}, + expected_counts: map[int]uint64{ + 0: 19, 1: 17, 2: 1, 3: 16, 4: 15, 5: 14, 6: 13, 7: 12, 8: 11, 9: 10, 10: 9, 11: 8, 12: 7, 13: 6, 14: 5, 15: 4, 16: 3, 17: 1, 18: 1}, + }, + { + name: "one handler", + insert_handler: map[int]bool{0: true, 1: false, 2: false, 3: false, 4: false, 5: false, 6: false, 7: false, 8: false, + 9: false, 10: false, 11: false, 12: false, 13: false, 14: false, 15: false, 16: false, 17: false, 18: false}, + expected_returns: map[int]int{ + 0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1, 10: 1, 11: 1, 12: 1, 13: 1, 14: 1, 15: 1, 16: 1, 17: 1, 18: 1}, + expected_counts: map[int]uint64{ + 0: 19, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0, 16: 0, 17: 0, 18: 0}, + }, + } + + for _, td := range test_cases { + tt.Run(td.name, func(t *testing.T) { + + size := 1024 * 1024 + blockSize := 4096 + + all_modules := make([]*module_data, 0) + + // Add a module into our list + add_module := func(s storage.StorageProvider) { + i := len(all_modules) + + mod_data := &module_data{ + prov: s, + events_received: 0, + } + all_modules = append(all_modules, mod_data) + + if td.insert_handler[i] { + // Register an event notification on the module. + ok := storage.AddEventNotification(s, "some_event", func(event storage.EventType, data storage.EventData) storage.EventReturnData { + assert.Equal(t, event, storage.EventType("some_event")) + assert.Equal(t, data, storage.EventData("some_data")) + atomic.AddUint64(&mod_data.events_received, 1) + return fmt.Sprintf("RETURN DATA %d", len(all_modules)) + }) + assert.True(t, ok) + } + } + + // Start with some memory storage, and register a handler on it + sourceStorageMem := sources.NewMemoryStorage(size) + add_module(sourceStorageMem) + + // dirty tracker + sourceDirtyLocal, sourceDirtyRemote := dirtytracker.NewDirtyTracker(sourceStorageMem, blockSize) + add_module(sourceDirtyLocal) + add_module(sourceDirtyRemote) + + mod1 := modules.NewArtificialLatency(sourceDirtyLocal, 0, 0, 0, 0) + add_module(mod1) + mod2, err := modules.NewBinLog(mod1, "binlog_file") + assert.NoError(t, err) + t.Cleanup(func() { + os.Remove("binlog_file") + }) + add_module(mod2) + mod3 := modules.NewBlockSplitter(mod2, blockSize) + add_module(mod3) + mod4 := modules.NewCopyOnWrite(mod3, sources.NewMemoryStorage(size), blockSize) + add_module(mod4) + mod5 := modules.NewDummyTracker(mod4, blockSize) + add_module(mod5) + mod6 := modules.NewFilterRedundantWrites(mod5, nil, 0) + add_module(mod6) + mod7 := modules.NewHooks(mod6) + add_module(mod7) + mod8 := modules.NewLockable(mod7) + add_module(mod8) + mod9 := modules.NewLogger(mod8, "prefix") + add_module(mod9) + mod10 := modules.NewMetrics(mod9) + add_module(mod10) + mod11, err := modules.NewRaid([]storage.StorageProvider{mod10}) + assert.NoError(t, err) + add_module(mod11) + mod12 := modules.NewReadOnlyGate(mod11) + add_module(mod12) + mod13, err := modules.NewShardedStorage(size, size, func(index int, size int) (storage.StorageProvider, error) { + return mod12, nil + }) + assert.NoError(t, err) + add_module(mod13) + + mod14 := volatilitymonitor.NewVolatilityMonitor(mod13, blockSize, time.Second) + add_module(mod14) + mod15, mod16 := waitingcache.NewWaitingCache(mod14, blockSize) + add_module(mod15) + add_module(mod16) + + // Now send events to various parts of the chain, and make sure the handlers receive the events. + for i, mod := range all_modules { + r := storage.SendEvent(mod.prov, storage.EventType("some_event"), storage.EventData("some_data")) + assert.NotNil(t, r) + assert.Equal(t, td.expected_returns[i], len(r)) + } + + // Check the modules got the right number of events on them... + for i, mod := range all_modules { + assert.Equal(t, td.expected_counts[i], mod.events_received) + } + }) + } +} diff --git a/pkg/storage/volatilitymonitor/volatility_monitor.go b/pkg/storage/volatilitymonitor/volatility_monitor.go index 1c7efd2..bff1654 100644 --- a/pkg/storage/volatilitymonitor/volatility_monitor.go +++ b/pkg/storage/volatilitymonitor/volatility_monitor.go @@ -9,6 +9,7 @@ import ( ) type VolatilityMonitor struct { + storage.StorageProviderWithEvents prov storage.StorageProvider expiry time.Duration size uint64 @@ -20,6 +21,12 @@ type VolatilityMonitor struct { totalData *volatilityData } +// Relay events to embedded StorageProvider +func (i *VolatilityMonitor) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := i.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(i.prov, event_type, event_data)...) +} + func NewVolatilityMonitor(prov storage.StorageProvider, blockSize int, expiry time.Duration) *VolatilityMonitor { numBlocks := (int(prov.Size()) + blockSize - 1) / blockSize return &VolatilityMonitor{ diff --git a/pkg/storage/waitingcache/waiting_cache_local.go b/pkg/storage/waitingcache/waiting_cache_local.go index a01a64f..4bbece2 100644 --- a/pkg/storage/waitingcache/waiting_cache_local.go +++ b/pkg/storage/waitingcache/waiting_cache_local.go @@ -1,16 +1,24 @@ package waitingcache import ( + "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/util" ) type WaitingCacheLocal struct { + storage.StorageProviderWithEvents wc *WaitingCache available util.Bitfield NeedAt func(offset int64, length int32) DontNeedAt func(offset int64, length int32) } +// Relay events to embedded StorageProvider +func (wcl *WaitingCacheLocal) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := wcl.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(wcl.wc.prov, event_type, event_data)...) +} + func (wcl *WaitingCacheLocal) ReadAt(buffer []byte, offset int64) (int, error) { end := uint64(offset + int64(len(buffer))) if end > wcl.wc.size { diff --git a/pkg/storage/waitingcache/waiting_cache_remote.go b/pkg/storage/waitingcache/waiting_cache_remote.go index 7048eec..8077134 100644 --- a/pkg/storage/waitingcache/waiting_cache_remote.go +++ b/pkg/storage/waitingcache/waiting_cache_remote.go @@ -3,14 +3,22 @@ package waitingcache import ( "io" + "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/util" ) type WaitingCacheRemote struct { + storage.StorageProviderWithEvents wc *WaitingCache available util.Bitfield } +// Relay events to embedded StorageProvider +func (wcl *WaitingCacheRemote) SendEvent(event_type storage.EventType, event_data storage.EventData) []storage.EventReturnData { + data := wcl.StorageProviderWithEvents.SendEvent(event_type, event_data) + return append(data, storage.SendEvent(wcl.wc.prov, event_type, event_data)...) +} + func (wcl *WaitingCacheRemote) ReadAt(buffer []byte, offset int64) (int, error) { // Remote reads are unsupported at the moment. return 0, io.EOF