diff --git a/Makefile b/Makefile
index 3c88a1ef..e9480578 100644
--- a/Makefile
+++ b/Makefile
@@ -64,6 +64,7 @@ test-go:
 	fi
 	ulimit -n 1024 && ./bin/gotestsum --format short-verbose ./test/go ./copydb/test ./sharding/test -count 1 -p 1 -failfast
+	go test -v

 test-ruby:
 	bundle install
diff --git a/inline_verifier.go b/inline_verifier.go
index 80d56bab..552c88e6 100644
--- a/inline_verifier.go
+++ b/inline_verifier.go
@@ -3,8 +3,11 @@ package ghostferry
 import (
 	"bytes"
 	"context"
+	"crypto/md5"
+	"encoding/hex"
 	"errors"
 	"fmt"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -232,10 +235,23 @@ func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore {
 	return s.store.Copy()
 }

+type mismatchType string
+
+const (
+	MismatchColumnMissingOnSource mismatchType = "column missing on source"
+	MismatchColumnMissingOnTarget mismatchType = "column missing on target"
+	MismatchRowMissingOnSource    mismatchType = "row missing on source"
+	MismatchRowMissingOnTarget    mismatchType = "row missing on target"
+	MismatchColumnValueDifference mismatchType = "column value difference"
+	MismatchRowChecksumDifference mismatchType = "rows checksum difference"
+)
+
 type InlineVerifierMismatches struct {
 	Pk             uint64
 	SourceChecksum string
 	TargetChecksum string
+	MismatchColumn string
+	MismatchType   mismatchType
 }

 type InlineVerifier struct {
@@ -432,10 +448,57 @@ func (v *InlineVerifier) VerifyBeforeCutover() error {
 	return nil
 }

+func formatMismatches(mismatches map[string]map[string][]InlineVerifierMismatches) (string, []string) {
+	// Build error message for display
+	var messageBuf bytes.Buffer
+	messageBuf.WriteString("cutover verification failed for: ")
+	incorrectTables := make([]string, 0)
+
+	for schemaName, _ := range mismatches {
+		sortedTables := make([]string, 0, len(mismatches[schemaName]))
+		for tableName, _ := range mismatches[schemaName] {
+			sortedTables = append(sortedTables, tableName)
+		}
+		sort.Strings(sortedTables)
+
+		for _, tableName := range sortedTables {
+			tableNameWithSchema := fmt.Sprintf("%s.%s", schemaName, tableName)
+			incorrectTables = append(incorrectTables, tableNameWithSchema)
+
+			messageBuf.WriteString(tableNameWithSchema)
+			messageBuf.WriteString(" [PKs: ")
+			for _, mismatch := range mismatches[schemaName][tableName] {
+				messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
+				messageBuf.WriteString(" (type: ")
+				messageBuf.WriteString(string(mismatch.MismatchType))
+				if mismatch.SourceChecksum != "" {
+					messageBuf.WriteString(", source: ")
+					messageBuf.WriteString(mismatch.SourceChecksum)
+				}
+				if mismatch.TargetChecksum != "" {
+					messageBuf.WriteString(", target: ")
+					messageBuf.WriteString(mismatch.TargetChecksum)
+				}
+
+				if mismatch.MismatchColumn != "" {
+					messageBuf.WriteString(", column: ")
+					messageBuf.WriteString(mismatch.MismatchColumn)
+				}
+
+				messageBuf.WriteString(") ")
+			}
+			messageBuf.WriteString("] ")
+		}
+	}
+
+	return messageBuf.String(), incorrectTables
+}
+
 func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
 	v.verifyDuringCutoverStarted.Set(true)

 	mismatchFound, mismatches, err := v.verifyAllEventsInStore()
+
 	if err != nil {
 		v.logger.WithError(err).Error("failed to VerifyDuringCutover")
 		return VerificationResult{}, err
@@ -447,35 +510,13 @@ func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
 		}, nil
 	}

-	// Build error message for display
-	var messageBuf bytes.Buffer
-	messageBuf.WriteString("cutover verification failed for: ")
-	incorrectTables := make([]string, 0)
-	for schemaName, _ := range mismatches {
-		for tableName, mismatches := range mismatches[schemaName] {
-			tableName = fmt.Sprintf("%s.%s", schemaName, tableName)
-			incorrectTables = append(incorrectTables, tableName)
-
-			messageBuf.WriteString(tableName)
-			messageBuf.WriteString(" [paginationKeys: ")
-			for _, mismatch := range mismatches {
-				messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
-				messageBuf.WriteString(" (source: ")
-				messageBuf.WriteString(mismatch.SourceChecksum)
-				messageBuf.WriteString(", target: ")
-				messageBuf.WriteString(mismatch.TargetChecksum)
-				messageBuf.WriteString(") ")
-			}
-			messageBuf.WriteString("] ")
-		}
-	}
+	message, incorrectTables := formatMismatches(mismatches)

-	message := messageBuf.String()
 	v.logger.WithField("incorrect_tables", incorrectTables).Error(message)

 	return VerificationResult{
 		DataCorrect:     false,
-		Message:         messageBuf.String(),
+		Message:         message,
 		IncorrectTables: incorrectTables,
 	}, nil
 }
