Skip to content

Commit

Permalink
handle multiple table extends in multiple trx of same block
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Sep 11, 2023
1 parent 4dc2a8c commit 050db45
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 87 deletions.
20 changes: 9 additions & 11 deletions accountresolver/keyer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ var Keys keyer

type keyer struct{}

func (keyer) extendTableLookup(key Account, blockNum uint64, trxHash []byte) (out []byte) {
out = make([]byte, 1+32+8+64)
func (keyer) extendTableLookup(key Account, blockNum uint64) (out []byte) {
out = make([]byte, 1+32+8)
out[0] = tableAccountLookup
copy(out[1:33], key)
binary.BigEndian.PutUint64(out[33:41], math.MaxUint64-blockNum)
copy(out[41:], trxHash)
return out
}

Expand All @@ -34,8 +33,8 @@ func (keyer) tableLookupPrefix(key Account) (out []byte) {
return out
}

func (keyer) unpackTableLookup(key []byte) (Account, uint64, []byte) {
return key[1:33], math.MaxUint64 - binary.BigEndian.Uint64(key[33:41]), key[41:]
func (keyer) unpackTableLookup(key []byte) (Account, uint64) {
return key[1:33], math.MaxUint64 - binary.BigEndian.Uint64(key[33:])
}

func (keyer) cursor(readerName string) (out []byte) {
Expand All @@ -52,14 +51,13 @@ func (keyer) transactionSeenPrefix(blockNum uint64) []byte {
return out
}

func (keyer) transactionSeen(blockNum uint64, trxHash []byte) []byte {
out := make([]byte, 1+8+64)
func (keyer) knownTransaction(trxHash []byte) []byte {
out := make([]byte, 1+64)
out[0] = tableKnownTransaction
binary.BigEndian.PutUint64(out[1:9], math.MaxUint64-blockNum)
copy(out[9:], trxHash)
copy(out[1:], trxHash)
return out
}

func (keyer) unpackTransactionSeen(key []byte) (blockNum uint64, trxHash []byte) {
return binary.BigEndian.Uint64(key[1:9]), key[9:]
func (keyer) unpackKnownTransaction(key []byte) (trxHash []byte) {
return key[1:]
}
12 changes: 4 additions & 8 deletions accountresolver/keyer_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
package accountsresolver

import (
"github.com/mr-tron/base58"
"testing"

"github.com/mr-tron/base58"
"github.com/stretchr/testify/require"
)

func Test_Keyer_ExtendTableLookup(t *testing.T) {
var a1 = "2iMPmzAgkUWRjq1E5C4gAFA7bDKCBUrUbogGd8dau5XP"
trxHashBytes, err := base58.Decode("VSod23zXfXD7RY9mPDuAJBkb674gZJ6n3CZUKT58Y4wCzFdcLLouCJkgNsG24Srkez7JK3mp6ozCiirojSbBG5u")
require.NoError(t, err)

key := Keys.extendTableLookup(accountFromBase58(t, a1), 1, trxHashBytes)
key := Keys.extendTableLookup(accountFromBase58(t, a1), 1)
expectedKey := []byte{tableAccountLookup}
expectedKey = append(expectedKey, accountFromBase58(t, a1)...)
expectedKey = append(expectedKey, []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xfe}...)
expectedKey = append(expectedKey, trxHashBytes...)
require.Equal(t, expectedKey, key)
}

Expand All @@ -30,11 +27,10 @@ func Test_Keyer_Cusor(t *testing.T) {
func Test_Keyer_UnpackTableLookup(t *testing.T) {
expectedAccount := "13Y2WX93BgJa7xhEQHokNkuVoFgk4p9vwAAT3aTkj87"
expectedBlockNum := uint64(157564936)
expectedTrxHash := "VSod23zXfXD7RY9mPDuAJBkb674gZJ6n3CZUKT58Y4wCzFdcLLouCJkgNsG24Srkez7JK3mp6ozCiirojSbBG5u"

key := []byte{0, 0, 2, 221, 194, 243, 179, 183, 173, 114, 231, 92, 149, 174, 86, 70, 107, 79, 77, 133, 179, 2, 64, 248, 58, 81, 225, 250, 60, 184, 217, 59, 252, 255, 255, 255, 255, 246, 155, 191, 247, 24, 135, 160, 185, 200, 241, 239, 246, 95, 5, 218, 34, 45, 47, 87, 212, 109, 231, 185, 43, 190, 44, 64, 140, 192, 109, 59, 58, 213, 188, 210, 224, 94, 111, 208, 187, 34, 20, 205, 102, 155, 253, 129, 6, 146, 119, 140, 163, 187, 33, 35, 154, 95, 122, 98, 226, 246, 6, 133, 222, 231, 221, 21, 8}

acc, blockNum, hashBytes := Keys.unpackTableLookup(key)
acc, blockNum := Keys.unpackTableLookup(key)
require.Equal(t, expectedAccount, base58.Encode(acc))
require.Equal(t, expectedBlockNum, blockNum)
require.Equal(t, expectedTrxHash, base58.Encode(hashBytes))
}
9 changes: 5 additions & 4 deletions accountresolver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"bytes"
"context"
"fmt"
"io"
"strconv"
"strings"

"github.com/mr-tron/base58"
"github.com/streamingfast/bstream"
"github.com/streamingfast/dstore"
pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
"github.com/streamingfast/solana-go/programs/addresstablelookup"
"go.uber.org/zap"
"io"
"strconv"
"strings"
)

var AddressTableLookupAccountProgram = mustFromBase58("AddressLookupTab1e1111111111111111111111111")
Expand Down Expand Up @@ -163,7 +164,7 @@ func (p *Processor) manageAddressLookup(ctx context.Context, blockNum uint64, er

func (p *Processor) applyTableLookup(ctx context.Context, blockNum uint64, trx *pbsol.ConfirmedTransaction) error {
for _, addressTableLookup := range trx.Transaction.Message.AddressTableLookups {
accs, _, _, err := p.accountsResolver.Resolve(ctx, blockNum, addressTableLookup.AccountKey)
accs, _, err := p.accountsResolver.Resolve(ctx, blockNum, addressTableLookup.AccountKey)
p.logger.Info("Resolve address table lookup", zap.String("account", base58.Encode(addressTableLookup.AccountKey)), zap.Int("count", len(accs)))
if err != nil {
return fmt.Errorf("resolving address table %s at block %d: %w", base58.Encode(addressTableLookup.AccountKey), blockNum, err)
Expand Down
40 changes: 21 additions & 19 deletions accountresolver/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package accountsresolver

import (
"context"
"os"
"testing"

pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
kvstore "github.com/streamingfast/kvdb/store"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"os"
"testing"
)

func Test_ExtendTableLookupInCompiledInstruction(t *testing.T) {
Expand Down Expand Up @@ -70,13 +71,13 @@ func Test_ExtendTableLookupInCompiledInstruction(t *testing.T) {
err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)

accounts, _, _, err := resolver.Resolve(context.Background(), 185_914_862, tableLookupAccount)
accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableLookupAccount)
require.Equal(t, expectedCreatedAccounts, accounts)
}

func Test_ExtendTableLookup_In_InnerInstructions(t *testing.T) {
tableLookupAccount := accountFromBase58(t, "6pyNrJXyGdDDA3esoLEHJ2uoohcdf2xGT11acfmfyA7Q")
tableLookupToExtendIndexFromAccountKeys := byte(2)
tableLookupToExtendIndex := byte(2)

expectedCreatedAccounts := fromBase58Strings(t,
"He3iAEV5rYjv6Xf7PxKro19eVrC3QAcdic5CF2D2obPt",
Expand Down Expand Up @@ -124,7 +125,7 @@ func Test_ExtendTableLookup_In_InnerInstructions(t *testing.T) {
Instructions: []*pbsol.InnerInstruction{
{
ProgramIdIndex: 4,
Accounts: []byte{tableLookupToExtendIndexFromAccountKeys, 15, 0, 3},
Accounts: []byte{tableLookupToExtendIndex, 15, 0, 3},
Data: append([]byte{2, 0, 0, 0, 13, 0, 0, 0, 0, 0, 0, 0}, encodeAccounts(expectedCreatedAccounts)...),
},
},
Expand All @@ -149,15 +150,15 @@ func Test_ExtendTableLookup_In_InnerInstructions(t *testing.T) {
err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)

accounts, _, _, err := resolver.Resolve(context.Background(), 157_564_921, tableLookupAccount)
accounts, _, err := resolver.Resolve(context.Background(), 157_564_921, tableLookupAccount)
require.Equal(t, expectedCreatedAccounts, accounts)
}

func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLookupTableProgramID(t *testing.T) {
tableAccountToExtend := accountFromBase58(t, "GcjJQhD7L7esCrjNmkPM8oitsFXRpbWo11LMWfLH89u3")
tableLookupToExtendIndexFromAccountKeys := byte(0)
tableLookupAddressToExtend := accountFromBase58(t, "GcjJQhD7L7esCrjNmkPM8oitsFXRpbWo11LMWfLH89u3")
tableLookupAddressToExtendIndex := byte(0)
tableLookupAddressToResolve := accountFromBase58(t, "6pyNrJXyGdDDA3esoLEHJ2uoohcdf2xGT11acfmfyA7Q")

tableLookupAccountInTransaction := accountFromBase58(t, "6pyNrJXyGdDDA3esoLEHJ2uoohcdf2xGT11acfmfyA7Q")
expectedCreatedAccounts := fromBase58Strings(t,
"PhoeNiXZ8ByJGLkxNfZRnkUfjvmuYqLR89jjFHGqdXY",
"7aDTsspkQNGKmrexAN7FLx9oxU3iPczSSvHNggyuqYkR",
Expand All @@ -173,10 +174,10 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLooku
Transactions: []*pbsol.ConfirmedTransaction{
{
Transaction: &pbsol.Transaction{
Signatures: [][]byte{{0}},
Signatures: [][]byte{{0x01}},
Message: &pbsol.Message{
AccountKeys: [][]byte{
tableAccountToExtend,
tableLookupAddressToExtend,
SystemProgram,
},
Instructions: []*pbsol.CompiledInstruction{
Expand All @@ -186,7 +187,7 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLooku
},
AddressTableLookups: []*pbsol.MessageAddressTableLookup{
{
AccountKey: tableLookupAccountInTransaction,
AccountKey: tableLookupAddressToResolve,
WritableIndexes: []byte{0},
},
},
Expand All @@ -199,7 +200,7 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLooku
Instructions: []*pbsol.InnerInstruction{
{
ProgramIdIndex: 2,
Accounts: []byte{tableLookupToExtendIndexFromAccountKeys, 0, 0, 2},
Accounts: []byte{tableLookupAddressToExtendIndex, 0, 0, 2},
Data: append([]byte{2, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0}, encodeAccounts(expectedCreatedAccounts)...),
},
},
Expand All @@ -222,21 +223,21 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLooku
resolver := NewKVDBAccountsResolver(db)
p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db), zap.NewNop())

err = p.accountsResolver.Extend(context.Background(), 185_914_860, []byte{0x00}, tableLookupAccountInTransaction, Accounts{AddressTableLookupAccountProgram})
err = p.accountsResolver.Extend(context.Background(), 185_914_860, []byte{0x00}, tableLookupAddressToResolve, Accounts{AddressTableLookupAccountProgram})
require.NoError(t, err)
err = resolver.store.FlushPuts(context.Background())
require.NoError(t, err)

err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)

accounts, _, _, err := resolver.Resolve(context.Background(), 185_914_862, tableAccountToExtend)
accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableLookupAddressToExtend)
require.Equal(t, expectedCreatedAccounts, accounts)
}

func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTableLookup(t *testing.T) {
tableAccountToExtend := accountFromBase58(t, "GcjJQhD7L7esCrjNmkPM8oitsFXRpbWo11LMWfLH89u3")
tableLookupToExtendIndexFromAccountKeys := byte(3)
tableLookupToExtendIndex := byte(3)

tableLookupAccountInTransaction := accountFromBase58(t, "6pyNrJXyGdDDA3esoLEHJ2uoohcdf2xGT11acfmfyA7Q")
expectedCreatedAccounts := fromBase58Strings(t,
Expand All @@ -254,7 +255,7 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTa
Transactions: []*pbsol.ConfirmedTransaction{
{
Transaction: &pbsol.Transaction{
Signatures: [][]byte{{0}},
Signatures: [][]byte{{0x01}},
Message: &pbsol.Message{
AccountKeys: [][]byte{
accountFromBase58(t, "DEM7JJFjemWE5tjt3aC9eeTsGtTnyAs95EWhY2bM6n1o"),
Expand Down Expand Up @@ -283,7 +284,7 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTa
Instructions: []*pbsol.InnerInstruction{
{
ProgramIdIndex: 2,
Accounts: []byte{tableLookupToExtendIndexFromAccountKeys, 0, 0, 0},
Accounts: []byte{tableLookupToExtendIndex, 0, 0, 0},
Data: append([]byte{2, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0}, encodeAccounts(expectedCreatedAccounts)...),
},
},
Expand All @@ -307,6 +308,7 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTa
resolver := NewKVDBAccountsResolver(db)
p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db), zap.NewNop())

// Pre populate the table lookup account with the address table lookup program
err = p.accountsResolver.Extend(context.Background(), 185_914_860, []byte{0x00}, tableLookupAccountInTransaction, Accounts{tableAccountToExtend})
require.NoError(t, err)
err = resolver.store.FlushPuts(context.Background())
Expand All @@ -315,6 +317,6 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTa
err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)

accounts, _, _, err := resolver.Resolve(context.Background(), 185_914_862, tableAccountToExtend)
accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableAccountToExtend)
require.Equal(t, expectedCreatedAccounts, accounts)
}
58 changes: 22 additions & 36 deletions accountresolver/resolver.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package accountsresolver

import (
"bytes"
"context"
"encoding/binary"
"errors"
Expand All @@ -12,9 +11,7 @@ import (

type AccountsResolver interface {
Extend(ctx context.Context, blockNum uint64, trxHash []byte, key Account, accounts Accounts) error
Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, uint64, []byte, error)
ResolveSeenTransaction(ctx context.Context, atBlockNum uint64, trxHash []byte) ([]byte, error)
SeenTransaction(ctx context.Context, atBlockNum uint64, trxHash []byte) error
Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, uint64, error)
StoreCursor(ctx context.Context, readerName string, cursor *Cursor) error
GetCursor(ctx context.Context, readerName string) (*Cursor, error)
}
Expand All @@ -30,62 +27,51 @@ func NewKVDBAccountsResolver(store store.KVStore) *KVDBAccountsResolver {
}

func (r *KVDBAccountsResolver) Extend(ctx context.Context, blockNum uint64, trxHash []byte, key Account, accounts Accounts) error {
currentAccounts, resolveAtBlockNum, keyTrxHash, err := r.Resolve(ctx, blockNum, key)
if err != nil {
return fmt.Errorf("retreiving last accounts for key %q: %w", key, err)
}

if resolveAtBlockNum == blockNum && bytes.Equal(trxHash, keyTrxHash) {
// already extended at this block, nothing to do
if !r.isKnownTransaction(ctx, trxHash) {
return nil
}

payload := encodeAccounts(append(currentAccounts, accounts...))
err = r.store.Put(ctx, Keys.extendTableLookup(key, blockNum, trxHash), payload)
currentAccounts, _, err := r.Resolve(ctx, blockNum, key)
if err != nil {
return fmt.Errorf("retreiving last accounts for key %q: %w", key, err)
}
extendedAccount := append(currentAccounts, accounts...)
payload := encodeAccounts(extendedAccount)
err = r.store.Put(ctx, Keys.extendTableLookup(key, blockNum), payload)
if err != nil {
return fmt.Errorf("writing extended accounts for key %q: %w", key, err)
}

err = r.store.Put(ctx, Keys.knownTransaction(trxHash), []byte{})
if err != nil {
return fmt.Errorf("flushing extended accounts for key %q: %w", key, err)
return fmt.Errorf("writing known transaction %x: %w", trxHash, err)
}

return nil
}

func (r *KVDBAccountsResolver) Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, uint64, []byte, error) {
func (r *KVDBAccountsResolver) Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, uint64, error) {
keyBytes := Keys.tableLookupPrefix(key)
iter := r.store.Prefix(ctx, keyBytes, store.Unlimited)
if iter.Err() != nil {
return nil, 0, nil, fmt.Errorf("querying accounts for key %q: %w", key, iter.Err())
return nil, 0, fmt.Errorf("querying accounts for key %q: %w", key, iter.Err())
}

for iter.Next() {
item := iter.Item()
_, keyBlockNum, hash := Keys.unpackTableLookup(item.Key)
_, keyBlockNum := Keys.unpackTableLookup(item.Key)
if keyBlockNum <= atBlockNum {
return decodeAccounts(item.Value), keyBlockNum, hash, nil
return decodeAccounts(item.Value), keyBlockNum, nil
}
}

return nil, 0, nil, nil
return nil, 0, nil
}

func (r *KVDBAccountsResolver) ResolveSeenTransaction(ctx context.Context, atBlockNum uint64, trxHash []byte) ([]byte, error) {
val, err := r.store.Get(ctx, Keys.transactionSeen(atBlockNum, trxHash))
if err != nil {

}
if val != nil {

}
}

func (r *KVDBAccountsResolver) SeenTransaction(ctx context.Context, atBlockNum uint64, trxHash []byte) error {

return nil
//err := r.store.Put()
//
//return nil, nil
func (r *KVDBAccountsResolver) isKnownTransaction(ctx context.Context, transactionHash []byte) bool {
trxKey := Keys.knownTransaction(transactionHash)
_, err := r.store.Get(ctx, trxKey)
return errors.Is(err, store.ErrNotFound)
}

func (r *KVDBAccountsResolver) StoreCursor(ctx context.Context, readerName string, cursor *Cursor) error {
Expand All @@ -96,7 +82,7 @@ func (r *KVDBAccountsResolver) StoreCursor(ctx context.Context, readerName strin
return fmt.Errorf("writing cursor: %w", err)
}

err = r.store.FlushPuts(ctx)
err = r.store.FlushPuts(ctx) //todo: move that up in call stack
if err != nil {
return fmt.Errorf("flushing cursor: %w", err)
}
Expand Down
Loading

0 comments on commit 050db45

Please sign in to comment.