Skip to content

Commit

Permalink
[CORE][ISSUE-69]: gzip support (#74)
Browse files Browse the repository at this point in the history
* [CORE][ISSUE-79]: remove mapper trait, small refactoring

* [CORE][ISSUE-69]: add another one inspiration, fixing system management spec

* [AKKA][ISSUE-69]: gzipping in akka module

* [ASYNC][ISSUE-69]: gzipping in async module

* [URL][ISSUE-69]: gzipping in urlHttp module

* [URL][ISSUE-69]: gzipping default value, sbt setting

* [AKKA][ISSUE-69]: add forgotten http method

* [ALL][ISSUE-69]: gzipped specs

* [ALL][ISSUE-69]: add client docs, update version to 0.2.7
  • Loading branch information
fsanaulla authored Jun 30, 2018
1 parent 7a35180 commit bd4c8c0
Show file tree
Hide file tree
Showing 58 changed files with 990 additions and 536 deletions.
26 changes: 14 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,19 @@ influx.close()
For more details see the next section. The same example can be applied to the other clients, with small differences.

# Documentation
1. [Read operation](docs/read_operation_notes.md)
2. [Write operation](docs/write_operation_notes.md)
3. [Database management](docs/database_management.md)
4. [User management](docs/user_management.md)
5. [CQ management](docs/continuous_query-management.md)
6. [Subscription management](docs/subscription_management.md)
7. [RP management](docs/retention_policy_management.md)
8. [Shards management](docs/shard_management.md)
9. [Response handling](docs/response_handling.md)
10. [Macros](docs/macros.md)
11. [Utils](docs/utils.md)
1. [Clients tutorial](docs/clients.md)
2. [Read operation](docs/read_operation_notes.md)
3. [Write operation](docs/write_operation_notes.md)
4. [Database management](docs/database_management.md)
5. [User management](docs/user_management.md)
6. [CQ management](docs/continuous_query-management.md)
7. [Subscription management](docs/subscription_management.md)
8. [RP management](docs/retention_policy_management.md)
9. [Shards management](docs/shard_management.md)
10. [Response handling](docs/response_handling.md)
11. [Macros](docs/macros.md)
12. [Utils](docs/utils.md)

# Inspirations
- [scala-influxdb-client](https://github.com/paulgoldbaum/scala-influxdb-client) by [Paul Goldbaum](https://github.com/paulgoldbaum)
- [scala-influxdb-client](https://github.com/paulgoldbaum/scala-influxdb-client) by [Paul Goldbaum](https://github.com/paulgoldbaum)
- [influxdb-java](https://github.com/influxdata/influxdb-java) from [influxdata](https://github.com/influxdata)
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package com.github.fsanaulla.chronicler.akka

import _root_.akka.actor.ActorSystem
import _root_.akka.testkit.TestKit
import com.github.fsanaulla.chronicler.akka.SampleEntitys._
import com.github.fsanaulla.chronicler.akka.api.Database
import com.github.fsanaulla.chronicler.core.model.Point
import com.github.fsanaulla.chronicler.core.utils.Extensions.RichJValue
import com.github.fsanaulla.chronicler.testing.it.ResultMatchers._
import com.github.fsanaulla.chronicler.testing.it.{DockerizedInfluxDB, FakeEntity, Futures}
import com.github.fsanaulla.chronicler.testing.unit.FlatSpecWithMatchers
import jawn.ast.{JArray, JNum}

import scala.concurrent.ExecutionContext.Implicits.global

/**
  * Integration spec verifying that the Akka-backed database API behaves
  * correctly when the client is created with `gzipped = true`, i.e. every
  * write request body is gzip-compressed before being sent to InfluxDB.
  */
class GzippedDatabaseSpec
  extends TestKit(ActorSystem())
    with FlatSpecWithMatchers
    with Futures
    with DockerizedInfluxDB {

  // Name of the database every test in this spec writes to and reads from.
  val testDB = "db"

  // Lazily connected client; `gzipped = true` is the behavior under test.
  lazy val influx: InfluxAkkaHttpClient =
    Influx.connect(host, port, Some(creds), system, gzipped = true)

  lazy val db: Database = influx.database(testDB)

  "Database API" should "write data from file" in {
    influx.createDatabase(testDB).futureValue shouldEqual OkResult

    val pointsFile = getClass.getResource("/points.txt").getPath
    db.writeFromFile(pointsFile).futureValue shouldEqual NoContentResult

    val written = db.readJs("SELECT * FROM test1").futureValue.queryResult
    written.length shouldEqual 3
  }

  it should "write 2 points.txt represented entities" in {

    val odersky =
      Point("test2")
        .addTag("sex", "Male")
        .addTag("firstName", "Martin")
        .addTag("lastName", "Odersky")
        .addField("age", 54)

    val franko =
      Point("test2")
        .addTag("sex", "Male")
        .addTag("firstName", "Jame")
        .addTag("lastName", "Franko")
        .addField("age", 36)

    // Single-point write, then verify it is readable as a typed entity.
    db.writePoint(odersky).futureValue shouldEqual NoContentResult

    db.read[FakeEntity]("SELECT * FROM test2")
      .futureValue
      .queryResult shouldEqual Array(FakeEntity("Martin", "Odersky", 54))

    // Bulk write appends both points on top of the one already stored.
    db.bulkWritePoints(Array(odersky, franko)).futureValue shouldEqual NoContentResult

    db.read[FakeEntity]("SELECT * FROM test2")
      .futureValue
      .queryResult shouldEqual Array(FakeEntity("Martin", "Odersky", 54), FakeEntity("Jame", "Franko", 36), FakeEntity("Martin", "Odersky", 54))
  }

  it should "retrieve multiple request" in {

    val queries = Array(
      "SELECT * FROM test2",
      "SELECT * FROM test2 WHERE age < 40"
    )
    val bulkResponse = db.bulkReadJs(queries).futureValue
    val results = bulkResponse.queryResult

    // One result set per submitted query.
    results.length shouldEqual 2
    results shouldBe a[Array[_]]

    // First query matches all three rows written by the previous test.
    results.head.length shouldEqual 3
    results.head shouldBe a[Array[_]]
    results.head.head shouldBe a[JArray]

    // Second query filters down to the single row with age < 40.
    results.last.length shouldEqual 1
    results.last shouldBe a[Array[_]]
    results.last.head shouldBe a[JArray]

    // Drop the timestamp column (head of each row) before comparing values.
    results.map(_.map(_.arrayValue.get.tail)) shouldEqual largeMultiJsonEntity.map(_.map(_.arrayValue.get.tail))
  }

  it should "write native" in {

    // Single line-protocol point.
    db.writeNative("test3,sex=Male,firstName=Jame,lastName=Lannister age=48").futureValue shouldEqual NoContentResult

    db.read[FakeEntity]("SELECT * FROM test3")
      .futureValue
      .queryResult shouldEqual Array(FakeEntity("Jame", "Lannister", 48))

    // Bulk line-protocol write of two points into a fresh measurement.
    val natives = Seq("test4,sex=Male,firstName=Jon,lastName=Snow age=24", "test4,sex=Female,firstName=Deny,lastName=Targaryen age=25")
    db.bulkWriteNative(natives).futureValue shouldEqual NoContentResult

    db.read[FakeEntity]("SELECT * FROM test4")
      .futureValue
      .queryResult shouldEqual Array(FakeEntity("Female", "Deny", "Targaryen", 25), FakeEntity("Jon", "Snow", 24))
  }

  it should "return grouped result by sex and sum of ages" in {

    val grouped = Array("test5,sex=Male,firstName=Jon,lastName=Snow age=24", "test5,sex=Male,firstName=Rainer,lastName=Targaryen age=25")
    db.bulkWriteNative(grouped).futureValue shouldEqual NoContentResult

    db
      .readJs("SELECT SUM(\"age\") FROM \"test5\" GROUP BY \"sex\"")
      .futureValue
      .groupedResult
      .map { case (k, v) => k.toSeq -> v } shouldEqual Array(Seq("Male") -> JArray(Array(JNum(0), JNum(49))))

    // Tear down the HTTP connection pool at the end of the spec.
    influx.close() shouldEqual {}
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ object Influx {
* @param system - implicit actor system, by default will create new one
* @return - InfluxAkkaHttpClient
*/
final def connect(host: String = "localhost",
port: Int = 8086,
credentials: Option[InfluxCredentials] = None,
system: ActorSystem = ActorSystem())
(implicit ex: ExecutionContext) =
new InfluxAkkaHttpClient(host, port, credentials)(ex, system)
def connect(host: String = "localhost",
port: Int = 8086,
credentials: Option[InfluxCredentials] = None,
system: ActorSystem = ActorSystem(),
gzipped: Boolean = false)
(implicit ex: ExecutionContext) =
new InfluxAkkaHttpClient(host, port, credentials, gzipped)(ex, system)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@ import scala.util.{Failure, Success}
* Author: [email protected]
* Date: 27.08.17
*/
final class InfluxAkkaHttpClient(
host: String,
port: Int,
val credentials: Option[InfluxCredentials])
final class InfluxAkkaHttpClient(host: String,
port: Int,
val credentials: Option[InfluxCredentials],
gzipped: Boolean)
(implicit val ex: ExecutionContext, val system: ActorSystem)
extends InfluxClient[Future, HttpResponse, Uri, RequestEntity]
extends InfluxClient[Future, HttpRequest, HttpResponse, Uri, RequestEntity]
with AkkaRequestHandler
with AkkaResponseHandler
with AkkaQueryHandler {
with AkkaQueryHandler
with Mappable[Future, HttpResponse] {

override val m: Mapper[Future, HttpResponse] = new Mapper[Future, HttpResponse] {
override def mapTo[B](resp: Future[HttpResponse], f: HttpResponse => Future[B]): Future[B] = resp.flatMap(f)
}
override def mapTo[B](resp: Future[HttpResponse], f: HttpResponse => Future[B]): Future[B] = resp.flatMap(f)

protected implicit val mat: ActorMaterializer = ActorMaterializer()
protected implicit val connection: Connection = Http().outgoingConnection(host, port) recover {
Expand All @@ -45,7 +44,7 @@ final class InfluxAkkaHttpClient(
* @return Database instance that provide non type safe operations
*/
override def database(dbName: String): Database =
new Database(dbName, credentials)
new Database(dbName, credentials, gzipped)

/**
*
Expand All @@ -54,15 +53,16 @@ final class InfluxAkkaHttpClient(
* @tparam A - Measurement's time series type
* @return - Measurement instance of type [A]
*/
override def measurement[A: ClassTag](dbName: String, measurementName: String): Measurement[A] =
new Measurement[A](dbName, measurementName, credentials)
override def measurement[A: ClassTag](dbName: String,
measurementName: String): Measurement[A] =
new Measurement[A](dbName, measurementName, credentials, gzipped)


/**
* Ping InfluxDB
*/
override def ping: Future[WriteResult] =
m.mapTo(readRequest("/ping"), toResult)
mapTo(execute(Uri("/ping")), toResult)

/**
* Close HTTP connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import scala.reflect.ClassTag
* Author: [email protected]
* Date: 27.08.17
*/
final class Database(dbName: String, val credentials: Option[InfluxCredentials])
(protected implicit val actorSystem: ActorSystem,
override protected implicit val mat: ActorMaterializer,
override protected implicit val ex: ExecutionContext,
override protected implicit val connection: Connection)
final class Database(dbName: String,
val credentials: Option[InfluxCredentials],
gzipped: Boolean)
(protected implicit val actorSystem: ActorSystem,
override protected implicit val mat: ActorMaterializer,
override protected implicit val ex: ExecutionContext,
override protected implicit val connection: Connection)
extends DatabaseIO[Future, RequestEntity](dbName)
with AkkaWriter
with AkkaReader
Expand All @@ -35,31 +37,31 @@ final class Database(dbName: String, val credentials: Option[InfluxCredentials])
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None): Future[WriteResult] =
writeFromFile(dbName, filePath, consistency, precision, retentionPolicy)
writeFromFile(dbName, filePath, consistency, precision, retentionPolicy, gzipped)

def writeNative(point: String,
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None): Future[WriteResult] =
writeTo(dbName, point, consistency, precision, retentionPolicy)
writeTo(dbName, point, consistency, precision, retentionPolicy, gzipped)

def bulkWriteNative(points: Seq[String],
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None): Future[WriteResult] =
writeTo(dbName, points, consistency, precision, retentionPolicy)
writeTo(dbName, points, consistency, precision, retentionPolicy, gzipped)

def writePoint(point: Point,
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None): Future[WriteResult] =
writeTo(dbName, point, consistency, precision, retentionPolicy)
writeTo(dbName, point, consistency, precision, retentionPolicy, gzipped)

def bulkWritePoints(points: Seq[Point],
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None): Future[WriteResult] =
writeTo(dbName, points, consistency, precision, retentionPolicy)
writeTo(dbName, points, consistency, precision, retentionPolicy, gzipped)

def read[A: ClassTag](query: String,
epoch: Epoch = Epochs.NANOSECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import scala.reflect.ClassTag
* Author: [email protected]
* Date: 03.09.17
*/
final class Measurement[E: ClassTag](
dbName: String,
measurementName: String,
val credentials: Option[InfluxCredentials])
final class Measurement[E: ClassTag](dbName: String,
measurementName: String,
val credentials: Option[InfluxCredentials],
gzipped: Boolean)
(protected implicit val actorSystem: ActorSystem,
protected implicit val mat: ActorMaterializer,
protected implicit val ex: ExecutionContext,
Expand All @@ -41,7 +41,8 @@ final class Measurement[E: ClassTag](
toPoint(measurementName, wr.write(entity)),
consistency,
precision,
retentionPolicy
retentionPolicy,
gzipped
)


Expand All @@ -54,7 +55,8 @@ final class Measurement[E: ClassTag](
toPoints(measurementName, entitys.map(wr.write)),
consistency,
precision,
retentionPolicy
retentionPolicy,
gzipped
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import com.github.fsanaulla.chronicler.core.model.HasCredentials
*/
private[akka] trait AkkaQueryHandler extends QueryHandler[Uri] with HasCredentials {

override def buildQuery(
uri: String,
queryParams: Map[String, String]): Uri =
override def buildQuery(uri: String, queryParams: Map[String, String]): Uri =
Uri(uri).withQuery(Uri.Query(queryParams))
}
Original file line number Diff line number Diff line change
@@ -1,46 +1,26 @@
package com.github.fsanaulla.chronicler.akka.handlers

import _root_.akka.http.scaladsl.model.{MessageEntity, _}
import _root_.akka.http.scaladsl.model._
import _root_.akka.stream.ActorMaterializer
import _root_.akka.stream.scaladsl.{Sink, Source}
import com.github.fsanaulla.chronicler.akka.utils.AkkaAlias.Connection
import com.github.fsanaulla.chronicler.core.handlers.RequestHandler

import scala.concurrent.Future
import scala.language.implicitConversions

/**
* Created by
* Author: [email protected]
* Date: 15.03.18
*/
private[akka] trait AkkaRequestHandler extends RequestHandler[Future, HttpResponse, Uri, MessageEntity] {
private[akka] trait AkkaRequestHandler extends RequestHandler[Future, HttpRequest, HttpResponse, Uri] {

protected implicit val mat: ActorMaterializer
protected implicit val connection: Connection

override def readRequest(uri: Uri, entity: Option[MessageEntity] = None): Future[HttpResponse] = {
Source
.single(
HttpRequest(
method = HttpMethods.GET,
uri = uri,
entity = entity.getOrElse(HttpEntity.Empty)
)
)
.via(connection)
.runWith(Sink.head)
}
override implicit def req(uri: Uri): HttpRequest = HttpRequest(uri = uri)

override def writeRequest(uri: Uri, entity: MessageEntity): Future[HttpResponse] = {
Source
.single(
HttpRequest(
method = HttpMethods.POST,
uri = uri,
entity = entity
)
)
.via(connection)
.runWith(Sink.head)
}
override def execute(request: HttpRequest): Future[HttpResponse] =
Source.single(request).via(connection).runWith(Sink.head)
}
Loading

0 comments on commit bd4c8c0

Please sign in to comment.