Skip to content

Commit

Permalink
[ALL][ISSUE-70]: updating write from file method, simplifying api, speedup travis (#73)

Browse files Browse the repository at this point in the history

* [MACROS][ISSUE-70]: refactoring write impl, update readme

* [MACROS][ISSUE-70]: added type hints in macro impl

* [MACROS][ISSUE-70]: simplifying api

* [MACROS][ISSUE-70]: fixing test compile issue

* [MACROS][ISSUE-70]: fixing json handler error

* [MACROS][ISSUE-70]: another test fixes

* [AKKA][ISSUE-70]: fixing write from file method

* [ALL][ISSUE-70]: cleanup

* [ALL][ISSUE-70]: remove useless implicits
  • Loading branch information
fsanaulla authored Jun 27, 2018
1 parent bb00df1 commit 7a35180
Show file tree
Hide file tree
Showing 41 changed files with 447 additions and 489 deletions.
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ scala:

jdk:
- oraclejdk8

sudo: required

cache:
directories:
- "$HOME/.ivy2/cache"
- "$HOME/.sbt/boot"

services:
- docker

Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,6 @@ For more details see next section. The same example can be applied for other cli
9. [Response handling](docs/response_handling.md)
10. [Macros](docs/macros.md)
11. [Utils](docs/utils.md)

# Inspirations
- [scala-influxdb-client](https://github.com/paulgoldbaum/scala-influxdb-client) by [Paul Goldbaum](https://github.com/paulgoldbaum)
6 changes: 3 additions & 3 deletions akka-http/src/it/resources/points.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
test1,host=server02 value=0.67
test1,host=server02,region=us-west value=0.55 1422568543702900257
test1,direction=in,host=server01,region=us-west value=2.0 1422568543702900257
test1,direction=in,host=server01,region=us-west value=2.1
test1,direction=in,host=server02,region=us-west value=2.2
test1,direction=in,host=server03,region=us-west value=2.3
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.github.fsanaulla.chronicler.akka

import java.io.File

import _root_.akka.actor.ActorSystem
import _root_.akka.testkit.TestKit
import com.github.fsanaulla.chronicler.akka.SampleEntitys._
Expand Down Expand Up @@ -36,7 +34,7 @@ class DatabaseSpec
"Database API" should "write data from file" in {
influx.createDatabase(testDB).futureValue shouldEqual OkResult

db.writeFromFile(new File(getClass.getResource("/points.txt").getPath))
db.writeFromFile(getClass.getResource("/points.txt").getPath)
.futureValue shouldEqual NoContentResult

db.readJs("SELECT * FROM test1")
Expand All @@ -45,7 +43,7 @@ class DatabaseSpec
.length shouldEqual 3
}

it should "write 2 points represented entities" in {
it should "write 2 points.txt represented entities" in {

val point1 = Point("test2")
.addTag("sex", "Male")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package com.github.fsanaulla.chronicler.akka.api

import java.io.File

import _root_.akka.actor.ActorSystem
import _root_.akka.http.scaladsl.model.RequestEntity
import _root_.akka.stream.ActorMaterializer
import com.github.fsanaulla.chronicler.akka.io.{AkkaReader, AkkaWriter}
import com.github.fsanaulla.chronicler.akka.models.AkkaDeserializers._
import com.github.fsanaulla.chronicler.akka.models.AkkaSeserializers._
import com.github.fsanaulla.chronicler.akka.utils.AkkaAlias.Connection
import com.github.fsanaulla.chronicler.core.api.DatabaseApi
import com.github.fsanaulla.chronicler.core.api.DatabaseIO
import com.github.fsanaulla.chronicler.core.enums._
import com.github.fsanaulla.chronicler.core.model._
import jawn.ast.JArray
Expand All @@ -26,42 +24,42 @@ final class Database(dbName: String, val credentials: Option[InfluxCredentials])
override protected implicit val mat: ActorMaterializer,
override protected implicit val ex: ExecutionContext,
override protected implicit val connection: Connection)
extends DatabaseApi[Future, RequestEntity](dbName)
extends DatabaseIO[Future, RequestEntity](dbName)
with AkkaWriter
with AkkaReader
with Serializable[RequestEntity]
with HasCredentials
with Executable {

def writeFromFile(file: File,
chunkSize: Int = 8192,
def writeFromFile(filePath: String,
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None): Future[WriteResult] =
writeFromFile0(file, chunkSize, consistency, precision, retentionPolicy)
writeFromFile(dbName, filePath, consistency, precision, retentionPolicy)

def writeNative(point: String,
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None): Future[WriteResult] =
writeNative0(point, consistency, precision, retentionPolicy)
writeTo(dbName, point, consistency, precision, retentionPolicy)

def bulkWriteNative(points: Seq[String],
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None): Future[WriteResult] =
bulkWriteNative0(points, consistency, precision, retentionPolicy)
writeTo(dbName, points, consistency, precision, retentionPolicy)

def writePoint(point: Point,
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None): Future[WriteResult] =
writePoint0(point, consistency, precision, retentionPolicy)
writeTo(dbName, point, consistency, precision, retentionPolicy)

def bulkWritePoints(points: Seq[Point],
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None): Future[WriteResult] =
bulkWritePoints0(points, consistency, precision, retentionPolicy)
writeTo(dbName, points, consistency, precision, retentionPolicy)

def read[A: ClassTag](query: String,
epoch: Epoch = Epochs.NANOSECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import _root_.akka.actor.ActorSystem
import _root_.akka.http.scaladsl.model.RequestEntity
import _root_.akka.stream.ActorMaterializer
import com.github.fsanaulla.chronicler.akka.io.{AkkaReader, AkkaWriter}
import com.github.fsanaulla.chronicler.akka.models.AkkaDeserializers._
import com.github.fsanaulla.chronicler.akka.utils.AkkaAlias.Connection
import com.github.fsanaulla.chronicler.core.api.MeasurementApi
import com.github.fsanaulla.chronicler.core.api.MeasurementIO
import com.github.fsanaulla.chronicler.core.enums._
import com.github.fsanaulla.chronicler.core.model._
import jawn.ast.JArray
Expand All @@ -27,36 +26,43 @@ final class Measurement[E: ClassTag](
protected implicit val mat: ActorMaterializer,
protected implicit val ex: ExecutionContext,
protected implicit val connection: Connection)
extends MeasurementApi[Future, E, RequestEntity](dbName, measurementName)
extends MeasurementIO[Future, E, RequestEntity]
with AkkaWriter
with AkkaReader
with HasCredentials
with Executable {

def write(
entity: E,
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None)
(implicit writer: InfluxWriter[E]): Future[WriteResult] =
write0(entity, consistency, precision, retentionPolicy)
/**
  * Writes a single entity into this measurement.
  *
  * The entity is serialized to InfluxDB line protocol by the implicit
  * [[InfluxWriter]]; `toPoint` presumably prepends the measurement name to the
  * serialized fields/tags — TODO confirm against its definition.
  *
  * @param entity          value to persist
  * @param consistency     write consistency level (defaults to ONE)
  * @param precision       timestamp precision (defaults to nanoseconds)
  * @param retentionPolicy optional retention policy to write into
  * @param wr              serializer from E to line protocol
  * @return future with the write outcome
  */
def write(entity: E,
          consistency: Consistency = Consistencies.ONE,
          precision: Precision = Precisions.NANOSECONDS,
          retentionPolicy: Option[String] = None)(implicit wr: InfluxWriter[E]): Future[WriteResult] =
  writeTo(
    dbName,
    toPoint(measurementName, wr.write(entity)),
    consistency,
    precision,
    retentionPolicy
  )


def bulkWrite(
entitys: Seq[E],
consistency: Consistency = Consistencies.ONE,
precision: Precision = Precisions.NANOSECONDS,
retentionPolicy: Option[String] = None)
(implicit writer: InfluxWriter[E]): Future[WriteResult] =
bulkWrite0(entitys, consistency, precision, retentionPolicy)
/**
  * Writes a batch of entities into this measurement in one request.
  *
  * Each entity is serialized by the implicit [[InfluxWriter]]; `toPoints`
  * presumably joins the serialized lines (newline-separated line protocol)
  * under this measurement's name — TODO confirm against its definition.
  * NOTE(review): parameter name "entitys" is a misspelling of "entities", but
  * renaming would break callers using named arguments.
  *
  * @param entitys         values to persist
  * @param consistency     write consistency level (defaults to ONE)
  * @param precision       timestamp precision (defaults to nanoseconds)
  * @param retentionPolicy optional retention policy to write into
  * @param wr              serializer from E to line protocol
  * @return future with the write outcome
  */
def bulkWrite(entitys: Seq[E],
              consistency: Consistency = Consistencies.ONE,
              precision: Precision = Precisions.NANOSECONDS,
              retentionPolicy: Option[String] = None)(implicit wr: InfluxWriter[E]): Future[WriteResult] =
  writeTo(
    dbName,
    toPoints(measurementName, entitys.map(wr.write)),
    consistency,
    precision,
    retentionPolicy
  )


def read(query: String,
epoch: Epoch = Epochs.NANOSECONDS,
pretty: Boolean = false,
chunked: Boolean = false)
(implicit reader: InfluxReader[E]): Future[ReadResult[E]] =
readJs0(dbName, query, epoch, pretty, chunked) map {
chunked: Boolean = false)(implicit reader: InfluxReader[E]): Future[ReadResult[E]] =
readJs(dbName, query, epoch, pretty, chunked) map {
case qr: QueryResult[JArray] => qr.map(reader.read)
case gr: GroupedResult[JArray] => gr.map(reader.read)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ private[akka] trait AkkaReader
with DatabaseOperationQuery[Uri]
with HasCredentials { self: ReadOperations[Future] with Executable =>

override def readJs0(
dbName: String,
query: String,
epoch: Epoch,
pretty: Boolean,
chunked: Boolean): Future[ReadResult[JArray]] = {
override def readJs(dbName: String,
query: String,
epoch: Epoch,
pretty: Boolean,
chunked: Boolean): Future[ReadResult[JArray]] = {
val executionResult = readRequest(readFromInfluxSingleQuery(dbName, query, epoch, pretty, chunked))

query match {
Expand All @@ -37,12 +36,11 @@ private[akka] trait AkkaReader
}


override def bulkReadJs0(
dbName: String,
queries: Seq[String],
epoch: Epoch,
pretty: Boolean,
chunked: Boolean): Future[QueryResult[Array[JArray]]] =
/**
  * Executes several queries against one database in a single request and
  * returns the raw JSON rows for each query.
  *
  * Unlike the single-query path, no chunked-response branch is taken here;
  * the combined response is parsed via `toBulkQueryJsResult`.
  *
  * @param dbName  database to query
  * @param queries InfluxQL statements, executed together
  * @param epoch   timestamp epoch precision for returned times
  * @param pretty  request pretty-printed JSON from the server
  * @param chunked request a chunked response — NOTE(review): passed to the
  *                query builder but no per-chunk handling is visible here
  * @return future with one JArray result set per query
  */
override def bulkReadJs(dbName: String,
                        queries: Seq[String],
                        epoch: Epoch,
                        pretty: Boolean,
                        chunked: Boolean): Future[QueryResult[Array[JArray]]] =
  readRequest(readFromInfluxBulkQuery(dbName, queries, epoch, pretty, chunked)).flatMap(toBulkQueryJsResult)

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.github.fsanaulla.chronicler.akka.io

import akka.http.scaladsl.model.{RequestEntity, Uri}
import akka.stream.ActorMaterializer
import java.nio.file.Paths

import _root_.akka.http.scaladsl.model._
import _root_.akka.stream.ActorMaterializer
import _root_.akka.stream.scaladsl.FileIO
import com.github.fsanaulla.chronicler.akka.handlers.{AkkaQueryHandler, AkkaRequestHandler, AkkaResponseHandler}
import com.github.fsanaulla.chronicler.akka.utils.AkkaAlias.Connection
import com.github.fsanaulla.chronicler.core.enums.{Consistency, Precision}
Expand Down Expand Up @@ -29,11 +32,11 @@ private[akka] trait AkkaWriter
protected implicit val mat: ActorMaterializer
protected implicit val connection: Connection

def writeTo(dbName: String,
entity: RequestEntity,
consistency: Consistency,
precision: Precision,
retentionPolicy: Option[String]): Future[WriteResult] = {
override def writeTo(dbName: String,
entity: RequestEntity,
consistency: Consistency,
precision: Precision,
retentionPolicy: Option[String]): Future[WriteResult] = {

writeRequest(
uri = writeToInfluxQuery(
Expand All @@ -45,4 +48,21 @@ private[akka] trait AkkaWriter
entity = entity
).flatMap(toResult)
}

/**
  * Streams a file of line-protocol points directly into the database.
  *
  * The file is read lazily with Akka Streams' `FileIO.fromPath` in 1024-byte
  * chunks and sent as an `application/octet-stream` request entity, so the
  * whole file is never loaded into memory at once.
  * NOTE(review): the 1024-byte chunk size is hard-coded here (the previous
  * API exposed a `chunkSize` parameter defaulting to 8192); being an
  * `override`, the signature cannot be extended without changing the trait.
  *
  * @param dbName          target database
  * @param filePath        path to a file containing line-protocol points
  * @param consistency     write consistency level
  * @param precision       timestamp precision
  * @param retentionPolicy optional retention policy to write into
  * @return future with the write outcome
  */
override def writeFromFile(dbName: String,
                           filePath: String,
                           consistency: Consistency,
                           precision: Precision,
                           retentionPolicy: Option[String]): Future[WriteResult] = {

  writeRequest(
    uri = writeToInfluxQuery(
      dbName,
      consistency,
      precision,
      retentionPolicy
    ),
    entity = HttpEntity(MediaTypes.`application/octet-stream`, FileIO.fromPath(Paths.get(filePath), 1024))
  ).flatMap(toResult)
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.github.fsanaulla.chronicler.akka.models

import _root_.akka.http.scaladsl.model.{HttpEntity, RequestEntity}
import _root_.akka.util.ByteString
import com.github.fsanaulla.chronicler.akka.utils.AkkaContentTypes.OctetStream
import com.github.fsanaulla.chronicler.core.model.{Point, Serializer}

/**
  * Implicit [[Serializer]] instances converting line-protocol payloads into
  * Akka HTTP request entities.
  * NOTE(review): object name looks like a typo for "AkkaSerializers", but it
  * is imported elsewhere (wildcard import), so renaming would break callers.
  *
  * Created by
  * Author: [email protected]
  * Date: 15.03.18
  */
private[akka] object AkkaSeserializers {

  /** Joins raw line-protocol strings with newlines into a plain entity. */
  implicit val seq2Http: Serializer[Seq[String], RequestEntity] = new Serializer[Seq[String], RequestEntity] {
    def serialize(lines: Seq[String]): RequestEntity = HttpEntity(ByteString(lines.mkString("\n")))
  }

  /** Serializes a single [[Point]] as an octet-stream entity. */
  implicit val point2Http: Serializer[Point, RequestEntity] = new Serializer[Point, RequestEntity] {
    def serialize(point: Point): RequestEntity = HttpEntity(OctetStream, ByteString(point.serialize))
  }

  /** Serializes many [[Point]]s, newline-separated, as an octet-stream entity. */
  implicit val seqPoint2Http: Serializer[Seq[Point], RequestEntity] = new Serializer[Seq[Point], RequestEntity] {
    def serialize(points: Seq[Point]): RequestEntity = HttpEntity(OctetStream, ByteString(points.map(_.serialize).mkString("\n")))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,7 @@ class AkkaJsonHandlerSpec
val (measurament, points) = res.value.head

measurament shouldEqual "cpu_load_short"
points shouldEqual Array(
JArray(Array(JString("2015-01-29T21:55:43.702900257Z"), JNum(2))),
JArray(Array(JString("2015-01-29T21:55:43.702900257Z"), JNum(0.55))),
JArray(Array(JString("2015-06-11T20:46:02Z"), JNum(0.64)))
)
points shouldEqual result.head._2
}

it should "extract grouped result" in {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.github.fsanaulla.chronicler.async

import java.io.File

import com.github.fsanaulla.chronicler.async.SampleEntitys.largeMultiJsonEntity
import com.github.fsanaulla.chronicler.async.api.Database
import com.github.fsanaulla.chronicler.core.model.Point
Expand Down Expand Up @@ -30,7 +28,7 @@ class DatabaseSpec extends FlatSpecWithMatchers with Futures with DockerizedInfl
"Database API" should "write data from file" in {
influx.createDatabase(testDB).futureValue shouldEqual OkResult

db.writeFromFile(new File(getClass.getResource("/points.txt").getPath))
db.writeFromFile(getClass.getResource("/points.txt").getPath)
.futureValue shouldEqual NoContentResult

db.readJs("SELECT * FROM test1")
Expand Down
Loading

0 comments on commit 7a35180

Please sign in to comment.