@@ -570,22 +611,27 @@ func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uin

 	for paginationKey, targetHash := range target {
 		sourceHash, exists := source[paginationKey]
-		if !bytes.Equal(sourceHash, targetHash) || !exists {
+		if !exists {
+			mismatchSet[paginationKey] = InlineVerifierMismatches{
+				Pk:           paginationKey,
+				MismatchType: MismatchRowMissingOnSource,
+			}
+		} else if !bytes.Equal(sourceHash, targetHash) {
 			mismatchSet[paginationKey] = InlineVerifierMismatches{
 				Pk:             paginationKey,
+				MismatchType:   MismatchRowChecksumDifference,
 				SourceChecksum: string(sourceHash),
 				TargetChecksum: string(targetHash),
 			}
 		}
 	}

-	for paginationKey, sourceHash := range source {
-		targetHash, exists := target[paginationKey]
-		if !bytes.Equal(sourceHash, targetHash) || !exists {
+	for paginationKey, _ := range source {
+		_, exists := target[paginationKey]
+		if !exists {
 			mismatchSet[paginationKey] = InlineVerifierMismatches{
-				Pk:             paginationKey,
-				SourceChecksum: string(sourceHash),
-				TargetChecksum: string(targetHash),
+				Pk:           paginationKey,
+				MismatchType: MismatchRowMissingOnTarget,
 			}
 		}
 	}
@@ -593,20 +639,40 @@ func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uin
 	return mismatchSet
 }

-func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[string][]byte) map[uint64]struct{} {
-	mismatchSet := map[uint64]struct{}{}
+func compareDecompressedData(source, target map[uint64]map[string][]byte) map[uint64]InlineVerifierMismatches {
+	mismatchSet := map[uint64]InlineVerifierMismatches{}

 	for paginationKey, targetDecompressedColumns := range target {
 		sourceDecompressedColumns, exists := source[paginationKey]
 		if !exists {
-			mismatchSet[paginationKey] = struct{}{}
+			// row missing on source
+			mismatchSet[paginationKey] = InlineVerifierMismatches{
+				Pk:           paginationKey,
+				MismatchType: MismatchRowMissingOnSource,
+			}
 			continue
 		}

 		for colName, targetData := range targetDecompressedColumns {
 			sourceData, exists := sourceDecompressedColumns[colName]
-			if !exists || !bytes.Equal(sourceData, targetData) {
-				mismatchSet[paginationKey] = struct{}{}
+			if !exists {
+				mismatchSet[paginationKey] = InlineVerifierMismatches{
+					Pk:             paginationKey,
+					MismatchType:   MismatchColumnMissingOnSource,
+					MismatchColumn: colName,
+				}
+				break // no need to compare other columns
+			} else if !bytes.Equal(sourceData, targetData) {
+				sourceChecksum := md5.Sum(sourceData)
+				targetChecksum := md5.Sum(targetData)
+
+				mismatchSet[paginationKey] = InlineVerifierMismatches{
+					Pk:             paginationKey,
+					MismatchType:   MismatchColumnValueDifference,
+					MismatchColumn: colName,
+					SourceChecksum: hex.EncodeToString(sourceChecksum[:]),
+					TargetChecksum: hex.EncodeToString(targetChecksum[:]),
+				}
 				break // no need to compare other columns
 			}
 		}
@@ -615,15 +681,22 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s
 	for paginationKey, sourceDecompressedColumns := range source {
 		targetDecompressedColumns, exists := target[paginationKey]
 		if !exists {
-			mismatchSet[paginationKey] = struct{}{}
+			// row missing on target
+			mismatchSet[paginationKey] = InlineVerifierMismatches{
+				Pk:           paginationKey,
+				MismatchType: MismatchRowMissingOnTarget,
+			}
 			continue
 		}

-		for colName, sourceData := range sourceDecompressedColumns {
-			targetData, exists := targetDecompressedColumns[colName]
-			if !exists || !bytes.Equal(sourceData, targetData) {
-				mismatchSet[paginationKey] = struct{}{}
-				break
+		for colName := range sourceDecompressedColumns {
+			_, exists := targetDecompressedColumns[colName]
+			if !exists {
+				mismatchSet[paginationKey] = InlineVerifierMismatches{
+					Pk:             paginationKey,
+					MismatchColumn: colName,
+					MismatchType:   MismatchColumnMissingOnTarget,
+				}
 			}
 		}
 	}
@@ -633,13 +706,9 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s

 func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uint64][]byte, sourceData, targetData map[uint64]map[string][]byte) []InlineVerifierMismatches {
 	mismatches := v.compareHashes(sourceHashes, targetHashes)
-	compressedMismatch := v.compareDecompressedData(sourceData, targetData)
-	for paginationKey, _ := range compressedMismatch {
-		mismatches[paginationKey] = InlineVerifierMismatches{
-			Pk:             paginationKey,
-			SourceChecksum: "compressed-data-mismatch", // TODO: compute the hash of the compressed data and put it here
-			TargetChecksum: "compressed-data-mismatch",
-		}
+	compressedMismatch := compareDecompressedData(sourceData, targetData)
+	for paginationKey, mismatch := range compressedMismatch {
+		mismatches[paginationKey] = mismatch
 	}

 	mismatchList := make([]InlineVerifierMismatches, 0, len(mismatches))
diff --git a/inline_verifier_test.go b/inline_verifier_test.go
new file mode 100644
index 00000000..1db37362
--- /dev/null
+++ b/inline_verifier_test.go
@@ -0,0 +1,119 @@
+package ghostferry
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestCompareDecompressedDataNoDifference(t *testing.T) {
+	source := map[uint64]map[string][]byte{
+		31: {"name": []byte("Leszek")},
+	}
+	target := map[uint64]map[string][]byte{
+		31: {"name": []byte("Leszek")},
+	}
+
+	result := compareDecompressedData(source, target)
+
+	assert.Equal(t, map[uint64]InlineVerifierMismatches{}, result)
+}
+
+func TestCompareDecompressedDataContentDifference(t *testing.T) {
+	source := map[uint64]map[string][]byte{
+		1: {"name": []byte("Leszek")},
+	}
+	target := map[uint64]map[string][]byte{
+		1: {"name": []byte("Steve")},
+	}
+
+	result := compareDecompressedData(source, target)
+
+	assert.Equal(t, map[uint64]InlineVerifierMismatches{
+		1: {
+			Pk:             1,
+			MismatchType:   MismatchColumnValueDifference,
+			MismatchColumn: "name",
+			SourceChecksum: "e356a972989f87a1531252cfa2152797",
+			TargetChecksum: "81b8a1b77068d06e1c8190825253066f",
+		},
+	}, result)
+}
+
+func TestCompareDecompressedDataMissingTarget(t *testing.T) {
+	source := map[uint64]map[string][]byte{
+		1: {"name": []byte("Leszek")},
+	}
+	target := map[uint64]map[string][]byte{}
+
+	result := compareDecompressedData(source, target)
+
+	assert.Equal(t, map[uint64]InlineVerifierMismatches{1: {Pk: 1, MismatchType: MismatchRowMissingOnTarget}}, result)
+}
+
+func TestCompareDecompressedDataMissingSource(t *testing.T) {
+	source := map[uint64]map[string][]byte{}
+	target := map[uint64]map[string][]byte{
+		3: {"name": []byte("Leszek")},
+	}
+
+	result := compareDecompressedData(source, target)
+
+	assert.Equal(t, map[uint64]InlineVerifierMismatches{3: {Pk: 3, MismatchType: MismatchRowMissingOnSource}}, result)
+}
+
+func TestFormatMismatch(t *testing.T) {
+	mismatches := map[string]map[string][]InlineVerifierMismatches{
+		"default": {
+			"users": {
+				InlineVerifierMismatches{
+					Pk:           1,
+					MismatchType: MismatchRowMissingOnSource,
+				},
+			},
+		},
+	}
+	message, tables := formatMismatches(mismatches)
+
+	assert.Equal(t, string("cutover verification failed for: default.users [PKs: 1 (type: row missing on source) ] "), message)
+	assert.Equal(t, []string{string("default.users")}, tables)
+}
+
+func TestFormatMismatches(t *testing.T) {
+	mismatches := map[string]map[string][]InlineVerifierMismatches{
+		"default": {
+			"users": {
+				InlineVerifierMismatches{
+					Pk:           1,
+					MismatchType: MismatchRowMissingOnSource,
+				},
+				InlineVerifierMismatches{
+					Pk:           5,
+					MismatchType: MismatchRowMissingOnTarget,
+				},
+			},
+			"posts": {
+				InlineVerifierMismatches{
+					Pk:             9,
+					MismatchType:   MismatchColumnValueDifference,
+					MismatchColumn: string("title"),
+					SourceChecksum: "boo",
+					TargetChecksum: "aaa",
+				},
+			},
+			"attachments": {
+				InlineVerifierMismatches{
+					Pk:             7,
+					MismatchType:   MismatchColumnValueDifference,
+					MismatchColumn: string("name"),
+					SourceChecksum: "boo",
+					TargetChecksum: "aaa",
+				},
+			},
+		},
+	}
+	message, tables := formatMismatches(mismatches)
+
+	assert.Equal(t, string("cutover verification failed for: default.attachments [PKs: 7 (type: column value difference, source: boo, target: aaa, column: name) ] default.posts [PKs: 9 (type: column value difference, source: boo, target: aaa, column: title) ] default.users [PKs: 1 (type: row missing on source) 5 (type: row missing on target) ] "), message)
+	assert.Equal(t, []string{string("default.attachments"), string("default.posts"), string("default.users")}, tables)
+}
diff --git a/iterative_verifier.go b/iterative_verifier.go
index 94aafd58..114bf35e 100644
--- a/iterative_verifier.go
+++ b/iterative_verifier.go
@@ -337,7 +337,7 @@ func (v *IterativeVerifier) reverifyUntilStoreIsSmallEnough(maxIterations int) e
 		before := v.reverifyStore.RowCount
 		start := time.Now()

-		_, err := v.verifyStore("reverification_before_cutover", []MetricTag{{"iteration", string(iteration)}})
+		_, err := v.verifyStore("reverification_before_cutover", []MetricTag{{"iteration", strconv.FormatInt(int64(iteration), 10)}})
 		if err != nil {
 			return err
 		}
diff --git a/test/integration/inline_verifier_test.rb b/test/integration/inline_verifier_test.rb
index 26c3a34d..82ce7aef 100644
--- a/test/integration/inline_verifier_test.rb
+++ b/test/integration/inline_verifier_test.rb
@@ -40,7 +40,10 @@ def test_corrupted_insert_is_detected_inline_with_batch_writer

     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id}")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 "\
+      "[PKs: #{corrupting_id} (type: rows checksum difference, source: "
+    assert ghostferry.error_lines.last["msg"].start_with?(expected_message)
   end

   def test_different_compressed_data_is_detected_inline_with_batch_writer
@@ -68,7 +71,11 @@ def test_different_compressed_data_is_detected_inline_with_batch_writer

     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 "\
+      "[PKs: 1 (type: column value difference, source: 389101948d1694a3bbfb904f57ae845c, target: 4594bb26f2f93c5c60328df6c86a0846, column: data) ] "
+
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]
   end

   def test_same_decompressed_data_different_compressed_test_passes_inline_verification
@@ -163,7 +170,11 @@ def test_catches_binlog_streamer_corruption
     ghostferry.run

     assert verification_ran
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id}")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 "\
+      "[PKs: #{corrupting_id} (type: rows checksum difference, source: ced197ee28c2e73cc737242eb0e8c49c, target: ff030f09c559a197ed440b0eee7950a0) ] "
+
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]
   end

   def test_target_corruption_is_ignored_if_skip_target_verification
@@ -399,7 +410,11 @@ def test_catches_binlog_streamer_corruption_with_composite_pk
     ghostferry.run
     assert verification_ran
     assert incorrect_tables_found, "verification did not catch corrupted table"
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: #{corrupting_id}")
+
+    expected_message = "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} "\
+      "[PKs: #{corrupting_id} (type: rows checksum difference, source: 0cc788986133d5289aba8cd87705d106, target: f4c00525c4daf1388254f1b1024ed35d) ] "
+
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]
   end

   def test_positive_negative_zero
@@ -430,7 +445,10 @@ def test_positive_negative_zero

     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1")
+
+    expected_message = "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} "\
+      "[PKs: 1 (type: rows checksum difference, source: 2888f4944da0fba0d5a5c7a7de2346f3, target: 2fa7e7e5e76005ffd8bfa5082da9f2f9) ] "
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]

     # Now we run the real test case.
     target_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = -0.0 WHERE id = 1")
@@ -484,7 +502,10 @@ def test_null_vs_empty_string

     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 [PKs: 1 (type: "
+
+    assert ghostferry.error_lines.last["msg"].start_with?(expected_message)
   end

   def test_null_vs_null_string
@@ -507,7 +528,11 @@ def test_null_vs_null_string

     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 " \
+      "[PKs: 1 (type: rows checksum difference, source: 7dfce9db8fc0f2475d2ff8ac3a5382e9, target: dc4cca2441c365c72466c75076782022) ] "
+
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]
   end

   def test_null_in_different_order
@@ -533,7 +558,11 @@ def test_null_in_different_order

     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 "\
+      "[PKs: 1 (type: rows checksum difference, source: 8e8e0931b9b2e5cb422a76d63160bbf3, target: 503b2de936a8da9e8d67b0d4594117d9) ] "
+
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]
   end

   ###################
@@ -605,7 +634,9 @@ def run_collation_test(data, source_charset, target_charset, identical:)

       assert verify_during_cutover_ran
       assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-      assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
+
+      expected_message = "cutover verification failed for: gftest.test_table_1 [PKs: 1 ("
+      assert ghostferry.error_lines.last["msg"].start_with?(expected_message)
     end
   end

diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb
index 0da17264..72d078f6 100644
--- a/test/integration/interrupt_resume_test.rb
+++ b/test/integration/interrupt_resume_test.rb
@@ -136,10 +136,11 @@ def test_interrupt_resume_inline_verifier_with_datawriter
     end

     dumped_state = ghostferry.run_expecting_interrupt
+    extra_message = "dumped state was: #{dumped_state.inspect}, expecting something under 'BinlogVerifyStore"
     assert_basic_fields_exist_in_dumped_state(dumped_state)
-    refute_nil dumped_state["BinlogVerifyStore"]
-    refute_nil dumped_state["BinlogVerifyStore"]["gftest"]
-    refute_nil dumped_state["BinlogVerifyStore"]["gftest"]["test_table_1"]
+    refute_nil dumped_state["BinlogVerifyStore"], "#{extra_message}'"
+    refute_nil dumped_state["BinlogVerifyStore"]["gftest"], "#{extra_message}.gftest'"
+    refute_nil dumped_state["BinlogVerifyStore"]["gftest"]["test_table_1"], "#{extra_message}.gftest.test_table_1'"

     # Resume Ghostferry with dumped state
     ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })
