Skip to content

Commit

Permalink
fix: bug fixes from testing
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeCap08055 committed Aug 9, 2024
1 parent 64059ae commit 8b372a5
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -1,23 +1,10 @@
import { BlockchainModule, ConfigModule, ConfigService, GraphStateManager } from '#lib';
import { BlockchainModule, GraphStateManager } from '#lib';
import { Module } from '@nestjs/common';
import { RedisModule } from '@songkeys/nestjs-redis';
import { GraphMonitorService } from './graph.monitor.service';
import { CapacityCheckerService } from '#lib/blockchain/capacity-checker.service';

@Module({
imports: [
BlockchainModule,
RedisModule.forRootAsync(
{
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
config: [{ url: configService.redisUrl.toString() }],
}),
inject: [ConfigService],
},
true, // isGlobal
),
],
imports: [BlockchainModule],
providers: [GraphMonitorService, GraphStateManager, CapacityCheckerService],
exports: [],
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,22 @@ export class GraphMonitorService extends BlockchainScannerService {
]);
this.graphSchemaIds = schemaResponse.flatMap(({ ids }) => ids.map((id) => id.toNumber()));
this.logger.log('Monitoring schemas for graph updates: ', this.graphSchemaIds);
const pendingTxns = await this.cacheManager.hkeys(TXN_WATCH_LIST_KEY);
const pendingTxns = await this.cacheManager.hgetall(TXN_WATCH_LIST_KEY);
// If no transactions pending, skip to end of chain at startup, else, skip to earliest
/// birth block of a monitored extrinsic if we haven't crawled that far yet
if (pendingTxns.length === 0) {
if (Object.keys(pendingTxns).length === 0) {
const blockNumber = await this.blockchainService.getLatestFinalizedBlockNumber();
this.logger.log(`Skipping to end of the chain to resume scanning (block #${blockNumber})`);
await this.setLastSeenBlockNumber(blockNumber);
} else {
const pendingTxnArray = pendingTxns.map((jsonStr) => JSON.parse(jsonStr) as ITxStatus);
const minBirthBlock = Math.min(...pendingTxnArray.map(({ birth }) => birth).sort((a, b) => a - b));
const minBirthBlock = Math.min(
...Object.values(pendingTxns)
.map((jsonStr) => {
const txStatus = JSON.parse(jsonStr) as ITxStatus;
return txStatus.birth;
})
.sort((a, b) => a - b),
);
const lastSeenBlock = await this.getLastSeenBlockNumber();
this.logger.log('Skipping ahead to monitor submitted extrinsics', { skipTo: minBirthBlock - 1, lastSeenBlock });
if (lastSeenBlock < minBirthBlock - 1) {
Expand Down Expand Up @@ -281,8 +287,6 @@ export class GraphMonitorService extends BlockchainScannerService {
}

public async monitorAllGraphUpdates(block: SignedBlock, { event }: FrameSystemEventRecord) {
const graphUpdateNotification = {};

// Don't need this check logically, but it's a type guard to be able to access the specific event type data
if (
this.blockchainService.api.events.statefulStorage.PaginatedPageUpdated.is(event) ||
Expand Down Expand Up @@ -342,7 +346,7 @@ export class GraphMonitorService extends BlockchainScannerService {
* @param {boolean} includeAll - Whether to include webhooks registered for 'all'
* @returns {string[]} Array of URLs
*/
async getWebhookList(msaId: string, includeAll = true): Promise<string[]> {
public async getWebhookList(msaId: string, includeAll = true): Promise<string[]> {
const value = await this.cacheManager.hget(RedisConstants.REDIS_WEBHOOK_PREFIX, msaId);
let webhooks = value ? (JSON.parse(value) as string[]) : [];

Expand Down
2 changes: 1 addition & 1 deletion services/graph/apps/worker/src/worker.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { QueueModule } from '#lib/queues/queue.module';
{
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
config: [{ url: configService.redisUrl.toString() }],
config: [{ url: configService.redisUrl.toString(), keyPrefix: configService.cacheKeyPrefix }],
}),
inject: [ConfigService],
},
Expand Down
47 changes: 31 additions & 16 deletions services/graph/libs/common/src/blockchain/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ export class BlockchainService implements OnApplicationBootstrap, BeforeApplicat
return this.api.registry.createType(type, ...args);
}

public createExtrinsicCall({ pallet, extrinsic }: { pallet: string; extrinsic: string }, ...args: (any | undefined)[]): SubmittableExtrinsic<'promise', ISubmittableResult> {
public createExtrinsicCall(
{ pallet, extrinsic }: { pallet: string; extrinsic: string },
...args: (any | undefined)[]
): SubmittableExtrinsic<'promise', ISubmittableResult> {
return this.api.tx[pallet][extrinsic](...args);
}

Expand All @@ -120,7 +123,12 @@ export class BlockchainService implements OnApplicationBootstrap, BeforeApplicat
return args ? this.api.query[pallet][extrinsic](...args) : this.api.query[pallet][extrinsic]();
}

public async queryAt(blockHash: BlockHash, pallet: string, extrinsic: string, ...args: (any | undefined)[]): Promise<any> {
public async queryAt(
blockHash: BlockHash,
pallet: string,
extrinsic: string,
...args: (any | undefined)[]
): Promise<any> {
const newApi = await this.api.at(blockHash);
return newApi.query[pallet][extrinsic](...args);
}
Expand All @@ -129,18 +137,15 @@ export class BlockchainService implements OnApplicationBootstrap, BeforeApplicat
return this.rpc('system', 'accountNextIndex', account);
}

public async capacityInfo(providerId: string): Promise<{
providerId: string;
currentBlockNumber: number;
nextEpochStart: number;
remainingCapacity: bigint;
totalCapacityIssued: bigint;
currentEpoch: number;
}> {
public async capacityInfo(providerId: string): Promise<ICapacityInfo> {
const providerU64 = this.api.createType('u64', providerId);
const { epochStart }: PalletCapacityEpochInfo = await this.query('capacity', 'currentEpochInfo');
const epochBlockLength: u32 = await this.query('capacity', 'epochLength');
const capacityDetailsOption: Option<PalletCapacityCapacityDetails> = await this.query('capacity', 'capacityLedger', providerU64);
const capacityDetailsOption: Option<PalletCapacityCapacityDetails> = await this.query(
'capacity',
'capacityLedger',
providerU64,
);
const { remainingCapacity, totalCapacityIssued } = capacityDetailsOption.unwrapOr({
remainingCapacity: 0,
totalCapacityIssued: 0,
Expand All @@ -152,8 +157,10 @@ export class BlockchainService implements OnApplicationBootstrap, BeforeApplicat
providerId,
currentBlockNumber: currentBlock.toNumber(),
nextEpochStart: epochStart.add(epochBlockLength).toNumber(),
remainingCapacity: typeof remainingCapacity === 'number' ? BigInt(remainingCapacity) : remainingCapacity.toBigInt(),
totalCapacityIssued: typeof totalCapacityIssued === 'number' ? BigInt(totalCapacityIssued) : totalCapacityIssued.toBigInt(),
remainingCapacity:
typeof remainingCapacity === 'number' ? BigInt(remainingCapacity) : remainingCapacity.toBigInt(),
totalCapacityIssued:
typeof totalCapacityIssued === 'number' ? BigInt(totalCapacityIssued) : totalCapacityIssued.toBigInt(),
};
}

Expand All @@ -172,7 +179,11 @@ export class BlockchainService implements OnApplicationBootstrap, BeforeApplicat
return epochLength.toNumber();
}

public async crawlBlockListForTx(txHash: Hash, blockList: bigint[], successEvents: [{ pallet: string; event: string }]): Promise<ITxMonitorResult> {
public async crawlBlockListForTx(
txHash: Hash,
blockList: bigint[],
successEvents: [{ pallet: string; event: string }],
): Promise<ITxMonitorResult> {
const txReceiptPromises: Promise<ITxMonitorResult>[] = blockList.map(async (blockNumber) => {
const blockHash = await this.getBlockHash(blockNumber);
const block = await this.getBlock(blockHash);
Expand All @@ -192,7 +203,9 @@ export class BlockchainService implements OnApplicationBootstrap, BeforeApplicat
let txError: RegistryError | undefined;

try {
const events = (await eventsPromise).filter(({ phase }) => phase.isApplyExtrinsic && phase.asApplyExtrinsic.eq(txIndex));
const events = (await eventsPromise).filter(
({ phase }) => phase.isApplyExtrinsic && phase.asApplyExtrinsic.eq(txIndex),
);

events.forEach((record) => {
const { event } = record;
Expand All @@ -207,7 +220,9 @@ export class BlockchainService implements OnApplicationBootstrap, BeforeApplicat
}

// check custom success events
if (successEvents.find((successEvent) => successEvent.pallet === eventName && successEvent.event === method)) {
if (
successEvents.find((successEvent) => successEvent.pallet === eventName && successEvent.event === method)
) {
this.logger.debug(`Found success event ${eventName} ${method}`);
isTxSuccess = true;
}
Expand Down

0 comments on commit 8b372a5

Please sign in to comment.