Skip to content

Commit

Permalink
adding metrics and moving the store cursor and extending address
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduard-Voiculescu committed Sep 1, 2023
1 parent b2fccda commit d280035
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/streamingfast/bstream"
"go.uber.org/zap"
Expand All @@ -20,6 +21,8 @@ type BundleReader struct {
errChan chan error
logger *zap.Logger
headerWritten bool

lastRead time.Time
}

func NewBundleReader(ctx context.Context, logger *zap.Logger) *BundleReader {
Expand All @@ -29,6 +32,7 @@ func NewBundleReader(ctx context.Context, logger *zap.Logger) *BundleReader {
blockData: make(chan []byte, 1),
errChan: make(chan error, 1),
logger: logger,
lastRead: time.Now(),
}
}

Expand Down Expand Up @@ -62,7 +66,7 @@ func (r *BundleReader) PushBlock(block *bstream.Block) error {
}

func (r *BundleReader) Read(p []byte) (bytesRead int, err error) {

r.logger.Debug("read called", zap.Duration("since_last_read", time.Since(r.lastRead)))
if r.readBuffer == nil {
if err := r.fillBuffer(); err != nil {
return 0, err
Expand All @@ -75,6 +79,7 @@ func (r *BundleReader) Read(p []byte) (bytesRead int, err error) {
r.readBuffer = nil
}

r.lastRead = time.Now()
return bytesRead, nil
}

Expand Down
25 changes: 11 additions & 14 deletions accountresolver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,27 @@ import (
"bytes"
"context"
"fmt"
"io"
"strconv"
"strings"

"github.com/mr-tron/base58"
"github.com/streamingfast/bstream"
"github.com/streamingfast/dstore"
pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
"github.com/streamingfast/solana-go/programs/addresstablelookup"
"go.uber.org/zap"
"io"
"strconv"
"strings"
)

var AddressTableLookupAccountProgram = mustFromBase58("AddressLookupTab1e1111111111111111111111111")
var SystemProgram = mustFromBase58("11111111111111111111111111111111")

type Cursor struct {
slotNum uint64
blockHash []byte
slotNum uint64
}

func NewCursor(blockNum uint64, blockHash []byte) *Cursor {
func NewCursor(blockNum uint64) *Cursor {
return &Cursor{
slotNum: blockNum,
blockHash: blockHash,
slotNum: blockNum,
}
}

Expand Down Expand Up @@ -150,11 +147,6 @@ func (p *Processor) ProcessBlock(ctx context.Context, block *pbsol.Block) error
return fmt.Errorf("managing address lookup at block %d: %w", block.Slot, err)
}
}
p.cursor = NewCursor(block.Slot, []byte(block.Blockhash))
err := p.accountsResolver.StoreCursor(ctx, p.readerName, p.cursor)
if err != nil {
return fmt.Errorf("storing cursor at block %d: %w", block.Slot, err)
}
return nil
}

Expand Down Expand Up @@ -220,6 +212,11 @@ func (p *Processor) ProcessInstruction(ctx context.Context, blockNum uint64, pro
if err != nil {
return fmt.Errorf("extending address table %s at block %d: %w", tableLookupAccount, blockNum, err)
}
p.cursor = NewCursor(blockNum)
err = p.accountsResolver.StoreCursor(ctx, p.readerName, p.cursor)
if err != nil {
return fmt.Errorf("storing cursor at block %d: %w", blockNum, err)
}
}

return nil
Expand Down
12 changes: 8 additions & 4 deletions accountresolver/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func Test_ExtendTableLookupInCompiledInstruction(t *testing.T) {
db, err := kvstore.New("badger3:///tmp/my-badger.db")
require.NoError(t, err)

cursor := NewCursor(185_914_861, nil)
cursor := NewCursor(185_914_861)
resolver := NewKVDBAccountsResolver(db)
p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db), zap.NewNop())
err = p.ProcessBlock(context.Background(), solBlock)
Expand Down Expand Up @@ -143,7 +143,7 @@ func Test_ExtendTableLookup_In_InnerInstructions(t *testing.T) {
db, err := kvstore.New("badger3:///tmp/my-badger.db")
require.NoError(t, err)

cursor := NewCursor(157_564_919, nil)
cursor := NewCursor(157_564_919)
resolver := NewKVDBAccountsResolver(db)
p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db), zap.NewNop())
err = p.ProcessBlock(context.Background(), solBlock)
Expand Down Expand Up @@ -218,12 +218,14 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLooku
db, err := kvstore.New("badger3:///tmp/my-badger.db")
require.NoError(t, err)

cursor := NewCursor(185_914_861, nil)
cursor := NewCursor(185_914_861)
resolver := NewKVDBAccountsResolver(db)
p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db), zap.NewNop())

