Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migration to pekko #149

Merged
merged 3 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
[![Gitter](https://img.shields.io/gitter/room/clickhouse-scala-client/lobby.svg)](https://gitter.im/clickhouse-scala-client/lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.crobox.clickhouse/client_2.13/badge.svg?style=plastic)](https://maven-badges.herokuapp.com/maven-central/com.crobox.clickhouse/client_2.13)

Clickhouse Scala Client that uses Akka Http to create a reactive streams implementation to access the [Clickhouse](https://clickhouse.yandex) database in a reactive way.
Clickhouse Scala Client that uses Pekko Http to create a reactive streams implementation to access the [Clickhouse](https://clickhouse.yandex) database in a reactive way.

Features:
* read/write query execution
* akka streaming source for result parsing
* akka streaming sink for data insertion
* pekko streaming source for result parsing
* pekko streaming sink for data insertion
* streaming query progress (experimental)
* all the http interface settings
* load balancing with internal health checks (multi host and cluster aware host balancer)
Expand All @@ -35,7 +35,7 @@ libraryDependencies += "com.crobox.clickhouse" %% "client" % "0.9.0"
## Documentation
- [Quick Setup](#quick-setup)
- [Client](#client)
- [Indexer (Akka Streams Sink)](#indexer)
- [Indexer (Pekko Streams Sink)](#indexer)
- [Configuration](#configuration)
- [Client](#client-configuration)
- [Health checks](#health-checks)
Expand Down Expand Up @@ -77,7 +77,7 @@ val config: Config
val client: ClickhouseClient

val sink = ClickhouseSink.insertSink(config, client)
sink.runWith(Source.single(Insert("clicks", "{some_column: 3 }"))
sink.runWith(Source.single(Insert("clicks", "{some_column: 3 }")))
```

## Configuration
Expand Down Expand Up @@ -171,7 +171,7 @@ crobox.clickhouse.client {

### Indexer configuration

Inserting into clickhouse is done using an akka stream. All the settings are applied on a per table basis.
Inserting into clickhouse is done using an pekko stream. All the settings are applied on a per table basis.
We will do one insert when the maximum number of items `batch-size` or the maximum time has been exceeded `flush-interval`. Based on the number of `concurrent-requests` we can run multiple inserts in parallel for the same table.

```
Expand Down Expand Up @@ -256,7 +256,7 @@ client.sink("INSERT INTO my_table", Source.single(ByteString("el1"))).map(result
We only expose progress when running read only queries. The current implementation is recommended to be used only for long running queries which return a result relatively small in size (fits easily in memory).
The returned source is materialized with the query result.

When running queries with progress we set a custom client transport for the super pool used by client to run the queries. Due to limitation in the akka implementation which does not allow for the headers to be streamed we are parsing the raw http output and intercept the http headers to receive the progress.
When running queries with progress we set a custom client transport for the super pool used by client to run the queries. Due to limitation in the pekko implementation which does not allow for the headers to be streamed we are parsing the raw http output and intercept the http headers to receive the progress.

We expose multiple events for the progress:
* QueryAccepted - clickhouse returned the http response with code 200 (query might still fail)
Expand Down
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ lazy val client: Project = (project in file("client"))
name := "client",
sbtrelease.ReleasePlugin.autoImport.releasePublishArtifactsAction := PgpKeys.publishSigned.value,
libraryDependencies ++= Seq(
"io.spray" %% "spray-json" % "1.3.6",
"com.typesafe.akka" %% "akka-actor" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"io.spray" %% "spray-json" % "1.3.6",
"org.apache.pekko" %% "pekko-actor" % PekkoVersion,
"org.apache.pekko" %% "pekko-stream" % PekkoVersion,
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"joda-time" % "joda-time" % "2.12.2"
"joda-time" % "joda-time" % "2.12.2"
) ++ testDependencies.map(_ % Test)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.crobox.clickhouse

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.scaladsl.{Framing, Source}
import akka.util.ByteString
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model._
import org.apache.pekko.stream.scaladsl.{Framing, Source}
import org.apache.pekko.util.ByteString
import com.crobox.clickhouse.balancing.HostBalancer
import com.crobox.clickhouse.internal.QuerySettings._
import com.crobox.clickhouse.internal._
Expand All @@ -15,7 +15,7 @@ import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}

/**
* Async clickhouse client using Akka Http and Streams
* Async clickhouse client using Pekko Http and Streams
*
* @author Sjoerd Mulder
* @since 31-03-17
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.crobox.clickhouse

import akka.http.scaladsl.model.StatusCode
import org.apache.pekko.http.scaladsl.model.StatusCode

sealed abstract class ClickhouseExecutionException(msg: String, cause: Throwable = null)
extends RuntimeException(msg, cause) {
Expand All @@ -18,7 +18,7 @@ case class ClickhouseChunkedException(message: String) extends ClickhouseExecuti

case class TooManyQueriesException()
extends ClickhouseExecutionException(
"The client's queue is full, you are trying to execute too many queries at the same time. This can be solved by either: checking the source of the queries to make sure this is not a bug\n Increasing the buffer size under the property `crobox.clickhouse.client.buffer-size`\n Adjust the settings of the super pool under `akka.http.host-connection-pool`"
"The client's queue is full, you are trying to execute too many queries at the same time. This can be solved by either: checking the source of the queries to make sure this is not a bug\n Increasing the buffer size under the property `crobox.clickhouse.client.buffer-size`\n Adjust the settings of the super pool under `pekko.http.host-connection-pool`"
) {
override val retryable: Boolean = false
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.crobox.clickhouse.balancing

import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.model.Uri
import akka.pattern.ask
import akka.stream.scaladsl.Sink
import akka.stream.{ActorAttributes, Supervision}
import akka.util.Timeout
import org.apache.pekko.actor.{ActorRef, ActorSystem}
import org.apache.pekko.http.scaladsl.model.Uri
import org.apache.pekko.pattern.ask
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.{ActorAttributes, Supervision}
import org.apache.pekko.util.Timeout
import com.crobox.clickhouse.balancing.discovery.ConnectionManagerActor.{GetConnection, LogDeadConnections}
import com.crobox.clickhouse.balancing.discovery.cluster.ClusterConnectionFlow

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.crobox.clickhouse.balancing

import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model._
import com.crobox.clickhouse.balancing.Connection.{BalancingHosts, ClusterAware, ConnectionType, SingleHost}
import com.crobox.clickhouse.balancing.discovery.ConnectionManagerActor
import com.crobox.clickhouse.balancing.discovery.health.ClickhouseHostHealth
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.crobox.clickhouse.balancing

import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.model.Uri
import akka.pattern.ask
import akka.util.Timeout
import akka.util.Timeout.durationToTimeout
import org.apache.pekko.actor.{ActorRef, ActorSystem}
import org.apache.pekko.http.scaladsl.model.Uri
import org.apache.pekko.pattern.ask
import org.apache.pekko.util.Timeout
import org.apache.pekko.util.Timeout.durationToTimeout
import com.crobox.clickhouse.balancing.discovery.ConnectionManagerActor
import com.crobox.clickhouse.balancing.discovery.ConnectionManagerActor.GetConnection
import com.crobox.clickhouse.internal.ClickhouseHostBuilder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.crobox.clickhouse.balancing

import akka.http.scaladsl.model.Uri
import org.apache.pekko.http.scaladsl.model.Uri

import scala.concurrent.Future

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.crobox.clickhouse.balancing.discovery

import akka.actor.{Actor, ActorLogging, Cancellable, PoisonPill, Props, Stash, Status}
import akka.http.scaladsl.model.Uri
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.pekko.actor.{Actor, ActorLogging, Cancellable, PoisonPill, Props, Stash, Status}
import org.apache.pekko.http.scaladsl.model.Uri
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source}
import com.crobox.clickhouse.balancing.HostBalancer
import com.crobox.clickhouse.balancing.discovery.health.ClickhouseHostHealth.{Alive, ClickhouseHostStatus, Dead}
import com.crobox.clickhouse.balancing.iterator.CircularIteratorSet
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.crobox.clickhouse.balancing.discovery.cluster

import akka.actor.{ActorSystem, Cancellable}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.scaladsl.Source
import org.apache.pekko.actor.{ActorSystem, Cancellable}
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.Uri
import org.apache.pekko.http.scaladsl.settings.ConnectionPoolSettings
import org.apache.pekko.stream.scaladsl.Source
import com.crobox.clickhouse.balancing.discovery.ConnectionManagerActor.Connections
import com.crobox.clickhouse.internal.QuerySettings.ReadQueries
import com.crobox.clickhouse.internal.{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.crobox.clickhouse.balancing.discovery.health

import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.unmarshalling.Unmarshaller
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Source}
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.{ActorSystem, Cancellable}
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model._
import org.apache.pekko.http.scaladsl.settings.ConnectionPoolSettings
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshaller
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Flow, Source}
import com.crobox.clickhouse.internal.ClickhouseResponseParser

import scala.concurrent.duration._
Expand Down Expand Up @@ -70,7 +70,7 @@ object ClickhouseHostHealth extends ClickhouseResponseParser {
host: Uri
)(implicit ec: ExecutionContext, mat: Materializer): Flow[(Try[HttpResponse], T), ClickhouseHostStatus, NotUsed] =
Flow[(Try[HttpResponse], T)].mapAsync(1) {
case (Success(response @ akka.http.scaladsl.model.HttpResponse(StatusCodes.OK, _, _, _)), _) =>
case (Success(response @ org.apache.pekko.http.scaladsl.model.HttpResponse(StatusCodes.OK, _, _, _)), _) =>
Unmarshaller
.stringUnmarshaller(decodeResponse(response).entity)
.map(splitResponse)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.crobox.clickhouse.balancing

import akka.http.scaladsl.model.Uri
import org.apache.pekko.http.scaladsl.model.Uri

package object discovery {
// TODO we might want to provide the ability to specify a different port when using the hostnames from the cluster table
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.crobox.clickhouse.internal

import akka.actor.{ActorSystem, Terminated}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import akka.stream._
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import org.apache.pekko.actor.{ActorSystem, Terminated}
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model._
import org.apache.pekko.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import org.apache.pekko.stream._
import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import com.crobox.clickhouse.balancing.HostBalancer
import com.crobox.clickhouse.internal.progress.QueryProgress._
import com.crobox.clickhouse.internal.progress.{QueryProgress, StreamingProgressClickhouseTransport}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.crobox.clickhouse.internal

import akka.http.scaladsl.model.Uri
import org.apache.pekko.http.scaladsl.model.Uri

private[clickhouse] trait ClickhouseHostBuilder {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.crobox.clickhouse.internal

import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.{HttpEncodingRange, RawHeader, `Content-Encoding`}
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, RequestEntity, Uri}
import org.apache.pekko.http.scaladsl.model.Uri.Query
import org.apache.pekko.http.scaladsl.model.headers.{HttpEncodingRange, RawHeader, `Content-Encoding`}
import org.apache.pekko.http.scaladsl.model.{HttpMethods, HttpRequest, RequestEntity, Uri}
import com.crobox.clickhouse.internal.QuerySettings.ReadQueries
import com.crobox.clickhouse.internal.progress.ProgressHeadersAsEventsStage
import com.typesafe.config.Config
Expand All @@ -14,8 +14,8 @@ private[clickhouse] trait ClickhouseQueryBuilder extends LazyLogging {

private val Headers = {
import HttpEncodingRange.apply
import akka.http.scaladsl.model.headers.HttpEncodings.{deflate, gzip}
import akka.http.scaladsl.model.headers.`Accept-Encoding`
import org.apache.pekko.http.scaladsl.model.headers.HttpEncodings.{deflate, gzip}
import org.apache.pekko.http.scaladsl.model.headers.`Accept-Encoding`
immutable.Seq(`Accept-Encoding`(gzip, deflate))
}
private val MaxUriSize = 16 * 1024
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.crobox.clickhouse.internal

import akka.http.scaladsl.coding.Coders
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{HttpEncoding, HttpEncodings}
import akka.http.scaladsl.unmarshalling.Unmarshaller
import akka.stream.Materializer
import akka.stream.scaladsl.SourceQueue
import org.apache.pekko.http.scaladsl.coding.Coders
import org.apache.pekko.http.scaladsl.model._
import org.apache.pekko.http.scaladsl.model.headers.{HttpEncoding, HttpEncodings}
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshaller
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.SourceQueue
import com.crobox.clickhouse.internal.progress.QueryProgress._
import com.crobox.clickhouse.{ClickhouseChunkedException, ClickhouseException}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.crobox.clickhouse.internal

import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.HttpEncoding
import org.apache.pekko.http.scaladsl.model.Uri.Query
import org.apache.pekko.http.scaladsl.model.headers.HttpEncoding
import com.crobox.clickhouse.internal.QuerySettings._
import com.typesafe.config.Config

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package com.crobox.clickhouse.internal.progress

import akka.actor.ActorSystem
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.http.scaladsl.{ClientTransport, Http}
import akka.stream.scaladsl.{BidiFlow, Flow, SourceQueue}
import akka.stream.stage._
import akka.stream.{Attributes, BidiShape, Inlet, Outlet}
import akka.util.ByteString
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.settings.ClientConnectionSettings
import org.apache.pekko.http.scaladsl.{ClientTransport, Http}
import org.apache.pekko.stream.scaladsl.{BidiFlow, Flow, SourceQueue}
import org.apache.pekko.stream.stage._
import org.apache.pekko.stream.{Attributes, BidiShape, Inlet, Outlet}
import org.apache.pekko.util.ByteString

import scala.concurrent.Future

/**
* Clickhouse sends http progress headers with the name X-ClickHouse-Progress which cannot be handled in a streaming way in akka
* Clickhouse sends http progress headers with the name X-ClickHouse-Progress which cannot be handled in a streaming way in Pekko.
* In the request we include our own custom header `X-Internal-Identifier` so we can send the internal query id with the progress
* The progress headers are being intercepted by the transport and sent to an internal source as progress events with the internal query id which will be used to route them to the query progress source
* We just proxy the request/response and do not manipulate them in any way
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.crobox.clickhouse.internal.progress
import akka.NotUsed
import akka.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Source, SourceQueueWithComplete}
import akka.stream.{ActorAttributes, OverflowStrategy, Supervision}

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Source, SourceQueueWithComplete}
import org.apache.pekko.stream.{ActorAttributes, OverflowStrategy, Supervision}
import com.typesafe.scalalogging.LazyLogging
import spray.json._
import spray.json.DefaultJsonProtocol._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.crobox.clickhouse.stream

import akka.Done
import akka.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.pekko.Done
import org.apache.pekko.stream.scaladsl.{Flow, Keep, Sink}
import com.crobox.clickhouse.ClickhouseClient
import com.crobox.clickhouse.internal.QuerySettings
import com.typesafe.config.Config
Expand Down
2 changes: 1 addition & 1 deletion client/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ crobox.clickhouse.client {
distributed_product_mode = "deny"
interactive_delay = 10000
}
akka.http.client.parsing.max-content-length = 83886080
pekko.http.client.parsing.max-content-length = 83886080
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.crobox.clickhouse

import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.model.Uri
import akka.pattern.ask
import akka.testkit.TestKit
import akka.util.Timeout
import akka.util.Timeout.durationToTimeout
import org.apache.pekko.actor.{ActorRef, ActorSystem}
import org.apache.pekko.http.scaladsl.model.Uri
import org.apache.pekko.pattern.ask
import org.apache.pekko.testkit.TestKit
import org.apache.pekko.util.Timeout
import org.apache.pekko.util.Timeout.durationToTimeout
import com.crobox.clickhouse.balancing.HostBalancer
import com.crobox.clickhouse.balancing.discovery.ConnectionManagerActor.GetConnection
import com.typesafe.config.{Config, ConfigFactory}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.crobox.clickhouse

import akka.actor.ActorSystem
import akka.testkit.TestKit
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.testkit.TestKit
import com.typesafe.config.{Config, ConfigFactory}
import org.scalactic.{Tolerance, TripleEqualsSupport}
import org.scalatest.BeforeAndAfterAll
Expand Down
Loading
Loading