Skip to content

Commit

Permalink
Stabilized parsing QueryProgress
Browse files Browse the repository at this point in the history
  • Loading branch information
Leonard Wolters committed Dec 11, 2023
1 parent 7bd012c commit 2e879a8
Showing 1 changed file with 20 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.crobox.clickhouse.internal.progress

import com.typesafe.scalalogging.LazyLogging
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Source, SourceQueueWithComplete}
import org.apache.pekko.stream.{ActorAttributes, OverflowStrategy, Supervision}
import com.typesafe.scalalogging.LazyLogging
import spray.json._
import spray.json.DefaultJsonProtocol._
import spray.json._

import scala.util.{Failure, Success, Try}

object QueryProgress extends LazyLogging {
Expand All @@ -18,7 +19,8 @@ object QueryProgress extends LazyLogging {
case class QueryRetry(cause: Throwable, retryNumber: Int) extends QueryProgress

case class ClickhouseQueryProgress(identifier: String, progress: QueryProgress)
case class Progress(rowsRead: Long, bytesRead: Long, rowsWritten: Long, bytesWritten: Long, totalRows: Long) extends QueryProgress
case class Progress(rowsRead: Long, bytesRead: Long, rowsWritten: Long, bytesWritten: Long, totalRows: Long)
extends QueryProgress

def queryProgressStream: RunnableGraph[(SourceQueueWithComplete[String], Source[ClickhouseQueryProgress, NotUsed])] =
Source
Expand All @@ -29,31 +31,21 @@ object QueryProgress extends LazyLogging {
Some(ClickhouseQueryProgress(queryId, QueryAccepted))
case queryId :: progressJson :: Nil =>
Try {
progressJson.parseJson match {
case JsObject(fields) if fields.size == 3 =>
ClickhouseQueryProgress(
queryId,
Progress(
fields("read_rows").convertTo[String].toLong,
fields("read_bytes").convertTo[String].toLong,
0,
0,
fields("total_rows").convertTo[String].toLong
)
)
case JsObject(fields) if fields.size >= 5 =>
ClickhouseQueryProgress(
queryId,
Progress(
fields("read_rows").convertTo[String].toLong,
fields("read_bytes").convertTo[String].toLong,
fields("written_rows").convertTo[String].toLong,
fields("written_bytes").convertTo[String].toLong,
fields("total_rows_to_read").convertTo[String].toLong
)
)
case _ => throw new IllegalArgumentException(s"Cannot extract progress from $progressJson")
}
val fields = progressJson.parseJson.asJsObject.fields
ClickhouseQueryProgress(
queryId,
Progress(
fields("read_rows").convertTo[String].toLong,
fields("read_bytes").convertTo[String].toLong,
fields.get("written_rows").map(_.convertTo[String].toLong).getOrElse(0),
fields.get("written_bytes").map(_.convertTo[String].toLong).getOrElse(0),
Iterable("total_rows_to_read", "total_rows")
.flatMap(fields.get)
.map(_.convertTo[String].toLong)
.headOption
.getOrElse(0L)
)
)
} match {
case Success(value) => Some(value)
case Failure(exception) =>
Expand Down

0 comments on commit 2e879a8

Please sign in to comment.