err = p.accountsResolver.Extended(context.Background(), 185_914_860, tableLookupAccountInTransaction, Accounts{AddressTableLookupAccountProgram})
require.NoError(t, err)
err = resolver.store.FlushPuts(context.Background())
require.NoError(t, err)

err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)
Expand Down Expand Up @@ -301,12 +303,14 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTa
db, err := kvstore.New("badger3:///tmp/my-badger.db")
require.NoError(t, err)

cursor := NewCursor(185_914_861, nil)
cursor := NewCursor(185_914_861)
resolver := NewKVDBAccountsResolver(db)
p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db), zap.NewNop())

err = p.accountsResolver.Extended(context.Background(), 185_914_860, tableLookupAccountInTransaction, Accounts{tableAccountToExtend})
require.NoError(t, err)
err = resolver.store.FlushPuts(context.Background())
require.NoError(t, err)

err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)
Expand Down
9 changes: 3 additions & 6 deletions accountresolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func (r *KVDBAccountsResolver) Extended(ctx context.Context, blockNum uint64, ke
if err != nil {
return fmt.Errorf("writing extended accounts for key %q: %w", key, err)
}
err = r.store.FlushPuts(ctx)
if err != nil {
return fmt.Errorf("flushing extended accounts for key %q: %w", key, err)
}
Expand Down Expand Up @@ -70,8 +69,7 @@ func (r *KVDBAccountsResolver) Resolve(ctx context.Context, atBlockNum uint64, k

func (r *KVDBAccountsResolver) StoreCursor(ctx context.Context, readerName string, cursor *Cursor) error {
payload := make([]byte, 8+32)
binary.BigEndian.PutUint64(payload[:8], cursor.slotNum)
copy(payload[8:], cursor.blockHash)
binary.BigEndian.PutUint64(payload, cursor.slotNum)
err := r.store.Put(ctx, Keys.cursor(readerName), payload)
if err != nil {
return fmt.Errorf("writing cursor: %w", err)
Expand All @@ -95,9 +93,8 @@ func (r *KVDBAccountsResolver) GetCursor(ctx context.Context, readerName string)
if payload == nil {
return nil, nil
}
blockNum := binary.BigEndian.Uint64(payload[:8])
blockHash := payload[8:]
return NewCursor(blockNum, blockHash), nil
blockNum := binary.BigEndian.Uint64(payload)
return NewCursor(blockNum), nil
}

func decodeAccounts(payload []byte) Accounts {
Expand Down
9 changes: 5 additions & 4 deletions accountresolver/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"os"
"testing"

"github.com/mr-tron/base58"
"github.com/streamingfast/kvdb/store"
_ "github.com/streamingfast/kvdb/store/badger3"
"github.com/stretchr/testify/require"
Expand All @@ -26,6 +25,8 @@ func TestKVDBAccountsResolver_Extended(t *testing.T) {
resolver := NewKVDBAccountsResolver(db)
err = resolver.Extended(context.Background(), 1, accountFromBase58(t, a1), []Account{accountFromBase58(t, a2), accountFromBase58(t, a3)})
require.NoError(t, err)
err = resolver.store.FlushPuts(context.Background())
require.NoError(t, err)

accounts, _, err := resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1))
require.NoError(t, err)
Expand All @@ -35,6 +36,8 @@ func TestKVDBAccountsResolver_Extended(t *testing.T) {

err = resolver.Extended(context.Background(), 100, accountFromBase58(t, a1), []Account{accountFromBase58(t, a4)})
require.NoError(t, err)
err = resolver.store.FlushPuts(context.Background())
require.NoError(t, err)

accounts, _, err = resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1))
require.NoError(t, err)
Expand Down Expand Up @@ -65,16 +68,14 @@ func TestKVDBAccountsResolver_StoreCursor(t *testing.T) {
require.NoError(t, err)

resolver := NewKVDBAccountsResolver(db)
expectedBlockHash, err := base58.Decode("8cv9oNupqL1wKogVHcQpqxC7QPy4SiaRghBiP5U2YYLp")
require.NoError(t, err)

err = resolver.StoreCursor(context.Background(), "r1", NewCursor(1, expectedBlockHash))
err = resolver.StoreCursor(context.Background(), "r1", NewCursor(1))
require.NoError(t, err)

c, err := resolver.GetCursor(context.Background(), "r1")
require.NoError(t, err)
require.Equal(t, uint64(1), c.slotNum)
require.Equal(t, expectedBlockHash, c.blockHash)
}

func TestKVDBAccountsResolver_StoreCursor_None(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/firesol/tablelookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func processAddressLookupE(logger *zap.Logger, tracer logging.Tracer) func(cmd *

if cursor == nil {
logger.Info("No cursor found, starting from beginning")
cursor = accountsresolver.NewCursor(154655004, nil)
cursor = accountsresolver.NewCursor(154655004)
}

fmt.Println("Cursor", cursor)
Expand Down

0 comments on commit d280035

Please sign in to comment.