Skip to content

Commit

Permalink
Rework retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
thpani committed Aug 24, 2023
1 parent d04fe10 commit 9b83bee
Showing 1 changed file with 33 additions and 111 deletions.
144 changes: 33 additions & 111 deletions quint/src/quintVerifier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import { Either, chain, left, right } from '@sweet-monads/either'
import { ErrorMessage } from './parsing/quintParserFrontend'
import { spawnSync } from 'child_process'
import path from 'path'
import fs from 'fs'
import os from 'os'
Expand Down Expand Up @@ -49,9 +48,6 @@ type VerifyError = {

export type VerifyResult<T> = Either<VerifyError, T>

// Paths to the files used from an Apalache distribution
type ApalacheDist = {
  // Path to the Apalache jar file
  jar: string
  // Path to the Apalache launcher
  exe: string
}

// An object representing the Apalache configuration
// See https://github.com/informalsystems/apalache/blob/main/mod-infra/src/main/scala/at/forsyte/apalache/infra/passes/options.scala#L255
//
// NOTE: aliased to `any`, so values of this type are not checked by the compiler.
type ApalacheConfig = any
Expand Down Expand Up @@ -136,13 +132,11 @@ type CmdExecutor = {
// Constructs a new client service
new (url: string, creds: any): CmdExecutor
run: (req: RunRequest, cb: AsyncCallBack<any>) => void
ping: (o: {}, cb: AsyncCallBack<void>) => void
}

// The refined interface to the CmdExecutor we produce from the generated interface
//
// Each member takes no callback and returns a promise instead.
type AsyncCmdExecutor = {
// Send a run request; the promise resolves with the response
run: (req: RunRequest) => Promise<RunResponse>
// Ping without a payload; the promise carries no value
ping: () => Promise<void>
}

// The interface for the Shai package
Expand All @@ -157,32 +151,6 @@ function err<A>(explanation: string, errors: ErrorMessage[] = [], traces?: ItfTr
return left({ explanation, errors, traces })
}

/**
 * Locate the Apalache distribution named by the `APALACHE_DIST` environment variable.
 *
 * A relative `APALACHE_DIST` is interpreted relative to the current working directory.
 *
 * @returns a `right` holding the paths to `lib/apalache.jar` and `bin/apalache-mc` inside the
 *   distribution, or a `left` if the variable is unset or empty, the directory does not exist,
 *   or either of the two files is missing.
 */
function findApalacheDistribution(): VerifyResult<ApalacheDist> {
  const configuredDist = process.env.APALACHE_DIST
  // Guard rather than assert non-null: `path.isAbsolute(undefined)` throws a TypeError,
  // which would escape the `VerifyResult` error channel.
  if (!configuredDist) {
    return err('The APALACHE_DIST environment variable is not set.')
  }

  const dist = path.isAbsolute(configuredDist) ? configuredDist : path.join(process.cwd(), configuredDist)

  if (!fs.existsSync(dist)) {
    return err(`Specified APALACHE_DIST ${dist} does not exist.`)
  }

  const jar = path.join(dist, 'lib', 'apalache.jar')
  const exe = path.join(dist, 'bin', 'apalache-mc')

  // Both files must be present; report the first one that is missing.
  for (const requiredFile of [jar, exe]) {
    if (!fs.existsSync(requiredFile)) {
      return err(
        `Apalache distribution is corrupted: cannot find ${requiredFile}. Ensure the APALACHE_DIST environment variable points to the right directory.`
      )
    }
  }

  return right({ jar, exe })
}

// See https://grpc.io/docs/languages/node/basics/#example-code-and-setup
const grpcStubOptions = {
keepCase: true,
Expand All @@ -192,32 +160,8 @@ const grpcStubOptions = {
oneofs: true,
}

/**
 * Load the gRPC proto definition by extracting `cmdExecutor.proto` from the Apalache jar.
 *
 * The proto file is extracted with the `jar` command-line utility into a fresh temporary
 * directory, which is removed again before this function returns or throws.
 *
 * @param dist paths of the Apalache distribution; only `dist.jar` is read
 *
 * @returns a `right` holding the loaded proto definition, or a `left` if the `jar` utility
 *   is unavailable or the extraction command fails.
 */
function loadProtoDefViaDistribution(dist: ApalacheDist): VerifyResult<ProtoPackageDefinition> {
  const jarUtilityIsInstalled = spawnSync('jar', ['--version']).status === 0
  if (!jarUtilityIsInstalled) {
    return err('The `jar` utility must be installed')
  }

  // The proto file we extract from the apalache jar
  const protoFileName = 'cmdExecutor.proto'
  // Used as the target for the extracted proto file
  const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'apalache-proto-'))

  try {
    const protoFileIsExtracted = spawnSync('jar', ['xf', dist.jar, protoFileName], { cwd: tmpDir }).status === 0
    if (!protoFileIsExtracted) {
      return err(`Apalache distribution is corrupted. Could not extract proto file from apalache.jar.`)
    }

    return right(proto.loadSync(path.join(tmpDir, protoFileName), grpcStubOptions))
  } finally {
    // Remove the tmp dir on every exit path, including failed extraction and a throwing load
    fs.rmSync(tmpDir, { recursive: true, force: true })
  }
}

