From 630c9e1c442f686b9371c5c6c8baa85a1358fb6f Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Mon, 21 Dec 2020 12:39:57 +1300 Subject: [PATCH] Require compatible protocol version from language support --- proxy/core/src/main/resources/reference.conf | 1 + .../proxy/CloudStateProxyMain.scala | 21 ++++++++++++------- .../proxy/EntityDiscoveryManager.scala | 11 ++++++---- tck/src/it/scala/io/cloudstate/tck/TCK.scala | 15 +++++++------ .../scala/io/cloudstate/tck/TCKSpec.scala | 15 ++++++++----- .../TestEntityDiscoveryService.scala | 7 ++++++- 6 files changed, 46 insertions(+), 24 deletions(-) diff --git a/proxy/core/src/main/resources/reference.conf b/proxy/core/src/main/resources/reference.conf index 4c672416e..e98ec0213 100644 --- a/proxy/core/src/main/resources/reference.conf +++ b/proxy/core/src/main/resources/reference.conf @@ -8,6 +8,7 @@ cloudstate.proxy { user-function-host = ${?USER_FUNCTION_HOST} user-function-port = 8080 user-function-port = ${?USER_FUNCTION_PORT} + protocol-compatibility-check = true relay-timeout = 1m max-inbound-message-size = 12M relay-buffer-size = 100 diff --git a/proxy/core/src/main/scala/io/cloudstate/proxy/CloudStateProxyMain.scala b/proxy/core/src/main/scala/io/cloudstate/proxy/CloudStateProxyMain.scala index 3e66af1db..20386cfa3 100644 --- a/proxy/core/src/main/scala/io/cloudstate/proxy/CloudStateProxyMain.scala +++ b/proxy/core/src/main/scala/io/cloudstate/proxy/CloudStateProxyMain.scala @@ -20,7 +20,7 @@ import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.atomic.AtomicLong import com.typesafe.config.Config -import akka.actor.{ActorSelection, ActorSystem} +import akka.actor.{ActorSelection, ActorSystem, OneForOneStrategy, SupervisorStrategy} import akka.pattern.ask import akka.util.Timeout import akka.cluster.Cluster @@ -177,13 +177,18 @@ object CloudStateProxyMain { system.actorOf( BackoffSupervisor.props( - BackoffOpts.onFailure( - EntityDiscoveryManager.props(serverConfig), - childName = "server-manager", - minBackoff = appConfig.backoffMin, - maxBackoff = appConfig.backoffMax, - randomFactor = appConfig.backoffRandomFactor - ) + BackoffOpts + .onFailure( + EntityDiscoveryManager.props(serverConfig), + childName = "server-manager", + minBackoff = appConfig.backoffMin, + maxBackoff = appConfig.backoffMax, + randomFactor = appConfig.backoffRandomFactor + ) + .withSupervisorStrategy(OneForOneStrategy() { + // don't keep restarting and retrying if an entity discovery exception is thrown + case _: EntityDiscoveryException => SupervisorStrategy.Stop + }) ), "server-manager-supervisor" ) diff --git a/proxy/core/src/main/scala/io/cloudstate/proxy/EntityDiscoveryManager.scala b/proxy/core/src/main/scala/io/cloudstate/proxy/EntityDiscoveryManager.scala index fc5b351db..4b7e60e8f 100644 --- a/proxy/core/src/main/scala/io/cloudstate/proxy/EntityDiscoveryManager.scala +++ b/proxy/core/src/main/scala/io/cloudstate/proxy/EntityDiscoveryManager.scala @@ -50,6 +50,7 @@ object EntityDiscoveryManager { httpPort: Int, userFunctionHost: String, userFunctionPort: Int, + protocolCompatibilityCheck: Boolean, relayTimeout: Timeout, relayOutputBufferSize: Int, maxInboundMessageSize: Long, @@ -68,6 +69,7 @@ object EntityDiscoveryManager { httpPort = config.getInt("http-port"), userFunctionHost = config.getString("user-function-host"), userFunctionPort = config.getInt("user-function-port"), + protocolCompatibilityCheck = config.getBoolean("protocol-compatibility-check"), relayTimeout = Timeout(config.getDuration("relay-timeout").toMillis.millis), maxInboundMessageSize = config.getBytes("max-inbound-message-size"), relayOutputBufferSize = config.getInt("relay-buffer-size"), @@ -203,9 +205,7 @@ class EntityDiscoveryManager(config: EntityDiscoveryManager.Configuration)( val supportedProtocolVersionString: String = s"${supportedProtocolMajorVersion}.${supportedProtocolMinorVersion}" def compatibleProtocol(majorVersion: Int, minorVersion: Int): Boolean = - // allow empty protocol version to be compatible, until all library supports report their protocol version - ((majorVersion == 0) && (minorVersion == 0)) || - // otherwise it's currently strict matching of protocol versions + // currently strict matching of protocol versions ((majorVersion == supportedProtocolMajorVersion) && (minorVersion == supportedProtocolMinorVersion)) override def receive: Receive = { @@ -213,7 +213,10 @@ class EntityDiscoveryManager(config: EntityDiscoveryManager.Configuration)( log.info("Received EntitySpec from user function with info: {}", spec.getServiceInfo) try { - if (!compatibleProtocol(spec.getServiceInfo.protocolMajorVersion, spec.getServiceInfo.protocolMinorVersion)) + if (!config.protocolCompatibilityCheck) + log.warning("Protocol version compatibility is configured to be ignored") + else if (!compatibleProtocol(spec.getServiceInfo.protocolMajorVersion, + spec.getServiceInfo.protocolMinorVersion)) throw EntityDiscoveryException( s"Incompatible protocol version ${spec.getServiceInfo.protocolMajorVersion}.${spec.getServiceInfo.protocolMinorVersion}, only $supportedProtocolVersionString is supported" ) diff --git a/tck/src/it/scala/io/cloudstate/tck/TCK.scala b/tck/src/it/scala/io/cloudstate/tck/TCK.scala index 689ac3d7e..339f56f96 100644 --- a/tck/src/it/scala/io/cloudstate/tck/TCK.scala +++ b/tck/src/it/scala/io/cloudstate/tck/TCK.scala @@ -56,15 +56,18 @@ class ManagedCloudstateTCK(config: TckConfiguration) extends CloudstateTCK("for val processes: TckProcesses = TckProcesses.create(config) - override def start(): Unit = try { + override def start(): Unit = { processes.service.start() super.start() processes.proxy.start() - } catch { - case error: Throwable => - processes.service.logs("service") - processes.proxy.logs("proxy") - throw error + } + + override def onStartError(error: Throwable): Unit = { + processes.service.logs("service") + processes.proxy.logs("proxy") + try processes.proxy.stop() + finally try processes.service.stop() + finally throw error } override def afterAll(): Unit = { diff --git a/tck/src/main/scala/io/cloudstate/tck/TCKSpec.scala b/tck/src/main/scala/io/cloudstate/tck/TCKSpec.scala index d0e094cc3..aab13028c 100644 --- a/tck/src/main/scala/io/cloudstate/tck/TCKSpec.scala +++ b/tck/src/main/scala/io/cloudstate/tck/TCKSpec.scala @@ -67,11 +67,16 @@ trait TCKSpec @volatile private[this] var _entitySpec: EntitySpec = EntitySpec() @volatile private[this] var _proxyInfo: ProxyInfo = ProxyInfo() - override def beforeAll(): Unit = { - start() - discover() - super.beforeAll() - } + override def beforeAll(): Unit = + try { + start() + discover() + super.beforeAll() + } catch { + case error: Throwable => onStartError(error) + } + + def onStartError(error: Throwable): Unit = throw error override def afterEach(): Unit = { super.afterEach() diff --git a/testkit/src/main/scala/io/cloudstate/testkit/discovery/TestEntityDiscoveryService.scala b/testkit/src/main/scala/io/cloudstate/testkit/discovery/TestEntityDiscoveryService.scala index b859acf7a..aa0f0f02d 100644 --- a/testkit/src/main/scala/io/cloudstate/testkit/discovery/TestEntityDiscoveryService.scala +++ b/testkit/src/main/scala/io/cloudstate/testkit/discovery/TestEntityDiscoveryService.scala @@ -23,6 +23,7 @@ import com.google.protobuf.DescriptorProtos import com.google.protobuf.Descriptors.{FileDescriptor, ServiceDescriptor} import com.google.protobuf.empty.{Empty => ScalaPbEmpty} import io.cloudstate.protocol.entity._ +import io.cloudstate.testkit.BuildInfo import io.cloudstate.testkit.TestService.TestServiceContext import scala.concurrent.{Future, Promise} @@ -40,7 +41,11 @@ final class TestEntityDiscoveryService(context: TestServiceContext) { } object TestEntityDiscoveryService { - val info: ServiceInfo = ServiceInfo(supportLibraryName = "Cloudstate TestKit") + val info: ServiceInfo = ServiceInfo( + supportLibraryName = "Cloudstate TestKit", + protocolMajorVersion = BuildInfo.protocolMajorVersion, + protocolMinorVersion = BuildInfo.protocolMinorVersion + ) def entitySpec(entityType: String, service: ServiceDescription): EntitySpec = { import scala.jdk.CollectionConverters._