Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add block indexing to Indexer API #1606

Merged
merged 22 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
66bd305
Minor rename of tx indexer
aaronbuchwald Sep 28, 2024
5eff712
Add ExecutedBlock type
aaronbuchwald Sep 28, 2024
461c6d7
Update externalsubscriber to use ExecutedBlock type
aaronbuchwald Sep 28, 2024
8fd6e02
Revert unrelated change
aaronbuchwald Sep 28, 2024
d8fb93b
Rename txDBIndexer and api
aaronbuchwald Sep 28, 2024
02085f8
Add blocks to indexer API
aaronbuchwald Sep 30, 2024
d826764
Replace publicfees alias with internalfees
aaronbuchwald Oct 1, 2024
c21b4f2
Address comments on ExecutedBlock fields
aaronbuchwald Oct 1, 2024
b7cb7d7
Convert StatefulBlock to ExecutedBlock once
aaronbuchwald Oct 1, 2024
26c1ecc
Move NewExecutedBlock off of the ExecutedBlock type
aaronbuchwald Oct 1, 2024
9ce793a
Move dimensionsFormatter to top of file
aaronbuchwald Oct 1, 2024
3aa92cd
Add blockID field to ExecutedBlock type
aaronbuchwald Oct 1, 2024
f3a5dde
Fix unpopulated blkID in unmarshal ExecutedBlock
aaronbuchwald Oct 1, 2024
aacfa93
Merge branch 'executed-block' into block-index
aaronbuchwald Oct 1, 2024
9312888
Merge branch 'main' into block-index
aaronbuchwald Oct 2, 2024
abf0ce9
Merge branch 'main' into block-index
containerman17 Oct 2, 2024
1d45a4a
Merge branch 'main' into block-index
containerman17 Oct 3, 2024
84fcfc0
Clean up block index test
aaronbuchwald Oct 3, 2024
cddae58
minor cleanup
aaronbuchwald Oct 3, 2024
2797164
Fix chaintest helper
aaronbuchwald Oct 3, 2024
0f4688e
Separate out indexer restart test
aaronbuchwald Oct 3, 2024
6f473b9
Merge branch 'main' into block-index
aaronbuchwald Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions api/indexer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/ava-labs/avalanchego/ids"

"github.com/ava-labs/hypersdk/api/jsonrpc"
"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/codec"
"github.com/ava-labs/hypersdk/requester"
)

Expand All @@ -27,6 +29,54 @@ type Client struct {
requester *requester.EndpointRequester
}

