Skip to content

Commit

Permalink
Make asyncproc use asyncraises. (#497)
Browse files Browse the repository at this point in the history
* Make asyncproc use asyncraises.

* Fix missing asyncraises for waitForExit().
  • Loading branch information
cheatfate authored Jan 23, 2024
1 parent e296ae3 commit 09a0b11
Showing 1 changed file with 50 additions and 25 deletions.
75 changes: 50 additions & 25 deletions chronos/asyncproc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,9 @@ proc closeProcessHandles(pipes: var AsyncProcessPipes,
lastError: OSErrorCode): OSErrorCode {.apforward.}
proc closeProcessStreams(pipes: AsyncProcessPipes,
options: set[AsyncProcessOption]): Future[void] {.
apforward.}
proc closeWait(holder: AsyncStreamHolder): Future[void] {.apforward.}
async: (raises: []).}
proc closeWait(holder: AsyncStreamHolder): Future[void] {.
async: (raises: []).}

template isOk(code: OSErrorCode): bool =
when defined(windows):
Expand Down Expand Up @@ -391,7 +392,8 @@ when defined(windows):
stdinHandle = ProcessStreamHandle(),
stdoutHandle = ProcessStreamHandle(),
stderrHandle = ProcessStreamHandle(),
): Future[AsyncProcessRef] {.async.} =
): Future[AsyncProcessRef] {.
async: (raises: [AsyncProcessError, CancelledError]).} =
var
pipes = preparePipes(options, stdinHandle, stdoutHandle,
stderrHandle).valueOr:
Expand Down Expand Up @@ -517,14 +519,16 @@ when defined(windows):
ok(false)

proc waitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] {.async.} =
timeout = InfiniteDuration): Future[int] {.
async: (raises: [AsyncProcessError, AsyncProcessTimeoutError,
CancelledError]).} =
if p.exitStatus.isSome():
return p.exitStatus.get()

let wres =
try:
await waitForSingleObject(p.processHandle, timeout)
except ValueError as exc:
except AsyncError as exc:
raiseAsyncProcessError("Unable to wait for process handle", exc)

if wres == WaitableResult.Timeout:
Expand All @@ -537,7 +541,8 @@ when defined(windows):

if exitCode >= 0:
p.exitStatus = Opt.some(exitCode)
return exitCode

exitCode

proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] =
if p.exitStatus.isSome():
Expand Down Expand Up @@ -787,7 +792,8 @@ else:
stdinHandle = ProcessStreamHandle(),
stdoutHandle = ProcessStreamHandle(),
stderrHandle = ProcessStreamHandle(),
): Future[AsyncProcessRef] {.async.} =
): Future[AsyncProcessRef] {.
async: (raises: [AsyncProcessError, CancelledError]).} =
var
pid: Pid
pipes = preparePipes(options, stdinHandle, stdoutHandle,
Expand Down Expand Up @@ -887,7 +893,7 @@ else:
)

trackCounter(AsyncProcessTrackerName)
return process
process

proc peekProcessExitCode(p: AsyncProcessRef,
reap = false): AsyncProcessResult[int] =
Expand Down Expand Up @@ -948,7 +954,9 @@ else:
ok(false)

proc waitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] =
timeout = InfiniteDuration): Future[int] {.
async: (raw: true, raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
var
retFuture = newFuture[int]("chronos.waitForExit()")
processHandle: ProcessHandle
Expand Down Expand Up @@ -1050,7 +1058,7 @@ else:

# Process is still running, so we going to wait for SIGCHLD.
retFuture.cancelCallback = cancellation
return retFuture
retFuture

proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] =
let res = ? p.peekProcessExitCode()
Expand Down Expand Up @@ -1155,7 +1163,7 @@ proc preparePipes(options: set[AsyncProcessOption],
stderrHandle: remoteStderr
))

proc closeWait(holder: AsyncStreamHolder) {.async.} =
proc closeWait(holder: AsyncStreamHolder) {.async: (raises: []).} =
let (future, transp) =
case holder.kind
of StreamKind.None:
Expand All @@ -1182,10 +1190,11 @@ proc closeWait(holder: AsyncStreamHolder) {.async.} =
res

if len(pending) > 0:
await allFutures(pending)
await noCancel allFutures(pending)

