Skip to content

Commit

Permalink
Merge pull request #22 from ps-dev/adapt1-1526
Browse files Browse the repository at this point in the history
[ADAPT1-1526] | Create an offset stream for consumer group internal topic
  • Loading branch information
abhivermaaa authored May 8, 2024
2 parents b013766 + 9e43eed commit 4d6d7d3
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import hydra.common.config.KafkaConfigUtils.KafkaClientSecurityConfig
import hydra.kafka.algebras.ConsumerGroupsAlgebra.{Consumer, ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers}
import hydra.kafka.algebras.ConsumerGroupsAlgebra._
import hydra.kafka.algebras.HydraTag.StringJsonFormat
import hydra.kafka.algebras.KafkaClientAlgebra.Record
import hydra.kafka.algebras.KafkaClientAlgebra.{Offset, Partition, Record}
import hydra.kafka.algebras.RetryableFs2Stream.RetryPolicy.Infinite
import hydra.kafka.algebras.RetryableFs2Stream._
import hydra.kafka.model.TopicConsumer
Expand Down Expand Up @@ -132,8 +132,16 @@ object ConsumerGroupsAlgebra {
kafkaClientSecurityConfig: KafkaClientSecurityConfig
) (implicit notificationsService: InternalNotificationSender[F]): F[ConsumerGroupsAlgebra[F]] = {


val parentStream = kafkaClientAlgebra.consumeSafelyWithOffsetInfo(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false)

val offsetStream: fs2.Stream[F, Either[Throwable, (Partition, Offset)]] = parentStream.map(x => {
x.map(_._2)
})

val dvsConsumersStream: fs2.Stream[F, Record] = {
kafkaClientAlgebra.consumeSafelyMessages(dvsConsumersTopic.value, uniquePerNodeConsumerGroup, commitOffsets = false)
parentStream
.map(_.map(_._1))
//Ignore records with errors
.collect { case Right(value) => value }
}
Expand Down Expand Up @@ -258,3 +266,19 @@ private object ConsumerGroupsStorageFacade {
def empty: ConsumerGroupsStorageFacade = ConsumerGroupsStorageFacade(Map.empty)
}

private case class ConsumerGroupsOffsetFacade(offsetMap: Map[Partition, Offset]) {

def addOffset(key: Partition, value: Offset): ConsumerGroupsOffsetFacade = {
val res = this.copy(this.offsetMap + (key -> value))
println(this.offsetMap)
res
}

def removeOffset(key: Partition): ConsumerGroupsOffsetFacade =
this.copy(this.offsetMap - key)
}

private object ConsumerGroupsOffsetFacade {
def empty: ConsumerGroupsOffsetFacade = ConsumerGroupsOffsetFacade(Map.empty)
}

Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ object KafkaClientAlgebra {
: fs2.Stream[F, Either[Throwable, (StringRecord, (Partition, Offset))]] = ???

override def consumeSafelyWithOffsetInfo(topicName: TopicName, consumerGroup: ConsumerGroup, commitOffsets: Boolean)
: fs2.Stream[F, Either[Throwable, ((GenericRecord, Option[GenericRecord], Option[Headers]), (Partition, Offset))]] = ???
: fs2.Stream[F, Either[Throwable, ((GenericRecord, Option[GenericRecord], Option[Headers]), (Partition, Offset))]] = fs2.Stream.empty

override def withProducerRecordSizeLimit(sizeLimitBytes: Long): F[KafkaClientAlgebra[F]] = Sync[F].delay {
getTestInstance(cache, schemaRegistryUrl, schemaRegistry, sizeLimitBytes.some)
Expand Down

0 comments on commit 4d6d7d3

Please sign in to comment.