Skip to content

Commit

Permalink
Merge pull request #887 from Banno/seekto-fail
Browse files Browse the repository at this point in the history
Add SeekTo.fail affordance
  • Loading branch information
amohrland authored May 8, 2024
2 parents da15b08 + 052ce50 commit b19e02d
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/com/banno/kafka/RecordStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ object RecordStream {
StreamSelector.Impl(hAndU, whetherCommits)
}

private def assign[F[_]: Monad: Clock, A, B](
private def assign[F[_]: MonadThrow: Clock, A, B](
consumer: ConsumerApi[F, GenericRecord, GenericRecord],
topical: Topical[A, B],
seekToF: Kleisli[F, PartitionQueries[F], SeekTo],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,19 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {
topics: List[String],
offsets: Map[TopicPartition, Long],
seekTo: SeekTo = SeekTo.beginning,
)(implicit F: Monad[F], C: Clock[F]): F[Unit] =
)(implicit F: MonadThrow[F], C: Clock[F]): F[Unit] =
assignAndSeek(topics, SeekTo.offsets(offsets, seekTo))

def assign(topic: String, offsets: Map[TopicPartition, Long])(implicit
F: Monad[F],
F: MonadThrow[F],
C: Clock[F],
): F[Unit] =
assign(List(topic), offsets)

def assignAndSeek(
topics: List[String],
seekTo: SeekTo,
)(implicit F: Monad[F], C: Clock[F]): F[Unit] =
)(implicit F: MonadThrow[F], C: Clock[F]): F[Unit] =
for {
infos <- consumer.partitionsFor(topics)
partitions = infos.map(_.toTopicPartition)
Expand Down
30 changes: 25 additions & 5 deletions core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ import cats.effect.*
import cats.syntax.all.*

import scala.concurrent.duration.*

import org.apache.kafka.common.*

import scala.util.control.NoStackTrace

sealed trait SeekTo

object SeekTo {
object Failure extends NoStackTrace {
override val getMessage: String =
"Consumer offset seeking failed"
}
sealed trait Attempt {
private[SeekTo] def toOffsets[F[_]: Monad: Clock](
queries: PartitionQueries[F],
Expand Down Expand Up @@ -104,30 +109,39 @@ object SeekTo {
}

sealed trait FinalFallback {
private[SeekTo] def seek[F[_]](
private[SeekTo] def seek[F[_]: ApplicativeThrow](
consumer: ConsumerApi[F, _, _],
partitions: Iterable[TopicPartition],
): F[Unit]
}

object FinalFallback {
private case object Beginning extends FinalFallback {
override def seek[F[_]](
override def seek[F[_]: ApplicativeThrow](
consumer: ConsumerApi[F, _, _],
partitions: Iterable[TopicPartition],
): F[Unit] =
consumer.seekToBeginning(partitions)
}
private case object End extends FinalFallback {
override def seek[F[_]](
override def seek[F[_]: ApplicativeThrow](
consumer: ConsumerApi[F, _, _],
partitions: Iterable[TopicPartition],
): F[Unit] =
consumer.seekToEnd(partitions)
}

private case class Fail(throwable: Throwable) extends FinalFallback {
override def seek[F[_]: ApplicativeThrow](
consumer: ConsumerApi[F, _, _],
partitions: Iterable[TopicPartition],
): F[Unit] =
throwable.raiseError[F, Unit]
}

val beginning: FinalFallback = Beginning
val end: FinalFallback = End
def fail(throwable: Throwable): FinalFallback = Fail(throwable)
}

private case class Impl(
Expand All @@ -147,6 +161,12 @@ object SeekTo {

def end: SeekTo = Impl(List.empty, FinalFallback.end)

def failWith(throwable: Throwable): SeekTo =
Impl(List.empty, FinalFallback.fail(throwable))

def fail: SeekTo =
failWith(Failure)

def timestamps(
timestamps: Map[TopicPartition, Long],
default: SeekTo,
Expand All @@ -168,7 +188,7 @@ object SeekTo {
): SeekTo =
firstAttemptThen(Attempt.timestampBeforeNow(duration), default)

def seek[F[_]: Monad: Clock](
def seek[F[_]: MonadThrow: Clock](
consumer: ConsumerApi[F, _, _],
partitions: Iterable[TopicPartition],
seekTo: SeekTo,
Expand Down

0 comments on commit b19e02d

Please sign in to comment.