Skip to content

Commit

Permalink
Single generic executor instance
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-vovk committed Jul 18, 2024
1 parent d9ecd78 commit 4b4a83d
Show file tree
Hide file tree
Showing 21 changed files with 22 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,9 @@ class AkkaHttpClientTest extends AnyFlatSpec with Matchers with DockerTests with
}

it should "propagate headers if included" in {
implicit val executor: Executor[Future] = new Executor[Future] {
override def exec(client: HttpClient[Future], request: ElasticRequest): Future[HttpResponse] = {
val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8))
Executor.FutureExecutor.exec(client, request.copy(headers = Map("Authorization" -> s"Basic $cred")))
}
implicit val executor: Executor[Future] = (client: HttpClient[Future], request: ElasticRequest) => {
val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8))
client.send(request.copy(headers = Map("Authorization" -> s"Basic $cred")))
}

mkAkkaBasedClient.execute {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,9 @@ class PekkoHttpClientTest extends AnyFlatSpec with Matchers with DockerTests wit
}

it should "propagate headers if included" in {
implicit val executor: Executor[Future] = new Executor[Future] {
override def exec(client: HttpClient[Future], request: ElasticRequest): Future[HttpResponse] = {
val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8))
Executor.FutureExecutor.exec(client, request.copy(headers = Map("Authorization" -> s"Basic $cred")))
}
implicit val executor: Executor[Future] = (client: HttpClient[Future], request: ElasticRequest) => {
val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8))
client.send(request.copy(headers = Map("Authorization" -> s"Basic $cred")))
}

mkPekkoBasedClient.execute {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class SttpRequestHttpClientTest extends AnyFlatSpec with Matchers with DockerTests {
implicit val executor: Executor[Future] = new Executor[Future] {
override def exec(client: HttpClient[Future], request: ElasticRequest): Future[HttpResponse] = {
val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8))
Executor.FutureExecutor.exec(client, request.copy(headers = Map("Authorization" -> s"Basic $cred")))
}
implicit val executor: Executor[Future] = (client: HttpClient[Future], request: ElasticRequest) => {
val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8))
client.send(request.copy(headers = Map("Authorization" -> s"Basic $cred")))
}
private lazy val sttpClient = SttpRequestHttpClient(ElasticNodeEndpoint("http", elasticHost, elasticPort.toInt, None))
override lazy val client = ElasticClient(sttpClient)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package com.sksamuel.elastic4s

import scala.concurrent.{ExecutionContext, Future}
import scala.language.higherKinds

trait Executor[F[_]] {
def exec(client: HttpClient[F], request: ElasticRequest): F[HttpResponse]
}

