Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADAPT1 - 1543] | Internal Consumer Endpoint - Enhance to Check Lag | Remove Partition With No Data | Send Notification Having Lag Percentage | Unit Test #25

Merged
merged 6 commits into from
May 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import hydra.common.NotificationsTestSuite
import hydra.common.alerting.sender.{InternalNotificationSender, NotificationSender}
import hydra.common.config.KafkaConfigUtils.{KafkaClientSecurityConfig, SchemaRegistrySecurityConfig, kafkaSecurityEmptyConfig}
import hydra.common.alerting.sender.InternalNotificationSender
import hydra.kafka.algebras.ConsumerGroupsAlgebra.PartitionOffsetMap
import hydra.kafka.algebras.ConsumerGroupsAlgebra.{PartitionOffsetMap, PartitionOffsetsWithTotalLag}
import hydra.kafka.algebras.KafkaClientAlgebra.{OffsetInfo, Record}
import hydra.kafka.model.TopicConsumer.{TopicConsumerKey, TopicConsumerValue}
import hydra.kafka.model.TopicConsumerOffset.{TopicConsumerOffsetKey, TopicConsumerOffsetValue}
Expand Down Expand Up @@ -185,6 +185,31 @@ class ConsumerGroupsAlgebraSpec extends AnyWordSpecLike with Matchers with ForAl
case Right(_) => succeed
}.unsafeRunSync()
}

"getOffsetsForInternalCGTopic test to verify no lag with commit offsets as false" in {
  val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123")

  kafkaClient.publishMessage((key1, Some(value1), None), dvsConsumerTopic.value).unsafeRunSync()

  kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false)
    .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some

  // BUGFIX: the effect must actually be run (`unsafeRunSync()`), and the previous
  // `PartitionOffsetsWithTotalLag(1,1,0,0,_)` eta-expanded to a *function*, so the
  // shouldBe could never succeed. Assert on the aggregate fields instead; the
  // partitionOffsets list is environment-dependent, so it is not pinned here.
  val result = cga.getOffsetsForInternalCGTopic.unsafeRunSync()
  result.totalLargestOffset shouldBe 1
  result.totalGroupOffset shouldBe 1
  result.totalLag shouldBe 0
  result.lagPercentage shouldBe 0.0
}

