From 8030d6ebae991983219a3232301bb47066325e74 Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Thu, 22 Aug 2019 13:40:52 +0300 Subject: [PATCH 01/13] [CORE][NONE]: small refactoring --- .../chronicler/core/api/DatabaseApi.scala | 3 ++- .../core/query/DatabaseOperationQuery.scala | 15 +++++---------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/modules/core/io/src/main/scala/com/github/fsanaulla/chronicler/core/api/DatabaseApi.scala b/modules/core/io/src/main/scala/com/github/fsanaulla/chronicler/core/api/DatabaseApi.scala index 0954b3d6..4d3f5118 100644 --- a/modules/core/io/src/main/scala/com/github/fsanaulla/chronicler/core/api/DatabaseApi.scala +++ b/modules/core/io/src/main/scala/com/github/fsanaulla/chronicler/core/api/DatabaseApi.scala @@ -33,6 +33,8 @@ import org.typelevel.jawn.ast.JArray * @tparam R - HTTP response * @tparam U - HTTP URi * @tparam E - Entity + * + * @since Big Bang */ class DatabaseApi[F[_], G[_], R, U, E]( dbName: String, @@ -130,7 +132,6 @@ class DatabaseApi[F[_], G[_], R, U, E]( pretty: Boolean = false ): F[ErrorOr[Array[(Array[String], JArray)]]] = { val uri = singleQuery(dbName, query, epoch, pretty) - F.flatMap(re.get(uri, compress))(resp => FK(rh.groupedResultJson(resp))) } } diff --git a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/query/DatabaseOperationQuery.scala b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/query/DatabaseOperationQuery.scala index b38eb05b..77978634 100644 --- a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/query/DatabaseOperationQuery.scala +++ b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/query/DatabaseOperationQuery.scala @@ -40,14 +40,13 @@ trait DatabaseOperationQuery[U] { if (epoch.isNone) queryParams else ("epoch" -> epoch.toString) :: queryParams + // format: off private[chronicler] final def write( dbName: String, consistency: Consistency, precision: Precision, retentionPolicy: Option[String] - )(implicit qb: QueryBuilder[U] - ): U = { - + )(implicit qb: QueryBuilder[U]): U = { val queryParams = Nil val withRP = @@ -70,9 +69,7 @@ trait DatabaseOperationQuery[U] { query: String, epoch: Epoch, pretty: Boolean - )(implicit qb: QueryBuilder[U] - ): U = { - + )(implicit qb: QueryBuilder[U]): U = { val queryParams = List("q" -> query) val withEpoch = addEpochQueryParam(epoch, queryParams) val withPretty = addPrettyQueryParam(pretty, withEpoch) @@ -86,8 +83,7 @@ trait DatabaseOperationQuery[U] { epoch: Epoch, pretty: Boolean, chunkSize: Int - )(implicit qb: QueryBuilder[U] - ): U = { + )(implicit qb: QueryBuilder[U]): U = { val queryParams = List( "db" -> dbName, @@ -107,8 +103,7 @@ trait DatabaseOperationQuery[U] { queries: Seq[String], epoch: Epoch, pretty: Boolean - )(implicit qb: QueryBuilder[U] - ): U = { + )(implicit qb: QueryBuilder[U]): U = { val queryParams = List("q" -> queries.mkString(";")) val withEpoch = addEpochQueryParam(epoch, queryParams) val withPretty = addPrettyQueryParam(pretty, withEpoch) From d56b5de58399536cbba31433ef10c2ee228ec08e Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Thu, 22 Aug 2019 13:41:21 +0300 Subject: [PATCH 02/13] [EXAMPLE][NONE]: update udp example --- .../Main.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/examples/udp/src/main/scala/com.github.fsanaulla.chronicler.example.udp/Main.scala b/examples/udp/src/main/scala/com.github.fsanaulla.chronicler.example.udp/Main.scala index 8dd74272..9d47b1c3 100644 --- a/examples/udp/src/main/scala/com.github.fsanaulla.chronicler.example.udp/Main.scala +++ b/examples/udp/src/main/scala/com.github.fsanaulla.chronicler.example.udp/Main.scala @@ -1,26 +1,24 @@ package com.github.fsanaulla.chronicler.example.udp -import com.github.fsanaulla.chronicler.core.model.{InfluxFormatter, Point} -import com.github.fsanaulla.chronicler.macros.Influx import com.github.fsanaulla.chronicler.macros.annotations.{field, tag} +import com.github.fsanaulla.chronicler.macros.auto._ import com.github.fsanaulla.chronicler.udp.InfluxUdp +import scala.util.Try + object Main { + def main(args: Array[String]): Unit = { final case class Test(@tag name: String, @field age: Int) - - // generate formatter at compile-time - implicit val fmt: InfluxFormatter[Test] = Influx.formatter[Test] - - val t = Test("f", 1) - val host = args.headOption.getOrElse("localhost") + val t = Test("f", 1) + val host = args.headOption.getOrElse("localhost") val influx = InfluxUdp(host) for { // write record to Influx _ <- influx.write("cpu", t) // close client - _ <- influx.close() + _ <- Try(influx.close()) } yield println("Stored!") } } From bacd26a66deac9be2b80af1f10b0db8da49de887 Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Thu, 22 Aug 2019 13:41:52 +0300 Subject: [PATCH 03/13] [AKKA][NONE]: remove rudimental spec --- .../GzippedDatabaseApiSpec.scala | 189 ------------------ 1 file changed, 189 deletions(-) delete mode 100644 modules/akka/io/src/it/scala/com.github.fsanaulla.chronicler.akka/GzippedDatabaseApiSpec.scala diff --git a/modules/akka/io/src/it/scala/com.github.fsanaulla.chronicler.akka/GzippedDatabaseApiSpec.scala b/modules/akka/io/src/it/scala/com.github.fsanaulla.chronicler.akka/GzippedDatabaseApiSpec.scala deleted file mode 100644 index a724c8bf..00000000 --- a/modules/akka/io/src/it/scala/com.github.fsanaulla.chronicler.akka/GzippedDatabaseApiSpec.scala +++ /dev/null @@ -1,189 +0,0 @@ -package com.github.fsanaulla.chronicler.akka - -import java.nio.file.Paths - -import _root_.akka.actor.ActorSystem -import _root_.akka.testkit.TestKit -import com.github.fsanaulla.chronicler.akka.SampleEntitys._ -import com.github.fsanaulla.chronicler.akka.io.{AkkaIOClient, InfluxIO} -import com.github.fsanaulla.chronicler.akka.management.{AkkaManagementClient, InfluxMng} -import com.github.fsanaulla.chronicler.akka.shared.InfluxConfig -import com.github.fsanaulla.chronicler.core.enums.Epochs -import com.github.fsanaulla.chronicler.core.jawn._ -import com.github.fsanaulla.chronicler.core.model.Point -import com.github.fsanaulla.chronicler.testing.it.{DockerizedInfluxDB, Futures} -import org.scalatest.{FlatSpecLike, Ignore, Matchers} -import org.typelevel.jawn.ast.{JArray, JNum, JString} - -import scala.concurrent.ExecutionContext.Implicits.global - -@Ignore -class GzippedDatabaseApiSpec - extends TestKit(ActorSystem()) - with FlatSpecLike - with Matchers - with Futures - with DockerizedInfluxDB { - - val testDB = "db" - - lazy val influxConf = - InfluxConfig(host, port, credentials = Some(creds), compress = true, None) - - lazy val mng: AkkaManagementClient = - InfluxMng(host, port, credentials = Some(creds)) - - lazy val io: AkkaIOClient = - InfluxIO(influxConf) - - lazy val db: io.Database = io.database(testDB) - - it should "write data from file" in { - mng.createDatabase(testDB).futureValue.right.get shouldEqual 200 - - db.writeFromFile(Paths.get(getClass.getResource("/points.txt").getPath)) - .futureValue - .right - .get shouldEqual 204 - - db.readJson("SELECT * FROM test1").futureValue.right.get.length shouldEqual 3 - } - - it should "write 2 points represented entities" in { - val point1 = Point("test2") - .addTag("sex", "Male") - .addTag("firstName", "Martin") - .addTag("lastName", "Odersky") - .addField("age", 54) - - val point2 = Point("test2") - .addTag("sex", "Male") - .addTag("firstName", "Jame") - .addTag("lastName", "Franko") - .addField("age", 36) - - db.writePoint(point1).futureValue.right.get shouldEqual 204 - - db.readJson("SELECT * FROM test2", epoch = Epochs.Nanoseconds) - .futureValue - .right - .get - // skip timestamp - .map(jarr => jarr.copy(vs = jarr.vs.tail)) shouldEqual Array( - JArray(Array(JNum(54), JString("Martin"), JString("Odersky"), JString("Male"))) - ) - - db.bulkWritePoints(Array(point1, point2)).futureValue.right.get shouldEqual 204 - - db.readJson("SELECT * FROM test2", epoch = Epochs.Nanoseconds) - .futureValue - .right - .get - // skip timestamp - .map(jarr => jarr.copy(vs = jarr.vs.tail)) shouldEqual Array( - JArray(Array(JNum(54), JString("Martin"), JString("Odersky"), JString("Male"))), - JArray(Array(JNum(36), JString("Jame"), JString("Franko"), JString("Male"))), - JArray(Array(JNum(54), JString("Martin"), JString("Odersky"), JString("Male"))) - ) - } - - it should "retrieve multiple request" in { - - val multiQuery = db - .bulkReadJson( - Array( - "SELECT * FROM test2", - "SELECT * FROM test2 WHERE age < 40" - ) - ) - .futureValue - - multiQuery.right.get.length shouldEqual 2 - multiQuery.right.get shouldBe a[Array[_]] - - multiQuery.right.get.head.length shouldEqual 3 - multiQuery.right.get.head shouldBe a[Array[_]] - multiQuery.right.get.head.head shouldBe a[JArray] - - multiQuery.right.get.last.length shouldEqual 1 - multiQuery.right.get.last shouldBe a[Array[_]] - multiQuery.right.get.last.head shouldBe a[JArray] - - multiQuery.right.get - .map(_.map(_.arrayValue.right.get.tail)) shouldEqual largeMultiJsonEntity.map( - _.map(_.arrayValue.right.get.tail) - ) - } - - it should "write native" in { - - db.writeNative("test3,sex=Male,firstName=Jame,lastName=Lannister age=48") - .futureValue - .right - .get shouldEqual 204 - - db.readJson("SELECT * FROM test3") - .futureValue - .right - .get - .map(jarr => jarr.copy(vs = jarr.vs.tail)) shouldEqual Array( - JArray(Array(JNum(48), JString("Jame"), JString("Lannister"), JString("Male"))) - ) - - db.bulkWriteNative( - Seq( - "test4,sex=Male,firstName=Jon,lastName=Snow age=24", - "test4,sex=Female,firstName=Deny,lastName=Targaryen age=25" - ) - ) - .futureValue - .right - .get shouldEqual 204 - - db.readJson("SELECT * FROM test4") - .futureValue - .right - .get - .map(jarr => jarr.copy(vs = jarr.vs.tail)) shouldEqual Array( - JArray(Array(JNum(25), JString("Deny"), JString("Targaryen"), JString("Female"))), - JArray(Array(JNum(24), JString("Jon"), JString("Snow"), JString("Male"))) - ) - } - - it should "return grouped result by sex and sum of ages" in { - - db.bulkWriteNative( - Array( - "test5,sex=Male,firstName=Jon,lastName=Snow age=24", - "test5,sex=Male,firstName=Rainer,lastName=Targaryen age=25" - ) - ) - .futureValue - .right - .get shouldEqual 204 - - db.readGroupedJson( - "SELECT SUM(\"age\") FROM \"test5\" GROUP BY \"sex\"", - epoch = Epochs.Nanoseconds - ) - .futureValue - .right - .get - .map { case (k, v) => k.toSeq -> v } shouldEqual Array( - Seq("Male") -> JArray(Array(JNum(0), JNum(49))) - ) - } - - it should "write escaped value" in { - val p = Point("test6") - .addTag("key,", "value,") - .addField("field=key", 1) - - db.writePoint(p).futureValue.right.get shouldEqual 204 - - db.readJson("SELECT * FROM test6").futureValue.right.get.length shouldEqual 1 - - mng.close() shouldEqual {} - io.close() shouldEqual {} - } -} From feacaf6db9aeb9dcc1495bec6e56e5f07193083d Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Thu, 22 Aug 2019 13:52:05 +0300 Subject: [PATCH 04/13] [AHC][ISSUE-178]: moving to pure ahc client --- .../ahc/io/it/CompressionSpec.scala | 5 +- .../ahc/io/it/DatabaseApiSpec.scala | 31 +-- .../ahc/io/it/GzippedDatabaseApiSpec.scala | 181 ------------------ .../chronicler/ahc/io/AhcIOClient.scala | 17 +- .../unit/DatabaseApiOperationQuerySpec.scala | 37 ++-- .../chronicler/ahc/io/unit/package.scala | 10 +- .../ahc/management/AhcManagementClient.scala | 15 +- .../management/ContinuousQueriesSpec.scala | 23 +-- .../management/DataManagementQuerySpec.scala | 75 ++++---- .../QueriesManagementQuerySpec.scala | 19 +- .../RetentionPolicyManagementQuerySpec.scala | 39 ++-- .../management/ShardManagementQuerySpec.scala | 23 +-- .../SubscriptionsManagementQuerySpec.scala | 29 +-- .../management/UserManagementQuerySpec.scala | 51 ++--- .../chronicler/ahc/management/package.scala | 10 +- .../ahc/shared/InfluxAhcClient.scala | 17 +- .../fsanaulla/chronicler/ahc/shared/Uri.scala | 56 ++++++ .../chronicler/ahc/shared/alias/package.scala | 24 --- .../ahc/shared/handlers/AhcJsonHandler.scala | 55 ++++-- .../ahc/shared/handlers/AhcQueryBuilder.scala | 25 ++- .../shared/handlers/AhcRequestExecutor.scala | 59 +++--- .../shared/handlers/AhcJsonHandlerSpec.scala | 19 +- .../shared/handlers/AhcQueryBuilderSpec.scala | 2 +- .../handlers/AhcResponseHandlerSpec.scala | 34 +++- 24 files changed, 372 insertions(+), 484 deletions(-) delete mode 100644 modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/GzippedDatabaseApiSpec.scala create mode 100644 modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/Uri.scala delete mode 100644 modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/alias/package.scala diff --git a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala b/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala index 3e81ae36..ff41b8d2 100644 --- a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala +++ b/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala @@ -4,10 +4,11 @@ import java.nio.file.Paths import com.github.fsanaulla.chronicler.ahc.io.InfluxIO import com.github.fsanaulla.chronicler.ahc.management.InfluxMng +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.core.alias.Id import com.github.fsanaulla.chronicler.core.api.DatabaseApi import com.github.fsanaulla.chronicler.testing.it.DockerizedInfluxDB -import com.softwaremill.sttp.{Response, Uri} +import org.asynchttpclient.Response import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures} import org.scalatest.{FlatSpec, Matchers} @@ -30,7 +31,7 @@ class CompressionSpec lazy val io = InfluxIO(host, port, Some(creds) /*, compress = true*/ ) - lazy val db: DatabaseApi[Future, Id, Response[Array[Byte]], Uri, String] = + lazy val db: DatabaseApi[Future, Id, Response, Uri, String] = io.database(testDB) it should "ping database" in { diff --git a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/DatabaseApiSpec.scala b/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/DatabaseApiSpec.scala index 482f9fd3..af981be4 100644 --- a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/DatabaseApiSpec.scala +++ b/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/DatabaseApiSpec.scala @@ -84,27 +84,28 @@ class DatabaseApiSpec extends FlatSpec with Matchers with Futures with Dockerize } it should "retrieve multiple request" in { + db.readJson("SELECT * FROM test2").futureValue.right.get.length shouldEqual 3 + + db.readJson("SELECT * FROM test2 WHERE age < 40").futureValue.right.get.length shouldEqual 1 + val multiQuery = db - .bulkReadJson( - Array( - "SELECT * FROM test2", - "SELECT * FROM test2 WHERE age < 40" - ) - ) + .bulkReadJson(Seq("SELECT * FROM test2", "SELECT * FROM test2 WHERE age < 40")) .futureValue + .right + .get - multiQuery.right.get.length shouldEqual 2 - multiQuery.right.get shouldBe a[Array[_]] + multiQuery.length shouldEqual 2 + multiQuery shouldBe a[Array[_]] - multiQuery.right.get.head.length shouldEqual 3 - multiQuery.right.get.head shouldBe a[Array[_]] - multiQuery.right.get.head.head shouldBe a[JArray] + multiQuery.head.length shouldEqual 3 + multiQuery.head shouldBe a[Array[_]] + multiQuery.head.head shouldBe a[JArray] - multiQuery.right.get.last.length shouldEqual 1 - multiQuery.right.get.last shouldBe a[Array[_]] - multiQuery.right.get.last.head shouldBe a[JArray] + multiQuery.last.length shouldEqual 1 + multiQuery.last shouldBe a[Array[_]] + multiQuery.last.head shouldBe a[JArray] - multiQuery.right.get + multiQuery .map(_.map(_.arrayValue.right.get.tail)) shouldEqual largeMultiJsonEntity.map( _.map(_.arrayValue.right.get.tail) ) diff --git a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/GzippedDatabaseApiSpec.scala b/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/GzippedDatabaseApiSpec.scala deleted file mode 100644 index 23e2ba4f..00000000 --- a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/GzippedDatabaseApiSpec.scala +++ /dev/null @@ -1,181 +0,0 @@ -package com.github.fsanaulla.chronicler.ahc.io.it - -import java.nio.file.Paths - -import com.github.fsanaulla.chronicler.ahc.io.{AhcIOClient, InfluxIO} -import com.github.fsanaulla.chronicler.ahc.management.{AhcManagementClient, InfluxMng} -import com.github.fsanaulla.chronicler.ahc.shared.InfluxConfig -import com.github.fsanaulla.chronicler.core.enums.Epochs -import com.github.fsanaulla.chronicler.core.jawn._ -import com.github.fsanaulla.chronicler.core.model.Point -import com.github.fsanaulla.chronicler.testing.it.{DockerizedInfluxDB, Futures} -import org.scalatest.{FlatSpec, Ignore, Matchers} -import org.typelevel.jawn.ast.{JArray, JNum, JString} - -import scala.concurrent.ExecutionContext.Implicits.global - -@Ignore -class GzippedDatabaseApiSpec extends FlatSpec with Matchers with Futures with DockerizedInfluxDB { - - val testDB = "db" - - lazy val influxConf = - InfluxConfig(host, port, credentials = Some(creds), compress = true, None) - - lazy val mng: AhcManagementClient = - InfluxMng(host, port, credentials = Some(creds)) - - lazy val io: AhcIOClient = - InfluxIO(influxConf) - - lazy val db: io.Database = io.database(testDB) - - it should "write data from file" in { - mng.createDatabase(testDB).futureValue.right.get shouldEqual 200 - - db.writeFromFile(Paths.get(getClass.getResource("/points.txt").getPath)) - .futureValue - .right - .get shouldEqual 204 - - db.readJson("SELECT * FROM test1").futureValue.right.get.length shouldEqual 3 - } - - it should "write 2 points represented entities" in { - val point1 = Point("test2") - .addTag("sex", "Male") - .addTag("firstName", "Martin") - .addTag("lastName", "Odersky") - .addField("age", 54) - - val point2 = Point("test2") - .addTag("sex", "Male") - .addTag("firstName", "Jame") - .addTag("lastName", "Franko") - .addField("age", 36) - - db.writePoint(point1).futureValue.right.get shouldEqual 204 - - db.readJson("SELECT * FROM test2", epoch = Epochs.Nanoseconds) - .futureValue - .right - .get - // skip timestamp - .map(jarr => jarr.copy(vs = jarr.vs.tail)) shouldEqual Array( - JArray(Array(JNum(54), JString("Martin"), JString("Odersky"), JString("Male"))) - ) - - db.bulkWritePoints(Array(point1, point2)).futureValue.right.get shouldEqual 204 - - db.readJson("SELECT * FROM test2", epoch = Epochs.Nanoseconds) - .futureValue - .right - .get - // skip timestamp - .map(jarr => jarr.copy(vs = jarr.vs.tail)) shouldEqual Array( - JArray(Array(JNum(54), JString("Martin"), JString("Odersky"), JString("Male"))), - JArray(Array(JNum(36), JString("Jame"), JString("Franko"), JString("Male"))), - JArray(Array(JNum(54), JString("Martin"), JString("Odersky"), JString("Male"))) - ) - } - - it should "retrieve multiple request" in { - - val multiQuery = db - .bulkReadJson( - Array( - "SELECT * FROM test2", - "SELECT * FROM test2 WHERE age < 40" - ) - ) - .futureValue - - multiQuery.right.get.length shouldEqual 2 - multiQuery.right.get shouldBe a[Array[_]] - - multiQuery.right.get.head.length shouldEqual 3 - multiQuery.right.get.head shouldBe a[Array[_]] - multiQuery.right.get.head.head shouldBe a[JArray] - - multiQuery.right.get.last.length shouldEqual 1 - multiQuery.right.get.last shouldBe a[Array[_]] - multiQuery.right.get.last.head shouldBe a[JArray] - - multiQuery.right.get - .map(_.map(_.arrayValue.right.get.tail)) shouldEqual largeMultiJsonEntity.map( - _.map(_.arrayValue.right.get.tail) - ) - } - - it should "write native" in { - - db.writeNative("test3,sex=Male,firstName=Jame,lastName=Lannister age=48") - .futureValue - .right - .get shouldEqual 204 - - db.readJson("SELECT * FROM test3") - .futureValue - .right - .get - .map(jarr => jarr.copy(vs = jarr.vs.tail)) shouldEqual Array( - JArray(Array(JNum(48), JString("Jame"), JString("Lannister"), JString("Male"))) - ) - - db.bulkWriteNative( - Seq( - "test4,sex=Male,firstName=Jon,lastName=Snow age=24", - "test4,sex=Female,firstName=Deny,lastName=Targaryen age=25" - ) - ) - .futureValue - .right - .get shouldEqual 204 - - db.readJson("SELECT * FROM test4") - .futureValue - .right - .get - .map(jarr => jarr.copy(vs = jarr.vs.tail)) shouldEqual Array( - JArray(Array(JNum(25), JString("Deny"), JString("Targaryen"), JString("Female"))), - JArray(Array(JNum(24), JString("Jon"), JString("Snow"), JString("Male"))) - ) - } - - it should "return grouped result by sex and sum of ages" in { - - db.bulkWriteNative( - Array( - "test5,sex=Male,firstName=Jon,lastName=Snow age=24", - "test5,sex=Male,firstName=Rainer,lastName=Targaryen age=25" - ) - ) - .futureValue - .right - .get shouldEqual 204 - - db.readGroupedJson( - "SELECT SUM(\"age\") FROM \"test5\" GROUP BY \"sex\"", - epoch = Epochs.Nanoseconds - ) - .futureValue - .right - .get - .map { case (k, v) => k.toSeq -> v } shouldEqual Array( - Seq("Male") -> JArray(Array(JNum(0), JNum(49))) - ) - } - - it should "write escaped value" in { - val p = Point("test6") - .addTag("key,", "value,") - .addField("field=key", 1) - - db.writePoint(p).futureValue.right.get shouldEqual 204 - - db.readJson("SELECT * FROM test6").futureValue.right.get.length shouldEqual 1 - - mng.close() shouldEqual {} - io.close() shouldEqual {} - } -} diff --git a/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/AhcIOClient.scala b/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/AhcIOClient.scala index df34a6fe..07b2dc40 100644 --- a/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/AhcIOClient.scala +++ b/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/AhcIOClient.scala @@ -16,21 +16,20 @@ package com.github.fsanaulla.chronicler.ahc.io -import com.github.fsanaulla.chronicler.ahc.shared.InfluxAhcClient import com.github.fsanaulla.chronicler.ahc.shared.handlers.{ AhcJsonHandler, AhcQueryBuilder, AhcRequestExecutor } -import com.github.fsanaulla.chronicler.ahc.shared.implicits._ +import com.github.fsanaulla.chronicler.ahc.shared.implicits.{fkId, futureFailable, futureFunctor} +import com.github.fsanaulla.chronicler.ahc.shared.{InfluxAhcClient, Uri} import com.github.fsanaulla.chronicler.core.IOClient import com.github.fsanaulla.chronicler.core.alias.{ErrorOr, Id} import com.github.fsanaulla.chronicler.core.api.{DatabaseApi, MeasurementApi} import com.github.fsanaulla.chronicler.core.components.ResponseHandler import com.github.fsanaulla.chronicler.core.implicits.{applyId, functorId} import com.github.fsanaulla.chronicler.core.model.{InfluxCredentials, InfluxDBInfo} -import com.softwaremill.sttp.{Response, Uri} -import org.asynchttpclient.AsyncHttpClientConfig +import org.asynchttpclient.{AsyncHttpClientConfig, Response} import scala.concurrent.{ExecutionContext, Future} import scala.reflect.ClassTag @@ -43,12 +42,12 @@ final class AhcIOClient( asyncClientConfig: Option[AsyncHttpClientConfig] )(implicit ex: ExecutionContext) extends InfluxAhcClient(asyncClientConfig) - with IOClient[Future, Id, Response[Array[Byte]], Uri, String] { + with IOClient[Future, Id, Response, Uri, String] { - val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) - implicit val re: AhcRequestExecutor = new AhcRequestExecutor - implicit val rh: ResponseHandler[Id, Response[Array[Byte]]] = new ResponseHandler(jsonHandler) + val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) + implicit val re: AhcRequestExecutor = new AhcRequestExecutor + implicit val rh: ResponseHandler[Id, Response] = new ResponseHandler(jsonHandler) override def database(dbName: String) = new DatabaseApi(dbName, compress) diff --git a/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/DatabaseApiOperationQuerySpec.scala b/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/DatabaseApiOperationQuerySpec.scala index 4c0b7c9c..09e00b70 100644 --- a/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/DatabaseApiOperationQuerySpec.scala +++ b/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/DatabaseApiOperationQuerySpec.scala @@ -16,11 +16,11 @@ package com.github.fsanaulla.chronicler.ahc.io.unit +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.enums.{Consistencies, Epochs, Precisions} import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.DatabaseOperationQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} import scala.language.implicitConversions @@ -31,32 +31,30 @@ import scala.language.implicitConversions * Date: 27.07.17 */ class DatabaseApiOperationQuerySpec - extends FlatSpec - with Matchers - with DatabaseOperationQuery[Uri] { + extends FlatSpec + with Matchers + with DatabaseOperationQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } val testDB = "db" val testQuery = "SELECT * FROM test" - implicit def a2Opt[A](a: A): Option[A] = Some(a) - it should "return correct write query" in new AuthEnv { - write(testDB, Consistencies.One, Precisions.Nanoseconds, None) - .toString() shouldEqual queryTester( + write(testDB, Consistencies.One, Precisions.Nanoseconds, None).mkUrl shouldEqual queryTester( "/write", List( "db" -> testDB, @@ -67,8 +65,7 @@ class DatabaseApiOperationQuerySpec ) ) - write(testDB, Consistencies.All, Precisions.Nanoseconds, None) - .toString() shouldEqual queryTester( + write(testDB, Consistencies.All, Precisions.Nanoseconds, None).mkUrl shouldEqual queryTester( "/write", List( "db" -> testDB, @@ -81,23 +78,23 @@ class DatabaseApiOperationQuerySpec } it should "return correct write query without auth " in new NonAuthEnv { - write(testDB, Consistencies.One, Precisions.Nanoseconds, None).toString() shouldEqual + write(testDB, Consistencies.One, Precisions.Nanoseconds, None).mkUrl shouldEqual queryTester("/write", List("db" -> testDB, "consistency" -> "one", "precision" -> "ns")) - write(testDB, Consistencies.One, Precisions.Microseconds, None).toString() shouldEqual + write(testDB, Consistencies.One, Precisions.Microseconds, None).mkUrl shouldEqual queryTester("/write", List("db" -> testDB, "consistency" -> "one", "precision" -> "u")) } it should "return correct single read query" in new AuthEnv { - val queryPrms = List( + val queryPrms: List[(String, String)] = List( "db" -> testDB, "u" -> credentials.get.username, "p" -> credentials.get.password, "epoch" -> "ns", "q" -> "SELECT * FROM test" ) - singleQuery(testDB, testQuery, Epochs.Nanoseconds, pretty = false).toString() shouldEqual + singleQuery(testDB, testQuery, Epochs.Nanoseconds, pretty = false).mkUrl shouldEqual queryTester("/query", queryPrms) } @@ -116,7 +113,7 @@ class DatabaseApiOperationQuerySpec Seq("SELECT * FROM test", "SELECT * FROM test1"), Epochs.Nanoseconds, pretty = false - ).toString() shouldEqual + ).mkUrl shouldEqual queryTester("/query", queryPrms) val queryPrms1: List[(String, String)] = List( @@ -132,7 +129,7 @@ class DatabaseApiOperationQuerySpec Seq("SELECT * FROM test", "SELECT * FROM test1"), Epochs.Nanoseconds, pretty = true - ).toString() shouldEqual + ).mkUrl shouldEqual queryTester("/query", queryPrms1) } } diff --git a/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala b/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala index b43b7962..50f5c660 100644 --- a/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala +++ b/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala @@ -26,20 +26,20 @@ package object unit { } def queryTesterAuth(query: String)(credentials: InfluxCredentials): String = - s"http://localhost:8086/query?q=${query.encode}&p=${credentials.password.encode}&u=${credentials.username.encode}" + s"http://localhost:8086/query?q=${query}&p=${credentials.password}&u=${credentials.username}" def queryTesterAuth(db: String, query: String)(credentials: InfluxCredentials): String = - s"http://localhost:8086/query?q=${query.encode}&p=${credentials.password.encode}&db=${db.encode}&u=${credentials.username.encode}" + s"http://localhost:8086/query?q=${query}&p=${credentials.password}&db=${db}&u=${credentials.username}" def queryTester(query: String): String = - s"http://localhost:8086/query?q=${query.encode}" + s"http://localhost:8086/query?q=${query}" def queryTester(db: String, query: String): String = - s"http://localhost:8086/query?q=${query.encode}&db=${db.encode}" + s"http://localhost:8086/query?q=${query}&db=${db}" def queryTester(path: String, mp: List[(String, String)]): String = { val queries = mp - .map { case (k, v) => s"$k=${v.encode}" } + .map { case (k, v) => s"$k=$v" } .mkString("&") s"http://localhost:8086$path?$queries" diff --git a/modules/ahc/management/src/main/scala/com/github/fsanaulla/chronicler/ahc/management/AhcManagementClient.scala b/modules/ahc/management/src/main/scala/com/github/fsanaulla/chronicler/ahc/management/AhcManagementClient.scala index 8a968822..e2843290 100644 --- a/modules/ahc/management/src/main/scala/com/github/fsanaulla/chronicler/ahc/management/AhcManagementClient.scala +++ b/modules/ahc/management/src/main/scala/com/github/fsanaulla/chronicler/ahc/management/AhcManagementClient.scala @@ -16,12 +16,12 @@ package com.github.fsanaulla.chronicler.ahc.management -import com.github.fsanaulla.chronicler.ahc.shared.InfluxAhcClient import com.github.fsanaulla.chronicler.ahc.shared.handlers.{ AhcJsonHandler, AhcQueryBuilder, AhcRequestExecutor } +import com.github.fsanaulla.chronicler.ahc.shared.{InfluxAhcClient, Uri} import com.github.fsanaulla.chronicler.core.ManagementClient import com.github.fsanaulla.chronicler.core.alias.{ErrorOr, Id} import com.github.fsanaulla.chronicler.core.components.ResponseHandler @@ -32,8 +32,7 @@ import com.github.fsanaulla.chronicler.core.model.{ InfluxCredentials, InfluxDBInfo } -import com.softwaremill.sttp.{Response, Uri} -import org.asynchttpclient.AsyncHttpClientConfig +import org.asynchttpclient.{AsyncHttpClientConfig, Response} import scala.concurrent.{ExecutionContext, Future} @@ -46,12 +45,12 @@ final class AhcManagementClient( val F: Functor[Future], val FK: FunctionK[Id, Future]) extends InfluxAhcClient(asyncClientConfig) - with ManagementClient[Future, Id, Response[Array[Byte]], Uri, String] { + with ManagementClient[Future, Id, Response, Uri, String] { - val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress = false) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) - implicit val re: AhcRequestExecutor = new AhcRequestExecutor - implicit val rh: ResponseHandler[Id, Response[Array[Byte]]] = new ResponseHandler(jsonHandler) + val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress = false) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) + implicit val re: AhcRequestExecutor = new AhcRequestExecutor + implicit val rh: ResponseHandler[Id, Response] = new ResponseHandler(jsonHandler) override def ping: Future[ErrorOr[InfluxDBInfo]] = { re.get(qb.buildQuery("/ping"), compress = false) diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ContinuousQueriesSpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ContinuousQueriesSpec.scala index 943df473..3b0000ae 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ContinuousQueriesSpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ContinuousQueriesSpec.scala @@ -16,10 +16,10 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.ContinuousQueries -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -30,17 +30,18 @@ import org.scalatest.{FlatSpec, Matchers} class ContinuousQueriesSpec extends FlatSpec with Matchers with ContinuousQueries[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } val db = "mydb" @@ -48,31 +49,31 @@ class ContinuousQueriesSpec extends FlatSpec with Matchers with ContinuousQuerie val query = "SELECT mean(bees) AS mean_bees INTO aggregate_bees FROM farm GROUP BY time(30m)" "ContinuousQuerys operation" should "generate correct show query" in new AuthEnv { - showCQQuery.toString() shouldEqual queryTesterAuth("SHOW CONTINUOUS QUERIES")(credentials.get) + showCQQuery.mkUrl shouldEqual queryTesterAuth("SHOW CONTINUOUS QUERIES")(credentials.get) } it should "generate correct drop query" in new AuthEnv { - dropCQQuery(db, cq).toString() shouldEqual queryTesterAuth(s"DROP CONTINUOUS QUERY $cq ON $db")( + dropCQQuery(db, cq).mkUrl shouldEqual queryTesterAuth(s"DROP CONTINUOUS QUERY $cq ON $db")( credentials.get ) } it should "generate correct create query" in new AuthEnv { - createCQQuery(db, cq, query).toString() shouldEqual queryTesterAuth( + createCQQuery(db, cq, query).mkUrl shouldEqual queryTesterAuth( s"CREATE CONTINUOUS QUERY $cq ON $db BEGIN $query END" )(credentials.get) } it should "generate correct show query without auth" in new NonAuthEnv { - showCQQuery.toString() shouldEqual queryTester("SHOW CONTINUOUS QUERIES") + showCQQuery.mkUrl shouldEqual queryTester("SHOW CONTINUOUS QUERIES") } it should "generate correct drop query without auth" in new NonAuthEnv { - dropCQQuery(db, cq).toString() shouldEqual queryTester(s"DROP CONTINUOUS QUERY $cq ON $db") + dropCQQuery(db, cq).mkUrl shouldEqual queryTester(s"DROP CONTINUOUS QUERY $cq ON $db") } it should "generate correct create query without auth" in new NonAuthEnv { - createCQQuery(db, cq, query).toString() shouldEqual queryTester( + createCQQuery(db, cq, query).mkUrl shouldEqual queryTester( s"CREATE CONTINUOUS QUERY $cq ON $db BEGIN $query END" ) } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/DataManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/DataManagementQuerySpec.scala index 6ea4fa4a..800e552c 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/DataManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/DataManagementQuerySpec.scala @@ -16,10 +16,10 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.DataManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -30,17 +30,18 @@ import org.scalatest.{FlatSpec, Matchers} class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } val testDb: String = "testDb" @@ -52,58 +53,56 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement val testOffset = Some(3) it should "generate correct 'create database' query" in new AuthEnv { - createDatabaseQuery(testDb, None, None, None, None).toString() shouldEqual + createDatabaseQuery(testDb, None, None, None, None).mkUrl shouldEqual queryTesterAuth(s"CREATE DATABASE $testDb")(credentials.get) - createDatabaseQuery(testDb, None, Some(2), None, None).toString() shouldEqual + createDatabaseQuery(testDb, None, Some(2), None, None).mkUrl shouldEqual queryTesterAuth(s"CREATE DATABASE $testDb WITH REPLICATION 2")(credentials.get) } it should "generate correct 'drop database' query" in new AuthEnv { - dropDatabaseQuery(testDb).toString() shouldEqual + dropDatabaseQuery(testDb).mkUrl shouldEqual queryTesterAuth(s"DROP DATABASE $testDb")(credentials.get) } it should "generate correct 'drop series' query" in new AuthEnv { - dropSeriesQuery(testDb, testSeries).toString() shouldEqual + dropSeriesQuery(testDb, testSeries).mkUrl shouldEqual queryTesterAuth(testDb, s"DROP SERIES FROM $testSeries")(credentials.get) } it should "generate correct 'drop measurement' query" in new AuthEnv { - dropMeasurementQuery(testDb, testMeasurement).toString() shouldEqual + dropMeasurementQuery(testDb, testMeasurement).mkUrl shouldEqual queryTesterAuth(testDb, s"DROP MEASUREMENT $testMeasurement")(credentials.get) } it should "generate correct 'drop all series' query" in new AuthEnv { - deleteAllSeriesQuery(testDb, testSeries).toString() shouldEqual + deleteAllSeriesQuery(testDb, testSeries).mkUrl shouldEqual queryTesterAuth(testDb, s"DELETE FROM $testSeries")(credentials.get) } it should "generate correct 'show measurement' query" in new AuthEnv { - showMeasurementQuery(testDb).toString() shouldEqual + showMeasurementQuery(testDb).mkUrl shouldEqual queryTesterAuth(testDb, "SHOW MEASUREMENTS")(credentials.get) } it should "generate correct 'show database' query" in new AuthEnv { - showDatabasesQuery.toString() shouldEqual + showDatabasesQuery.mkUrl shouldEqual queryTesterAuth(s"SHOW DATABASES")(credentials.get) } it should "generate correct 'show tag-key' query" in new AuthEnv { - showTagKeysQuery(testDb, testMeasurement, testWhereClause, testLimit, testOffset) - .toString() shouldEqual + showTagKeysQuery(testDb, testMeasurement, testWhereClause, testLimit, testOffset).mkUrl shouldEqual queryTesterAuth( s"SHOW TAG KEYS ON $testDb FROM $testMeasurement WHERE ${testWhereClause.get} LIMIT ${testLimit.get} OFFSET ${testOffset.get}" )(credentials.get) - showTagKeysQuery(testDb, testMeasurement, testWhereClause, None, None).toString() shouldEqual + showTagKeysQuery(testDb, testMeasurement, testWhereClause, None, None).mkUrl shouldEqual queryTesterAuth( s"SHOW TAG KEYS ON $testDb FROM $testMeasurement WHERE ${testWhereClause.get}" )(credentials.get) } it should "generate correct 'show tag-value' query" in new AuthEnv { - showTagValuesQuery(testDb, testMeasurement, Seq("key"), testWhereClause, testLimit, testOffset) - .toString() shouldEqual + showTagValuesQuery(testDb, testMeasurement, Seq("key"), testWhereClause, testLimit, testOffset).mkUrl shouldEqual queryTesterAuth( s"SHOW TAG VALUES ON $testDb FROM $testMeasurement WITH KEY = key WHERE ${testWhereClause.get} LIMIT ${testLimit.get} OFFSET ${testOffset.get}" )(credentials.get) @@ -114,14 +113,14 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement testWhereClause, testLimit, testOffset - ).toString() shouldEqual + ).mkUrl shouldEqual queryTesterAuth( s"SHOW TAG VALUES ON $testDb FROM $testMeasurement WITH KEY IN (key,key1) WHERE ${testWhereClause.get} LIMIT ${testLimit.get} OFFSET ${testOffset.get}" )(credentials.get) } it should "generate correct 'show field-key' query" in new AuthEnv { - showFieldKeysQuery(testDb, testMeasurement).toString() shouldEqual + showFieldKeysQuery(testDb, testMeasurement).mkUrl shouldEqual queryTesterAuth(s"SHOW FIELD KEYS ON $testDb FROM $testMeasurement")(credentials.get) } @@ -132,7 +131,7 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement None, None, None - ).toString() shouldEqual queryTester(s"CREATE DATABASE $testDb WITH DURATION 3d") + ).mkUrl shouldEqual queryTester(s"CREATE DATABASE $testDb WITH DURATION 3d") createDatabaseQuery( testDb, @@ -140,36 +139,40 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement Some(2), Some("1d"), Some("testName") - ).toString() shouldEqual queryTester( + ).mkUrl shouldEqual queryTester( s"CREATE DATABASE $testDb WITH DURATION 3d REPLICATION 2 SHARD DURATION 1d NAME testName" ) } it should "generate correct 'drop database' query without auth" in new NonAuthEnv { - dropDatabaseQuery(testDb).toString() shouldEqual queryTester(s"DROP DATABASE $testDb") + dropDatabaseQuery(testDb).mkUrl shouldEqual queryTester(s"DROP DATABASE $testDb") } it should "generate correct 'drop series' query without auth" in new NonAuthEnv { - dropSeriesQuery(testDb, testSeries) - .toString() shouldEqual queryTester(testDb, s"DROP SERIES FROM $testSeries") + dropSeriesQuery(testDb, testSeries).mkUrl shouldEqual queryTester( + testDb, + s"DROP SERIES FROM $testSeries" + ) } it should "generate auth correct 'drop measurement' query without auth" in new NonAuthEnv { - dropMeasurementQuery(testDb, testMeasurement).toString() shouldEqual + dropMeasurementQuery(testDb, testMeasurement).mkUrl shouldEqual queryTester(testDb, s"DROP MEASUREMENT $testMeasurement") } it should "generate correct auth 'drop all series' query without auth" in new NonAuthEnv { - deleteAllSeriesQuery(testDb, testSeries) - .toString() shouldEqual queryTester(testDb, s"DELETE FROM $testSeries") + deleteAllSeriesQuery(testDb, testSeries).mkUrl shouldEqual queryTester( + testDb, + s"DELETE FROM $testSeries" + ) } it should "generate correct auth 'show measurement' query without auth" in new NonAuthEnv { - showMeasurementQuery(testDb).toString() shouldEqual queryTester(testDb, "SHOW MEASUREMENTS") + showMeasurementQuery(testDb).mkUrl shouldEqual queryTester(testDb, "SHOW MEASUREMENTS") } it should "generate correct 'show database' query without auth" in new NonAuthEnv { - showDatabasesQuery.toString() shouldEqual queryTester(s"SHOW DATABASES") + showDatabasesQuery.mkUrl shouldEqual queryTester(s"SHOW DATABASES") } it should "generate correct 'show tag-key' query without auth" in new NonAuthEnv { @@ -179,7 +182,7 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement None, None, None - ).toString() shouldEqual queryTester(s"SHOW TAG KEYS ON $testDb FROM $testMeasurement") + ).mkUrl shouldEqual queryTester(s"SHOW TAG KEYS ON $testDb FROM $testMeasurement") showTagKeysQuery( testDb, @@ -187,24 +190,22 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement testWhereClause, testLimit, None - ).toString() shouldEqual queryTester( + ).mkUrl shouldEqual queryTester( s"SHOW TAG KEYS ON $testDb FROM $testMeasurement WHERE ${testWhereClause.get} LIMIT ${testLimit.get}" ) } it should "generate correct 'show tag-value' query without auth" in new NonAuthEnv { - showTagValuesQuery(testDb, testMeasurement, Seq("key"), None, None, None) - .toString() shouldEqual queryTester( + showTagValuesQuery(testDb, testMeasurement, Seq("key"), None, None, None).mkUrl shouldEqual queryTester( s"SHOW TAG VALUES ON $testDb FROM $testMeasurement WITH KEY = key" ) - showTagValuesQuery(testDb, testMeasurement, Seq("key", "key1"), testWhereClause, None, None) - .toString() shouldEqual queryTester( + showTagValuesQuery(testDb, testMeasurement, Seq("key", "key1"), testWhereClause, None, None).mkUrl shouldEqual queryTester( s"SHOW TAG VALUES ON $testDb FROM $testMeasurement WITH KEY IN (key,key1) WHERE ${testWhereClause.get}" ) } it should "generate correct 'show field-key' query without auth" in new NonAuthEnv { - showFieldKeysQuery(testDb, testMeasurement).toString() shouldEqual + showFieldKeysQuery(testDb, testMeasurement).mkUrl shouldEqual queryTester(s"SHOW FIELD KEYS ON $testDb FROM $testMeasurement") } } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/QueriesManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/QueriesManagementQuerySpec.scala index 171b73df..4d070bef 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/QueriesManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/QueriesManagementQuerySpec.scala @@ -16,10 +16,10 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.QueriesManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -30,32 +30,33 @@ import org.scalatest.{FlatSpec, Matchers} class QueriesManagementQuerySpec extends FlatSpec with Matchers with QueriesManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } it should "show query" in new AuthEnv { - showQuerysQuery.toString() shouldEqual queryTesterAuth("SHOW QUERIES")(credentials.get) + showQuerysQuery.mkUrl shouldEqual queryTesterAuth("SHOW QUERIES")(credentials.get) } it should "kill query" in new AuthEnv { - killQueryQuery(5).toString() shouldEqual queryTesterAuth("KILL QUERY 5")(credentials.get) + killQueryQuery(5).mkUrl shouldEqual queryTesterAuth("KILL QUERY 5")(credentials.get) } it should "show query without auth" in new NonAuthEnv { - showQuerysQuery.toString() shouldEqual queryTester("SHOW QUERIES") + showQuerysQuery.mkUrl shouldEqual queryTester("SHOW QUERIES") } it should "kill query without auth" in new NonAuthEnv { - killQueryQuery(5).toString() shouldEqual queryTester("KILL QUERY 5") + killQueryQuery(5).mkUrl shouldEqual queryTester("KILL QUERY 5") } } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/RetentionPolicyManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/RetentionPolicyManagementQuerySpec.scala index 40311556..4d2fcc4b 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/RetentionPolicyManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/RetentionPolicyManagementQuerySpec.scala @@ -16,11 +16,11 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.duration._ import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.RetentionPolicyManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} import scala.language.postfixOps @@ -31,68 +31,67 @@ import scala.language.postfixOps * Date: 27.07.17 */ class RetentionPolicyManagementQuerySpec - extends FlatSpec - with Matchers - with RetentionPolicyManagementQuery[Uri] { + extends FlatSpec + with Matchers + with RetentionPolicyManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } val testRPName = "testRP" val testDBName = "testDB" it should "create retention policy" in new AuthEnv { - createRPQuery(testRPName, testDBName, 4 hours, 3, Some(4 hours), default = true) - .toString() shouldEqual + createRPQuery(testRPName, testDBName, 4 hours, 3, Some(4 hours), default = true).mkUrl shouldEqual queryTesterAuth( s"CREATE RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3 SHARD DURATION 4h DEFAULT" )(credentials.get) - createRPQuery(testRPName, testDBName, 4 hours, 3, None, default = true).toString() shouldEqual + createRPQuery(testRPName, testDBName, 4 hours, 3, None, default = true).mkUrl shouldEqual queryTesterAuth( s"CREATE RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3 DEFAULT" )(credentials.get) - createRPQuery(testRPName, testDBName, 4 hours, 3, Some(4 hours)).toString() shouldEqual + createRPQuery(testRPName, testDBName, 4 hours, 3, Some(4 hours)).mkUrl shouldEqual queryTesterAuth( s"CREATE RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3 SHARD DURATION 4h" )(credentials.get) } it should "create retention policy without auth" in new NonAuthEnv { - createRPQuery(testRPName, testDBName, 4 hours, 3, None).toString() shouldEqual + createRPQuery(testRPName, testDBName, 4 hours, 3, None).mkUrl shouldEqual queryTester(s"CREATE RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3") } it should "drop retention policy" in new AuthEnv { - dropRPQuery(testRPName, testDBName).toString() shouldEqual + dropRPQuery(testRPName, testDBName).mkUrl shouldEqual queryTesterAuth(s"DROP RETENTION POLICY $testRPName ON $testDBName")(credentials.get) } it should "drop retention policy without auth" in new NonAuthEnv { - dropRPQuery(testRPName, testDBName).toString() shouldEqual + dropRPQuery(testRPName, testDBName).mkUrl shouldEqual queryTester(s"DROP RETENTION POLICY $testRPName ON $testDBName") } it should "update retention policy" in new AuthEnv { - updateRPQuery(testRPName, testDBName, Some(4 hours), Some(3), Some(4 hours), default = true) - .toString() shouldEqual + updateRPQuery(testRPName, testDBName, Some(4 hours), Some(3), Some(4 hours), default = true).mkUrl shouldEqual queryTesterAuth( s"ALTER RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3 SHARD DURATION 4h DEFAULT" )(credentials.get) - updateRPQuery(testRPName, testDBName, Some(4 hours), Some(3), None).toString() shouldEqual + updateRPQuery(testRPName, testDBName, Some(4 hours), Some(3), None).mkUrl shouldEqual queryTesterAuth( s"ALTER RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3" )(credentials.get) @@ -100,9 +99,9 @@ class RetentionPolicyManagementQuerySpec } it should "update retention policy without auth" in new NonAuthEnv { - updateRPQuery(testRPName, testDBName, Some(4 hours), None, None).toString() shouldEqual + updateRPQuery(testRPName, testDBName, Some(4 hours), None, None).mkUrl shouldEqual queryTester(s"ALTER RETENTION POLICY $testRPName ON $testDBName DURATION 4h") - updateRPQuery(testRPName, testDBName, None, Some(3), Some(4 hours)).toString() shouldEqual + updateRPQuery(testRPName, testDBName, None, Some(3), Some(4 hours)).mkUrl shouldEqual queryTester( s"ALTER RETENTION POLICY $testRPName ON $testDBName REPLICATION 3 SHARD DURATION 4h" ) diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ShardManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ShardManagementQuerySpec.scala index eb7a79e4..7db28f3c 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ShardManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ShardManagementQuerySpec.scala @@ -16,10 +16,10 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.ShardManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -30,42 +30,43 @@ import org.scalatest.{FlatSpec, Matchers} class ShardManagementQuerySpec extends FlatSpec with Matchers with ShardManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } it should "drop shard by id" in new AuthEnv { - dropShardQuery(5).toString() shouldEqual queryTesterAuth("DROP SHARD 5")(credentials.get) + dropShardQuery(5).mkUrl shouldEqual queryTesterAuth("DROP SHARD 5")(credentials.get) } it should "drop shard by id without auth" in new NonAuthEnv { - dropShardQuery(5).toString() shouldEqual queryTester("DROP SHARD 5") + dropShardQuery(5).mkUrl shouldEqual queryTester("DROP SHARD 5") } it should "show shards" in new AuthEnv { - showShardsQuery.toString() shouldEqual queryTesterAuth("SHOW SHARDS")(credentials.get) + showShardsQuery.mkUrl shouldEqual queryTesterAuth("SHOW SHARDS")(credentials.get) } it should "show shards without auth" in new NonAuthEnv { - showShardsQuery.toString() shouldEqual queryTester("SHOW SHARDS") + showShardsQuery.mkUrl shouldEqual queryTester("SHOW SHARDS") } it should "show shard groups" in new AuthEnv { - showShardGroupsQuery.toString() shouldEqual queryTesterAuth("SHOW SHARD GROUPS")( + showShardGroupsQuery.mkUrl shouldEqual queryTesterAuth("SHOW SHARD GROUPS")( credentials.get ) } it should "show shard groups without auth" in new NonAuthEnv { - showShardGroupsQuery.toString() shouldEqual queryTester("SHOW SHARD GROUPS") + showShardGroupsQuery.mkUrl shouldEqual queryTester("SHOW SHARD GROUPS") } } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/SubscriptionsManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/SubscriptionsManagementQuerySpec.scala index 232130fc..d908cdf3 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/SubscriptionsManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/SubscriptionsManagementQuerySpec.scala @@ -16,11 +16,11 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.enums.Destinations import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.SubscriptionsManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -29,22 +29,23 @@ import org.scalatest.{FlatSpec, Matchers} * Date: 21.08.17 */ class SubscriptionsManagementQuerySpec - extends FlatSpec - with Matchers - with SubscriptionsManagementQuery[Uri] { + extends FlatSpec + with Matchers + with SubscriptionsManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } val subName = "subs" @@ -58,34 +59,34 @@ class SubscriptionsManagementQuerySpec s"CREATE SUBSCRIPTION $subName ON $dbName.$rpName DESTINATIONS $destType $resHosts" it should "create subs query" in new AuthEnv { - createSubscriptionQuery(subName, dbName, rpName, destType, hosts).toString() shouldEqual + createSubscriptionQuery(subName, dbName, rpName, destType, hosts).mkUrl shouldEqual queryTesterAuth(createRes)(credentials.get) } it should "create subs query without auth" in new NonAuthEnv { - createSubscriptionQuery(subName, dbName, rpName, destType, hosts).toString() shouldEqual + createSubscriptionQuery(subName, dbName, rpName, destType, hosts).mkUrl shouldEqual queryTester(createRes) } val dropRes = s"DROP SUBSCRIPTION $subName ON $dbName.$rpName" it should "drop subs query" in new AuthEnv { - dropSubscriptionQuery(subName, dbName, rpName).toString() shouldEqual + dropSubscriptionQuery(subName, dbName, rpName).mkUrl shouldEqual queryTesterAuth(dropRes)(credentials.get) } it should "drop subs query without auth" in new NonAuthEnv { - dropSubscriptionQuery(subName, dbName, rpName).toString() shouldEqual queryTester(dropRes) + dropSubscriptionQuery(subName, dbName, rpName).mkUrl shouldEqual queryTester(dropRes) } val showRes = "SHOW SUBSCRIPTIONS" it should "show subs query" in new AuthEnv { - showSubscriptionsQuery.toString() shouldEqual + showSubscriptionsQuery.mkUrl shouldEqual queryTesterAuth(showRes)(credentials.get) } it should "show subs query without auth" in new NonAuthEnv { - showSubscriptionsQuery.toString() shouldEqual queryTester(showRes) + showSubscriptionsQuery.mkUrl shouldEqual queryTester(showRes) } } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/UserManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/UserManagementQuerySpec.scala index f5da6790..0df89ef3 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/UserManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/UserManagementQuerySpec.scala @@ -16,11 +16,11 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.enums.Privileges import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.UserManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -31,17 +31,18 @@ import org.scalatest.{FlatSpec, Matchers} class UserManagementQuerySpec extends FlatSpec with Matchers with UserManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } private val testUsername = "TEST_USER_NAME" @@ -50,17 +51,17 @@ class UserManagementQuerySpec extends FlatSpec with Matchers with UserManagement private val testPrivilege = Privileges.ALL it should "create user query" in new AuthEnv { - createUserQuery(testUsername, testPassword).toString() shouldEqual + createUserQuery(testUsername, testPassword).mkUrl shouldEqual queryTesterAuth(s"CREATE USER $testUsername WITH PASSWORD '$testPassword'")(credentials.get) } it should "create user query without auth" in new NonAuthEnv { - createUserQuery(testUsername, testPassword).toString() shouldEqual + createUserQuery(testUsername, testPassword).mkUrl shouldEqual queryTester(s"CREATE USER $testUsername WITH PASSWORD '$testPassword'") } it should "create admin user query" in new AuthEnv { - createAdminQuery(testUsername, testPassword).toString() shouldEqual + createAdminQuery(testUsername, testPassword).mkUrl shouldEqual queryTesterAuth( s"CREATE USER $testUsername WITH PASSWORD '$testPassword' WITH ALL PRIVILEGES" )(credentials.get) @@ -68,88 +69,88 @@ class UserManagementQuerySpec extends FlatSpec with Matchers with UserManagement } it should "create admin user query without auth" in new NonAuthEnv { - createAdminQuery(testUsername, testPassword).toString() shouldEqual + createAdminQuery(testUsername, testPassword).mkUrl shouldEqual queryTester(s"CREATE USER $testUsername WITH PASSWORD '$testPassword' WITH ALL PRIVILEGES") } it should "drop user query" in new AuthEnv { - dropUserQuery(testUsername).toString() shouldEqual + dropUserQuery(testUsername).mkUrl shouldEqual queryTesterAuth(s"DROP USER $testUsername")(credentials.get) } it should "drop user query without auth" in new NonAuthEnv { - dropUserQuery(testUsername).toString() shouldEqual + dropUserQuery(testUsername).mkUrl shouldEqual queryTester(s"DROP USER $testUsername") } it should "show users query" in new AuthEnv { - showUsersQuery.toString() shouldEqual queryTesterAuth("SHOW USERS")(credentials.get) + showUsersQuery.mkUrl shouldEqual queryTesterAuth("SHOW USERS")(credentials.get) } it should "show users query without auth" in new NonAuthEnv { - showUsersQuery.toString() shouldEqual queryTester("SHOW USERS") + showUsersQuery.mkUrl shouldEqual queryTester("SHOW USERS") } it should "show user privileges query" in new AuthEnv { - showUserPrivilegesQuery(testUsername).toString() shouldEqual + showUserPrivilegesQuery(testUsername).mkUrl shouldEqual queryTesterAuth(s"SHOW GRANTS FOR $testUsername")(credentials.get) } it should "show user privileges query without auth" in new NonAuthEnv { - showUserPrivilegesQuery(testUsername).toString() shouldEqual queryTester( + showUserPrivilegesQuery(testUsername).mkUrl shouldEqual queryTester( s"SHOW GRANTS FOR $testUsername" ) } it should "make admin query" in new AuthEnv { - makeAdminQuery(testUsername).toString() shouldEqual + makeAdminQuery(testUsername).mkUrl shouldEqual queryTesterAuth(s"GRANT ALL PRIVILEGES TO $testUsername")(credentials.get) } it should "make admin query without auth" in new NonAuthEnv { - makeAdminQuery(testUsername).toString() shouldEqual queryTester( + makeAdminQuery(testUsername).mkUrl shouldEqual queryTester( s"GRANT ALL PRIVILEGES TO $testUsername" ) } it should "disable admin query" in new AuthEnv { - disableAdminQuery(testUsername).toString() shouldEqual + disableAdminQuery(testUsername).mkUrl shouldEqual queryTesterAuth(s"REVOKE ALL PRIVILEGES FROM $testUsername")(credentials.get) } it should "disable admin query without auth" in new NonAuthEnv { - disableAdminQuery(testUsername).toString() shouldEqual queryTester( + disableAdminQuery(testUsername).mkUrl shouldEqual queryTester( s"REVOKE ALL PRIVILEGES FROM $testUsername" ) } it should "set user password query" in new AuthEnv { - setUserPasswordQuery(testUsername, testPassword).toString() shouldEqual + setUserPasswordQuery(testUsername, testPassword).mkUrl shouldEqual queryTesterAuth(s"SET PASSWORD FOR $testUsername = '$testPassword'")(credentials.get) } it should "set user password query without auth" in new NonAuthEnv { - setUserPasswordQuery(testUsername, testPassword).toString() shouldEqual + setUserPasswordQuery(testUsername, testPassword).mkUrl shouldEqual queryTester(s"SET PASSWORD FOR $testUsername = '$testPassword'") } it should "set privileges query" in new AuthEnv { - setPrivilegesQuery(testDatabase, testUsername, testPrivilege).toString() shouldEqual + setPrivilegesQuery(testDatabase, testUsername, testPrivilege).mkUrl shouldEqual queryTesterAuth(s"GRANT $testPrivilege ON $testDatabase TO $testUsername")(credentials.get) } it should "set privileges query without auth" in new NonAuthEnv { - setPrivilegesQuery(testDatabase, testUsername, testPrivilege).toString() shouldEqual + setPrivilegesQuery(testDatabase, testUsername, testPrivilege).mkUrl shouldEqual queryTester(s"GRANT $testPrivilege ON $testDatabase TO $testUsername") } it should "revoke privileges query" in new AuthEnv { - revokePrivilegesQuery(testDatabase, testUsername, testPrivilege).toString() shouldEqual + revokePrivilegesQuery(testDatabase, testUsername, testPrivilege).mkUrl shouldEqual queryTesterAuth(s"REVOKE $testPrivilege ON $testDatabase FROM $testUsername")(credentials.get) } it should "revoke privileges query without auth" in new NonAuthEnv { - revokePrivilegesQuery(testDatabase, testUsername, testPrivilege).toString() shouldEqual + revokePrivilegesQuery(testDatabase, testUsername, testPrivilege).mkUrl shouldEqual queryTester(s"REVOKE $testPrivilege ON $testDatabase FROM $testUsername") } } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala index e8311bbf..5b6b76e6 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala @@ -26,20 +26,20 @@ package object management { } def queryTesterAuth(query: String)(credentials: InfluxCredentials): String = - s"http://localhost:8086/query?u=${credentials.username.encode}&p=${credentials.password.encode}&q=${query.encode}" + s"http://localhost:8086/query?u=${credentials.username}&p=${credentials.password}&q=${query}" def queryTesterAuth(db: String, query: String)(credentials: InfluxCredentials): String = - s"http://localhost:8086/query?db=${db.encode}&u=${credentials.username.encode}&p=${credentials.password.encode}&q=${query.encode}" + s"http://localhost:8086/query?db=${db}&u=${credentials.username}&p=${credentials.password}&q=${query}" def queryTester(query: String): String = - s"http://localhost:8086/query?q=${query.encode}" + s"http://localhost:8086/query?q=${query}" def queryTester(db: String, query: String): String = - s"http://localhost:8086/query?db=${db.encode}&q=${query.encode}" + s"http://localhost:8086/query?db=${db}&q=${query}" def queryTester(path: String, queryPrms: List[(String, String)]): String = { val s = queryPrms - .map { case (k, v) => s"$k=${v.encode}" } + .map { case (k, v) => s"$k=$v" } .mkString("&") s"http://localhost:8086$path?$s" diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala index ad4d0149..edbee8f2 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala @@ -16,15 +16,16 @@ package com.github.fsanaulla.chronicler.ahc.shared -import com.softwaremill.sttp.SttpBackend -import com.softwaremill.sttp.asynchttpclient.future.AsyncHttpClientFutureBackend -import org.asynchttpclient.AsyncHttpClientConfig - -import scala.concurrent.Future +import org.asynchttpclient.{AsyncHttpClientConfig, DefaultAsyncHttpClient} class InfluxAhcClient(asyncClientConfig: Option[AsyncHttpClientConfig]) { self: AutoCloseable => - private[ahc] implicit val backend: SttpBackend[Future, Nothing] = - asyncClientConfig.fold(AsyncHttpClientFutureBackend())(AsyncHttpClientFutureBackend.usingConfig) + private[ahc] val schema: String = asyncClientConfig + .map(_.isUseOpenSsl) + .map(if (_) "https" else "http") + .getOrElse("http") + + private[ahc] implicit val client: DefaultAsyncHttpClient = + asyncClientConfig.fold(new DefaultAsyncHttpClient())(new DefaultAsyncHttpClient(_)) - override def close(): Unit = backend.close() + override def close(): Unit = client.close() } diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/Uri.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/Uri.scala new file mode 100644 index 00000000..96b41412 --- /dev/null +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/Uri.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2017-2019 Faiaz Sanaulla + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.fsanaulla.chronicler.ahc.shared + +import java.net.URLEncoder + +import org.asynchttpclient.Param + +/*** + * Syntetic container for request information + * + * @param schema - request schema + * @param host - request api address + * @param port - request port + * @param query - query path + * @param params - query params + * + * @since 0.6.0 + */ +final case class Uri( + schema: String, + host: String, + port: Int, + query: String, + params: List[Param] = Nil) { + + /** Append query parameter */ + def addParam(param: Param): Uri = + this.copy(params = param :: params) + + /** Create string based representation */ + def mkUrl: String = { + val encode: String => String = + p => URLEncoder.encode(p, "UTF-8") + + val queryParams = params.reverse + .map(p => p.getName + "=" + encode(p.getValue)) + .mkString("&") + + schema + "://" + host + ":" + port + query + "?" + queryParams + } +} diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/alias/package.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/alias/package.scala deleted file mode 100644 index ccdbbf2b..00000000 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/alias/package.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2017-2019 Faiaz Sanaulla - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.github.fsanaulla.chronicler.ahc.shared - -import com.softwaremill.sttp.{Id, RequestT} -import org.typelevel.jawn.ast.JValue - -package object alias { - type Request = RequestT[Id, JValue, Nothing] -} diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala index 2e10c6ab..ecee4190 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala @@ -1,41 +1,56 @@ +/* + * Copyright 2017-2019 Faiaz Sanaulla + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.github.fsanaulla.chronicler.ahc.shared.handlers -import java.nio.ByteBuffer import java.nio.charset.{Charset, StandardCharsets} import com.github.fsanaulla.chronicler.core.alias.{ErrorOr, Id} import com.github.fsanaulla.chronicler.core.components.JsonHandler -import com.github.fsanaulla.chronicler.core.either.EitherOps import com.github.fsanaulla.chronicler.core.encoding.encodingFromContentType +import com.github.fsanaulla.chronicler.core.gzip import com.github.fsanaulla.chronicler.core.implicits._ import com.github.fsanaulla.chronicler.core.jawn.RichJParser -import com.softwaremill.sttp.Response +import org.asynchttpclient.Response import org.typelevel.jawn.ast.{JParser, JValue} -private[ahc] final class AhcJsonHandler(compress: Boolean) - extends JsonHandler[Id, Response[Array[Byte]]] { +import scala.jdk.CollectionConverters._ - def responseBody(response: Response[Array[Byte]]): ErrorOr[JValue] = { - val ethBts = response.rawErrorBody -// val maybeDecompressed = if (compress) ethBts.mapRight(gzip.decompress) else ethBts +private[ahc] final class AhcJsonHandler(compress: Boolean) extends JsonHandler[Id, Response] { - val encoding: Charset = response.contentType + def responseBody(response: Response): ErrorOr[JValue] = { + val bodyBts = response.getResponseBodyAsBytes + val maybeDecompressed = if (compress) gzip.decompress(bodyBts) else bodyBts + val encoding: Charset = Option(response.getContentType) .flatMap(encodingFromContentType) .map(Charset.forName) .getOrElse(StandardCharsets.UTF_8) - ethBts - .mapRight(new String(_, encoding)) - .mapRight(JParser.parseFromStringOrNull) - .flatMapLeft { bt => - val btBuff = ByteBuffer.wrap(bt) - JParser.parseFromByteBufferEither(ByteBuffer.wrap(bt)) - } + val bodyStr = new String(maybeDecompressed, encoding) + + JParser.parseFromStringEither(bodyStr) } - def responseHeader(response: Response[Array[Byte]]): Seq[(String, String)] = - response.headers + def responseHeader(response: Response): List[(String, String)] = + response.getHeaders + .entries() + .asScala + .toList + .map(e => e.getKey -> e.getValue) - def responseCode(response: Response[Array[Byte]]): Int = - response.code + def responseCode(response: Response): Int = + response.getStatusCode } diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilder.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilder.scala index f70aab50..8a4f8e1d 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilder.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilder.scala @@ -16,38 +16,35 @@ package com.github.fsanaulla.chronicler.ahc.shared.handlers +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.core.components.QueryBuilder import com.github.fsanaulla.chronicler.core.model.InfluxCredentials -import com.softwaremill.sttp.Uri.QueryFragment -import com.softwaremill.sttp.Uri.QueryFragment.KeyValue -import com.softwaremill.sttp._ +import org.asynchttpclient.Param import scala.annotation.tailrec private[ahc] class AhcQueryBuilder( + schema: String, host: String, port: Int, credentials: Option[InfluxCredentials]) extends QueryBuilder[Uri](credentials) { - override def buildQuery(url: String): Uri = - Uri(host = host, port).path(url) + override def buildQuery(query: String): Uri = + Uri(schema, host, port, query) - override def buildQuery(uri: String, queryParams: List[(String, String)]): Uri = { - val u = Uri(host = host, port).path(uri) - val encoding = Uri.QueryFragmentEncoding.All - val kvLst = queryParams.map { - case (k, v) => KeyValue(k, v, valueEncoding = encoding) - } + override def buildQuery(query: String, queryParams: List[(String, String)]): Uri = { + val u = buildQuery(query) + val params = queryParams.map { case (k, v) => new Param(k, v) } @tailrec - def addQueryParam(u: Uri, lst: Seq[QueryFragment]): Uri = { + def addQueryParam(u: Uri, lst: List[Param]): Uri = { lst match { case Nil => u - case h :: tail => addQueryParam(u.queryFragment(h), tail) + case h :: tail => addQueryParam(u.addParam(h), tail) } } - addQueryParam(u, kvLst) + addQueryParam(u, params) } } diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala index 1b27d07a..8f493d63 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala @@ -16,22 +16,16 @@ package com.github.fsanaulla.chronicler.ahc.shared.handlers +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.core.components.RequestExecutor import com.github.fsanaulla.chronicler.core.gzip -import com.softwaremill.sttp.{ - asByteArray, - emptyRequest, - HeaderNames, - MediaTypes, - Response, - SttpBackend, - Uri -} +import org.asynchttpclient.{AsyncHttpClient, Response} +import scala.compat.java8.FutureConverters._ import scala.concurrent.Future -private[ahc] final class AhcRequestExecutor()(implicit backend: SttpBackend[Future, Nothing]) - extends RequestExecutor[Future, Response[Array[Byte]], Uri, String] { +private[ahc] final class AhcRequestExecutor()(implicit client: AsyncHttpClient) + extends RequestExecutor[Future, Response, Uri, String] { /** * Execute uri @@ -39,36 +33,30 @@ private[ahc] final class AhcRequestExecutor()(implicit backend: SttpBackend[Futu * @param uri - request uri * @return - Return wrapper response */ - override def get(uri: Uri, compress: Boolean): Future[Response[Array[Byte]]] = { - val request = emptyRequest.get(uri) - - // currently not supported -// val maybeEncoded = -// if (compress) request.acceptEncoding("gzip") -// else request - - request - .response(asByteArray) - .send() + override def get(uri: Uri, compress: Boolean): Future[Response] = { + client.prepareGet(uri.mkUrl).execute.toCompletableFuture.toScala } override def post( uri: Uri, body: String, compress: Boolean - ): Future[Response[Array[Byte]]] = { - val req = emptyRequest.post(uri).response(asByteArray) + ): Future[Response] = { + val req = client.preparePost(uri.mkUrl) val maybeEncoded = if (compress) { val (length, data) = gzip.compress(body.getBytes()) // it fails with input stream, using byte array instead req - .body(data) - .header(HeaderNames.ContentEncoding, "gzip", replaceExisting = true) - .contentType(MediaTypes.Binary) - .contentLength(length) - } else req.body(body) + .setBody(data) + .setHeader("Content-Encoding", "gzip") + .setHeader("Content-Type", "application/octet-stream") + .setHeader("Content-Length", length) + } else req.setBody(body.getBytes()) - maybeEncoded.send() + maybeEncoded + .execute() + .toCompletableFuture + .toScala } /** @@ -76,10 +64,11 @@ private[ahc] final class AhcRequestExecutor()(implicit backend: SttpBackend[Futu * * @param uri - request uri */ - override def post(uri: Uri): Future[Response[Array[Byte]]] = { - emptyRequest - .post(uri) - .response(asByteArray) - .send() + override def post(uri: Uri): Future[Response] = { + client + .preparePost(uri.mkUrl) + .execute() + .toCompletableFuture + .toScala } } diff --git a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala index 35d1e8ea..b7464aa0 100644 --- a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala +++ b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala @@ -16,7 +16,11 @@ package com.github.fsanaulla.chronicler.ahc.shared.handlers -import com.softwaremill.sttp.Response +import java.nio.ByteBuffer + +import io.netty.buffer.Unpooled +import org.asynchttpclient.Response +import org.asynchttpclient.netty.EagerResponseBodyPart import org.scalatest.concurrent.ScalaFutures import org.scalatest.{FlatSpec, Matchers, OptionValues} import org.typelevel.jawn.ast._ @@ -60,7 +64,18 @@ class AhcJsonHandlerSpec extends FlatSpec with Matchers with ScalaFutures with O ] }""" - val resp: Response[Array[Byte]] = Response.ok(singleStrJson.getBytes()) + val resp: Response = { + val b = new Response.ResponseBuilder() + + b.accumulate( + new EagerResponseBodyPart( + Unpooled.copiedBuffer(ByteBuffer.wrap(singleStrJson.getBytes())), + true + ) + ) + + b.build + } val result: JValue = JParser.parseFromString(singleStrJson).get diff --git a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala index 487c5b07..f73fdd93 100644 --- a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala +++ b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala @@ -23,7 +23,7 @@ class AhcQueryBuilderSpec extends FlatSpec with Matchers { val host = "localhost" val port = 8080 - val qb = new AhcQueryBuilder(host, port, None) + val qb = new AhcQueryBuilder("http", host, port, None) it should "properly generate URI" in { val queryMap = List( diff --git a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala index b9bc1816..fe70f9fc 100644 --- a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala +++ b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala @@ -16,10 +16,14 @@ package com.github.fsanaulla.chronicler.ahc.shared.handlers +import java.nio.ByteBuffer + import com.github.fsanaulla.chronicler.core.components.ResponseHandler import com.github.fsanaulla.chronicler.core.implicits._ import com.github.fsanaulla.chronicler.core.model.ContinuousQuery -import com.softwaremill.sttp.Response +import io.netty.buffer.Unpooled +import org.asynchttpclient.Response +import org.asynchttpclient.netty.EagerResponseBodyPart import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Second, Seconds, Span} import org.scalatest.{FlatSpec, Matchers} @@ -47,8 +51,18 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { val rh = new ResponseHandler(jsonHandler) - implicit def str2resp(str: String): Response[JValue] = - Response.ok(p.parseFromString(str).get) + def buildResponse(bts: Array[Byte]): Response = { + val b = new Response.ResponseBuilder() + + b.accumulate( + new EagerResponseBodyPart( + Unpooled.copiedBuffer(ByteBuffer.wrap(bts)), + true + ) + ) + + b.build + } it should "extract single query result from response" in { @@ -92,7 +106,7 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { JArray(Array(JString("2015-06-11T20:46:02Z"), JNum(0.64))) ) - rh.queryResultJson(Response.ok(singleResponse)).right.get shouldEqual result + rh.queryResultJson(buildResponse(singleResponse)).right.get shouldEqual result } it should "extract bulk query results from response" in { @@ -149,7 +163,7 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { |} """.stripMargin.getBytes() - rh.bulkQueryResultJson(Response.ok(bulkResponse)).right.get shouldEqual Array( + rh.bulkQueryResultJson(buildResponse(bulkResponse)).right.get shouldEqual Array( Array( JArray(Array(JString("2015-01-29T21:55:43.702900257Z"), JNum(2))), JArray(Array(JString("2015-01-29T21:55:43.702900257Z"), JNum(0.55))), @@ -216,7 +230,8 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { } """.getBytes() - val cqi = rh.toCqQueryResult(Response.ok(cqStrJson)).right.get.filter(_.queries.nonEmpty).head + val cqi = + rh.toCqQueryResult(buildResponse(cqStrJson)).right.get.filter(_.queries.nonEmpty).head cqi.dbName shouldEqual "mydb" cqi.queries.head shouldEqual ContinuousQuery( "cq", @@ -238,7 +253,7 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { |} """.stripMargin.getBytes() - jsonHandler.responseErrorMsgOpt(Response.ok(errorResponse)).right.get shouldEqual Some( + jsonHandler.responseErrorMsgOpt(buildResponse(errorResponse)).right.get shouldEqual Some( "user not found" ) } @@ -248,6 +263,9 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { val errorResponse = """ { "error": "user not found" } """.getBytes() - jsonHandler.responseErrorMsg(Response.ok(errorResponse)).right.get shouldEqual "user not found" + jsonHandler + .responseErrorMsg(buildResponse(errorResponse)) + .right + .get shouldEqual "user not found" } } From dbcd51063f036d6d99baab9f819440c06a7b6781 Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Thu, 22 Aug 2019 13:52:59 +0300 Subject: [PATCH 05/13] [SBT][NONE]: update dependencies, version := 0.6.0, changelog --- changelog/0.6.0.md | 5 +++++ project/Library.scala | 33 ++++++++++++++++----------------- project/plugins.sbt | 18 ++++++++++-------- version.sbt | 2 +- 4 files changed, 32 insertions(+), 26 deletions(-) create mode 100644 changelog/0.6.0.md diff --git a/changelog/0.6.0.md b/changelog/0.6.0.md new file mode 100644 index 00000000..9191059f --- /dev/null +++ b/changelog/0.6.0.md @@ -0,0 +1,5 @@ +Including issues: +- fsanaulla/chronicler#178 Updating AHC backend + +Contributors: +- @fsanaulla \ No newline at end of file diff --git a/project/Library.scala b/project/Library.scala index 41736c43..da9001bb 100644 --- a/project/Library.scala +++ b/project/Library.scala @@ -8,12 +8,10 @@ import sbt._ object Library { object Versions { - val sttp = "1.6.4" - val netty = "4.1.37.Final" val request = "0.2.0" object Akka { - val akka = "2.5.24" + val akka = "2.5.25" val akkaHttp = "10.1.9" } @@ -24,15 +22,14 @@ object Library { } } - val sttp = "com.softwaremill.sttp" %% "core" % Versions.sttp - val scalaTest = "org.scalatest" %% "scalatest" % Versions.Testing.scalaTest - val scalaCheck = "org.scalacheck" %% "scalacheck" % Versions.Testing.scalaCheck - val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % Versions.Akka.akka + val scalaTest = "org.scalatest" %% "scalatest" % Versions.Testing.scalaTest + val scalaCheck = "org.scalacheck" %% "scalacheck" % Versions.Testing.scalaCheck + val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % Versions.Akka.akka def macroDeps(scalaVersion: String): List[ModuleID] = - List( - "org.scala-lang" % "scala-reflect" % scalaVersion - ) ::: List(scalaTest, scalaCheck).map(_ % Scope.test) + "org.scala-lang" % "scala-reflect" % scalaVersion :: List(scalaTest, scalaCheck).map( + _ % Scope.test + ) // testing val testingDeps: List[ModuleID] = List( @@ -49,18 +46,20 @@ object Library { ) ::: List(scalaTest, scalaCheck).map(_ % Scope.test) // akka-http + // format: off val akkaDep: List[ModuleID] = List( - "com.typesafe.akka" %% "akka-stream" % Versions.Akka.akka exclude ("com.typesafe", "config"), - "com.typesafe" % "config" % "1.3.4", - "com.typesafe.akka" %% "akka-http" % Versions.Akka.akkaHttp, - "com.typesafe.akka" %% "akka-testkit" % Versions.Akka.akka % Scope.test + "com.typesafe.akka" %% "akka-stream" % Versions.Akka.akka exclude ("com.typesafe", "config"), + "com.typesafe" % "config" % "1.3.4", + "com.typesafe.akka" %% "akka-http" % Versions.Akka.akkaHttp, + akkaTestKit % Scope.test ) + + // format: on // async-http val asyncDeps: List[ModuleID] = List( - "io.netty" % "netty-handler" % Versions.netty, - "com.softwaremill.sttp" %% "async-http-client-backend-future" % Versions.sttp exclude ("io.netty", "netty-handler") exclude ("org.reactivestreams", "reactive-streams"), - "org.reactivestreams" % "reactive-streams" % "1.0.2" + "org.asynchttpclient" % "async-http-client" % "2.10.1", + "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0" ) // looks like a shit, but need to keep it until spark on 2.12 will become stable diff --git a/project/plugins.sbt b/project/plugins.sbt index 237c09f7..6db60b83 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,10 +1,12 @@ resolvers += "Era7 maven releases" at "https://s3-eu-west-1.amazonaws.com/releases.era7.com" -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3") -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1") -addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.0.0") -addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.4") -addSbtPlugin("com.github.romanowski" % "hoarder" % "1.0.2") -addSbtPlugin("ohnosequences" % "sbt-github-release" % "0.7.1") -addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.0.3") -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.2") +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.3") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1") +addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.0.0") +addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.4") +addSbtPlugin("com.github.romanowski" % "hoarder" % "1.0.2") +addSbtPlugin("ohnosequences" % "sbt-github-release" % "0.7.1") +addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.0.3") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.2") + +libraryDependencies += "com.sun.activation" % "javax.activation" % "1.2.0" diff --git a/version.sbt b/version.sbt index e3b2f607..0512f161 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.5.5" +version in ThisBuild := "0.6.0" From 69307bdfea235ab3323968c573f143a49813fac8 Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Thu, 22 Aug 2019 15:22:24 +0300 Subject: [PATCH 06/13] [AHC][ISSUE-178]: fixing import --- .../chronicler/ahc/shared/handlers/AhcJsonHandler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala index ecee4190..926427e9 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala @@ -27,7 +27,7 @@ import com.github.fsanaulla.chronicler.core.jawn.RichJParser import org.asynchttpclient.Response import org.typelevel.jawn.ast.{JParser, JValue} -import scala.jdk.CollectionConverters._ +import scala.collection.JavaConverters._ private[ahc] final class AhcJsonHandler(compress: Boolean) extends JsonHandler[Id, Response] { From 21d3f75c918cb95e4cda42f13b4391d3941db6bb Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Thu, 22 Aug 2019 15:22:46 +0300 Subject: [PATCH 07/13] [SBT][NONE]: small refactoring --- changelog/0.6.0.md | 2 +- project/Library.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/changelog/0.6.0.md b/changelog/0.6.0.md index 9191059f..f8405f1a 100644 --- a/changelog/0.6.0.md +++ b/changelog/0.6.0.md @@ -1,5 +1,5 @@ Including issues: -- fsanaulla/chronicler#178 Updating AHC backend +- fsanaulla/chronicler#178 Moving away from sttp to pure AHC backend Contributors: - @fsanaulla \ No newline at end of file diff --git a/project/Library.scala b/project/Library.scala index da9001bb..949e4c45 100644 --- a/project/Library.scala +++ b/project/Library.scala @@ -53,7 +53,6 @@ object Library { "com.typesafe.akka" %% "akka-http" % Versions.Akka.akkaHttp, akkaTestKit % Scope.test ) - // format: on // async-http From 938fc499087203e41514dac1e3653f4c8128bc4b Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Thu, 22 Aug 2019 16:21:47 +0300 Subject: [PATCH 08/13] [AHC][ISSUE-178]: fixing tests --- .../shared/handlers/AhcJsonHandlerSpec.scala | 86 +++++++++++-------- .../shared/handlers/AhcQueryBuilderSpec.scala | 2 +- .../handlers/AhcResponseHandlerSpec.scala | 14 ++- 3 files changed, 65 insertions(+), 37 deletions(-) diff --git a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala index b7464aa0..09c1429b 100644 --- a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala +++ b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala @@ -19,8 +19,9 @@ package com.github.fsanaulla.chronicler.ahc.shared.handlers import java.nio.ByteBuffer import io.netty.buffer.Unpooled +import io.netty.handler.codec.http.{DefaultHttpResponse, HttpVersion} import org.asynchttpclient.Response -import org.asynchttpclient.netty.EagerResponseBodyPart +import org.asynchttpclient.netty.{EagerResponseBodyPart, NettyResponseStatus} import org.scalatest.concurrent.ScalaFutures import org.scalatest.{FlatSpec, Matchers, OptionValues} import org.typelevel.jawn.ast._ @@ -32,48 +33,63 @@ import org.typelevel.jawn.ast._ */ class AhcJsonHandlerSpec extends FlatSpec with Matchers with ScalaFutures with OptionValues { - val jsonHandler = new AhcJsonHandler(compress = false) - val singleStrJson = """{ - "results": [ - { - "statement_id": 0, - "series": [ - { - "name": "cpu_load_short", - "columns": [ - "time", - "value" - ], - "values": [ - [ - "2015-01-29T21:55:43.702900257Z", - 2 - ], - [ - "2015-01-29T21:55:43.702900257Z", - 0.55 - ], - [ - "2015-06-11T20:46:02Z", - 0.64 - ] - ] - } - ] - } - ] - }""" + val jsonHandler = new AhcJsonHandler(compress = false) - val resp: Response = { + val singleStrJson: String = + """ + |{ + | "results": [ + | { + | "statement_id": 0, + | "series": [ + | { + | "name": "cpu_load_short", + | "columns": [ + | "time", + | "value" + | ], + | "values": [ + | [ + | "2015-01-29T21:55:43.702900257Z", + | 2 + | ], + | [ + | "2015-01-29T21:55:43.702900257Z", + | 0.55 + | ], + | [ + | "2015-06-11T20:46:02Z", + | 0.64 + | ] + | ] + | } + | ] + | } + | ] + |} + |""".stripMargin + + def buildResponse(bts: Array[Byte]): Response = { val b = new Response.ResponseBuilder() b.accumulate( new EagerResponseBodyPart( - Unpooled.copiedBuffer(ByteBuffer.wrap(singleStrJson.getBytes())), + Unpooled.copiedBuffer(ByteBuffer.wrap(bts)), true ) ) + b.accumulate( + new NettyResponseStatus( + null, + new DefaultHttpResponse( + HttpVersion.HTTP_1_0, + io.netty.handler.codec.http.HttpResponseStatus.OK + ), + null + ) + ) + b.build } @@ -81,7 +97,7 @@ class AhcJsonHandlerSpec extends FlatSpec with Matchers with ScalaFutures with O it should "extract JSON from HTTP response" in { jsonHandler - .responseBody(resp) + .responseBody(buildResponse(singleStrJson.getBytes())) .right .get shouldEqual result } diff --git a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala index f73fdd93..d4e3f5a1 100644 --- a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala +++ b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala @@ -31,7 +31,7 @@ class AhcQueryBuilderSpec extends FlatSpec with Matchers { ) val res = s"http://$host:$port/query?q=FirstQuery%3BSecondQuery" - qb.buildQuery("/query", qb.appendCredentials(queryMap)).toString() shouldEqual res + qb.buildQuery("/query", qb.appendCredentials(queryMap)).mkUrl shouldEqual res } } diff --git a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala index fe70f9fc..731acec3 100644 --- a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala +++ b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala @@ -22,8 +22,9 @@ import com.github.fsanaulla.chronicler.core.components.ResponseHandler import com.github.fsanaulla.chronicler.core.implicits._ import com.github.fsanaulla.chronicler.core.model.ContinuousQuery import io.netty.buffer.Unpooled +import io.netty.handler.codec.http.{DefaultHttpResponse, HttpVersion} import org.asynchttpclient.Response -import org.asynchttpclient.netty.EagerResponseBodyPart +import org.asynchttpclient.netty.{EagerResponseBodyPart, NettyResponseStatus} import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Second, Seconds, Span} import org.scalatest.{FlatSpec, Matchers} @@ -61,6 +62,17 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { ) ) + b.accumulate( + new NettyResponseStatus( + null, + new DefaultHttpResponse( + HttpVersion.HTTP_1_0, + io.netty.handler.codec.http.HttpResponseStatus.OK + ), + null + ) + ) + b.build } From cc0c7ea94cfe99cc00096644ecf4f462044823ac Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Fri, 23 Aug 2019 12:48:32 +0300 Subject: [PATCH 09/13] [AHC][ISSUE-178]: enable url param encoding --- .../fsanaulla/chronicler/ahc/management/package.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala index 5b6b76e6..213e188b 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala @@ -26,20 +26,20 @@ package object management { } def queryTesterAuth(query: String)(credentials: InfluxCredentials): String = - s"http://localhost:8086/query?u=${credentials.username}&p=${credentials.password}&q=${query}" + s"http://localhost:8086/query?u=${credentials.username.encode}&p=${credentials.password.encode}&q=${query.encode}" def queryTesterAuth(db: String, query: String)(credentials: InfluxCredentials): String = - s"http://localhost:8086/query?db=${db}&u=${credentials.username}&p=${credentials.password}&q=${query}" + s"http://localhost:8086/query?db=${db}&u=${credentials.username.encode}&p=${credentials.password.encode}&q=${query.encode}" def queryTester(query: String): String = - s"http://localhost:8086/query?q=${query}" + s"http://localhost:8086/query?q=${query.encode}" def queryTester(db: String, query: String): String = - s"http://localhost:8086/query?db=${db}&q=${query}" + s"http://localhost:8086/query?db=${db}&q=${query.encode}" def queryTester(path: String, queryPrms: List[(String, String)]): String = { val s = queryPrms - .map { case (k, v) => s"$k=$v" } + .map { case (k, v) => s"$k=${v.encode}" } .mkString("&") s"http://localhost:8086$path?$s" From 6d0eeda5fd6fc432c6cd8453cba6d7657130d8b1 Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Fri, 23 Aug 2019 13:11:33 +0300 Subject: [PATCH 10/13] [AHC][ISSUE-178]: enable url param encoding --- .../fsanaulla/chronicler/ahc/io/unit/package.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala b/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala index 50f5c660..2a16ac88 100644 --- a/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala +++ b/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala @@ -26,20 +26,20 @@ package object unit { } def queryTesterAuth(query: String)(credentials: InfluxCredentials): String = - s"http://localhost:8086/query?q=${query}&p=${credentials.password}&u=${credentials.username}" + s"http://localhost:8086/query?q=${query.encode}&p=${credentials.password.encode}&u=${credentials.username.encode}" def queryTesterAuth(db: String, query: String)(credentials: InfluxCredentials): String = - s"http://localhost:8086/query?q=${query}&p=${credentials.password}&db=${db}&u=${credentials.username}" + s"http://localhost:8086/query?q=${query.encode}&p=${credentials.password.encode}&db=${db}&u=${credentials.username.encode}" def queryTester(query: String): String = - s"http://localhost:8086/query?q=${query}" + s"http://localhost:8086/query?q=${query.encode}" def queryTester(db: String, query: String): String = - s"http://localhost:8086/query?q=${query}&db=${db}" + s"http://localhost:8086/query?q=${query.encode}&db=${db}" def queryTester(path: String, mp: List[(String, String)]): String = { val queries = mp - .map { case (k, v) => s"$k=$v" } + .map { case (k, v) => s"$k=${v.encode}" } .mkString("&") s"http://localhost:8086$path?$queries" From fdff0b0c3a58152986172e03d372e427bb7b2fe7 Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Tue, 27 Aug 2019 16:06:13 +0300 Subject: [PATCH 11/13] [CORE][ISSUE-182]: small refactoring --- .../fsanaulla/chronicler/core/components/ResponseHandler.scala | 3 +-- .../com/github/fsanaulla/chronicler/core/either/package.scala | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/components/ResponseHandler.scala b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/components/ResponseHandler.scala index cda93e23..f00dd00b 100644 --- a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/components/ResponseHandler.scala +++ b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/components/ResponseHandler.scala @@ -72,7 +72,7 @@ class ResponseHandler[G[_], R]( * @param response - backend response value * @return - Query result of JArray in future container */ - final def queryResultJson(response: R): G[ErrorOr[Array[JArray]]] = { + final def queryResultJson(response: R): G[ErrorOr[Array[JArray]]] = jsonHandler.responseCode(response).intValue() match { case code if isSuccessful(code) => F.map(jsonHandler.responseBody(response)) { ethRes => @@ -81,7 +81,6 @@ class ResponseHandler[G[_], R]( case _ => F.map(errorHandler(response))(Left(_)) } - } /** * Handling HTTP response with GROUP BY clause in the query diff --git a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/either/package.scala b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/either/package.scala index 6110941e..add39e2a 100644 --- a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/either/package.scala +++ b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/either/package.scala @@ -28,7 +28,7 @@ package object either { } yield x +: xs } - def seq[L, R: ClassTag](s: Seq[Either[L, R]]): Either[L, Seq[R]] = + def seq[L, R](s: Seq[Either[L, R]]): Either[L, Seq[R]] = s.foldRight(Right(Seq.empty): Either[L, Seq[R]]) { (e, acc) => for { xs <- acc.right From 156a9fe2401ab1e8d5caed7e46df74293b56fbb6 Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Tue, 27 Aug 2019 16:08:22 +0300 Subject: [PATCH 12/13] [AHC][ISSUE-182]: response decompression --- .../ahc/io/it/CompressionSpec.scala | 2 +- .../chronicler/ahc/io/InfluxIO.scala | 4 ++-- .../ahc/shared/InfluxAhcClient.scala | 24 ++++++++++++++----- .../ahc/shared/handlers/AhcJsonHandler.scala | 12 ++++++---- .../shared/handlers/AhcRequestExecutor.scala | 6 ++++- 5 files changed, 34 insertions(+), 14 deletions(-) diff --git a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala b/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala index ff41b8d2..47a56a03 100644 --- a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala +++ b/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala @@ -29,7 +29,7 @@ class CompressionSpec InfluxMng(host, port, Some(creds), None) lazy val io = - InfluxIO(host, port, Some(creds) /*, compress = true*/ ) + InfluxIO(host, port, Some(creds), compress = true) lazy val db: DatabaseApi[Future, Id, Response, Uri, String] = io.database(testDB) diff --git a/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/InfluxIO.scala b/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/InfluxIO.scala index 337fa2ad..57471f84 100644 --- a/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/InfluxIO.scala +++ b/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/InfluxIO.scala @@ -32,7 +32,7 @@ object InfluxIO { * @param credentials - user credentials * @param compress - enable gzip compression * @param asyncClientConfig - custom configuration - * @param ex - implicit execution context, by default use standard one + * @param ec - implicit execution context, by default use standard one * @return - [[AhcIOClient]] */ def apply( @@ -41,7 +41,7 @@ object InfluxIO { credentials: Option[InfluxCredentials] = None, compress: Boolean = false, asyncClientConfig: Option[AsyncHttpClientConfig] = None - )(implicit ex: ExecutionContext + )(implicit ec: ExecutionContext ): AhcIOClient = new AhcIOClient(host, port, compress, credentials, asyncClientConfig) diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala index edbee8f2..58279e91 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala @@ -16,16 +16,28 @@ package com.github.fsanaulla.chronicler.ahc.shared -import org.asynchttpclient.{AsyncHttpClientConfig, DefaultAsyncHttpClient} +import org.asynchttpclient.{ + AsyncHttpClientConfig, + DefaultAsyncHttpClient, + DefaultAsyncHttpClientConfig +} class InfluxAhcClient(asyncClientConfig: Option[AsyncHttpClientConfig]) { self: AutoCloseable => - private[ahc] val schema: String = asyncClientConfig - .map(_.isUseOpenSsl) - .map(if (_) "https" else "http") - .getOrElse("http") + private[this] val config = { + val default = { + val b = new DefaultAsyncHttpClientConfig.Builder() + b.setCompressionEnforced(false) + b.build() + } + + asyncClientConfig.getOrElse(default) + } + + private[ahc] val schema: String = + if (config.isUseOpenSsl) "https" else "http" private[ahc] implicit val client: DefaultAsyncHttpClient = - asyncClientConfig.fold(new DefaultAsyncHttpClient())(new DefaultAsyncHttpClient(_)) + new DefaultAsyncHttpClient(config) override def close(): Unit = client.close() } diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala index 926427e9..8d47fa10 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala @@ -21,7 +21,6 @@ import java.nio.charset.{Charset, StandardCharsets} import com.github.fsanaulla.chronicler.core.alias.{ErrorOr, Id} import com.github.fsanaulla.chronicler.core.components.JsonHandler import com.github.fsanaulla.chronicler.core.encoding.encodingFromContentType -import com.github.fsanaulla.chronicler.core.gzip import com.github.fsanaulla.chronicler.core.implicits._ import com.github.fsanaulla.chronicler.core.jawn.RichJParser import org.asynchttpclient.Response @@ -31,15 +30,20 @@ import scala.collection.JavaConverters._ private[ahc] final class AhcJsonHandler(compress: Boolean) extends JsonHandler[Id, Response] { + /*** + * Extract response body + * + * @see - [https://groups.google.com/forum/#!searchin/asynchttpclient/compression%7Csort:date/asynchttpclient/TAq33OWXeKU/sBm3v4EWAwAJ], + * netty automatically decompress gzipped request + */ def responseBody(response: Response): ErrorOr[JValue] = { - val bodyBts = response.getResponseBodyAsBytes - val maybeDecompressed = if (compress) gzip.decompress(bodyBts) else bodyBts + val bodyBts = response.getResponseBodyAsBytes val encoding: Charset = Option(response.getContentType) .flatMap(encodingFromContentType) .map(Charset.forName) .getOrElse(StandardCharsets.UTF_8) - val bodyStr = new String(maybeDecompressed, encoding) + val bodyStr = new String(bodyBts, encoding) JParser.parseFromStringEither(bodyStr) } diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala index 8f493d63..ff5e1eda 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala @@ -19,6 +19,7 @@ package com.github.fsanaulla.chronicler.ahc.shared.handlers import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.core.components.RequestExecutor import com.github.fsanaulla.chronicler.core.gzip +import io.netty.handler.codec.http.HttpHeaderValues.GZIP_DEFLATE import org.asynchttpclient.{AsyncHttpClient, Response} import scala.compat.java8.FutureConverters._ @@ -34,7 +35,10 @@ private[ahc] final class AhcRequestExecutor()(implicit client: AsyncHttpClient) * @return - Return wrapper response */ override def get(uri: Uri, compress: Boolean): Future[Response] = { - client.prepareGet(uri.mkUrl).execute.toCompletableFuture.toScala + val req = client.prepareGet(uri.mkUrl) + val maybeCompressed = if (compress) req.setHeader("Accept-Encoding", GZIP_DEFLATE) else req + + maybeCompressed.execute.toCompletableFuture.toScala } override def post( From 78c0c0ddbceb2625661acd4120c1fdfb85c1cbb8 Mon Sep 17 00:00:00 2001 From: fsanaulla Date: Wed, 25 Sep 2019 19:33:19 +0300 Subject: [PATCH 13/13] [RELEASE][0.6.0]: updating from upstream, fixing test spec --- .../shared/handlers/AhcJsonHandlerSpec.scala | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala index 0d921c1f..af65c246 100644 --- a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala +++ b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala @@ -23,7 +23,7 @@ import io.netty.handler.codec.http.{DefaultHttpResponse, HttpVersion} import org.asynchttpclient.Response import org.asynchttpclient.netty.{EagerResponseBodyPart, NettyResponseStatus} import org.scalatest.concurrent.ScalaFutures -import org.scalatest.{FlatSpec, Matchers, OptionValues} +import org.scalatest.{Matchers, OptionValues, WordSpec} import org.typelevel.jawn.ast._ /** @@ -35,6 +35,30 @@ class AhcJsonHandlerSpec extends WordSpec with Matchers with ScalaFutures with O val jsonHandler = new AhcJsonHandler(compress = false) + def buildResponse(bts: Array[Byte]): Response = { + val b = new Response.ResponseBuilder() + + b.accumulate( + new EagerResponseBodyPart( + Unpooled.copiedBuffer(ByteBuffer.wrap(bts)), + true + ) + ) + + b.accumulate( + new NettyResponseStatus( + null, + new DefaultHttpResponse( + HttpVersion.HTTP_1_0, + io.netty.handler.codec.http.HttpResponseStatus.OK + ), + null + ) + ) + + b.build + } + "JsonHandler" should { "extract" should { "body from HTTP response" in { @@ -70,7 +94,7 @@ class AhcJsonHandlerSpec extends WordSpec with Matchers with ScalaFutures with O | ] | }""".stripMargin - val resp: Response[Array[Byte]] = Response.ok(singleStrJson.getBytes()) + val resp = buildResponse(singleStrJson.getBytes()) val result: JValue = JParser.parseFromString(singleStrJson).get