diff --git a/data_iterator.go b/data_iterator.go
index ffead901..362d99c4 100644
--- a/data_iterator.go
+++ b/data_iterator.go
@@ -2,10 +2,11 @@ package ghostferry
 
 import (
 	"fmt"
-	sql "github.com/Shopify/ghostferry/sqlwrapper"
 	"math"
 	"sync"
 
+	sql "github.com/Shopify/ghostferry/sqlwrapper"
+
 	"github.com/sirupsen/logrus"
 )
 
@@ -17,6 +18,7 @@ type DataIterator struct {
 	ErrorHandler ErrorHandler
 	CursorConfig *CursorConfig
 	StateTracker *StateTracker
+	TableSorter  DataIteratorSorter
 
 	targetPaginationKeys *sync.Map
 	batchListeners       []func(*RowBatch) error
@@ -24,6 +26,11 @@ type DataIterator struct {
 	logger *logrus.Entry
 }
 
+type TableMaxPaginationKey struct {
+	Table            *TableSchema
+	MaxPaginationKey uint64
+}
+
 func (d *DataIterator) Run(tables []*TableSchema) {
 	d.logger = logrus.WithField("tag", "data_iterator")
 	d.targetPaginationKeys = &sync.Map{}
@@ -52,7 +59,7 @@ func (d *DataIterator) Run(tables []*TableSchema) {
 			// We don't need to reiterate those tables as it has already been done.
 			delete(tablesWithData, table)
 		} else {
-			d.targetPaginationKeys.Store(table.String(), maxPaginationKey)
+			d.targetPaginationKeys.Store(tableName, maxPaginationKey)
 		}
 	}
 
@@ -159,17 +166,29 @@ func (d *DataIterator) Run(tables []*TableSchema) {
 		}()
 	}
 
-	i := 0
+	sorter := d.TableSorter
+	if sorter == nil {
+		sorter = &MaxTableSizeSorter{DataIterator: d}
+	}
+
+	sortedTableData, err := sorter.Sort(tablesWithData)
+	if err != nil {
+		d.logger.WithError(err).Error("failed to retrieve sorted tables")
+		d.ErrorHandler.Fatal("data_iterator", err)
+		return
+	}
+
 	loggingIncrement := len(tablesWithData) / 50
 	if loggingIncrement == 0 {
 		loggingIncrement = 1
 	}
 
-	for table, _ := range tablesWithData {
-		tablesQueue <- table
+	i := 0
+	for _, tableData := range sortedTableData {
+		tablesQueue <- tableData.Table
 		i++
 
 		if i%loggingIncrement == 0 {
-			d.logger.WithField("table", table.String()).Infof("queued table for processing (%d/%d)", i, len(tablesWithData))
+			d.logger.WithField("table", tableData.Table.String()).Infof("queued table for processing (%d/%d)", i, len(sortedTableData))
 		}
 	}
diff --git a/data_iterator_sorter.go b/data_iterator_sorter.go
new file mode 100644
index 00000000..16f1d365
--- /dev/null
+++ b/data_iterator_sorter.go
@@ -0,0 +1,91 @@
+package ghostferry
+
+import (
+	"fmt"
+	"sort"
+	"strings"
+)
+
+// DataIteratorSorter is an interface for the DataIterator to choose the order
+// in which it processes tables.
+type DataIteratorSorter interface {
+	Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error)
+}
+
+// MaxPaginationKeySorter arranges tables by their MaxPaginationKey in DESC order.
+type MaxPaginationKeySorter struct{}
+
+func (s *MaxPaginationKeySorter) Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error) {
+	orderedTables := make([]TableMaxPaginationKey, len(unorderedTables))
+	i := 0
+
+	for k, v := range unorderedTables {
+		orderedTables[i] = TableMaxPaginationKey{k, v}
+		i++
+	}
+
+	sort.Slice(orderedTables, func(i, j int) bool {
+		return orderedTables[i].MaxPaginationKey > orderedTables[j].MaxPaginationKey
+	})
+
+	return orderedTables, nil
+}
+
+// MaxTableSizeSorter uses `information_schema.tables` to estimate the size of
+// each table and sorts them in DESC order.
+type MaxTableSizeSorter struct {
+	DataIterator *DataIterator
+}
+
+func (s *MaxTableSizeSorter) Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error) {
+	orderedTables := []TableMaxPaginationKey{}
+	tableNames := []string{}
+	databaseSchemasSet := map[string]struct{}{}
+	databaseSchemas := []string{}
+
+	for tableSchema, maxPK := range unorderedTables {
+		orderedTables = append(orderedTables, TableMaxPaginationKey{tableSchema, maxPK})
+		tableNames = append(tableNames, tableSchema.Name)
+
+		if _, exists := databaseSchemasSet[tableSchema.Schema]; !exists {
+			databaseSchemasSet[tableSchema.Schema] = struct{}{}
+			databaseSchemas = append(databaseSchemas, tableSchema.Schema)
+		}
+	}
+
+	query := fmt.Sprintf(`
+		SELECT TABLE_NAME, TABLE_SCHEMA
+		FROM information_schema.tables
+		WHERE TABLE_SCHEMA IN ("%s")
+			AND TABLE_NAME IN ("%s")
+		ORDER BY DATA_LENGTH DESC`,
+		strings.Join(databaseSchemas, `", "`),
+		strings.Join(tableNames, `", "`),
+	)
+	rows, err := s.DataIterator.DB.Query(query)
+	if err != nil {
+		return orderedTables, err
+	}
+	defer rows.Close()
+
+	databaseOrder := make(map[string]int, len(unorderedTables))
+	i := 0
+	for rows.Next() {
+		var tableName, schemaName string
+		err = rows.Scan(&tableName, &schemaName)
+
+		if err != nil {
+			return orderedTables, err
+		}
+
+		databaseOrder[fmt.Sprintf("%s %s", tableName, schemaName)] = i
+		i++
+	}
+
+	sort.Slice(orderedTables, func(i, j int) bool {
+		iName := fmt.Sprintf("%s %s", orderedTables[i].Table.Name, orderedTables[i].Table.Schema)
+		jName := fmt.Sprintf("%s %s", orderedTables[j].Table.Name, orderedTables[j].Table.Schema)
+		return databaseOrder[iName] < databaseOrder[jName]
+	})
+
+	return orderedTables, nil
+}
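Reviewer note: DataIteratorSorter is the extension point this change introduces. Anything that can order the table-to-max-pagination-key map can be assigned to DataIterator.TableSorter, with MaxTableSizeSorter as the fallback in Run. As a minimal sketch (hypothetical, not part of this diff), a sorter that queues tables by name would look like:

    package ghostferry_example // illustrative only; not in this change

    import (
    	"sort"

    	"github.com/Shopify/ghostferry"
    )

    // AlphabeticalSorter is a hypothetical DataIteratorSorter that orders
    // tables by name, ascending.
    type AlphabeticalSorter struct{}

    func (s *AlphabeticalSorter) Sort(unorderedTables map[*ghostferry.TableSchema]uint64) ([]ghostferry.TableMaxPaginationKey, error) {
    	ordered := make([]ghostferry.TableMaxPaginationKey, 0, len(unorderedTables))
    	for table, maxPK := range unorderedTables {
    		ordered = append(ordered, ghostferry.TableMaxPaginationKey{Table: table, MaxPaginationKey: maxPK})
    	}

    	// TableSchema.String() is the same identifier the DataIterator
    	// already uses for logging and pagination-key tracking.
    	sort.Slice(ordered, func(i, j int) bool {
    		return ordered[i].Table.String() < ordered[j].Table.String()
    	})
    	return ordered, nil
    }

With the nil fallback in Run, selecting a strategy is then a one-line assignment before the run starts, e.g. ferry.DataIterator.TableSorter = &AlphabeticalSorter{}.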
diff --git a/test/go/data_iterator_sorter_test.go b/test/go/data_iterator_sorter_test.go
new file mode 100644
index 00000000..78199285
--- /dev/null
+++ b/test/go/data_iterator_sorter_test.go
@@ -0,0 +1,129 @@
+package test
+
+import (
+	"fmt"
+	"sort"
+	"testing"
+
+	"github.com/stretchr/testify/suite"
+
+	"github.com/Shopify/ghostferry"
+	"github.com/Shopify/ghostferry/testhelpers"
+)
+
+const (
+	TestDB1      = "gftest2"
+	TestDB2      = "gftest3"
+	TestDB3      = "gftest4"
+	TestTableDB1 = "test_db_2"
+	TestTableDB2 = "test_db_3"
+	TestTableDB3 = "test_db_4"
+)
+
+var TestDBs = []string{TestDB1, TestDB2, TestDB3}
+
+var DBTableMap = map[string]string{
+	TestDB1: TestTableDB1,
+	TestDB2: TestTableDB2,
+	TestDB3: TestTableDB3,
+}
+
+type DataIteratorSorterTestSuite struct {
+	*testhelpers.GhostferryUnitTestSuite
+
+	unsortedTables map[*ghostferry.TableSchema]uint64
+	dataIterator   *ghostferry.DataIterator
+}
+
+func (t *DataIteratorSorterTestSuite) SetupTest() {
+	t.GhostferryUnitTestSuite.SetupTest()
+	testhelpers.SeedInitialData(t.Ferry.SourceDB, TestDB1, TestTableDB1, 300)
+	testhelpers.SeedInitialData(t.Ferry.SourceDB, TestDB2, TestTableDB2, 1)
+	testhelpers.SeedInitialData(t.Ferry.SourceDB, TestDB3, TestTableDB3, 500)
+
+	tableFilter := &testhelpers.TestTableFilter{
+		DbsFunc:    testhelpers.DbApplicabilityFilter(TestDBs),
+		TablesFunc: nil,
+	}
+	tables, _ := ghostferry.LoadTables(t.Ferry.SourceDB, tableFilter, nil, nil, nil, nil)
+
+	t.unsortedTables = make(map[*ghostferry.TableSchema]uint64, len(tables))
+	i := 0
+	for _, f := range tables.AsSlice() {
+		maxPaginationKey := uint64(100_000 - i)
+		t.unsortedTables[f] = maxPaginationKey
+		i++
+	}
+
+	t.dataIterator = &ghostferry.DataIterator{
+		DB:           t.Ferry.SourceDB,
+		ErrorHandler: t.Ferry.ErrorHandler,
+	}
+}
+
+func (t *DataIteratorSorterTestSuite) TearDownTest() {
+	for _, db := range TestDBs {
+		_, err := t.Ferry.SourceDB.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", db))
+		t.Require().Nil(err)
+	}
+}
+
+func (t *DataIteratorSorterTestSuite) TestOrderMaxPaginationKeys() {
+	sorter := ghostferry.MaxPaginationKeySorter{}
+
+	sortedTables, err := sorter.Sort(t.unsortedTables)
+	if err != nil {
+		t.Fail("Could not sort tables for data iterator using MaxPaginationKeySorter")
+	}
+
+	expectedTables := make([]ghostferry.TableMaxPaginationKey, len(sortedTables))
+	copy(expectedTables, sortedTables)
+
+	sort.Slice(expectedTables, func(i, j int) bool {
+		return expectedTables[i].MaxPaginationKey > expectedTables[j].MaxPaginationKey
+	})
+
+	t.Require().Equal(len(t.unsortedTables), len(sortedTables))
+	t.Require().EqualValues(expectedTables, sortedTables)
+}
+
+func (t *DataIteratorSorterTestSuite) TestOrderByInformationSchemaTableSize() {
+	// information_schema.tables does not update automatically on every write;
+	// running OPTIMIZE TABLE forces an update so we can read the latest table sizes.
+	for schema, table := range DBTableMap {
+		_, err := t.Ferry.SourceDB.Exec(fmt.Sprintf("USE %s", schema))
+		if err != nil {
+			t.Fail("Could not update information_schema.tables to get the latest table sizes")
+		}
+
+		_, err = t.Ferry.SourceDB.Exec(fmt.Sprintf("OPTIMIZE TABLE `%s`", table))
+		if err != nil {
+			t.Fail("Could not update information_schema.tables to get the latest table sizes")
+		}
+	}
+
+	sorter := ghostferry.MaxTableSizeSorter{DataIterator: t.dataIterator}
+
+	sortedTables, err := sorter.Sort(t.unsortedTables)
+	if err != nil {
+		t.Fail("Could not sort tables for data iterator using MaxTableSizeSorter")
+	}
+
+	t.Require().Equal(3, len(sortedTables))
+
+	table1 := sortedTables[0].Table
+	table2 := sortedTables[1].Table
+	table3 := sortedTables[2].Table
+	t.Require().Equal("gftest4 test_db_4", fmt.Sprintf("%s %s", table1.Schema, table1.Name))
+	t.Require().Equal("gftest2 test_db_2", fmt.Sprintf("%s %s", table2.Schema, table2.Name))
+	t.Require().Equal("gftest3 test_db_3", fmt.Sprintf("%s %s", table3.Schema, table3.Name))
+}
+
+func TestDataIteratorSorterTestSuite(t *testing.T) {
+	testhelpers.SetupTest()
+	suite.Run(t, &DataIteratorSorterTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}})
+}
diff --git a/vendor/github.com/sirupsen/logrus/go.mod b/vendor/github.com/sirupsen/logrus/go.mod
index 8261a2b3..b71dd42e 100644
--- a/vendor/github.com/sirupsen/logrus/go.mod
+++ b/vendor/github.com/sirupsen/logrus/go.mod
@@ -1,5 +1,7 @@
 module github.com/sirupsen/logrus
 
+go 1.14
+
 require (
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/konsorten/go-windows-terminal-sequences v1.0.1
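Side note on the size-based strategy: DATA_LENGTH in information_schema.tables is a lazily refreshed estimate, which is why the test above forces a refresh via OPTIMIZE TABLE before asserting on order. A standalone sketch for inspecting what MaxTableSizeSorter sorts by (the DSN and schema names below are placeholders, not values from this change):

    package main

    import (
    	"database/sql"
    	"fmt"
    	"log"

    	_ "github.com/go-sql-driver/mysql" // the MySQL driver ghostferry itself uses
    )

    func main() {
    	// Placeholder DSN; point it at the MySQL instance under inspection.
    	db, err := sql.Open("mysql", "root:@tcp(127.0.0.1:3306)/")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	// The same ordering criterion MaxTableSizeSorter uses: estimated data size, DESC.
    	rows, err := db.Query(`
    		SELECT TABLE_SCHEMA, TABLE_NAME, DATA_LENGTH
    		FROM information_schema.tables
    		WHERE TABLE_SCHEMA IN ('gftest2', 'gftest3', 'gftest4')
    		ORDER BY DATA_LENGTH DESC`)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer rows.Close()

    	for rows.Next() {
    		var schema, name string
    		var dataLength uint64
    		if err := rows.Scan(&schema, &name, &dataLength); err != nil {
    			log.Fatal(err)
    		}
    		fmt.Printf("%s.%s: ~%d bytes\n", schema, name, dataLength)
    	}
    }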