async function loadProtoDefViaReflection(): Promise<VerifyResult<ProtoPackageDefinition>> {
// Types of the gRPC interface.
async function loadProtoDefViaReflection(retry: boolean): Promise<VerifyResult<ProtoPackageDefinition>> {
// Types of the gRPC interface
type ServerReflectionRequest = { file_containing_symbol: string }
type ServerReflectionResponseSuccess = {
file_descriptor_response: {
Expand All @@ -234,6 +178,7 @@ async function loadProtoDefViaReflection(): Promise<VerifyResult<ProtoPackageDef
type ServerReflectionService = {
new (url: string, creds: grpc.ChannelCredentials): ServerReflectionService
ServerReflectionInfo: () => grpc.ClientDuplexStream<ServerReflectionRequest, ServerReflectionResponse>
getChannel: () => { getConnectivityState: (_: boolean) => grpc.connectivityState }
}
type ServerReflectionPkg = {
grpc: { reflection: { v1alpha: { ServerReflection: ServerReflectionService } } }
Expand All @@ -246,18 +191,33 @@ async function loadProtoDefViaReflection(): Promise<VerifyResult<ProtoPackageDef
const serverReflectionService = reflectionProtoDescriptor.grpc.reflection.v1alpha.ServerReflection
const reflectionClient = new serverReflectionService(APALACHE_SERVER_URI, grpc.credentials.createInsecure())

// Wait for gRPC channel to come up, with 1sec pauses
if (retry) {
await (async () => {
for (;;) {
const grpcChannelState = reflectionClient.getChannel().getConnectivityState(true)
if (grpcChannelState == grpc.connectivityState.READY) {
return
} else {
await setTimeout(1000)
}
}
})()
}

// Query reflection endpoint
return new Promise<ServerReflectionResponse>((resolve, reject) => {
return new Promise<VerifyResult<ServerReflectionResponse>>((resolve, _) => {
const call = reflectionClient.ServerReflectionInfo()
call.on('data', (r: ServerReflectionResponse) => {
call.end()
resolve(r)
resolve(right(r))
})
call.on('error', (e: grpc.StatusObject) => reject(e))
call.on('error', (e: grpc.StatusObject) => resolve(err(`Error querying reflection endpoint: ${e}`)))

call.write({ file_containing_symbol: 'shai.cmdExecutor.CmdExecutor' })
}).then(protoDefResponse => {
// Construct a proto definition of the reflection response.
}).then(protoDefResponse =>
protoDefResponse.chain(protoDefResponse => {
// Construct a proto definition of the reflection response.
if ('error_response' in protoDefResponse) {
return err(
`Apalache gRPC endpoint is corrupted. Could not extract proto file: ${protoDefResponse.error_response.error_message}`
Expand All @@ -271,8 +231,8 @@ async function loadProtoDefViaReflection(): Promise<VerifyResult<ProtoPackageDef

// Use proto-loader to load the FileDescriptorProto wrapped in a FileDescriptorSet
return right(proto.loadFileDescriptorSetFromObject({ file: fileDescriptorProtos }, grpcStubOptions))
},
error => err(`Error querying reflection endpoint: ${error}`))
})
)
}

function loadGrpcClient(protoDef: ProtoPackageDefinition): AsyncCmdExecutor {
Expand All @@ -281,58 +241,20 @@ function loadGrpcClient(protoDef: ProtoPackageDefinition): AsyncCmdExecutor {
// See https://basarat.gitbook.io/typescript/type-system/type-assertion#double-assertion
const pkg = protoDescriptor.shai as unknown as ShaiPkg
const stub: any = new pkg.cmdExecutor.CmdExecutor(APALACHE_SERVER_URI, grpc.credentials.createInsecure())
console.log(`Connectivity state: ${stub.getChannel().getConnectivityState(true)}`)
const impl: AsyncCmdExecutor = {
return {
run: promisify((data: RunRequest, cb: AsyncCallBack<any>) => stub.run(data, cb)),
ping: promisify((cb: AsyncCallBack<void>) => stub.ping({}, cb)),
}
return impl
}

// Keep calling `f` until one call completes without throwing, and return that call's result.
// After every failed call, pause for .5 seconds before trying again.
async function retry<T>(f: () => Promise<T>): Promise<T> {
  // Boxed so that a successful result of `undefined` is still distinguishable from "no result yet"
  let success: { value: T } | undefined
  while (success === undefined) {
    try {
      success = { value: await f() }
    } catch {
      // Wait .5 secs before retry
      await setTimeout(500)
    }
  }
  return success.value
}

// Call `f` repeatedly until its promise resolves, in .5 second intervals, for up to 5 seconds.
// Returns right(T) on success, or a left(VerifyError) on timeout.
//
// Whichever side loses the race is shut down: once the timeout has fired, `f` is not called
// again, and once `f` has succeeded, the pending timeout timer is cancelled.
async function retryWithTimeout<T>(f: () => Promise<T>): Promise<VerifyResult<T>> {
  const delayMS = 5000
  const timedOut = err<T>(`Failed to obtain a connection to Apalache after ${delayMS / 1000} seconds.`)
  const timerControl = new AbortController()
  let expired = false

  // Wraps `f` so that the retry loop terminates on its next iteration after the timeout,
  // instead of calling `f` forever in the background.
  const attempt = async (): Promise<VerifyResult<T>> => {
    if (expired) {
      return timedOut
    }
    return right(await f())
  }

  const deadline = setTimeout(delayMS, timedOut, { signal: timerControl.signal }).then(result => {
    expired = true
    return result
  })

  try {
    return await Promise.race([retry(attempt), deadline])
  } finally {
    // No-op if the timer already fired; otherwise cancels it so that it does not linger.
    // The resulting rejection of `deadline` is absorbed by the already-settled race.
    timerControl.abort()
  }
}

/**
* Connect to the Apalache server, and verify the connection by pinging the server for up to 5 seconds.
* Connect to the Apalache server, and verify that the gRPC channel is up.
*
* @returns A promise resolving to:
* - a `right<Apalache>` if the connection is successful, or
* - a `left<VerifyError>` if either the connection attempt is unsuccessful or pinging timed out.
* @param retry Wait for the gRPC connection to come up.
*
* @returns A promise resolving to a `right<Apalache>` if the connection is successful, or a `left<VerifyError>` if not.
*/
async function tryConnect(): Promise<VerifyResult<Apalache>> {
// Attempt to load proto definition:
// - if APALACHE_DIST is set, from the Apalache distribution
// - otherwise, via gRPC reflection
const protoDefResult: VerifyResult<proto.PackageDefinition> = process.env.APALACHE_DIST
? findApalacheDistribution().chain(loadProtoDefViaDistribution)
: await loadProtoDefViaReflection()
// Load gRPC client
const maybeCmdExecutor = protoDefResult.map(loadGrpcClient)
const pingResult = await maybeCmdExecutor.asyncChain(cmdExecutor =>
// Try to ping the server, with a timeout
retryWithTimeout(() => cmdExecutor.ping())
)
return pingResult.chain(_ => maybeCmdExecutor.map(apalache))
async function tryConnect(retry: boolean = false): Promise<VerifyResult<Apalache>> {
return (await loadProtoDefViaReflection(retry)).map(loadGrpcClient).map(apalache)
}

/**
Expand Down Expand Up @@ -432,7 +354,7 @@ async function connect(): Promise<VerifyResult<Apalache>> {
apalache.on('error', error => resolve(err(`Failed to launch Apalache server: ${error.message}`)))
})
)
.then(chain(tryConnect))
.then(chain(() => tryConnect(true)))
}

/**
Expand Down

0 comments on commit 9b83bee

Please sign in to comment.