diff --git a/go.mod b/go.mod index cd35c65..4ffe6af 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,17 @@ module github.com/loopholelabs/silo -go 1.21 +go 1.22 -toolchain go1.22.2 +toolchain go1.22.4 require ( github.com/Merovius/nbd v0.0.0-20231017152624-27b78b60d8da github.com/fatih/color v1.17.0 + github.com/google/uuid v1.6.0 github.com/hashicorp/hcl/v2 v2.21.0 github.com/minio/minio-go/v7 v7.0.77 github.com/ory/dockertest/v3 v3.10.0 + github.com/rs/zerolog v1.33.0 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 github.com/vbauerster/mpb/v8 v8.7.5 @@ -37,13 +39,13 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/native v1.1.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/kr/pretty v0.3.1 // indirect + github.com/loopholelabs/logging v0.3.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect diff --git a/go.sum b/go.sum index a7f8cf7..7d3b446 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,7 @@ github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8 github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/containerd/continuity v0.4.3 h1:6HVkalIp+2u1ZLH1J/pYX2oBVXlJZvh1X1A7bEZ9Su8= github.com/containerd/continuity v0.4.3/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= @@ -48,6 +49,7 @@ github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -82,9 +84,12 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2 h1:hRGSmZu7j271trc9sneMrpOW7GN5ngLm8YUZIPzf394= github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/loopholelabs/logging v0.3.1 h1:VA9DF3WrbmvJC1uQJ/XcWgz8KWXydWwe3BdDiMbN2FY= +github.com/loopholelabs/logging v0.3.1/go.mod h1:uRDUydiqPqKbZkb0WoQ3dfyAcJ2iOMhxdEafZssLVv0= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= 
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= @@ -125,8 +130,11 @@ github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= @@ -184,6 +192,7 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/storage/migrator/migrator.go b/pkg/storage/migrator/migrator.go index 3dbb3c2..f19e922 100644 --- a/pkg/storage/migrator/migrator.go +++ b/pkg/storage/migrator/migrator.go @@ -20,6 +20,7 @@ type MigratorConfig struct { Unlocker_handler func() Error_handler func(b *storage.BlockInfo, err error) Progress_handler func(p *MigrationProgress) + Block_handler func(b *storage.BlockInfo, id uint64, block []byte) Concurrency map[int]int Integrity bool Cancel_writes bool @@ -34,6 +35,7 @@ func NewMigratorConfig() *MigratorConfig { Unlocker_handler: func() {}, Error_handler: func(b *storage.BlockInfo, err error) {}, Progress_handler: func(p *MigrationProgress) {}, + Block_handler: func(b *storage.BlockInfo, id uint64, data []byte) {}, Concurrency: map[int]int{ storage.BlockTypeAny: 32, storage.BlockTypeStandard: 32, @@ -73,6 +75,7 @@ type Migrator struct { source_unlock_fn func() error_fn func(block *storage.BlockInfo, err error) progress_fn func(*MigrationProgress) + block_fn func(block *storage.BlockInfo, id uint64, data []byte) progress_lock sync.Mutex progress_last time.Time progress_last_status *MigrationProgress @@ -110,6 +113,7 @@ func NewMigrator(source storage.TrackingStorageProvider, source_unlock_fn: config.Unlocker_handler, error_fn: config.Error_handler, progress_fn: config.Progress_handler, + block_fn: config.Block_handler, block_size: 
config.Block_size, num_blocks: num_blocks, metric_blocks_migrated: 0, @@ -189,9 +193,11 @@ func (m *Migrator) Migrate(num_blocks int) error { m.wg.Add(1) go func(block_no *storage.BlockInfo) { - err := m.migrateBlock(block_no.Block) + data, err := m.migrateBlock(block_no.Block) if err != nil { m.error_fn(block_no, err) + } else { + m.block_fn(block_no, 0, data) } m.wg.Done() @@ -241,6 +247,14 @@ func (m *Migrator) Unlock() { * An attempt is made to cancel any existing writes for the blocks first. */ func (m *Migrator) MigrateDirty(blocks []uint) error { + return m.MigrateDirtyWithId(blocks, 0) +} + +/** + * + * You can give a tracking ID which will turn up at block_fn on success + */ +func (m *Migrator) MigrateDirtyWithId(blocks []uint, tid uint64) error { for _, pos := range blocks { i := &storage.BlockInfo{Block: int(pos), Type: storage.BlockTypeDirty} @@ -271,10 +285,12 @@ func (m *Migrator) MigrateDirty(blocks []uint) error { m.clean_blocks.ClearBit(int(pos)) - go func(block_no *storage.BlockInfo) { - err := m.migrateBlock(block_no.Block) + go func(block_no *storage.BlockInfo, track_id uint64) { + data, err := m.migrateBlock(block_no.Block) if err != nil { m.error_fn(block_no, err) + } else { + m.block_fn(block_no, track_id, data) } m.wg.Done() @@ -283,7 +299,7 @@ func (m *Migrator) MigrateDirty(blocks []uint) error { cc = m.concurrency[storage.BlockTypeAny] } <-cc - }(i) + }(i, tid) } return nil @@ -337,11 +353,38 @@ func (m *Migrator) reportProgress(forced bool) { m.progress_fn(m.progress_last_status) } +/** + * Get overall status of the migration + * + */ +func (m *Migrator) Status() *MigrationProgress { + m.progress_lock.Lock() + defer m.progress_lock.Unlock() + + migrated := m.migrated_blocks.Count(0, uint(m.num_blocks)) + perc_mig := float64(migrated*100) / float64(m.num_blocks) + + completed := m.clean_blocks.Count(0, uint(m.num_blocks)) + perc_complete := float64(completed*100) / float64(m.num_blocks) + + return &MigrationProgress{ + Total_blocks: m.num_blocks, + Migrated_blocks: migrated, + Migrated_blocks_perc: perc_mig, + Ready_blocks: completed, + Ready_blocks_perc: perc_complete, + Active_blocks: m.moving_blocks.Count(0, uint(m.num_blocks)), + Total_Canceled_blocks: int(atomic.LoadInt64(&m.metric_blocks_canceled)), + Total_Migrated_blocks: int(atomic.LoadInt64(&m.metric_blocks_migrated)), + Total_Duplicated_blocks: int(atomic.LoadInt64(&m.metric_blocks_duplicates)), + } +} + /** * Migrate a single block to dest * */ -func (m *Migrator) migrateBlock(block int) error { +func (m *Migrator) migrateBlock(block int) ([]byte, error) { m.block_locks[block].Lock() defer m.block_locks[block].Unlock() @@ -354,7 +397,7 @@ func (m *Migrator) migrateBlock(block int) error { // Read from source n, err := m.source_tracker.ReadAt(buff, int64(offset)) if err != nil { - return err + return nil, err } var idmap map[uint64]uint64 @@ -362,8 +405,6 @@ func (m *Migrator) migrateBlock(block int) error { idmap = m.source_mapped.GetMapForSourceRange(int64(offset), m.block_size) } - // TODO: Need to figure out how we want to route the WriteAtWithMap here... 
- // If it was a partial read, truncate buff = buff[:n] @@ -373,11 +414,12 @@ func (m *Migrator) migrateBlock(block int) error { // Now write it to destStorage n, err = m.dest.WriteAt(buff, int64(offset)) } + if n != len(buff) || err != nil { if errors.Is(err, context.Canceled) { atomic.AddInt64(&m.metric_blocks_canceled, 1) } - return err + return nil, err } // Set the last successful write for this block @@ -394,5 +436,5 @@ func (m *Migrator) migrateBlock(block int) error { m.clean_blocks.SetBit(block) m.reportProgress(false) - return nil + return buff, nil } diff --git a/pkg/storage/migrator/sync.go b/pkg/storage/migrator/sync.go new file mode 100644 index 0000000..28937c9 --- /dev/null +++ b/pkg/storage/migrator/sync.go @@ -0,0 +1,234 @@ +package migrator + +import ( + "context" + "crypto/sha256" + "sync" + "sync/atomic" + "time" + + "github.com/loopholelabs/silo/pkg/storage" + "github.com/loopholelabs/silo/pkg/storage/dirtytracker" + + "github.com/loopholelabs/logging/types" +) + +type SyncConfig struct { + Logger types.RootLogger + Name string + Tracker *dirtytracker.DirtyTrackerRemote // A dirty block tracker + Lockable storage.LockableStorageProvider // Lockable + LockerHandler func() + UnlockerHandler func() + Destination storage.StorageProvider + Orderer storage.BlockOrder + DirtyCheckPeriod time.Duration + DirtyBlockGetter func() []uint + + BlockSize int + + HashesHandler func(map[uint][32]byte) + ProgressHandler func(p *MigrationProgress) + ErrorHandler func(b *storage.BlockInfo, err error) + + Concurrency map[int]int + Integrity bool + CancelWrites bool + DedupeWrites bool +} + +type Syncer struct { + ctx context.Context + config *SyncConfig + blockStatusLock sync.Mutex + blockStatus []Block_status + currentDirtyID uint64 +} + +type Block_status struct { + UpdatingID uint64 + CurrentID uint64 + CurrentHash [sha256.Size]byte +} + +func NewSyncer(ctx context.Context, sinfo *SyncConfig) *Syncer { + numBlocks := (sinfo.Tracker.Size() + uint64(sinfo.BlockSize) - 1) / uint64(sinfo.BlockSize) + + status := make([]Block_status, numBlocks) + for b := 0; b < int(numBlocks); b++ { + status[b] = Block_status{ + UpdatingID: 0, + CurrentID: 0, + CurrentHash: [32]byte{}, + } + } + + return &Syncer{ + ctx: ctx, + config: sinfo, + blockStatus: status, + currentDirtyID: 0, + } +} + +/** + * Get a list of blocks that are safe (On the destination) + * NB This does not include the very latest dirty list, but it's a good starting point. 
+ */ +func (s *Syncer) GetSafeBlockMap() map[uint][sha256.Size]byte { + blocks := make(map[uint][sha256.Size]byte, 0) + + for b, status := range s.blockStatus { + if status.CurrentID == status.UpdatingID { + blocks[uint(b)] = status.CurrentHash + } + } + return blocks +} + +/** + * + * + */ +func (s *Syncer) Sync(syncAllFirst bool, continuous bool) (*MigrationProgress, error) { + conf := NewMigratorConfig().WithBlockSize(s.config.BlockSize) + conf.Locker_handler = func() { + if s.config.LockerHandler != nil { + s.config.LockerHandler() + } else { + s.config.Lockable.Lock() + } + } + conf.Unlocker_handler = func() { + if s.config.UnlockerHandler != nil { + s.config.UnlockerHandler() + } else { + s.config.Lockable.Unlock() + } + } + conf.Concurrency = map[int]int{ + storage.BlockTypeAny: 16, + } + if s.config.Concurrency != nil { + conf.Concurrency = s.config.Concurrency + } + + conf.Integrity = s.config.Integrity + conf.Cancel_writes = s.config.CancelWrites + conf.Dedupe_writes = s.config.DedupeWrites + + conf.Progress_handler = func(p *MigrationProgress) { + if s.config.Logger != nil { + s.config.Logger.Info(). + Str("name", s.config.Name). + Float64("migrated_blocks_perc", p.Migrated_blocks_perc). + Int("ready_blocks", p.Ready_blocks). + Int("total_blocks", p.Total_blocks). + Float64("ready_blocks_perc", p.Ready_blocks_perc). + Int("active_blocks", p.Active_blocks). + Int("total_migrated_blocks", p.Total_Migrated_blocks). + Int("total_canceled_blocks", p.Total_Canceled_blocks). + Int("total_duplicated_blocks", p.Total_Duplicated_blocks). + Msg("Continuous sync progress") + } + if s.config.ProgressHandler != nil { + s.config.ProgressHandler(p) + } + } + conf.Error_handler = func(b *storage.BlockInfo, err error) { + if s.config.Logger != nil { + s.config.Logger.Error(). + Str("name", s.config.Name). + Err(err). + Int("block", b.Block). + Int("type", b.Type). + Msg("Continuous sync error") + } + if s.config.ErrorHandler != nil { + s.config.ErrorHandler(b, err) + } + } + + // When a block is written, update block_status with the largest ID for that block + conf.Block_handler = func(b *storage.BlockInfo, id uint64, data []byte) { + hash := sha256.Sum256(data) + + s.blockStatusLock.Lock() + if id > s.blockStatus[b.Block].CurrentID { + s.blockStatus[b.Block].CurrentID = id + s.blockStatus[b.Block].CurrentHash = hash + } + s.blockStatusLock.Unlock() + } + + mig, err := NewMigrator(s.config.Tracker, s.config.Destination, s.config.Orderer, conf) + if err != nil { + return nil, err + } + + numBlocks := (s.config.Tracker.Size() + uint64(s.config.BlockSize) - 1) / uint64(s.config.BlockSize) + + if syncAllFirst { + // Now do the initial migration... + err = mig.Migrate(int(numBlocks)) + if err != nil { + return nil, err + } + + // Wait for completion. + err = mig.WaitForCompletion() + if err != nil { + return nil, err + } + + if s.config.HashesHandler != nil { + hashes := mig.GetHashes() + s.config.HashesHandler(hashes) + } + } else { + // We don't need to do an initial migration. + for b := 0; b < int(numBlocks); b++ { + mig.SetMigratedBlock(b) + } + // Track changes for everything. + s.config.Tracker.TrackAt(0, int64(s.config.Tracker.Size())) + } + + // Now enter a loop looking for more dirty blocks to migrate... + + for { + select { + case <-s.ctx.Done(): + // Context has been cancelled. 
We should wait for any pending migrations to complete + err = mig.WaitForCompletion() + if err != nil { + return mig.Status(), err + } + return mig.Status(), s.ctx.Err() + default: + } + blocks := mig.GetLatestDirtyFunc(s.config.DirtyBlockGetter) + + if blocks != nil { + id := atomic.AddUint64(&s.currentDirtyID, 1) + // Update block_updates with the new ID for these blocks + s.blockStatusLock.Lock() + for _, b := range blocks { + s.blockStatus[b].UpdatingID = id + } + s.blockStatusLock.Unlock() + err = mig.MigrateDirtyWithId(blocks, id) + if err != nil { + return mig.Status(), err + } + } else { + if !continuous { + // We are done! Everything is synced, and the source is locked. + err = mig.WaitForCompletion() + return mig.Status(), err + } + mig.Unlock() + } + time.Sleep(s.config.DirtyCheckPeriod) + } +} diff --git a/pkg/storage/migrator/sync_test.go b/pkg/storage/migrator/sync_test.go new file mode 100644 index 0000000..7122176 --- /dev/null +++ b/pkg/storage/migrator/sync_test.go @@ -0,0 +1,176 @@ +package migrator + +import ( + "context" + crand "crypto/rand" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/loopholelabs/silo/pkg/storage" + "github.com/loopholelabs/silo/pkg/storage/blocks" + "github.com/loopholelabs/silo/pkg/storage/dirtytracker" + "github.com/loopholelabs/silo/pkg/storage/modules" + "github.com/loopholelabs/silo/pkg/storage/sources" + "github.com/loopholelabs/silo/pkg/storage/volatilitymonitor" + "github.com/loopholelabs/silo/pkg/testutils" + "github.com/stretchr/testify/assert" +) + +func TestSyncToS3(t *testing.T) { + PORT_9000 := testutils.SetupMinio(t.Cleanup) + + size := 1024 * 1024 + blockSize := 4096 + numBlocks := (size + blockSize - 1) / blockSize + + // First we setup some local storage + sourceStorageMem := sources.NewMemoryStorage(size) + sourceDirtyLocal, sourceDirtyRemote := dirtytracker.NewDirtyTracker(sourceStorageMem, blockSize) + sourceMonitor := volatilitymonitor.NewVolatilityMonitor(sourceDirtyLocal, blockSize, 2*time.Second) + sourceStorage := modules.NewLockable(sourceMonitor) + + // Set up some data here. + buffer := make([]byte, size) + _, err := crand.Read(buffer) + assert.NoError(t, err) + + n, err := sourceStorage.WriteAt(buffer, 0) + assert.NoError(t, err) + assert.Equal(t, len(buffer), n) + + // Periodically write to sourceStorage so it is dirty + ctx, cancelWrites := context.WithCancel(context.TODO()) + go func() { + for { + select { + case <-ctx.Done(): + return + default: + } + o := rand.Intn(size) + v := rand.Intn(256) + b := make([]byte, 1) + b[0] = byte(v) + n, err := sourceStorage.WriteAt(b, int64(o)) + assert.NoError(t, err) + assert.Equal(t, 1, n) + + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + } + }() + + // Start monitoring blocks, and wait a bit... 
+ orderer := blocks.NewPriorityBlockOrder(numBlocks, sourceMonitor) + orderer.AddAll() + + // START moving data from sourceStorage to destStorage + destStorage, err := sources.NewS3StorageCreate(fmt.Sprintf("localhost:%s", PORT_9000), "silosilo", "silosilo", "silosilo", "file", uint64(size), blockSize) + + assert.NoError(t, err) + + syncConfig := &SyncConfig{ + Name: "sync_s3", + Integrity: false, + CancelWrites: true, + DedupeWrites: true, + Tracker: sourceDirtyRemote, + Lockable: sourceStorage, + Destination: destStorage, + Orderer: orderer, + DirtyCheckPeriod: 1 * time.Second, + DirtyBlockGetter: func() []uint { + b := sourceDirtyRemote.Sync() + return b.Collect(0, b.Length()) + + // Use some basic params here for getting dirty blocks + // return sourceDirtyRemote.GetDirtyBlocks(1*time.Second, 16, 10, 4) + }, + BlockSize: blockSize, + ProgressHandler: func(p *MigrationProgress) { + // Don't need to do anything here... + }, + ErrorHandler: func(b *storage.BlockInfo, err error) { + assert.Fail(t, fmt.Sprintf("Error migrating block %d: %v", b.Block, err)) + }, + } + + // Stop writing in a bit, and we should catch up sync + time.AfterFunc(time.Second, cancelWrites) + + // Sync the data for a bit + syncer := NewSyncer(context.TODO(), syncConfig) + _, err = syncer.Sync(true, false) + assert.NoError(t, err) + + // This will end with migration completed, and consumer Locked. + eq, err := storage.Equals(sourceStorageMem, destStorage, blockSize) + assert.NoError(t, err) + assert.True(t, eq) + + assert.True(t, sourceStorage.IsLocked()) +} + +func TestSyncSimple(t *testing.T) { + size := 1024 * 1024 + blockSize := 4096 + numBlocks := (size + blockSize - 1) / blockSize + + sourceStorageMem := sources.NewMemoryStorage(size) + sourceDirtyLocal, sourceDirtyRemote := dirtytracker.NewDirtyTracker(sourceStorageMem, blockSize) + sourceStorage := modules.NewLockable(sourceDirtyLocal) + + // Set up some data here. + buffer := make([]byte, size) + _, err := crand.Read(buffer) + assert.NoError(t, err) + + n, err := sourceStorage.WriteAt(buffer, 0) + assert.NoError(t, err) + assert.Equal(t, len(buffer), n) + + orderer := blocks.NewAnyBlockOrder(numBlocks, nil) + orderer.AddAll() + + // START moving data from sourceStorage to destStorage + + destStorage := sources.NewMemoryStorage(size) + + // Use sync + syncConfig := &SyncConfig{ + Name: "simple", + Integrity: false, + CancelWrites: true, + DedupeWrites: true, + Tracker: sourceDirtyRemote, + Lockable: sourceStorage, + Destination: destStorage, + Orderer: orderer, + DirtyCheckPeriod: 1 * time.Second, + DirtyBlockGetter: func() []uint { + b := sourceDirtyRemote.Sync() + return b.Collect(0, b.Length()) + + // Use some basic params here for getting dirty blocks + // return sourceDirtyRemote.GetDirtyBlocks(1*time.Second, 16, 10, 4) + }, + BlockSize: blockSize, + ProgressHandler: func(p *MigrationProgress) { + // Don't need to do anything here... + }, + ErrorHandler: func(b *storage.BlockInfo, err error) { + assert.Fail(t, fmt.Sprintf("Error migrating block %d: %v", b.Block, err)) + }, + } + + // Sync the data for a bit + syncer := NewSyncer(context.TODO(), syncConfig) + _, err = syncer.Sync(true, false) + assert.NoError(t, err) + + // This will end with migration completed, and consumer Locked. + eq, err := storage.Equals(sourceStorageMem, destStorage, blockSize) + assert.NoError(t, err) + assert.True(t, eq) +}
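
Usage note (not part of the patch): a minimal sketch of driving the new Syncer API introduced above, modeled directly on TestSyncSimple — an in-memory source with dirty tracking is synced once to an in-memory destination via Sync(true, false), after which the source is left locked. The logger, progress, error and lock hooks are omitted since sync.go treats them as optional; the package-main framing, log output, and variable names here are illustrative assumptions, not code from this diff.

package main

import (
	"context"
	crand "crypto/rand"
	"log"
	"time"

	"github.com/loopholelabs/silo/pkg/storage/blocks"
	"github.com/loopholelabs/silo/pkg/storage/dirtytracker"
	"github.com/loopholelabs/silo/pkg/storage/migrator"
	"github.com/loopholelabs/silo/pkg/storage/modules"
	"github.com/loopholelabs/silo/pkg/storage/sources"
)

func main() {
	size := 1024 * 1024
	blockSize := 4096
	numBlocks := (size + blockSize - 1) / blockSize

	// Source storage with dirty tracking, wrapped in a Lockable so the
	// Syncer can lock it once everything has been drained.
	sourceMem := sources.NewMemoryStorage(size)
	sourceDirtyLocal, sourceDirtyRemote := dirtytracker.NewDirtyTracker(sourceMem, blockSize)
	source := modules.NewLockable(sourceDirtyLocal)

	// Seed the source with some data (as the test does).
	buffer := make([]byte, size)
	if _, err := crand.Read(buffer); err != nil {
		log.Fatal(err)
	}
	if _, err := source.WriteAt(buffer, 0); err != nil {
		log.Fatal(err)
	}

	dest := sources.NewMemoryStorage(size)

	orderer := blocks.NewAnyBlockOrder(numBlocks, nil)
	orderer.AddAll()

	syncer := migrator.NewSyncer(context.TODO(), &migrator.SyncConfig{
		Name:             "example", // illustrative name
		Tracker:          sourceDirtyRemote,
		Lockable:         source,
		Destination:      dest,
		Orderer:          orderer,
		BlockSize:        blockSize,
		DirtyCheckPeriod: time.Second,
		DirtyBlockGetter: func() []uint {
			// Same simple getter as the tests: everything currently dirty.
			b := sourceDirtyRemote.Sync()
			return b.Collect(0, b.Length())
		},
	})

	// Migrate all blocks first, then drain any remaining dirty blocks and
	// stop (continuous=false). On return the source is locked.
	status, err := syncer.Sync(true, false)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("migrated %d/%d blocks", status.Migrated_blocks, status.Total_blocks)

	// Blocks whose latest write has reached the destination, keyed to their hashes.
	_ = syncer.GetSafeBlockMap()
}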