Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: Allow configuring polling interval for data collection #977

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 26 additions & 36 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import pMap from 'p-map'
import pFilter from 'p-filter'
import mapValues from 'lodash.mapvalues'
import zip from 'lodash.zip'
import { AgentConfigs, NetworkAndOperator } from './types'

type ActionReconciliationContext = [AllocationDecision[], number, number]

Expand Down Expand Up @@ -126,13 +127,6 @@ export const convertSubgraphBasedRulesToDeploymentBased = (
return rules
}

// Represents a pair of Network and Operator instances belonging to the same protocol
// network. Used when mapping over multiple protocol networks.
type NetworkAndOperator = {
network: Network
operator: Operator
}

// Extracts the network identifier from a pair of matching Network and Operator objects.
function networkAndOperatorIdentity({
network,
Expand Down Expand Up @@ -196,26 +190,21 @@ export class Agent {
offchainSubgraphs: SubgraphDeploymentID[]
autoMigrationSupport: boolean
deploymentManagement: DeploymentManagementMode

constructor(
logger: Logger,
metrics: Metrics,
graphNode: GraphNode,
operators: Operator[],
indexerManagement: IndexerManagementClient,
networks: Network[],
offchainSubgraphs: SubgraphDeploymentID[],
autoMigrationSupport: boolean,
deploymentManagement: DeploymentManagementMode,
) {
this.logger = logger.child({ component: 'Agent' })
this.metrics = metrics
this.graphNode = graphNode
this.indexerManagement = indexerManagement
this.multiNetworks = createMultiNetworks(networks, operators)
this.offchainSubgraphs = offchainSubgraphs
this.autoMigrationSupport = !!autoMigrationSupport
this.deploymentManagement = deploymentManagement
pollingInterval: number

constructor(configs: AgentConfigs) {
this.logger = configs.logger.child({ component: 'Agent' })
this.metrics = configs.metrics
this.graphNode = configs.graphNode
this.indexerManagement = configs.indexerManagement
this.multiNetworks = createMultiNetworks(
configs.networks,
configs.operators,
)
this.offchainSubgraphs = configs.offchainSubgraphs
this.autoMigrationSupport = !!configs.autoMigrationSupport
this.deploymentManagement = configs.deploymentManagement
this.pollingInterval = configs.pollingInterval
}

async start(): Promise<Agent> {
Expand Down Expand Up @@ -261,9 +250,11 @@ export class Agent {
}

reconciliationLoop() {
const requestIntervalSmall = this.pollingInterval
const requestIntervalLarge = this.pollingInterval * 5
const logger = this.logger.child({ component: 'ReconciliationLoop' })
const currentEpochNumber: Eventual<NetworkMapped<number>> = timer(
600_000,
requestIntervalLarge,
).tryMap(
async () =>
await this.multiNetworks.map(({ network }) => {
Expand All @@ -279,7 +270,7 @@ export class Agent {
)

const maxAllocationEpochs: Eventual<NetworkMapped<number>> = timer(
600_000,
requestIntervalLarge,
).tryMap(
() =>
this.multiNetworks.map(({ network }) => {
Expand All @@ -295,7 +286,7 @@ export class Agent {
)

const indexingRules: Eventual<NetworkMapped<IndexingRuleAttributes[]>> =
timer(20_000).tryMap(
timer(requestIntervalSmall).tryMap(
async () => {
return this.multiNetworks.map(async ({ network, operator }) => {
logger.trace('Fetching indexing rules', {
Expand Down Expand Up @@ -332,7 +323,7 @@ export class Agent {
)

const activeDeployments: Eventual<SubgraphDeploymentID[]> = timer(
60_000,
requestIntervalSmall,
).tryMap(
() => {
logger.trace('Fetching active deployments')
Expand All @@ -348,7 +339,7 @@ export class Agent {
)

const networkDeployments: Eventual<NetworkMapped<SubgraphDeployment[]>> =
timer(240_000).tryMap(
timer(requestIntervalSmall).tryMap(
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching network deployments', {
Expand All @@ -367,7 +358,7 @@ export class Agent {

const eligibleTransferDeployments: Eventual<
NetworkMapped<TransferredSubgraphDeployment[]>
> = timer(300_000).tryMap(
> = timer(requestIntervalLarge).tryMap(
async () => {
// Return early if the auto migration feature is disabled.
if (!this.autoMigrationSupport) {
Expand Down Expand Up @@ -535,7 +526,6 @@ export class Agent {
// let targetDeployments be an union of targetAllocations
// and offchain subgraphs.
const targetDeployments: Eventual<SubgraphDeploymentID[]> = join({
ticker: timer(120_000),
indexingRules,
networkDeploymentAllocationDecisions,
}).tryMap(
Expand Down Expand Up @@ -569,7 +559,7 @@ export class Agent {
)

const activeAllocations: Eventual<NetworkMapped<Allocation[]>> = timer(
120_000,
requestIntervalSmall,
).tryMap(
() =>
this.multiNetworks.map(({ network }) => {
Expand Down Expand Up @@ -646,7 +636,7 @@ export class Agent {
)

join({
ticker: timer(240_000),
ticker: timer(requestIntervalLarge),
currentEpochNumber,
maxAllocationEpochs,
activeDeployments,
Expand Down
24 changes: 18 additions & 6 deletions packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { NetworkSpecification } from '@graphprotocol/indexer-common/dist/network
import { BigNumber } from 'ethers'
import { displayZodParsingError } from '@graphprotocol/indexer-common'
import { readFileSync } from 'fs'
import { AgentConfigs } from '../types'

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type AgentOptions = { [key: string]: any } & Argv['argv']
Expand Down Expand Up @@ -298,6 +299,13 @@ export const start = {
default: 'auto',
group: 'Indexer Infrastructure',
})
.option('polling-interval', {
description: 'Polling interval for data collection',
type: 'number',
required: false,
default: 120_000,
group: 'Indexer Infrastructure',
})
.option('auto-allocation-min-batch-size', {
description: `Minimum number of allocation transactions inside a batch for auto allocation management. No obvious upperbound, with default of 1`,
type: 'number',
Expand Down Expand Up @@ -667,17 +675,21 @@ export async function run(
// --------------------------------------------------------------------------------
// * The Agent itself
// --------------------------------------------------------------------------------
const agent = new Agent(
const agentConfigs: AgentConfigs = {
logger,
metrics,
graphNode,
operators,
indexerManagementClient,
indexerManagement: indexerManagementClient,
networks,
argv.offchainSubgraphs.map((s: string) => new SubgraphDeploymentID(s)),
argv.enableAutoMigrationSupport,
argv.deploymentManagement,
)
deploymentManagement: argv.deploymentManagement,
autoMigrationSupport: argv.enableAutoMigrationSupport,
offchainSubgraphs: argv.offchainSubgraphs.map(
(s: string) => new SubgraphDeploymentID(s),
),
pollingInterval: argv.pollingInterval,
}
const agent = new Agent(agentConfigs)
await agent.start()
}

Expand Down
29 changes: 18 additions & 11 deletions packages/indexer-agent/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
import { Logger, Metrics, SubgraphDeploymentID } from '@graphprotocol/common-ts'
import {
Network,
NetworkSubgraph,
ReceiptCollector,
GraphNode,
DeploymentManagementMode,
IndexerManagementClient,
Operator,
} from '@graphprotocol/indexer-common'
import { NetworkMonitor } from '@graphprotocol/indexer-common'

export interface AgentConfig {
// Represents a pair of Network and Operator instances belonging to the same protocol
// network. Used when mapping over multiple protocol networks.
export type NetworkAndOperator = {
network: Network
operator: Operator
}

export interface AgentConfigs {
logger: Logger
metrics: Metrics
indexer: GraphNode
network: Network
networkMonitor: NetworkMonitor
networkSubgraph: NetworkSubgraph
allocateOnNetworkSubgraph: boolean
registerIndexer: boolean
graphNode: GraphNode
operators: Operator[]
indexerManagement: IndexerManagementClient
networks: Network[]
deploymentManagement: DeploymentManagementMode
autoMigrationSupport: boolean
offchainSubgraphs: SubgraphDeploymentID[]
receiptCollector: ReceiptCollector
pollingInterval: number
}
Loading