Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor chain for capacity stake/unstake #329

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
nodejs 20.12.2
nodejs 20.16.0
make 4.3
11 changes: 9 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Controller, Get, Post, HttpCode, HttpStatus, Logger, Param, HttpException, Body, Put } from '@nestjs/common';
import { Controller, Get, Post, HttpCode, HttpStatus, Logger, Param, HttpException, Body } from '@nestjs/common';
import { ApiBody, ApiOkResponse, ApiOperation, ApiTags } from '@nestjs/swagger';
import type { HandleResponse } from '@frequency-chain/api-augment/interfaces';
import { TransactionType } from '#lib/types/enums';
import { HandlesService } from '#api/services/handles.service';
import { EnqueueService } from '#lib/services/enqueue-request.service';
Expand Down
6 changes: 6 additions & 0 deletions services/account/apps/worker/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import { EventEmitter2 } from '@nestjs/event-emitter';
import { redisReady } from '#lib/utils/redis';
import { WorkerModule } from './worker.module';

// Monkey-patch BigInt so that JSON.stringify will work
// eslint-disable-next-line
BigInt.prototype['toJSON'] = function () {
return this.toString();
};

const logger = new Logger('worker_main');

// bootstrap() does not have a main loop to keep the process alive.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Module } from '@nestjs/common';
import { BlockchainModule } from '#lib/blockchain/blockchain.module';
import { EnqueueService } from '#lib/services/enqueue-request.service';
import { TxnNotifierService } from './notifier.service';

@Module({
imports: [BlockchainModule],
providers: [EnqueueService, TxnNotifierService],
exports: [EnqueueService, TxnNotifierService],
providers: [TxnNotifierService],
exports: [],
})
export class TxnNotifierModule {}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import { BlockchainService } from '#lib/blockchain/blockchain.service';
import { TransactionType } from '#lib/types/enums';
import { SECONDS_PER_BLOCK, TxWebhookRsp, RedisUtils } from 'libs/common/src';
import { createWebhookRsp } from '#worker/transaction_notifier/notifier.service.helper.createWebhookRsp';
import { BlockchainScannerService, NullScanError } from '#lib/utils/blockchain-scanner.service';
import { BlockchainScannerService } from '#lib/utils/blockchain-scanner.service';
import { SchedulerRegistry } from '@nestjs/schedule';
import { BlockHash } from '@polkadot/types/interfaces';
import { SignedBlock } from '@polkadot/types/interfaces';
import { HexString } from '@polkadot/util/types';
import { ITxStatus } from '#lib/interfaces/tx-status.interface';
import { FrameSystemEventRecord } from '@polkadot/types/lookup';
import { ConfigService } from '#lib/config/config.service';
import { QueueConstants } from '#lib/queues';
import { CapacityCheckerService } from '#lib/blockchain/capacity-checker.service';

@Injectable()
export class TxnNotifierService
Expand Down Expand Up @@ -47,9 +48,13 @@ export class TxnNotifierService
private readonly schedulerRegistry: SchedulerRegistry,
@InjectRedis() cacheManager: Redis,
private readonly configService: ConfigService,
private readonly capacityService: CapacityCheckerService,
) {
super(cacheManager, blockchainService, new Logger(TxnNotifierService.prototype.constructor.name));
this.scanParameters = { onlyFinalized: this.configService.trustUnfinalizedBlocks };
this.registerChainEventHandler(['capacity.UnStaked', 'capacity.Staked'], () =>
this.capacityService.checkForSufficientCapacity(),
);
}

public get intervalName() {
Expand All @@ -72,25 +77,6 @@ export class TxnNotifierService
}
}

protected async checkInitialScanParameters(): Promise<void> {
const pendingTxns = await this.cacheManager.hlen(RedisUtils.TXN_WATCH_LIST_KEY);
if (pendingTxns === 0) {
throw new NullScanError('No pending extrinsics; no scan will be performed');
}

return super.checkInitialScanParameters();
}

protected async checkScanParameters(blockNumber: number, blockHash: BlockHash): Promise<void> {
const pendingTxns = await this.cacheManager.hlen(RedisUtils.TXN_WATCH_LIST_KEY);

if (pendingTxns === 0) {
throw new NullScanError('No pending extrinsics; terminating current scan iteration');
}

return super.checkScanParameters(blockNumber, blockHash);
}

