Skip to content

Commit

Permalink
fix #49: setup now detects if reorg support is handled on an engine
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Nov 15, 2023
1 parent 39740d2 commit aeff598
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 9 deletions.
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/common_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newDBLoader(
moduleMismatchMode, err := db.ParseOnModuleHashMismatch(sflags.MustGetString(cmd, onModuleHashMistmatchFlag))
cli.NoError(err, "invalid mistmatch mode")

dbLoader, err := db.NewLoader(psqlDSN, flushInterval, moduleMismatchMode, handleReorgs, zlog, tracer)
dbLoader, err := db.NewLoader(psqlDSN, flushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer)
if err != nil {
return nil, fmt.Errorf("new psql loader: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func sinkSetupE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("extract sink config: %w", err)
}

dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, true, zlog, tracer)
dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
if err != nil {
return fmt.Errorf("new psql loader: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func toolsDeleteCursorE(cmd *cobra.Command, args []string) error {

func toolsCreateLoader() *db.Loader {
dsn := viper.GetString("tools-global-dsn")
loader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchIgnore, true, zlog, tracer)
loader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer)
cli.NoError(err, "Unable to instantiate database manager from DSN %q", dsn)

if err := loader.LoadTables(); err != nil {
Expand Down
12 changes: 9 additions & 3 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewLoader(
psqlDsn string,
flushInterval time.Duration,
moduleMismatchMode OnModuleHashMismatch,
handleReorgs bool,
handleReorgs *bool,
logger *zap.Logger,
tracer logging.Tracer,
) (*Loader, error) {
Expand Down Expand Up @@ -83,10 +83,16 @@ func NewLoader(
return nil, fmt.Errorf("dialect not found: %s", err)
}

if handleReorgs && l.getDialect().OnlyInserts() {
if handleReorgs == nil {
// automatic detection
l.handleReorgs = !l.getDialect().OnlyInserts()
} else {
l.handleReorgs = *handleReorgs
}

if l.handleReorgs && l.getDialect().OnlyInserts() {
return nil, fmt.Errorf("driver %s does not support reorg handling. You must use set a non-zero undo-buffer-size", dsn.driver)
}
l.handleReorgs = handleReorgs

logger.Info("created new DB loader",
zap.Duration("flush_interval", flushInterval),
Expand Down
4 changes: 2 additions & 2 deletions db/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestEscapeColumns(t *testing.T) {
t.Skip(`PG_DSN not set, please specify PG_DSN to run this test, example: PG_DSN="psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?enable_incremental_sort=off&sslmode=disable"`)
}

dbLoader, err := NewLoader(dsn, 0, OnModuleHashMismatchIgnore, false, zlog, tracer)
dbLoader, err := NewLoader(dsn, 0, OnModuleHashMismatchIgnore, nil, zlog, tracer)
require.NoError(t, err)

tx, err := dbLoader.DB.Begin()
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestEscapeValues(t *testing.T) {
t.Skip(`PG_DSN not set, please specify PG_DSN to run this test, example: PG_DSN="psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?enable_incremental_sort=off&sslmode=disable"`)
}

dbLoader, err := NewLoader(dsn, 0, OnModuleHashMismatchIgnore, false, zlog, tracer)
dbLoader, err := NewLoader(dsn, 0, OnModuleHashMismatchIgnore, nil, zlog, tracer)
require.NoError(t, err)

tx, err := dbLoader.DB.Begin()
Expand Down
2 changes: 1 addition & 1 deletion db/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func NewTestLoader(
tables map[string]*TableInfo,
) (*Loader, *TestTx) {

loader, err := NewLoader("psql://x:5432/x", 0, OnModuleHashMismatchIgnore, true, zlog, tracer)
loader, err := NewLoader("psql://x:5432/x", 0, OnModuleHashMismatchIgnore, nil, zlog, tracer)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit aeff598

Please sign in to comment.