tron: add hot blocks support #345

Open · wants to merge 7 commits into base: support-hot-blocks
Changes from 3 commits
3 changes: 2 additions & 1 deletion test/tron-usdt/src/main.ts
@@ -13,7 +13,8 @@ const TOPIC0 = 'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'

const dataSource = new DataSourceBuilder()
.setGateway('https://v2.archive.subsquid.io/network/tron-mainnet')
.setBlockRange({from: 11322942, to: 11323358})
.setHttpApi({url: 'https://rpc.ankr.com/http/tron'})
.setBlockRange({from: 65677134})
.addLog({
where: {
address: [CONTRACT],
150 changes: 140 additions & 10 deletions tron/tron-data/src/data-source.ts
@@ -1,6 +1,6 @@
import {Batch, coldIngest} from '@subsquid/util-internal-ingest-tools'
import {RangeRequest, SplitRequest} from '@subsquid/util-internal-range'
import {assertNotNull} from '@subsquid/util-internal'
import {Batch, BlockConsistencyError, BlockRef, coldIngest, HotProcessor, HotUpdate, isDataConsistencyError, trimInvalid} from '@subsquid/util-internal-ingest-tools'
import {getRequestAt, mapRangeRequestList, rangeEnd, RangeRequest, splitRange, splitRangeByRequest, SplitRequest} from '@subsquid/util-internal-range'
import {assertNotNull, AsyncQueue, last, maybeLast, Throttler, wait} from '@subsquid/util-internal'
import assert from 'assert'
import {BlockData, TransactionInfo} from './data'
import {HttpApi} from './http'
@@ -45,9 +45,14 @@ export class HttpDataSource {
}

async getFinalizedHeight(): Promise<number> {
let height = await this.getHeight()
return height - this.finalityConfirmation
}
Collaborator: Should never be negative.
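A minimal sketch of the guard this comment seems to ask for (illustration only, not part of the diff):

    async getFinalizedHeight(): Promise<number> {
        let height = await this.getHeight()
        // clamp so the result can't go negative while the chain is still
        // shorter than finalityConfirmation blocks
        return Math.max(height - this.finalityConfirmation, 0)
    }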


async getHeight(): Promise<number> {
let block = await this.httpApi.getNowBlock()
let number = assertNotNull(block.block_header.raw_data.number)
return number - this.finalityConfirmation
return number
}

getFinalizedBlocks(
@@ -56,7 +61,7 @@
): AsyncIterable<Batch<BlockData>> {
return coldIngest({
getFinalizedHeight: () => this.getFinalizedHeight(),
getSplit: (req) => this.getSplit(req),
getSplit: (req) => this.getColdSplit(req),
requests,
concurrency: this.strideConcurrency,
splitSize: this.strideSize,
@@ -65,12 +70,110 @@
})
}

async *getHotBlocks(
requests: RangeRequest<DataRequest>[],
): AsyncIterable<HotUpdate<BlockData>> {
if (requests.length == 0) return

let self = this

let from = requests[0].range.from - 1
let block = await this.getBlockHeader(from)
Collaborator: What if we start from zero?
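One hypothetical way to guard the genesis case raised here (a sketch under the assumption that anchoring on block 0 itself is acceptable; the actual choice of anchor is up to the author):

    // hypothetical guard: block -1 does not exist, so when the requested range
    // starts at zero, anchor the hot processor on block 0 itself
    let from = requests[0].range.from
    let anchorHeight = Math.max(from - 1, 0)
    let block = await this.getBlockHeader(anchorHeight)
    // ...then pass {height: anchorHeight, hash: block.blockID, top: []} to HotProcessor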


let queue: HotUpdate<BlockData>[] = []

let proc = new HotProcessor<BlockData>(
{
height: from,
hash: block.blockID,
top: [],
},
{
process: async (update) => { queue.push(update) },
getBlock: async (ref) => {
Collaborator:
Just realised that the src.getBlockStream() interface for hot blocks requires handling data consistency errors on the processor side as well. For example, consider the case when a storage request is made against a block that is already gone. Currently we don't handle this in the SDK, but the previous callback-based interface for hot blocks had the ability to do that.
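To illustrate the concern (hypothetical code, not part of the SDK; readContractStorage is an invented placeholder for a consumer's own follow-up request):

    for await (let {blocks} of src.getBlockStream({requests})) {
        for (let block of blocks) {
            // a follow-up request pinned to a hot block can fail if the block has
            // already been dropped from the chain; the stream interface gives the
            // consumer no way to ask the source for a consistent re-delivery
            await readContractStorage(block.header.hash)
        }
    }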

let req = getRequestAt(requests, ref.height) || {}
let block = await this.getBlock(ref.height, !!req.transactionsInfo)
if (block.block.blockID !== ref.hash) {
throw new BlockConsistencyError({hash: ref.hash})
}
await this.addRequestedData([block], req)
if (block._isInvalid) {
throw new BlockConsistencyError(block, block._errorMessage)
}
return block
},
async *getBlockRange(from: number, to: BlockRef): AsyncIterable<BlockData[]> {
assert(to.height != null)
if (from > to.height) {
from = to.height
}
for (let split of splitRangeByRequest(requests, {from, to: to.height})) {
let request = split.request || {}
for (let range of splitRange(10, split.range)) {
let blocks = await self.getHotSplit({
range,
request,
finalizedHeight: proc.getFinalizedHeight(),
})
let lastBlock = maybeLast(blocks)?.height ?? range.from - 1
yield blocks
if (lastBlock < range.to) {
throw new BlockConsistencyError({height: lastBlock + 1})
}
}
}
},
getHeader(block) {
return {
height: block.height,
hash: block.block.blockID,
parentHash: block.block.block_header.raw_data.parentHash,
}
},
}
)

let isEnd = () => proc.getFinalizedHeight() >= rangeEnd(last(requests).range)

let prev = -1
let height = new Throttler(() => this.getHeight(), this.headPollInterval)
while (!isEnd()) {
let next = await height.call()
if (next <= prev) continue
prev = next
for (let i = 0; i < 100; i++) {
try {
await proc.goto({
best: {height: next},
finalized: {
height: Math.max(next - this.finalityConfirmation, 0)
}
})

let update = queue.shift()
while (update) {
yield update
update = queue.shift()
}

break
} catch(err: any) {
if (isDataConsistencyError(err)) {
await wait(100)
} else {
throw err
}
}
}
}
}

private async getBlock(num: number, detail: boolean): Promise<BlockData> {
let block = await this.httpApi.getBlock(num, detail)
return {
block,
height: block.block_header.raw_data.number || 0,
hash: getBlockHash(block.blockID)
hash: getBlockHash(block.blockID),
}
}

@@ -105,11 +208,38 @@ export class HttpDataSource {
await Promise.all(promises)
}

private async getSplit(req: SplitRequest<DataRequest>): Promise<BlockData[]> {
private async getColdSplit(req: SplitRequest<DataRequest>): Promise<BlockData[]> {
let blocks = await this.getBlocks(req.range.from, req.range.to, !!req.request.transactions)
if (req.request.transactionsInfo) {
this.addTransactionsInfo(blocks)
}

return blocks
}

private async getHotSplit(req: SplitRequest<DataRequest> & {finalizedHeight: number}): Promise<BlockData[]> {
let blocks = await this.getBlocks(req.range.from, req.range.to, !!req.request.transactions)

let chain: BlockData[] = []

for (let i = 0; i < blocks.length; i++) {
let block = blocks[i]
if (block == null) break
if (i > 0 && chain[i - 1].block.blockID !== block.block.block_header.raw_data.parentHash) break
chain.push(block)
}

await this.addRequestedData(chain, req.request)

return trimInvalid(chain)
}

private async addRequestedData(blocks: BlockData[], req: DataRequest): Promise<void> {
if (blocks.length == 0) return

let subtasks = []

if (req.transactionsInfo) {
subtasks.push(this.addTransactionsInfo(blocks))
}

await Promise.all(subtasks)
}
}
2 changes: 2 additions & 0 deletions tron/tron-data/src/data.ts
@@ -165,4 +165,6 @@ export interface BlockData {
hash: string
block: Block
transactionsInfo?: TransactionInfo[]
_isInvalid?: boolean
_errorMessage?: string
}
2 changes: 1 addition & 1 deletion tron/tron-normalization/src/mapping.ts
@@ -3,7 +3,7 @@ import assert from 'assert'
import {Block, BlockHeader, CallValueInfo, InternalTransaction, Log, Transaction} from './data'


function mapBlockHeader(src: raw.Block): BlockHeader {
export function mapBlockHeader(src: raw.Block): BlockHeader {
return {
hash: src.blockID,
height: src.block_header.raw_data.number || 0,
7 changes: 7 additions & 0 deletions tron/tron-stream/src/data/data-partial.ts
@@ -1,5 +1,6 @@
import type * as data from '@subsquid/tron-normalization'
import type {MakePartial} from './util'
import {HashAndHeight} from '@subsquid/util-internal-ingest-tools'


export type BlockRequiredFields = 'height' | 'hash'
@@ -20,3 +21,9 @@ export interface PartialBlock {
logs?: PartialLog[]
internalTransactions?: PartialInternalTransaction[]
}


export interface BlocksData<B> {
finalizedHead: HashAndHeight
blocks: B[]
}
10 changes: 7 additions & 3 deletions tron/tron-stream/src/gateway/source.ts
@@ -7,7 +7,7 @@ import {array, cast} from '@subsquid/util-internal-validation'
import assert from 'assert'
import {DataRequest} from '../data/data-request'
import {getDataSchema} from './data-schema'
import {PartialBlock} from '../data/data-partial'
import {BlocksData, PartialBlock} from '../data/data-partial'


export class TronGateway {
@@ -56,7 +56,7 @@ export class TronGateway {
async *getBlockStream(
requests: RangeRequestList<DataRequest>,
stopOnHead?: boolean
): AsyncIterable<PartialBlock[]> {
): AsyncIterable<BlocksData<PartialBlock>> {
let archiveRequests = mapRangeRequestList(requests, req => {
let {fields, includeAllBlocks, ...items} = req
let archiveItems: any = {}
@@ -79,6 +79,10 @@
})) {
let req = getRequestAt(requests, batch.blocks[0].header.number)

// FIXME: needs to be done during batch ingestion
let finalizedHeight = await this.getFinalizedHeight()
Collaborator:
There is no need for this. Simply take the last block in the batch as finalized.

Contributor Author (@belopash, Oct 1, 2024):
Ah, now I understand what you mean in this comment, but this is odd behavior: the finalized head should never decrease. If you write stream data directly to a message queue, you won't expect such a trap.
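For reference, the change the reviewer seems to have in mind would look roughly like this (a sketch only, assuming archive batches contain nothing past the finalized head and that `last` comes from '@subsquid/util-internal'; as the author notes above, the value reported this way is not guaranteed to increase monotonically):

    // reuse the last block of the batch instead of two extra requests
    let head = last(blocks as PartialBlock[]).header
    yield {finalizedHead: {hash: head.hash, height: head.height}, blocks: blocks as PartialBlock[]}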

let finalizedHead = await this.getBlockHeader(finalizedHeight)

let blocks = cast(
array(getDataSchema(assertNotNull(req?.fields))),
batch.blocks
@@ -90,7 +94,7 @@
}
})

yield blocks as any
yield {finalizedHead, blocks: blocks as PartialBlock[]}
}
}
}
65 changes: 48 additions & 17 deletions tron/tron-stream/src/http/source.ts
@@ -1,13 +1,14 @@
import {mapRangeRequestList, RangeRequestList} from '@subsquid/util-internal-range'
import {applyRangeBound, mapRangeRequestList, RangeRequestList} from '@subsquid/util-internal-range'
import {
Block,
DataRequest as RawDataRequest,
HttpDataSource as RawHttpDataSource
} from '@subsquid/tron-data'
import {mapBlock} from '@subsquid/tron-normalization'
import {BlockHeader, mapBlock, mapBlockHeader} from '@subsquid/tron-normalization'
import {DataRequest} from '../data/data-request'
import {PartialBlock} from '../data/data-partial'
import {BlocksData, PartialBlock} from '../data/data-partial'
import {filterBlockBatch} from './filter'
import assert from 'assert'
import {last} from '@subsquid/util-internal'


export class HttpDataSource {
@@ -18,25 +19,55 @@ export class HttpDataSource {
}

async getBlockHash(height: number): Promise<string | undefined> {
let header = await this.baseDataSource.getBlockHeader(height)
return header.blockID
let header = await this.getBlockHeader(height)
return header?.hash
}

getBlockHeader(height: number): Promise<Block | undefined> {
return this.baseDataSource.getBlockHeader(height)
async getBlockHeader(height: number): Promise<BlockHeader | undefined> {
let header = await this.baseDataSource.getBlockHeader(height)
return header ? mapBlockHeader(header) : undefined
}

async *getBlockStream(
async *getBlockStream(opts: {
requests: RangeRequestList<DataRequest>,
stopOnHead?: boolean
): AsyncIterable<PartialBlock[]> {
for await (let batch of this.baseDataSource.getFinalizedBlocks(
mapRangeRequestList(requests, toRawDataRequest),
stopOnHead
)) {
let blocks = batch.blocks.map(mapBlock)
filterBlockBatch(requests, blocks)
yield blocks
supportHotBlocks?: boolean
}): AsyncIterable<BlocksData<PartialBlock>> {
let requests = opts.requests
let from = requests[0].range.from

while (true) {
requests = applyRangeBound(requests, {from})

for await (let batch of this.baseDataSource.getFinalizedBlocks(
mapRangeRequestList(requests, toRawDataRequest),
!!opts.supportHotBlocks || opts.stopOnHead
)) {
// FIXME: needs to be done during batch ingestion
let finalizedHeight = await this.getFinalizedHeight()
let finalizedHead = await this.getBlockHeader(finalizedHeight)
Collaborator: There is no need for this. Simply take the last block in the batch.
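In this file the same idea would presumably read the head off the already-fetched batch instead of issuing two extra requests, e.g. (sketch only, assuming getFinalizedBlocks yields only finalized blocks):

    // the last raw block of a finalized batch doubles as the finalized head
    let lastRaw = last(batch.blocks)
    let finalizedHead = {hash: lastRaw.hash, height: lastRaw.height}

    let blocks = batch.blocks.map(mapBlock)
    filterBlockBatch(requests, blocks)
    yield {finalizedHead, blocks: blocks as PartialBlock[]}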

assert(finalizedHead != null)

let blocks = batch.blocks.map(mapBlock)
filterBlockBatch(requests, blocks)
yield {finalizedHead, blocks: blocks as PartialBlock[]}
from = last(blocks).header.height + 1
}

if (opts.supportHotBlocks) {
requests = applyRangeBound(requests, {from})

for await (let data of this.baseDataSource.getHotBlocks(
mapRangeRequestList(requests, toRawDataRequest),
)) {
let blocks = data.blocks.map(mapBlock)
filterBlockBatch(requests, blocks)
yield {finalizedHead: data.finalizedHead, blocks: blocks as PartialBlock[]}
from = Math.min(last(blocks).header.height, data.finalizedHead.height) + 1
}
}

if (opts.stopOnHead) break
}
}
}