diff --git a/changelog/0.6.0.md b/changelog/0.6.0.md new file mode 100644 index 00000000..f8405f1a --- /dev/null +++ b/changelog/0.6.0.md @@ -0,0 +1,5 @@ +Including issues: +- fsanaulla/chronicler#178 Moving away from sttp to pure AHC backend + +Contributors: +- @fsanaulla \ No newline at end of file diff --git a/examples/udp/src/main/scala/com.github.fsanaulla.chronicler.example.udp/Main.scala b/examples/udp/src/main/scala/com.github.fsanaulla.chronicler.example.udp/Main.scala index 8dd74272..9d47b1c3 100644 --- a/examples/udp/src/main/scala/com.github.fsanaulla.chronicler.example.udp/Main.scala +++ b/examples/udp/src/main/scala/com.github.fsanaulla.chronicler.example.udp/Main.scala @@ -1,26 +1,24 @@ package com.github.fsanaulla.chronicler.example.udp -import com.github.fsanaulla.chronicler.core.model.{InfluxFormatter, Point} -import com.github.fsanaulla.chronicler.macros.Influx import com.github.fsanaulla.chronicler.macros.annotations.{field, tag} +import com.github.fsanaulla.chronicler.macros.auto._ import com.github.fsanaulla.chronicler.udp.InfluxUdp +import scala.util.Try + object Main { + def main(args: Array[String]): Unit = { final case class Test(@tag name: String, @field age: Int) - - // generate formatter at compile-time - implicit val fmt: InfluxFormatter[Test] = Influx.formatter[Test] - - val t = Test("f", 1) - val host = args.headOption.getOrElse("localhost") + val t = Test("f", 1) + val host = args.headOption.getOrElse("localhost") val influx = InfluxUdp(host) for { // write record to Influx _ <- influx.write("cpu", t) // close client - _ <- influx.close() + _ <- Try(influx.close()) } yield println("Stored!") } } diff --git a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala b/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala index 3e81ae36..47a56a03 100644 --- a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala +++ b/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/CompressionSpec.scala @@ -4,10 +4,11 @@ import java.nio.file.Paths import com.github.fsanaulla.chronicler.ahc.io.InfluxIO import com.github.fsanaulla.chronicler.ahc.management.InfluxMng +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.core.alias.Id import com.github.fsanaulla.chronicler.core.api.DatabaseApi import com.github.fsanaulla.chronicler.testing.it.DockerizedInfluxDB -import com.softwaremill.sttp.{Response, Uri} +import org.asynchttpclient.Response import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures} import org.scalatest.{FlatSpec, Matchers} @@ -28,9 +29,9 @@ class CompressionSpec InfluxMng(host, port, Some(creds), None) lazy val io = - InfluxIO(host, port, Some(creds) /*, compress = true*/ ) + InfluxIO(host, port, Some(creds), compress = true) - lazy val db: DatabaseApi[Future, Id, Response[Array[Byte]], Uri, String] = + lazy val db: DatabaseApi[Future, Id, Response, Uri, String] = io.database(testDB) it should "ping database" in { diff --git a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/DatabaseApiSpec.scala b/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/DatabaseApiSpec.scala index 95557028..f332a70d 100644 --- a/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/DatabaseApiSpec.scala +++ b/modules/ahc/io/src/it/scala/com/github/fsanaulla/chronicler/ahc/io/it/DatabaseApiSpec.scala @@ -84,27 +84,28 @@ class DatabaseApiSpec extends FlatSpec with Matchers with Futures with Dockerize } it should "retrieve multiple request" in { + db.readJson("SELECT * FROM test2").futureValue.right.get.length shouldEqual 3 + + db.readJson("SELECT * FROM test2 WHERE age < 40").futureValue.right.get.length shouldEqual 1 + val multiQuery = db - .bulkReadJson( - Array( - "SELECT * FROM test2", - "SELECT * FROM test2 WHERE age < 40" - ) - ) + .bulkReadJson(Seq("SELECT * FROM test2", "SELECT * FROM test2 WHERE age < 40")) .futureValue + .right + .get - multiQuery.right.get.length shouldEqual 2 - multiQuery.right.get shouldBe a[Array[_]] + multiQuery.length shouldEqual 2 + multiQuery shouldBe a[Array[_]] - multiQuery.right.get.head.length shouldEqual 3 - multiQuery.right.get.head shouldBe a[Array[_]] - multiQuery.right.get.head.head shouldBe a[JArray] + multiQuery.head.length shouldEqual 3 + multiQuery.head shouldBe a[Array[_]] + multiQuery.head.head shouldBe a[JArray] - multiQuery.right.get.last.length shouldEqual 1 - multiQuery.right.get.last shouldBe a[Array[_]] - multiQuery.right.get.last.head shouldBe a[JArray] + multiQuery.last.length shouldEqual 1 + multiQuery.last shouldBe a[Array[_]] + multiQuery.last.head shouldBe a[JArray] - multiQuery.right.get + multiQuery .map(_.map(_.arrayValue.right.get.tail)) shouldEqual largeMultiJsonEntity.map( _.map(_.arrayValue.right.get.tail) ) diff --git a/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/AhcIOClient.scala b/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/AhcIOClient.scala index df34a6fe..07b2dc40 100644 --- a/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/AhcIOClient.scala +++ b/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/AhcIOClient.scala @@ -16,21 +16,20 @@ package com.github.fsanaulla.chronicler.ahc.io -import com.github.fsanaulla.chronicler.ahc.shared.InfluxAhcClient import com.github.fsanaulla.chronicler.ahc.shared.handlers.{ AhcJsonHandler, AhcQueryBuilder, AhcRequestExecutor } -import com.github.fsanaulla.chronicler.ahc.shared.implicits._ +import com.github.fsanaulla.chronicler.ahc.shared.implicits.{fkId, futureFailable, futureFunctor} +import com.github.fsanaulla.chronicler.ahc.shared.{InfluxAhcClient, Uri} import com.github.fsanaulla.chronicler.core.IOClient import com.github.fsanaulla.chronicler.core.alias.{ErrorOr, Id} import com.github.fsanaulla.chronicler.core.api.{DatabaseApi, MeasurementApi} import com.github.fsanaulla.chronicler.core.components.ResponseHandler import com.github.fsanaulla.chronicler.core.implicits.{applyId, functorId} import com.github.fsanaulla.chronicler.core.model.{InfluxCredentials, InfluxDBInfo} -import com.softwaremill.sttp.{Response, Uri} -import org.asynchttpclient.AsyncHttpClientConfig +import org.asynchttpclient.{AsyncHttpClientConfig, Response} import scala.concurrent.{ExecutionContext, Future} import scala.reflect.ClassTag @@ -43,12 +42,12 @@ final class AhcIOClient( asyncClientConfig: Option[AsyncHttpClientConfig] )(implicit ex: ExecutionContext) extends InfluxAhcClient(asyncClientConfig) - with IOClient[Future, Id, Response[Array[Byte]], Uri, String] { + with IOClient[Future, Id, Response, Uri, String] { - val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) - implicit val re: AhcRequestExecutor = new AhcRequestExecutor - implicit val rh: ResponseHandler[Id, Response[Array[Byte]]] = new ResponseHandler(jsonHandler) + val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) + implicit val re: AhcRequestExecutor = new AhcRequestExecutor + implicit val rh: ResponseHandler[Id, Response] = new ResponseHandler(jsonHandler) override def database(dbName: String) = new DatabaseApi(dbName, compress) diff --git a/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/InfluxIO.scala b/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/InfluxIO.scala index 337fa2ad..57471f84 100644 --- a/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/InfluxIO.scala +++ b/modules/ahc/io/src/main/scala/com/github/fsanaulla/chronicler/ahc/io/InfluxIO.scala @@ -32,7 +32,7 @@ object InfluxIO { * @param credentials - user credentials * @param compress - enable gzip compression * @param asyncClientConfig - custom configuration - * @param ex - implicit execution context, by default use standard one + * @param ec - implicit execution context, by default use standard one * @return - [[AhcIOClient]] */ def apply( @@ -41,7 +41,7 @@ object InfluxIO { credentials: Option[InfluxCredentials] = None, compress: Boolean = false, asyncClientConfig: Option[AsyncHttpClientConfig] = None - )(implicit ex: ExecutionContext + )(implicit ec: ExecutionContext ): AhcIOClient = new AhcIOClient(host, port, compress, credentials, asyncClientConfig) diff --git a/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/DatabaseApiOperationQuerySpec.scala b/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/DatabaseApiOperationQuerySpec.scala index 4c0b7c9c..09e00b70 100644 --- a/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/DatabaseApiOperationQuerySpec.scala +++ b/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/DatabaseApiOperationQuerySpec.scala @@ -16,11 +16,11 @@ package com.github.fsanaulla.chronicler.ahc.io.unit +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.enums.{Consistencies, Epochs, Precisions} import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.DatabaseOperationQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} import scala.language.implicitConversions @@ -31,32 +31,30 @@ import scala.language.implicitConversions * Date: 27.07.17 */ class DatabaseApiOperationQuerySpec - extends FlatSpec - with Matchers - with DatabaseOperationQuery[Uri] { + extends FlatSpec + with Matchers + with DatabaseOperationQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } val testDB = "db" val testQuery = "SELECT * FROM test" - implicit def a2Opt[A](a: A): Option[A] = Some(a) - it should "return correct write query" in new AuthEnv { - write(testDB, Consistencies.One, Precisions.Nanoseconds, None) - .toString() shouldEqual queryTester( + write(testDB, Consistencies.One, Precisions.Nanoseconds, None).mkUrl shouldEqual queryTester( "/write", List( "db" -> testDB, @@ -67,8 +65,7 @@ class DatabaseApiOperationQuerySpec ) ) - write(testDB, Consistencies.All, Precisions.Nanoseconds, None) - .toString() shouldEqual queryTester( + write(testDB, Consistencies.All, Precisions.Nanoseconds, None).mkUrl shouldEqual queryTester( "/write", List( "db" -> testDB, @@ -81,23 +78,23 @@ class DatabaseApiOperationQuerySpec } it should "return correct write query without auth " in new NonAuthEnv { - write(testDB, Consistencies.One, Precisions.Nanoseconds, None).toString() shouldEqual + write(testDB, Consistencies.One, Precisions.Nanoseconds, None).mkUrl shouldEqual queryTester("/write", List("db" -> testDB, "consistency" -> "one", "precision" -> "ns")) - write(testDB, Consistencies.One, Precisions.Microseconds, None).toString() shouldEqual + write(testDB, Consistencies.One, Precisions.Microseconds, None).mkUrl shouldEqual queryTester("/write", List("db" -> testDB, "consistency" -> "one", "precision" -> "u")) } it should "return correct single read query" in new AuthEnv { - val queryPrms = List( + val queryPrms: List[(String, String)] = List( "db" -> testDB, "u" -> credentials.get.username, "p" -> credentials.get.password, "epoch" -> "ns", "q" -> "SELECT * FROM test" ) - singleQuery(testDB, testQuery, Epochs.Nanoseconds, pretty = false).toString() shouldEqual + singleQuery(testDB, testQuery, Epochs.Nanoseconds, pretty = false).mkUrl shouldEqual queryTester("/query", queryPrms) } @@ -116,7 +113,7 @@ class DatabaseApiOperationQuerySpec Seq("SELECT * FROM test", "SELECT * FROM test1"), Epochs.Nanoseconds, pretty = false - ).toString() shouldEqual + ).mkUrl shouldEqual queryTester("/query", queryPrms) val queryPrms1: List[(String, String)] = List( @@ -132,7 +129,7 @@ class DatabaseApiOperationQuerySpec Seq("SELECT * FROM test", "SELECT * FROM test1"), Epochs.Nanoseconds, pretty = true - ).toString() shouldEqual + ).mkUrl shouldEqual queryTester("/query", queryPrms1) } } diff --git a/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala b/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala index b43b7962..2a16ac88 100644 --- a/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala +++ b/modules/ahc/io/src/test/scala/com/github/fsanaulla/chronicler/ahc/io/unit/package.scala @@ -29,13 +29,13 @@ package object unit { s"http://localhost:8086/query?q=${query.encode}&p=${credentials.password.encode}&u=${credentials.username.encode}" def queryTesterAuth(db: String, query: String)(credentials: InfluxCredentials): String = - s"http://localhost:8086/query?q=${query.encode}&p=${credentials.password.encode}&db=${db.encode}&u=${credentials.username.encode}" + s"http://localhost:8086/query?q=${query.encode}&p=${credentials.password.encode}&db=${db}&u=${credentials.username.encode}" def queryTester(query: String): String = s"http://localhost:8086/query?q=${query.encode}" def queryTester(db: String, query: String): String = - s"http://localhost:8086/query?q=${query.encode}&db=${db.encode}" + s"http://localhost:8086/query?q=${query.encode}&db=${db}" def queryTester(path: String, mp: List[(String, String)]): String = { val queries = mp diff --git a/modules/ahc/management/src/main/scala/com/github/fsanaulla/chronicler/ahc/management/AhcManagementClient.scala b/modules/ahc/management/src/main/scala/com/github/fsanaulla/chronicler/ahc/management/AhcManagementClient.scala index 8a968822..e2843290 100644 --- a/modules/ahc/management/src/main/scala/com/github/fsanaulla/chronicler/ahc/management/AhcManagementClient.scala +++ b/modules/ahc/management/src/main/scala/com/github/fsanaulla/chronicler/ahc/management/AhcManagementClient.scala @@ -16,12 +16,12 @@ package com.github.fsanaulla.chronicler.ahc.management -import com.github.fsanaulla.chronicler.ahc.shared.InfluxAhcClient import com.github.fsanaulla.chronicler.ahc.shared.handlers.{ AhcJsonHandler, AhcQueryBuilder, AhcRequestExecutor } +import com.github.fsanaulla.chronicler.ahc.shared.{InfluxAhcClient, Uri} import com.github.fsanaulla.chronicler.core.ManagementClient import com.github.fsanaulla.chronicler.core.alias.{ErrorOr, Id} import com.github.fsanaulla.chronicler.core.components.ResponseHandler @@ -32,8 +32,7 @@ import com.github.fsanaulla.chronicler.core.model.{ InfluxCredentials, InfluxDBInfo } -import com.softwaremill.sttp.{Response, Uri} -import org.asynchttpclient.AsyncHttpClientConfig +import org.asynchttpclient.{AsyncHttpClientConfig, Response} import scala.concurrent.{ExecutionContext, Future} @@ -46,12 +45,12 @@ final class AhcManagementClient( val F: Functor[Future], val FK: FunctionK[Id, Future]) extends InfluxAhcClient(asyncClientConfig) - with ManagementClient[Future, Id, Response[Array[Byte]], Uri, String] { + with ManagementClient[Future, Id, Response, Uri, String] { - val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress = false) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) - implicit val re: AhcRequestExecutor = new AhcRequestExecutor - implicit val rh: ResponseHandler[Id, Response[Array[Byte]]] = new ResponseHandler(jsonHandler) + val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress = false) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) + implicit val re: AhcRequestExecutor = new AhcRequestExecutor + implicit val rh: ResponseHandler[Id, Response] = new ResponseHandler(jsonHandler) override def ping: Future[ErrorOr[InfluxDBInfo]] = { re.get(qb.buildQuery("/ping"), compress = false) diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ContinuousQueriesSpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ContinuousQueriesSpec.scala index 943df473..3b0000ae 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ContinuousQueriesSpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ContinuousQueriesSpec.scala @@ -16,10 +16,10 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.ContinuousQueries -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -30,17 +30,18 @@ import org.scalatest.{FlatSpec, Matchers} class ContinuousQueriesSpec extends FlatSpec with Matchers with ContinuousQueries[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } val db = "mydb" @@ -48,31 +49,31 @@ class ContinuousQueriesSpec extends FlatSpec with Matchers with ContinuousQuerie val query = "SELECT mean(bees) AS mean_bees INTO aggregate_bees FROM farm GROUP BY time(30m)" "ContinuousQuerys operation" should "generate correct show query" in new AuthEnv { - showCQQuery.toString() shouldEqual queryTesterAuth("SHOW CONTINUOUS QUERIES")(credentials.get) + showCQQuery.mkUrl shouldEqual queryTesterAuth("SHOW CONTINUOUS QUERIES")(credentials.get) } it should "generate correct drop query" in new AuthEnv { - dropCQQuery(db, cq).toString() shouldEqual queryTesterAuth(s"DROP CONTINUOUS QUERY $cq ON $db")( + dropCQQuery(db, cq).mkUrl shouldEqual queryTesterAuth(s"DROP CONTINUOUS QUERY $cq ON $db")( credentials.get ) } it should "generate correct create query" in new AuthEnv { - createCQQuery(db, cq, query).toString() shouldEqual queryTesterAuth( + createCQQuery(db, cq, query).mkUrl shouldEqual queryTesterAuth( s"CREATE CONTINUOUS QUERY $cq ON $db BEGIN $query END" )(credentials.get) } it should "generate correct show query without auth" in new NonAuthEnv { - showCQQuery.toString() shouldEqual queryTester("SHOW CONTINUOUS QUERIES") + showCQQuery.mkUrl shouldEqual queryTester("SHOW CONTINUOUS QUERIES") } it should "generate correct drop query without auth" in new NonAuthEnv { - dropCQQuery(db, cq).toString() shouldEqual queryTester(s"DROP CONTINUOUS QUERY $cq ON $db") + dropCQQuery(db, cq).mkUrl shouldEqual queryTester(s"DROP CONTINUOUS QUERY $cq ON $db") } it should "generate correct create query without auth" in new NonAuthEnv { - createCQQuery(db, cq, query).toString() shouldEqual queryTester( + createCQQuery(db, cq, query).mkUrl shouldEqual queryTester( s"CREATE CONTINUOUS QUERY $cq ON $db BEGIN $query END" ) } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/DataManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/DataManagementQuerySpec.scala index 6ea4fa4a..800e552c 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/DataManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/DataManagementQuerySpec.scala @@ -16,10 +16,10 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.DataManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -30,17 +30,18 @@ import org.scalatest.{FlatSpec, Matchers} class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } val testDb: String = "testDb" @@ -52,58 +53,56 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement val testOffset = Some(3) it should "generate correct 'create database' query" in new AuthEnv { - createDatabaseQuery(testDb, None, None, None, None).toString() shouldEqual + createDatabaseQuery(testDb, None, None, None, None).mkUrl shouldEqual queryTesterAuth(s"CREATE DATABASE $testDb")(credentials.get) - createDatabaseQuery(testDb, None, Some(2), None, None).toString() shouldEqual + createDatabaseQuery(testDb, None, Some(2), None, None).mkUrl shouldEqual queryTesterAuth(s"CREATE DATABASE $testDb WITH REPLICATION 2")(credentials.get) } it should "generate correct 'drop database' query" in new AuthEnv { - dropDatabaseQuery(testDb).toString() shouldEqual + dropDatabaseQuery(testDb).mkUrl shouldEqual queryTesterAuth(s"DROP DATABASE $testDb")(credentials.get) } it should "generate correct 'drop series' query" in new AuthEnv { - dropSeriesQuery(testDb, testSeries).toString() shouldEqual + dropSeriesQuery(testDb, testSeries).mkUrl shouldEqual queryTesterAuth(testDb, s"DROP SERIES FROM $testSeries")(credentials.get) } it should "generate correct 'drop measurement' query" in new AuthEnv { - dropMeasurementQuery(testDb, testMeasurement).toString() shouldEqual + dropMeasurementQuery(testDb, testMeasurement).mkUrl shouldEqual queryTesterAuth(testDb, s"DROP MEASUREMENT $testMeasurement")(credentials.get) } it should "generate correct 'drop all series' query" in new AuthEnv { - deleteAllSeriesQuery(testDb, testSeries).toString() shouldEqual + deleteAllSeriesQuery(testDb, testSeries).mkUrl shouldEqual queryTesterAuth(testDb, s"DELETE FROM $testSeries")(credentials.get) } it should "generate correct 'show measurement' query" in new AuthEnv { - showMeasurementQuery(testDb).toString() shouldEqual + showMeasurementQuery(testDb).mkUrl shouldEqual queryTesterAuth(testDb, "SHOW MEASUREMENTS")(credentials.get) } it should "generate correct 'show database' query" in new AuthEnv { - showDatabasesQuery.toString() shouldEqual + showDatabasesQuery.mkUrl shouldEqual queryTesterAuth(s"SHOW DATABASES")(credentials.get) } it should "generate correct 'show tag-key' query" in new AuthEnv { - showTagKeysQuery(testDb, testMeasurement, testWhereClause, testLimit, testOffset) - .toString() shouldEqual + showTagKeysQuery(testDb, testMeasurement, testWhereClause, testLimit, testOffset).mkUrl shouldEqual queryTesterAuth( s"SHOW TAG KEYS ON $testDb FROM $testMeasurement WHERE ${testWhereClause.get} LIMIT ${testLimit.get} OFFSET ${testOffset.get}" )(credentials.get) - showTagKeysQuery(testDb, testMeasurement, testWhereClause, None, None).toString() shouldEqual + showTagKeysQuery(testDb, testMeasurement, testWhereClause, None, None).mkUrl shouldEqual queryTesterAuth( s"SHOW TAG KEYS ON $testDb FROM $testMeasurement WHERE ${testWhereClause.get}" )(credentials.get) } it should "generate correct 'show tag-value' query" in new AuthEnv { - showTagValuesQuery(testDb, testMeasurement, Seq("key"), testWhereClause, testLimit, testOffset) - .toString() shouldEqual + showTagValuesQuery(testDb, testMeasurement, Seq("key"), testWhereClause, testLimit, testOffset).mkUrl shouldEqual queryTesterAuth( s"SHOW TAG VALUES ON $testDb FROM $testMeasurement WITH KEY = key WHERE ${testWhereClause.get} LIMIT ${testLimit.get} OFFSET ${testOffset.get}" )(credentials.get) @@ -114,14 +113,14 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement testWhereClause, testLimit, testOffset - ).toString() shouldEqual + ).mkUrl shouldEqual queryTesterAuth( s"SHOW TAG VALUES ON $testDb FROM $testMeasurement WITH KEY IN (key,key1) WHERE ${testWhereClause.get} LIMIT ${testLimit.get} OFFSET ${testOffset.get}" )(credentials.get) } it should "generate correct 'show field-key' query" in new AuthEnv { - showFieldKeysQuery(testDb, testMeasurement).toString() shouldEqual + showFieldKeysQuery(testDb, testMeasurement).mkUrl shouldEqual queryTesterAuth(s"SHOW FIELD KEYS ON $testDb FROM $testMeasurement")(credentials.get) } @@ -132,7 +131,7 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement None, None, None - ).toString() shouldEqual queryTester(s"CREATE DATABASE $testDb WITH DURATION 3d") + ).mkUrl shouldEqual queryTester(s"CREATE DATABASE $testDb WITH DURATION 3d") createDatabaseQuery( testDb, @@ -140,36 +139,40 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement Some(2), Some("1d"), Some("testName") - ).toString() shouldEqual queryTester( + ).mkUrl shouldEqual queryTester( s"CREATE DATABASE $testDb WITH DURATION 3d REPLICATION 2 SHARD DURATION 1d NAME testName" ) } it should "generate correct 'drop database' query without auth" in new NonAuthEnv { - dropDatabaseQuery(testDb).toString() shouldEqual queryTester(s"DROP DATABASE $testDb") + dropDatabaseQuery(testDb).mkUrl shouldEqual queryTester(s"DROP DATABASE $testDb") } it should "generate correct 'drop series' query without auth" in new NonAuthEnv { - dropSeriesQuery(testDb, testSeries) - .toString() shouldEqual queryTester(testDb, s"DROP SERIES FROM $testSeries") + dropSeriesQuery(testDb, testSeries).mkUrl shouldEqual queryTester( + testDb, + s"DROP SERIES FROM $testSeries" + ) } it should "generate auth correct 'drop measurement' query without auth" in new NonAuthEnv { - dropMeasurementQuery(testDb, testMeasurement).toString() shouldEqual + dropMeasurementQuery(testDb, testMeasurement).mkUrl shouldEqual queryTester(testDb, s"DROP MEASUREMENT $testMeasurement") } it should "generate correct auth 'drop all series' query without auth" in new NonAuthEnv { - deleteAllSeriesQuery(testDb, testSeries) - .toString() shouldEqual queryTester(testDb, s"DELETE FROM $testSeries") + deleteAllSeriesQuery(testDb, testSeries).mkUrl shouldEqual queryTester( + testDb, + s"DELETE FROM $testSeries" + ) } it should "generate correct auth 'show measurement' query without auth" in new NonAuthEnv { - showMeasurementQuery(testDb).toString() shouldEqual queryTester(testDb, "SHOW MEASUREMENTS") + showMeasurementQuery(testDb).mkUrl shouldEqual queryTester(testDb, "SHOW MEASUREMENTS") } it should "generate correct 'show database' query without auth" in new NonAuthEnv { - showDatabasesQuery.toString() shouldEqual queryTester(s"SHOW DATABASES") + showDatabasesQuery.mkUrl shouldEqual queryTester(s"SHOW DATABASES") } it should "generate correct 'show tag-key' query without auth" in new NonAuthEnv { @@ -179,7 +182,7 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement None, None, None - ).toString() shouldEqual queryTester(s"SHOW TAG KEYS ON $testDb FROM $testMeasurement") + ).mkUrl shouldEqual queryTester(s"SHOW TAG KEYS ON $testDb FROM $testMeasurement") showTagKeysQuery( testDb, @@ -187,24 +190,22 @@ class DataManagementQuerySpec extends FlatSpec with Matchers with DataManagement testWhereClause, testLimit, None - ).toString() shouldEqual queryTester( + ).mkUrl shouldEqual queryTester( s"SHOW TAG KEYS ON $testDb FROM $testMeasurement WHERE ${testWhereClause.get} LIMIT ${testLimit.get}" ) } it should "generate correct 'show tag-value' query without auth" in new NonAuthEnv { - showTagValuesQuery(testDb, testMeasurement, Seq("key"), None, None, None) - .toString() shouldEqual queryTester( + showTagValuesQuery(testDb, testMeasurement, Seq("key"), None, None, None).mkUrl shouldEqual queryTester( s"SHOW TAG VALUES ON $testDb FROM $testMeasurement WITH KEY = key" ) - showTagValuesQuery(testDb, testMeasurement, Seq("key", "key1"), testWhereClause, None, None) - .toString() shouldEqual queryTester( + showTagValuesQuery(testDb, testMeasurement, Seq("key", "key1"), testWhereClause, None, None).mkUrl shouldEqual queryTester( s"SHOW TAG VALUES ON $testDb FROM $testMeasurement WITH KEY IN (key,key1) WHERE ${testWhereClause.get}" ) } it should "generate correct 'show field-key' query without auth" in new NonAuthEnv { - showFieldKeysQuery(testDb, testMeasurement).toString() shouldEqual + showFieldKeysQuery(testDb, testMeasurement).mkUrl shouldEqual queryTester(s"SHOW FIELD KEYS ON $testDb FROM $testMeasurement") } } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/QueriesManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/QueriesManagementQuerySpec.scala index 171b73df..4d070bef 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/QueriesManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/QueriesManagementQuerySpec.scala @@ -16,10 +16,10 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.QueriesManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -30,32 +30,33 @@ import org.scalatest.{FlatSpec, Matchers} class QueriesManagementQuerySpec extends FlatSpec with Matchers with QueriesManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } it should "show query" in new AuthEnv { - showQuerysQuery.toString() shouldEqual queryTesterAuth("SHOW QUERIES")(credentials.get) + showQuerysQuery.mkUrl shouldEqual queryTesterAuth("SHOW QUERIES")(credentials.get) } it should "kill query" in new AuthEnv { - killQueryQuery(5).toString() shouldEqual queryTesterAuth("KILL QUERY 5")(credentials.get) + killQueryQuery(5).mkUrl shouldEqual queryTesterAuth("KILL QUERY 5")(credentials.get) } it should "show query without auth" in new NonAuthEnv { - showQuerysQuery.toString() shouldEqual queryTester("SHOW QUERIES") + showQuerysQuery.mkUrl shouldEqual queryTester("SHOW QUERIES") } it should "kill query without auth" in new NonAuthEnv { - killQueryQuery(5).toString() shouldEqual queryTester("KILL QUERY 5") + killQueryQuery(5).mkUrl shouldEqual queryTester("KILL QUERY 5") } } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/RetentionPolicyManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/RetentionPolicyManagementQuerySpec.scala index 40311556..4d2fcc4b 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/RetentionPolicyManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/RetentionPolicyManagementQuerySpec.scala @@ -16,11 +16,11 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.duration._ import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.RetentionPolicyManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} import scala.language.postfixOps @@ -31,68 +31,67 @@ import scala.language.postfixOps * Date: 27.07.17 */ class RetentionPolicyManagementQuerySpec - extends FlatSpec - with Matchers - with RetentionPolicyManagementQuery[Uri] { + extends FlatSpec + with Matchers + with RetentionPolicyManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } val testRPName = "testRP" val testDBName = "testDB" it should "create retention policy" in new AuthEnv { - createRPQuery(testRPName, testDBName, 4 hours, 3, Some(4 hours), default = true) - .toString() shouldEqual + createRPQuery(testRPName, testDBName, 4 hours, 3, Some(4 hours), default = true).mkUrl shouldEqual queryTesterAuth( s"CREATE RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3 SHARD DURATION 4h DEFAULT" )(credentials.get) - createRPQuery(testRPName, testDBName, 4 hours, 3, None, default = true).toString() shouldEqual + createRPQuery(testRPName, testDBName, 4 hours, 3, None, default = true).mkUrl shouldEqual queryTesterAuth( s"CREATE RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3 DEFAULT" )(credentials.get) - createRPQuery(testRPName, testDBName, 4 hours, 3, Some(4 hours)).toString() shouldEqual + createRPQuery(testRPName, testDBName, 4 hours, 3, Some(4 hours)).mkUrl shouldEqual queryTesterAuth( s"CREATE RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3 SHARD DURATION 4h" )(credentials.get) } it should "create retention policy without auth" in new NonAuthEnv { - createRPQuery(testRPName, testDBName, 4 hours, 3, None).toString() shouldEqual + createRPQuery(testRPName, testDBName, 4 hours, 3, None).mkUrl shouldEqual queryTester(s"CREATE RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3") } it should "drop retention policy" in new AuthEnv { - dropRPQuery(testRPName, testDBName).toString() shouldEqual + dropRPQuery(testRPName, testDBName).mkUrl shouldEqual queryTesterAuth(s"DROP RETENTION POLICY $testRPName ON $testDBName")(credentials.get) } it should "drop retention policy without auth" in new NonAuthEnv { - dropRPQuery(testRPName, testDBName).toString() shouldEqual + dropRPQuery(testRPName, testDBName).mkUrl shouldEqual queryTester(s"DROP RETENTION POLICY $testRPName ON $testDBName") } it should "update retention policy" in new AuthEnv { - updateRPQuery(testRPName, testDBName, Some(4 hours), Some(3), Some(4 hours), default = true) - .toString() shouldEqual + updateRPQuery(testRPName, testDBName, Some(4 hours), Some(3), Some(4 hours), default = true).mkUrl shouldEqual queryTesterAuth( s"ALTER RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3 SHARD DURATION 4h DEFAULT" )(credentials.get) - updateRPQuery(testRPName, testDBName, Some(4 hours), Some(3), None).toString() shouldEqual + updateRPQuery(testRPName, testDBName, Some(4 hours), Some(3), None).mkUrl shouldEqual queryTesterAuth( s"ALTER RETENTION POLICY $testRPName ON $testDBName DURATION 4h REPLICATION 3" )(credentials.get) @@ -100,9 +99,9 @@ class RetentionPolicyManagementQuerySpec } it should "update retention policy without auth" in new NonAuthEnv { - updateRPQuery(testRPName, testDBName, Some(4 hours), None, None).toString() shouldEqual + updateRPQuery(testRPName, testDBName, Some(4 hours), None, None).mkUrl shouldEqual queryTester(s"ALTER RETENTION POLICY $testRPName ON $testDBName DURATION 4h") - updateRPQuery(testRPName, testDBName, None, Some(3), Some(4 hours)).toString() shouldEqual + updateRPQuery(testRPName, testDBName, None, Some(3), Some(4 hours)).mkUrl shouldEqual queryTester( s"ALTER RETENTION POLICY $testRPName ON $testDBName REPLICATION 3 SHARD DURATION 4h" ) diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ShardManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ShardManagementQuerySpec.scala index eb7a79e4..7db28f3c 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ShardManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/ShardManagementQuerySpec.scala @@ -16,10 +16,10 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.ShardManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -30,42 +30,43 @@ import org.scalatest.{FlatSpec, Matchers} class ShardManagementQuerySpec extends FlatSpec with Matchers with ShardManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } it should "drop shard by id" in new AuthEnv { - dropShardQuery(5).toString() shouldEqual queryTesterAuth("DROP SHARD 5")(credentials.get) + dropShardQuery(5).mkUrl shouldEqual queryTesterAuth("DROP SHARD 5")(credentials.get) } it should "drop shard by id without auth" in new NonAuthEnv { - dropShardQuery(5).toString() shouldEqual queryTester("DROP SHARD 5") + dropShardQuery(5).mkUrl shouldEqual queryTester("DROP SHARD 5") } it should "show shards" in new AuthEnv { - showShardsQuery.toString() shouldEqual queryTesterAuth("SHOW SHARDS")(credentials.get) + showShardsQuery.mkUrl shouldEqual queryTesterAuth("SHOW SHARDS")(credentials.get) } it should "show shards without auth" in new NonAuthEnv { - showShardsQuery.toString() shouldEqual queryTester("SHOW SHARDS") + showShardsQuery.mkUrl shouldEqual queryTester("SHOW SHARDS") } it should "show shard groups" in new AuthEnv { - showShardGroupsQuery.toString() shouldEqual queryTesterAuth("SHOW SHARD GROUPS")( + showShardGroupsQuery.mkUrl shouldEqual queryTesterAuth("SHOW SHARD GROUPS")( credentials.get ) } it should "show shard groups without auth" in new NonAuthEnv { - showShardGroupsQuery.toString() shouldEqual queryTester("SHOW SHARD GROUPS") + showShardGroupsQuery.mkUrl shouldEqual queryTester("SHOW SHARD GROUPS") } } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/SubscriptionsManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/SubscriptionsManagementQuerySpec.scala index 232130fc..d908cdf3 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/SubscriptionsManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/SubscriptionsManagementQuerySpec.scala @@ -16,11 +16,11 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.enums.Destinations import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.SubscriptionsManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -29,22 +29,23 @@ import org.scalatest.{FlatSpec, Matchers} * Date: 21.08.17 */ class SubscriptionsManagementQuerySpec - extends FlatSpec - with Matchers - with SubscriptionsManagementQuery[Uri] { + extends FlatSpec + with Matchers + with SubscriptionsManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } val subName = "subs" @@ -58,34 +59,34 @@ class SubscriptionsManagementQuerySpec s"CREATE SUBSCRIPTION $subName ON $dbName.$rpName DESTINATIONS $destType $resHosts" it should "create subs query" in new AuthEnv { - createSubscriptionQuery(subName, dbName, rpName, destType, hosts).toString() shouldEqual + createSubscriptionQuery(subName, dbName, rpName, destType, hosts).mkUrl shouldEqual queryTesterAuth(createRes)(credentials.get) } it should "create subs query without auth" in new NonAuthEnv { - createSubscriptionQuery(subName, dbName, rpName, destType, hosts).toString() shouldEqual + createSubscriptionQuery(subName, dbName, rpName, destType, hosts).mkUrl shouldEqual queryTester(createRes) } val dropRes = s"DROP SUBSCRIPTION $subName ON $dbName.$rpName" it should "drop subs query" in new AuthEnv { - dropSubscriptionQuery(subName, dbName, rpName).toString() shouldEqual + dropSubscriptionQuery(subName, dbName, rpName).mkUrl shouldEqual queryTesterAuth(dropRes)(credentials.get) } it should "drop subs query without auth" in new NonAuthEnv { - dropSubscriptionQuery(subName, dbName, rpName).toString() shouldEqual queryTester(dropRes) + dropSubscriptionQuery(subName, dbName, rpName).mkUrl shouldEqual queryTester(dropRes) } val showRes = "SHOW SUBSCRIPTIONS" it should "show subs query" in new AuthEnv { - showSubscriptionsQuery.toString() shouldEqual + showSubscriptionsQuery.mkUrl shouldEqual queryTesterAuth(showRes)(credentials.get) } it should "show subs query without auth" in new NonAuthEnv { - showSubscriptionsQuery.toString() shouldEqual queryTester(showRes) + showSubscriptionsQuery.mkUrl shouldEqual queryTester(showRes) } } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/UserManagementQuerySpec.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/UserManagementQuerySpec.scala index f5da6790..0df89ef3 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/UserManagementQuerySpec.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/UserManagementQuerySpec.scala @@ -16,11 +16,11 @@ package com.github.fsanaulla.chronicler.ahc.management +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder import com.github.fsanaulla.chronicler.core.enums.Privileges import com.github.fsanaulla.chronicler.core.model.InfluxCredentials import com.github.fsanaulla.chronicler.core.query.UserManagementQuery -import com.softwaremill.sttp.Uri import org.scalatest.{FlatSpec, Matchers} /** @@ -31,17 +31,18 @@ import org.scalatest.{FlatSpec, Matchers} class UserManagementQuerySpec extends FlatSpec with Matchers with UserManagementQuery[Uri] { trait Env { - val host = "localhost" - val port = 8086 + val schema = "http" + val host = "localhost" + val port = 8086 } trait AuthEnv extends Env { val credentials = Some(InfluxCredentials("admin", "admin")) - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials) } trait NonAuthEnv extends Env { - implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None) + implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None) } private val testUsername = "TEST_USER_NAME" @@ -50,17 +51,17 @@ class UserManagementQuerySpec extends FlatSpec with Matchers with UserManagement private val testPrivilege = Privileges.ALL it should "create user query" in new AuthEnv { - createUserQuery(testUsername, testPassword).toString() shouldEqual + createUserQuery(testUsername, testPassword).mkUrl shouldEqual queryTesterAuth(s"CREATE USER $testUsername WITH PASSWORD '$testPassword'")(credentials.get) } it should "create user query without auth" in new NonAuthEnv { - createUserQuery(testUsername, testPassword).toString() shouldEqual + createUserQuery(testUsername, testPassword).mkUrl shouldEqual queryTester(s"CREATE USER $testUsername WITH PASSWORD '$testPassword'") } it should "create admin user query" in new AuthEnv { - createAdminQuery(testUsername, testPassword).toString() shouldEqual + createAdminQuery(testUsername, testPassword).mkUrl shouldEqual queryTesterAuth( s"CREATE USER $testUsername WITH PASSWORD '$testPassword' WITH ALL PRIVILEGES" )(credentials.get) @@ -68,88 +69,88 @@ class UserManagementQuerySpec extends FlatSpec with Matchers with UserManagement } it should "create admin user query without auth" in new NonAuthEnv { - createAdminQuery(testUsername, testPassword).toString() shouldEqual + createAdminQuery(testUsername, testPassword).mkUrl shouldEqual queryTester(s"CREATE USER $testUsername WITH PASSWORD '$testPassword' WITH ALL PRIVILEGES") } it should "drop user query" in new AuthEnv { - dropUserQuery(testUsername).toString() shouldEqual + dropUserQuery(testUsername).mkUrl shouldEqual queryTesterAuth(s"DROP USER $testUsername")(credentials.get) } it should "drop user query without auth" in new NonAuthEnv { - dropUserQuery(testUsername).toString() shouldEqual + dropUserQuery(testUsername).mkUrl shouldEqual queryTester(s"DROP USER $testUsername") } it should "show users query" in new AuthEnv { - showUsersQuery.toString() shouldEqual queryTesterAuth("SHOW USERS")(credentials.get) + showUsersQuery.mkUrl shouldEqual queryTesterAuth("SHOW USERS")(credentials.get) } it should "show users query without auth" in new NonAuthEnv { - showUsersQuery.toString() shouldEqual queryTester("SHOW USERS") + showUsersQuery.mkUrl shouldEqual queryTester("SHOW USERS") } it should "show user privileges query" in new AuthEnv { - showUserPrivilegesQuery(testUsername).toString() shouldEqual + showUserPrivilegesQuery(testUsername).mkUrl shouldEqual queryTesterAuth(s"SHOW GRANTS FOR $testUsername")(credentials.get) } it should "show user privileges query without auth" in new NonAuthEnv { - showUserPrivilegesQuery(testUsername).toString() shouldEqual queryTester( + showUserPrivilegesQuery(testUsername).mkUrl shouldEqual queryTester( s"SHOW GRANTS FOR $testUsername" ) } it should "make admin query" in new AuthEnv { - makeAdminQuery(testUsername).toString() shouldEqual + makeAdminQuery(testUsername).mkUrl shouldEqual queryTesterAuth(s"GRANT ALL PRIVILEGES TO $testUsername")(credentials.get) } it should "make admin query without auth" in new NonAuthEnv { - makeAdminQuery(testUsername).toString() shouldEqual queryTester( + makeAdminQuery(testUsername).mkUrl shouldEqual queryTester( s"GRANT ALL PRIVILEGES TO $testUsername" ) } it should "disable admin query" in new AuthEnv { - disableAdminQuery(testUsername).toString() shouldEqual + disableAdminQuery(testUsername).mkUrl shouldEqual queryTesterAuth(s"REVOKE ALL PRIVILEGES FROM $testUsername")(credentials.get) } it should "disable admin query without auth" in new NonAuthEnv { - disableAdminQuery(testUsername).toString() shouldEqual queryTester( + disableAdminQuery(testUsername).mkUrl shouldEqual queryTester( s"REVOKE ALL PRIVILEGES FROM $testUsername" ) } it should "set user password query" in new AuthEnv { - setUserPasswordQuery(testUsername, testPassword).toString() shouldEqual + setUserPasswordQuery(testUsername, testPassword).mkUrl shouldEqual queryTesterAuth(s"SET PASSWORD FOR $testUsername = '$testPassword'")(credentials.get) } it should "set user password query without auth" in new NonAuthEnv { - setUserPasswordQuery(testUsername, testPassword).toString() shouldEqual + setUserPasswordQuery(testUsername, testPassword).mkUrl shouldEqual queryTester(s"SET PASSWORD FOR $testUsername = '$testPassword'") } it should "set privileges query" in new AuthEnv { - setPrivilegesQuery(testDatabase, testUsername, testPrivilege).toString() shouldEqual + setPrivilegesQuery(testDatabase, testUsername, testPrivilege).mkUrl shouldEqual queryTesterAuth(s"GRANT $testPrivilege ON $testDatabase TO $testUsername")(credentials.get) } it should "set privileges query without auth" in new NonAuthEnv { - setPrivilegesQuery(testDatabase, testUsername, testPrivilege).toString() shouldEqual + setPrivilegesQuery(testDatabase, testUsername, testPrivilege).mkUrl shouldEqual queryTester(s"GRANT $testPrivilege ON $testDatabase TO $testUsername") } it should "revoke privileges query" in new AuthEnv { - revokePrivilegesQuery(testDatabase, testUsername, testPrivilege).toString() shouldEqual + revokePrivilegesQuery(testDatabase, testUsername, testPrivilege).mkUrl shouldEqual queryTesterAuth(s"REVOKE $testPrivilege ON $testDatabase FROM $testUsername")(credentials.get) } it should "revoke privileges query without auth" in new NonAuthEnv { - revokePrivilegesQuery(testDatabase, testUsername, testPrivilege).toString() shouldEqual + revokePrivilegesQuery(testDatabase, testUsername, testPrivilege).mkUrl shouldEqual queryTester(s"REVOKE $testPrivilege ON $testDatabase FROM $testUsername") } } diff --git a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala index e8311bbf..213e188b 100644 --- a/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala +++ b/modules/ahc/management/src/test/scala/com/github/fsanaulla/chronicler/ahc/management/package.scala @@ -29,13 +29,13 @@ package object management { s"http://localhost:8086/query?u=${credentials.username.encode}&p=${credentials.password.encode}&q=${query.encode}" def queryTesterAuth(db: String, query: String)(credentials: InfluxCredentials): String = - s"http://localhost:8086/query?db=${db.encode}&u=${credentials.username.encode}&p=${credentials.password.encode}&q=${query.encode}" + s"http://localhost:8086/query?db=${db}&u=${credentials.username.encode}&p=${credentials.password.encode}&q=${query.encode}" def queryTester(query: String): String = s"http://localhost:8086/query?q=${query.encode}" def queryTester(db: String, query: String): String = - s"http://localhost:8086/query?db=${db.encode}&q=${query.encode}" + s"http://localhost:8086/query?db=${db}&q=${query.encode}" def queryTester(path: String, queryPrms: List[(String, String)]): String = { val s = queryPrms diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala index ad4d0149..58279e91 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/InfluxAhcClient.scala @@ -16,15 +16,28 @@ package com.github.fsanaulla.chronicler.ahc.shared -import com.softwaremill.sttp.SttpBackend -import com.softwaremill.sttp.asynchttpclient.future.AsyncHttpClientFutureBackend -import org.asynchttpclient.AsyncHttpClientConfig - -import scala.concurrent.Future +import org.asynchttpclient.{ + AsyncHttpClientConfig, + DefaultAsyncHttpClient, + DefaultAsyncHttpClientConfig +} class InfluxAhcClient(asyncClientConfig: Option[AsyncHttpClientConfig]) { self: AutoCloseable => - private[ahc] implicit val backend: SttpBackend[Future, Nothing] = - asyncClientConfig.fold(AsyncHttpClientFutureBackend())(AsyncHttpClientFutureBackend.usingConfig) + private[this] val config = { + val default = { + val b = new DefaultAsyncHttpClientConfig.Builder() + b.setCompressionEnforced(false) + b.build() + } + + asyncClientConfig.getOrElse(default) + } + + private[ahc] val schema: String = + if (config.isUseOpenSsl) "https" else "http" + + private[ahc] implicit val client: DefaultAsyncHttpClient = + new DefaultAsyncHttpClient(config) - override def close(): Unit = backend.close() + override def close(): Unit = client.close() } diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/Uri.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/Uri.scala new file mode 100644 index 00000000..96b41412 --- /dev/null +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/Uri.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2017-2019 Faiaz Sanaulla + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.fsanaulla.chronicler.ahc.shared + +import java.net.URLEncoder + +import org.asynchttpclient.Param + +/*** + * Syntetic container for request information + * + * @param schema - request schema + * @param host - request api address + * @param port - request port + * @param query - query path + * @param params - query params + * + * @since 0.6.0 + */ +final case class Uri( + schema: String, + host: String, + port: Int, + query: String, + params: List[Param] = Nil) { + + /** Append query parameter */ + def addParam(param: Param): Uri = + this.copy(params = param :: params) + + /** Create string based representation */ + def mkUrl: String = { + val encode: String => String = + p => URLEncoder.encode(p, "UTF-8") + + val queryParams = params.reverse + .map(p => p.getName + "=" + encode(p.getValue)) + .mkString("&") + + schema + "://" + host + ":" + port + query + "?" + queryParams + } +} diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/alias/package.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/alias/package.scala deleted file mode 100644 index ccdbbf2b..00000000 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/alias/package.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2017-2019 Faiaz Sanaulla - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.github.fsanaulla.chronicler.ahc.shared - -import com.softwaremill.sttp.{Id, RequestT} -import org.typelevel.jawn.ast.JValue - -package object alias { - type Request = RequestT[Id, JValue, Nothing] -} diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala index 69315c10..8d47fa10 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandler.scala @@ -16,42 +16,45 @@ package com.github.fsanaulla.chronicler.ahc.shared.handlers -import java.nio.ByteBuffer import java.nio.charset.{Charset, StandardCharsets} import com.github.fsanaulla.chronicler.core.alias.{ErrorOr, Id} import com.github.fsanaulla.chronicler.core.components.JsonHandler -import com.github.fsanaulla.chronicler.core.either.EitherOps import com.github.fsanaulla.chronicler.core.encoding.encodingFromContentType import com.github.fsanaulla.chronicler.core.implicits._ import com.github.fsanaulla.chronicler.core.jawn.RichJParser -import com.softwaremill.sttp.Response +import org.asynchttpclient.Response import org.typelevel.jawn.ast.{JParser, JValue} -private[ahc] final class AhcJsonHandler(compress: Boolean) - extends JsonHandler[Id, Response[Array[Byte]]] { +import scala.collection.JavaConverters._ - def responseBody(response: Response[Array[Byte]]): ErrorOr[JValue] = { - val ethBts = response.rawErrorBody -// val maybeDecompressed = if (compress) ethBts.mapRight(gzip.decompress) else ethBts +private[ahc] final class AhcJsonHandler(compress: Boolean) extends JsonHandler[Id, Response] { - val encoding: Charset = response.contentType + /*** + * Extract response body + * + * @see - [https://groups.google.com/forum/#!searchin/asynchttpclient/compression%7Csort:date/asynchttpclient/TAq33OWXeKU/sBm3v4EWAwAJ], + * netty automatically decompress gzipped request + */ + def responseBody(response: Response): ErrorOr[JValue] = { + val bodyBts = response.getResponseBodyAsBytes + val encoding: Charset = Option(response.getContentType) .flatMap(encodingFromContentType) .map(Charset.forName) .getOrElse(StandardCharsets.UTF_8) - ethBts - .mapRight(new String(_, encoding)) - .mapRight(JParser.parseFromStringOrNull) - .flatMapLeft { bt => - val btBuff = ByteBuffer.wrap(bt) - JParser.parseFromByteBufferEither(ByteBuffer.wrap(bt)) - } + val bodyStr = new String(bodyBts, encoding) + + JParser.parseFromStringEither(bodyStr) } - def responseHeader(response: Response[Array[Byte]]): Seq[(String, String)] = - response.headers + def responseHeader(response: Response): List[(String, String)] = + response.getHeaders + .entries() + .asScala + .toList + .map(e => e.getKey -> e.getValue) - def responseCode(response: Response[Array[Byte]]): Int = - response.code + def responseCode(response: Response): Int = + response.getStatusCode } diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilder.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilder.scala index f70aab50..8a4f8e1d 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilder.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilder.scala @@ -16,38 +16,35 @@ package com.github.fsanaulla.chronicler.ahc.shared.handlers +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.core.components.QueryBuilder import com.github.fsanaulla.chronicler.core.model.InfluxCredentials -import com.softwaremill.sttp.Uri.QueryFragment -import com.softwaremill.sttp.Uri.QueryFragment.KeyValue -import com.softwaremill.sttp._ +import org.asynchttpclient.Param import scala.annotation.tailrec private[ahc] class AhcQueryBuilder( + schema: String, host: String, port: Int, credentials: Option[InfluxCredentials]) extends QueryBuilder[Uri](credentials) { - override def buildQuery(url: String): Uri = - Uri(host = host, port).path(url) + override def buildQuery(query: String): Uri = + Uri(schema, host, port, query) - override def buildQuery(uri: String, queryParams: List[(String, String)]): Uri = { - val u = Uri(host = host, port).path(uri) - val encoding = Uri.QueryFragmentEncoding.All - val kvLst = queryParams.map { - case (k, v) => KeyValue(k, v, valueEncoding = encoding) - } + override def buildQuery(query: String, queryParams: List[(String, String)]): Uri = { + val u = buildQuery(query) + val params = queryParams.map { case (k, v) => new Param(k, v) } @tailrec - def addQueryParam(u: Uri, lst: Seq[QueryFragment]): Uri = { + def addQueryParam(u: Uri, lst: List[Param]): Uri = { lst match { case Nil => u - case h :: tail => addQueryParam(u.queryFragment(h), tail) + case h :: tail => addQueryParam(u.addParam(h), tail) } } - addQueryParam(u, kvLst) + addQueryParam(u, params) } } diff --git a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala index 1b27d07a..ff5e1eda 100644 --- a/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala +++ b/modules/ahc/shared/src/main/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcRequestExecutor.scala @@ -16,22 +16,17 @@ package com.github.fsanaulla.chronicler.ahc.shared.handlers +import com.github.fsanaulla.chronicler.ahc.shared.Uri import com.github.fsanaulla.chronicler.core.components.RequestExecutor import com.github.fsanaulla.chronicler.core.gzip -import com.softwaremill.sttp.{ - asByteArray, - emptyRequest, - HeaderNames, - MediaTypes, - Response, - SttpBackend, - Uri -} +import io.netty.handler.codec.http.HttpHeaderValues.GZIP_DEFLATE +import org.asynchttpclient.{AsyncHttpClient, Response} +import scala.compat.java8.FutureConverters._ import scala.concurrent.Future -private[ahc] final class AhcRequestExecutor()(implicit backend: SttpBackend[Future, Nothing]) - extends RequestExecutor[Future, Response[Array[Byte]], Uri, String] { +private[ahc] final class AhcRequestExecutor()(implicit client: AsyncHttpClient) + extends RequestExecutor[Future, Response, Uri, String] { /** * Execute uri @@ -39,36 +34,33 @@ private[ahc] final class AhcRequestExecutor()(implicit backend: SttpBackend[Futu * @param uri - request uri * @return - Return wrapper response */ - override def get(uri: Uri, compress: Boolean): Future[Response[Array[Byte]]] = { - val request = emptyRequest.get(uri) - - // currently not supported -// val maybeEncoded = -// if (compress) request.acceptEncoding("gzip") -// else request + override def get(uri: Uri, compress: Boolean): Future[Response] = { + val req = client.prepareGet(uri.mkUrl) + val maybeCompressed = if (compress) req.setHeader("Accept-Encoding", GZIP_DEFLATE) else req - request - .response(asByteArray) - .send() + maybeCompressed.execute.toCompletableFuture.toScala } override def post( uri: Uri, body: String, compress: Boolean - ): Future[Response[Array[Byte]]] = { - val req = emptyRequest.post(uri).response(asByteArray) + ): Future[Response] = { + val req = client.preparePost(uri.mkUrl) val maybeEncoded = if (compress) { val (length, data) = gzip.compress(body.getBytes()) // it fails with input stream, using byte array instead req - .body(data) - .header(HeaderNames.ContentEncoding, "gzip", replaceExisting = true) - .contentType(MediaTypes.Binary) - .contentLength(length) - } else req.body(body) + .setBody(data) + .setHeader("Content-Encoding", "gzip") + .setHeader("Content-Type", "application/octet-stream") + .setHeader("Content-Length", length) + } else req.setBody(body.getBytes()) - maybeEncoded.send() + maybeEncoded + .execute() + .toCompletableFuture + .toScala } /** @@ -76,10 +68,11 @@ private[ahc] final class AhcRequestExecutor()(implicit backend: SttpBackend[Futu * * @param uri - request uri */ - override def post(uri: Uri): Future[Response[Array[Byte]]] = { - emptyRequest - .post(uri) - .response(asByteArray) - .send() + override def post(uri: Uri): Future[Response] = { + client + .preparePost(uri.mkUrl) + .execute() + .toCompletableFuture + .toScala } } diff --git a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala index 43e7e06a..af65c246 100644 --- a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala +++ b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcJsonHandlerSpec.scala @@ -16,7 +16,12 @@ package com.github.fsanaulla.chronicler.ahc.shared.handlers -import com.softwaremill.sttp.Response +import java.nio.ByteBuffer + +import io.netty.buffer.Unpooled +import io.netty.handler.codec.http.{DefaultHttpResponse, HttpVersion} +import org.asynchttpclient.Response +import org.asynchttpclient.netty.{EagerResponseBodyPart, NettyResponseStatus} import org.scalatest.concurrent.ScalaFutures import org.scalatest.{Matchers, OptionValues, WordSpec} import org.typelevel.jawn.ast._ @@ -30,6 +35,30 @@ class AhcJsonHandlerSpec extends WordSpec with Matchers with ScalaFutures with O val jsonHandler = new AhcJsonHandler(compress = false) + def buildResponse(bts: Array[Byte]): Response = { + val b = new Response.ResponseBuilder() + + b.accumulate( + new EagerResponseBodyPart( + Unpooled.copiedBuffer(ByteBuffer.wrap(bts)), + true + ) + ) + + b.accumulate( + new NettyResponseStatus( + null, + new DefaultHttpResponse( + HttpVersion.HTTP_1_0, + io.netty.handler.codec.http.HttpResponseStatus.OK + ), + null + ) + ) + + b.build + } + "JsonHandler" should { "extract" should { "body from HTTP response" in { @@ -65,7 +94,7 @@ class AhcJsonHandlerSpec extends WordSpec with Matchers with ScalaFutures with O | ] | }""".stripMargin - val resp: Response[Array[Byte]] = Response.ok(singleStrJson.getBytes()) + val resp = buildResponse(singleStrJson.getBytes()) val result: JValue = JParser.parseFromString(singleStrJson).get diff --git a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala index 487c5b07..d4e3f5a1 100644 --- a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala +++ b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcQueryBuilderSpec.scala @@ -23,7 +23,7 @@ class AhcQueryBuilderSpec extends FlatSpec with Matchers { val host = "localhost" val port = 8080 - val qb = new AhcQueryBuilder(host, port, None) + val qb = new AhcQueryBuilder("http", host, port, None) it should "properly generate URI" in { val queryMap = List( @@ -31,7 +31,7 @@ class AhcQueryBuilderSpec extends FlatSpec with Matchers { ) val res = s"http://$host:$port/query?q=FirstQuery%3BSecondQuery" - qb.buildQuery("/query", qb.appendCredentials(queryMap)).toString() shouldEqual res + qb.buildQuery("/query", qb.appendCredentials(queryMap)).mkUrl shouldEqual res } } diff --git a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala index b9bc1816..731acec3 100644 --- a/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala +++ b/modules/ahc/shared/src/test/scala/com/github/fsanaulla/chronicler/ahc/shared/handlers/AhcResponseHandlerSpec.scala @@ -16,10 +16,15 @@ package com.github.fsanaulla.chronicler.ahc.shared.handlers +import java.nio.ByteBuffer + import com.github.fsanaulla.chronicler.core.components.ResponseHandler import com.github.fsanaulla.chronicler.core.implicits._ import com.github.fsanaulla.chronicler.core.model.ContinuousQuery -import com.softwaremill.sttp.Response +import io.netty.buffer.Unpooled +import io.netty.handler.codec.http.{DefaultHttpResponse, HttpVersion} +import org.asynchttpclient.Response +import org.asynchttpclient.netty.{EagerResponseBodyPart, NettyResponseStatus} import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Second, Seconds, Span} import org.scalatest.{FlatSpec, Matchers} @@ -47,8 +52,29 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { val rh = new ResponseHandler(jsonHandler) - implicit def str2resp(str: String): Response[JValue] = - Response.ok(p.parseFromString(str).get) + def buildResponse(bts: Array[Byte]): Response = { + val b = new Response.ResponseBuilder() + + b.accumulate( + new EagerResponseBodyPart( + Unpooled.copiedBuffer(ByteBuffer.wrap(bts)), + true + ) + ) + + b.accumulate( + new NettyResponseStatus( + null, + new DefaultHttpResponse( + HttpVersion.HTTP_1_0, + io.netty.handler.codec.http.HttpResponseStatus.OK + ), + null + ) + ) + + b.build + } it should "extract single query result from response" in { @@ -92,7 +118,7 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { JArray(Array(JString("2015-06-11T20:46:02Z"), JNum(0.64))) ) - rh.queryResultJson(Response.ok(singleResponse)).right.get shouldEqual result + rh.queryResultJson(buildResponse(singleResponse)).right.get shouldEqual result } it should "extract bulk query results from response" in { @@ -149,7 +175,7 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { |} """.stripMargin.getBytes() - rh.bulkQueryResultJson(Response.ok(bulkResponse)).right.get shouldEqual Array( + rh.bulkQueryResultJson(buildResponse(bulkResponse)).right.get shouldEqual Array( Array( JArray(Array(JString("2015-01-29T21:55:43.702900257Z"), JNum(2))), JArray(Array(JString("2015-01-29T21:55:43.702900257Z"), JNum(0.55))), @@ -216,7 +242,8 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { } """.getBytes() - val cqi = rh.toCqQueryResult(Response.ok(cqStrJson)).right.get.filter(_.queries.nonEmpty).head + val cqi = + rh.toCqQueryResult(buildResponse(cqStrJson)).right.get.filter(_.queries.nonEmpty).head cqi.dbName shouldEqual "mydb" cqi.queries.head shouldEqual ContinuousQuery( "cq", @@ -238,7 +265,7 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { |} """.stripMargin.getBytes() - jsonHandler.responseErrorMsgOpt(Response.ok(errorResponse)).right.get shouldEqual Some( + jsonHandler.responseErrorMsgOpt(buildResponse(errorResponse)).right.get shouldEqual Some( "user not found" ) } @@ -248,6 +275,9 @@ class AhcResponseHandlerSpec extends FlatSpec with Matchers with ScalaFutures { val errorResponse = """ { "error": "user not found" } """.getBytes() - jsonHandler.responseErrorMsg(Response.ok(errorResponse)).right.get shouldEqual "user not found" + jsonHandler + .responseErrorMsg(buildResponse(errorResponse)) + .right + .get shouldEqual "user not found" } } diff --git a/modules/core/io/src/main/scala/com/github/fsanaulla/chronicler/core/api/DatabaseApi.scala b/modules/core/io/src/main/scala/com/github/fsanaulla/chronicler/core/api/DatabaseApi.scala index 0954b3d6..4d3f5118 100644 --- a/modules/core/io/src/main/scala/com/github/fsanaulla/chronicler/core/api/DatabaseApi.scala +++ b/modules/core/io/src/main/scala/com/github/fsanaulla/chronicler/core/api/DatabaseApi.scala @@ -33,6 +33,8 @@ import org.typelevel.jawn.ast.JArray * @tparam R - HTTP response * @tparam U - HTTP URi * @tparam E - Entity + * + * @since Big Bang */ class DatabaseApi[F[_], G[_], R, U, E]( dbName: String, @@ -130,7 +132,6 @@ class DatabaseApi[F[_], G[_], R, U, E]( pretty: Boolean = false ): F[ErrorOr[Array[(Array[String], JArray)]]] = { val uri = singleQuery(dbName, query, epoch, pretty) - F.flatMap(re.get(uri, compress))(resp => FK(rh.groupedResultJson(resp))) } } diff --git a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/components/ResponseHandler.scala b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/components/ResponseHandler.scala index 59818d1f..64ccefc2 100644 --- a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/components/ResponseHandler.scala +++ b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/components/ResponseHandler.scala @@ -72,7 +72,7 @@ class ResponseHandler[G[_], R]( * @param response - backend response value * @return - Query result of JArray in future container */ - final def queryResultJson(response: R): G[ErrorOr[Array[JArray]]] = { + final def queryResultJson(response: R): G[ErrorOr[Array[JArray]]] = jsonHandler.responseCode(response).intValue() match { case code if isSuccessful(code) => F.map(jsonHandler.responseBody(response)) { body => @@ -86,7 +86,6 @@ class ResponseHandler[G[_], R]( case _ => F.map(errorHandler(response))(Left(_)) } - } /** * Handling HTTP response with GROUP BY clause in the query diff --git a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/either/package.scala b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/either/package.scala index 6110941e..add39e2a 100644 --- a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/either/package.scala +++ b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/either/package.scala @@ -28,7 +28,7 @@ package object either { } yield x +: xs } - def seq[L, R: ClassTag](s: Seq[Either[L, R]]): Either[L, Seq[R]] = + def seq[L, R](s: Seq[Either[L, R]]): Either[L, Seq[R]] = s.foldRight(Right(Seq.empty): Either[L, Seq[R]]) { (e, acc) => for { xs <- acc.right diff --git a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/query/DatabaseOperationQuery.scala b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/query/DatabaseOperationQuery.scala index b38eb05b..77978634 100644 --- a/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/query/DatabaseOperationQuery.scala +++ b/modules/core/shared/src/main/scala/com/github/fsanaulla/chronicler/core/query/DatabaseOperationQuery.scala @@ -40,14 +40,13 @@ trait DatabaseOperationQuery[U] { if (epoch.isNone) queryParams else ("epoch" -> epoch.toString) :: queryParams + // format: off private[chronicler] final def write( dbName: String, consistency: Consistency, precision: Precision, retentionPolicy: Option[String] - )(implicit qb: QueryBuilder[U] - ): U = { - + )(implicit qb: QueryBuilder[U]): U = { val queryParams = Nil val withRP = @@ -70,9 +69,7 @@ trait DatabaseOperationQuery[U] { query: String, epoch: Epoch, pretty: Boolean - )(implicit qb: QueryBuilder[U] - ): U = { - + )(implicit qb: QueryBuilder[U]): U = { val queryParams = List("q" -> query) val withEpoch = addEpochQueryParam(epoch, queryParams) val withPretty = addPrettyQueryParam(pretty, withEpoch) @@ -86,8 +83,7 @@ trait DatabaseOperationQuery[U] { epoch: Epoch, pretty: Boolean, chunkSize: Int - )(implicit qb: QueryBuilder[U] - ): U = { + )(implicit qb: QueryBuilder[U]): U = { val queryParams = List( "db" -> dbName, @@ -107,8 +103,7 @@ trait DatabaseOperationQuery[U] { queries: Seq[String], epoch: Epoch, pretty: Boolean - )(implicit qb: QueryBuilder[U] - ): U = { + )(implicit qb: QueryBuilder[U]): U = { val queryParams = List("q" -> queries.mkString(";")) val withEpoch = addEpochQueryParam(epoch, queryParams) val withPretty = addPrettyQueryParam(pretty, withEpoch) diff --git a/project/Library.scala b/project/Library.scala index 56bdadb4..6c41343f 100644 --- a/project/Library.scala +++ b/project/Library.scala @@ -8,8 +8,6 @@ import sbt._ object Library { object Versions { - val sttp = "1.6.7" - val netty = "4.1.42.Final" val request = "0.2.0" object Akka { @@ -24,15 +22,14 @@ object Library { } } - val sttp = "com.softwaremill.sttp" %% "core" % Versions.sttp - val scalaTest = "org.scalatest" %% "scalatest" % Versions.Testing.scalaTest - val scalaCheck = "org.scalacheck" %% "scalacheck" % Versions.Testing.scalaCheck - val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % Versions.Akka.akka + val scalaTest = "org.scalatest" %% "scalatest" % Versions.Testing.scalaTest + val scalaCheck = "org.scalacheck" %% "scalacheck" % Versions.Testing.scalaCheck + val akkaTestKit = "com.typesafe.akka" %% "akka-testkit" % Versions.Akka.akka def macroDeps(scalaVersion: String): List[ModuleID] = - List( - "org.scala-lang" % "scala-reflect" % scalaVersion - ) ::: List(scalaTest, scalaCheck).map(_ % Scope.test) + "org.scala-lang" % "scala-reflect" % scalaVersion :: List(scalaTest, scalaCheck).map( + _ % Scope.test + ) // testing val testingDeps: List[ModuleID] = List( @@ -49,18 +46,19 @@ object Library { ) ::: List(scalaTest, scalaCheck).map(_ % Scope.test) // akka-http + // format: off val akkaDep: List[ModuleID] = List( - "com.typesafe.akka" %% "akka-stream" % Versions.Akka.akka exclude ("com.typesafe", "config"), - "com.typesafe" % "config" % "1.3.4", - "com.typesafe.akka" %% "akka-http" % Versions.Akka.akkaHttp, - "com.typesafe.akka" %% "akka-testkit" % Versions.Akka.akka % Scope.test + "com.typesafe.akka" %% "akka-stream" % Versions.Akka.akka exclude ("com.typesafe", "config"), + "com.typesafe" % "config" % "1.3.4", + "com.typesafe.akka" %% "akka-http" % Versions.Akka.akkaHttp, + akkaTestKit % Scope.test ) + // format: on // async-http val asyncDeps: List[ModuleID] = List( - "io.netty" % "netty-handler" % Versions.netty, - "com.softwaremill.sttp" %% "async-http-client-backend-future" % Versions.sttp exclude ("io.netty", "netty-handler") exclude ("org.reactivestreams", "reactive-streams"), - "org.reactivestreams" % "reactive-streams" % "1.0.3" + "org.asynchttpclient" % "async-http-client" % "2.10.1", + "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0" ) // looks like a shit, but need to keep it until spark on 2.12 will become stable diff --git a/version.sbt b/version.sbt index 9aefe74c..0512f161 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.5.6" +version in ThisBuild := "0.6.0"