Skip to content

Commit

Permalink
Add estimated row batch size in bytes to state
Browse files Browse the repository at this point in the history
  • Loading branch information
floriecai committed Jan 13, 2021
1 parent 837fe03 commit 3fdea62
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 14 deletions.
2 changes: 1 addition & 1 deletion batch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error {
// Note that the state tracker expects us to track based on the original
// database and table names as opposed to the target ones.
if w.StateTracker != nil {
w.StateTracker.UpdateLastSuccessfulPaginationKey(batch.TableSchema().String(), endPaginationKeypos, uint64(batch.Size()))
w.StateTracker.UpdateLastSuccessfulPaginationKey(batch.TableSchema().String(), endPaginationKeypos, uint64(batch.Size()), batch.EstimateByteSize())
}

return nil
Expand Down
2 changes: 0 additions & 2 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ type Cursor struct {

paginationKeyColumn *schema.TableColumn
lastSuccessfulPaginationKey uint64
rowsExamined uint64
logger *logrus.Entry
}

Expand Down Expand Up @@ -142,7 +141,6 @@ func (c *Cursor) Each(f func(*RowBatch) error) error {
tx.Rollback()

c.lastSuccessfulPaginationKey = paginationKeypos
c.rowsExamined += uint64(batch.Size())
}

return nil
Expand Down
7 changes: 4 additions & 3 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ func (f *Ferry) Progress() *Progress {
serializedState := f.StateTracker.Serialize(nil, nil)
// Note the below will not necessarily be synchronized with serializedState.
// This is fine as we don't need to be super precise with performance data.
rowsWrittenPerTable := f.StateTracker.RowsWrittenPerTable()
rowStatsWrittenPerTable := f.StateTracker.RowStatsWrittenPerTable()

s.Tables = make(map[string]TableProgress)
targetPaginationKeys := make(map[string]uint64)
Expand Down Expand Up @@ -876,13 +876,14 @@ func (f *Ferry) Progress() *Progress {
currentAction = TableActionWaiting
}

rowsWritten, _ := rowsWrittenPerTable[tableName]
rowWrittenStats, _ := rowStatsWrittenPerTable[tableName]

s.Tables[tableName] = TableProgress{
LastSuccessfulPaginationKey: lastSuccessfulPaginationKey,
TargetPaginationKey: targetPaginationKeys[tableName],
CurrentAction: currentAction,
RowsWritten: rowsWritten,
RowsWritten: rowWrittenStats.NumRows,
BytesWritten: rowWrittenStats.NumBytes,
}
}

Expand Down
1 change: 1 addition & 0 deletions progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type TableProgress struct {
TargetPaginationKey uint64
CurrentAction string // Possible values are defined via the constants TableAction*
RowsWritten uint64
BytesWritten uint64
}

type Progress struct {
Expand Down
14 changes: 14 additions & 0 deletions row_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ghostferry

import (
"strings"
"encoding/json"
)

type RowBatch struct {
Expand All @@ -23,6 +24,19 @@ func (e *RowBatch) Values() []RowData {
return e.values
}

// EstimateByteSize approximates the size of this batch in bytes by
// JSON-serializing each row and summing the encoded lengths. The result is
// an estimate only — the JSON encoding does not necessarily match the
// on-the-wire or on-disk SQL size of the rows.
func (e *RowBatch) EstimateByteSize() uint64 {
	// Accumulate directly in uint64 so the sum cannot overflow a 32-bit
	// int on platforms where int is 32 bits wide.
	var total uint64
	for _, row := range e.values {
		encoded, err := json.Marshal(row)
		if err != nil {
			// Best-effort estimate: a row that fails to serialize simply
			// contributes zero bytes instead of failing the whole batch.
			continue
		}
		total += uint64(len(encoded))
	}

	return total
}

// PaginationKeyIndex returns the index into each row's RowData at which the
// pagination key column is stored.
func (e *RowBatch) PaginationKeyIndex() int {
	return e.paginationKeyIndex
}
Expand Down
24 changes: 16 additions & 8 deletions state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func newSpeedLogRing(speedLogCount int) *ring.Ring {
return speedLog
}

// RowStats aggregates per-table write statistics tracked by the StateTracker.
type RowStats struct {
	// NumRows is the cumulative number of rows written for the table.
	NumRows uint64
	// NumBytes is the cumulative estimated byte size of the rows written
	// (estimated via RowBatch.EstimateByteSize, not an exact wire size).
	NumBytes uint64
}
type StateTracker struct {
BinlogRWMutex *sync.RWMutex
CopyRWMutex *sync.RWMutex
Expand All @@ -92,8 +96,8 @@ type StateTracker struct {

// TODO: Performance tracking should be refactored out of the state tracker,
// as it confuses the focus of this struct.
iterationSpeedLog *ring.Ring
rowsWrittenPerTable map[string]uint64
iterationSpeedLog *ring.Ring
rowStatsWrittenPerTable map[string]RowStats
}

func NewStateTracker(speedLogCount int) *StateTracker {
Expand All @@ -104,7 +108,7 @@ func NewStateTracker(speedLogCount int) *StateTracker {
lastSuccessfulPaginationKeys: make(map[string]uint64),
completedTables: make(map[string]bool),
iterationSpeedLog: newSpeedLogRing(speedLogCount),
rowsWrittenPerTable: make(map[string]uint64),
rowStatsWrittenPerTable: make(map[string]RowStats),
}
}

Expand Down Expand Up @@ -141,7 +145,7 @@ func (s *StateTracker) UpdateLastResumableBinlogPositionForTargetVerifier(pos my
s.lastStoredBinlogPositionForTargetVerifier = pos
}

func (s *StateTracker) UpdateLastSuccessfulPaginationKey(table string, paginationKey uint64, rowsWrittenForThisBatch uint64) {
func (s *StateTracker) UpdateLastSuccessfulPaginationKey(table string, paginationKey uint64, rowsWrittenForThisBatch uint64, bytesWrittenForThisBatch uint64) {
s.CopyRWMutex.Lock()
defer s.CopyRWMutex.Unlock()

Expand All @@ -152,17 +156,21 @@ func (s *StateTracker) UpdateLastSuccessfulPaginationKey(table string, paginatio
// hopefully will motivate us to fix it by refactoring the state tracker a bit
// in the future. Namely, the tracking of performance metrics and the tracking
// of pagination key locations should be done more separately than it is now.
s.rowsWrittenPerTable[table] += rowsWrittenForThisBatch
prev := s.rowStatsWrittenPerTable[table]
rowsTotal := prev.NumRows + rowsWrittenForThisBatch
bytesTotal := prev.NumBytes + bytesWrittenForThisBatch

s.rowStatsWrittenPerTable[table] = RowStats{NumRows: rowsTotal, NumBytes: bytesTotal}

s.updateSpeedLog(deltaPaginationKey)
}

func (s *StateTracker) RowsWrittenPerTable() map[string]uint64 {
func (s *StateTracker) RowStatsWrittenPerTable() map[string]RowStats {
s.CopyRWMutex.RLock()
defer s.CopyRWMutex.RUnlock()

d := make(map[string]uint64)
for k, v := range s.rowsWrittenPerTable {
d := make(map[string]RowStats)
for k, v := range s.rowStatsWrittenPerTable {
d[k] = v
}

Expand Down
3 changes: 3 additions & 0 deletions test/integration/callbacks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def test_progress_callback
assert count > 0, "There should be some rows on the target, not 0."
assert_equal count, progress.last["Tables"]["gftest.test_table_1"]["RowsWritten"]

# data column is 32 characters so each row should be at least 32 bytes
assert count * 32 < progress.last["Tables"]["gftest.test_table_1"]["BytesWritten"], "Each row should have more than 32 bytes"

assert_equal 0, progress.last["ActiveDataIterators"]

refute progress.last["LastSuccessfulBinlogPos"]["Name"].nil?
Expand Down

0 comments on commit 3fdea62

Please sign in to comment.