Skip to content

Commit

Permalink
indexer-common,indexer-agent: auto graft resolve
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Sep 12, 2022
1 parent 0b6ed28 commit d74033f
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 26 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ Indexer Infrastructure
--auto-allocation-min-batch-size Minimum number of allocation
transactions inside a batch for AUTO
management mode [number] [default: 1]
--auto-graft-resolver-depth Maximum depth of grafting dependency to
automatically
resolve [number] [default: 0]
Network Subgraph
--network-subgraph-deployment Network subgraph deployment [string]
Expand Down
8 changes: 8 additions & 0 deletions docs/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -854,4 +854,12 @@ Failed to resolve POI: User provided POI does not match reference fetched from t

Check sync and health status of the subgraph to access the issue. If needed, provide a POI or use `--force` to bypass POI checks.

## IE069

**Summary**

Failed to deploy subgraph deployment graft base.

**Solution**

Same error as IE026, but auto-deploying as the base for a grafted subgraph. Please make sure the auto graft depth resolver has correct limit, and that the graft base deployment has synced to the graft block before trying again - Set indexing rules for periodic reconciles.
2 changes: 2 additions & 0 deletions packages/indexer-agent/src/__tests__/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ const setup = async () => {
})

const indexNodeIDs = ['node_1']
const graftDepth = 1
indexerManagementClient = await createIndexerManagementClient({
models,
address: toAddress(address),
Expand Down Expand Up @@ -168,6 +169,7 @@ const setup = async () => {
parseGRT('1000'),
address,
AllocationManagementMode.AUTO,
graftDepth,
)
}

Expand Down
14 changes: 14 additions & 0 deletions packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ export default {
.map((id: string) => id.trim())
.filter((id: string) => id.length > 0),
})
.option('auto-graft-resolver-depth', {
description: `Maximum depth of grafting dependency to automatically resolve`,
type: 'number',
default: 0,
group: 'Indexer Infrastructure',
})
.option('poi-disputable-epochs', {
description:
'The number of epochs in the past to look for potential POI disputes',
Expand Down Expand Up @@ -368,6 +374,12 @@ export default {
) {
return 'Invalid --rebate-claim-max-batch-size provided. Must be > 0 and an integer.'
}
if (
!Number.isInteger(argv['auto-graft-resolver-depth']) ||
argv['auto-graft-resolver-depth'] < 0
) {
return 'Invalid --auto-graft-resolver-depth provided. Must be >= 0 and an integer.'
}
return true
})
.option('vector-node', {
Expand Down Expand Up @@ -791,6 +803,7 @@ export default {
receiptCollector,
allocationManagementMode,
autoAllocationMinBatchSize: argv.autoAllocationMinBatchSize,
autoGraftResolverDepth: argv.autoGraftResolverDepth,
})

await createIndexerManagementServer({
Expand All @@ -809,6 +822,7 @@ export default {
argv.defaultAllocationAmount,
indexerAddress,
allocationManagementMode,
argv.autoGraftResolverDepth,
)
const networkSubgraphDeployment = argv.networkSubgraphDeployment
? new SubgraphDeploymentID(argv.networkSubgraphDeployment)
Expand Down
52 changes: 48 additions & 4 deletions packages/indexer-agent/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export class Indexer {
defaultAllocationAmount: BigNumber
indexerAddress: string
allocationManagementMode: AllocationManagementMode
autoGraftResolverDepth: number

constructor(
logger: Logger,
Expand All @@ -96,12 +97,14 @@ export class Indexer {
defaultAllocationAmount: BigNumber,
indexerAddress: string,
allocationManagementMode: AllocationManagementMode,
autoGraftResolverDepth: number,
) {
this.indexerManagement = indexerManagement
this.statusResolver = statusResolver
this.logger = logger
this.indexerAddress = indexerAddress
this.allocationManagementMode = allocationManagementMode
this.autoGraftResolverDepth = autoGraftResolverDepth

if (adminEndpoint.startsWith('https')) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -827,23 +830,59 @@ export class Indexer {
}
}

graftBase = (err: Error, depth: number): SubgraphDeploymentID | undefined => {
if (
err.message.includes(
'subgraph validation error: [the graft base is invalid: deployment not found: ',
)
) {
const graftBaseQm = err.message.substring(
err.message.indexOf('Qm'),
err.message.indexOf('Qm') + 46,
)
if (depth >= this.autoGraftResolverDepth) {
this.logger.warn(
`Grafting base depth reached auto-graft-resolver-depth limit, stop auto-grafting`,
{
base: graftBaseQm,
depth,
},
)
return
}
return new SubgraphDeploymentID(graftBaseQm)
}
}

async deploy(
name: string,
deployment: SubgraphDeploymentID,
node_id: string,
depth: number,
): Promise<void> {
try {
this.logger.info(`Deploy subgraph deployment`, {
name,
deployment: deployment.display,
})
const response = await this.rpc.request('subgraph_deploy', {
let response = await this.rpc.request('subgraph_deploy', {
name,
ipfs_hash: deployment.ipfsHash,
node_id: node_id,
})
if (response.error) {
throw response.error
const baseDeployment = this.graftBase(response.error, depth)
if (baseDeployment) {
await this.ensure(name, baseDeployment, depth + 1)
// Try deploy target deployment again, ideally do this after checking graft base sync status for graft block
response = await this.rpc.request('subgraph_deploy', {
name,
ipfs_hash: deployment.ipfsHash,
node_id: node_id,
})
} else {
throw indexerError(IndexerErrorCode.IE026, response.error)
}
}
this.logger.info(`Successfully deployed subgraph deployment`, {
name,
Expand Down Expand Up @@ -917,7 +956,12 @@ export class Indexer {
}
}

async ensure(name: string, deployment: SubgraphDeploymentID): Promise<void> {
async ensure(
name: string,
deployment: SubgraphDeploymentID,
depth?: number,
): Promise<void> {
depth = depth ?? 0
try {
// Randomly assign to unused nodes if they exist,
// otherwise use the node with lowest deployments assigned
Expand All @@ -937,7 +981,7 @@ export class Indexer {
return nodeA.deployments.length - nodeB.deployments.length
})[0].id
await this.create(name)
await this.deploy(name, deployment, targetNode)
await this.deploy(name, deployment, targetNode, depth)
await this.reassign(deployment, targetNode)
} catch (error) {
const err = indexerError(IndexerErrorCode.IE020, error)
Expand Down
4 changes: 3 additions & 1 deletion packages/indexer-common/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export enum IndexerErrorCode {
IE066 = 'IE066',
IE067 = 'IE067',
IE068 = 'IE068',
IE069 = 'IE069',
}

export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
Expand Down Expand Up @@ -150,7 +151,8 @@ export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
IE065: 'Failed to unallocate: Allocation has already been closed',
IE066: 'Failed to allocate: allocation ID already exists on chain',
IE067: 'Failed to query POI for current epoch start block',
IE068: 'User-provided POI did not match reference POI from graph-node',
IE068: 'User-provided POI did not match with reference POI from graph-node',
IE069: 'Failed to deploy the graft base for the target deployment',
}

export type IndexerErrorCause = unknown
Expand Down
8 changes: 7 additions & 1 deletion packages/indexer-common/src/indexer-management/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ export interface IndexerManagementClientOptions {
receiptCollector?: AllocationReceiptCollector
allocationManagementMode?: AllocationManagementMode
autoAllocationMinBatchSize?: number
autoGraftResolverDepth?: number
}

export class IndexerManagementClient extends Client {
Expand Down Expand Up @@ -502,6 +503,7 @@ export const createIndexerManagementClient = async (
receiptCollector,
allocationManagementMode,
autoAllocationMinBatchSize,
autoGraftResolverDepth,
} = options
const schema = buildSchema(print(SCHEMA_SDL))
const resolvers = {
Expand All @@ -515,7 +517,11 @@ export const createIndexerManagementClient = async (

const dai: WritableEventual<string> = mutable()

const subgraphManager = new SubgraphManager(deploymentManagementEndpoint, indexNodeIDs)
const subgraphManager = new SubgraphManager(
deploymentManagementEndpoint,
indexNodeIDs,
autoGraftResolverDepth,
)
let allocationManager: AllocationManager | undefined = undefined
let actionManager: ActionManager | undefined = undefined
let networkMonitor: NetworkMonitor | undefined = undefined
Expand Down
103 changes: 83 additions & 20 deletions packages/indexer-common/src/indexer-management/subgraphs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import {
indexerError,
IndexerErrorCode,
IndexerManagementModels,
IndexingDecisionBasis,
IndexingRuleAttributes,
SubgraphIdentifierType,
upsertIndexingRule,
fetchIndexingRules,
} from '@graphprotocol/indexer-common'
import { Logger, SubgraphDeploymentID } from '@graphprotocol/common-ts'
import jayson, { Client as RpcClient } from 'jayson/promise'
Expand All @@ -10,8 +15,9 @@ import pTimeout from 'p-timeout'
export class SubgraphManager {
client: RpcClient
indexNodeIDs: string[]
autoGraftResolverDepth: number

constructor(endpoint: string, indexNodeIDs: string[]) {
constructor(endpoint: string, indexNodeIDs: string[], autoGraftResolverDepth?: number) {
if (endpoint.startsWith('https')) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
this.client = jayson.Client.https(endpoint as any)
Expand All @@ -20,6 +26,35 @@ export class SubgraphManager {
this.client = jayson.Client.http(endpoint as any)
}
this.indexNodeIDs = indexNodeIDs
this.autoGraftResolverDepth = autoGraftResolverDepth ?? 0
}

graftBase = (
logger: Logger,
err: Error,
depth: number,
): SubgraphDeploymentID | undefined => {
if (
err.message.includes(
'subgraph validation error: [the graft base is invalid: deployment not found',
)
) {
const graftBaseQm = err.message.substring(
err.message.indexOf('Qm'),
err.message.indexOf('Qm') + 46,
)
if (depth >= this.autoGraftResolverDepth) {
logger.warn(
`Grafting depth for deployment reached auto-graft-resolver-depth limit`,
{
deployment: graftBaseQm,
depth,
},
)
return
}
return new SubgraphDeploymentID(graftBaseQm)
}
}

async createSubgraph(logger: Logger, name: string): Promise<void> {
Expand All @@ -44,24 +79,25 @@ export class SubgraphManager {
name: string,
deployment: SubgraphDeploymentID,
indexNode: string | undefined,
depth: number,
): Promise<void> {
try {
let targetNode: string
if (indexNode) {
targetNode = indexNode
if (!this.indexNodeIDs.includes(targetNode)) {
logger.warn(
`Specified deployment target node not present in indexNodeIDs supplied at startup, proceeding with deploy to target node anyway.`,
{
targetNode: indexNode,
indexNodeIDs: this.indexNodeIDs,
},
)
}
} else {
targetNode =
this.indexNodeIDs[Math.floor(Math.random() * this.indexNodeIDs.length)]
let targetNode: string
if (indexNode) {
targetNode = indexNode
if (!this.indexNodeIDs.includes(targetNode)) {
logger.warn(
`Specified deployment target node not present in indexNodeIDs supplied at startup, proceeding with deploy to target node anyway.`,
{
targetNode: indexNode,
indexNodeIDs: this.indexNodeIDs,
},
)
}
} else {
targetNode = this.indexNodeIDs[Math.floor(Math.random() * this.indexNodeIDs.length)]
}

try {
logger.info(`Deploy subgraph`, {
name,
deployment: deployment.display,
Expand All @@ -76,16 +112,41 @@ export class SubgraphManager {
const response = await pTimeout(requestPromise, 120000)

if (response.error) {
throw response.error
const baseDeployment = this.graftBase(logger, response.error, depth)
if (baseDeployment) {
logger.debug(
`Found graft base deployment, ensure deployment and a matching offchain indexing rules`,
)
// Only add offchain after ensure
await this.ensure(logger, models, name, baseDeployment, targetNode, depth + 1)

// ensure targetDeployment again after graft base syncs, can use
// "subgraph validation error: [the graft base is invalid: failed to graft onto `Qm...` since it has not processed any blocks (only processed block ___)]"
await this.ensure(logger, models, name, deployment, targetNode, depth)

throw indexerError(IndexerErrorCode.IE069, response.error)
} else {
throw indexerError(IndexerErrorCode.IE026, response.error)
}
}
logger.info(`Successfully deployed subgraph`, {
name,
deployment: deployment.display,
endpoints: response.result,
})

// TODO: Insert an offchain indexing rule if one matching this deployment doesn't yet exist
// Will be useful for supporting deploySubgraph resolver
const indexingRules = (await fetchIndexingRules(models, false)).map(
(rule) => new SubgraphDeploymentID(rule.identifier),
)
if (!indexingRules.includes(deployment)) {
const offchainIndexingRule = {
identifier: deployment.ipfsHash,
identifierType: SubgraphIdentifierType.DEPLOYMENT,
decisionBasis: IndexingDecisionBasis.OFFCHAIN,
} as Partial<IndexingRuleAttributes>
await upsertIndexingRule(logger, models, offchainIndexingRule)
}
} catch (error) {
const err = indexerError(IndexerErrorCode.IE026, error)
logger.error(`Failed to deploy subgraph deployment`, {
Expand Down Expand Up @@ -196,10 +257,12 @@ export class SubgraphManager {
name: string,
deployment: SubgraphDeploymentID,
indexNode: string | undefined,
depth?: number,
): Promise<void> {
depth = depth ?? 0
try {
await this.createSubgraph(logger, name)
await this.deploy(logger, models, name, deployment, indexNode)
await this.deploy(logger, models, name, deployment, indexNode, depth)
await this.reassign(logger, deployment, indexNode)
} catch (error) {
const err = indexerError(IndexerErrorCode.IE020, error)
Expand Down

0 comments on commit d74033f

Please sign in to comment.