public async getLastSeenBlockNumber(): Promise<number> {
let blockNumber = await super.getLastSeenBlockNumber();
const pendingTxns = await this.cacheManager.hvals(RedisUtils.TXN_WATCH_LIST_KEY);
Expand All @@ -107,15 +93,16 @@ export class TxnNotifierService
return blockNumber;
}

async processCurrentBlock(currentBlockHash: BlockHash, currentBlockNumber: number): Promise<void> {
async processCurrentBlock(currentBlock: SignedBlock, blockEvents: FrameSystemEventRecord[]): Promise<void> {
const currentBlockNumber = currentBlock.block.header.number.toNumber();

// Get set of tx hashes to monitor from cache
const pendingTxns = (await this.cacheManager.hvals(RedisUtils.TXN_WATCH_LIST_KEY)).map(
(val) => JSON.parse(val) as ITxStatus,
);

const block = await this.blockchainService.getBlock(currentBlockHash);
const extrinsicIndices: [HexString, number][] = [];
block.block.extrinsics.forEach((extrinsic, index) => {
currentBlock.block.extrinsics.forEach((extrinsic, index) => {
if (pendingTxns.some(({ txHash }) => txHash === extrinsic.hash.toHex())) {
extrinsicIndices.push([extrinsic.hash.toHex(), index]);
}
Expand All @@ -124,9 +111,9 @@ export class TxnNotifierService
let pipeline = this.cacheManager.multi({ pipeline: true });

if (extrinsicIndices.length > 0) {
const at = await this.blockchainService.api.at(currentBlockHash);
const at = await this.blockchainService.api.at(currentBlock.block.header.hash);
const epoch = (await at.query.capacity.currentEpoch()).toNumber();
const events: FrameSystemEventRecord[] = (await at.query.system.events()).filter(
const events: FrameSystemEventRecord[] = blockEvents.filter(
({ phase }) => phase.isApplyExtrinsic && extrinsicIndices.some((index) => phase.asApplyExtrinsic.eq(index)),
);

Expand Down Expand Up @@ -154,7 +141,7 @@ export class TxnNotifierService
const moduleError = dispatchError.registry.findMetaError(moduleThatErrored);
this.logger.error(`Extrinsic failed with error: ${JSON.stringify(moduleError)}`);
} else if (successEvent) {
this.logger.verbose(`Successfully found transaction ${txHash} in block ${currentBlockHash}`);
this.logger.verbose(`Successfully found transaction ${txHash} in block ${currentBlockNumber}`);
const webhook = await this.getWebhook();
let webhookResponse: Partial<TxWebhookRsp> = {};
webhookResponse.referenceId = txStatus.referenceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { SubmittableExtrinsic } from '@polkadot/api-base/types';
import { Codec, ISubmittableResult } from '@polkadot/types/types';
import { MILLISECONDS_PER_SECOND } from 'time-constants';
import { SchedulerRegistry } from '@nestjs/schedule';
import { BlockchainService } from '#lib/blockchain/blockchain.service';
import { BlockchainService, ICapacityInfo } from '#lib/blockchain/blockchain.service';
import { createKeys } from '#lib/blockchain/create-keys';
import { NonceService } from '#lib/services/nonce.service';
import { TransactionType } from '#lib/types/enums';
Expand All @@ -18,6 +18,12 @@ import { RedisUtils, TransactionData } from 'libs/common/src';
import { ConfigService } from '#lib/config/config.service';
import { ITxStatus } from '#lib/interfaces/tx-status.interface';
import { HexString } from '@polkadot/util/types';
import {
CAPACITY_AVAILABLE_EVENT,
CAPACITY_EXHAUSTED_EVENT,
CapacityCheckerService,
} from '#lib/blockchain/capacity-checker.service';
import { OnEvent } from '@nestjs/event-emitter';

export const SECONDS_PER_BLOCK = 12;
const CAPACITY_EPOCH_TIMEOUT_NAME = 'capacity_check';
Expand All @@ -29,7 +35,7 @@ const CAPACITY_EPOCH_TIMEOUT_NAME = 'capacity_check';
@Processor(QueueConstants.TRANSACTION_PUBLISH_QUEUE)
export class TransactionPublisherService extends BaseConsumer implements OnApplicationShutdown {
public async onApplicationBootstrap() {
await this.checkCapacity();
await this.capacityCheckerService.checkForSufficientCapacity();
}

public async onApplicationShutdown(_signal?: string | undefined): Promise<void> {
Expand All @@ -48,6 +54,7 @@ export class TransactionPublisherService extends BaseConsumer implements OnAppli
private blockchainService: BlockchainService,
private nonceService: NonceService,
private schedulerRegistry: SchedulerRegistry,
private capacityCheckerService: CapacityCheckerService,
) {
super();
}
Expand All @@ -61,7 +68,7 @@ export class TransactionPublisherService extends BaseConsumer implements OnAppli
let txHash: HexString;
try {
// Check capacity first; if out of capacity, send job back to queue
if (!(await this.checkCapacity())) {
if (!(await this.capacityCheckerService.checkForSufficientCapacity())) {
job.moveToDelayed(Date.now(), job.token); // fake delay, we just want to avoid processing the current job if we're out of capacity
throw new DelayedError();
}
Expand Down Expand Up @@ -114,11 +121,10 @@ export class TransactionPublisherService extends BaseConsumer implements OnAppli
obj[txHash] = JSON.stringify(status);
this.cacheManager.hset(RedisUtils.TXN_WATCH_LIST_KEY, obj);
} catch (error: any) {
if (error instanceof DelayedError) {
throw error;
if (!(error instanceof DelayedError)) {
this.logger.error('Unknown error encountered: ', error, error?.stack);
}

this.logger.error('Unknown error encountered: ', error, error?.stack);
throw error;
}
}
Expand Down Expand Up @@ -183,89 +189,59 @@ export class TransactionPublisherService extends BaseConsumer implements OnAppli
}
}

/**
* Checks the capacity of the account publisher and takes appropriate actions based on the capacity status.
* If the capacity is exhausted, it pauses the account change publish queue and sets a timeout to check the capacity again.
* If the capacity is refilled, it resumes the account change publish queue and clears the timeout.
* If any jobs failed due to low balance/no capacity, it retries them.
* If any error occurs during the capacity check, it logs the error.
*/
private async checkCapacity(): Promise<boolean> {
let outOfCapacity = false;
@OnEvent(CAPACITY_EXHAUSTED_EVENT)
public async handleCapacityExhausted(capacityInfo: ICapacityInfo) {
await this.transactionPublishQueue.pause();
const blocksRemaining = capacityInfo.nextEpochStart - capacityInfo.currentBlockNumber;
const epochTimeout = blocksRemaining * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND;
// Avoid spamming the log
if (!(await this.transactionPublishQueue.isPaused())) {
this.logger.warn(
`Capacity Exhausted: Pausing account change publish queue until next epoch: ${epochTimeout / 1000} seconds`,
);
}
try {
const { capacityLimit } = this.configService;
const capacityInfo = await this.blockchainService.capacityInfo(this.configService.providerId);
const { remainingCapacity } = capacityInfo;
const { currentEpoch } = capacityInfo;
const epochCapacityKey = `epochCapacity:${currentEpoch}`;
const epochUsedCapacity = BigInt((await this.cacheManager.get(epochCapacityKey)) ?? 0); // Fetch capacity used by the service
outOfCapacity = remainingCapacity <= 0n;

if (!outOfCapacity) {
this.logger.debug(` Capacity remaining: ${remainingCapacity}`);
if (capacityLimit.type === 'percentage') {
const capacityLimitPercentage = BigInt(capacityLimit.value);
const capacityLimitThreshold = (capacityInfo.totalCapacityIssued * capacityLimitPercentage) / 100n;
this.logger.debug(`Capacity limit threshold: ${capacityLimitThreshold}`);
if (epochUsedCapacity >= capacityLimitThreshold) {
outOfCapacity = true;
this.logger.warn(`Capacity threshold reached: used ${epochUsedCapacity} of ${capacityLimitThreshold}`);
}
} else if (epochUsedCapacity >= capacityLimit.value) {
outOfCapacity = true;
this.logger.warn(`Capacity threshold reached: used ${epochUsedCapacity} of ${capacityLimit.value}`);
}
// Check if a timeout with the same name already exists
if (this.schedulerRegistry.doesExist('timeout', CAPACITY_EPOCH_TIMEOUT_NAME)) {
// If it does, delete it
this.schedulerRegistry.deleteTimeout(CAPACITY_EPOCH_TIMEOUT_NAME);
}

if (outOfCapacity) {
await this.transactionPublishQueue.pause();
const blocksRemaining = capacityInfo.nextEpochStart - capacityInfo.currentBlockNumber;
const epochTimeout = blocksRemaining * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND;
this.logger.warn(
`Capacity Exhausted: Pausing account change publish queue until next epoch: ${epochTimeout / 1000} seconds`,
);
try {
// Check if a timeout with the same name already exists
if (this.schedulerRegistry.doesExist('timeout', CAPACITY_EPOCH_TIMEOUT_NAME)) {
// If it does, delete it
this.schedulerRegistry.deleteTimeout(CAPACITY_EPOCH_TIMEOUT_NAME);
}

// Add the new timeout
this.schedulerRegistry.addTimeout(
CAPACITY_EPOCH_TIMEOUT_NAME,
setTimeout(() => this.checkCapacity(), epochTimeout),
);
} catch (err) {
// Handle any errors
console.error(err);
}
} else {
this.logger.verbose('Capacity Available: Resuming account change publish queue and clearing timeout');
// Get the failed jobs and check if they failed due to capacity
const failedJobs = await this.transactionPublishQueue.getFailed();
const capacityFailedJobs = failedJobs.filter((job) =>
job.failedReason?.includes('1010: Invalid Transaction: Inability to pay some fees'),
);
// Retry the failed jobs
await Promise.all(
capacityFailedJobs.map(async (job) => {
this.logger.debug(`Retrying job ${job.id}`);
job.retry();
}),
);
try {
this.schedulerRegistry.deleteTimeout(CAPACITY_EPOCH_TIMEOUT_NAME);
} catch (err) {
// ignore
}
// Add the new timeout
this.schedulerRegistry.addTimeout(
CAPACITY_EPOCH_TIMEOUT_NAME,
setTimeout(() => this.capacityCheckerService.checkForSufficientCapacity(), epochTimeout),
);
} catch (err) {
// Handle any errors
console.error(err);
}
}

await this.transactionPublishQueue.resume();
}
@OnEvent(CAPACITY_AVAILABLE_EVENT)
public async handleCapacityAvailable() {
// Avoid spamming the log
if (await this.transactionPublishQueue.isPaused()) {
this.logger.verbose('Capacity Available: Resuming account change publish queue and clearing timeout');
}
// Get the failed jobs and check if they failed due to capacity
const failedJobs = await this.transactionPublishQueue.getFailed();
const capacityFailedJobs = failedJobs.filter((job) =>
job.failedReason?.includes('1010: Invalid Transaction: Inability to pay some fees'),
);
// Retry the failed jobs
await Promise.all(
capacityFailedJobs.map(async (job) => {
this.logger.debug(`Retrying job ${job.id}`);
job.retry();
}),
);
try {
this.schedulerRegistry.deleteTimeout(CAPACITY_EPOCH_TIMEOUT_NAME);
} catch (err) {
this.logger.error('Caught error in checkCapacity', err);
// ignore
}

return !outOfCapacity;
await this.transactionPublishQueue.resume();
}
}
2 changes: 1 addition & 1 deletion services/account/apps/worker/src/worker.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ import { TransactionPublisherModule } from './transaction_publisher/publisher.mo
TxnNotifierModule,
],
providers: [ProviderWebhookService, NonceService],
exports: [],
exports: [EventEmitterModule],
})
export class WorkerModule {}
2 changes: 1 addition & 1 deletion services/account/env.template
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ HEALTH_CHECK_SUCCESS_THRESHOLD=10
# Maximum amount of provider capacity this app is allowed to use (per epoch)
# type: 'percentage' | 'amount'
# value: number (may be percentage, ie '80', or absolute amount of capacity)
CAPACITY_LIMIT='{"type":"percentage", "value":80}'
CAPACITY_LIMIT='{"serviceLimit":{"type":"percentage","value":"80"}}'

# URL for the Sign-In With Frequency UI
SIWF_URL=https://projectlibertylabs.github.io/siwf/ui
Expand Down
6 changes: 6 additions & 0 deletions services/account/jest.init.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/* eslint-disable func-names */
/* eslint-disable no-extend-native */
// eslint-disable-next-line dot-notation
BigInt.prototype['toJSON'] = function () {
return this.toString();
};
Loading
Loading