proc closeProcessStreams(pipes: AsyncProcessPipes,
options: set[AsyncProcessOption]): Future[void] =
options: set[AsyncProcessOption]): Future[void] {.
async: (raw: true, raises: []).} =
let pending =
block:
var res: seq[Future[void]]
Expand All @@ -1196,10 +1205,12 @@ proc closeProcessStreams(pipes: AsyncProcessPipes,
if ProcessFlag.AutoStderr in pipes.flags:
res.add(pipes.stderrHolder.closeWait())
res
allFutures(pending)
noCancel allFutures(pending)

proc opAndWaitForExit(p: AsyncProcessRef, op: WaitOperation,
timeout = InfiniteDuration): Future[int] {.async.} =
timeout = InfiniteDuration): Future[int] {.
async: (raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
let timerFut =
if timeout == InfiniteDuration:
newFuture[void]("chronos.killAndwaitForExit")
Expand All @@ -1223,7 +1234,10 @@ proc opAndWaitForExit(p: AsyncProcessRef, op: WaitOperation,
return exitCode

let waitFut = p.waitForExit().wait(100.milliseconds)
discard await race(FutureBase(waitFut), FutureBase(timerFut))
try:
discard await race(FutureBase(waitFut), FutureBase(timerFut))
except ValueError:
raiseAssert "This should not be happened!"

if waitFut.finished() and not(waitFut.failed()):
let res = p.peekExitCode()
Expand All @@ -1237,33 +1251,38 @@ proc opAndWaitForExit(p: AsyncProcessRef, op: WaitOperation,
await waitFut.cancelAndWait()
raiseAsyncProcessTimeoutError()

proc closeWait*(p: AsyncProcessRef) {.async.} =
proc closeWait*(p: AsyncProcessRef) {.async: (raises: []).} =
# Here we ignore all possible errrors, because we do not want to raise
# exceptions.
discard closeProcessHandles(p.pipes, p.options, OSErrorCode(0))
await noCancel(p.pipes.closeProcessStreams(p.options))
await p.pipes.closeProcessStreams(p.options)
discard p.closeThreadAndProcessHandle()
untrackCounter(AsyncProcessTrackerName)

proc stdinStream*(p: AsyncProcessRef): AsyncStreamWriter =
## Returns STDIN async stream associated with process `p`.
doAssert(p.pipes.stdinHolder.kind == StreamKind.Writer,
"StdinStreamWriter is not available")
p.pipes.stdinHolder.writer

proc stdoutStream*(p: AsyncProcessRef): AsyncStreamReader =
## Returns STDOUT async stream associated with process `p`.
doAssert(p.pipes.stdoutHolder.kind == StreamKind.Reader,
"StdoutStreamReader is not available")
p.pipes.stdoutHolder.reader

proc stderrStream*(p: AsyncProcessRef): AsyncStreamReader =
## Returns STDERR async stream associated with process `p`.
doAssert(p.pipes.stderrHolder.kind == StreamKind.Reader,
"StderrStreamReader is not available")
p.pipes.stderrHolder.reader

proc execCommand*(command: string,
options = {AsyncProcessOption.EvalCommand},
timeout = InfiniteDuration
): Future[int] {.async.} =
): Future[int] {.
async: (raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
let
poptions = options + {AsyncProcessOption.EvalCommand}
process = await startProcess(command, options = poptions)
Expand All @@ -1277,7 +1296,9 @@ proc execCommand*(command: string,
proc execCommandEx*(command: string,
options = {AsyncProcessOption.EvalCommand},
timeout = InfiniteDuration
): Future[CommandExResponse] {.async.} =
): Future[CommandExResponse] {.
async: (raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
let
process = await startProcess(command, options = options,
stdoutHandle = AsyncProcess.Pipe,
Expand All @@ -1291,13 +1312,13 @@ proc execCommandEx*(command: string,
status = await process.waitForExit(timeout)
output =
try:
string.fromBytes(outputReader.read())
string.fromBytes(await outputReader)
except AsyncStreamError as exc:
raiseAsyncProcessError("Unable to read process' stdout channel",
exc)
error =
try:
string.fromBytes(errorReader.read())
string.fromBytes(await errorReader)
except AsyncStreamError as exc:
raiseAsyncProcessError("Unable to read process' stderr channel",
exc)
Expand All @@ -1308,13 +1329,15 @@ proc execCommandEx*(command: string,
res

proc pid*(p: AsyncProcessRef): int =
## Returns process ``p`` identifier.
## Returns process ``p`` unique process identifier.
int(p.processId)

template processId*(p: AsyncProcessRef): int = pid(p)

proc killAndWaitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] =
timeout = InfiniteDuration): Future[int] {.
async: (raw: true, raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
## Perform continuous attempts to kill the ``p`` process for specified period
## of time ``timeout``.
##
Expand All @@ -1330,7 +1353,9 @@ proc killAndWaitForExit*(p: AsyncProcessRef,
opAndWaitForExit(p, WaitOperation.Kill, timeout)

proc terminateAndWaitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] =
timeout = InfiniteDuration): Future[int] {.
async: (raw: true, raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
## Perform continuous attempts to terminate the ``p`` process for specified
## period of time ``timeout``.
##
Expand Down

0 comments on commit 09a0b11

Please sign in to comment.