From f973f503d5518d26b2a394152250450dd4f81992 Mon Sep 17 00:00:00 2001 From: sophian Date: Wed, 4 Sep 2024 19:01:10 -0400 Subject: [PATCH] Clean up substrate network detection --- centrifuge-js/src/CentrifugeBase.ts | 96 +++++++++++------------------ 1 file changed, 35 insertions(+), 61 deletions(-) diff --git a/centrifuge-js/src/CentrifugeBase.ts b/centrifuge-js/src/CentrifugeBase.ts index 603746ee4..b7e8e03cc 100644 --- a/centrifuge-js/src/CentrifugeBase.ts +++ b/centrifuge-js/src/CentrifugeBase.ts @@ -7,7 +7,6 @@ import { sortAddresses } from '@polkadot/util-crypto' import type { JsonRpcSigner, TransactionRequest } from 'ethers' import 'isomorphic-fetch' import { - BehaviorSubject, Observable, Subject, bufferCount, @@ -20,7 +19,6 @@ import { map, mergeWith, of, - race, share, startWith, switchMap, @@ -28,7 +26,6 @@ import { takeWhile, tap, throwError, - timer, } from 'rxjs' import { fromFetch } from 'rxjs/fetch' import { TransactionErrorResult, TransactionOptions, TransactionResult } from './types' @@ -298,14 +295,13 @@ type Events = ISubmittableResult['events'] const txCompletedEvents: Record> = {} const blockEvents: Record> = {} +let parachainUrlCache: string | null = null export class CentrifugeBase { config: Config relayChainUrl: string subqueryUrl: string rpcEndpoints: string[] - private parachainUrlCache: BehaviorSubject // cache the healthy parachain url - private healthCheckInProgress: Promise | null = null // prevent concurrent health checks constructor(config: UserProvidedConfig = {}) { this.config = { ...defaultConfig, ...config } @@ -313,77 +309,55 @@ export class CentrifugeBase { this.subqueryUrl = this.config.network === 'centrifuge' ? this.config.centrifugeSubqueryUrl : this.config.altairSubqueryUrl this.rpcEndpoints = this.config.centrifugeWsUrl.split(',').map((url) => url.trim()) - this.parachainUrlCache = new BehaviorSubject(null) - this.initParachainUrl() } - private initParachainUrl() { - if (!this.healthCheckInProgress) { - this.healthCheckInProgress = this.findHealthyWs().then((url) => { - this.parachainUrlCache.next(url) - this.healthCheckInProgress = null + private async findHealthyWs(): Promise { + for (const url of this.rpcEndpoints) { + const isHealthy = await this.checkWsHealth(url) + if (isHealthy) { + console.log(`Connection to ${url} established`) return url - }) + } } - return this.healthCheckInProgress - } - private findHealthyWs(): Promise { - return firstValueFrom( - from(this.rpcEndpoints).pipe( - switchMap((url) => this.checkWsHealth(url)), - map((url) => { - if (url) { - console.log(`Connection to ${url} established`) - return url - } - throw new Error('No healthy parachain URL found') - }), - catchError((error) => { - console.error('Error establishing connection to parachain URL:', error) - return of(null) - }) - ) - ) + console.error('Error: No healthy parachain URL found') + return null } - private checkWsHealth(url: string, timeoutMs: number = 5000): Observable { - return race( - new Observable((observer) => { - const ws = new WebSocket(url) - ws.onopen = () => { - ws.close() - observer.next(url) - observer.complete() - } - ws.onerror = () => { - ws.close() - observer.next(null) - observer.complete() - } - return () => ws.close() - }), - timer(timeoutMs).pipe(map(() => null)) - ).pipe( - tap((result) => { - if (result === null) { - console.log(`Connection to ${url} failed or timed out`) - } - }) - ) + private checkWsHealth(url: string, timeoutMs: number = 5000): Promise { + return new Promise((resolve) => { + const ws = new WebSocket(url) + const timer = setTimeout(() => { + ws.close() + console.log(`Connection to ${url} timed out`) + resolve(false) + }, timeoutMs) + + ws.onopen = () => { + clearTimeout(timer) + ws.close() + resolve(true) + } + + ws.onerror = () => { + clearTimeout(timer) + ws.close() + console.log(`Connection to ${url} failed`) + resolve(false) + } + }) } private async getCachedParachainUrl(): Promise { - const cachedUrl = this.parachainUrlCache.getValue() + const cachedUrl = parachainUrlCache if (cachedUrl) { return cachedUrl } - await this.findHealthyWs() - const parachainUrl = this.parachainUrlCache.getValue() - if (!parachainUrl) { + parachainUrlCache = await this.findHealthyWs() + if (!parachainUrlCache) { throw new Error('No healthy parachain URL available') } - return parachainUrl + return parachainUrlCache } async getChainId() {