Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review #1

Open
oleg-py opened this issue May 10, 2019 · 1 comment
Open

Review #1

oleg-py opened this issue May 10, 2019 · 1 comment

Comments

@oleg-py
Copy link

oleg-py commented May 10, 2019

WorkerPool

I like your implementation with versions, however, exec not safe wrt cancellation:

for {
  (workerVersion, worker) <- current.take
  // [X]
  currentVersion          <- version.get
  // [Y]
  b <- if (currentVersion == workerVersion)
         worker(a).guarantee(back(workerVersion, worker))
       else exec(a)
} yield b

Because of cancelable flatMaps, it's possible for cancellation to happen at [X] or [Y], in which case you leak the worker (as it's never returned to the MVar, even if I didn't do removeAll).

You can fix it using uncancelable, but you don't want to make everything uncancelable. bracket would work too (acquire and release parts of bracket are uncancelable), but looping and de-structuring would be painful.

Race

It seems (from the code) that you're building a tree of CompositeExceptions with prepend, i.e.

CompositeException(
  throwable1,
  CompositeException(
    throwable2,
    CompositeException(
      ...
    )
  )
)

It also seems that your getMessage override would not discriminate between these cases :) Ideally, you'd want to somehow ensure you work with CompositeException instead of Throwable, which you can safely combine together.

Custom extractors are a nice touch, but there's also an option to avoid matching on two cases: Either has merge method, which can only be called if left and right have (potential) same type. So, you can do something like either.leftMap(_.swap).merge to get (Fiber[...], Either[...]).


And if it all seems like nitpicking, that's because there's not really anything significant for me to complain about. Nice solutions, and congratulations 👍

@chepiov
Copy link
Owner

chepiov commented May 14, 2019

@oleg-py Thank you very much!

WorkerPool

You can fix it using uncancelable, but you don't want to make everything uncancelable.

I don't understand clearly, how can I separate what must be uncancelable from what can be cancelable...
I guess that retrieving current version can be cancelable:

def exec(a: A): F[B] =
  for {
    currentVersion     <- version.get
    b <- Concurrent[F].uncancelable {
          current.take >>= {
            case (workerVersion, worker) =>
              if (currentVersion == workerVersion)
                worker(a).guarantee(back(workerVersion, worker))
              else exec(a)
          }
        }
  } yield b

Race

Either has merge method, which can only be called if left and right have (potential) same type.

Awesome! Thank you:

object success {
    def unapply[F[_], A](attempt: Attempt[F, A]): Option[Success[F, A]] =
      attempt.leftMap(_.swap).merge match {
        case (fib, Right(v)) => (v, fib).some
        case _               => none
      }
  }

  object failure {
    def unapply[F[_], A](attempt: Attempt[F, A]): Option[Failure[F, A]] =
      attempt.leftMap(_.swap).merge match {
        case (fib, Left(e)) => (fib, CompositeException.one(e)).some
        case _              => none
      }
  }

Ideally, you'd want to somehow ensure you work with CompositeException instead of Throwable, which you can safely combine together.

I don't understand how it possible without any runtime checking/pattern matching. All what I get:

  case class CompositeException(ex: NonEmptyList[Throwable]) extends Exception("All race candidates have failed") {
    def compose(e: CompositeException): CompositeException = CompositeException(ex concatNel e.ex)
  }

  case object CompositeException {
    def apply(ex: Throwable): CompositeException = CompositeException(NonEmptyList.one(ex))
  }

 object failure {
    def unapply[F[_], A](attempt: Attempt[F, A]): Option[Failure[F, A]] =
      attempt.leftMap(_.swap).merge match {
        case (fib, Left(e)) => (fib, CompositeException(e)).some
        case _              => none
      }
  }

  def composeAndContinue[F[_]: Sync, A](fib: Fiber[F, Either[Throwable, A]], ea: CompositeException): F[A] =
    fib.join >>= {
      // TODO: How avoid PM on CompositeException?
      case Left(eb: CompositeException) => Sync[F].raiseError[A](ea.compose(eb))
      case Left(eb)                     => Sync[F].raiseError[A](ea.compose(CompositeException(eb)))
      case Right(v)                     => v.pure[F]
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants