Skip to content

Commit

Permalink
Get all ledgers in once call from the db
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya1702 committed Sep 26, 2024
1 parent 43e00fa commit 8f5ba83
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 43 deletions.
20 changes: 20 additions & 0 deletions cmd/soroban-rpc/internal/db/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type StreamLedgerFn func(xdr.LedgerCloseMeta) error

type LedgerReader interface {
GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error)
BatchGetLedgers(ctx context.Context, sequence uint32, batchSize uint) ([]xdr.LedgerCloseMeta, error)
StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error
GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error)
StreamLedgerRange(ctx context.Context, startLedger uint32, endLedger uint32, f StreamLedgerFn) error
Expand Down Expand Up @@ -103,6 +104,25 @@ func (r ledgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.Ledge
}
}

// BatchGetLedgers fetches ledgers in batches from the db.
func (r ledgerReader) BatchGetLedgers(ctx context.Context, sequence uint32,
batchSize uint,
) ([]xdr.LedgerCloseMeta, error) {
sql := sq.Select("meta").
From(ledgerCloseMetaTableName).
Where(sq.And{
sq.GtOrEq{"sequence": sequence},
sq.LtOrEq{"sequence": sequence + uint32(batchSize) - 1},
})

var results []xdr.LedgerCloseMeta
if err := r.db.Select(ctx, &results, sql); err != nil {
return nil, err
}

return results, nil
}

