Skip to content

Commit

Permalink
added EvalApi support to Cluster classes
Browse files Browse the repository at this point in the history
  • Loading branch information
Michał Siatkowski committed Aug 26, 2019
1 parent b12fd6c commit fb912e2
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 15 deletions.
15 changes: 10 additions & 5 deletions src/main/scala/com/redis/api/EvalApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@ trait EvalApi {
/**
* evaluates lua code on the server.
*/
def evalMultiBulk[A](luaCode: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]
def evalMultiBulk[A](luaCode: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]

def evalBulk[A](luaCode: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[A]
def evalBulk[A](luaCode: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A]

def evalInt(luaCode: String, keys: List[Any], args: List[Any]): Option[Int]

def evalMultiSHA[A](shahash: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]
def evalMultiSHA[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]

def evalSHA[A](shahash: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[A]
def evalSHA[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A]

def evalSHABulk[A](shahash: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[A]
def evalSHABulk[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A]

def scriptLoad(luaCode: String): Option[String]

Expand Down
73 changes: 73 additions & 0 deletions src/main/scala/com/redis/cluster/EvalOps.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.redis.cluster

import com.redis.api.EvalApi
import com.redis.serialization.{Format, Parse}

trait EvalOps extends EvalApi {
self: RedisClusterOps =>

// todo: broken output
override def evalMultiBulk[A](luaCode: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]] =
processForKeys(keys)(gkeys => rc => rc.evalMultiBulk(luaCode, gkeys, args))
.flatten.headOption

// todo: broken output
override def evalBulk[A](luaCode: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A] =
processForKeys(keys)(gkeys => rc => rc.evalBulk(luaCode, gkeys, args))
.flatten.headOption

// todo: broken output
override def evalInt(luaCode: String, keys: List[Any], args: List[Any]): Option[Int] =
processForKeys(keys)(gkeys => rc => rc.evalInt(luaCode, gkeys, args))
.flatten.headOption

// todo: broken output
override def evalMultiSHA[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]] =
processForKeys(keys)(gkeys => rc => rc.evalMultiSHA(shahash, gkeys, args))
.flatten.headOption

// todo: broken output
override def evalSHA[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A] =
processForKeys(keys)(gkeys => rc => rc.evalSHA(shahash, gkeys, args))
.flatten.headOption

// todo: broken output
override def evalSHABulk[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A] =
processForKeys(keys)(gkeys => rc => rc.evalSHABulk(shahash, gkeys, args))
.flatten.headOption

override def scriptLoad(luaCode: String): Option[String] = {
val r = onAllConns(_.scriptLoad(luaCode))
oneCommonAnswerOr(r)(orError("ScriptLoad")).flatten
}

private val scriptExistsNot = Some(0)

override def scriptExists(shahash: String): Option[Int] = {
val r = onAllConns(_.scriptExists(shahash))
oneCommonAnswerOr(r)(_.find(_ == scriptExistsNot)).flatten
}

override def scriptFlush: Option[String] = {
val r = onAllConns(_.scriptFlush)
oneCommonAnswerOr(r)(orError("ScriptFlush")).flatten
}

private def orError[A](method: String)(r: Iterable[A]): Option[A] =
throw new IllegalStateException(s"Various values returned while $method from various instances: ${r.mkString(",")}")

protected def oneCommonAnswerOr[A](r: Iterable[A])(moreResultHandler: Iterable[A] => Option[A]): Option[A] = {
val distinct = r.toSeq.distinct
if (distinct.size > 1) {
moreResultHandler(distinct)
} else {
r.headOption
}
}

}
6 changes: 5 additions & 1 deletion src/main/scala/com/redis/cluster/RedisCluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class RedisCluster(
with SetOps
with SortedSetOps
// with GeoOps todo: implement GeoApi
// with EvalOps todo: implement EvalApi
with EvalOps
// with HyperLogLogOps todo: implement HyperLogLogApi
with HashOps {

Expand Down Expand Up @@ -118,4 +118,8 @@ class RedisCluster(
def close(): Unit =
hr.cluster.foreach(_.close())

override protected[cluster] def randomNode(): RedisClientPool = {
val rni = r.nextInt(hr.cluster.size)
hr.cluster(rni)
}
}
21 changes: 21 additions & 0 deletions src/main/scala/com/redis/cluster/RedisClusterOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package com.redis.cluster
import com.redis.serialization.Format
import com.redis.{RedisClient, RedisClientPool, RedisCommand}

import scala.util.Random

trait RedisClusterOps extends AutoCloseable {

protected val r = new Random()

protected val keyTag: Option[KeyTag]

protected[cluster] val POINTS_PER_SERVER = 160 // default in libmemcached
Expand All @@ -16,6 +20,8 @@ trait RedisClusterOps extends AutoCloseable {

protected[cluster] def onAllConns[T](body: RedisClient => T): Iterable[T]

protected[cluster] def randomNode(): RedisClientPool

/**
* add server to internal pool
*/
Expand Down Expand Up @@ -47,6 +53,21 @@ trait RedisClusterOps extends AutoCloseable {
nodeForKey(key).withClient(body(_))
}

// todo: need some way to combine T
protected[cluster] def processForKeys[T](keys: List[Any])(body: List[Any] => RedisCommand => T)
(implicit format: Format): Iterable[T] = {
if (keys.isEmpty) {
Iterable {
randomNode().withClient(body(keys))
}
} else {
keys.groupBy(nodeForKey)
.map { case (rcp, gkeys) =>
rcp.withClient(body(gkeys))
}
}
}

protected[cluster] def inSameNode[T](keys: Any*)(body: RedisClient => T)(implicit format: Format): T = {
val nodes = keys.toList.map(nodeForKey(_))
if (nodes.forall(_ == nodes.head)) {
Expand Down
7 changes: 6 additions & 1 deletion src/main/scala/com/redis/cluster/RedisShards.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class RedisShards(
with SetOps
with SortedSetOps
// with GeoOps todo: implement GeoApi
// with EvalOps todo: implement EvalApi
with EvalOps
// with HyperLogLogOps todo: implement HyperLogLogApi
with HashOps {

Expand Down Expand Up @@ -66,4 +66,9 @@ class RedisShards(
def close(): Unit =
clients.values.foreach(_.close())

override protected[cluster] def randomNode(): RedisClientPool = {
val rni = r.nextInt(hr.cluster.size)
clients(hr.cluster(rni))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.redis.api._
*/
trait ClusterIncompatibleTests
extends BaseApiSpec
// with EvalApiSpec
with EvalApiSpec
// with GeoApiSpec
// with HyperLogLogApiSpec
with HashApiSpec
Expand All @@ -19,7 +19,7 @@ trait ClusterIncompatibleTests

override val r: AutoCloseable
with BaseApi
// with EvalApi
with EvalApi
// with GeoApi
// with HyperLogLogApi
with HashApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.redis.api._
*/
trait ClusterUnimplementedMethods
extends BaseApiSpec
// with EvalApiSpec
with EvalApiSpec
// with GeoApiSpec
// with HyperLogLogApiSpec
with HashApiSpec
Expand All @@ -20,7 +20,7 @@ trait ClusterUnimplementedMethods

override val r: AutoCloseable
with BaseApi
// with EvalApi
with EvalApi
// with GeoApi
// with HyperLogLogApi
with HashApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.redis.api._
*/
trait ClusterUnsupportedMethods
extends BaseApiSpec
// with EvalApiSpec
with EvalApiSpec
// with GeoApiSpec
// with HyperLogLogApiSpec
with HashApiSpec
Expand All @@ -19,7 +19,7 @@ trait ClusterUnsupportedMethods

override val r: AutoCloseable
with BaseApi
// with EvalApi
with EvalApi
// with GeoApi
// with HyperLogLogApi
with HashApi
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/com/redis/cluster/CommonRedisClusterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import scala.collection.mutable.ArrayBuffer
// todo: remove, test every API separately
@deprecated trait CommonRedisClusterSpec[A] extends FunSpec with Matchers with IntClusterSpec {

override val r: AutoCloseable with RedisClusterOps with WithHashRing[A]
with BaseApi with HashApi with ListApi with NodeApi with SetApi with SortedSetApi with StringApi = rProvider()
override val r = rProvider()

def rProvider(): AutoCloseable with RedisClusterOps with WithHashRing[A]
with BaseApi with HashApi with ListApi with NodeApi with SetApi with SortedSetApi with StringApi
with EvalApi

describe("cluster operations") {
shouldSet()
Expand Down

0 comments on commit fb912e2

Please sign in to comment.