Skip to content

Commit

Permalink
Clean up substrate network detection
Browse files Browse the repository at this point in the history
  • Loading branch information
sophialittlejohn committed Sep 4, 2024
1 parent 1551fc6 commit f973f50
Showing 1 changed file with 35 additions and 61 deletions.
96 changes: 35 additions & 61 deletions centrifuge-js/src/CentrifugeBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { sortAddresses } from '@polkadot/util-crypto'
import type { JsonRpcSigner, TransactionRequest } from 'ethers'
import 'isomorphic-fetch'
import {
BehaviorSubject,
Observable,
Subject,
bufferCount,
Expand All @@ -20,15 +19,13 @@ import {
map,
mergeWith,
of,
race,
share,
startWith,
switchMap,
take,
takeWhile,
tap,
throwError,
timer,
} from 'rxjs'
import { fromFetch } from 'rxjs/fetch'
import { TransactionErrorResult, TransactionOptions, TransactionResult } from './types'
Expand Down Expand Up @@ -298,92 +295,69 @@ type Events = ISubmittableResult['events']

const txCompletedEvents: Record<string, Subject<Events>> = {}
const blockEvents: Record<string, Observable<Events>> = {}
let parachainUrlCache: string | null = null

export class CentrifugeBase {
config: Config
relayChainUrl: string
subqueryUrl: string
rpcEndpoints: string[]
private parachainUrlCache: BehaviorSubject<string | null> // cache the healthy parachain url
private healthCheckInProgress: Promise<string | null> | null = null // prevent concurrent health checks

constructor(config: UserProvidedConfig = {}) {
this.config = { ...defaultConfig, ...config }
this.relayChainUrl = this.config.network === 'centrifuge' ? this.config.polkadotWsUrl : this.config.kusamaWsUrl
this.subqueryUrl =
this.config.network === 'centrifuge' ? this.config.centrifugeSubqueryUrl : this.config.altairSubqueryUrl
this.rpcEndpoints = this.config.centrifugeWsUrl.split(',').map((url) => url.trim())
this.parachainUrlCache = new BehaviorSubject<string | null>(null)
this.initParachainUrl()
}

private initParachainUrl() {
if (!this.healthCheckInProgress) {
this.healthCheckInProgress = this.findHealthyWs().then((url) => {
this.parachainUrlCache.next(url)
this.healthCheckInProgress = null
private async findHealthyWs(): Promise<string | null> {
for (const url of this.rpcEndpoints) {
const isHealthy = await this.checkWsHealth(url)
if (isHealthy) {
console.log(`Connection to ${url} established`)
return url
})
}
}
return this.healthCheckInProgress
}

private findHealthyWs(): Promise<string | null> {
return firstValueFrom(
from(this.rpcEndpoints).pipe(
switchMap((url) => this.checkWsHealth(url)),
map((url) => {
if (url) {
console.log(`Connection to ${url} established`)
return url
}
throw new Error('No healthy parachain URL found')
}),
catchError((error) => {
console.error('Error establishing connection to parachain URL:', error)
return of(null)
})
)
)
console.error('Error: No healthy parachain URL found')
return null
}

private checkWsHealth(url: string, timeoutMs: number = 5000): Observable<string | null> {
return race(
new Observable<string | null>((observer) => {
const ws = new WebSocket(url)
ws.onopen = () => {
ws.close()
observer.next(url)
observer.complete()
}
ws.onerror = () => {
ws.close()
observer.next(null)
observer.complete()
}
return () => ws.close()
}),
timer(timeoutMs).pipe(map(() => null))
).pipe(
tap((result) => {
if (result === null) {
console.log(`Connection to ${url} failed or timed out`)
}
})
)
private checkWsHealth(url: string, timeoutMs: number = 5000): Promise<boolean> {
return new Promise((resolve) => {
const ws = new WebSocket(url)
const timer = setTimeout(() => {
ws.close()
console.log(`Connection to ${url} timed out`)
resolve(false)
}, timeoutMs)

ws.onopen = () => {
clearTimeout(timer)
ws.close()
resolve(true)
}

ws.onerror = () => {
clearTimeout(timer)
ws.close()
console.log(`Connection to ${url} failed`)
resolve(false)
}
})
}

private async getCachedParachainUrl(): Promise<string> {
const cachedUrl = this.parachainUrlCache.getValue()
const cachedUrl = parachainUrlCache
if (cachedUrl) {
return cachedUrl
}
await this.findHealthyWs()
const parachainUrl = this.parachainUrlCache.getValue()
if (!parachainUrl) {
parachainUrlCache = await this.findHealthyWs()
if (!parachainUrlCache) {
throw new Error('No healthy parachain URL available')
}
return parachainUrl
return parachainUrlCache
}

async getChainId() {
Expand Down

0 comments on commit f973f50

Please sign in to comment.