Skip to content

Commit

Permalink
Merge pull request #361 from Shopify/improve-inline-verifier-mismatch-errors
Browse files Browse the repository at this point in the history

Improve inline verifier mismatch errors
  • Loading branch information
driv3r committed Sep 11, 2024
2 parents 28b62b7 + c8e3c3f commit d0a5d63
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 67 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ test-go:
fi

ulimit -n 1024 && ./bin/gotestsum --format short-verbose ./test/go ./copydb/test ./sharding/test -count 1 -p 1 -failfast
go test -v

test-ruby:
bundle install
Expand Down
167 changes: 118 additions & 49 deletions inline_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package ghostferry
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -232,10 +235,23 @@ func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore {
return s.store.Copy()
}

// mismatchType labels the kind of discrepancy recorded in an
// InlineVerifierMismatches entry. Its string value is written as-is into the
// message built by formatMismatches.
type mismatchType string

// Mismatch categories assigned by compareHashes and compareDecompressedData.
const (
// A column present in the target row has no entry in the source row.
MismatchColumnMissingOnSource mismatchType = "column missing on source"
// A column present in the source row has no entry in the target row.
MismatchColumnMissingOnTarget mismatchType = "column missing on target"
// A pagination key exists in the target set but not in the source set.
MismatchRowMissingOnSource mismatchType = "row missing on source"
// A pagination key exists in the source set but not in the target set.
MismatchRowMissingOnTarget mismatchType = "row missing on target"
// A column exists on both sides but its decompressed bytes differ.
MismatchColumnValueDifference mismatchType = "column value difference"
// A row exists on both sides but its row hashes differ.
MismatchRowChecksumDifference mismatchType = "rows checksum difference"
)

// InlineVerifierMismatches describes one row that failed verification.
// Despite the plural name, each value represents a single mismatch.
type InlineVerifierMismatches struct {
// Pk is the pagination key of the mismatched row.
Pk uint64
// SourceChecksum and TargetChecksum carry the differing checksums when the
// mismatch is a row checksum or column value difference; formatMismatches
// omits them from the message when they are empty.
SourceChecksum string
TargetChecksum string
// MismatchColumn names the affected column for column-level mismatches and
// is empty for row-level ones.
MismatchColumn string
// MismatchType is the category of the discrepancy.
MismatchType mismatchType
}

type InlineVerifier struct {
Expand Down Expand Up @@ -432,10 +448,57 @@ func (v *InlineVerifier) VerifyBeforeCutover() error {
return nil
}

// formatMismatches renders the mismatches found during cutover verification
// into a single human-readable message and collects the affected tables.
//
// mismatches is keyed by schema name and then by table name. Both schemas and
// tables are visited in lexical order so that the message and the returned
// table list are deterministic regardless of Go's randomized map iteration
// order.
//
// It returns the formatted message and the list of "schema.table" names that
// contain at least one mismatch entry in the map. The returned slice is never
// nil, even when mismatches is empty.
func formatMismatches(mismatches map[string]map[string][]InlineVerifierMismatches) (string, []string) {
	var messageBuf strings.Builder
	messageBuf.WriteString("cutover verification failed for: ")
	incorrectTables := make([]string, 0)

	// Map iteration order is random; sort schemas first so that output is
	// stable when more than one schema has mismatches.
	sortedSchemas := make([]string, 0, len(mismatches))
	for schemaName := range mismatches {
		sortedSchemas = append(sortedSchemas, schemaName)
	}
	sort.Strings(sortedSchemas)

	for _, schemaName := range sortedSchemas {
		tables := mismatches[schemaName]

		sortedTables := make([]string, 0, len(tables))
		for tableName := range tables {
			sortedTables = append(sortedTables, tableName)
		}
		sort.Strings(sortedTables)

		for _, tableName := range sortedTables {
			tableNameWithSchema := fmt.Sprintf("%s.%s", schemaName, tableName)
			incorrectTables = append(incorrectTables, tableNameWithSchema)

			messageBuf.WriteString(tableNameWithSchema)
			messageBuf.WriteString(" [PKs: ")
			for _, mismatch := range tables[tableName] {
				messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
				messageBuf.WriteString(" (type: ")
				messageBuf.WriteString(string(mismatch.MismatchType))

				// Checksums and column name are optional details; they are
				// only printed when the mismatch carries them.
				if mismatch.SourceChecksum != "" {
					messageBuf.WriteString(", source: ")
					messageBuf.WriteString(mismatch.SourceChecksum)
				}
				if mismatch.TargetChecksum != "" {
					messageBuf.WriteString(", target: ")
					messageBuf.WriteString(mismatch.TargetChecksum)
				}
				if mismatch.MismatchColumn != "" {
					messageBuf.WriteString(", column: ")
					messageBuf.WriteString(mismatch.MismatchColumn)
				}

				messageBuf.WriteString(") ")
			}
			messageBuf.WriteString("] ")
		}
	}

	return messageBuf.String(), incorrectTables
}

func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
v.verifyDuringCutoverStarted.Set(true)

mismatchFound, mismatches, err := v.verifyAllEventsInStore()

if err != nil {
v.logger.WithError(err).Error("failed to VerifyDuringCutover")
return VerificationResult{}, err
Expand All @@ -447,35 +510,13 @@ func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
}, nil
}

