Skip to content

Commit

Permalink
Merge pull request #154 from crobox/feature/pekko_dependency
Browse files Browse the repository at this point in the history
Feature/pekko dependency
  • Loading branch information
lwolters authored Nov 2, 2023
2 parents 8357e6b + 5f52fcb commit 2accbbf
Show file tree
Hide file tree
Showing 39 changed files with 340 additions and 377 deletions.
28 changes: 9 additions & 19 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest]
scala: [2.12.13, 2.13.8]
java: [temurin@11]
scala: [2.13.8]
java: [temurin@17]
clickhouse: [21.3, 21.8.14, 22.3]
runs-on: ${{ matrix.os }}
steps:
Expand All @@ -41,12 +41,12 @@ jobs:
compose-file: './docker-compose.yml'
down-flags: '--volumes'

- name: Setup Java (temurin@11)
if: matrix.java == 'temurin@11'
- name: Setup Java (temurin@17)
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v2
with:
distribution: temurin
java-version: 11
java-version: 17

- name: Cache sbt
uses: actions/cache@v2
Expand Down Expand Up @@ -83,20 +83,20 @@ jobs:
matrix:
os: [ubuntu-latest]
scala: [2.13.8]
java: [temurin@11]
java: [temurin@17]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout current branch (full)
uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Setup Java (temurin@11)
if: matrix.java == 'temurin@11'
- name: Setup Java (temurin@17)
if: matrix.java == 'temurin@17'
uses: actions/setup-java@v2
with:
distribution: temurin
java-version: 11
java-version: 17

- name: Cache sbt
uses: actions/cache@v2
Expand All @@ -110,16 +110,6 @@ jobs:
~/Library/Caches/Coursier/v1
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Download target directories (2.12.13)
uses: actions/download-artifact@v2
with:
name: target-${{ matrix.os }}-2.12.13-${{ matrix.java }}

- name: Inflate target directories (2.12.13)
run: |
tar xf targets.tar
rm targets.tar
- name: Download target directories (2.13.8)
uses: actions/download-artifact@v2
with:
Expand Down
23 changes: 8 additions & 15 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Build._
import Build.*

// Scala Formatting
ThisBuild / scalafmtVersion := "1.5.1"
Expand All @@ -19,15 +19,7 @@ lazy val root = (project in file("."))
scalaVersion := "2.13.8",
crossScalaVersions := List("2.12.13", "2.13.8"),
javacOptions ++= Seq("-g", "-Xlint:unchecked", "-Xlint:deprecation", "-source", "11", "-target", "11"),
scalacOptions ++= Seq(
// "-Wconf:cat=deprecation:ws,any:e",
// "-target:jvm-11",
"-unchecked",
"-deprecation",
"-feature",
"-language:_",
"-encoding",
"UTF-8"),
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature", "-language:_", "-encoding", "UTF-8"),
publishTo := {
val nexus = "https://oss.sonatype.org/"
if (version.value.trim.endsWith("SNAPSHOT"))
Expand Down Expand Up @@ -74,24 +66,25 @@ lazy val client: Project = (project in file("client"))
"org.apache.pekko" %% "pekko-stream" % PekkoVersion,
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"joda-time" % "joda-time" % "2.12.2"
) ++ testDependencies.map(_ % Test)
"joda-time" % "joda-time" % "2.12.5"
) ++ Seq("org.apache.pekko" %% "pekko-testkit" % PekkoVersion % Test) ++ Build.testDependencies.map(_ % Test)
)

lazy val dsl = (project in file("dsl"))
.dependsOn(client, client % "test->test", testkit % Test)
.configs(Config.CustomIntegrationTest)
.settings(Config.testSettings: _*)
.settings(
name := "dsl",
sbtrelease.ReleasePlugin.autoImport.releasePublishArtifactsAction := PgpKeys.publishSigned.value,
libraryDependencies ++= Seq("com.google.guava" % "guava" % "23.0")
libraryDependencies ++= Seq("com.google.guava" % "guava" % "23.0", "com.typesafe" % "config" % "1.4.2")
)
.dependsOn(client, client % "test->test", testkit % Test)
// .settings(excludeDependencies ++= Seq(ExclusionRule("org.apache.pekko")))

lazy val testkit = (project in file("testkit"))
.dependsOn(client)
.settings(
name := "testkit",
sbtrelease.ReleasePlugin.autoImport.releasePublishArtifactsAction := PgpKeys.publishSigned.value,
libraryDependencies ++= Build.testDependencies
)
.dependsOn(client)
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.crobox.clickhouse

