Skip to content

Commit

Permalink
Added an offset stream which will be having all committed offset info…
Browse files Browse the repository at this point in the history
… aand also added ConsumerGroupsOffsetFacade for holding all offset info
  • Loading branch information
abhivermaaa committed Apr 21, 2024
1 parent b013766 commit d179477
Showing 1 changed file with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import hydra.common.config.KafkaConfigUtils.KafkaClientSecurityConfig
import hydra.kafka.algebras.ConsumerGroupsAlgebra.{Consumer, ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers}
import hydra.kafka.algebras.ConsumerGroupsAlgebra._
import hydra.kafka.algebras.HydraTag.StringJsonFormat
import hydra.kafka.algebras.KafkaClientAlgebra.Record
import hydra.kafka.algebras.KafkaClientAlgebra.{Offset, Partition, Record}
import hydra.kafka.algebras.RetryableFs2Stream.RetryPolicy.Infinite
import hydra.kafka.algebras.RetryableFs2Stream._
import hydra.kafka.model.TopicConsumer
Expand Down Expand Up @@ -132,8 +132,16 @@ object ConsumerGroupsAlgebra {
kafkaClientSecurityConfig: KafkaClientSecurityConfig
) (implicit notificationsService: InternalNotificationSender[F]): F[ConsumerGroupsAlgebra[F]] = {


val parentStream = kafkaClientAlgebra.consumeSafelyWithOffsetInfo(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false)

val offsetStream: fs2.Stream[F, Either[Throwable, (Partition, Offset)]] = parentStream.map(x => {
x.map(_._2)
})

val dvsConsumersStream: fs2.Stream[F, Record] = {
kafkaClientAlgebra.consumeSafelyMessages(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false)
parentStream
.map(_.map(_._1))
//Ignore records with errors
.collect { case Right(value) => value }
}
Expand Down Expand Up @@ -258,3 +266,19 @@ private object ConsumerGroupsStorageFacade {
def empty: ConsumerGroupsStorageFacade = ConsumerGroupsStorageFacade(Map.empty)
}

private case class ConsumerGroupsOffsetFacade(offsetMap: Map[Partition, Offset]) {

def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade = {
val res = this.copy(this.offsetMap + (key -> value))
println(this.offsetMap)
res
}

def removeOffset(key: Partition): ConsumerGroupsOffsetFacade =
this.copy(this.offsetMap - key)
}

private object ConsumerGroupsOffsetFacade {
def empty: ConsumerGroupsOffsetFacade = ConsumerGroupsOffsetFacade(Map.empty)
}

0 comments on commit d179477

Please sign in to comment.