@@ -253,8 +254,11 @@ def test_interrupt_resume_inline_verifier_will_verify_entries_in_reverify_store
     assert_equal 1, incorrect_tables.length
     assert_equal "gftest.test_table_1", incorrect_tables.first

-    error_line = ghostferry.error_lines.last
-    assert error_line["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id}")
+    error_message = ghostferry.error_lines.last["msg"]
+    predicate = "cutover verification failed for: gftest.test_table_1 [PKs: #{chosen_id}"
+    expectation = error_message.start_with?(predicate)
+
+    assert expectation, "error message: #{error_message.inspect}, didn't start with #{predicate.inspect}"
   end

   def test_interrupt_resume_inline_verifier_will_verify_additional_rows_changed_on_source_during_interrupt
@@ -297,8 +301,11 @@ def test_interrupt_resume_inline_verifier_will_verify_additional_rows_changed_on
     assert_equal 1, incorrect_tables.length
     assert_equal "gftest.test_table_1", incorrect_tables.first

-    error_line = ghostferry.error_lines.last
-    assert error_line["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id}")
+    error_message = ghostferry.error_lines.last["msg"]
+    predicate = "cutover verification failed for: gftest.test_table_1 [PKs: #{chosen_id}"
+    expectation = error_message.start_with?(predicate)
+
+    assert expectation, "error message: #{error_message.inspect}, didn't start with #{predicate.inspect}"
   end

   # originally taken from @kolbitsch-lastline in https://github.com/Shopify/ghostferry/pull/160
@@ -665,6 +672,11 @@ def test_issue_149_corrupted

     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{id_to_change}")
+
+    error_message = ghostferry.error_lines.last["msg"]
+    predicate = "cutover verification failed for: gftest.test_table_1 [PKs: #{id_to_change}"
+    expectation = error_message.start_with?(predicate)
+
+    assert expectation, "error message: #{error_message.inspect}, didn't start with #{predicate.inspect}"
   end
 end