/**
 * Parsed ClickHouse server version, stored as numeric components in
 * major-to-minor order (e.g. Seq(21, 8, 14) for "21.8.14").
 *
 * @param versions version components; may be empty or short when parsed from
 *                 an unexpected string — the `minimalVersion` checks treat a
 *                 missing major as "not new enough" and a missing minor as 0
 *                 instead of throwing IndexOutOfBoundsException (the original
 *                 `versions.head` / `versions(1)` accesses threw for e.g. "21").
 */
case class ClickhouseServerVersion(versions: Seq[Int]) {

  /** True when the major version is at least `version`; false when no major component was parsed. */
  def minimalVersion(version: Int): Boolean = versions.headOption.exists(_ >= version)

  /**
   * True when the server version is at least `version`.`subVersion`.
   * A version with no minor component (e.g. "21") is treated as minor 0,
   * so "21" satisfies (21, 0) but not (21, 1).
   */
  def minimalVersion(version: Int, subVersion: Int): Boolean =
    versions.headOption match {
      case Some(major) if major > version  => true
      case Some(major) if major == version => versions.lift(1).getOrElse(0) >= subVersion
      case _                               => false // smaller major, or nothing parsed at all
    }
}

object ClickhouseServerVersion {

  /**
   * Parses a version string such as "21.8.14" or "22.3-lts": splits on '.',
   * keeps only the digit characters of each component and drops components
   * that end up empty, so suffixes like "-lts" are ignored.
   */
  def apply(version: String): ClickhouseServerVersion =
    ClickhouseServerVersion(
      version
        .split('.')
        .toSeq
        .map(_.filter(_.isDigit)) // strip non-digit noise per component
        .filter(_.nonEmpty)       // drop components that carried no digits
        .map(_.toInt)
    )

