From 09a0b117194ed41ee6cebf628404698006d238b4 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Tue, 23 Jan 2024 09:34:10 +0200 Subject: [PATCH] Make asyncproc use asyncraises. (#497) * Make asyncproc use asyncraises. * Fix missing asyncraises for waitForExit(). --- chronos/asyncproc.nim | 75 ++++++++++++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/chronos/asyncproc.nim b/chronos/asyncproc.nim index 8615c570..f0087767 100644 --- a/chronos/asyncproc.nim +++ b/chronos/asyncproc.nim @@ -231,8 +231,9 @@ proc closeProcessHandles(pipes: var AsyncProcessPipes, lastError: OSErrorCode): OSErrorCode {.apforward.} proc closeProcessStreams(pipes: AsyncProcessPipes, options: set[AsyncProcessOption]): Future[void] {. - apforward.} -proc closeWait(holder: AsyncStreamHolder): Future[void] {.apforward.} + async: (raises: []).} +proc closeWait(holder: AsyncStreamHolder): Future[void] {. + async: (raises: []).} template isOk(code: OSErrorCode): bool = when defined(windows): @@ -391,7 +392,8 @@ when defined(windows): stdinHandle = ProcessStreamHandle(), stdoutHandle = ProcessStreamHandle(), stderrHandle = ProcessStreamHandle(), - ): Future[AsyncProcessRef] {.async.} = + ): Future[AsyncProcessRef] {. + async: (raises: [AsyncProcessError, CancelledError]).} = var pipes = preparePipes(options, stdinHandle, stdoutHandle, stderrHandle).valueOr: @@ -517,14 +519,16 @@ when defined(windows): ok(false) proc waitForExit*(p: AsyncProcessRef, - timeout = InfiniteDuration): Future[int] {.async.} = + timeout = InfiniteDuration): Future[int] {. + async: (raises: [AsyncProcessError, AsyncProcessTimeoutError, + CancelledError]).} = if p.exitStatus.isSome(): return p.exitStatus.get() let wres = try: await waitForSingleObject(p.processHandle, timeout) - except ValueError as exc: + except AsyncError as exc: raiseAsyncProcessError("Unable to wait for process handle", exc) if wres == WaitableResult.Timeout: @@ -537,7 +541,8 @@ when defined(windows): if exitCode >= 0: p.exitStatus = Opt.some(exitCode) - return exitCode + + exitCode proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] = if p.exitStatus.isSome(): @@ -787,7 +792,8 @@ else: stdinHandle = ProcessStreamHandle(), stdoutHandle = ProcessStreamHandle(), stderrHandle = ProcessStreamHandle(), - ): Future[AsyncProcessRef] {.async.} = + ): Future[AsyncProcessRef] {. + async: (raises: [AsyncProcessError, CancelledError]).} = var pid: Pid pipes = preparePipes(options, stdinHandle, stdoutHandle, @@ -887,7 +893,7 @@ else: ) trackCounter(AsyncProcessTrackerName) - return process + process proc peekProcessExitCode(p: AsyncProcessRef, reap = false): AsyncProcessResult[int] = @@ -948,7 +954,9 @@ else: ok(false) proc waitForExit*(p: AsyncProcessRef, - timeout = InfiniteDuration): Future[int] = + timeout = InfiniteDuration): Future[int] {. + async: (raw: true, raises: [ + AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} = var retFuture = newFuture[int]("chronos.waitForExit()") processHandle: ProcessHandle @@ -1050,7 +1058,7 @@ else: # Process is still running, so we going to wait for SIGCHLD. retFuture.cancelCallback = cancellation - return retFuture + retFuture proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] = let res = ? p.peekProcessExitCode() @@ -1155,7 +1163,7 @@ proc preparePipes(options: set[AsyncProcessOption], stderrHandle: remoteStderr )) -proc closeWait(holder: AsyncStreamHolder) {.async.} = +proc closeWait(holder: AsyncStreamHolder) {.async: (raises: []).} = let (future, transp) = case holder.kind of StreamKind.None: @@ -1182,10 +1190,11 @@ proc closeWait(holder: AsyncStreamHolder) {.async.} = res if len(pending) > 0: - await allFutures(pending) + await noCancel allFutures(pending) proc closeProcessStreams(pipes: AsyncProcessPipes, - options: set[AsyncProcessOption]): Future[void] = + options: set[AsyncProcessOption]): Future[void] {. + async: (raw: true, raises: []).} = let pending = block: var res: seq[Future[void]] @@ -1196,10 +1205,12 @@ proc closeProcessStreams(pipes: AsyncProcessPipes, if ProcessFlag.AutoStderr in pipes.flags: res.add(pipes.stderrHolder.closeWait()) res - allFutures(pending) + noCancel allFutures(pending) proc opAndWaitForExit(p: AsyncProcessRef, op: WaitOperation, - timeout = InfiniteDuration): Future[int] {.async.} = + timeout = InfiniteDuration): Future[int] {. + async: (raises: [ + AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} = let timerFut = if timeout == InfiniteDuration: newFuture[void]("chronos.killAndwaitForExit") @@ -1223,7 +1234,10 @@ proc opAndWaitForExit(p: AsyncProcessRef, op: WaitOperation, return exitCode let waitFut = p.waitForExit().wait(100.milliseconds) - discard await race(FutureBase(waitFut), FutureBase(timerFut)) + try: + discard await race(FutureBase(waitFut), FutureBase(timerFut)) + except ValueError: + raiseAssert "This should not be happened!" if waitFut.finished() and not(waitFut.failed()): let res = p.peekExitCode() @@ -1237,25 +1251,28 @@ proc opAndWaitForExit(p: AsyncProcessRef, op: WaitOperation, await waitFut.cancelAndWait() raiseAsyncProcessTimeoutError() -proc closeWait*(p: AsyncProcessRef) {.async.} = +proc closeWait*(p: AsyncProcessRef) {.async: (raises: []).} = # Here we ignore all possible errrors, because we do not want to raise # exceptions. discard closeProcessHandles(p.pipes, p.options, OSErrorCode(0)) - await noCancel(p.pipes.closeProcessStreams(p.options)) + await p.pipes.closeProcessStreams(p.options) discard p.closeThreadAndProcessHandle() untrackCounter(AsyncProcessTrackerName) proc stdinStream*(p: AsyncProcessRef): AsyncStreamWriter = + ## Returns STDIN async stream associated with process `p`. doAssert(p.pipes.stdinHolder.kind == StreamKind.Writer, "StdinStreamWriter is not available") p.pipes.stdinHolder.writer proc stdoutStream*(p: AsyncProcessRef): AsyncStreamReader = + ## Returns STDOUT async stream associated with process `p`. doAssert(p.pipes.stdoutHolder.kind == StreamKind.Reader, "StdoutStreamReader is not available") p.pipes.stdoutHolder.reader proc stderrStream*(p: AsyncProcessRef): AsyncStreamReader = + ## Returns STDERR async stream associated with process `p`. doAssert(p.pipes.stderrHolder.kind == StreamKind.Reader, "StderrStreamReader is not available") p.pipes.stderrHolder.reader @@ -1263,7 +1280,9 @@ proc stderrStream*(p: AsyncProcessRef): AsyncStreamReader = proc execCommand*(command: string, options = {AsyncProcessOption.EvalCommand}, timeout = InfiniteDuration - ): Future[int] {.async.} = + ): Future[int] {. + async: (raises: [ + AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} = let poptions = options + {AsyncProcessOption.EvalCommand} process = await startProcess(command, options = poptions) @@ -1277,7 +1296,9 @@ proc execCommand*(command: string, proc execCommandEx*(command: string, options = {AsyncProcessOption.EvalCommand}, timeout = InfiniteDuration - ): Future[CommandExResponse] {.async.} = + ): Future[CommandExResponse] {. + async: (raises: [ + AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} = let process = await startProcess(command, options = options, stdoutHandle = AsyncProcess.Pipe, @@ -1291,13 +1312,13 @@ proc execCommandEx*(command: string, status = await process.waitForExit(timeout) output = try: - string.fromBytes(outputReader.read()) + string.fromBytes(await outputReader) except AsyncStreamError as exc: raiseAsyncProcessError("Unable to read process' stdout channel", exc) error = try: - string.fromBytes(errorReader.read()) + string.fromBytes(await errorReader) except AsyncStreamError as exc: raiseAsyncProcessError("Unable to read process' stderr channel", exc) @@ -1308,13 +1329,15 @@ proc execCommandEx*(command: string, res proc pid*(p: AsyncProcessRef): int = - ## Returns process ``p`` identifier. + ## Returns process ``p`` unique process identifier. int(p.processId) template processId*(p: AsyncProcessRef): int = pid(p) proc killAndWaitForExit*(p: AsyncProcessRef, - timeout = InfiniteDuration): Future[int] = + timeout = InfiniteDuration): Future[int] {. + async: (raw: true, raises: [ + AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} = ## Perform continuous attempts to kill the ``p`` process for specified period ## of time ``timeout``. ## @@ -1330,7 +1353,9 @@ proc killAndWaitForExit*(p: AsyncProcessRef, opAndWaitForExit(p, WaitOperation.Kill, timeout) proc terminateAndWaitForExit*(p: AsyncProcessRef, - timeout = InfiniteDuration): Future[int] = + timeout = InfiniteDuration): Future[int] {. + async: (raw: true, raises: [ + AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} = ## Perform continuous attempts to terminate the ``p`` process for specified ## period of time ``timeout``. ##