Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(schema/indexer)!: implement start indexing #21636

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
34 changes: 12 additions & 22 deletions indexer/postgres/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package postgres
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"

"cosmossdk.io/schema/indexer"
"cosmossdk.io/schema/logutil"
Expand All @@ -21,8 +21,6 @@ type Config struct {
DisableRetainDeletions bool `json:"disable_retain_deletions"`
}

type SqlLogger = func(msg, sql string, params ...interface{})

type indexerImpl struct {
ctx context.Context
db *sql.DB
Expand All @@ -32,10 +30,17 @@ type indexerImpl struct {
logger logutil.Logger
}

func StartIndexer(params indexer.InitParams) (indexer.InitResult, error) {
config, err := decodeConfig(params.Config.Config)
if err != nil {
return indexer.InitResult{}, err
func init() {
indexer.Register("postgres", indexer.Initializer{
InitFunc: startIndexer,
ConfigType: Config{},
})
}

func startIndexer(params indexer.InitParams) (indexer.InitResult, error) {
config, ok := params.Config.Config.(Config)
if !ok {
return indexer.InitResult{}, fmt.Errorf("invalid config type, expected %T got %T", Config{}, params.Config.Config)
}

ctx := params.Context
Expand Down Expand Up @@ -89,18 +94,3 @@ func StartIndexer(params indexer.InitParams) (indexer.InitResult, error) {
View: idx,
}, nil
}

func decodeConfig(rawConfig map[string]interface{}) (*Config, error) {
bz, err := json.Marshal(rawConfig)
if err != nil {
return nil, err
}

var config Config
err = json.Unmarshal(bz, &config)
if err != nil {
return nil, err
}

return &config, nil
}
26 changes: 0 additions & 26 deletions indexer/postgres/tests/config.go

This file was deleted.

23 changes: 13 additions & 10 deletions indexer/postgres/tests/init_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,20 @@ func testInitSchema(t *testing.T, disableRetainDeletions bool, goldenFileName st
connectionUrl := createTestDB(t)

buf := &strings.Builder{}

cfg, err := postgresConfigToIndexerConfig(postgres.Config{
DatabaseURL: connectionUrl,
DisableRetainDeletions: disableRetainDeletions,
})
require.NoError(t, err)

res, err := postgres.StartIndexer(indexer.InitParams{
Config: cfg,
res, err := indexer.StartIndexing(indexer.IndexingOptions{
Config: indexer.IndexingConfig{
Target: map[string]indexer.Config{
"postgres": {
Type: "postgres",
Config: postgres.Config{
DatabaseURL: connectionUrl,
DisableRetainDeletions: disableRetainDeletions,
},
},
},
},
Context: context.Background(),
Logger: &prettyLogger{buf},
Logger: prettyLogger{buf},
})
require.NoError(t, err)
listener := res.Listener
Expand Down
28 changes: 18 additions & 10 deletions indexer/postgres/tests/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,39 @@ func testPostgresIndexer(t *testing.T, retainDeletions bool) {
require.NoError(t, err)
})

cfg, err := postgresConfigToIndexerConfig(postgres.Config{
DatabaseURL: dbUrl,
DisableRetainDeletions: !retainDeletions,
})
require.NoError(t, err)

debugLog := &strings.Builder{}

pgIndexer, err := postgres.StartIndexer(indexer.InitParams{
Config: cfg,
res, err := indexer.StartIndexing(indexer.IndexingOptions{
Config: indexer.IndexingConfig{
Target: map[string]indexer.Config{
"postgres": {
Type: "postgres",
Config: postgres.Config{
DatabaseURL: dbUrl,
DisableRetainDeletions: !retainDeletions,
},
},
},
},
Context: ctx,
Logger: &prettyLogger{debugLog},
AddressCodec: addressutil.HexAddressCodec{},
})
require.NoError(t, err)
require.NoError(t, err)

sim, err := appdatasim.NewSimulator(appdatasim.Options{
Listener: pgIndexer.Listener,
Listener: res.Listener,
AppSchema: indexertesting.ExampleAppSchema,
StateSimOptions: statesim.Options{
CanRetainDeletions: retainDeletions,
},
})
require.NoError(t, err)

pgIndexerView := res.IndexerInfos["postgres"].View
require.NotNil(t, pgIndexerView)

blockDataGen := sim.BlockDataGenN(10, 100)
numBlocks := 200
if testing.Short() {
Expand All @@ -93,7 +101,7 @@ func testPostgresIndexer(t *testing.T, retainDeletions bool) {
require.NoError(t, sim.ProcessBlockData(blockData), debugLog.String())

// compare the expected state in the simulator to the actual state in the indexer and expect the diff to be empty
require.Empty(t, appdatasim.DiffAppData(sim, pgIndexer.View), debugLog.String())
require.Empty(t, appdatasim.DiffAppData(sim, pgIndexerView), debugLog.String())

// reset the debug log after each successful block so that it doesn't get too long when debugging
debugLog.Reset()
Expand Down
6 changes: 3 additions & 3 deletions schema/decoding/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type DecoderResolver interface {
// EncodeModuleName encodes a module name into a byte slice that can be used as the actor in a KVPairUpdate.
EncodeModuleName(string) ([]byte, error)

// IterateAll iterates over all available module decoders.
IterateAll(func(moduleName string, cdc schema.ModuleCodec) error) error
// AllDecoders iterates over all available module decoders.
AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error) error

// LookupDecoder looks up a specific module decoder.
LookupDecoder(moduleName string) (decoder schema.ModuleCodec, found bool, err error)
Expand Down Expand Up @@ -48,7 +48,7 @@ func (a moduleSetDecoderResolver) EncodeModuleName(s string) ([]byte, error) {
return nil, fmt.Errorf("module %s not found", s)
}

func (a moduleSetDecoderResolver) IterateAll(f func(string, schema.ModuleCodec) error) error {
func (a moduleSetDecoderResolver) AllDecoders(f func(string, schema.ModuleCodec) error) error {
keys := make([]string, 0, len(a.moduleSet))
for k := range a.moduleSet {
keys = append(keys, k)
Expand Down
4 changes: 2 additions & 2 deletions schema/decoding/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var testResolver = ModuleSetDecoderResolver(moduleSet)

func TestModuleSetDecoderResolver_IterateAll(t *testing.T) {
objectTypes := map[string]bool{}
err := testResolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error {
err := testResolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error {
cdc.Schema.AllTypes(func(t schema.Type) bool {
objTyp, ok := t.(schema.StateObjectType)
if ok {
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestModuleSetDecoderResolver_IterateAll_Error(t *testing.T) {
resolver := ModuleSetDecoderResolver(map[string]interface{}{
"modD": modD{},
})
err := resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error {
err := resolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error {
if moduleName == "modD" {
t.Fatalf("expected error")
}
Expand Down
2 changes: 1 addition & 1 deletion schema/decoding/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func Sync(listener appdata.Listener, source SyncSource, resolver DecoderResolver
return nil
}

return resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error {
return resolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error {
if opts.ModuleFilter != nil && !opts.ModuleFilter(moduleName) {
// ignore this module
return nil
Expand Down
52 changes: 52 additions & 0 deletions schema/indexer/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package indexer

// Config species the configuration passed to an indexer initialization function.
// It includes both common configuration options related to include or excluding
// parts of the data stream as well as indexer specific options under the config
// subsection.
//
// NOTE: it is an error for an indexer to change its common options, such as adding
// or removing indexed modules, after the indexer has been initialized because this
// could result in an inconsistent state.
type Config struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this be set in the app.toml or?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the idea is that this is a section in app.toml, probably under the [indexer] section

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we then add the corresponding toml struct tag then? (Like done in other server/v2 config struct)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which tags do we want? toml, mapstructure, json? One of these? All three?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So no json needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and added toml, mapstructure and comment and also kept JSON: c967f78. Does that look good?

// Type is the name of the indexer type as registered with Register.
Type string `mapstructure:"type" toml:"type" json:"type" comment:"The name of the registered indexer type."`

// Config are the indexer specific config options specified by the user.
Config interface{} `mapstructure:"config" toml:"config" json:"config,omitempty" comment:"Indexer specific configuration options."`

// Filter is the filter configuration for the indexer.
Filter *FilterConfig `mapstructure:"filter" toml:"filter" json:"filter,omitempty" comment:"Filter configuration for the indexer. Currently UNSUPPORTED!"`
}

// FilterConfig specifies the configuration for filtering the data stream
type FilterConfig struct {
// ExcludeState specifies that the indexer will not receive state updates.
ExcludeState bool `mapstructure:"exclude_state" toml:"exclude_state" json:"exclude_state" comment:"Exclude all state updates."`

// ExcludeEvents specifies that the indexer will not receive events.
ExcludeEvents bool `mapstructure:"exclude_events" toml:"exclude_events" json:"exclude_events" comment:"Exclude all events."`

// ExcludeTxs specifies that the indexer will not receive transaction's.
ExcludeTxs bool `mapstructure:"exclude_txs" toml:"exclude_txs" json:"exclude_txs" comment:"Exclude all transactions."`

// ExcludeBlockHeaders specifies that the indexer will not receive block headers,
// although it will still receive StartBlock and Commit callbacks, just without
// the header data.
ExcludeBlockHeaders bool `mapstructure:"exclude_block_headers" toml:"exclude_block_headers" json:"exclude_block_headers" comment:"Exclude all block headers."`

Modules *ModuleFilterConfig `mapstructure:"modules" toml:"modules" json:"modules,omitempty" comment:"Module filter configuration."`
}

// ModuleFilterConfig specifies the configuration for filtering modules.
type ModuleFilterConfig struct {
// Include specifies a list of modules whose state the indexer will
// receive state updates for.
// Only one of include or exclude modules should be specified.
Include []string `mapstructure:"include" toml:"include" json:"include" comment:"List of modules to include. Only one of include or exclude should be specified."`

// Exclude specifies a list of modules whose state the indexer will not
// receive state updates for.
// Only one of include or exclude modules should be specified.
Exclude []string `mapstructure:"exclude" toml:"exclude" json:"exclude" comment:"List of modules to exclude. Only one of include or exclude should be specified."`
}
43 changes: 6 additions & 37 deletions schema/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,13 @@ import (
"cosmossdk.io/schema/view"
)

// Config species the configuration passed to an indexer initialization function.
// It includes both common configuration options related to include or excluding
// parts of the data stream as well as indexer specific options under the config
// subsection.
//
// NOTE: it is an error for an indexer to change its common options, such as adding
// or removing indexed modules, after the indexer has been initialized because this
// could result in an inconsistent state.
type Config struct {
// Type is the name of the indexer type as registered with Register.
Type string `json:"type"`
// Initializer describes an indexer initialization function and other metadata.
type Initializer struct {
// InitFunc is the function that initializes the indexer.
InitFunc InitFunc

// Config are the indexer specific config options specified by the user.
Config map[string]interface{} `json:"config"`

// ExcludeState specifies that the indexer will not receive state updates.
ExcludeState bool `json:"exclude_state"`

// ExcludeEvents specifies that the indexer will not receive events.
ExcludeEvents bool `json:"exclude_events"`

// ExcludeTxs specifies that the indexer will not receive transaction's.
ExcludeTxs bool `json:"exclude_txs"`

// ExcludeBlockHeaders specifies that the indexer will not receive block headers,
// although it will still receive StartBlock and Commit callbacks, just without
// the header data.
ExcludeBlockHeaders bool `json:"exclude_block_headers"`

// IncludeModules specifies a list of modules whose state the indexer will
// receive state updates for.
// Only one of include or exclude modules should be specified.
IncludeModules []string `json:"include_modules"`

// ExcludeModules specifies a list of modules whose state the indexer will not
// receive state updates for.
// Only one of include or exclude modules should be specified.
ExcludeModules []string `json:"exclude_modules"`
// ConfigType is the type of the configuration object that the indexer expects.
ConfigType interface{}
}

type InitFunc = func(InitParams) (InitResult, error)
Expand Down
50 changes: 0 additions & 50 deletions schema/indexer/manager.go

This file was deleted.

10 changes: 7 additions & 3 deletions schema/indexer/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package indexer
import "fmt"

// Register registers an indexer type with the given initialization function.
func Register(indexerType string, initFunc InitFunc) {
func Register(indexerType string, descriptor Initializer) {
if _, ok := indexerRegistry[indexerType]; ok {
panic(fmt.Sprintf("indexer %s already registered", indexerType))
}

indexerRegistry[indexerType] = initFunc
if descriptor.InitFunc == nil {
panic(fmt.Sprintf("indexer %s has no initialization function", indexerType))
}

indexerRegistry[indexerType] = descriptor
}

var indexerRegistry = map[string]InitFunc{}
var indexerRegistry = map[string]Initializer{}
Loading
Loading