Skip to content

Commit

Permalink
Implement HEAD tracker using RPC endpoint (#16)
Browse files Browse the repository at this point in the history
* Implement HEAD tracker using RPC endpoint

* Add test

* Code cleanup
  • Loading branch information
sosedoff authored May 11, 2022
1 parent 832e5cf commit 44e7196
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 0 deletions.
7 changes: 7 additions & 0 deletions cmd/firehose-cosmos/app_firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func init() {
cmd.Flags().Uint64("firehose-tracker-offset", 100, "Number of blocks for the bstream block resolver")
cmd.Flags().String("firehose-block-index-url", "", "If non-empty, will use this URL as a store to load index data used by some transforms")
cmd.Flags().IntSlice("firehose-block-index-sizes", []int{100000, 10000, 1000, 100}, "List of sizes for block indices")
cmd.Flags().String("firehose-rpc-head-tracker-url", "", "If non-empty, will use this URL to make RPC calls to status endpoint")
return nil
}

Expand All @@ -58,6 +59,12 @@ func init() {
tracker.AddGetter(bstream.BlockStreamHeadTarget, bstream.StreamHeadBlockRefGetter(blockstreamAddr))
}

// Enable HEAD tracker when block stream is not available, to allow firehose to serve static data
rpcHeadTrackerURL := viper.GetString("firehose-rpc-head-tracker-url")
if rpcHeadTrackerURL != "" && blockstreamAddr == "" {
tracker.AddGetter(bstream.BlockStreamHeadTarget, rpcHeadTracker(rpcHeadTrackerURL))
}

// Configure authentication (default is no auth)
authenticator, err := dauthAuthenticator.New(viper.GetString("common-auth-plugin"))
if err != nil {
Expand Down
66 changes: 66 additions & 0 deletions cmd/firehose-cosmos/rpc_head_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"sync"
"time"

"github.com/streamingfast/bstream"
)

type statusResponse struct {
Result struct {
SyncInfo struct {
LatestBlockHash string `json:"latest_block_hash"`
LatestBlockHeight string `json:"latest_block_height"`
} `json:"sync_info"`
} `json:"result"`
}

func rpcHeadTracker(endpoint string) bstream.BlockRefGetter {
var lock sync.Mutex

return func(ctx context.Context) (bstream.BlockRef, error) {
lock.Lock()
defer lock.Unlock()

reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return nil, err
}
req.WithContext(reqCtx)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode >= 400 {
return nil, fmt.Errorf("endpoint returned status code %v", resp.StatusCode)
}

status := statusResponse{}
if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
return nil, err
}

if status.Result.SyncInfo.LatestBlockHeight == "" {
return nil, fmt.Errorf("latest block height is not available")
}

height, err := strconv.Atoi(status.Result.SyncInfo.LatestBlockHeight)
if err != nil {
return nil, err
}

return bstream.NewBlockRef(status.Result.SyncInfo.LatestBlockHash, uint64(height)), nil
}
}
60 changes: 60 additions & 0 deletions cmd/firehose-cosmos/rpc_head_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
)

func TestRPCHeadTracker(t *testing.T) {
testStatusResp := `
{
"jsonrpc": "2.0",
"id": -1,
"result": {
"sync_info": {
"latest_block_hash": "03BAE86E2E0BAD1BBED598F2565AD9669DF36ED77E367F86BC0FCC51B7F178AE",
"latest_app_hash": "94C6600C41AA5AB759A1491F7A150A15D20372DE0704D90ACFE11751D4EA0A29",
"latest_block_height": "4493725",
"latest_block_time": "2022-05-11T13:20:31.23374692Z",
"earliest_block_hash": "3B6989296D0844863DC8957FF145AE7071B9970F91A608B71F20BA17A68162BD",
"earliest_app_hash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855",
"earliest_block_height": "3215230",
"earliest_block_time": "2021-06-18T17:00:00Z",
"catching_up": false
}
}
}`

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/bad":
w.WriteHeader(400)
case "/empty":
fmt.Fprintf(w, "{}")
default:
fmt.Fprintf(w, testStatusResp)
}
}))
defer server.Close()

tracker := rpcHeadTracker(server.URL + "/good")
ref, err := tracker(context.Background())
assert.NoError(t, err)
assert.Equal(t, uint64(4493725), ref.Num())
assert.Equal(t, "03BAE86E2E0BAD1BBED598F2565AD9669DF36ED77E367F86BC0FCC51B7F178AE", ref.ID())

tracker = rpcHeadTracker(server.URL + "/bad")
ref, err = tracker(context.Background())
assert.Equal(t, "endpoint returned status code 400", err.Error())
assert.Nil(t, ref)

tracker = rpcHeadTracker(server.URL + "/empty")
ref, err = tracker(context.Background())
assert.Equal(t, "latest block height is not available", err.Error())
assert.Nil(t, ref)
}

0 comments on commit 44e7196

Please sign in to comment.