From d1f4cf0452a70a8d432e88d200e62764d13d4cd6 Mon Sep 17 00:00:00 2001 From: Matthew Orris <1466844+mattheworris@users.noreply.github.com> Date: Fri, 25 Oct 2024 11:46:48 -0400 Subject: [PATCH] fix: make account service scanner the same as graph (#661) # Problem `account-service` can crash if it loses connection to the blockchain ``` 2024-10-24 16:16:50 API-WS: disconnected from wss://0.rpc.testnet.amplica.io/: 1006:: Abnormal Closure [Nest] 76924 - 10/24/2024, 4:16:50 PM ERROR [BlockchainService] Communications error with Frequency node; starting 60-second shutdown timer 2024-10-24 16:16:50 RPC-CORE: getBlockHash(blockNumber?: BlockNumber): BlockHash:: disconnected from wss://0.rpc.testnet.amplica.io/: 1006:: Abnormal Closure [Nest] 76924 - 10/24/2024, 4:16:50 PM ERROR [TxnNotifierService] Unexpected error scanning chain [Nest] 76924 - 10/24/2024, 4:16:50 PM ERROR [TxnNotifierService] {} Error: disconnected from wss://0.rpc.testnet.amplica.io/: 1006:: Abnormal Closure at WebSocket.__internal__onSocketClose (/Users/mattheworris/projects/fresh-gateway/node_modules/@polkadot/api/node_modules/@polkadot/rpc-provider/cjs/ws/index.js:355:23) at callListener (/Users/mattheworris/projects/fresh-gateway/node_modules/ws/lib/event-target.js:290:14) at WebSocket.onClose (/Users/mattheworris/projects/fresh-gateway/node_modules/ws/lib/event-target.js:220:9) at WebSocket.emit (node:events:518:28) at WebSocket.emitClose (/Users/mattheworris/projects/fresh-gateway/node_modules/ws/lib/websocket.js:272:10) at Receiver.receiverOnFinish (/Users/mattheworris/projects/fresh-gateway/node_modules/ws/lib/websocket.js:1209:20) at Receiver.emit (node:events:518:28) at finish (node:internal/streams/writable:946:10) at node:internal/streams/writable:927:13 at process.processTicksAndRejections (node:internal/process/task_queues:82:21) ****** UNCAUGHT EXCEPTION ****** Error: disconnected from wss://0.rpc.testnet.amplica.io/: 1006:: Abnormal Closure at WebSocket.__internal__onSocketClose (/Users/mattheworris/projects/fresh-gateway/node_modules/@polkadot/api/node_modules/@polkadot/rpc-provider/cjs/ws/index.js:355:23) at callListener (/Users/mattheworris/projects/fresh-gateway/node_modules/ws/lib/event-target.js:290:14) at WebSocket.onClose (/Users/mattheworris/projects/fresh-gateway/node_modules/ws/lib/event-target.js:220:9) at WebSocket.emit (node:events:518:28) at WebSocket.emitClose (/Users/mattheworris/projects/fresh-gateway/node_modules/ws/lib/websocket.js:272:10) at Receiver.receiverOnFinish (/Users/mattheworris/projects/fresh-gateway/node_modules/ws/lib/websocket.js:1209:20) at Receiver.emit (node:events:518:28) at finish (node:internal/streams/writable:946:10) at node:internal/streams/writable:927:13 at process.processTicksAndRejections (node:internal/process/task_queues:82:21) ``` # Solution Implement the same updates as graph service, which does not exhibit the problem. ## Steps to Verify: 1. Run account service connected to a local node. 2. Stop the local frequency node. 3. Verify that account-service does not crash due to the unhandled exception above. --------- Co-authored-by: Joe Caputo --- .../utils/blockchain-scanner.service.spec.ts | 106 +++++++++++++----- .../src/utils/blockchain-scanner.service.ts | 50 +++++++-- package.json | 23 ++-- 3 files changed, 133 insertions(+), 46 deletions(-) diff --git a/libs/account-lib/src/utils/blockchain-scanner.service.spec.ts b/libs/account-lib/src/utils/blockchain-scanner.service.spec.ts index 40d4acf5..8d09b17a 100644 --- a/libs/account-lib/src/utils/blockchain-scanner.service.spec.ts +++ b/libs/account-lib/src/utils/blockchain-scanner.service.spec.ts @@ -1,21 +1,55 @@ import { Injectable, Logger } from '@nestjs/common'; import { Test } from '@nestjs/testing'; -import { Hash, SignedBlock } from '@polkadot/types/interfaces'; -import { BlockchainService } from '#blockchain/blockchain.service'; +import { BlockHash, BlockNumber, SignedBlock } from '@polkadot/types/interfaces'; import { DEFAULT_REDIS_NAMESPACE, getRedisToken, InjectRedis } from '@songkeys/nestjs-redis'; import { Redis } from 'ioredis'; -import { FrameSystemEventRecord } from '@polkadot/types/lookup'; import { BlockchainScannerService } from './blockchain-scanner.service'; +import { FrameSystemEventRecord } from '@polkadot/types/lookup'; +import { mockApiPromise } from '#testlib/polkadot-api.mock.spec'; +import { BlockchainService, NONCE_SERVICE_REDIS_NAMESPACE } from '#blockchain/blockchain.service'; +import { IBlockchainNonProviderConfig } from '#blockchain/blockchain.config'; +import { GenerateMockConfigProvider } from '#testlib/utils.config-tests'; +import { EventEmitterModule } from '@nestjs/event-emitter'; +import { AnyNumber } from '@polkadot/types/types'; + +jest.mock('@polkadot/api', () => { + const originalModule = jest.requireActual('@polkadot/api'); + return { + __esModules: true, + WsProvider: jest.fn().mockImplementation(() => originalModule.WsProvider), + ApiPromise: jest.fn().mockImplementation(() => ({ + ...originalModule.ApiPromise, + ...mockApiPromise, + })), + }; +}); +const mockBlockchainConfigProvider = GenerateMockConfigProvider('blockchain', { + frequencyTimeoutSecs: 10, + frequencyApiWsUrl: new URL('ws://localhost:9944'), + isDeployedReadOnly: false, +}); const mockRedis = { + get: jest.fn(), + set: jest.fn(), + defineCommand: jest.fn(), +}; + +const mockDefaultRedisProvider = { provide: getRedisToken(DEFAULT_REDIS_NAMESPACE), - useValue: { get: jest.fn(), set: jest.fn() }, + useValue: mockRedis, +}; + +const mockNonceRedisProvider = { + provide: getRedisToken(NONCE_SERVICE_REDIS_NAMESPACE), + useValue: mockRedis, }; const mockBlockHash = { toString: jest.fn(() => '0x1234'), some: () => true, -}; + isEmpty: false, +} as unknown as BlockHash; const mockSignedBlock = { block: { @@ -26,29 +60,11 @@ const mockSignedBlock = { }, }; -Object.defineProperty(mockBlockHash, 'isEmpty', { - get: jest.fn(() => false), -}); - const mockEmptyBlockHash = { toString: jest.fn(() => '0x00000'), some: () => false, -}; -Object.defineProperty(mockEmptyBlockHash, 'isEmpty', { - get: jest.fn(() => true), -}); -const mockBlockchainService = { - isReady: jest.fn(() => Promise.resolve()), - getBlock: jest.fn((_blockHash?: string | Hash) => mockSignedBlock as unknown as SignedBlock), - getBlockHash: jest.fn((blockNumber: number) => (blockNumber > 1 ? mockEmptyBlockHash : mockBlockHash)), - getLatestFinalizedBlockNumber: jest.fn(), - getEvents: jest.fn(() => []), -}; - -const mockBlockchainServiceProvider = { - provide: BlockchainService, - useValue: mockBlockchainService, -}; + isEmpty: true, +} as unknown as BlockHash; @Injectable() class ScannerService extends BlockchainScannerService { @@ -67,10 +83,48 @@ describe('BlockchainScannerService', () => { beforeAll(async () => { const moduleRef = await Test.createTestingModule({ - providers: [mockRedis, Logger, mockBlockchainServiceProvider, ScannerService], + imports: [ + EventEmitterModule.forRoot({ + // Use this instance throughout the application + global: true, + // set this to `true` to use wildcards + wildcard: false, + // the delimiter used to segment namespaces + delimiter: '.', + // set this to `true` if you want to emit the newListener event + newListener: false, + // set this to `true` if you want to emit the removeListener event + removeListener: false, + // the maximum amount of listeners that can be assigned to an event + maxListeners: 10, + // show event name in memory leak message when more than maximum amount of listeners is assigned + verboseMemoryLeak: false, + // disable throwing uncaughtException if an error event is emitted and it has no listeners + ignoreErrors: false, + }), + ], + providers: [ + mockDefaultRedisProvider, + mockNonceRedisProvider, + mockBlockchainConfigProvider, + Logger, + BlockchainService, + ScannerService, + ], }).compile(); + moduleRef.enableShutdownHooks(); service = moduleRef.get(ScannerService); blockchainService = moduleRef.get(BlockchainService); + const mockApi: any = await blockchainService.getApi(); + jest.spyOn(blockchainService, 'getBlock').mockResolvedValue(mockSignedBlock as unknown as SignedBlock); + jest + .spyOn(blockchainService, 'getBlockHash') + .mockImplementation((blockNumber: BlockNumber | AnyNumber) => + Promise.resolve((blockNumber as unknown as number) > 1 ? mockEmptyBlockHash : mockBlockHash), + ); + jest.spyOn(blockchainService, 'getLatestFinalizedBlockNumber'); + jest.spyOn(blockchainService, 'getEvents').mockResolvedValue([]); + mockApi.emit('connected'); // keeps the test suite from hanging when finished }); describe('scan', () => { diff --git a/libs/account-lib/src/utils/blockchain-scanner.service.ts b/libs/account-lib/src/utils/blockchain-scanner.service.ts index f7f4567c..fd51bfdc 100644 --- a/libs/account-lib/src/utils/blockchain-scanner.service.ts +++ b/libs/account-lib/src/utils/blockchain-scanner.service.ts @@ -14,17 +14,20 @@ export interface IBlockchainScanParameters { } export class EndOfChainError extends Error {} +export class SkipBlockError extends Error {} function eventName({ event: { section, method } }: FrameSystemEventRecord) { return `${section}.${method}`; } export abstract class BlockchainScannerService { + private scanIsPaused = false; + protected scanInProgress = false; protected chainEventHandlers = new Map< string, - ((block: SignedBlock, event: FrameSystemEventRecord) => any | Promise)[] + ((block: SignedBlock, event: FrameSystemEventRecord) => unknown | Promise)[] >(); private readonly lastSeenBlockNumberKey: string; @@ -37,6 +40,20 @@ export abstract class BlockchainScannerService { protected readonly logger: Logger, ) { this.lastSeenBlockNumberKey = `${this.constructor.name}:${LAST_SEEN_BLOCK_NUMBER_KEY}`; + this.blockchainService.on('chain.disconnected', () => { + this.paused = true; + }); + this.blockchainService.on('chain.connected', () => { + this.paused = false; + }); + } + + protected get paused() { + return this.scanIsPaused; + } + + private set paused(p: boolean) { + this.scanIsPaused = p; } public get scanParameters() { @@ -57,7 +74,6 @@ export abstract class BlockchainScannerService { } try { - await this.blockchainService.isReady(); // Only scan blocks if initial conditions met await this.checkInitialScanParameters(); @@ -76,26 +92,36 @@ export abstract class BlockchainScannerService { this.logger.verbose(`Starting scan from block #${currentBlockNumber}`); // eslint-disable-next-line no-constant-condition - while (true) { - await this.checkScanParameters(currentBlockNumber, currentBlockHash); // throws when end-of-chain reached - const block = await this.blockchainService.getBlock(currentBlockHash); - const blockEvents = await this.blockchainService.getEvents(currentBlockHash); - await this.handleChainEvents(block, blockEvents); - await this.processCurrentBlock(block, blockEvents); + while (!this.paused) { + try { + await this.checkScanParameters(currentBlockNumber, currentBlockHash); // throws when end-of-chain reached + const block = await this.blockchainService.getBlock(currentBlockHash); + const blockEvents = await this.blockchainService.getEvents(currentBlockHash); + await this.handleChainEvents(block, blockEvents); + await this.processCurrentBlock(block, blockEvents); + } catch (err) { + if (!(err instanceof SkipBlockError)) { + throw err; + } + this.logger.debug(`Skipping block ${currentBlockNumber}`); + } await this.setLastSeenBlockNumber(currentBlockNumber); // Move to the next block currentBlockNumber += 1; currentBlockHash = await this.blockchainService.getBlockHash(currentBlockNumber); } - } catch (e: any) { + } catch (e) { if (e instanceof EndOfChainError) { this.logger.debug(e.message); return; } - this.logger.error('Unexpected error scanning chain', JSON.stringify(e), e?.stack); - throw e; + // Don't throw if scan paused; that's WHY it's paused + if (!this.paused) { + this.logger.error(JSON.stringify(e)); + throw e; + } } finally { this.scanInProgress = false; } @@ -130,7 +156,7 @@ export abstract class BlockchainScannerService { public registerChainEventHandler( events: string[], - callback: (block: SignedBlock, blockEvents: FrameSystemEventRecord) => any | Promise, + callback: (block: SignedBlock, blockEvents: FrameSystemEventRecord) => unknown | Promise, ) { events.forEach((event) => { const handlers = new Set(this.chainEventHandlers.get(event) || []); diff --git a/package.json b/package.json index 131d411b..411e0557 100644 --- a/package.json +++ b/package.json @@ -65,14 +65,21 @@ "generate:swagger-ui:graph": "npx --yes @redocly/cli build-docs openapi-specs/graph.openapi.json --output=./docs/graph/index.html", "pregenerate:swagger-ui:graph": "npx --yes @redocly/cli build-docs openapi-specs/graph-webhooks.openapi.yaml --output=./docs/graph/webhooks.html", "generate:swagger-ui": "npm run generate:swagger-ui:account ; npm run generate:swagger-ui:content-publishing ; npm run generate:swagger-ui:content-watcher ; npm run generate:swagger-ui:graph", - "test:account": "dotenvx run -f env-files/account.template.env -- jest 'account*'", - "test:content-publishing": "dotenvx run -f env-files/content-publishing.template.env -- jest 'content-publishing*'", - "test:content-watcher": "dotenvx run -f env-files/content-watcher.template.env -- jest 'content-watcher*'", - "test:graph": "dotenvx run -f env-files/graph.template.env -- jest 'graph*'", - "test:libs:blockchain": "dotenvx run -f env-files/graph.template.env -- jest --runInBand 'libs/blockchain*'", - "test:libs:cache": "dotenvx run -f env-files/graph.template.env -- jest 'libs/cache*'", - "test:libs:utils": "jest 'libs/utils*'", - "test:libs": "npm run test:libs:blockchain ; npm run test:libs:cache; npm run test:libs:utils;", + "test:account": "dotenvx run -f env-files/account.template.env -- jest 'apps/account-api' 'apps/account-worker'", + "test:content-publishing": "dotenvx run -f env-files/content-publishing.template.env -- jest 'apps/content-publishing-api' 'apps/content-publishing-worker'", + "test:content-watcher": "dotenvx run -f env-files/content-watcher.template.env -- jest 'apps/content-watcher'", + "test:graph": "dotenvx run -f env-files/graph.template.env -- jest 'apps/graph-api' 'apps/graph-worker'", + "test:libs:account": "dotenvx run -f env-files/account.template.env -- jest 'libs/account-lib'", + "test:libs:blockchain": "dotenvx run -f env-files/graph.template.env -- jest 'libs/blockchain'", + "test:libs:cache": "dotenvx run -f env-files/graph.template.env -- jest 'libs/cache'", + "test:libs:config": "dotenvx run -f env-files/graph.template.env -- jest 'libs/config'", + "test:libs:content-publishing": "dotenvx run -f env-files/content-publishing.template.env -- jest 'libs/content-publishing-lib'", + "test:libs:content-watcher": "dotenvx run -f env-files/content-watcher.template.env -- jest 'libs/content-watcher-lib'", + "test:libs:graph": "dotenvx run -f env-files/graph.template.env -- jest 'libs/graph-lib'", + "test:libs:queue": "dotenvx run -f env-files/graph.template.env -- jest 'libs/queue'", + "test:libs:storage": "dotenvx run -f env-files/content-publishing.template.env -- jest 'libs/storage'", + "test:libs:utils": "jest 'libs/utils'", + "test:libs": "dotenvx run -f env-files/graph.template.env -f env-files/content-publishing.template.env -- jest 'libs/'", "test": "npm run test:account ; npm run test:content-publishing ; npm run test:content-watcher ; npm run test:graph ; npm run test:libs", "test:verbose": "jest --coverage --verbose", "test:e2e:account": "dotenvx run -f env-files/account.template.env -- jest --runInBand --detectOpenHandles --testRegex 'account-api/.*\\.e2e-spec\\.ts'",