// Build error message for display
var messageBuf bytes.Buffer
messageBuf.WriteString("cutover verification failed for: ")
incorrectTables := make([]string, 0)
for schemaName, _ := range mismatches {
for tableName, mismatches := range mismatches[schemaName] {
tableName = fmt.Sprintf("%s.%s", schemaName, tableName)
incorrectTables = append(incorrectTables, tableName)

messageBuf.WriteString(tableName)
messageBuf.WriteString(" [paginationKeys: ")
for _, mismatch := range mismatches {
messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
messageBuf.WriteString(" (source: ")
messageBuf.WriteString(mismatch.SourceChecksum)
messageBuf.WriteString(", target: ")
messageBuf.WriteString(mismatch.TargetChecksum)
messageBuf.WriteString(") ")
}
messageBuf.WriteString("] ")
}
}
message, incorrectTables := formatMismatches(mismatches)

message := messageBuf.String()
v.logger.WithField("incorrect_tables", incorrectTables).Error(message)

return VerificationResult{
DataCorrect: false,
Message: messageBuf.String(),
Message: message,
IncorrectTables: incorrectTables,
}, nil
}
Expand Down Expand Up @@ -570,43 +611,68 @@ func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uin

for paginationKey, targetHash := range target {
sourceHash, exists := source[paginationKey]
if !bytes.Equal(sourceHash, targetHash) || !exists {
if !exists {
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchRowMissingOnSource,
}
} else if !bytes.Equal(sourceHash, targetHash) {
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchRowChecksumDifference,
SourceChecksum: string(sourceHash),
TargetChecksum: string(targetHash),
}
}
}

for paginationKey, sourceHash := range source {
targetHash, exists := target[paginationKey]
if !bytes.Equal(sourceHash, targetHash) || !exists {
for paginationKey, _ := range source {
_, exists := target[paginationKey]
if !exists {
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
SourceChecksum: string(sourceHash),
TargetChecksum: string(targetHash),
Pk: paginationKey,
MismatchType: MismatchRowMissingOnTarget,
}
}
}

return mismatchSet
}

