Merge pull request #336 from Shopify/more-logs-inline-verifier
Log checksum in case of inline verification failure
shuhaowu authored Nov 4, 2022
2 parents 05fb06b + 0db9b4f commit b3aaacf
Showing 6 changed files with 74 additions and 42 deletions.
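In short: InlineVerifier mismatch reporting is upgraded from bare []uint64 pagination-key lists to []InlineVerifierMismatches values that carry the source and target row checksums, so verification-failure logs show what differed rather than just which rows. Because the logged checksums vary from run to run, the Go and Ruby tests switch from exact-match assertions to prefix/suffix checks, and the copydb test additionally accounts for MySQL 8.0's utf8mb3 naming.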
batch_writer.go (2 changes: 1 addition & 1 deletion)
@@ -19,7 +19,7 @@ func init() {
 }
 
 type BatchWriterVerificationFailed struct {
-    mismatchedPaginationKeys []uint64
+    mismatchedPaginationKeys []InlineVerifierMismatches
     table string
 }
 
copydb/test/copydb_test.go (11 changes: 9 additions & 2 deletions)
@@ -2,6 +2,7 @@ package test
 
 import (
     "fmt"
+    "os"
     "testing"
 
     "github.com/Shopify/ghostferry"
@@ -138,8 +139,14 @@ func (t *CopydbTestSuite) TestCreateDatabaseCopiesTheRightCollation() {
     row := t.ferry.TargetDB.QueryRow(query)
     err = row.Scan(&characterSet, &collation)
     t.Require().Nil(err)
-    t.Require().Equal(characterSet, "utf8")
-    t.Require().Equal(collation, "utf8_general_ci")
+
+    if os.Getenv("MYSQL_VERSION") == "8.0" {
+        t.Require().Equal(characterSet, "utf8mb3")
+        t.Require().Equal(collation, "utf8mb3_general_ci")
+    } else {
+        t.Require().Equal(characterSet, "utf8")
+        t.Require().Equal(collation, "utf8_general_ci")
+    }
 
     query = "SELECT table_collation FROM information_schema.tables WHERE table_schema = \"%s\" AND table_name = \"%s\""
     query = fmt.Sprintf(query, renamedSchemaName, renamedTableName)
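Context for the branch above: newer MySQL 8.0 releases report the legacy utf8 character set under its explicit alias utf8mb3 in information_schema, so the assertion now depends on the server version. A version-agnostic alternative would be to normalize the reported name before asserting; a minimal sketch, where normalizeCharset is a hypothetical helper and not part of this diff:

```go
package main

import "fmt"

// normalizeCharset maps the utf8mb3 alias reported by newer MySQL 8.0
// releases back to the legacy name, so assertions need not branch on the
// server version. (Hypothetical helper for illustration only.)
func normalizeCharset(characterSet string) string {
	if characterSet == "utf8mb3" {
		return "utf8"
	}
	return characterSet
}

func main() {
	fmt.Println(normalizeCharset("utf8mb3")) // utf8
	fmt.Println(normalizeCharset("latin1"))  // latin1
}
```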
inline_verifier.go (74 changes: 48 additions & 26 deletions)
@@ -232,6 +232,12 @@ func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore {
     return s.store.Copy()
 }
 
+type InlineVerifierMismatches struct {
+    Pk             uint64
+    SourceChecksum string
+    TargetChecksum string
+}
+
 type InlineVerifier struct {
     SourceDB *sql.DB
     TargetDB *sql.DB
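For orientation, a standalone sketch of what verification call sites now receive: a record per mismatched pagination key instead of the bare key. The checksum values below are fabricated.

```go
package main

import "fmt"

type InlineVerifierMismatches struct {
	Pk             uint64
	SourceChecksum string
	TargetChecksum string
}

func main() {
	// Previously a mismatch surfaced only as its pagination key ([]uint64{2});
	// now each entry also carries both checksums for logging.
	mismatches := []InlineVerifierMismatches{
		{Pk: 2, SourceChecksum: "5f36fa48", TargetChecksum: "a41c81b2"},
	}
	for _, m := range mismatches {
		fmt.Printf("pk=%d source=%s target=%s\n", m.Pk, m.SourceChecksum, m.TargetChecksum)
	}
}
```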
@@ -304,7 +310,7 @@ func (v *InlineVerifier) Result() (VerificationResultAndStatus, error) {
     return v.backgroundVerificationResultAndStatus, v.backgroundVerificationErr
 }
 
-func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, enforceInlineVerification bool) ([]uint64, error) {
+func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, enforceInlineVerification bool) ([]InlineVerifierMismatches, error) {
     table := sourceBatch.TableSchema()
 
     paginationKeys := make([]uint64, len(sourceBatch.Values()))
@@ -353,8 +359,8 @@ func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, enforceInlineVerification bool) ([]uint64, error) {
     mismatches := v.compareHashesAndData(sourceFingerprints, targetFingerprints, sourceDecompressedData, targetDecompressedData)
 
     if !enforceInlineVerification {
-        for _, mismatchedPk := range mismatches {
-            v.reverifyStore.Add(table, mismatchedPk)
+        for _, mismatch := range mismatches {
+            v.reverifyStore.Add(table, mismatch.Pk)
         }
 
         if len(mismatches) > 0 {
@@ -446,15 +452,19 @@ func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
     messageBuf.WriteString("cutover verification failed for: ")
     incorrectTables := make([]string, 0)
     for schemaName, _ := range mismatches {
-        for tableName, paginationKeys := range mismatches[schemaName] {
+        for tableName, mismatches := range mismatches[schemaName] {
             tableName = fmt.Sprintf("%s.%s", schemaName, tableName)
             incorrectTables = append(incorrectTables, tableName)
 
             messageBuf.WriteString(tableName)
             messageBuf.WriteString(" [paginationKeys: ")
-            for _, paginationKey := range paginationKeys {
-                messageBuf.WriteString(strconv.FormatUint(paginationKey, 10))
-                messageBuf.WriteString(" ")
+            for _, mismatch := range mismatches {
+                messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
+                messageBuf.WriteString(" (source: ")
+                messageBuf.WriteString(mismatch.SourceChecksum)
+                messageBuf.WriteString(", target: ")
+                messageBuf.WriteString(mismatch.TargetChecksum)
+                messageBuf.WriteString(") ")
             }
             messageBuf.WriteString("] ")
         }
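The extra WriteString calls change the cutover failure message from listing bare keys to listing each key with both checksums. A runnable sketch of the same loop over one fabricated mismatch (the real code writes into the message buffer owned by VerifyDuringCutover):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type InlineVerifierMismatches struct {
	Pk             uint64
	SourceChecksum string
	TargetChecksum string
}

func main() {
	var messageBuf strings.Builder
	messageBuf.WriteString("cutover verification failed for: gftest.test_table_1 [paginationKeys: ")
	for _, mismatch := range []InlineVerifierMismatches{{Pk: 2, SourceChecksum: "5f36fa48", TargetChecksum: "a41c81b2"}} {
		messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
		messageBuf.WriteString(" (source: ")
		messageBuf.WriteString(mismatch.SourceChecksum)
		messageBuf.WriteString(", target: ")
		messageBuf.WriteString(mismatch.TargetChecksum)
		messageBuf.WriteString(") ")
	}
	messageBuf.WriteString("] ")
	fmt.Println(messageBuf.String())
	// cutover verification failed for: gftest.test_table_1 [paginationKeys: 2 (source: 5f36fa48, target: a41c81b2) ]
}
```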
@@ -555,20 +565,28 @@ func (v *InlineVerifier) decompressData(table *TableSchema, column string, compr
     }
 }
 
-func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uint64]struct{} {
-    mismatchSet := map[uint64]struct{}{}
+func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uint64]InlineVerifierMismatches {
+    mismatchSet := map[uint64]InlineVerifierMismatches{}
 
     for paginationKey, targetHash := range target {
         sourceHash, exists := source[paginationKey]
         if !bytes.Equal(sourceHash, targetHash) || !exists {
-            mismatchSet[paginationKey] = struct{}{}
+            mismatchSet[paginationKey] = InlineVerifierMismatches{
+                Pk:             paginationKey,
+                SourceChecksum: string(sourceHash),
+                TargetChecksum: string(targetHash),
+            }
         }
     }
 
     for paginationKey, sourceHash := range source {
         targetHash, exists := target[paginationKey]
         if !bytes.Equal(sourceHash, targetHash) || !exists {
-            mismatchSet[paginationKey] = struct{}{}
+            mismatchSet[paginationKey] = InlineVerifierMismatches{
+                Pk:             paginationKey,
+                SourceChecksum: string(sourceHash),
+                TargetChecksum: string(targetHash),
+            }
         }
     }
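compareHashes keeps its symmetric-difference behavior (a key is flagged when the hashes differ or when the row exists on only one side) but now records both checksums. A self-contained sketch with fabricated hashes; note that a row missing from one side yields an empty checksum string for that side:

```go
package main

import (
	"bytes"
	"fmt"
)

type InlineVerifierMismatches struct {
	Pk             uint64
	SourceChecksum string
	TargetChecksum string
}

// compareHashes mirrors the function above: both directions are scanned, so
// rows missing on either side are reported.
func compareHashes(source, target map[uint64][]byte) map[uint64]InlineVerifierMismatches {
	mismatchSet := map[uint64]InlineVerifierMismatches{}
	for paginationKey, targetHash := range target {
		sourceHash, exists := source[paginationKey]
		if !bytes.Equal(sourceHash, targetHash) || !exists {
			mismatchSet[paginationKey] = InlineVerifierMismatches{Pk: paginationKey, SourceChecksum: string(sourceHash), TargetChecksum: string(targetHash)}
		}
	}
	for paginationKey, sourceHash := range source {
		targetHash, exists := target[paginationKey]
		if !bytes.Equal(sourceHash, targetHash) || !exists {
			mismatchSet[paginationKey] = InlineVerifierMismatches{Pk: paginationKey, SourceChecksum: string(sourceHash), TargetChecksum: string(targetHash)}
		}
	}
	return mismatchSet
}

func main() {
	source := map[uint64][]byte{1: []byte("aa"), 2: []byte("bb")}
	target := map[uint64][]byte{1: []byte("aa"), 3: []byte("cc")}
	// Flags pk 2 (missing on target) and pk 3 (missing on source); pk 1 matches.
	for pk, m := range compareHashes(source, target) {
		fmt.Printf("pk=%d source=%q target=%q\n", pk, m.SourceChecksum, m.TargetChecksum)
	}
}
```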

@@ -613,17 +631,21 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s
     return mismatchSet
 }
 
-func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uint64][]byte, sourceData, targetData map[uint64]map[string][]byte) []uint64 {
+func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uint64][]byte, sourceData, targetData map[uint64]map[string][]byte) []InlineVerifierMismatches {
     mismatches := v.compareHashes(sourceHashes, targetHashes)
     compressedMismatch := v.compareDecompressedData(sourceData, targetData)
     for paginationKey, _ := range compressedMismatch {
-        mismatches[paginationKey] = struct{}{}
+        mismatches[paginationKey] = InlineVerifierMismatches{
+            Pk:             paginationKey,
+            SourceChecksum: "compressed-data-mismatch", // TODO: compute the hash of the compressed data and put it here
+            TargetChecksum: "compressed-data-mismatch",
+        }
     }
 
-    mismatchList := make([]uint64, 0, len(mismatches))
+    mismatchList := make([]InlineVerifierMismatches, 0, len(mismatches))
 
-    for paginationKey, _ := range mismatches {
-        mismatchList = append(mismatchList, paginationKey)
+    for _, mismatch := range mismatches {
+        mismatchList = append(mismatchList, mismatch)
     }
 
     return mismatchList
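Note the sentinel above: decompressed-data mismatches do not yet carry real checksums, so both sides are logged with the "compressed-data-mismatch" placeholder, and the inline TODO records the intent to eventually hash the compressed payloads instead.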
@@ -650,21 +672,21 @@ func (v *InlineVerifier) binlogEventListener(evs []DMLEvent) error {
     return nil
 }
 
-func (v *InlineVerifier) readdMismatchedPaginationKeysToBeVerifiedAgain(mismatches map[string]map[string][]uint64) {
+func (v *InlineVerifier) readdMismatchedPaginationKeysToBeVerifiedAgain(mismatches map[string]map[string][]InlineVerifierMismatches) {
     for schemaName, _ := range mismatches {
-        for tableName, paginationKeys := range mismatches[schemaName] {
+        for tableName, mismatches := range mismatches[schemaName] {
             table := v.TableSchemaCache.Get(schemaName, tableName)
-            for _, paginationKey := range paginationKeys {
-                v.reverifyStore.Add(table, paginationKey)
+            for _, mismatch := range mismatches {
+                v.reverifyStore.Add(table, mismatch.Pk)
             }
         }
     }
 }
 
 // Returns mismatches in the form of db -> table -> paginationKeys
-func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][]uint64, error) {
+func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][]InlineVerifierMismatches, error) {
     mismatchFound := false
-    mismatches := make(map[string]map[string][]uint64)
+    mismatches := make(map[string]map[string][]InlineVerifierMismatches)
     allBatches := v.reverifyStore.Batches(v.BatchSize)
 
     if len(allBatches) == 0 {
@@ -684,11 +706,11 @@ func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][]uint64, error) {
             mismatchFound = true
 
             if _, exists := mismatches[batch.SchemaName]; !exists {
-                mismatches[batch.SchemaName] = make(map[string][]uint64)
+                mismatches[batch.SchemaName] = make(map[string][]InlineVerifierMismatches)
             }
 
             if _, exists := mismatches[batch.SchemaName][batch.TableName]; !exists {
-                mismatches[batch.SchemaName][batch.TableName] = make([]uint64, 0)
+                mismatches[batch.SchemaName][batch.TableName] = make([]InlineVerifierMismatches, 0)
             }
 
             mismatches[batch.SchemaName][batch.TableName] = append(mismatches[batch.SchemaName][batch.TableName], batchMismatches...)
@@ -702,7 +724,7 @@ func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][]uint64, error) {
 // Since the mismatches gets re-added to the reverify store, this must return
 // a union of mismatches of fingerprints and mismatches due to decompressed
 // data.
-func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]uint64, error) {
+func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]InlineVerifierMismatches, error) {
     targetSchema := batch.SchemaName
     if targetSchemaName, exists := v.DatabaseRewrites[targetSchema]; exists {
         targetSchema = targetSchemaName
@@ -715,7 +737,7 @@ func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]uint64, error) {
 
     sourceTableSchema := v.TableSchemaCache.Get(batch.SchemaName, batch.TableName)
     if sourceTableSchema == nil {
-        return []uint64{}, fmt.Errorf("programming error? %s.%s is not found in TableSchemaCache but is being reverified", batch.SchemaName, batch.TableName)
+        return []InlineVerifierMismatches{}, fmt.Errorf("programming error? %s.%s is not found in TableSchemaCache but is being reverified", batch.SchemaName, batch.TableName)
     }
 
     wg := &sync.WaitGroup{}
sharding/test/primary_key_table_test.go (5 changes: 4 additions & 1 deletion)
@@ -3,6 +3,7 @@ package test
 import (
     "fmt"
     "net/http"
+    "strings"
     "testing"
     "time"
 
@@ -90,7 +91,9 @@ func (t *PrimaryKeyTableTestSuite) TestPrimaryKeyTableVerificationFailure() {
     t.Ferry.Run()
 
     t.Require().NotNil(errHandler.LastError)
-    t.Require().Equal("row fingerprints for paginationKeys [2] on gftest1.tenants_table do not match", errHandler.LastError.Error())
+
+    t.Require().True(strings.HasPrefix(errHandler.LastError.Error(), "row fingerprints for paginationKeys [{2"))
+    t.Require().True(strings.HasSuffix(errHandler.LastError.Error(), "on gftest1.tenants_table do not match"))
 }
 
 func TestPrimaryKeyTableTestSuite(t *testing.T) {
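The looser prefix/suffix assertions reflect the mismatch slice apparently being rendered with Go's default struct formatting. The diff does not show the error construction in batch_writer.go, but the expected "[{2" prefix is consistent with a %v verb over []InlineVerifierMismatches, as this sketch shows:

```go
package main

import "fmt"

type InlineVerifierMismatches struct {
	Pk             uint64
	SourceChecksum string
	TargetChecksum string
}

func main() {
	mismatches := []InlineVerifierMismatches{{Pk: 2, SourceChecksum: "5f36fa48", TargetChecksum: "a41c81b2"}}
	err := fmt.Errorf("row fingerprints for paginationKeys %v on gftest1.tenants_table do not match", mismatches)
	fmt.Println(err)
	// row fingerprints for paginationKeys [{2 5f36fa48 a41c81b2}] on gftest1.tenants_table do not match
}
```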
test/integration/inline_verifier_test.rb (18 changes: 9 additions & 9 deletions)
@@ -40,7 +40,7 @@ def test_corrupted_insert_is_detected_inline_with_batch_writer
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id} ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id}")
   end
 
   def test_different_compressed_data_is_detected_inline_with_batch_writer
@@ -68,7 +68,7 @@ def test_different_compressed_data_is_detected_inline_with_batch_writer
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
   end
 
   def test_same_decompressed_data_different_compressed_test_passes_inline_verification
@@ -163,7 +163,7 @@ def test_catches_binlog_streamer_corruption
 
     ghostferry.run
     assert verification_ran
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id} ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id}")
   end
 
   def test_target_corruption_is_ignored_if_skip_target_verification
@@ -399,7 +399,7 @@ def test_catches_binlog_streamer_corruption_with_composite_pk
     ghostferry.run
     assert verification_ran
     assert incorrect_tables_found, "verification did not catch corrupted table"
-    assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: #{corrupting_id} ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: #{corrupting_id}")
   end
 
   def test_positive_negative_zero
@@ -430,7 +430,7 @@ def test_positive_negative_zero
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1")
 
     # Now we run the real test case.
     target_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = -0.0 WHERE id = 1")
@@ -484,7 +484,7 @@ def test_null_vs_empty_string
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
   end
 
   def test_null_vs_null_string
@@ -507,7 +507,7 @@ def test_null_vs_null_string
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
   end
 
   def test_null_in_different_order
@@ -533,7 +533,7 @@ def test_null_in_different_order
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
   end
 
   ###################
@@ -605,7 +605,7 @@ def run_collation_test(data, source_charset, target_charset, identical:)
 
     assert verify_during_cutover_ran
    assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
   end
 end
 
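All nine Ruby assertions in this file change the same way: the tail of each message now embeds checksums computed at run time, so exact-match assert_equal calls are relaxed to start_with? checks against the stable prefix.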
test/integration/interrupt_resume_test.rb (6 changes: 3 additions & 3 deletions)
@@ -262,7 +262,7 @@ def test_interrupt_resume_inline_verifier_will_verify_entries_in_reverify_store
     assert_equal "gftest.test_table_1", incorrect_tables.first
 
     error_line = ghostferry.error_lines.last
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id} ] ", error_line["msg"]
+    assert error_line["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id}")
   end
 
   def test_interrupt_resume_inline_verifier_will_verify_additional_rows_changed_on_source_during_interrupt
@@ -306,7 +306,7 @@ def test_interrupt_resume_inline_verifier_will_verify_additional_rows_changed_on_source_during_interrupt
     assert_equal "gftest.test_table_1", incorrect_tables.first
 
     error_line = ghostferry.error_lines.last
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id} ] ", error_line["msg"]
+    assert error_line["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id}")
   end
 
   # originally taken from @kolbitsch-lastline in https://github.com/Shopify/ghostferry/pull/160
@@ -671,6 +671,6 @@ def test_issue_149_corrupted
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{id_to_change} ] ", ghostferry.error_lines.last["msg"]
+    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{id_to_change}")
   end
 end
