Skip to content

Commit

Permalink
feat: add chain-scan for extrinsic completion to content-publishing-s…
Browse files Browse the repository at this point in the history
…ervice
  • Loading branch information
JoeCap08055 committed Aug 7, 2024
1 parent 06b511f commit 22dd9ac
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import { FrameSystemEventRecord } from '@polkadot/types/lookup';
import { HexString } from '@polkadot/util/types';
import { ITxStatus } from '#libs/interfaces/tx-status.interface';
import { CapacityCheckerService } from '#libs/blockchain/capacity-checker.service';
import axios from 'axios';

@Injectable()
export class TxStatusMonitoringService extends BlockchainScannerService {
Expand Down Expand Up @@ -51,7 +50,9 @@ export class TxStatusMonitoringService extends BlockchainScannerService {
) {
super(cacheManager, blockchainService, new Logger(TxStatusMonitoringService.prototype.constructor.name));
this.scanParameters = { onlyFinalized: this.configService.trustUnfinalizedBlocks };
this.registerChainEventHandler(['capacity.UnStaked', 'capacity.Staked'], () => this.capacityService.checkCapacity());
this.registerChainEventHandler(['capacity.UnStaked', 'capacity.Staked'], () =>
this.capacityService.checkForSufficientCapacity(),
);
}

public get intervalName() {
Expand All @@ -74,9 +75,11 @@ export class TxStatusMonitoringService extends BlockchainScannerService {
let pipeline = this.cacheManager.multi({ pipeline: true });

if (extrinsicIndices.length > 0) {
const at = await this.blockchainService.api.at(currentBlock.hash);
const at = await this.blockchainService.api.at(currentBlock.block.header.hash);
const epoch = (await at.query.capacity.currentEpoch()).toNumber();
const events: FrameSystemEventRecord[] = blockEvents.filter(({ phase }) => phase.isApplyExtrinsic && extrinsicIndices.some((index) => phase.asApplyExtrinsic.eq(index)));
const events: FrameSystemEventRecord[] = blockEvents.filter(
({ phase }) => phase.isApplyExtrinsic && extrinsicIndices.some((index) => phase.asApplyExtrinsic.eq(index)),
);

const totalCapacityWithdrawn: bigint = events.reduce((sum, { event }) => {
if (at.events.capacity.CapacityWithdrawn.is(event)) {
Expand All @@ -87,11 +90,16 @@ export class TxStatusMonitoringService extends BlockchainScannerService {

// eslint-disable-next-line no-restricted-syntax
for (const [txHash, txIndex] of extrinsicIndices) {
const extrinsicEvents = events.filter(({ phase }) => phase.isApplyExtrinsic && phase.asApplyExtrinsic.eq(txIndex));
const extrinsicEvents = events.filter(
({ phase }) => phase.isApplyExtrinsic && phase.asApplyExtrinsic.eq(txIndex),
);
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const txStatusStr = (await this.cacheManager.hget(TXN_WATCH_LIST_KEY, txHash))!;
const txStatus = JSON.parse(txStatusStr) as ITxStatus;
const successEvent = extrinsicEvents.find(({ event }) => event.section === txStatus.successEvent.section && event.method === txStatus.successEvent.method)?.event;
const successEvent = extrinsicEvents.find(
({ event }) =>
event.section === txStatus.successEvent.section && event.method === txStatus.successEvent.method,
)?.event;
const failureEvent = extrinsicEvents.find(({ event }) => at.events.system.ExtrinsicFailed.is(event))?.event;

// TODO: Should the webhook provide for reporting failure?
Expand Down Expand Up @@ -127,7 +135,9 @@ export class TxStatusMonitoringService extends BlockchainScannerService {
// eslint-disable-next-line no-restricted-syntax
for (const { birth, death, txHash, referencePublishJob } of pendingTxns) {
if (death <= currentBlockNumber) {
this.logger.warn(`Tx ${txHash} expired (birth: ${birth}, death: ${death}, currentBlock: ${currentBlockNumber}), adding back to the publishing queue`);
this.logger.warn(
`Tx ${txHash} expired (birth: ${birth}, death: ${death}, currentBlock: ${currentBlockNumber}), adding back to the publishing queue`,
);
// could not find the transaction, this might happen if transaction never gets into a block
await this.retryPublishJob(referencePublishJob);
pipeline = pipeline.hdel(TXN_WATCH_LIST_KEY, txHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ export class IPFSPublisher {
public async publish(message: IPublisherJob): Promise<[Hash, SubmittableExtrinsic<'promise', ISubmittableResult>]> {
this.logger.debug(JSON.stringify(message));
const providerKeys = createKeys(this.configService.providerAccountSeedPhrase);
const tx = this.blockchainService.createExtrinsicCall({ pallet: 'messages', extrinsic: 'addIpfsMessage' }, message.schemaId, message.data.cid, message.data.payloadLength);
const tx = this.blockchainService.createExtrinsicCall(
{ pallet: 'messages', extrinsic: 'addIpfsMessage' },
message.schemaId,
message.data.cid,
message.data.payloadLength,
);
return this.processSingleBatch(providerKeys, tx);
}

Expand All @@ -34,7 +39,12 @@ export class IPFSPublisher {
): Promise<[Hash, SubmittableExtrinsic<'promise', ISubmittableResult>]> {
this.logger.debug(`Submitting tx of size ${tx.length}`);
try {
const ext = this.blockchainService.createExtrinsic({ pallet: 'frequencyTxPayment', extrinsic: 'payWithCapacity' }, providerKeys, tx);
const ext = this.blockchainService.createExtrinsic(
{ pallet: 'frequencyTxPayment', extrinsic: 'payWithCapacity' },
providerKeys,
tx,
);
this.logger.debug('Submitted transaction: ', ext.getCall().toHex());
const nonce = await this.nonceService.getNextNonce();
const txHash = await ext.signAndSend(nonce);
if (!txHash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot
}

public async onApplicationBootstrap() {
await this.capacityCheckerService.checkCapacity();
await this.capacityCheckerService.checkForSufficientCapacity();
}

async onModuleDestroy(): Promise<void> {
Expand All @@ -51,7 +51,7 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot
async process(job: Job<IPublisherJob, any, string>): Promise<void> {
try {
// Check capacity first; if out of capacity, send job back to queue
if (!(await this.capacityCheckerService.checkCapacity())) {
if (!(await this.capacityCheckerService.checkForSufficientCapacity())) {
job.moveToDelayed(Date.now(), job.token); // fake delay, we just want to avoid processing the current job if we're out of capacity
throw new DelayedError();
}
Expand All @@ -62,8 +62,12 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot
const status: ITxStatus = {
txHash: txHash.toString(),
successEvent: { section: 'messages', method: 'MessagesInBlock' },
birth: tx.era.asMortalEra.birth(currentBlockNumber),
death: tx.era.asMortalEra.death(currentBlockNumber),
// TODO: For some reason, the constructed transaction here keeps coming back as ImmortalEra.
// Until that's fixed, just assume a fixed mortality of 50 blocks from the current block.
// birth: tx.era.asMortalEra.birth(currentBlockNumber),
// death: tx.era.asMortalEra.death(currentBlockNumber),
birth: currentBlockNumber,
death: currentBlockNumber + 50,
referencePublishJob: job.data,
};
const obj = {};
Expand Down Expand Up @@ -96,7 +100,10 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot
try {
this.schedulerRegistry.addTimeout(
CAPACITY_EPOCH_TIMEOUT_NAME,
setTimeout(() => this.capacityCheckerService.checkCapacity(), blocksRemaining * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND),
setTimeout(
() => this.capacityCheckerService.checkForSufficientCapacity(),
blocksRemaining * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND,
),
);
} catch (err) {
// ignore duplicate timeout
Expand All @@ -105,7 +112,10 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot

@OnEvent('capacity.available', { async: true, promisify: true })
private async handleCapacityRefilled() {
this.logger.debug('Received capacity.refilled event');
// Avoid spamming the log
if (await this.publishQueue.isPaused()) {
this.logger.debug('Received capacity.available event');
}
try {
this.schedulerRegistry.deleteTimeout(CAPACITY_EPOCH_TIMEOUT_NAME);
} catch (err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import { Injectable, Logger, OnApplicationBootstrap, OnApplicationShutdown } fro
import { ApiPromise, HttpProvider, WsProvider } from '@polkadot/api';
import { options } from '@frequency-chain/api-augment';
import { KeyringPair } from '@polkadot/keyring/types';
import { BlockHash, BlockNumber, Hash, SignedBlock } from '@polkadot/types/interfaces';
import { BlockHash, BlockNumber, SignedBlock } from '@polkadot/types/interfaces';
import { SubmittableExtrinsic } from '@polkadot/api/types';
import { AnyNumber, ISubmittableResult, RegistryError } from '@polkadot/types/types';
import { AnyNumber, ISubmittableResult } from '@polkadot/types/types';
import { u32, Option, Bytes, Vec, u16 } from '@polkadot/types';
import { PalletCapacityCapacityDetails, PalletCapacityEpochInfo } from '@polkadot/types/lookup';
import { Extrinsic } from './extrinsic';
Expand All @@ -28,6 +28,13 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS

private logger: Logger;

private readyResolve: (boolean) => void;
private readyReject: (reason: any) => void;
private isReadyPromise = new Promise<boolean>((resolve, reject) => {
this.readyResolve = resolve;
this.readyReject = reject;
});

public async onApplicationBootstrap() {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const providerUrl = this.configService.frequencyUrl!;
Expand All @@ -42,17 +49,12 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
}
// this.api = await firstValueFrom(ApiRx.create({ provider, ...options }));
this.api = await ApiPromise.create({ provider, ...options });
await this.api.isReady;
this.readyResolve(await this.api.isReady);
this.logger.log('Blockchain API ready.');
}

public async onApplicationShutdown(_signal?: string | undefined) {
const promises: Promise<void>[] = [];
if (this.api) {
promises.push(this.api.disconnect());
}

await Promise.all(promises);
await this.api?.disconnect();
}

constructor(configService: ConfigService) {
Expand All @@ -61,8 +63,7 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
}

public async isReady(): Promise<boolean> {
await this.api?.isReady;
return true;
return (await this.isReadyPromise) && !!(await this.api.isReady);
}

public getBlockHash(block: BlockNumber | AnyNumber): Promise<BlockHash> {
Expand Down Expand Up @@ -178,95 +179,6 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
return this.rpc('system', 'accountNextIndex', account);
}

public async crawlBlockListForTx(
txHash: Hash,
blockList: number[],
successEvents: [{ pallet: string; event: string }],
): Promise<{
found: boolean;
success: boolean;
blockHash?: BlockHash;
capacityEpoch?: number;
capacityWithdrawn?: bigint;
error?: RegistryError;
}> {
const txReceiptPromises: Promise<{
found: boolean;
success: boolean;
blockHash?: BlockHash;
capacityWithdrawn?: bigint;
error?: RegistryError;
}>[] = blockList.map(async (blockNumber) => {
const blockHash = await this.getBlockHash(blockNumber);
const block = await this.getBlock(blockHash);
const txIndex = block.block.extrinsics.findIndex((extrinsic) => extrinsic.hash.toString() === txHash.toString());

if (txIndex === -1) {
return { found: false, success: false };
}

this.logger.verbose(`Found tx ${txHash} in block ${blockNumber}`);
const at = await this.api.at(blockHash.toHex());
const capacityEpoch = (await at.query.capacity.currentEpoch()).toNumber();
const eventsPromise = at.query.system.events();

let isTxSuccess = false;
let totalBlockCapacity = 0n;
let txError: RegistryError | undefined;

try {
const events = (await eventsPromise).filter(
({ phase }) => phase.isApplyExtrinsic && phase.asApplyExtrinsic.eq(txIndex),
);

events.forEach((record) => {
const { event } = record;
const eventName = event.section;
const { method } = event;
const { data } = event;
this.logger.debug(`Received event: ${eventName} ${method} ${data}`);

// find capacity withdrawn event
if (at.events.capacity.CapacityWithdrawn.is(event)) {
totalBlockCapacity += event.data.amount.toBigInt();
}

// check custom success events
if (
successEvents.find((successEvent) => successEvent.pallet === eventName && successEvent.event === method)
) {
this.logger.debug(`Found success event ${eventName} ${method}`);
isTxSuccess = true;
}

// check for system extrinsic failure
if (at.events.system.ExtrinsicFailed.is(event)) {
const { dispatchError } = event.data;
const moduleThatErrored = dispatchError.asModule;
const moduleError = dispatchError.registry.findMetaError(moduleThatErrored);
txError = moduleError;
this.logger.error(`Extrinsic failed with error: ${JSON.stringify(moduleError)}`);
}
});
} catch (error) {
this.logger.error(error);
}
this.logger.debug(`Total capacity withdrawn in block: ${totalBlockCapacity.toString()}`);
return {
found: true,
success: isTxSuccess,
blockHash,
capacityEpoch,
capacityWithDrawn: totalBlockCapacity,
error: txError,
};
});
const results = await Promise.all(txReceiptPromises);
const result = results.find((receipt) => receipt.found);
this.logger.debug(`Found tx receipt: ${JSON.stringify(result)}`);
return result ?? { found: false, success: false };
}

public async getSchemaIdByName(schemaNamespace: string, schemaDescriptor: string): Promise<number> {
const { ids }: { ids: Vec<u16> } = await this.api.query.schemas.schemaNameToIds(schemaNamespace, schemaDescriptor);
const schemaId = ids.toArray().pop()?.toNumber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ export class CapacityCheckerService {
outOfCapacity = totalCapacityUsed >= limit;

if (outOfCapacity) {
this.logger.warn(`Total capacity usage limit reached: used ${totalCapacityUsed} of ${limit} allowed (${totalCapacityIssued} total issued)`);
this.logger.warn(
`Total capacity usage limit reached: used ${totalCapacityUsed} of ${limit} allowed (${totalCapacityIssued} total issued)`,
);
}

return outOfCapacity;
Expand All @@ -64,16 +66,24 @@ export class CapacityCheckerService {
this.logger.warn(`Capacity service threshold reached: used ${epochUsedCapacity} of ${limit}`);
} else if (this.lastCapacityEpoch !== currentEpoch || this.lastCapacityUsedCheck !== epochUsedCapacity) {
// Minimum with bigints
const serviceRemaining = remainingCapacity > limit - epochUsedCapacity ? limit - epochUsedCapacity : remainingCapacity;
this.logger.verbose(`Service Capacity usage: ${epochUsedCapacity} of ${limit} allowed (${serviceRemaining} remaining)`);
const serviceRemaining =
remainingCapacity > limit - epochUsedCapacity ? limit - epochUsedCapacity : remainingCapacity;
this.logger.verbose(
`Service Capacity usage: ${epochUsedCapacity} of ${limit} allowed (${serviceRemaining} remaining)`,
);
this.lastCapacityEpoch = currentEpoch;
this.lastCapacityUsedCheck = epochUsedCapacity;
}

return outOfCapacity;
}

public async checkCapacity(): Promise<boolean> {
/**
* Checks remaining Capacity against configured per-service and total Capacity limits.
*
* @returns {boolean} Returns true if remaining Capacity is within allowed limits; false otherwise
*/
public async checkForSufficientCapacity(): Promise<boolean> {
let outOfCapacity = false;

try {
Expand All @@ -87,7 +97,9 @@ export class CapacityCheckerService {
this.logger.warn(`No capacity!`);
}

const totalLimitExceeded = capacityLimit.totalLimit ? this.checkTotalCapacityLimit(capacityInfo, capacityLimit.totalLimit) : false;
const totalLimitExceeded = capacityLimit.totalLimit
? this.checkTotalCapacityLimit(capacityInfo, capacityLimit.totalLimit)
: false;
const serviceLimitExceeded = await this.checkServiceCapacityLimit(capacityInfo, capacityLimit.serviceLimit);

outOfCapacity = capacityInfo.remainingCapacity <= 0n || serviceLimitExceeded || totalLimitExceeded;
Expand All @@ -102,6 +114,6 @@ export class CapacityCheckerService {
this.logger.error('Caught error in checkCapacity', err?.stack);
}

return outOfCapacity;
return !outOfCapacity;
}
}

0 comments on commit 22dd9ac

Please sign in to comment.