From 1d1a48d1a38985f21b230b63e7b8e6a8b0f4f7a4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Siatkowski?=
Date: Fri, 23 Aug 2019 13:07:32 +0200
Subject: [PATCH] added EvalApi support to Cluster classes

---
 src/main/scala/com/redis/api/EvalApi.scala    | 15 ++--
 .../scala/com/redis/cluster/EvalOps.scala     | 73 +++++++++++++++++++
 .../com/redis/cluster/RedisCluster.scala      |  6 +-
 .../com/redis/cluster/RedisClusterOps.scala   | 21 ++++++
 .../scala/com/redis/cluster/RedisShards.scala |  7 +-
 .../cluster/ClusterIncompatibleTests.scala    |  4 +-
 .../cluster/ClusterUnimplementedMethods.scala |  4 +-
 .../cluster/ClusterUnsupportedMethods.scala   |  4 +-
 .../cluster/CommonRedisClusterSpec.scala      |  4 +-
 9 files changed, 123 insertions(+), 15 deletions(-)
 create mode 100644 src/main/scala/com/redis/cluster/EvalOps.scala

diff --git a/src/main/scala/com/redis/api/EvalApi.scala b/src/main/scala/com/redis/api/EvalApi.scala
index 615e40d5..054ff96b 100644
--- a/src/main/scala/com/redis/api/EvalApi.scala
+++ b/src/main/scala/com/redis/api/EvalApi.scala
@@ -7,17 +7,22 @@ trait EvalApi {
   /**
    * evaluates lua code on the server.
    */
-  def evalMultiBulk[A](luaCode: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]
+  def evalMultiBulk[A](luaCode: String, keys: List[Any], args: List[Any])
+                      (implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]

-  def evalBulk[A](luaCode: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[A]
+  def evalBulk[A](luaCode: String, keys: List[Any], args: List[Any])
+                 (implicit format: Format, parse: Parse[A]): Option[A]

   def evalInt(luaCode: String, keys: List[Any], args: List[Any]): Option[Int]

-  def evalMultiSHA[A](shahash: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]
+  def evalMultiSHA[A](shahash: String, keys: List[Any], args: List[Any])
+                     (implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]

-  def evalSHA[A](shahash: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[A]
+  def evalSHA[A](shahash: String, keys: List[Any], args: List[Any])
+                (implicit format: Format, parse: Parse[A]): Option[A]

-  def evalSHABulk[A](shahash: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[A]
+  def evalSHABulk[A](shahash: String, keys: List[Any], args: List[Any])
+                    (implicit format: Format, parse: Parse[A]): Option[A]

   def scriptLoad(luaCode: String): Option[String]

diff --git a/src/main/scala/com/redis/cluster/EvalOps.scala b/src/main/scala/com/redis/cluster/EvalOps.scala
new file mode 100644
index 00000000..2d6a5f9a
--- /dev/null
+++ b/src/main/scala/com/redis/cluster/EvalOps.scala
@@ -0,0 +1,73 @@
+package com.redis.cluster
+
+import com.redis.api.EvalApi
+import com.redis.serialization.{Format, Parse}
+
+trait EvalOps extends EvalApi {
+  self: RedisClusterOps =>
+
+  // todo: broken output
+  override def evalMultiBulk[A](luaCode: String, keys: List[Any], args: List[Any])
+                               (implicit format: Format, parse: Parse[A]): Option[List[Option[A]]] =
+    processForKeys(keys)(gkeys => rc => rc.evalMultiBulk(luaCode, gkeys, args))
+      .flatten.headOption
+
+  // todo: broken output
+  override def evalBulk[A](luaCode: String, keys: List[Any], args: List[Any])
+                          (implicit format: Format, parse: Parse[A]): Option[A] =
+    processForKeys(keys)(gkeys => rc => rc.evalBulk(luaCode, gkeys, args))
+      .flatten.headOption
+
+  // todo: broken output
+  override def evalInt(luaCode: String, keys: List[Any], args: List[Any]): Option[Int] =
+    processForKeys(keys)(gkeys => rc => rc.evalInt(luaCode, gkeys, args))
+      .flatten.headOption
+
+  // todo: broken output
+  override def evalMultiSHA[A](shahash: String, keys: List[Any], args: List[Any])
+                              (implicit format: Format, parse: Parse[A]): Option[List[Option[A]]] =
+    processForKeys(keys)(gkeys => rc => rc.evalMultiSHA(shahash, gkeys, args))
+      .flatten.headOption
+
+  // todo: broken output
+  override def evalSHA[A](shahash: String, keys: List[Any], args: List[Any])
+                         (implicit format: Format, parse: Parse[A]): Option[A] =
+    processForKeys(keys)(gkeys => rc => rc.evalSHA(shahash, gkeys, args))
+      .flatten.headOption
+
+  // todo: broken output
+  override def evalSHABulk[A](shahash: String, keys: List[Any], args: List[Any])
+                             (implicit format: Format, parse: Parse[A]): Option[A] =
+    processForKeys(keys)(gkeys => rc => rc.evalSHABulk(shahash, gkeys, args))
+      .flatten.headOption
+
+  override def scriptLoad(luaCode: String): Option[String] = {
+    val r = onAllConns(_.scriptLoad(luaCode))
+    oneCommonAnswerOr(r)(orError("ScriptLoad")).flatten
+  }
+
+  private val scriptExistsNot = Some(0)
+
+  override def scriptExists(shahash: String): Option[Int] = {
+    val r = onAllConns(_.scriptExists(shahash))
+    oneCommonAnswerOr(r)(_.find(_ == scriptExistsNot)).flatten
+  }
+
+  override def scriptFlush: Option[String] = {
+    val r = onAllConns(_.scriptFlush)
+    oneCommonAnswerOr(r)(orError("ScriptFlush")).flatten
+  }
+
+  private def orError[A](method: String)(r: Iterable[A]): Option[A] =
+    throw new IllegalStateException(s"Various values returned while $method from various instances: ${r.mkString(",")}")
+
+  protected def oneCommonAnswerOr[A](r: Iterable[A])(moreResultHandler: Iterable[A] => Option[A]): Option[A] = {
+    val distinct = r.toSeq.distinct
+    if (distinct.size > 1) {
+      moreResultHandler(distinct)
+    } else {
+      r.headOption
+    }
+  }
+
+}
diff --git a/src/main/scala/com/redis/cluster/RedisCluster.scala b/src/main/scala/com/redis/cluster/RedisCluster.scala
index 84ad81c3..8fde0fe2 100644
--- a/src/main/scala/com/redis/cluster/RedisCluster.scala
+++ b/src/main/scala/com/redis/cluster/RedisCluster.scala
@@ -71,7 +71,7 @@ class RedisCluster(
     with SetOps
     with SortedSetOps
     // with GeoOps todo: implement GeoApi
-    // with EvalOps todo: implement EvalApi
+    with EvalOps
     // with HyperLogLogOps todo: implement HyperLogLogApi
     with HashOps {

@@ -118,4 +118,8 @@ class RedisCluster(

   def close(): Unit = hr.cluster.foreach(_.close())

+  override protected[cluster] def randomNode(): RedisClientPool = {
+    val rni = r.nextInt(hr.cluster.size)
+    hr.cluster(rni)
+  }
 }
diff --git a/src/main/scala/com/redis/cluster/RedisClusterOps.scala b/src/main/scala/com/redis/cluster/RedisClusterOps.scala
index d6404e22..47fc2057 100644
--- a/src/main/scala/com/redis/cluster/RedisClusterOps.scala
+++ b/src/main/scala/com/redis/cluster/RedisClusterOps.scala
@@ -3,8 +3,12 @@ package com.redis.cluster
 import com.redis.serialization.Format
 import com.redis.{RedisClient, RedisClientPool, RedisCommand}

+import scala.util.Random
+
 trait RedisClusterOps extends AutoCloseable {

+  protected val r = new Random()
+
   protected val keyTag: Option[KeyTag]

   protected[cluster] val POINTS_PER_SERVER = 160 // default in libmemcached
@@ -16,6 +20,8 @@

   protected[cluster] def onAllConns[T](body: RedisClient => T): Iterable[T]

+  protected[cluster] def randomNode(): RedisClientPool
+
   /**
    * add server to internal pool
    */
@@ -47,6 +53,21 @@ trait RedisClusterOps extends AutoCloseable {
     nodeForKey(key).withClient(body(_))
   }

+  // todo: need some way to combine T
+  protected[cluster] def processForKeys[T](keys: List[Any])(body: List[Any] => RedisCommand => T)
+                                          (implicit format: Format): Iterable[T] = {
+    if (keys.isEmpty) {
+      Iterable {
+        randomNode().withClient(body(keys))
+      }
+    } else {
+      keys.groupBy(nodeForKey)
+        .map { case (rcp, gkeys) =>
+          rcp.withClient(body(gkeys))
+        }
+    }
+  }
+
   protected[cluster] def inSameNode[T](keys: Any*)(body: RedisClient => T)(implicit format: Format): T = {
     val nodes = keys.toList.map(nodeForKey(_))
     if (nodes.forall(_ == nodes.head)) {
diff --git a/src/main/scala/com/redis/cluster/RedisShards.scala b/src/main/scala/com/redis/cluster/RedisShards.scala
index 3dafeaa8..b8ea63cb 100644
--- a/src/main/scala/com/redis/cluster/RedisShards.scala
+++ b/src/main/scala/com/redis/cluster/RedisShards.scala
@@ -16,7 +16,7 @@ class RedisShards(
     with SetOps
     with SortedSetOps
     // with GeoOps todo: implement GeoApi
-    // with EvalOps todo: implement EvalApi
+    with EvalOps
     // with HyperLogLogOps todo: implement HyperLogLogApi
     with HashOps {

@@ -66,4 +66,9 @@ class RedisShards(

   def close(): Unit = clients.values.foreach(_.close())

+  override protected[cluster] def randomNode(): RedisClientPool = {
+    val rni = r.nextInt(hr.cluster.size)
+    clients(hr.cluster(rni))
+  }
+
 }
diff --git a/src/test/scala/com/redis/cluster/ClusterIncompatibleTests.scala b/src/test/scala/com/redis/cluster/ClusterIncompatibleTests.scala
index 15178a6b..22226d0d 100644
--- a/src/test/scala/com/redis/cluster/ClusterIncompatibleTests.scala
+++ b/src/test/scala/com/redis/cluster/ClusterIncompatibleTests.scala
@@ -7,7 +7,7 @@ import com.redis.api._
  */
 trait ClusterIncompatibleTests
   extends BaseApiSpec
-    // with EvalApiSpec
+    with EvalApiSpec
     // with GeoApiSpec
     // with HyperLogLogApiSpec
     with HashApiSpec
@@ -19,7 +19,7 @@ trait ClusterIncompatibleTests

   override val r: AutoCloseable
     with BaseApi
-    // with EvalApi
+    with EvalApi
     // with GeoApi
     // with HyperLogLogApi
     with HashApi
diff --git a/src/test/scala/com/redis/cluster/ClusterUnimplementedMethods.scala b/src/test/scala/com/redis/cluster/ClusterUnimplementedMethods.scala
index aaf781ac..9f165c6d 100644
--- a/src/test/scala/com/redis/cluster/ClusterUnimplementedMethods.scala
+++ b/src/test/scala/com/redis/cluster/ClusterUnimplementedMethods.scala
@@ -8,7 +8,7 @@ import com.redis.api._
  */
 trait ClusterUnimplementedMethods
   extends BaseApiSpec
-    // with EvalApiSpec
+    with EvalApiSpec
     // with GeoApiSpec
     // with HyperLogLogApiSpec
     with HashApiSpec
@@ -20,7 +20,7 @@ trait ClusterUnimplementedMethods

   override val r: AutoCloseable
     with BaseApi
-    // with EvalApi
+    with EvalApi
     // with GeoApi
     // with HyperLogLogApi
     with HashApi
diff --git a/src/test/scala/com/redis/cluster/ClusterUnsupportedMethods.scala b/src/test/scala/com/redis/cluster/ClusterUnsupportedMethods.scala
index 533b6ed7..bcc7a885 100644
--- a/src/test/scala/com/redis/cluster/ClusterUnsupportedMethods.scala
+++ b/src/test/scala/com/redis/cluster/ClusterUnsupportedMethods.scala
@@ -7,7 +7,7 @@ import com.redis.api._
  */
 trait ClusterUnsupportedMethods
   extends BaseApiSpec
-    // with EvalApiSpec
+    with EvalApiSpec
     // with GeoApiSpec
     // with HyperLogLogApiSpec
     with HashApiSpec
@@ -19,7 +19,7 @@ trait ClusterUnsupportedMethods

   override val r: AutoCloseable
     with BaseApi
-    // with EvalApi
+    with EvalApi
     // with GeoApi
     // with HyperLogLogApi
     with HashApi
diff --git a/src/test/scala/com/redis/cluster/CommonRedisClusterSpec.scala b/src/test/scala/com/redis/cluster/CommonRedisClusterSpec.scala
index e39fcddc..0fe1ae70 100644
--- a/src/test/scala/com/redis/cluster/CommonRedisClusterSpec.scala
+++ b/src/test/scala/com/redis/cluster/CommonRedisClusterSpec.scala
@@ -10,11 +10,11 @@ import scala.collection.mutable.ArrayBuffer
 // todo: remove, test every API separately
 @deprecated
 trait CommonRedisClusterSpec[A] extends FunSpec with Matchers with IntClusterSpec {
-  override val r: AutoCloseable with RedisClusterOps with WithHashRing[A]
-    with BaseApi with HashApi with ListApi with NodeApi with SetApi with SortedSetApi with StringApi = rProvider()
+  override val r = rProvider()

   def rProvider(): AutoCloseable with RedisClusterOps with WithHashRing[A]
     with BaseApi with HashApi with ListApi with NodeApi with SetApi with SortedSetApi with StringApi
+    with EvalApi

   describe("cluster operations") {
     shouldSet()
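
Usage sketch (not part of the patch above): how the new EvalOps mixin is expected
to be exercised through RedisCluster once the change is applied. The node names,
host/port values and the Lua snippet are illustrative assumptions, not taken from
the change itself; the calls shown (scriptLoad, evalSHA, set, close) are the ones
introduced or already exposed by the traits in this patch.

    import com.redis.cluster.{ClusterNode, NoOpKeyTag, RedisCluster}

    object EvalOpsSketch extends App {
      // Two-node "cluster"; host/port values are placeholders.
      val cluster = new RedisCluster(
        List(ClusterNode("node1", "localhost", 6379), ClusterNode("node2", "localhost", 6380)),
        Some(NoOpKeyTag)
      )

      // scriptLoad is broadcast via onAllConns; oneCommonAnswerOr expects all nodes
      // to return the same SHA, otherwise it raises an IllegalStateException.
      val sha: Option[String] = cluster.scriptLoad("return redis.call('GET', KEYS[1])")

      cluster.set("greeting", "hello")

      // A single-key call: processForKeys routes it to the node owning "greeting".
      val result: Option[String] =
        sha.flatMap(h => cluster.evalSHA[String](h, List("greeting"), Nil))
      println(result) // expected: Some(hello)

      cluster.close()
    }

Note that, per the "todo: broken output" comments in EvalOps, multi-key calls that
span several nodes currently keep only the first node's answer, so single-node key
sets are the safe case for the eval* methods in this revision.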