// Use a separate type that only decodes the block bytes because we cannot decode block JSON
// due to Actions/Auth interfaces included in the block's transactions.
type GetBlockClientResponse struct {
BlockBytes codec.Bytes `json:"blockBytes"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What prevents a Block from being unmarshalled from JSON? It's the Auth and Action interfaces, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why even have this separate type? If we're only going to use the block bytes from the response since we can't unmarshal the actual block type why don't we just drop the block field from the server-side response struct entirely and get rid of this second struct?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to support clients (JS) that prefer to handle JSON than the binary representation. For golang, we actually cannot handle the JSON using the current requester because it will simply unmarshal the json into the struct and we can't unmarshal JSON directly into the block type due to the Action and Auth interfaces.

Have a note here #1606 (comment) to add an encoding parameter that we could block this PR on if you prefer.

}

// getBlock issues the named indexer RPC with the given params and decodes the
// raw block bytes from the response using parser. Shared by all block lookup
// methods, which previously duplicated this request/decode sequence.
func (c *Client) getBlock(ctx context.Context, method string, params any, parser chain.Parser) (*chain.ExecutedBlock, error) {
	resp := GetBlockClientResponse{}
	if err := c.requester.SendRequest(ctx, method, params, &resp); err != nil {
		return nil, err
	}
	return chain.UnmarshalExecutedBlock(resp.BlockBytes, parser)
}

// GetBlock fetches the executed block with the given block ID.
func (c *Client) GetBlock(ctx context.Context, blkID ids.ID, parser chain.Parser) (*chain.ExecutedBlock, error) {
	return c.getBlock(ctx, "getBlock", &GetBlockRequest{BlockID: blkID}, parser)
}

// GetBlockByHeight fetches the executed block at the given height.
func (c *Client) GetBlockByHeight(ctx context.Context, height uint64, parser chain.Parser) (*chain.ExecutedBlock, error) {
	return c.getBlock(ctx, "getBlockByHeight", &GetBlockByHeightRequest{Height: height}, parser)
}

// GetLatestBlock fetches the most recently accepted executed block.
func (c *Client) GetLatestBlock(ctx context.Context, parser chain.Parser) (*chain.ExecutedBlock, error) {
	return c.getBlock(ctx, "getLatestBlock", nil, parser)
}

func (c *Client) GetTx(ctx context.Context, txID ids.ID) (GetTxResponse, bool, error) {
resp := GetTxResponse{}
err := c.requester.SendRequest(
Expand Down
251 changes: 251 additions & 0 deletions api/indexer/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package indexer

import (
"encoding/binary"
"errors"
"fmt"
"path/filepath"
"sync/atomic"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/wrappers"

"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/codec"
"github.com/ava-labs/hypersdk/consts"
"github.com/ava-labs/hypersdk/event"
"github.com/ava-labs/hypersdk/fees"
"github.com/ava-labs/hypersdk/internal/cache"
"github.com/ava-labs/hypersdk/internal/pebble"
)

// maxBlockWindow caps the number of retained blocks. The indexer does not
// persist a blockID -> height mapping, so initBlocks re-reads every stored
// block on startup; this guardrail keeps that startup scan bounded.
const maxBlockWindow uint64 = 1_000_000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we limit it? If someone wants to run an indexer that stores all transactions and they have 20TB of NVMe, why should we stop them?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't maintain a blockID -> height mapping on disk to avoid using hashes as keys. This means we iterate over all the data on startup, so we set a reasonable guardrail.

If you want to store all transactions, you need to scale horizontally rather than writing to a single pebbledb instance. This won't work at scale, so I'd prefer to set a reasonable guardrail than allow users to try and scale something that wasn't meant to.


// errBlockNotFound is returned when a requested block is not in the retained
// window (wrapped with the height or ID that was requested).
var errBlockNotFound = errors.New("block not found")

// Compile-time check that Indexer can be registered as a block subscription.
var _ event.Subscription[*chain.ExecutedBlock] = (*Indexer)(nil)

// Indexer stores the most recent blocks (bounded by blockWindow) and the
// results of every indexed transaction, serving lookups by block ID, block
// height, and transaction ID.
type Indexer struct {
	blockDB *pebble.Database // height -> block bytes
	blockIDToHeight *cache.FIFO[ids.ID, uint64] // in-memory blockID -> height
	blockHeightToBlock *cache.FIFO[uint64, *chain.ExecutedBlock] // in-memory height -> parsed block
	blockWindow uint64 // Maximum window of blocks to retain
	lastHeight atomic.Uint64 // height of the most recently accepted block
	parser chain.Parser // used to unmarshal stored block bytes

	// ID -> timestamp, success, units, fee, outputs
	txDB *pebble.Database
}

// NewIndexer opens (or creates) the transaction and block databases under
// path and replays the stored blocks into the in-memory caches. blockWindow
// bounds the number of retained blocks and must not exceed maxBlockWindow,
// since startup re-reads every stored block.
//
// On any failure after a database has been opened, the opened resources are
// closed before returning, so callers never leak handles on error.
func NewIndexer(path string, parser chain.Parser, blockWindow uint64) (*Indexer, error) {
	if blockWindow > maxBlockWindow {
		return nil, fmt.Errorf("block window %d exceeds maximum %d", blockWindow, maxBlockWindow)
	}
	txDB, _, err := pebble.New(filepath.Join(path, "tx"), pebble.NewDefaultConfig())
	if err != nil {
		return nil, err
	}
	blockDB, _, err := pebble.New(filepath.Join(path, "block"), pebble.NewDefaultConfig())
	if err != nil {
		// Don't leak the already-opened tx database.
		_ = txDB.Close()
		return nil, err
	}

	blockIDCache, err := cache.NewFIFO[ids.ID, uint64](int(blockWindow))
	if err != nil {
		_ = txDB.Close()
		_ = blockDB.Close()
		return nil, err
	}
	blockCache, err := cache.NewFIFO[uint64, *chain.ExecutedBlock](int(blockWindow))
	if err != nil {
		_ = txDB.Close()
		_ = blockDB.Close()
		return nil, err
	}
	i := &Indexer{
		blockDB:            blockDB,
		blockIDToHeight:    blockIDCache,
		blockHeightToBlock: blockCache,
		blockWindow:        blockWindow,
		parser:             parser,
		txDB:               txDB,
	}
	if err := i.initBlocks(); err != nil {
		// Close both databases rather than returning a half-initialized indexer.
		_ = i.Close()
		return nil, err
	}
	return i, nil
}

// initBlocks rebuilds the in-memory blockID/height caches by scanning every
// block currently on disk, records the highest height seen, and prunes any
// blocks that have fallen outside the retained window. Called once from
// NewIndexer.
func (i *Indexer) initBlocks() error {
	// Load blockID <-> height mapping
	iter := i.blockDB.NewIterator()
	defer iter.Release()

	var lastHeight uint64
	for iter.Next() {
		value := iter.Value()
		blk, err := chain.UnmarshalExecutedBlock(value, i.parser)
		if err != nil {
			return err
		}

		i.blockIDToHeight.Put(blk.ID(), blk.Hght)
		i.blockHeightToBlock.Put(blk.Hght, blk)
		// Keys are big-endian heights, so iteration is in ascending height
		// order and the last block seen has the greatest height.
		lastHeight = blk.Hght
	}
	if err := iter.Error(); err != nil {
		return err
	}
	// Release the iterator before mutating the database below. The deferred
	// Release above will fire again at return — assumes Release is idempotent
	// (NOTE(review): confirm against the avalanchego iterator contract).
	iter.Release()

	i.lastHeight.Store(lastHeight)

	// Prune everything strictly below the last retained height. Since keys
	// are big-endian heights, DeleteRange(nil, h) removes all heights < h.
	if lastHeight > i.blockWindow {
		lastRetainedHeight := lastHeight - i.blockWindow
		lastRetainedHeightBytes := binary.BigEndian.AppendUint64(nil, lastRetainedHeight)
		if err := i.blockDB.DeleteRange(nil, lastRetainedHeightBytes); err != nil {
			return err
		}
	}

	return nil
}

// Accept indexes an accepted block: its transaction results first, then the
// block itself. It implements event.Subscription[*chain.ExecutedBlock].
func (i *Indexer) Accept(blk *chain.ExecutedBlock) error {
	err := i.storeTransactions(blk)
	if err == nil {
		err = i.storeBlock(blk)
	}
	return err
}

// storeBlock persists blk keyed by its big-endian height, evicts the block
// that just fell out of the retention window, and refreshes the in-memory
// caches. A zero block window disables block storage entirely.
func (i *Indexer) storeBlock(blk *chain.ExecutedBlock) error {
	if i.blockWindow == 0 {
		return nil
	}

	blkBytes, err := blk.Marshal()
	if err != nil {
		return err
	}

	heightKey := binary.BigEndian.AppendUint64(nil, blk.Hght)
	if err := i.blockDB.Put(heightKey, blkBytes); err != nil {
		return err
	}
	// When blk.Hght < blockWindow this subtraction wraps around to a huge
	// key that was never written, so the delete is a harmless no-op.
	evictKey := binary.BigEndian.AppendUint64(nil, blk.Hght-i.blockWindow)
	if err := i.blockDB.Delete(evictKey); err != nil {
		return err
	}

	i.blockIDToHeight.Put(blk.ID(), blk.Hght)
	i.blockHeightToBlock.Put(blk.Hght, blk)
	i.lastHeight.Store(blk.Hght)

	return nil
}

// GetLatestBlock returns the most recently accepted block held by the indexer.
func (i *Indexer) GetLatestBlock() (*chain.ExecutedBlock, error) {
	height := i.lastHeight.Load()
	return i.GetBlockByHeight(height)
}

// GetBlockByHeight returns the block at the given height, or an error wrapping
// errBlockNotFound when the height is outside the retained window.
func (i *Indexer) GetBlockByHeight(height uint64) (*chain.ExecutedBlock, error) {
	if blk, ok := i.blockHeightToBlock.Get(height); ok {
		return blk, nil
	}
	return nil, fmt.Errorf("%w: height=%d", errBlockNotFound, height)
}

// GetBlock returns the block with the given ID, or an error wrapping
// errBlockNotFound when the block is outside the retained window.
func (i *Indexer) GetBlock(blkID ids.ID) (*chain.ExecutedBlock, error) {
	if height, ok := i.blockIDToHeight.Get(blkID); ok {
		return i.GetBlockByHeight(height)
	}
	return nil, fmt.Errorf("%w: %s", errBlockNotFound, blkID)
}

// storeTransactions writes the result of every transaction in blk to txDB in
// a single atomic batch. blk.Txs and blk.Results are parallel slices.
func (i *Indexer) storeTransactions(blk *chain.ExecutedBlock) error {
	batch := i.txDB.NewBatch()
	defer batch.Reset()

	for idx, tx := range blk.Txs {
		res := blk.Results[idx]
		err := i.storeTransaction(
			batch,
			tx.ID(),
			blk.Tmstmp,
			res.Success,
			res.Units,
			res.Fee,
			res.Outputs,
		)
		if err != nil {
			return err
		}
	}

	return batch.Write()
}

func (*Indexer) storeTransaction(
batch database.KeyValueWriter,
txID ids.ID,
timestamp int64,
success bool,
units fees.Dimensions,
fee uint64,
outputs [][]byte,
) error {
outputLength := consts.ByteLen // Single byte containing number of outputs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would either store the entire transaction or just the result. There's no need for a custom marshaler in this case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree and think storing the entire tx + result makes sense here #1626

This is not changed in this PR though, so don't think it needs to be in scope

for _, output := range outputs {
outputLength += consts.Uint32Len + len(output)
}
txResultLength := consts.Uint64Len + consts.BoolLen + fees.DimensionsLen + consts.Uint64Len + outputLength

writer := codec.NewWriter(txResultLength, consts.NetworkSizeLimit)
writer.PackUint64(uint64(timestamp))
writer.PackBool(success)
writer.PackFixedBytes(units.Bytes())
writer.PackUint64(fee)
writer.PackByte(byte(len(outputs)))
for _, output := range outputs {
writer.PackBytes(output)
}
if err := writer.Err(); err != nil {
return err
}
return batch.Put(txID[:], writer.Bytes())
}

// GetTransaction looks up a stored transaction result by ID. It returns
// (found, timestamp, success, units, fee, outputs, error); found is false
// with a nil error when the transaction was never indexed. The value is
// decoded in the exact order storeTransaction wrote it.
func (i *Indexer) GetTransaction(txID ids.ID) (bool, int64, bool, fees.Dimensions, uint64, [][]byte, error) {
	v, err := i.txDB.Get(txID[:])
	switch {
	case errors.Is(err, database.ErrNotFound):
		return false, 0, false, fees.Dimensions{}, 0, nil, nil
	case err != nil:
		return false, 0, false, fees.Dimensions{}, 0, nil, err
	}
	reader := codec.NewReader(v, consts.NetworkSizeLimit)
	timestamp := reader.UnpackUint64(true)
	success := reader.UnpackBool()
	dimensionsBytes := make([]byte, fees.DimensionsLen)
	reader.UnpackFixedBytes(fees.DimensionsLen, &dimensionsBytes)
	fee := reader.UnpackUint64(true)
	numOutputs := int(reader.UnpackByte())
	outputs := make([][]byte, numOutputs)
	// Renamed loop variable: the original's "i" shadowed the receiver.
	for idx := range outputs {
		outputs[idx] = reader.UnpackLimitedBytes(consts.NetworkSizeLimit)
	}
	if err := reader.Err(); err != nil {
		return false, 0, false, fees.Dimensions{}, 0, nil, err
	}
	dimensions, err := fees.UnpackDimensions(dimensionsBytes)
	if err != nil {
		return false, 0, false, fees.Dimensions{}, 0, nil, err
	}
	return true, int64(timestamp), success, dimensions, fee, outputs, nil
}

// Close closes both underlying databases and returns any accumulated error.
func (i *Indexer) Close() error {
	var errs wrappers.Errs
	errs.Add(i.txDB.Close())
	errs.Add(i.blockDB.Close())
	return errs.Err
}
Loading
Loading