Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Require compatible protocol version from language support
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Dec 21, 2020
1 parent d163527 commit 630c9e1
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 24 deletions.
1 change: 1 addition & 0 deletions proxy/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ cloudstate.proxy {
user-function-host = ${?USER_FUNCTION_HOST}
user-function-port = 8080
user-function-port = ${?USER_FUNCTION_PORT}
protocol-compatibility-check = true
relay-timeout = 1m
max-inbound-message-size = 12M
relay-buffer-size = 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.AtomicLong

import com.typesafe.config.Config
import akka.actor.{ActorSelection, ActorSystem}
import akka.actor.{ActorSelection, ActorSystem, OneForOneStrategy, SupervisorStrategy}
import akka.pattern.ask
import akka.util.Timeout
import akka.cluster.Cluster
Expand Down Expand Up @@ -177,13 +177,18 @@ object CloudStateProxyMain {

system.actorOf(
BackoffSupervisor.props(
BackoffOpts.onFailure(
EntityDiscoveryManager.props(serverConfig),
childName = "server-manager",
minBackoff = appConfig.backoffMin,
maxBackoff = appConfig.backoffMax,
randomFactor = appConfig.backoffRandomFactor
)
BackoffOpts
.onFailure(
EntityDiscoveryManager.props(serverConfig),
childName = "server-manager",
minBackoff = appConfig.backoffMin,
maxBackoff = appConfig.backoffMax,
randomFactor = appConfig.backoffRandomFactor
)
.withSupervisorStrategy(OneForOneStrategy() {
// don't keep restarting and retrying if an entity discovery exception is thrown
case _: EntityDiscoveryException => SupervisorStrategy.Stop
})
),
"server-manager-supervisor"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object EntityDiscoveryManager {
httpPort: Int,
userFunctionHost: String,
userFunctionPort: Int,
protocolCompatibilityCheck: Boolean,
relayTimeout: Timeout,
relayOutputBufferSize: Int,
maxInboundMessageSize: Long,
Expand All @@ -68,6 +69,7 @@ object EntityDiscoveryManager {
httpPort = config.getInt("http-port"),
userFunctionHost = config.getString("user-function-host"),
userFunctionPort = config.getInt("user-function-port"),
protocolCompatibilityCheck = config.getBoolean("protocol-compatibility-check"),
relayTimeout = Timeout(config.getDuration("relay-timeout").toMillis.millis),
maxInboundMessageSize = config.getBytes("max-inbound-message-size"),
relayOutputBufferSize = config.getInt("relay-buffer-size"),
Expand Down Expand Up @@ -203,17 +205,18 @@ class EntityDiscoveryManager(config: EntityDiscoveryManager.Configuration)(
val supportedProtocolVersionString: String = s"${supportedProtocolMajorVersion}.${supportedProtocolMinorVersion}"

def compatibleProtocol(majorVersion: Int, minorVersion: Int): Boolean =
// allow empty protocol version to be compatible, until all library supports report their protocol version
((majorVersion == 0) && (minorVersion == 0)) ||
// otherwise it's currently strict matching of protocol versions
// currently strict matching of protocol versions
((majorVersion == supportedProtocolMajorVersion) && (minorVersion == supportedProtocolMinorVersion))

override def receive: Receive = {
case spec: EntitySpec =>
log.info("Received EntitySpec from user function with info: {}", spec.getServiceInfo)

try {
if (!compatibleProtocol(spec.getServiceInfo.protocolMajorVersion, spec.getServiceInfo.protocolMinorVersion))
if (!config.protocolCompatibilityCheck)
log.warning("Protocol version compatibility is configured to be ignored")
else if (!compatibleProtocol(spec.getServiceInfo.protocolMajorVersion,
spec.getServiceInfo.protocolMinorVersion))
throw EntityDiscoveryException(
s"Incompatible protocol version ${spec.getServiceInfo.protocolMajorVersion}.${spec.getServiceInfo.protocolMinorVersion}, only $supportedProtocolVersionString is supported"
)
Expand Down
15 changes: 9 additions & 6 deletions tck/src/it/scala/io/cloudstate/tck/TCK.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,18 @@ class ManagedCloudstateTCK(config: TckConfiguration) extends CloudstateTCK("for

val processes: TckProcesses = TckProcesses.create(config)

override def start(): Unit = try {
override def start(): Unit = {
processes.service.start()
super.start()
processes.proxy.start()
} catch {
case error: Throwable =>
processes.service.logs("service")
processes.proxy.logs("proxy")
throw error
}

override def onStartError(error: Throwable): Unit = {
processes.service.logs("service")
processes.proxy.logs("proxy")
try processes.proxy.stop()
finally try processes.service.stop()
finally throw error
}

override def afterAll(): Unit = {
Expand Down
15 changes: 10 additions & 5 deletions tck/src/main/scala/io/cloudstate/tck/TCKSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ trait TCKSpec
@volatile private[this] var _entitySpec: EntitySpec = EntitySpec()
@volatile private[this] var _proxyInfo: ProxyInfo = ProxyInfo()

override def beforeAll(): Unit = {
start()
discover()
super.beforeAll()
}
override def beforeAll(): Unit =
try {
start()
discover()
super.beforeAll()
} catch {
case error: Throwable => onStartError(error)
}

def onStartError(error: Throwable): Unit = throw error

override def afterEach(): Unit = {
super.afterEach()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.google.protobuf.DescriptorProtos
import com.google.protobuf.Descriptors.{FileDescriptor, ServiceDescriptor}
import com.google.protobuf.empty.{Empty => ScalaPbEmpty}
import io.cloudstate.protocol.entity._
import io.cloudstate.testkit.BuildInfo
import io.cloudstate.testkit.TestService.TestServiceContext

import scala.concurrent.{Future, Promise}
Expand All @@ -40,7 +41,11 @@ final class TestEntityDiscoveryService(context: TestServiceContext) {
}

object TestEntityDiscoveryService {
val info: ServiceInfo = ServiceInfo(supportLibraryName = "Cloudstate TestKit")
val info: ServiceInfo = ServiceInfo(
supportLibraryName = "Cloudstate TestKit",
protocolMajorVersion = BuildInfo.protocolMajorVersion,
protocolMinorVersion = BuildInfo.protocolMinorVersion
)

def entitySpec(entityType: String, service: ServiceDescription): EntitySpec = {
import scala.jdk.CollectionConverters._
Expand Down

0 comments on commit 630c9e1

Please sign in to comment.