diff --git a/FuzzTesting/FailCases/clusterfuzz-testcase-minimized-ServerFuzzer-release-5285159577452544 b/FuzzTesting/FailCases/clusterfuzz-testcase-minimized-ServerFuzzer-release-5285159577452544 new file mode 100644 index 000000000..e6d890818 --- /dev/null +++ b/FuzzTesting/FailCases/clusterfuzz-testcase-minimized-ServerFuzzer-release-5285159577452544 @@ -0,0 +1,10 @@ +PUT /echo.Echo/Collect HTTP/1.1 +Content-Type:application/grpc +transfer-encoding:cHUnked + +3 +ÿÿ +ÿ + +PUT * HTTP/1.1 + diff --git a/Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift b/Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift index cb344c2c5..9ec784402 100644 --- a/Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift +++ b/Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift @@ -167,7 +167,7 @@ extension GRPCWebToHTTP2ServerCodec { case fullyOpen(InboundState, OutboundState) /// The server has closed the response stream, we may receive other request parts from the client. - case clientOpenServerClosed + case clientOpenServerClosed(InboundState) /// The client has sent everything, the server still needs to close the response stream. case clientClosedServerOpen(OutboundState) @@ -304,40 +304,15 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { preconditionFailure("Invalid state: haven't received request head") case .fullyOpen(var inbound, let outbound): - if inbound.requestBuffer == nil { - // We're not dealing with gRPC Web Text: just forward the buffer. - return .fireChannelRead(.data(.init(data: .byteBuffer(buffer)))) - } - - if inbound.requestBuffer!.readableBytes == 0 { - inbound.requestBuffer = buffer - } else { - inbound.requestBuffer!.writeBuffer(&buffer) - } - - let readableBytes = inbound.requestBuffer!.readableBytes - // The length of base64 encoded data must be a multiple of 4. - let bytesToRead = readableBytes - (readableBytes % 4) - - let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action - - if bytesToRead > 0, - let base64Encoded = inbound.requestBuffer!.readString(length: bytesToRead), - let base64Decoded = Data(base64Encoded: base64Encoded) { - // Recycle the input buffer and restore the request buffer. - buffer.clear() - buffer.writeContiguousBytes(base64Decoded) - action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer)))) - } else { - action = .none - } - + let action = inbound.processInboundData(buffer: &buffer) self = .fullyOpen(inbound, outbound) return action - case .clientOpenServerClosed: - // The server is already done; so drop the request. - return .none + case var .clientOpenServerClosed(inbound): + // The server is already done, but it's not our place to drop the request. + let action = inbound.processInboundData(buffer: &buffer) + self = .clientOpenServerClosed(inbound) + return action case .clientClosedServerOpen: preconditionFailure("End of request stream already received") @@ -366,9 +341,13 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { preconditionFailure("End of request stream already received") case .clientOpenServerClosed: - // Both sides are closed now, back to idle. + // Both sides are closed now, back to idle. Don't forget to pass on the .end, as + // it's necessary to communicate to the other peers that the response is done. self = .idle - return .none + + // Send an empty DATA frame with the end stream flag set. + let empty = allocator.buffer(capacity: 0) + return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true))) case ._modifying: preconditionFailure("Left in modifying state") @@ -388,12 +367,12 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { case .idle: preconditionFailure("Invalid state: haven't received request head") - case var .fullyOpen(_, outbound): + case .fullyOpen(let inbound, var outbound): // Double check these are trailers. assert(outbound.responseHeadersSent) // We haven't seen the end of the request stream yet. - self = .clientOpenServerClosed + self = .clientOpenServerClosed(inbound) // Avoid CoW-ing the buffers. let responseBuffers = outbound.responseBuffer @@ -467,9 +446,9 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { case .idle: preconditionFailure("Invalid state: haven't received request head") - case let .fullyOpen(_, outbound): + case let .fullyOpen(inbound, outbound): // We still haven't seen the end of the request stream. - self = .clientOpenServerClosed + self = .clientOpenServerClosed(inbound) let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( hpackHeaders: trailers, @@ -703,6 +682,42 @@ extension GRPCWebToHTTP2ServerCodec { } } +extension GRPCWebToHTTP2ServerCodec.StateMachine.InboundState { + fileprivate mutating func processInboundData( + buffer: inout ByteBuffer + ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { + if self.requestBuffer == nil { + // We're not dealing with gRPC Web Text: just forward the buffer. + return .fireChannelRead(.data(.init(data: .byteBuffer(buffer)))) + } + + if self.requestBuffer!.readableBytes == 0 { + self.requestBuffer = buffer + } else { + self.requestBuffer!.writeBuffer(&buffer) + } + + let readableBytes = self.requestBuffer!.readableBytes + // The length of base64 encoded data must be a multiple of 4. + let bytesToRead = readableBytes - (readableBytes % 4) + + let action: GRPCWebToHTTP2ServerCodec.StateMachine.Action + + if bytesToRead > 0, + let base64Encoded = self.requestBuffer!.readString(length: bytesToRead), + let base64Decoded = Data(base64Encoded: base64Encoded) { + // Recycle the input buffer and restore the request buffer. + buffer.clear() + buffer.writeContiguousBytes(base64Decoded) + action = .fireChannelRead(.data(.init(data: .byteBuffer(buffer)))) + } else { + action = .none + } + + return action + } +} + extension HTTPHeaders { fileprivate init(hpackHeaders headers: HPACKHeaders) { self.init() diff --git a/Tests/GRPCTests/GRPCWebToHTTP2StateMachineTests.swift b/Tests/GRPCTests/GRPCWebToHTTP2StateMachineTests.swift index acd72fed8..3f131d6eb 100644 --- a/Tests/GRPCTests/GRPCWebToHTTP2StateMachineTests.swift +++ b/Tests/GRPCTests/GRPCWebToHTTP2StateMachineTests.swift @@ -379,8 +379,20 @@ final class GRPCWebToHTTP2StateMachineTests: GRPCTestCase { allocator: self.allocator ).assertWrite() - state.processInbound(serverRequestPart: .body(.init()), allocator: self.allocator).assertNone() - state.processInbound(serverRequestPart: .end(nil), allocator: self.allocator).assertNone() + state.processInbound( + serverRequestPart: .body(ByteBuffer(string: "hello world")), + allocator: self.allocator + ).assertRead { + $0.assertData { + XCTAssertFalse($0.endStream) + $0.data.assertByteBuffer { buffer in + XCTAssertTrue(buffer.readableBytesView.elementsEqual("hello world".utf8)) + } + } + } + state.processInbound(serverRequestPart: .end(nil), allocator: self.allocator).assertRead { + $0.assertEmptyDataWithEndStream() + } } func test_responsePartsAfterServerClosed() { @@ -489,14 +501,14 @@ final class GRPCWebToHTTP2StateMachineTests: GRPCTestCase { // gRPC-Web, server closes immediately. sendRequestHead(&state, contentType: .webProtobuf).assertRead() sendResponseHeaders(&state, headers: [":status": "415"], endStream: true).assertWrite() - sendRequestBody(&state, buffer: .init(string: "hello")).assertNone() - sendRequestEnd(&state).assertNone() + sendRequestBody(&state, buffer: .init(string: "hello")).assertRead() + sendRequestEnd(&state).assertRead() // gRPC-Web text, server closes immediately. sendRequestHead(&state, contentType: .webTextProtobuf).assertRead() sendResponseHeaders(&state, headers: [":status": "415"], endStream: true).assertWrite() - sendRequestBody(&state, buffer: .init(string: "hello")).assertNone() - sendRequestEnd(&state).assertNone() + sendRequestBody(&state, buffer: .init(string: "hello")).assertRead() + sendRequestEnd(&state).assertRead() } } diff --git a/Tests/GRPCTests/ServerFuzzingRegressionTests.swift b/Tests/GRPCTests/ServerFuzzingRegressionTests.swift index dba8052cd..306edfaef 100644 --- a/Tests/GRPCTests/ServerFuzzingRegressionTests.swift +++ b/Tests/GRPCTests/ServerFuzzingRegressionTests.swift @@ -77,4 +77,9 @@ final class ServerFuzzingRegressionTests: GRPCTestCase { let name = "clusterfuzz-testcase-minimized-ServerFuzzer-release-5448955772141568" XCTAssertNoThrow(try self.runTest(withInputNamed: name)) } + + func testFuzzCase_release_5285159577452544() { + let name = "clusterfuzz-testcase-minimized-ServerFuzzer-release-5285159577452544" + XCTAssertNoThrow(try self.runTest(withInputNamed: name)) + } }