func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[string][]byte) map[uint64]struct{} {
mismatchSet := map[uint64]struct{}{}
func compareDecompressedData(source, target map[uint64]map[string][]byte) map[uint64]InlineVerifierMismatches {
mismatchSet := map[uint64]InlineVerifierMismatches{}

for paginationKey, targetDecompressedColumns := range target {
sourceDecompressedColumns, exists := source[paginationKey]
if !exists {
mismatchSet[paginationKey] = struct{}{}
// row missing on source
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchRowMissingOnSource,
}
continue
}

for colName, targetData := range targetDecompressedColumns {
sourceData, exists := sourceDecompressedColumns[colName]
if !exists || !bytes.Equal(sourceData, targetData) {
mismatchSet[paginationKey] = struct{}{}
if !exists {
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchColumnMissingOnSource,
MismatchColumn: colName,
}
break // no need to compare other columns
} else if !bytes.Equal(sourceData, targetData) {
sourceChecksum := md5.Sum(sourceData)
targetChecksum := md5.Sum(targetData)

mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchColumnValueDifference,
MismatchColumn: colName,
SourceChecksum: hex.EncodeToString(sourceChecksum[:]),
TargetChecksum: hex.EncodeToString(targetChecksum[:]),
}
break // no need to compare other columns
}
}
Expand All @@ -615,15 +681,22 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s
for paginationKey, sourceDecompressedColumns := range source {
targetDecompressedColumns, exists := target[paginationKey]
if !exists {
mismatchSet[paginationKey] = struct{}{}
// row missing on target
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchRowMissingOnTarget,
}
continue
}

for colName, sourceData := range sourceDecompressedColumns {
targetData, exists := targetDecompressedColumns[colName]
if !exists || !bytes.Equal(sourceData, targetData) {
mismatchSet[paginationKey] = struct{}{}
break
for colName := range sourceDecompressedColumns {
_, exists := targetDecompressedColumns[colName]
if !exists {
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchColumn: colName,
MismatchType: MismatchColumnMissingOnTarget,
}
}
}
}
Expand All @@ -633,13 +706,9 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s

func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uint64][]byte, sourceData, targetData map[uint64]map[string][]byte) []InlineVerifierMismatches {
mismatches := v.compareHashes(sourceHashes, targetHashes)
compressedMismatch := v.compareDecompressedData(sourceData, targetData)
for paginationKey, _ := range compressedMismatch {
mismatches[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
SourceChecksum: "compressed-data-mismatch", // TODO: compute the hash of the compressed data and put it here
TargetChecksum: "compressed-data-mismatch",
}
compressedMismatch := compareDecompressedData(sourceData, targetData)
for paginationKey, mismatch := range compressedMismatch {
mismatches[paginationKey] = mismatch
}

mismatchList := make([]InlineVerifierMismatches, 0, len(mismatches))
Expand Down
119 changes: 119 additions & 0 deletions inline_verifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package ghostferry

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestCompareDecompressedDataNoDifference(t *testing.T) {
source := map[uint64]map[string][]byte{
31: {"name": []byte("Leszek")},
}
target := map[uint64]map[string][]byte{
31: {"name": []byte("Leszek")},
}

result := compareDecompressedData(source, target)

assert.Equal(t, map[uint64]InlineVerifierMismatches{}, result)
}

func TestCompareDecompressedDataContentDifference(t *testing.T) {
source := map[uint64]map[string][]byte{
1: {"name": []byte("Leszek")},
}
target := map[uint64]map[string][]byte{
1: {"name": []byte("Steve")},
}

result := compareDecompressedData(source, target)

assert.Equal(t, map[uint64]InlineVerifierMismatches{
1: {
Pk: 1,
MismatchType: MismatchColumnValueDifference,
MismatchColumn: "name",
SourceChecksum: "e356a972989f87a1531252cfa2152797",
TargetChecksum: "81b8a1b77068d06e1c8190825253066f",
},
}, result)
}

func TestCompareDecompressedDataMissingTarget(t *testing.T) {
source := map[uint64]map[string][]byte{
1: {"name": []byte("Leszek")},
}
target := map[uint64]map[string][]byte{}

result := compareDecompressedData(source, target)

assert.Equal(t, map[uint64]InlineVerifierMismatches{1: {Pk: 1, MismatchType: MismatchRowMissingOnTarget}}, result)
}

func TestCompareDecompressedDataMissingSource(t *testing.T) {
source := map[uint64]map[string][]byte{}
target := map[uint64]map[string][]byte{
3: {"name": []byte("Leszek")},
}

result := compareDecompressedData(source, target)

assert.Equal(t, map[uint64]InlineVerifierMismatches{3: {Pk: 3, MismatchType: MismatchRowMissingOnSource}}, result)
}

// TestFormatMismatch checks the message and table list produced for a single
// mismatch in a single table.
func TestFormatMismatch(t *testing.T) {
	input := map[string]map[string][]InlineVerifierMismatches{
		"default": {
			"users": {
				{Pk: 1, MismatchType: MismatchRowMissingOnSource},
			},
		},
	}

	const expectedMessage = "cutover verification failed for: default.users [PKs: 1 (type: row missing on source) ] "
	expectedTables := []string{"default.users"}

	gotMessage, gotTables := formatMismatches(input)

	assert.Equal(t, expectedMessage, gotMessage)
	assert.Equal(t, expectedTables, gotTables)
}

// TestFormatMismatches checks that several tables within one schema are
// reported in sorted table-name order, each with its own list of mismatches
// and optional checksum/column details.
func TestFormatMismatches(t *testing.T) {
	usersMismatches := []InlineVerifierMismatches{
		{Pk: 1, MismatchType: MismatchRowMissingOnSource},
		{Pk: 5, MismatchType: MismatchRowMissingOnTarget},
	}
	postsMismatches := []InlineVerifierMismatches{
		{
			Pk:             9,
			MismatchType:   MismatchColumnValueDifference,
			MismatchColumn: "title",
			SourceChecksum: "boo",
			TargetChecksum: "aaa",
		},
	}
	attachmentsMismatches := []InlineVerifierMismatches{
		{
			Pk:             7,
			MismatchType:   MismatchColumnValueDifference,
			MismatchColumn: "name",
			SourceChecksum: "boo",
			TargetChecksum: "aaa",
		},
	}

	input := map[string]map[string][]InlineVerifierMismatches{
		"default": {
			"users":       usersMismatches,
			"posts":       postsMismatches,
			"attachments": attachmentsMismatches,
		},
	}

	const expectedMessage = "cutover verification failed for: default.attachments [PKs: 7 (type: column value difference, source: boo, target: aaa, column: name) ] default.posts [PKs: 9 (type: column value difference, source: boo, target: aaa, column: title) ] default.users [PKs: 1 (type: row missing on source) 5 (type: row missing on target) ] "
	expectedTables := []string{"default.attachments", "default.posts", "default.users"}

	gotMessage, gotTables := formatMismatches(input)

	assert.Equal(t, expectedMessage, gotMessage)
	assert.Equal(t, expectedTables, gotTables)
}
2 changes: 1 addition & 1 deletion iterative_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (v *IterativeVerifier) reverifyUntilStoreIsSmallEnough(maxIterations int) e
before := v.reverifyStore.RowCount
start := time.Now()

_, err := v.verifyStore("reverification_before_cutover", []MetricTag{{"iteration", string(iteration)}})
_, err := v.verifyStore("reverification_before_cutover", []MetricTag{{"iteration", strconv.FormatInt(int64(iteration), 10)}})
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit d0a5d63

Please sign in to comment.