diff --git a/core/src/main/scala/com/banno/kafka/RecordStream.scala b/core/src/main/scala/com/banno/kafka/RecordStream.scala index 313cf02f..62483aa1 100644 --- a/core/src/main/scala/com/banno/kafka/RecordStream.scala +++ b/core/src/main/scala/com/banno/kafka/RecordStream.scala @@ -611,7 +611,7 @@ object RecordStream { StreamSelector.Impl(hAndU, whetherCommits) } - private def assign[F[_]: Monad: Clock, A, B]( + private def assign[F[_]: MonadThrow: Clock, A, B]( consumer: ConsumerApi[F, GenericRecord, GenericRecord], topical: Topical[A, B], seekToF: Kleisli[F, PartitionQueries[F], SeekTo], diff --git a/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala b/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala index b18927b2..6fc4f8e2 100644 --- a/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala +++ b/core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala @@ -72,11 +72,11 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) { topics: List[String], offsets: Map[TopicPartition, Long], seekTo: SeekTo = SeekTo.beginning, - )(implicit F: Monad[F], C: Clock[F]): F[Unit] = + )(implicit F: MonadThrow[F], C: Clock[F]): F[Unit] = assignAndSeek(topics, SeekTo.offsets(offsets, seekTo)) def assign(topic: String, offsets: Map[TopicPartition, Long])(implicit - F: Monad[F], + F: MonadThrow[F], C: Clock[F], ): F[Unit] = assign(List(topic), offsets) @@ -84,7 +84,7 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) { def assignAndSeek( topics: List[String], seekTo: SeekTo, - )(implicit F: Monad[F], C: Clock[F]): F[Unit] = + )(implicit F: MonadThrow[F], C: Clock[F]): F[Unit] = for { infos <- consumer.partitionsFor(topics) partitions = infos.map(_.toTopicPartition) diff --git a/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala b/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala index 31738a92..2b11febc 100644 --- a/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala +++ b/core/src/main/scala/com/banno/kafka/consumer/SeekTo.scala @@ -21,12 +21,17 @@ import cats.effect.* import cats.syntax.all.* import scala.concurrent.duration.* - import org.apache.kafka.common.* +import scala.util.control.NoStackTrace + sealed trait SeekTo object SeekTo { + object Failure extends NoStackTrace { + override val getMessage: String = + "Consumer offset seeking failed" + } sealed trait Attempt { private[SeekTo] def toOffsets[F[_]: Monad: Clock]( queries: PartitionQueries[F], @@ -104,7 +109,7 @@ object SeekTo { } sealed trait FinalFallback { - private[SeekTo] def seek[F[_]]( + private[SeekTo] def seek[F[_]: ApplicativeThrow]( consumer: ConsumerApi[F, _, _], partitions: Iterable[TopicPartition], ): F[Unit] @@ -112,22 +117,31 @@ object SeekTo { object FinalFallback { private case object Beginning extends FinalFallback { - override def seek[F[_]]( + override def seek[F[_]: ApplicativeThrow]( consumer: ConsumerApi[F, _, _], partitions: Iterable[TopicPartition], ): F[Unit] = consumer.seekToBeginning(partitions) } private case object End extends FinalFallback { - override def seek[F[_]]( + override def seek[F[_]: ApplicativeThrow]( consumer: ConsumerApi[F, _, _], partitions: Iterable[TopicPartition], ): F[Unit] = consumer.seekToEnd(partitions) } + private case class Fail(throwable: Throwable) extends FinalFallback { + override def seek[F[_]: ApplicativeThrow]( + consumer: ConsumerApi[F, _, _], + partitions: Iterable[TopicPartition], + ): F[Unit] = + throwable.raiseError[F, Unit] + } + val beginning: FinalFallback = Beginning val end: FinalFallback = End + def fail(throwable: Throwable): FinalFallback = Fail(throwable) } private case class Impl( @@ -147,6 +161,12 @@ object SeekTo { def end: SeekTo = Impl(List.empty, FinalFallback.end) + def failWith(throwable: Throwable): SeekTo = + Impl(List.empty, FinalFallback.fail(throwable)) + + def fail: SeekTo = + failWith(Failure) + def timestamps( timestamps: Map[TopicPartition, Long], default: SeekTo, @@ -168,7 +188,7 @@ object SeekTo { ): SeekTo = firstAttemptThen(Attempt.timestampBeforeNow(duration), default) - def seek[F[_]: Monad: Clock]( + def seek[F[_]: MonadThrow: Clock]( consumer: ConsumerApi[F, _, _], partitions: Iterable[TopicPartition], seekTo: SeekTo,