object Executor {
def apply[F[_]](implicit ev: Executor[F]): Executor[F] = ev

def apply[F[_]: Executor]: Executor[F] = implicitly[Executor[F]]

implicit def FutureExecutor(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): Executor[Future] =
new Executor[Future] {
override def exec(client: HttpClient[Future], request: ElasticRequest): Future[HttpResponse] = {
client.send(request)
}
}
implicit def defaultExecutor[F[_]]: Executor[F] =
(client: HttpClient[F], request: ElasticRequest) => client.send(request)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ trait Functor[F[_]] {

object Functor {

def apply[F[_]: Functor]: Functor[F] = implicitly[Functor[F]]
def apply[F[_]](implicit f: Functor[F]): Functor[F] = f

implicit def FutureFunctor(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): Functor[Future] =
new Functor[Future] {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package com.sksamuel.elastic4s.cats.effect.instances

import cats.effect.{Async, IO}
import cats.{Functor => CatsFunctor}
import com.sksamuel.elastic4s.cats.effect.CatsEffectExecutor
import com.sksamuel.elastic4s.{Executor, Functor}
import com.sksamuel.elastic4s.Functor

import scala.language.higherKinds

trait CatsEffectInstances {
implicit def catsFunctor[F[_]: CatsFunctor]: Functor[F] = new Functor[F] {
override def map[A, B](fa: F[A])(f: A => B): F[B] = CatsFunctor[F].map(fa)(f)
}

implicit def catsEffectExecutor[F[_]: Async]: Executor[F] =
new CatsEffectExecutor[F]

//this needs to be at the bottom
implicit val ioExecutor: Executor[IO] = new CatsEffectExecutor[IO]
}

@deprecated("Use CatsEffectInstances instead")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package com.sksamuel.elastic4s.cats.effect.instances

import cats.effect.{Async, IO}
import cats.{Functor => CatsFunctor}
import com.sksamuel.elastic4s.cats.effect.CatsEffectExecutor
import com.sksamuel.elastic4s.{Executor, Functor}
import com.sksamuel.elastic4s.Functor

import scala.language.higherKinds

trait CatsEffectInstances {
implicit def catsFunctor[F[_]: CatsFunctor]: Functor[F] = new Functor[F] {
override def map[A, B](fa: F[A])(f: A => B): F[B] = CatsFunctor[F].map(fa)(f)
}

implicit def catsEffectExecutor[F[_]: Async]: Executor[F] =
new CatsEffectExecutor[F]

//this needs to be at the bottom
implicit val ioExecutor: Executor[IO] = new CatsEffectExecutor[IO]
}

@deprecated("Use CatsEffectInstances instead")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package com.sksamuel.elastic4s.monix.instances

import com.sksamuel.elastic4s.Functor
import com.sksamuel.elastic4s.monix.TaskExecutor
import monix.eval.Task

trait TaskInstances {
implicit val taskFunctor: Functor[Task] = new Functor[Task] {
override def map[A, B](fa: Task[A])(f: A => B): Task[B] = fa.map(f)
}

implicit val taskExecutor: TaskExecutor = new TaskExecutor
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package com.sksamuel.elastic4s.scalaz.instances

import com.sksamuel.elastic4s.Functor
import com.sksamuel.elastic4s.scalaz.TaskExecutor
import scalaz.concurrent.Task

trait TaskInstances {
implicit val taskFunctor: Functor[Task] = new Functor[Task] {
override def map[A, B](fa: Task[A])(f: A => B): Task[B] = fa.map(f)
}

implicit val taskExecutor: TaskExecutor = new TaskExecutor
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,4 @@ trait TaskInstances {
implicit val taskFunctor: Functor[Task] = new Functor[Task] {
override def map[A, B](fa: Task[A])(f: A => B): Task[B] = fa.map(f)
}

implicit val taskExecutor: Executor[Task] = new Executor[Task] {
override def exec(client: HttpClient[Task], request: ElasticRequest): Task[HttpResponse] =
client.send(request)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,4 @@ trait TaskInstances {
implicit val taskFunctor: Functor[Task] = new Functor[Task] {
override def map[A, B](fa: Task[A])(f: A => B): Task[B] = fa.map(f)
}

implicit val taskExecutor: Executor[Task] = new Executor[Task] {
override def exec(client: HttpClient[Task], request: ElasticRequest): Task[HttpResponse] =
client.send(request)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ class BatchElasticSink[T](client: ElasticClient[Future], settings: SinkSettings)
override val shape: SinkShape[Seq[T]] = SinkShape.of(in)

private implicit val bulkHandler: BulkHandlers.BulkHandler.type = BulkHandlers.BulkHandler
private implicit val executor: Executor[Future] = Executor.FutureExecutor
private implicit val functor: Functor[Future] = Functor.FutureFunctor

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ class ElasticSource(client: ElasticClient[Future], settings: SourceSettings)
private implicit val searchHandler: Handler[SearchRequest, SearchResponse] = SearchHandlers.SearchHandler
private implicit val scrollHandler: Handler[SearchScrollRequest, SearchResponse] = SearchScrollHandlers.SearchScrollHandler
private implicit val clearScrollHandler: Handler[ClearScrollRequest, ClearScrollResponse] = SearchScrollHandlers.ClearScrollHandler
private implicit val executor: Executor[Future] = Executor.FutureExecutor
private implicit val functor: Functor[Future] = Functor.FutureFunctor

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ class BatchElasticSink[T](client: ElasticClient[Future], settings: SinkSettings)
override val shape: SinkShape[Seq[T]] = SinkShape.of(in)

private implicit val bulkHandler: BulkHandlers.BulkHandler.type = BulkHandlers.BulkHandler
private implicit val executor: Executor[Future] = Executor.FutureExecutor
private implicit val functor: Functor[Future] = Functor.FutureFunctor

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ class ElasticSource(client: ElasticClient[Future], settings: SourceSettings)
private implicit val searchHandler: Handler[SearchRequest, SearchResponse] = SearchHandlers.SearchHandler
private implicit val scrollHandler: Handler[SearchScrollRequest, SearchResponse] = SearchScrollHandlers.SearchScrollHandler
private implicit val clearScrollHandler: Handler[ClearScrollRequest, ClearScrollResponse] = SearchScrollHandlers.ClearScrollHandler
private implicit val executor: Executor[Future] = Executor.FutureExecutor
private implicit val functor: Functor[Future] = Functor.FutureFunctor

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import org.scalatest.matchers.{MatchResult, Matcher}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.higherKinds

trait SearchMatchers extends Matchers {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ class ElasticClientTests extends AnyFlatSpec with Matchers with DockerTests {
}

it should "propagate headers if included" in {
implicit val executor: Executor[Future] = new Executor[Future] {
override def exec(client: HttpClient[Future], request: ElasticRequest): Future[HttpResponse] = {
val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8))
Executor.FutureExecutor.exec(client, request.copy(headers = Map("Authorization" -> s"Basic $cred")))
}
implicit val executor: Executor[Future] = (client: HttpClient[Future], request: ElasticRequest) => {
val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8))
client.send(request.copy(headers = Map("Authorization" -> s"Basic $cred")))
}

mkJavaBasedClient.execute {
Expand Down

0 comments on commit 4b4a83d

Please sign in to comment.