Skip to content

Commit

Permalink
Merge pull request #206 from Shopify/order-by-table-size
Browse files Browse the repository at this point in the history
Run DataIterator over tables in descending order of estimated table size
  • Loading branch information
floriecai authored Jan 12, 2021
2 parents 006d45d + 5ff853d commit 837fe03
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 6 deletions.
28 changes: 22 additions & 6 deletions data_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package ghostferry

import (
"fmt"
sql "github.com/Shopify/ghostferry/sqlwrapper"
"math"
"sync"

sql "github.com/Shopify/ghostferry/sqlwrapper"

"github.com/sirupsen/logrus"
)

Expand All @@ -17,13 +18,19 @@ type DataIterator struct {
ErrorHandler ErrorHandler
CursorConfig *CursorConfig
StateTracker *StateTracker
TableSorter DataIteratorSorter

targetPaginationKeys *sync.Map
batchListeners []func(*RowBatch) error
doneListeners []func() error
logger *logrus.Entry
}

// TableMaxPaginationKey pairs a table schema with the maximum pagination key
// value found for that table, so a DataIteratorSorter can order tables before
// they are queued for iteration.
type TableMaxPaginationKey struct {
	Table            *TableSchema
	MaxPaginationKey uint64
}

func (d *DataIterator) Run(tables []*TableSchema) {
d.logger = logrus.WithField("tag", "data_iterator")
d.targetPaginationKeys = &sync.Map{}
Expand Down Expand Up @@ -52,7 +59,7 @@ func (d *DataIterator) Run(tables []*TableSchema) {
// We don't need to reiterate those tables as it has already been done.
delete(tablesWithData, table)
} else {
d.targetPaginationKeys.Store(table.String(), maxPaginationKey)
d.targetPaginationKeys.Store(tableName, maxPaginationKey)
}
}

Expand Down Expand Up @@ -159,17 +166,26 @@ func (d *DataIterator) Run(tables []*TableSchema) {
}()
}

i := 0
sorter := MaxTableSizeSorter{DataIterator: d}
sortedTableData, err := sorter.Sort(tablesWithData)

if err != nil {
d.logger.WithError(err).Error("failed to retrieve sorted tables")
d.ErrorHandler.Fatal("data_iterator", err)
return
}

loggingIncrement := len(tablesWithData) / 50
if loggingIncrement == 0 {
loggingIncrement = 1
}

for table, _ := range tablesWithData {
tablesQueue <- table
i := 0
for _, tableData := range sortedTableData {
tablesQueue <- tableData.Table
i++
if i%loggingIncrement == 0 {
d.logger.WithField("table", table.String()).Infof("queued table for processing (%d/%d)", i, len(tablesWithData))
d.logger.WithField("table", tableData.Table.String()).Infof("queued table for processing (%d/%d)", i, len(sortedTableData))
}
}

Expand Down
91 changes: 91 additions & 0 deletions data_iterator_sorter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package ghostferry

import (
"fmt"
"sort"
"strings"
)

// DataIteratorSorter is an interface for the DataIterator to choose the order
// in which it will process tables.
type DataIteratorSorter interface {
	// Sort turns the map of tables (keyed by schema, valued by max pagination
	// key) into an ordered slice; the DataIterator queues tables in this order.
	Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error)
}

// MaxPaginationKeySorter arranges tables based on their MaxPaginationKey in
// DESC order, so tables with the largest pagination keys are iterated first.
type MaxPaginationKeySorter struct {}

// Sort returns the given tables ordered by MaxPaginationKey, largest first.
// It never fails; the error return exists to satisfy DataIteratorSorter.
func (s *MaxPaginationKeySorter) Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error) {
	entries := make([]TableMaxPaginationKey, 0, len(unorderedTables))

	for table, maxPK := range unorderedTables {
		entries = append(entries, TableMaxPaginationKey{table, maxPK})
	}

	// Descending order: the table with the largest max pagination key first.
	sort.Slice(entries, func(a, b int) bool {
		return entries[a].MaxPaginationKey > entries[b].MaxPaginationKey
	})

	return entries, nil
}
// MaxTableSizeSorter uses `information_schema.tables` to estimate the size of
// each table and sorts tables in DESC order of estimated data length.
type MaxTableSizeSorter struct {
	// DataIterator supplies the DB connection used to query information_schema.
	DataIterator *DataIterator
}

