Skip to content

Commit

Permalink
Jamesmoore/arch 144 create a sync method in silo to handle some of th…
Browse files Browse the repository at this point in the history
…e heavy (#38)

* Implementation of sync

Signed-off-by: Jimmy Moore <[email protected]>

* Moved from defer to explicit

Signed-off-by: Jimmy Moore <[email protected]>

* block_fn now called for non-dirty Migrate

Signed-off-by: Jimmy Moore <[email protected]>

* Switched sync to loopholelabs logger

Signed-off-by: Jimmy Moore <[email protected]>

* Fixed some lint in sync/sync_test

Signed-off-by: Jimmy Moore <[email protected]>

---------

Signed-off-by: Jimmy Moore <[email protected]>
  • Loading branch information
jimmyaxod authored Oct 24, 2024
1 parent 4b1cc14 commit c154618
Show file tree
Hide file tree
Showing 5 changed files with 476 additions and 13 deletions.
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
module github.com/loopholelabs/silo

go 1.21
go 1.22

toolchain go1.22.2
toolchain go1.22.4

require (
github.com/Merovius/nbd v0.0.0-20231017152624-27b78b60d8da
github.com/fatih/color v1.17.0
github.com/google/uuid v1.6.0
github.com/hashicorp/hcl/v2 v2.21.0
github.com/minio/minio-go/v7 v7.0.77
github.com/ory/dockertest/v3 v3.10.0
github.com/rs/zerolog v1.33.0
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.9.0
github.com/vbauerster/mpb/v8 v8.7.5
Expand Down Expand Up @@ -37,13 +39,13 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/loopholelabs/logging v0.3.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/containerd/continuity v0.4.3 h1:6HVkalIp+2u1ZLH1J/pYX2oBVXlJZvh1X1A7bEZ9Su8=
github.com/containerd/continuity v0.4.3/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
Expand Down Expand Up @@ -48,6 +49,7 @@ github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68=
github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down Expand Up @@ -82,9 +84,12 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2 h1:hRGSmZu7j271trc9sneMrpOW7GN5ngLm8YUZIPzf394=
github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/loopholelabs/logging v0.3.1 h1:VA9DF3WrbmvJC1uQJ/XcWgz8KWXydWwe3BdDiMbN2FY=
github.com/loopholelabs/logging v0.3.1/go.mod h1:uRDUydiqPqKbZkb0WoQ3dfyAcJ2iOMhxdEafZssLVv0=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
Expand Down Expand Up @@ -125,8 +130,11 @@ github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
Expand Down Expand Up @@ -184,6 +192,7 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
62 changes: 52 additions & 10 deletions pkg/storage/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type MigratorConfig struct {
Unlocker_handler func()
Error_handler func(b *storage.BlockInfo, err error)
Progress_handler func(p *MigrationProgress)
Block_handler func(b *storage.BlockInfo, id uint64, block []byte)
Concurrency map[int]int
Integrity bool
Cancel_writes bool
Expand All @@ -34,6 +35,7 @@ func NewMigratorConfig() *MigratorConfig {
Unlocker_handler: func() {},
Error_handler: func(b *storage.BlockInfo, err error) {},
Progress_handler: func(p *MigrationProgress) {},
Block_handler: func(b *storage.BlockInfo, id uint64, data []byte) {},
Concurrency: map[int]int{
storage.BlockTypeAny: 32,
storage.BlockTypeStandard: 32,
Expand Down Expand Up @@ -73,6 +75,7 @@ type Migrator struct {
source_unlock_fn func()
error_fn func(block *storage.BlockInfo, err error)
progress_fn func(*MigrationProgress)
block_fn func(block *storage.BlockInfo, id uint64, data []byte)
progress_lock sync.Mutex
progress_last time.Time
progress_last_status *MigrationProgress
Expand Down Expand Up @@ -110,6 +113,7 @@ func NewMigrator(source storage.TrackingStorageProvider,
source_unlock_fn: config.Unlocker_handler,
error_fn: config.Error_handler,
progress_fn: config.Progress_handler,
block_fn: config.Block_handler,
block_size: config.Block_size,
num_blocks: num_blocks,
metric_blocks_migrated: 0,
Expand Down Expand Up @@ -189,9 +193,11 @@ func (m *Migrator) Migrate(num_blocks int) error {
m.wg.Add(1)

go func(block_no *storage.BlockInfo) {
err := m.migrateBlock(block_no.Block)
data, err := m.migrateBlock(block_no.Block)
if err != nil {
m.error_fn(block_no, err)
} else {
m.block_fn(block_no, 0, data)
}

m.wg.Done()
Expand Down Expand Up @@ -241,6 +247,14 @@ func (m *Migrator) Unlock() {
* An attempt is made to cancel any existing writes for the blocks first.
*/
func (m *Migrator) MigrateDirty(blocks []uint) error {
return m.MigrateDirtyWithId(blocks, 0)
}

/**
*
* You can give a tracking ID which will turn up at block_fn on success
*/
func (m *Migrator) MigrateDirtyWithId(blocks []uint, tid uint64) error {
for _, pos := range blocks {
i := &storage.BlockInfo{Block: int(pos), Type: storage.BlockTypeDirty}

Expand Down Expand Up @@ -271,10 +285,12 @@ func (m *Migrator) MigrateDirty(blocks []uint) error {

m.clean_blocks.ClearBit(int(pos))

go func(block_no *storage.BlockInfo) {
err := m.migrateBlock(block_no.Block)
go func(block_no *storage.BlockInfo, track_id uint64) {
data, err := m.migrateBlock(block_no.Block)
if err != nil {
m.error_fn(block_no, err)
} else {
m.block_fn(block_no, track_id, data)
}

m.wg.Done()
Expand All @@ -283,7 +299,7 @@ func (m *Migrator) MigrateDirty(blocks []uint) error {
cc = m.concurrency[storage.BlockTypeAny]
}
<-cc
}(i)
}(i, tid)

}
return nil
Expand Down Expand Up @@ -337,11 +353,38 @@ func (m *Migrator) reportProgress(forced bool) {
m.progress_fn(m.progress_last_status)
}

/**
* Get overall status of the migration
*
*/
func (m *Migrator) Status() *MigrationProgress {
m.progress_lock.Lock()
defer m.progress_lock.Unlock()

migrated := m.migrated_blocks.Count(0, uint(m.num_blocks))
perc_mig := float64(migrated*100) / float64(m.num_blocks)

completed := m.clean_blocks.Count(0, uint(m.num_blocks))
perc_complete := float64(completed*100) / float64(m.num_blocks)

return &MigrationProgress{
Total_blocks: m.num_blocks,
Migrated_blocks: migrated,
Migrated_blocks_perc: perc_mig,
Ready_blocks: completed,
Ready_blocks_perc: perc_complete,
Active_blocks: m.moving_blocks.Count(0, uint(m.num_blocks)),
Total_Canceled_blocks: int(atomic.LoadInt64(&m.metric_blocks_canceled)),
Total_Migrated_blocks: int(atomic.LoadInt64(&m.metric_blocks_migrated)),
Total_Duplicated_blocks: int(atomic.LoadInt64(&m.metric_blocks_duplicates)),
}
}

/**
* Migrate a single block to dest
*
*/
func (m *Migrator) migrateBlock(block int) error {
func (m *Migrator) migrateBlock(block int) ([]byte, error) {
m.block_locks[block].Lock()
defer m.block_locks[block].Unlock()

Expand All @@ -354,16 +397,14 @@ func (m *Migrator) migrateBlock(block int) error {
// Read from source
n, err := m.source_tracker.ReadAt(buff, int64(offset))
if err != nil {
return err
return nil, err
}

var idmap map[uint64]uint64
if m.source_mapped != nil {
idmap = m.source_mapped.GetMapForSourceRange(int64(offset), m.block_size)
}

// TODO: Need to figure out how we want to route the WriteAtWithMap here...

// If it was a partial read, truncate
buff = buff[:n]

Expand All @@ -373,11 +414,12 @@ func (m *Migrator) migrateBlock(block int) error {
// Now write it to destStorage
n, err = m.dest.WriteAt(buff, int64(offset))
}

if n != len(buff) || err != nil {
if errors.Is(err, context.Canceled) {
atomic.AddInt64(&m.metric_blocks_canceled, 1)
}
return err
return nil, err
}

// Set the last successful write for this block
Expand All @@ -394,5 +436,5 @@ func (m *Migrator) migrateBlock(block int) error {
m.clean_blocks.SetBit(block)

m.reportProgress(false)
return nil
return buff, nil
}
Loading

0 comments on commit c154618

Please sign in to comment.