  /** Hard-coded fallback version — presumably used when the server version has not been detected yet; confirm at call sites. */
  def latest: ClickhouseServerVersion = ClickhouseServerVersion(versions = Seq(21, 8, 14))
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package com.crobox.clickhouse.balancing

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model._
import com.crobox.clickhouse.balancing.Connection.{BalancingHosts, ClusterAware, ConnectionType, SingleHost}
import com.crobox.clickhouse.balancing.discovery.ConnectionManagerActor
import com.crobox.clickhouse.balancing.discovery.health.ClickhouseHostHealth
import com.crobox.clickhouse.internal.ClickhouseHostBuilder
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model._

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
//import scala.jdk.CollectionConverters._
import scala.jdk.CollectionConverters._

trait HostBalancer extends LazyLogging {
def nextHost: Future[Uri]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.crobox.clickhouse.internal

import org.apache.pekko.http.scaladsl.model.Uri.Query
import org.apache.pekko.http.scaladsl.model.headers.HttpEncoding
import com.crobox.clickhouse.internal.QuerySettings._
import com.typesafe.config.Config
import org.apache.pekko.http.scaladsl.model.Uri.Query
import org.apache.pekko.http.scaladsl.model.headers.HttpEncoding

//import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.Try

case class QuerySettings(readOnly: ReadOnlySetting = AllQueries,
Expand Down
21 changes: 0 additions & 21 deletions client/src/main/scala/com/crobox/clickhouse/package.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.crobox.clickhouse.stream

import org.apache.pekko.stream.scaladsl._
import com.crobox.clickhouse.internal.QuerySettings
import com.crobox.clickhouse.{ClickhouseClient, ClickhouseClientAsyncSpec}
import org.apache.pekko.stream.scaladsl._
import org.scalatest.concurrent.{Eventually, ScalaFutures}

import scala.concurrent.duration._
Expand Down Expand Up @@ -54,19 +54,21 @@ class ClickhouseIndexingSubscriberTest extends ClickhouseClientAsyncSpec with Sc
)
)

def parsedInserts(key: String) = unparsedInserts(key).map(
_.mapValues({ // do NOT change to .view.mapValues given compilation errors for scala 2.12.+
case value: Int => value.toString
case value: String => "\"" + value + "\""
case value: IndexedSeq[_] => "[" + value.mkString(", ") + "]"
}).map { case (k, v) => s""""$k" : $v""" }
def parsedInserts(key: String): Seq[String] = unparsedInserts(key).map(
_.view
.mapValues({
case value: Int => value.toString
case value: String => "\"" + value + "\""
case value: IndexedSeq[_] => "[" + value.mkString(", ") + "]"
})
.map { case (k, v) => s""""$k" : $v""" }
.mkString(", ")
)

it should "index items" in {
val inserts = parsedInserts("two")
val res = Source
.fromIterator(() => inserts.toIterator) // do NOT change to .iterator given compilation errors for scala 2.12.+
.fromIterator(() => inserts.iterator)
.map(data => Insert("test.insert", "{" + data + "}"))
.runWith(ClickhouseSink.toSink(config, client, Some("no-overrides")))
Await.ready(res, 5.seconds)
Expand Down
59 changes: 56 additions & 3 deletions dsl/src/it/scala/com/crobox/clickhouse/DslITSpec.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
package com.crobox.clickhouse

import com.crobox.clickhouse.dsl.Query
import com.crobox.clickhouse.dsl._
import com.crobox.clickhouse.dsl.execution.{ClickhouseQueryExecutor, QueryExecutor}
import com.crobox.clickhouse.dsl.language._
import com.crobox.clickhouse.dsl.schemabuilder.{CreateTable, Engine}
import com.crobox.clickhouse.internal.QuerySettings
import com.crobox.clickhouse.testkit.ClickhouseSpec
import org.scalatest.Suite
import org.scalatest.time.{Millis, Seconds, Span}
import spray.json.DefaultJsonProtocol.{jsonFormat, IntJsonFormat, StringJsonFormat}
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

import scala.concurrent.Future

trait DslITSpec extends DslIntegrationSpec {
trait DslITSpec extends ClickhouseClientSpec with ClickhouseSpec with TestSchema with ClickhouseTokenizerModule {
this: Suite =>

implicit lazy val queryExecutor: QueryExecutor = ClickhouseQueryExecutor.default(clickClient)
implicit def ctx: TokenizeContext = TokenizeContext(clickClient.serverVersion)

val table1Entries: Seq[Table1Entry] = Seq()
val table2Entries: Seq[Table2Entry] = Seq()
val table3Entries: Seq[Table3Entry] = Seq()

override implicit def patienceConfig: PatienceConfig =
PatienceConfig(timeout = scaled(Span(10, Seconds)), interval = scaled(Span(20, Millis)))

Expand All @@ -20,11 +31,53 @@ trait DslITSpec extends DslIntegrationSpec {
clickClient.query(toSql(query.internalQuery, None)).map(_.trim)
}

protected def r(query: Column): String = runSql(select(query)).futureValue.trim

protected def runSql(query: OperationalQuery): Future[String] =
clickClient.query(toSql(query.internalQuery, None))

override def beforeAll(): Unit = {
super.beforeAll()
val tables = for {
_ <- clickClient.execute(
CreateTable(OneTestTable, Engine.Memory, ifNotExists = true).query
)
_ <- clickClient.execute(
CreateTable(
TwoTestTable,
Engine.Memory,
ifNotExists = true
).query
)
_ <- clickClient.execute(
CreateTable(
ThreeTestTable,
Engine.Memory,
ifNotExists = true
).query
)
} yield {}
whenReady(tables) { _ =>
val inserts = for {
_ <- table1Entries.into(OneTestTable)
_ <- table2Entries.into(TwoTestTable)
_ <- table3Entries.into(ThreeTestTable)
} yield {}
inserts.futureValue
}
}

override def afterAll(): Unit = super.afterAll()
}

/**
 * Companion holding single-field JSON wrappers shared by the DSL integration
 * tests, each pairing a case class with a spray-json format keyed on "result".
 * Presumably these decode scalar query output returned as a JSON object with a
 * single "result" field — confirm against the callers that use these formats.
 */
object DslITSpec {

  // Wrapper for a query result whose single value deserializes as a String.
  case class StringResult(result: String)

  // spray-json format mapping the JSON key "result" onto StringResult.result.
  implicit val stringResultFormat: RootJsonFormat[StringResult] =
    jsonFormat[String, StringResult](StringResult.apply, "result")

  // Wrapper for a query result whose single value deserializes as an Int.
  case class IntResult(result: Int)

  // spray-json format mapping the JSON key "result" onto IntResult.result.
  implicit val intResultFormat: RootJsonFormat[IntResult] =
    jsonFormat[Int, IntResult](IntResult.apply, "result")
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class ClickhouseTimeSeriesIT extends DslITSpec with TableDrivenPropertyChecks {
}

private def getEntries(multiInterval: MultiInterval, entriesId: UUID) =
chExecutor.execute[CustomResult](
queryExecutor.execute[CustomResult](
select(count() as "shields", toUInt64(timeSeries(timestampColumn, multiInterval)) as alias)
.from(OneTestTable)
.groupBy(alias)
Expand Down
Loading

0 comments on commit 2accbbf

Please sign in to comment.