diff --git a/cmd/soroban-rpc/internal/db/ledger.go b/cmd/soroban-rpc/internal/db/ledger.go index 860e6aa1..70b29b29 100644 --- a/cmd/soroban-rpc/internal/db/ledger.go +++ b/cmd/soroban-rpc/internal/db/ledger.go @@ -19,6 +19,7 @@ type StreamLedgerFn func(xdr.LedgerCloseMeta) error type LedgerReader interface { GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) + BatchGetLedgers(ctx context.Context, sequence uint32, batchSize uint) ([]xdr.LedgerCloseMeta, error) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) StreamLedgerRange(ctx context.Context, startLedger uint32, endLedger uint32, f StreamLedgerFn) error @@ -103,6 +104,25 @@ func (r ledgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.Ledge } } +// BatchGetLedgers fetches ledgers in batches from the db. +func (r ledgerReader) BatchGetLedgers(ctx context.Context, sequence uint32, + batchSize uint, +) ([]xdr.LedgerCloseMeta, error) { + sql := sq.Select("meta"). + From(ledgerCloseMetaTableName). + Where(sq.And{ + sq.GtOrEq{"sequence": sequence}, + sq.LtOrEq{"sequence": sequence + uint32(batchSize) - 1}, + }) + + var results []xdr.LedgerCloseMeta + if err := r.db.Select(ctx, &results, sql); err != nil { + return nil, err + } + + return results, nil +} + // GetLedgerRange pulls the min/max ledger sequence numbers from the meta table. func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) { r.db.cache.RLock() diff --git a/cmd/soroban-rpc/internal/db/mocks.go b/cmd/soroban-rpc/internal/db/mocks.go index fee538d9..cb199c44 100644 --- a/cmd/soroban-rpc/internal/db/mocks.go +++ b/cmd/soroban-rpc/internal/db/mocks.go @@ -97,6 +97,12 @@ func (m *MockLedgerReader) GetLedger(_ context.Context, sequence uint32) (xdr.Le return *lcm, true, nil } +func (m *MockLedgerReader) BatchGetLedgers(_ context.Context, _ uint32, + _ uint, +) ([]xdr.LedgerCloseMeta, error) { + return []xdr.LedgerCloseMeta{}, nil +} + func (m *MockLedgerReader) StreamAllLedgers(_ context.Context, _ StreamLedgerFn) error { return nil } diff --git a/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go b/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go index 0fcb4494..8416f774 100644 --- a/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go +++ b/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go @@ -56,6 +56,12 @@ func (ledgerReader *ConstantLedgerReader) GetLedger(_ context.Context, return createLedger(sequence, expectedLatestLedgerProtocolVersion, expectedLatestLedgerHashBytes), true, nil } +func (ledgerReader *ConstantLedgerReader) BatchGetLedgers(_ context.Context, _ uint32, + _ uint, +) ([]xdr.LedgerCloseMeta, error) { + return []xdr.LedgerCloseMeta{}, nil +} + func (ledgerReader *ConstantLedgerReader) StreamAllLedgers(_ context.Context, _ db.StreamLedgerFn) error { return nil } diff --git a/cmd/soroban-rpc/internal/methods/get_ledgers.go b/cmd/soroban-rpc/internal/methods/get_ledgers.go index 1d87c31e..3fc8b26d 100644 --- a/cmd/soroban-rpc/internal/methods/get_ledgers.go +++ b/cmd/soroban-rpc/internal/methods/get_ledgers.go @@ -175,41 +175,48 @@ func (h ledgersHandler) handleLimit(limit uint) uint { return h.defaultLimit } -// fetchLedgers fetches ledgers from the DB beginning with the start request ledger until the max request limit. +// fetchLedgers fetches ledgers from the DB for the range [start, start+limit-1] func (h ledgersHandler) fetchLedgers(ctx context.Context, start uint32, limit uint, format string, ) ([]LedgerInfo, error) { - ledgers := make([]LedgerInfo, 0, limit) - for ledgerSeq := start; uint(len(ledgers)) < limit; ledgerSeq++ { - ledger, found, err := h.ledgerReader.GetLedger(ctx, ledgerSeq) - if err != nil { - return nil, &jrpc2.Error{ - Code: jrpc2.InternalError, - Message: err.Error(), - } + ledgers, err := h.ledgerReader.BatchGetLedgers(ctx, start, limit) + if err != nil { + return nil, &jrpc2.Error{ + Code: jrpc2.InternalError, + Message: fmt.Sprintf("error fetching ledgers from db: %v", err), } - if !found { - return nil, &jrpc2.Error{ - Code: jrpc2.InvalidParams, - Message: fmt.Sprintf("database does not contain metadata for ledger: %d", ledgerSeq), - } + } + + // No more ledgers available + if len(ledgers) == 0 { + return nil, &jrpc2.Error{ + Code: jrpc2.InvalidParams, + Message: fmt.Sprintf("no ledgers found starting from sequence: %d", start), } + } - ledgerResponse, err := h.getMetaAndHeaderInfo(ledger, format) + result := make([]LedgerInfo, 0, limit) + for _, ledger := range ledgers { + if uint(len(result)) >= limit { + break + } + + ledgerInfo, err := h.parseLedgerInfo(ledger, format) if err != nil { return nil, &jrpc2.Error{ Code: jrpc2.InternalError, - Message: err.Error(), + Message: fmt.Sprintf("error processing ledger %d: %v", ledger.LedgerSequence(), err), } } - ledgers = append(ledgers, ledgerResponse) + result = append(result, ledgerInfo) } - return ledgers, nil + + return result, nil } -// getMetaAndHeaderInfo extracts and formats the ledger metadata and header information. -func (h ledgersHandler) getMetaAndHeaderInfo(ledger xdr.LedgerCloseMeta, format string) (LedgerInfo, error) { - ledgerResponse := LedgerInfo{ +// parseLedgerInfo extracts and formats the ledger metadata and header information. +func (h ledgersHandler) parseLedgerInfo(ledger xdr.LedgerCloseMeta, format string) (LedgerInfo, error) { + ledgerInfo := LedgerInfo{ Hash: ledger.LedgerHash().HexString(), Sequence: ledger.LedgerSequence(), LedgerCloseTime: ledger.LedgerCloseTime(), @@ -233,11 +240,11 @@ func (h ledgersHandler) getMetaAndHeaderInfo(ledger xdr.LedgerCloseMeta, format return LedgerInfo{}, fmt.Errorf("could not convert ledger metadata and "+ "header to JSON: %w", convErr) } - ledgerResponse.LedgerCloseMetaJSON = closeMetaJSON - ledgerResponse.LedgerHeaderJSON = headerJSON + ledgerInfo.LedgerCloseMetaJSON = closeMetaJSON + ledgerInfo.LedgerHeaderJSON = headerJSON default: - ledgerResponse.LedgerCloseMeta = base64.StdEncoding.EncodeToString(closeMetaB) - ledgerResponse.LedgerHeader = base64.StdEncoding.EncodeToString(headerB) + ledgerInfo.LedgerCloseMeta = base64.StdEncoding.EncodeToString(closeMetaB) + ledgerInfo.LedgerHeader = base64.StdEncoding.EncodeToString(headerB) } - return ledgerResponse, nil + return ledgerInfo, nil } diff --git a/cmd/soroban-rpc/internal/methods/get_ledgers_test.go b/cmd/soroban-rpc/internal/methods/get_ledgers_test.go index e2e740c8..24c70bc5 100644 --- a/cmd/soroban-rpc/internal/methods/get_ledgers_test.go +++ b/cmd/soroban-rpc/internal/methods/get_ledgers_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -24,9 +25,9 @@ var expectedLedgerInfo = LedgerInfo{ func setupTestDB(t *testing.T, numLedgers int) *db.DB { testDB := NewTestDB(t) daemon := interfaces.MakeNoOpDeamon() - for sequence := range numLedgers { + for sequence := 1; sequence <= numLedgers; sequence++ { ledgerCloseMeta := txMeta(uint32(sequence)-100, true) - tx, err := db.NewReadWriter(log.DefaultLogger, testDB, daemon, 150, 15, passphrase).NewTx(context.Background()) + tx, err := db.NewReadWriter(log.DefaultLogger, testDB, daemon, 150, 100, passphrase).NewTx(context.Background()) require.NoError(t, err) require.NoError(t, tx.LedgerWriter().InsertLedger(ledgerCloseMeta)) require.NoError(t, tx.Commit(ledgerCloseMeta)) @@ -35,7 +36,7 @@ func setupTestDB(t *testing.T, numLedgers int) *db.DB { } func TestGetLedgers_DefaultLimit(t *testing.T) { - testDB := setupTestDB(t, 11) + testDB := setupTestDB(t, 50) handler := ledgersHandler{ ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, @@ -49,8 +50,8 @@ func TestGetLedgers_DefaultLimit(t *testing.T) { response, err := handler.getLedgers(context.TODO(), request) require.NoError(t, err) - assert.Equal(t, uint32(10), response.LatestLedger) - assert.Equal(t, ledgerCloseTime(10), response.LatestLedgerCloseTime) + assert.Equal(t, uint32(50), response.LatestLedger) + assert.Equal(t, ledgerCloseTime(50), response.LatestLedgerCloseTime) assert.Equal(t, "5", response.Cursor) assert.Len(t, response.Ledgers, 5) assert.Equal(t, uint32(1), response.Ledgers[0].Sequence) @@ -61,7 +62,7 @@ func TestGetLedgers_DefaultLimit(t *testing.T) { } func TestGetLedgers_CustomLimit(t *testing.T) { - testDB := setupTestDB(t, 11) + testDB := setupTestDB(t, 50) handler := ledgersHandler{ ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, @@ -71,22 +72,22 @@ func TestGetLedgers_CustomLimit(t *testing.T) { request := GetLedgersRequest{ StartLedger: 1, Pagination: &PaginationOptions{ - Limit: 3, + Limit: 41, }, } response, err := handler.getLedgers(context.TODO(), request) require.NoError(t, err) - assert.Equal(t, uint32(10), response.LatestLedger) - assert.Equal(t, "3", response.Cursor) - assert.Len(t, response.Ledgers, 3) + assert.Equal(t, uint32(50), response.LatestLedger) + assert.Equal(t, "41", response.Cursor) + assert.Len(t, response.Ledgers, 41) assert.Equal(t, uint32(1), response.Ledgers[0].Sequence) - assert.Equal(t, uint32(3), response.Ledgers[2].Sequence) + assert.Equal(t, uint32(41), response.Ledgers[40].Sequence) } func TestGetLedgers_WithCursor(t *testing.T) { - testDB := setupTestDB(t, 11) + testDB := setupTestDB(t, 10) handler := ledgersHandler{ ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, @@ -111,7 +112,7 @@ func TestGetLedgers_WithCursor(t *testing.T) { } func TestGetLedgers_InvalidStartLedger(t *testing.T) { - testDB := setupTestDB(t, 11) + testDB := setupTestDB(t, 10) handler := ledgersHandler{ ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, @@ -128,7 +129,7 @@ func TestGetLedgers_InvalidStartLedger(t *testing.T) { } func TestGetLedgers_LimitExceedsMaxLimit(t *testing.T) { - testDB := setupTestDB(t, 11) + testDB := setupTestDB(t, 10) handler := ledgersHandler{ ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, @@ -148,7 +149,7 @@ func TestGetLedgers_LimitExceedsMaxLimit(t *testing.T) { } func TestGetLedgers_InvalidCursor(t *testing.T) { - testDB := setupTestDB(t, 11) + testDB := setupTestDB(t, 10) handler := ledgersHandler{ ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, @@ -167,7 +168,7 @@ func TestGetLedgers_InvalidCursor(t *testing.T) { } func TestGetLedgers_JSONFormat(t *testing.T) { - testDB := setupTestDB(t, 11) + testDB := setupTestDB(t, 10) handler := ledgersHandler{ ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, @@ -219,7 +220,7 @@ func TestGetLedgers_NoLedgers(t *testing.T) { } func TestGetLedgers_CursorGreaterThanLatestLedger(t *testing.T) { - testDB := setupTestDB(t, 11) + testDB := setupTestDB(t, 10) handler := ledgersHandler{ ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, @@ -236,3 +237,44 @@ func TestGetLedgers_CursorGreaterThanLatestLedger(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "cursor must be between") } + +func BenchmarkGetLedgers(b *testing.B) { + testDB := NewTestDB(b) + logger := log.DefaultLogger + writer := db.NewReadWriter(logger, testDB, interfaces.MakeNoOpDeamon(), 100, 1_000_000, passphrase) + write, err := writer.NewTx(context.TODO()) + require.NoError(b, err) + + lcms := make([]xdr.LedgerCloseMeta, 0, 100_000) + for i := range cap(lcms) { + lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0)) + } + + ledgerW, txW := write.LedgerWriter(), write.TransactionWriter() + for _, lcm := range lcms { + require.NoError(b, ledgerW.InsertLedger(lcm)) + require.NoError(b, txW.InsertTransactions(lcm)) + } + require.NoError(b, write.Commit(lcms[len(lcms)-1])) + + handler := ledgersHandler{ + ledgerReader: db.NewLedgerReader(testDB), + maxLimit: 200, + defaultLimit: 5, + } + + request := GetLedgersRequest{ + StartLedger: 1334, + Pagination: &PaginationOptions{ + Limit: 200, // using the current maximum request limit for getLedgers endpoint + }, + } + + b.ResetTimer() + for range b.N { + response, err := handler.getLedgers(context.TODO(), request) + require.NoError(b, err) + assert.Equal(b, uint32(1334), response.Ledgers[0].Sequence) + assert.Equal(b, uint32(1533), response.Ledgers[199].Sequence) + } +}