From e1fb6627f2cbeb802746a0f9ac9ecce49c4debc0 Mon Sep 17 00:00:00 2001 From: Louis Wasserman Date: Fri, 4 Dec 2020 16:26:13 -0800 Subject: [PATCH] fix: copy metadata in flow collection for server-streaming RPCs (#210) --- .../main/java/io/grpc/kotlin/ClientCalls.kt | 8 +++- .../java/io/grpc/kotlin/ClientCallsTest.kt | 46 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/stub/src/main/java/io/grpc/kotlin/ClientCalls.kt b/stub/src/main/java/io/grpc/kotlin/ClientCalls.kt index 3cddb038..3650cafc 100644 --- a/stub/src/main/java/io/grpc/kotlin/ClientCalls.kt +++ b/stub/src/main/java/io/grpc/kotlin/ClientCalls.kt @@ -251,6 +251,12 @@ object ClientCalls { } } + private fun GrpcMetadata.copy(): GrpcMetadata { + val result = GrpcMetadata() + result.merge(this) + return result + } + /** * Returns a [Flow] that, when collected, issues the specified RPC with the specified request * on the specified channel, and emits the responses. This is intended to be the root @@ -294,7 +300,7 @@ object ClientCalls { readiness.onReady() } }, - headers + headers.copy() ) val sender = launch(CoroutineName("SendMessage worker for ${method.fullMethodName}")) { diff --git a/stub/src/test/java/io/grpc/kotlin/ClientCallsTest.kt b/stub/src/test/java/io/grpc/kotlin/ClientCallsTest.kt index c35d4ef3..4e9862bc 100644 --- a/stub/src/test/java/io/grpc/kotlin/ClientCallsTest.kt +++ b/stub/src/test/java/io/grpc/kotlin/ClientCallsTest.kt @@ -20,7 +20,13 @@ import com.google.common.truth.Truth.assertThat import com.google.common.truth.extensions.proto.ProtoTruth.assertThat import com.google.common.util.concurrent.MoreExecutors.directExecutor import io.grpc.CallOptions +import io.grpc.ClientCall +import io.grpc.ClientInterceptor +import io.grpc.ClientInterceptors import io.grpc.Context +import io.grpc.ForwardingClientCall +import io.grpc.Metadata +import io.grpc.MethodDescriptor import io.grpc.Status import io.grpc.StatusException import io.grpc.examples.helloworld.GreeterGrpc @@ -609,4 +615,44 @@ class ClientCallsTest: AbstractCallsTest() { assertThat(responses.single()).isEqualTo(helloReply("Hello, Sunstone")) assertThat(requestsEvaluations.get()).isEqualTo(2) } + + @Test + fun metadataCopied() = runBlocking { + val metadataKey: Metadata.Key = Metadata.Key.of("test", Metadata.ASCII_STRING_MARSHALLER) + val serverImpl = object : GreeterGrpc.GreeterImplBase() { + override fun serverStreamSayHello( + request: MultiHelloRequest, + responseObserver: StreamObserver + ) { + responseObserver.onNext(helloReply("hello!")) + responseObserver.onCompleted() + } + } + + // Verify that the metadata is copied anew for each collection of the flow, with an interceptor + // that checks that it hasn't run before. + val interceptor = object : ClientInterceptor { + override fun interceptCall( + method: MethodDescriptor?, + callOptions: CallOptions, + next: io.grpc.Channel + ): ClientCall { + val call: ClientCall = next.newCall(method, callOptions) + return object : ForwardingClientCall() { + override fun start(responseListener: Listener, headers: Metadata) { + check(!headers.containsKey(metadataKey)) + headers.put(metadataKey, "value") + super.start(responseListener, headers) + } + + override fun delegate(): ClientCall = call + } + } + } + val channel = ClientInterceptors.intercept(makeChannel(serverImpl), interceptor) + val flow = + ClientCalls.serverStreamingRpc(channel, serverStreamingSayHelloMethod, multiHelloRequest()) + flow.collect() + flow.collect() + } }