// Sort orders tables by their estimated on-disk size (information_schema
// DATA_LENGTH) in descending order, so the largest tables are iterated first.
// Tables not found in information_schema (size unknown) are placed last.
// Returns the (possibly partially ordered) slice and any query/scan error.
func (s *MaxTableSizeSorter) Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error) {
	orderedTables := make([]TableMaxPaginationKey, 0, len(unorderedTables))
	if len(unorderedTables) == 0 {
		// Avoid building an IN () clause with no members.
		return orderedTables, nil
	}

	tableNames := []string{}
	databaseSchemasSet := map[string]struct{}{}
	databaseSchemas := []string{}

	for tableSchema, maxPK := range unorderedTables {
		orderedTables = append(orderedTables, TableMaxPaginationKey{tableSchema, maxPK})
		tableNames = append(tableNames, tableSchema.Name)

		if _, exists := databaseSchemasSet[tableSchema.Schema]; !exists {
			databaseSchemasSet[tableSchema.Schema] = struct{}{}
			databaseSchemas = append(databaseSchemas, tableSchema.Schema)
		}
	}

	// Bind schema/table names as query parameters rather than interpolating
	// them into the SQL string: quoting by hand breaks on unusual identifiers
	// and invites injection.
	args := make([]interface{}, 0, len(databaseSchemas)+len(tableNames))
	for _, schema := range databaseSchemas {
		args = append(args, schema)
	}
	for _, name := range tableNames {
		args = append(args, name)
	}

	query := fmt.Sprintf(`
		SELECT TABLE_NAME, TABLE_SCHEMA
		FROM information_schema.tables
		WHERE TABLE_SCHEMA IN (%s)
		AND TABLE_NAME IN (%s)
		ORDER BY DATA_LENGTH DESC`,
		placeholderList(len(databaseSchemas)),
		placeholderList(len(tableNames)),
	)

	rows, err := s.DataIterator.DB.Query(query, args...)
	if err != nil {
		return orderedTables, err
	}
	defer rows.Close()

	// Record each table's rank in the size-ordered result set.
	databaseOrder := make(map[string]int, len(unorderedTables))
	i := 0
	for rows.Next() {
		var tableName, schemaName string
		if err = rows.Scan(&tableName, &schemaName); err != nil {
			return orderedTables, err
		}

		databaseOrder[fmt.Sprintf("%s %s", tableName, schemaName)] = i
		i++
	}
	// rows.Next can stop early due to an iteration error; surface it.
	if err = rows.Err(); err != nil {
		return orderedTables, err
	}

	sort.Slice(orderedTables, func(a, b int) bool {
		aKey := fmt.Sprintf("%s %s", orderedTables[a].Table.Name, orderedTables[a].Table.Schema)
		bKey := fmt.Sprintf("%s %s", orderedTables[b].Table.Name, orderedTables[b].Table.Schema)
		aOrder, aFound := databaseOrder[aKey]
		bOrder, bFound := databaseOrder[bKey]
		// Tables absent from information_schema previously got the zero value
		// and sorted first; unknown sizes should sort last instead.
		if !aFound {
			return false
		}
		if !bFound {
			return true
		}
		return aOrder < bOrder
	})

	return orderedTables, nil
}

// placeholderList returns n comma-separated "?" placeholders, e.g. "?, ?, ?".
func placeholderList(n int) string {
	return strings.TrimSuffix(strings.Repeat("?, ", n), ", ")
}
129 changes: 129 additions & 0 deletions test/go/data_iterator_sorter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package test

import (
"fmt"
"sort"
"testing"

"github.com/stretchr/testify/suite"

"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/testhelpers"
)

// Names of the throwaway databases and their single seeded tables used by
// this suite. Row counts differ per database (see SetupTest) so table sizes
// are distinguishable.
const (
	TestDB1 = "gftest2"
	TestDB2 = "gftest3"
	TestDB3 = "gftest4"
	TestTableDB1 = "test_db_2"
	TestTableDB2 = "test_db_3"
	TestTableDB3 = "test_db_4"
)

// TestDBs lists every database seeded in SetupTest and dropped in TearDownTest.
var TestDBs = []string{TestDB1, TestDB2, TestDB3}

// DBTableMap maps each test database to the one table seeded inside it.
var DBTableMap = map[string]string{
	TestDB1: TestTableDB1,
	TestDB2: TestTableDB2,
	TestDB3: TestTableDB3,
}

// DataIteratorSorterTestSuite exercises the DataIteratorSorter
// implementations against seeded source databases.
type DataIteratorSorterTestSuite struct {
	*testhelpers.GhostferryUnitTestSuite

	// unsortedTables is the input map handed to each sorter under test.
	unsortedTables map[*ghostferry.TableSchema]uint64
	// dataIterator carries the source DB connection for MaxTableSizeSorter.
	dataIterator *ghostferry.DataIterator
}