"getOffsetsForInternalCGTopic test to verify some lag with commit offsets as false" in {
  val (key1, value1) = getGenericRecords(dvsConsumerTopic.value, "abc", "123")
  val (key2, value2) = getGenericRecords(dvsConsumerTopic.value, "abcd", "1234")

  kafkaClient.publishMessage((key1, Some(value1), None), dvsConsumerTopic.value).unsafeRunSync()
  kafkaClient.publishMessage((key2, Some(value2), None), dvsConsumerTopic.value).unsafeRunSync()

  // Consume only the first of the two published records, leaving one record of lag.
  kafkaClient.consumeMessages(dvsConsumerTopic.value, consumer1, commitOffsets = false)
    .take(1).compile.last.unsafeRunSync() shouldBe (key1, value1.some, None).some

  // BUGFIX: run the IO and compare concrete fields. The old assertion compared the
  // un-run effect against `PartitionOffsetsWithTotalLag(2, 1, 1, 50, _)`, which is a
  // partially-applied constructor (a function), so it could never pass.
  val result = cga.getOffsetsForInternalCGTopic.unsafeRunSync()
  result.totalLargestOffset shouldBe 2
  result.totalGroupOffset shouldBe 1
  result.totalLag shouldBe 1
  result.lagPercentage shouldBe 50.0
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, IO, Timer}
import cats.implicits._
import fs2.kafka._
import hydra.avro.registry.SchemaRegistry
import hydra.common.alerting.AlertProtocol.NotificationMessage
import hydra.common.alerting.NotificationLevel
import hydra.common.alerting.AlertProtocol.{NotificationMessage, NotificationScope}
import hydra.common.alerting.{NotificationLevel, NotificationType}
import hydra.common.alerting.sender.{InternalNotificationSender, NotificationSender}
import hydra.common.config.KafkaConfigUtils.KafkaClientSecurityConfig
import hydra.kafka.algebras.ConsumerGroupsAlgebra.{Consumer, ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers}
Expand All @@ -33,7 +33,7 @@ trait ConsumerGroupsAlgebra[F[_]] {

def getAllConsumers: F[List[ConsumerTopics]]

def getOffsetsForInternalConsumerGroup: F[List[PartitionOffset]]
def getOffsetsForInternalCGTopic: F[PartitionOffsetsWithTotalLag]

def getAllConsumersByTopic: F[List[TopicConsumers]]

Expand Down Expand Up @@ -98,7 +98,11 @@ final case class TestConsumerGroupsAlgebra(consumerGroupMap: Map[TopicConsumerKe

override def getUniquePerNodeConsumerGroup: String = "uniquePerNodeConsumerGroup"

override def getOffsetsForInternalConsumerGroup: IO[List[PartitionOffset]] = ???
override def getOffsetsForInternalCGTopic: IO[PartitionOffsetsWithTotalLag] = {
  // Canned test fixture: partitions 1-3 are each half consumed (group offset 10 of
  // largest 20, lag 10), giving totals of 60/30/30 and a 50% lag percentage.
  val perPartition = List(1, 2, 3).map(p => PartitionOffset(p, 10, 20, 10))
  IO.pure(PartitionOffsetsWithTotalLag(60, 30, 30, 50, perPartition))
}
}

object TestConsumerGroupsAlgebra {
Expand All @@ -109,6 +113,9 @@ object ConsumerGroupsAlgebra {

type PartitionOffsetMap = Map[Int, Long]

/**
 * Aggregate lag summary for a topic across all of its partitions.
 *
 * @param totalLargestOffset sum of the largest (end) offsets over the partitions included
 * @param totalGroupOffset   sum of the consumer group's offsets over the same partitions
 * @param totalLag           totalLargestOffset - totalGroupOffset
 * @param lagPercentage      totalLag as a percentage of totalLargestOffset
 * @param partitionOffsets   per-partition breakdown backing the totals
 */
final case class PartitionOffsetsWithTotalLag(totalLargestOffset: Long, totalGroupOffset: Long, totalLag: Long,
lagPercentage: Double, partitionOffsets: List[PartitionOffset])

/**
 * Offset state of one partition: the consumer group's position, the partition's
 * largest offset, and the resulting lag (largestOffset - groupOffset).
 */
final case class PartitionOffset(partition: Int, groupOffset: Long, largestOffset: Long, partitionLag: Long)

final case class TopicConsumers(topicName: String, consumers: List[Consumer])
Expand Down Expand Up @@ -157,33 +164,43 @@ object ConsumerGroupsAlgebra {
// Reads the cached consumer-group snapshot and returns the consumers recorded for
// `topicName`, enriched with per-consumer state via addStateToTopicConsumers.
override def getConsumersForTopic(topicName: String): F[TopicConsumers] =
consumerGroupsStorageFacade.get.flatMap(a => addStateToTopicConsumers(a.getConsumersForTopicName(topicName)))

/**
 * Computes the consumer group's offsets, per-partition lag, and total lag for the
 * internal DVS consumers topic, sends a notification with the totals, and returns
 * the aggregate. Partitions whose largest offset is 0 (no data) are excluded.
 *
 * NOTE(review): this span previously contained both the pre-change and post-change
 * bodies interleaved (diff residue); only the new implementation is kept here.
 */
override def getOffsetsForInternalCGTopic: F[PartitionOffsetsWithTotalLag] = {

  // The facade appears to store the last consumed offset per partition, so the
  // group's next position is value + 1; unseen partitions map to 0 — TODO confirm.
  def getValueFromOffsetMap(partition: Int, offsetMap: Map[Partition, Offset]): Long =
    offsetMap.get(partition) match {
      case Some(value) => value + 1.toLong
      case _ => 0
    }

  for {
    groupOffsetMap <- consumerGroupsOffsetFacade.get.map(_.getAllPartitionOffset())

    // Drop partitions with no data so they don't dilute the lag percentage.
    partitionOffsetMapWithLag <- kAA.getLatestOffsets(dvsConsumersTopic.value)
      .map(_.toList
        .filter(_._2.value > 0.toLong)
        .map { latestOffset =>
          val groupOffset = getValueFromOffsetMap(latestOffset._1.partition, groupOffsetMap)
          PartitionOffset(
            latestOffset._1.partition,
            groupOffset,
            latestOffset._2.value,
            latestOffset._2.value - groupOffset
          )
        })

    totalLargestOffset = partitionOffsetMapWithLag.map(_.largestOffset).sum
    totalGroupOffset = partitionOffsetMapWithLag.map(_.groupOffset).sum
    totalLag = totalLargestOffset - totalGroupOffset

    // BUGFIX: when every partition is empty the sums are 0 and 0/0 is NaN, which
    // spray-json's JsNumber rejects at serialization time. Report 0% lag instead.
    lagPercentage: Double =
      if (totalLargestOffset == 0) 0.0
      else (totalLag.toDouble / totalLargestOffset.toDouble) * 100

    _ <- notificationsService.send(NotificationScope(NotificationLevel.Warn),
      NotificationMessage(
        s"""Total Offset Lag on ${dvsConsumersTopic} is ${totalLag.toString} ,
           | Lag percentage is ${lagPercentage.toString} ,
           | Total_Group_Offset is ${totalGroupOffset} ,
           | Total_Largest_Offset is ${totalLargestOffset}""".stripMargin)
    )
  } yield
    PartitionOffsetsWithTotalLag(totalLargestOffset, totalGroupOffset, totalLag, lagPercentage, partitionOffsetMapWithLag)
}

private def addStateToTopicConsumers(topicConsumers: TopicConsumers): F[TopicConsumers] = {
Expand Down Expand Up @@ -273,10 +290,12 @@ object ConsumerGroupsAlgebra {
consumerGroupsOffsetFacade: Ref[F, ConsumerGroupsOffsetFacade]
)(implicit notificationsService: InternalNotificationSender[F]): F[Unit] = {

// BUGFIX: this span contained two copies of the pipeline (the pre-change version
// without retry followed by the new retryable one — diff residue). Keep a single
// pipeline: cache each (partition, offset) pair as it arrives and retry the stream
// indefinitely with a notification on failure.
offsetStream.evalTap {
  case Right((partition, offset)) => consumerGroupsOffsetFacade.update(_.addOffset(partition, offset))
  case _ => Logger[F].error("Error in consumeOffsetStreamIntoCache")
}
  .makeRetryableWithNotification(Infinite, "offsetStream")
  .compile.drain
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ConsumerGroupsEndpoint[F[_]: Futurable](consumerGroupsAlgebra: ConsumerGro
} ~ pathPrefix("hydra-internal-topic") {
val startTime = Instant.now
pathEndOrSingleSlash {
onComplete(Futurable[F].unsafeToFuture(consumerGroupsAlgebra.getOffsetsForInternalConsumerGroup)) {
onComplete(Futurable[F].unsafeToFuture(consumerGroupsAlgebra.getOffsetsForInternalCGTopic)) {
case Success(detailedConsumer) =>
addHttpMetric("hydra-internal-topic", StatusCodes.OK, "/v2/consumer-groups/hydra-internal-topic", startTime, method.value)
complete(StatusCodes.OK, detailedConsumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import spray.json.{RootJsonFormat, _}
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import hydra.core.transport.AckStrategy
import hydra.kafka.algebras.{ConsumerGroupsAlgebra, KafkaAdminAlgebra}
import hydra.kafka.algebras.ConsumerGroupsAlgebra.{ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, TopicConsumers}
import hydra.kafka.algebras.ConsumerGroupsAlgebra.{ConsumerTopics, DetailedConsumerGroup, DetailedTopicConsumers, PartitionOffset, PartitionOffsetsWithTotalLag, TopicConsumers}
import hydra.kafka.algebras.KafkaAdminAlgebra.{LagOffsets, Offset, TopicAndPartition}
import hydra.kafka.serializers.TopicMetadataV2Parser.IntentionallyUnimplemented

Expand Down Expand Up @@ -56,6 +56,20 @@ trait ConsumerGroupMarshallers extends DefaultJsonProtocol with SprayJsonSupport
override def read(json: JsValue): ConsumerGroupsAlgebra.Consumer = throw IntentionallyUnimplemented
}

implicit object totalOffsetsWithLag extends RootJsonFormat[PartitionOffsetsWithTotalLag] {
  /**
   * Serializes the aggregate lag summary. The "partitionOffsets" field is omitted
   * entirely when the list is empty; otherwise partitions are emitted sorted by id.
   */
  override def write(partitionOffsetsWithTotalLag: PartitionOffsetsWithTotalLag): JsValue = {
    val totals: Map[String, JsValue] = Map(
      "totalGroupOffset" -> JsNumber(partitionOffsetsWithTotalLag.totalGroupOffset),
      "totalLargestOffset" -> JsNumber(partitionOffsetsWithTotalLag.totalLargestOffset),
      "totalLag" -> JsNumber(partitionOffsetsWithTotalLag.totalLag),
      "lagPercentage" -> DoubleJsonFormat.write(partitionOffsetsWithTotalLag.lagPercentage)
    )
    val fields =
      if (partitionOffsetsWithTotalLag.partitionOffsets.isEmpty) totals
      else totals + ("partitionOffsets" -> JsArray(
        partitionOffsetsWithTotalLag.partitionOffsets.sortBy(_.partition).map(partitionOffset.write).toVector
      ))
    JsObject(fields)
  }

  // Deserialization is intentionally unsupported for this response-only format.
  override def read(json: JsValue): PartitionOffsetsWithTotalLag = throw IntentionallyUnimplemented
}

implicit val consumerTopicsFormat: RootJsonFormat[ConsumerTopics] = jsonFormat2(ConsumerTopics)
implicit val topicConsumersFormat: RootJsonFormat[TopicConsumers] = jsonFormat2(TopicConsumers)

Expand Down
Loading