Skip to content

Commit

Permalink
Merge pull request #189 from fsanaulla/release-0.6.0
Browse files Browse the repository at this point in the history
[RELEASE][0.6.0] - simplifying AHC backend
  • Loading branch information
fsanaulla authored Sep 29, 2019
2 parents 37d8a51 + 3016108 commit e086d83
Show file tree
Hide file tree
Showing 32 changed files with 435 additions and 340 deletions.
5 changes: 5 additions & 0 deletions changelog/0.6.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Including issues:
- fsanaulla/chronicler#178 Moving away from sttp to pure AHC backend

Contributors:
- @fsanaulla
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
package com.github.fsanaulla.chronicler.example.udp

import com.github.fsanaulla.chronicler.core.model.{InfluxFormatter, Point}
import com.github.fsanaulla.chronicler.macros.Influx
import com.github.fsanaulla.chronicler.macros.annotations.{field, tag}
import com.github.fsanaulla.chronicler.macros.auto._
import com.github.fsanaulla.chronicler.udp.InfluxUdp

import scala.util.Try

object Main {

def main(args: Array[String]): Unit = {
final case class Test(@tag name: String, @field age: Int)

// generate formatter at compile-time
implicit val fmt: InfluxFormatter[Test] = Influx.formatter[Test]

val t = Test("f", 1)
val host = args.headOption.getOrElse("localhost")
val t = Test("f", 1)
val host = args.headOption.getOrElse("localhost")
val influx = InfluxUdp(host)

for {
// write record to Influx
_ <- influx.write("cpu", t)
// close client
_ <- influx.close()
_ <- Try(influx.close())
} yield println("Stored!")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import java.nio.file.Paths

import com.github.fsanaulla.chronicler.ahc.io.InfluxIO
import com.github.fsanaulla.chronicler.ahc.management.InfluxMng
import com.github.fsanaulla.chronicler.ahc.shared.Uri
import com.github.fsanaulla.chronicler.core.alias.Id
import com.github.fsanaulla.chronicler.core.api.DatabaseApi
import com.github.fsanaulla.chronicler.testing.it.DockerizedInfluxDB
import com.softwaremill.sttp.{Response, Uri}
import org.asynchttpclient.Response
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
import org.scalatest.{FlatSpec, Matchers}

Expand All @@ -28,9 +29,9 @@ class CompressionSpec
InfluxMng(host, port, Some(creds), None)

lazy val io =
InfluxIO(host, port, Some(creds) /*, compress = true*/ )
InfluxIO(host, port, Some(creds), compress = true)

lazy val db: DatabaseApi[Future, Id, Response[Array[Byte]], Uri, String] =
lazy val db: DatabaseApi[Future, Id, Response, Uri, String] =
io.database(testDB)

it should "ping database" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,27 +84,28 @@ class DatabaseApiSpec extends FlatSpec with Matchers with Futures with Dockerize
}

it should "retrieve multiple request" in {
db.readJson("SELECT * FROM test2").futureValue.right.get.length shouldEqual 3

db.readJson("SELECT * FROM test2 WHERE age < 40").futureValue.right.get.length shouldEqual 1

val multiQuery = db
.bulkReadJson(
Array(
"SELECT * FROM test2",
"SELECT * FROM test2 WHERE age < 40"
)
)
.bulkReadJson(Seq("SELECT * FROM test2", "SELECT * FROM test2 WHERE age < 40"))
.futureValue
.right
.get

multiQuery.right.get.length shouldEqual 2
multiQuery.right.get shouldBe a[Array[_]]
multiQuery.length shouldEqual 2
multiQuery shouldBe a[Array[_]]

multiQuery.right.get.head.length shouldEqual 3
multiQuery.right.get.head shouldBe a[Array[_]]
multiQuery.right.get.head.head shouldBe a[JArray]
multiQuery.head.length shouldEqual 3
multiQuery.head shouldBe a[Array[_]]
multiQuery.head.head shouldBe a[JArray]

multiQuery.right.get.last.length shouldEqual 1
multiQuery.right.get.last shouldBe a[Array[_]]
multiQuery.right.get.last.head shouldBe a[JArray]
multiQuery.last.length shouldEqual 1
multiQuery.last shouldBe a[Array[_]]
multiQuery.last.head shouldBe a[JArray]

multiQuery.right.get
multiQuery
.map(_.map(_.arrayValue.right.get.tail)) shouldEqual largeMultiJsonEntity.map(
_.map(_.arrayValue.right.get.tail)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,20 @@

package com.github.fsanaulla.chronicler.ahc.io

import com.github.fsanaulla.chronicler.ahc.shared.InfluxAhcClient
import com.github.fsanaulla.chronicler.ahc.shared.handlers.{
AhcJsonHandler,
AhcQueryBuilder,
AhcRequestExecutor
}
import com.github.fsanaulla.chronicler.ahc.shared.implicits._
import com.github.fsanaulla.chronicler.ahc.shared.implicits.{fkId, futureFailable, futureFunctor}
import com.github.fsanaulla.chronicler.ahc.shared.{InfluxAhcClient, Uri}
import com.github.fsanaulla.chronicler.core.IOClient
import com.github.fsanaulla.chronicler.core.alias.{ErrorOr, Id}
import com.github.fsanaulla.chronicler.core.api.{DatabaseApi, MeasurementApi}
import com.github.fsanaulla.chronicler.core.components.ResponseHandler
import com.github.fsanaulla.chronicler.core.implicits.{applyId, functorId}
import com.github.fsanaulla.chronicler.core.model.{InfluxCredentials, InfluxDBInfo}
import com.softwaremill.sttp.{Response, Uri}
import org.asynchttpclient.AsyncHttpClientConfig
import org.asynchttpclient.{AsyncHttpClientConfig, Response}

import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag
Expand All @@ -43,12 +42,12 @@ final class AhcIOClient(
asyncClientConfig: Option[AsyncHttpClientConfig]
)(implicit ex: ExecutionContext)
extends InfluxAhcClient(asyncClientConfig)
with IOClient[Future, Id, Response[Array[Byte]], Uri, String] {
with IOClient[Future, Id, Response, Uri, String] {

val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress)
implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials)
implicit val re: AhcRequestExecutor = new AhcRequestExecutor
implicit val rh: ResponseHandler[Id, Response[Array[Byte]]] = new ResponseHandler(jsonHandler)
val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress)
implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials)
implicit val re: AhcRequestExecutor = new AhcRequestExecutor
implicit val rh: ResponseHandler[Id, Response] = new ResponseHandler(jsonHandler)

override def database(dbName: String) =
new DatabaseApi(dbName, compress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object InfluxIO {
* @param credentials - user credentials
* @param compress - enable gzip compression
* @param asyncClientConfig - custom configuration
* @param ex - implicit execution context, by default use standard one
* @param ec - implicit execution context, by default use standard one
* @return - [[AhcIOClient]]
*/
def apply(
Expand All @@ -41,7 +41,7 @@ object InfluxIO {
credentials: Option[InfluxCredentials] = None,
compress: Boolean = false,
asyncClientConfig: Option[AsyncHttpClientConfig] = None
)(implicit ex: ExecutionContext
)(implicit ec: ExecutionContext
): AhcIOClient =
new AhcIOClient(host, port, compress, credentials, asyncClientConfig)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package com.github.fsanaulla.chronicler.ahc.io.unit

import com.github.fsanaulla.chronicler.ahc.shared.Uri
import com.github.fsanaulla.chronicler.ahc.shared.handlers.AhcQueryBuilder
import com.github.fsanaulla.chronicler.core.enums.{Consistencies, Epochs, Precisions}
import com.github.fsanaulla.chronicler.core.model.InfluxCredentials
import com.github.fsanaulla.chronicler.core.query.DatabaseOperationQuery
import com.softwaremill.sttp.Uri
import org.scalatest.{FlatSpec, Matchers}

import scala.language.implicitConversions
Expand All @@ -31,32 +31,30 @@ import scala.language.implicitConversions
* Date: 27.07.17
*/
class DatabaseApiOperationQuerySpec
extends FlatSpec
with Matchers
with DatabaseOperationQuery[Uri] {
extends FlatSpec
with Matchers
with DatabaseOperationQuery[Uri] {

trait Env {
val host = "localhost"
val port = 8086
val schema = "http"
val host = "localhost"
val port = 8086
}

trait AuthEnv extends Env {
val credentials = Some(InfluxCredentials("admin", "admin"))
implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials)
implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials)
}

trait NonAuthEnv extends Env {
implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, None)
implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, None)
}

val testDB = "db"
val testQuery = "SELECT * FROM test"

implicit def a2Opt[A](a: A): Option[A] = Some(a)

it should "return correct write query" in new AuthEnv {
write(testDB, Consistencies.One, Precisions.Nanoseconds, None)
.toString() shouldEqual queryTester(
write(testDB, Consistencies.One, Precisions.Nanoseconds, None).mkUrl shouldEqual queryTester(
"/write",
List(
"db" -> testDB,
Expand All @@ -67,8 +65,7 @@ class DatabaseApiOperationQuerySpec
)
)

write(testDB, Consistencies.All, Precisions.Nanoseconds, None)
.toString() shouldEqual queryTester(
write(testDB, Consistencies.All, Precisions.Nanoseconds, None).mkUrl shouldEqual queryTester(
"/write",
List(
"db" -> testDB,
Expand All @@ -81,23 +78,23 @@ class DatabaseApiOperationQuerySpec
}

it should "return correct write query without auth " in new NonAuthEnv {
write(testDB, Consistencies.One, Precisions.Nanoseconds, None).toString() shouldEqual
write(testDB, Consistencies.One, Precisions.Nanoseconds, None).mkUrl shouldEqual
queryTester("/write", List("db" -> testDB, "consistency" -> "one", "precision" -> "ns"))

write(testDB, Consistencies.One, Precisions.Microseconds, None).toString() shouldEqual
write(testDB, Consistencies.One, Precisions.Microseconds, None).mkUrl shouldEqual
queryTester("/write", List("db" -> testDB, "consistency" -> "one", "precision" -> "u"))
}

it should "return correct single read query" in new AuthEnv {

val queryPrms = List(
val queryPrms: List[(String, String)] = List(
"db" -> testDB,
"u" -> credentials.get.username,
"p" -> credentials.get.password,
"epoch" -> "ns",
"q" -> "SELECT * FROM test"
)
singleQuery(testDB, testQuery, Epochs.Nanoseconds, pretty = false).toString() shouldEqual
singleQuery(testDB, testQuery, Epochs.Nanoseconds, pretty = false).mkUrl shouldEqual
queryTester("/query", queryPrms)
}

Expand All @@ -116,7 +113,7 @@ class DatabaseApiOperationQuerySpec
Seq("SELECT * FROM test", "SELECT * FROM test1"),
Epochs.Nanoseconds,
pretty = false
).toString() shouldEqual
).mkUrl shouldEqual
queryTester("/query", queryPrms)

val queryPrms1: List[(String, String)] = List(
Expand All @@ -132,7 +129,7 @@ class DatabaseApiOperationQuerySpec
Seq("SELECT * FROM test", "SELECT * FROM test1"),
Epochs.Nanoseconds,
pretty = true
).toString() shouldEqual
).mkUrl shouldEqual
queryTester("/query", queryPrms1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ package object unit {
s"http://localhost:8086/query?q=${query.encode}&p=${credentials.password.encode}&u=${credentials.username.encode}"

def queryTesterAuth(db: String, query: String)(credentials: InfluxCredentials): String =
s"http://localhost:8086/query?q=${query.encode}&p=${credentials.password.encode}&db=${db.encode}&u=${credentials.username.encode}"
s"http://localhost:8086/query?q=${query.encode}&p=${credentials.password.encode}&db=${db}&u=${credentials.username.encode}"

def queryTester(query: String): String =
s"http://localhost:8086/query?q=${query.encode}"

def queryTester(db: String, query: String): String =
s"http://localhost:8086/query?q=${query.encode}&db=${db.encode}"
s"http://localhost:8086/query?q=${query.encode}&db=${db}"

def queryTester(path: String, mp: List[(String, String)]): String = {
val queries = mp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

package com.github.fsanaulla.chronicler.ahc.management

import com.github.fsanaulla.chronicler.ahc.shared.InfluxAhcClient
import com.github.fsanaulla.chronicler.ahc.shared.handlers.{
AhcJsonHandler,
AhcQueryBuilder,
AhcRequestExecutor
}
import com.github.fsanaulla.chronicler.ahc.shared.{InfluxAhcClient, Uri}
import com.github.fsanaulla.chronicler.core.ManagementClient
import com.github.fsanaulla.chronicler.core.alias.{ErrorOr, Id}
import com.github.fsanaulla.chronicler.core.components.ResponseHandler
Expand All @@ -32,8 +32,7 @@ import com.github.fsanaulla.chronicler.core.model.{
InfluxCredentials,
InfluxDBInfo
}
import com.softwaremill.sttp.{Response, Uri}
import org.asynchttpclient.AsyncHttpClientConfig
import org.asynchttpclient.{AsyncHttpClientConfig, Response}

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -46,12 +45,12 @@ final class AhcManagementClient(
val F: Functor[Future],
val FK: FunctionK[Id, Future])
extends InfluxAhcClient(asyncClientConfig)
with ManagementClient[Future, Id, Response[Array[Byte]], Uri, String] {
with ManagementClient[Future, Id, Response, Uri, String] {

val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress = false)
implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(host, port, credentials)
implicit val re: AhcRequestExecutor = new AhcRequestExecutor
implicit val rh: ResponseHandler[Id, Response[Array[Byte]]] = new ResponseHandler(jsonHandler)
val jsonHandler: AhcJsonHandler = new AhcJsonHandler(compress = false)
implicit val qb: AhcQueryBuilder = new AhcQueryBuilder(schema, host, port, credentials)
implicit val re: AhcRequestExecutor = new AhcRequestExecutor
implicit val rh: ResponseHandler[Id, Response] = new ResponseHandler(jsonHandler)

override def ping: Future[ErrorOr[InfluxDBInfo]] = {
re.get(qb.buildQuery("/ping"), compress = false)
Expand Down
Loading

0 comments on commit e086d83

Please sign in to comment.