// SetupTest seeds three databases with different row counts (so the sorters
// see tables of distinct sizes), loads their schemas, assigns predictable
// fake max pagination keys, and builds a DataIterator over the source DB.
func (t *DataIteratorSorterTestSuite) SetupTest() {
	t.GhostferryUnitTestSuite.SetupTest()

	testhelpers.SeedInitialData(t.Ferry.SourceDB, TestDB1, TestTableDB1, 300)
	testhelpers.SeedInitialData(t.Ferry.SourceDB, TestDB2, TestTableDB2, 1)
	testhelpers.SeedInitialData(t.Ferry.SourceDB, TestDB3, TestTableDB3, 500)

	tableFilter := &testhelpers.TestTableFilter{
		DbsFunc:    testhelpers.DbApplicabilityFilter(TestDBs),
		TablesFunc: nil,
	}
	tables, _ := ghostferry.LoadTables(t.Ferry.SourceDB, tableFilter, nil, nil, nil, nil)

	t.unsortedTables = make(map[*ghostferry.TableSchema]uint64, len(tables))
	for idx, tableSchema := range tables.AsSlice() {
		// Descending fake keys (100_000, 99_999, ...) keep ordering predictable.
		t.unsortedTables[tableSchema] = uint64(100_000 - idx)
	}

	t.dataIterator = &ghostferry.DataIterator{
		DB:           t.Ferry.SourceDB,
		ErrorHandler: t.Ferry.ErrorHandler,
	}
}

// TearDownTest drops every database seeded by SetupTest.
func (t *DataIteratorSorterTestSuite) TearDownTest() {
	for _, dbName := range TestDBs {
		query := fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", dbName)
		_, err := t.Ferry.SourceDB.Exec(query)
		t.Require().Nil(err)
	}
}

// TestOrderMaxPaginationKeys verifies MaxPaginationKeySorter returns the
// tables in descending order of MaxPaginationKey.
func (t *DataIteratorSorterTestSuite) TestOrderMaxPaginationKeys() {
	sorter := ghostferry.MaxPaginationKeySorter{}

	sortedTables, err := sorter.Sort(t.unsortedTables)
	t.Require().NoError(err, "could not sort tables for data iterator using MaxPaginationKeySorter")

	// Independently sort a copy and compare against the sorter's output.
	// The original comparator read sortedTables[i]/sortedTables[j] instead of
	// expectedTables[i]/expectedTables[j], which made the check tautological.
	expectedTables := make([]ghostferry.TableMaxPaginationKey, len(sortedTables))
	copy(expectedTables, sortedTables)
	sort.Slice(expectedTables, func(i, j int) bool {
		return expectedTables[i].MaxPaginationKey > expectedTables[j].MaxPaginationKey
	})

	t.Require().Equal(len(t.unsortedTables), len(sortedTables))
	t.Require().EqualValues(expectedTables, sortedTables)
}

// TestOrderByInformationSchemaTableSize verifies MaxTableSizeSorter orders
// tables by their information_schema size estimates: the 500-row table first,
// then 300 rows, then 1 row.
func (t *DataIteratorSorterTestSuite) TestOrderByInformationSchemaTableSize() {
	// information_schema.tables does not update automatically on every write;
	// OPTIMIZE TABLE forces a refresh so the query sees current table sizes.
	// (Require().NoError halts the test on failure — the previous t.Fail calls
	// marked it failed but kept executing with bad state.)
	for schema, table := range DBTableMap {
		_, err := t.Ferry.SourceDB.Exec(fmt.Sprintf("USE %s", schema))
		t.Require().NoError(err, "could not switch schema to update information_schema.tables sizes")

		_, err = t.Ferry.SourceDB.Exec(fmt.Sprintf("OPTIMIZE TABLE `%s`", table))
		t.Require().NoError(err, "could not update information_schema.tables to get latest table sizes")
	}

	// Use the DataIterator built in SetupTest for this purpose; it was
	// previously constructed but never used (t.Ferry.DataIterator was used
	// instead).
	sorter := ghostferry.MaxTableSizeSorter{DataIterator: t.dataIterator}

	sortedTables, err := sorter.Sort(t.unsortedTables)
	t.Require().NoError(err, "could not sort tables for data iterator using MaxTableSizeSorter")

	t.Require().Equal(3, len(t.unsortedTables))

	table1 := sortedTables[0].Table
	table2 := sortedTables[1].Table
	table3 := sortedTables[2].Table
	t.Require().Equal("gftest4 test_db_4", fmt.Sprintf("%s %s", table1.Schema, table1.Name))
	t.Require().Equal("gftest2 test_db_2", fmt.Sprintf("%s %s", table2.Schema, table2.Name))
	t.Require().Equal("gftest3 test_db_3", fmt.Sprintf("%s %s", table3.Schema, table3.Name))
}

// TestDataIteratorSorterTestSuite wires the suite into the standard `go test`
// runner.
func TestDataIteratorSorterTestSuite(t *testing.T) {
	testhelpers.SetupTest()
	testSuite := &DataIteratorSorterTestSuite{
		GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{},
	}
	suite.Run(t, testSuite)
}
2 changes: 2 additions & 0 deletions vendor/github.com/sirupsen/logrus/go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 837fe03

Please sign in to comment.