// GetLedgerRange pulls the min/max ledger sequence numbers from the meta table.
func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) {
r.db.cache.RLock()
Expand Down
6 changes: 6 additions & 0 deletions cmd/soroban-rpc/internal/db/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ func (m *MockLedgerReader) GetLedger(_ context.Context, sequence uint32) (xdr.Le
return *lcm, true, nil
}

func (m *MockLedgerReader) BatchGetLedgers(_ context.Context, _ uint32,
_ uint,
) ([]xdr.LedgerCloseMeta, error) {
return []xdr.LedgerCloseMeta{}, nil
}

func (m *MockLedgerReader) StreamAllLedgers(_ context.Context, _ StreamLedgerFn) error {
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func (ledgerReader *ConstantLedgerReader) GetLedger(_ context.Context,
return createLedger(sequence, expectedLatestLedgerProtocolVersion, expectedLatestLedgerHashBytes), true, nil
}

func (ledgerReader *ConstantLedgerReader) BatchGetLedgers(_ context.Context, _ uint32,
_ uint,
) ([]xdr.LedgerCloseMeta, error) {
return []xdr.LedgerCloseMeta{}, nil
}

func (ledgerReader *ConstantLedgerReader) StreamAllLedgers(_ context.Context, _ db.StreamLedgerFn) error {
return nil
}
Expand Down
59 changes: 33 additions & 26 deletions cmd/soroban-rpc/internal/methods/get_ledgers.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,41 +175,48 @@ func (h ledgersHandler) handleLimit(limit uint) uint {
return h.defaultLimit
}

// fetchLedgers fetches ledgers from the DB beginning with the start request ledger until the max request limit.
// fetchLedgers fetches ledgers from the DB for the range [start, start+limit-1]
func (h ledgersHandler) fetchLedgers(ctx context.Context, start uint32,
limit uint, format string,
) ([]LedgerInfo, error) {
ledgers := make([]LedgerInfo, 0, limit)
for ledgerSeq := start; uint(len(ledgers)) < limit; ledgerSeq++ {
ledger, found, err := h.ledgerReader.GetLedger(ctx, ledgerSeq)
if err != nil {
return nil, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
ledgers, err := h.ledgerReader.BatchGetLedgers(ctx, start, limit)
if err != nil {
return nil, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: fmt.Sprintf("error fetching ledgers from db: %v", err),
}
if !found {
return nil, &jrpc2.Error{
Code: jrpc2.InvalidParams,
Message: fmt.Sprintf("database does not contain metadata for ledger: %d", ledgerSeq),
}
}

// No more ledgers available
if len(ledgers) == 0 {
return nil, &jrpc2.Error{
Code: jrpc2.InvalidParams,
Message: fmt.Sprintf("no ledgers found starting from sequence: %d", start),
}
}

ledgerResponse, err := h.getMetaAndHeaderInfo(ledger, format)
result := make([]LedgerInfo, 0, limit)
for _, ledger := range ledgers {
if uint(len(result)) >= limit {
break
}

ledgerInfo, err := h.parseLedgerInfo(ledger, format)
if err != nil {
return nil, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
Message: fmt.Sprintf("error processing ledger %d: %v", ledger.LedgerSequence(), err),
}
}
ledgers = append(ledgers, ledgerResponse)
result = append(result, ledgerInfo)
}
return ledgers, nil

return result, nil
}

// getMetaAndHeaderInfo extracts and formats the ledger metadata and header information.
func (h ledgersHandler) getMetaAndHeaderInfo(ledger xdr.LedgerCloseMeta, format string) (LedgerInfo, error) {
ledgerResponse := LedgerInfo{
// parseLedgerInfo extracts and formats the ledger metadata and header information.
func (h ledgersHandler) parseLedgerInfo(ledger xdr.LedgerCloseMeta, format string) (LedgerInfo, error) {
ledgerInfo := LedgerInfo{
Hash: ledger.LedgerHash().HexString(),
Sequence: ledger.LedgerSequence(),
LedgerCloseTime: ledger.LedgerCloseTime(),
Expand All @@ -233,11 +240,11 @@ func (h ledgersHandler) getMetaAndHeaderInfo(ledger xdr.LedgerCloseMeta, format
return LedgerInfo{}, fmt.Errorf("could not convert ledger metadata and "+
"header to JSON: %w", convErr)
}
ledgerResponse.LedgerCloseMetaJSON = closeMetaJSON
ledgerResponse.LedgerHeaderJSON = headerJSON
ledgerInfo.LedgerCloseMetaJSON = closeMetaJSON
ledgerInfo.LedgerHeaderJSON = headerJSON
default:
ledgerResponse.LedgerCloseMeta = base64.StdEncoding.EncodeToString(closeMetaB)
ledgerResponse.LedgerHeader = base64.StdEncoding.EncodeToString(headerB)
ledgerInfo.LedgerCloseMeta = base64.StdEncoding.EncodeToString(closeMetaB)
ledgerInfo.LedgerHeader = base64.StdEncoding.EncodeToString(headerB)
}
return ledgerResponse, nil
return ledgerInfo, nil
}
76 changes: 59 additions & 17 deletions cmd/soroban-rpc/internal/methods/get_ledgers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -24,9 +25,9 @@ var expectedLedgerInfo = LedgerInfo{
func setupTestDB(t *testing.T, numLedgers int) *db.DB {
testDB := NewTestDB(t)
daemon := interfaces.MakeNoOpDeamon()
for sequence := range numLedgers {
for sequence := 1; sequence <= numLedgers; sequence++ {
ledgerCloseMeta := txMeta(uint32(sequence)-100, true)
tx, err := db.NewReadWriter(log.DefaultLogger, testDB, daemon, 150, 15, passphrase).NewTx(context.Background())
tx, err := db.NewReadWriter(log.DefaultLogger, testDB, daemon, 150, 100, passphrase).NewTx(context.Background())
require.NoError(t, err)
require.NoError(t, tx.LedgerWriter().InsertLedger(ledgerCloseMeta))
require.NoError(t, tx.Commit(ledgerCloseMeta))
Expand All @@ -35,7 +36,7 @@ func setupTestDB(t *testing.T, numLedgers int) *db.DB {
}

func TestGetLedgers_DefaultLimit(t *testing.T) {
testDB := setupTestDB(t, 11)
testDB := setupTestDB(t, 50)
handler := ledgersHandler{
ledgerReader: db.NewLedgerReader(testDB),
maxLimit: 100,
Expand All @@ -49,8 +50,8 @@ func TestGetLedgers_DefaultLimit(t *testing.T) {
response, err := handler.getLedgers(context.TODO(), request)
require.NoError(t, err)

assert.Equal(t, uint32(10), response.LatestLedger)
assert.Equal(t, ledgerCloseTime(10), response.LatestLedgerCloseTime)
assert.Equal(t, uint32(50), response.LatestLedger)
assert.Equal(t, ledgerCloseTime(50), response.LatestLedgerCloseTime)
assert.Equal(t, "5", response.Cursor)
assert.Len(t, response.Ledgers, 5)
assert.Equal(t, uint32(1), response.Ledgers[0].Sequence)
Expand All @@ -61,7 +62,7 @@ func TestGetLedgers_DefaultLimit(t *testing.T) {
}

func TestGetLedgers_CustomLimit(t *testing.T) {
testDB := setupTestDB(t, 11)
testDB := setupTestDB(t, 50)
handler := ledgersHandler{
ledgerReader: db.NewLedgerReader(testDB),
maxLimit: 100,
Expand All @@ -71,22 +72,22 @@ func TestGetLedgers_CustomLimit(t *testing.T) {
request := GetLedgersRequest{
StartLedger: 1,
Pagination: &PaginationOptions{
Limit: 3,
Limit: 41,
},
}

response, err := handler.getLedgers(context.TODO(), request)
require.NoError(t, err)

assert.Equal(t, uint32(10), response.LatestLedger)
assert.Equal(t, "3", response.Cursor)
assert.Len(t, response.Ledgers, 3)
assert.Equal(t, uint32(50), response.LatestLedger)
assert.Equal(t, "41", response.Cursor)
assert.Len(t, response.Ledgers, 41)
assert.Equal(t, uint32(1), response.Ledgers[0].Sequence)
assert.Equal(t, uint32(3), response.Ledgers[2].Sequence)
assert.Equal(t, uint32(41), response.Ledgers[40].Sequence)
}

func TestGetLedgers_WithCursor(t *testing.T) {
testDB := setupTestDB(t, 11)
testDB := setupTestDB(t, 10)
handler := ledgersHandler{
ledgerReader: db.NewLedgerReader(testDB),
maxLimit: 100,
Expand All @@ -111,7 +112,7 @@ func TestGetLedgers_WithCursor(t *testing.T) {
}

func TestGetLedgers_InvalidStartLedger(t *testing.T) {
testDB := setupTestDB(t, 11)
testDB := setupTestDB(t, 10)
handler := ledgersHandler{
ledgerReader: db.NewLedgerReader(testDB),
maxLimit: 100,
Expand All @@ -128,7 +129,7 @@ func TestGetLedgers_InvalidStartLedger(t *testing.T) {
}

func TestGetLedgers_LimitExceedsMaxLimit(t *testing.T) {
testDB := setupTestDB(t, 11)
testDB := setupTestDB(t, 10)
handler := ledgersHandler{
ledgerReader: db.NewLedgerReader(testDB),
maxLimit: 100,
Expand All @@ -148,7 +149,7 @@ func TestGetLedgers_LimitExceedsMaxLimit(t *testing.T) {
}

func TestGetLedgers_InvalidCursor(t *testing.T) {
testDB := setupTestDB(t, 11)
testDB := setupTestDB(t, 10)
handler := ledgersHandler{
ledgerReader: db.NewLedgerReader(testDB),
maxLimit: 100,
Expand All @@ -167,7 +168,7 @@ func TestGetLedgers_InvalidCursor(t *testing.T) {
}

func TestGetLedgers_JSONFormat(t *testing.T) {
testDB := setupTestDB(t, 11)
testDB := setupTestDB(t, 10)
handler := ledgersHandler{
ledgerReader: db.NewLedgerReader(testDB),
maxLimit: 100,
Expand Down Expand Up @@ -219,7 +220,7 @@ func TestGetLedgers_NoLedgers(t *testing.T) {
}

func TestGetLedgers_CursorGreaterThanLatestLedger(t *testing.T) {
testDB := setupTestDB(t, 11)
testDB := setupTestDB(t, 10)
handler := ledgersHandler{
ledgerReader: db.NewLedgerReader(testDB),
maxLimit: 100,
Expand All @@ -236,3 +237,44 @@ func TestGetLedgers_CursorGreaterThanLatestLedger(t *testing.T) {
require.Error(t, err)
assert.Contains(t, err.Error(), "cursor must be between")
}

func BenchmarkGetLedgers(b *testing.B) {
testDB := NewTestDB(b)
logger := log.DefaultLogger
writer := db.NewReadWriter(logger, testDB, interfaces.MakeNoOpDeamon(), 100, 1_000_000, passphrase)
write, err := writer.NewTx(context.TODO())
require.NoError(b, err)

lcms := make([]xdr.LedgerCloseMeta, 0, 100_000)
for i := range cap(lcms) {
lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0))
}

ledgerW, txW := write.LedgerWriter(), write.TransactionWriter()
for _, lcm := range lcms {
require.NoError(b, ledgerW.InsertLedger(lcm))
require.NoError(b, txW.InsertTransactions(lcm))
}
require.NoError(b, write.Commit(lcms[len(lcms)-1]))

handler := ledgersHandler{
ledgerReader: db.NewLedgerReader(testDB),
maxLimit: 200,
defaultLimit: 5,
}

request := GetLedgersRequest{
StartLedger: 1334,
Pagination: &PaginationOptions{
Limit: 200, // using the current maximum request limit for getLedgers endpoint
},
}

b.ResetTimer()
for range b.N {
response, err := handler.getLedgers(context.TODO(), request)
require.NoError(b, err)
assert.Equal(b, uint32(1334), response.Ledgers[0].Sequence)
assert.Equal(b, uint32(1533), response.Ledgers[199].Sequence)
}
}

0 comments on commit 8f5ba83

Please sign in to comment.