Skip to content

Commit

Permalink
fix: copy metadata in flow collection for server-streaming RPCs (#210)
Browse files Browse the repository at this point in the history
  • Loading branch information
lowasser committed Dec 5, 2020
1 parent 3cd2fe4 commit e1fb662
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
8 changes: 7 additions & 1 deletion stub/src/main/java/io/grpc/kotlin/ClientCalls.kt
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ object ClientCalls {
}
}

private fun GrpcMetadata.copy(): GrpcMetadata {
val result = GrpcMetadata()
result.merge(this)
return result
}

/**
* Returns a [Flow] that, when collected, issues the specified RPC with the specified request
* on the specified channel, and emits the responses. This is intended to be the root
Expand Down Expand Up @@ -294,7 +300,7 @@ object ClientCalls {
readiness.onReady()
}
},
headers
headers.copy()
)

val sender = launch(CoroutineName("SendMessage worker for ${method.fullMethodName}")) {
Expand Down
46 changes: 46 additions & 0 deletions stub/src/test/java/io/grpc/kotlin/ClientCallsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ import com.google.common.truth.Truth.assertThat
import com.google.common.truth.extensions.proto.ProtoTruth.assertThat
import com.google.common.util.concurrent.MoreExecutors.directExecutor
import io.grpc.CallOptions
import io.grpc.ClientCall
import io.grpc.ClientInterceptor
import io.grpc.ClientInterceptors
import io.grpc.Context
import io.grpc.ForwardingClientCall
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import io.grpc.Status
import io.grpc.StatusException
import io.grpc.examples.helloworld.GreeterGrpc
Expand Down Expand Up @@ -609,4 +615,44 @@ class ClientCallsTest: AbstractCallsTest() {
assertThat(responses.single()).isEqualTo(helloReply("Hello, Sunstone"))
assertThat(requestsEvaluations.get()).isEqualTo(2)
}

@Test
fun metadataCopied() = runBlocking {
val metadataKey: Metadata.Key<String> = Metadata.Key.of("test", Metadata.ASCII_STRING_MARSHALLER)
val serverImpl = object : GreeterGrpc.GreeterImplBase() {
override fun serverStreamSayHello(
request: MultiHelloRequest,
responseObserver: StreamObserver<HelloReply>
) {
responseObserver.onNext(helloReply("hello!"))
responseObserver.onCompleted()
}
}

// Verify that the metadata is copied anew for each collection of the flow, with an interceptor
// that checks that it hasn't run before.
val interceptor = object : ClientInterceptor {
override fun <ReqT, RespT> interceptCall(
method: MethodDescriptor<ReqT, RespT>?,
callOptions: CallOptions,
next: io.grpc.Channel
): ClientCall<ReqT, RespT> {
val call: ClientCall<ReqT, RespT> = next.newCall(method, callOptions)
return object : ForwardingClientCall<ReqT, RespT>() {
override fun start(responseListener: Listener<RespT>, headers: Metadata) {
check(!headers.containsKey(metadataKey))
headers.put(metadataKey, "value")
super.start(responseListener, headers)
}

override fun delegate(): ClientCall<ReqT, RespT> = call
}
}
}
val channel = ClientInterceptors.intercept(makeChannel(serverImpl), interceptor)
val flow =
ClientCalls.serverStreamingRpc(channel, serverStreamingSayHelloMethod, multiHelloRequest())
flow.collect()
flow.collect()
}
}

0 comments on commit e1fb662

Please sign in to comment.