From 848cf87aeda65fcd5d042747dbd663b8e861ba3e Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Tue, 13 Aug 2024 17:14:07 -0700 Subject: [PATCH] Improve Hearth unit test coverge for event streams. (#214) --- .../spec/event_stream_integration_spec.rb | 15 +- .../event_stream_integration_spec.rb | 15 +- hearth/lib/hearth/event_stream.rb | 21 +- .../event_stream/binary/message_decoder.rb | 22 +- .../event_stream/binary/message_encoder.rb | 4 +- .../lib/hearth/event_stream/binary/types.rb | 4 +- .../lib/hearth/event_stream/handler_base.rb | 19 +- hearth/lib/hearth/http2/client.rb | 22 +- hearth/lib/hearth/http2/connection.rb | 67 ++-- hearth/lib/hearth/middleware/event_streams.rb | 2 +- .../sig/lib/hearth/event_stream/message.rbs | 4 +- .../hearth/event_stream/async_output_spec.rb | 123 ++++++ .../event_stream/binary/encode_decode_spec.rb | 67 +++- .../spec/hearth/event_stream/decoder_spec.rb | 41 ++ .../spec/hearth/event_stream/encoder_spec.rb | 82 ++++ .../hearth/event_stream/handler_base_spec.rb | 126 ++++++ hearth/spec/hearth/http2/client_spec.rb | 204 ++++++++++ hearth/spec/hearth/http2/connection_spec.rb | 360 ++++++++++++++++++ hearth/spec/hearth/http2/request_spec.rb | 28 ++ hearth/spec/hearth/http2/response_spec.rb | 17 + .../hearth/middleware/event_streams_spec.rb | 96 +++++ hearth/spec/hearth/middleware/sign_spec.rb | 34 ++ hearth/spec/hearth/signers/anonymous_spec.rb | 19 + hearth/spec/spec_helper.rb | 3 +- 24 files changed, 1288 insertions(+), 107 deletions(-) create mode 100644 hearth/spec/hearth/event_stream/async_output_spec.rb create mode 100644 hearth/spec/hearth/event_stream/decoder_spec.rb create mode 100644 hearth/spec/hearth/event_stream/encoder_spec.rb create mode 100644 hearth/spec/hearth/event_stream/handler_base_spec.rb create mode 100644 hearth/spec/hearth/http2/client_spec.rb create mode 100644 hearth/spec/hearth/http2/connection_spec.rb create mode 100644 hearth/spec/hearth/http2/request_spec.rb create mode 100644 hearth/spec/hearth/http2/response_spec.rb create mode 100644 hearth/spec/hearth/middleware/event_streams_spec.rb diff --git a/codegen/projections/white_label/spec/event_stream_integration_spec.rb b/codegen/projections/white_label/spec/event_stream_integration_spec.rb index de735eff5..f3470e350 100644 --- a/codegen/projections/white_label/spec/event_stream_integration_spec.rb +++ b/codegen/projections/white_label/spec/event_stream_integration_spec.rb @@ -19,7 +19,7 @@ def start_mirror_event_server(port) server = TCPServer.new(port) - Logger.new($stdout) + logger = Logger.new($stdout) $stdout.sync = true server_thread = Thread.new do @@ -28,6 +28,7 @@ def start_mirror_event_server(port) conn = HTTP2::Server.new conn.on(:frame) do |bytes| + logger.info("SERVER -> #{bytes.inspect}") sock.write(bytes) unless sock.closed? end @@ -76,6 +77,8 @@ def start_mirror_event_server(port) end stream.on(:half_close) do + logger.info("SERVER HALF CLOSE") + stream.data('', end_stream: true) stream.close end end @@ -88,6 +91,7 @@ def start_mirror_event_server(port) rescue StandardError => e puts "#{e.class} exception: #{e.message} - closing socket." puts e.backtrace + logger.error("SERVER exception: #{e.inspect}") sock.close end end @@ -113,9 +117,16 @@ def start_mirror_event_server(port) # end server, server_thread = start_mirror_event_server(port) + logger = Logger.new($stdout) + $stdout.sync = true + Timeout.timeout(5) do client = WhiteLabel::Client.new( - endpoint: "http://localhost:#{port}" + endpoint: "http://localhost:#{port}", + http2_client: Hearth::HTTP2::Client.new( + debug_output: true, + logger: logger + ) ) handler = WhiteLabel::EventStream::StartEventStreamHandler.new diff --git a/codegen/smithy-ruby-codegen-test/integration-specs/event_stream_integration_spec.rb b/codegen/smithy-ruby-codegen-test/integration-specs/event_stream_integration_spec.rb index de735eff5..ea0e8933c 100644 --- a/codegen/smithy-ruby-codegen-test/integration-specs/event_stream_integration_spec.rb +++ b/codegen/smithy-ruby-codegen-test/integration-specs/event_stream_integration_spec.rb @@ -19,7 +19,7 @@ def start_mirror_event_server(port) server = TCPServer.new(port) - Logger.new($stdout) + logger = Logger.new($stdout) $stdout.sync = true server_thread = Thread.new do @@ -28,6 +28,7 @@ def start_mirror_event_server(port) conn = HTTP2::Server.new conn.on(:frame) do |bytes| + logger.info("SERVER -> #{bytes.inspect}") sock.write(bytes) unless sock.closed? end @@ -76,6 +77,8 @@ def start_mirror_event_server(port) end stream.on(:half_close) do + logger.info('SERVER HALF CLOSE') + stream.data('', end_stream: true) stream.close end end @@ -88,6 +91,7 @@ def start_mirror_event_server(port) rescue StandardError => e puts "#{e.class} exception: #{e.message} - closing socket." puts e.backtrace + logger.error("SERVER exception: #{e.inspect}") sock.close end end @@ -113,9 +117,16 @@ def start_mirror_event_server(port) # end server, server_thread = start_mirror_event_server(port) + logger = Logger.new($stdout) + $stdout.sync = true + Timeout.timeout(5) do client = WhiteLabel::Client.new( - endpoint: "http://localhost:#{port}" + endpoint: "http://localhost:#{port}", + http2_client: Hearth::HTTP2::Client.new( + debug_output: true, + logger: logger + ) ) handler = WhiteLabel::EventStream::StartEventStreamHandler.new diff --git a/hearth/lib/hearth/event_stream.rb b/hearth/lib/hearth/event_stream.rb index 76eafc2d1..1cd40c701 100644 --- a/hearth/lib/hearth/event_stream.rb +++ b/hearth/lib/hearth/event_stream.rb @@ -13,25 +13,8 @@ module Hearth # Module for EventStreams. module EventStream - # Raised when reading bytes exceed buffer total bytes - class ReadBytesExceedLengthError < RuntimeError; end + class MessageDecodeError < StandardError; end - # Raised when insufficient bytes of a message is received - class IncompleteMessageError < RuntimeError; end - - # Raised when there is a prelude checksum mismatch. - class PreludeChecksumError < RuntimeError; end - - # Raised when there is a message checksum mismatch. - class MessageChecksumError < RuntimeError; end - - # Raised when an event payload exceeds the maximum allowed length. - class EventPayloadLengthExceedError < RuntimeError; end - - # Raised when event headers exceed maximum allowed length. - class EventHeadersLengthExceedError < RuntimeError; end - - # Raised when event streams parsers encounter are unable to parse a message. - class EventStreamParserError < RuntimeError; end + class MessageEncodeError < StandardError; end end end diff --git a/hearth/lib/hearth/event_stream/binary/message_decoder.rb b/hearth/lib/hearth/event_stream/binary/message_decoder.rb index 0244c9711..8b9762802 100644 --- a/hearth/lib/hearth/event_stream/binary/message_decoder.rb +++ b/hearth/lib/hearth/event_stream/binary/message_decoder.rb @@ -71,7 +71,7 @@ def decode_message(raw_message) def validate_checksum!(prelude, content, checksum) return if Zlib.crc32([prelude, content].pack('a*a*'), 0) == checksum - raise MessageChecksumError + raise MessageDecodeError, 'Message checksum mismatch' end def decode_prelude(prelude) @@ -80,7 +80,9 @@ def decode_prelude(prelude) content, checksum = prelude.unpack( "a#{PRELUDE_LENGTH - CRC32_LENGTH}N" ) - raise PreludeChecksumError unless Zlib.crc32(content, 0) == checksum + unless Zlib.crc32(content, 0) == checksum + raise MessageDecodeError, 'Prelude checksum mismatch' + end content.unpack('N*') end @@ -137,24 +139,8 @@ def extract_header_value(scanner, unpack_pattern, value_length) # rubocop:enable Metrics def extract_payload(encoded) - if encoded.bytesize <= ONE_MEGABYTE - payload_stringio(encoded) - else - payload_tempfile(encoded) - end - end - - def payload_stringio(encoded) StringIO.new(encoded) end - - def payload_tempfile(encoded) - payload = Tempfile.new - payload.binmode - payload.write(encoded) - payload.rewind - payload - end end end end diff --git a/hearth/lib/hearth/event_stream/binary/message_encoder.rb b/hearth/lib/hearth/event_stream/binary/message_encoder.rb index c8d506dc4..ed4e86653 100644 --- a/hearth/lib/hearth/event_stream/binary/message_encoder.rb +++ b/hearth/lib/hearth/event_stream/binary/message_encoder.rb @@ -53,7 +53,7 @@ def encode_headers(headers) def encode_content(message, encoded_header) if message.payload.length > MAX_PAYLOAD_LENGTH - raise EventPayloadLengthExceedError + raise MessageEncodeError, 'Payload exceeds maximum payload length' end header_length = encoded_header.bytesize @@ -102,7 +102,7 @@ def validate_and_join!(header_entries) break encoded_header end - raise EventHeadersLengthExceedError + raise MessageEncodeError, 'Encoded headers exceed maximum length' end end diff --git a/hearth/lib/hearth/event_stream/binary/types.rb b/hearth/lib/hearth/event_stream/binary/types.rb index d834eb431..b6008603e 100644 --- a/hearth/lib/hearth/event_stream/binary/types.rb +++ b/hearth/lib/hearth/event_stream/binary/types.rb @@ -24,7 +24,9 @@ module Types def self.encode_info(type) pattern = PATTERN[type] - raise EventStreamParserError unless pattern + unless pattern + raise MessageEncodeError, "Unexpected header type: #{type}" + end pattern end diff --git a/hearth/lib/hearth/event_stream/handler_base.rb b/hearth/lib/hearth/event_stream/handler_base.rb index 1cdb9a2e3..d8dcad6af 100644 --- a/hearth/lib/hearth/event_stream/handler_base.rb +++ b/hearth/lib/hearth/event_stream/handler_base.rb @@ -9,7 +9,6 @@ def initialize @handlers = Hash.new { |h, k| h[k] = [] } @error_handlers = [] @error_event_handlers = [] - @exception_event_handlers = [] @raw_event_handlers = [] @headers_handlers = [] end @@ -26,11 +25,6 @@ def on_error_event(&block) @error_event_handlers << block end - # Modeled errors with message-type exception - def on_exception_event(&block) - @exception_event_handlers << block - end - def on_headers(&block) @headers_handlers << block end @@ -75,11 +69,7 @@ def emit_raw_event(message) def parse_and_emit_exception(message) type = message.headers.delete(':exception-type')&.value event = parse_event(type, message) - if event - emit_exception_event(type, event) - else - emit_exception_event(:unknown, message) - end + emit_event(event.class, event) end def on(type, callback) @@ -98,13 +88,6 @@ def emit_event(type, event) end end - def emit_exception_event(type, event) - emit_event(type, event) - @exception_event_handlers.each do |handler| - handler.call(type, event) - end - end - def emit_error_event(message) error_code = message.headers.delete(':error-code') error_message = message.headers.delete(':error-message') diff --git a/hearth/lib/hearth/http2/client.rb b/hearth/lib/hearth/http2/client.rb index 953b83f1f..414fe5a56 100644 --- a/hearth/lib/hearth/http2/client.rb +++ b/hearth/lib/hearth/http2/client.rb @@ -114,11 +114,7 @@ def transmit(request:, response:, logger: nil) def setup_stream_handlers(response, stream) stream.on(:headers) do |headers| - headers.each { |k, v| response.headers[k] = v } - if response.body.is_a?(EventStream::Decoder) - # allow async events based on headers - response.body.emit_headers(response.headers) - end + handle_response_headers(headers, response) end stream.on(:data) do |data| @@ -126,10 +122,20 @@ def setup_stream_handlers(response, stream) end stream.on(:close) do + log_debug('Stream closed, sending stream-closed to ' \ + "sync_queue. Stream: #{stream.inspect}") response.sync_queue << 'stream-closed' end end + def handle_response_headers(headers, response) + headers.each { |k, v| response.headers[k] = v } + return unless response.body.is_a?(EventStream::Decoder) + + # allow async events based on headers + response.body.emit_headers(headers) + end + # H2 pseudo headers # https://http2.github.io/http2-spec/#rfc.section.8.1.2.3 def h2_headers(request) @@ -193,6 +199,12 @@ def pool_config options['http_version'] = 'http2' options end + + def log_debug(msg) + return unless @logger && @debug_output + + @logger.debug(msg) + end end # rubocop:enable Metrics/ClassLength end diff --git a/hearth/lib/hearth/http2/connection.rb b/hearth/lib/hearth/http2/connection.rb index 40e635057..af4d23a0a 100644 --- a/hearth/lib/hearth/http2/connection.rb +++ b/hearth/lib/hearth/http2/connection.rb @@ -39,7 +39,8 @@ def initialize(options = {}) @h2_client = ::HTTP2::Client.new @mutex = Mutex.new - @socket = create_tcp_connection(options) + endpoint = options[:endpoint] + @socket = create_tcp_connection(endpoint) register_h2_callbacks @state = :connected @healthy = true @@ -61,7 +62,7 @@ def stale? # return a new stream, or nil when max streams is exceeded def new_stream - return unless @streams.size < @max_concurrent_streams + return unless !stale? && @streams.size < @max_concurrent_streams begin stream = @h2_client.new_stream @@ -88,7 +89,9 @@ def close @status = :closing @healthy = false - @streams.each_value(&:close) + @streams.each_value do |stream| + stream.close if stream.state == :open + end @streams = {} @thread.kill @@ -110,39 +113,31 @@ def start_connection_thread # rubocop:disable Metrics def socket_read_loop while !@socket.closed? && !@socket.eof? - begin - data = @socket.read_nonblock(CHUNKSIZE) - @h2_client << data - rescue IO::WaitReadable - begin - if @socket.wait_readable(read_timeout) - # available, retry to start reading - retry - else - log_debug('socket connection read time out') - close - end - rescue StandardError - # error can happen when closing the socket - # while it's waiting for read + data = @socket.read_nonblock(CHUNKSIZE, exception: false) + if data == :wait_readable + if @socket.wait_readable(read_timeout) + # available, retry to start reading + redo + else + log_debug('socket connection read time out') close + raise NetworkingError, SocketError.new('Socket read timeout.') end end + @h2_client << data end rescue StandardError => e log_debug("Fatal error in http2 connection: #{e.inspect}") @errors << e - close raise e ensure close end # rubocop:enable Metrics - def create_tcp_connection(options) - endpoint = options[:endpoint] + def create_tcp_connection(endpoint) tcp, addr = tcp_socket(endpoint) - log_debug("opening connection to #{endpoint.host}:#{endpoint.port} ...") + log_debug("opening connection to #{endpoint.host}:#{endpoint.port}") nonblocking_connect(tcp, addr) return open_ssl_socket(tcp, endpoint) if endpoint.scheme == 'https' @@ -154,9 +149,8 @@ def open_ssl_socket(tcp, endpoint) socket.sync_close = true socket.hostname = endpoint.host - log_debug("starting TLS for #{endpoint.host}:#{endpoint.port} ...") + log_debug("starting TLS for #{endpoint.host}:#{endpoint.port}") socket.connect - log_debug('TLS established') socket end @@ -181,12 +175,15 @@ def get_address(host) end def nonblocking_connect(tcp, addr) - tcp.connect_nonblock(addr) - rescue IO::WaitWritable + resp = tcp.connect_nonblock(addr, exception: false) + return unless resp == :wait_writable + unless tcp.wait_writable(open_timeout) tcp.close - raise + raise Hearth::NetworkingError, + SocketError.new("Timeout opening socket to #{addr}") end + begin tcp.connect_nonblock(addr) rescue Errno::EISCONN @@ -230,15 +227,17 @@ def default_ca_directory def register_h2_callbacks @h2_client.on(:frame) do |bytes| - if @socket.nil? - msg = 'Connection is closed due to errors, ' \ - 'you can find errors at async_client.connection.errors' - raise Hearth::Http2::ConnectionClosedError, msg - else - @socket.print(bytes) - @socket.flush + @mutex.synchronize do + if @socket.nil? || @socket.closed? + msg = 'Unable to write data to closed connection.' + raise Hearth::HTTP2::ConnectionClosedError, SocketError.new(msg) + else + @socket.print(bytes) + @socket.flush + end end end + return unless @debug_output @h2_client.on(:frame_sent) do |frame| diff --git a/hearth/lib/hearth/middleware/event_streams.rb b/hearth/lib/hearth/middleware/event_streams.rb index cdd28093e..8cb76ddca 100644 --- a/hearth/lib/hearth/middleware/event_streams.rb +++ b/hearth/lib/hearth/middleware/event_streams.rb @@ -84,7 +84,7 @@ def setup_request_events(context) def setup_response_events(context) log_debug(context, 'Setting up response events.') - decoder = Hearth::EventStream::Decoder.new( + decoder = EventStream::Decoder.new( message_decoder: @message_encoding_module .const_get(:MessageDecoder).new, event_handler: @event_handler diff --git a/hearth/sig/lib/hearth/event_stream/message.rbs b/hearth/sig/lib/hearth/event_stream/message.rbs index 175dca2de..81fac9d15 100644 --- a/hearth/sig/lib/hearth/event_stream/message.rbs +++ b/hearth/sig/lib/hearth/event_stream/message.rbs @@ -1,9 +1,9 @@ module Hearth module EventStream class Message - def initialize: (headers: Hash[Symbol, HeaderValue], payload: IO) -> void + def initialize: (headers: Hash[String, HeaderValue], payload: IO) -> void - attr_accessor headers: Hash[Symbol, HeaderValue] + attr_accessor headers: Hash[String, HeaderValue] # @return [IO] payload of a message, size not exceed 16MB. attr_accessor payload: IO diff --git a/hearth/spec/hearth/event_stream/async_output_spec.rb b/hearth/spec/hearth/event_stream/async_output_spec.rb new file mode 100644 index 000000000..b0882119d --- /dev/null +++ b/hearth/spec/hearth/event_stream/async_output_spec.rb @@ -0,0 +1,123 @@ +# frozen_string_literal: true + +module Hearth + module EventStream + describe AsyncOutput do + let(:response) { HTTP2::Response.new } + let(:payload) { 'payload' } + let(:encoder) { double(encode: payload) } + let(:closed) { false } + let(:state) { :open } + let(:stream) { double(closed?: closed, state: state) } + + subject do + AsyncOutput.new( + response: response, + encoder: encoder + ) + end + + before(:each) do + response.stream = stream if response + end + + describe '#end_input_stream' do + context 'stream closed' do + let(:closed) { true } + + it 'does not send a frame' do + expect(stream).not_to receive(:data) + subject.end_input_stream + end + end + + context 'stream open' do + let(:closed) { false } + + it 'encodes an empty message and sends end_stream' do + expect(stream).to receive(:data).with(payload, end_stream: true) + subject.end_input_stream + end + end + end + + describe '#join' do + context 'no response' do + let(:response) { nil } + + it 'returns false immediately' do + expect(subject.join).to be_falsey + end + end + + context 'stream closed' do + let(:closed) { true } + + it 'returns false immediately' do + expect(subject.join).to be_falsey + end + end + + context 'stream open' do + let(:state) { :open } + it 'closes the stream and waits for graceful service close' do + expect(stream).to receive(:data).with(payload, end_stream: true) + expect(response.sync_queue).to receive(:pop) + + expect(subject.join).to be_truthy + end + end + + context 'stream half_closed_remote' do + let(:state) { :half_closed_remote } + it 'closes the stream and waits for graceful service close' do + expect(stream).to receive(:data).with(payload, end_stream: true) + expect(response.sync_queue).to receive(:pop) + + expect(subject.join).to be_truthy + end + end + + context 'stream half_closed_local' do + let(:state) { :half_closed_local } + it 'waits for graceful service close' do + expect(stream).not_to receive(:data) + expect(response.sync_queue).to receive(:pop) + + expect(subject.join).to be_truthy + end + end + end + + describe '#kill' do + it 'closes the stream' do + expect(stream).to receive(:close) + + subject.kill + end + end + + describe '#send_event' do + context 'stream open' do + it 'sends the encoded data' do + expect(stream).to receive(:data).with(payload, end_stream: false) + + subject.send(:send_event, Message.new) + end + end + + context 'stream closed' do + let(:closed) { true } + + it 'sends the encoded data' do + expect(stream).not_to receive(:data) + + expect do + subject.send(:send_event, Message.new) + end.to raise_error(ArgumentError) + end + end + end + end + end +end diff --git a/hearth/spec/hearth/event_stream/binary/encode_decode_spec.rb b/hearth/spec/hearth/event_stream/binary/encode_decode_spec.rb index 6803cee59..1135dfce5 100644 --- a/hearth/spec/hearth/event_stream/binary/encode_decode_spec.rb +++ b/hearth/spec/hearth/event_stream/binary/encode_decode_spec.rb @@ -55,7 +55,72 @@ def build_headers(headers) end end - # TODO: Negative tests + Dir.glob(File.expand_path('fixtures/decoded/negative/*', __dir__)) + .each do |decode_path| + suit_name = File.basename(decode_path) + encoded_path = File.join( + File.expand_path('fixtures/encoded/negative', __dir__), suit_name + ) + + next unless File.exist?(encoded_path) + + context suit_name do + let(:encoded_bytes) { File.read(encoded_path, mode: 'rb') } + let(:error_message) { File.read(decode_path) } + it 'raises errors decoding' do + expect do + MessageDecoder.new.decode(encoded_bytes) + end.to raise_error(MessageDecodeError, error_message) + end + end + end + end + + context 'large payloads' do + let(:payload) { 'a' * 16_777_217 } + let(:message) do + Message.new( + payload: StringIO.new(payload) + ) + end + it 'encoder raises when headers exceed maximum size' do + expect do + MessageEncoder.new.encode(message) + end.to raise_error(MessageEncodeError) + end + end + + context 'large headers' do + let(:large_header) { 'a' * 131_072 } + let(:message) do + Message.new( + headers: { + 'header' => HeaderValue.new(type: 'string', value: large_header) + } + ) + end + + it 'encoder raises when payloads exceed maximum size' do + expect do + MessageEncoder.new.encode(message) + end.to raise_error(MessageEncodeError) + end + end + + context 'invalid header type' do + let(:message) do + Message.new( + headers: { + 'header' => HeaderValue.new(type: 'invalid', value: 'value') + } + ) + end + + it 'encoder raises when payloads exceed maximum size' do + expect do + MessageEncoder.new.encode(message) + end.to raise_error(MessageEncodeError) + end end end end diff --git a/hearth/spec/hearth/event_stream/decoder_spec.rb b/hearth/spec/hearth/event_stream/decoder_spec.rb new file mode 100644 index 000000000..f2d285836 --- /dev/null +++ b/hearth/spec/hearth/event_stream/decoder_spec.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Hearth + module EventStream + describe Decoder do + let(:message_decoder) { double } + let(:event_handler) { double } + + subject do + Decoder.new( + message_decoder: message_decoder, + event_handler: event_handler + ) + end + + describe '#emit_headers' do + let(:headers) { {} } + it 'calls the event_handler' do + expect(event_handler).to receive(:emit_headers).with(headers) + subject.emit_headers(headers) + end + end + + describe '#write' do + let(:chunk) { 'chunk' } + let(:message1) { Message.new } + let(:message2) { Message.new } + it 'decodes and emits messages until empty' do + expect(message_decoder).to receive(:decode) + .with(chunk).and_return([message1, false]) + expect(message_decoder).to receive(:decode) + .with(nil).and_return([message2, true]) + expect(event_handler).to receive(:emit).with(message1) + expect(event_handler).to receive(:emit).with(message2) + + subject.write(chunk) + end + end + end + end +end diff --git a/hearth/spec/hearth/event_stream/encoder_spec.rb b/hearth/spec/hearth/event_stream/encoder_spec.rb new file mode 100644 index 000000000..58e6b6f9d --- /dev/null +++ b/hearth/spec/hearth/event_stream/encoder_spec.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +module Hearth + module EventStream + describe Encoder do + let(:message_encoder) { double } + + subject do + Encoder.new( + message_encoder: message_encoder + ) + end + + describe '#rewind' do + it 'responds to rewind' do + expect(subject).to respond_to(:rewind) + end + end + + describe '#encode' do + let(:signature1) { 'signature1' } + let(:signature2) { 'signature2' } + let(:event_type) { :event } + let(:message) { double } + let(:signed_message) { double } + + it 'signs and encodes the event' do + subject.prior_signature = signature1 + sign_event = proc do |sig, event_type, message, message_encoder| + expect(sig).to eq(signature1) + expect(event_type).to eq(event_type) + expect(message).to eq(message) + expect(message_encoder).to eq(message_encoder) + [signed_message, signature2] + end + expect(message_encoder).to receive(:encode).with(signed_message) + subject.sign_event = sign_event + + subject.encode(event_type, message) + end + end + end + + describe EncoderWithInitialMessage do + let(:message_encoder) { double } + let(:initial_event) { double } + + subject do + EncoderWithInitialMessage.new( + message_encoder: message_encoder, + initial_event: initial_event + ) + end + + describe '#read' do + let(:signature1) { 'signature1' } + let(:signature2) { 'signature2' } + let(:event_type) { :event } + let(:message) { double } + let(:signed_message) { double } + let(:encoded_payload) { 'encoded_payload' } + + it 'returns the signed, encoded initial event' do + subject.prior_signature = signature1 + sign_event = proc do |sig, event_type, message, message_encoder| + expect(sig).to eq(signature1) + expect(event_type).to eq(event_type) + expect(message).to eq(message) + expect(message_encoder).to eq(message_encoder) + [signed_message, signature2] + end + expect(message_encoder).to receive(:encode) + .with(signed_message) + .and_return(encoded_payload) + subject.sign_event = sign_event + + expect(subject.read).to eq(encoded_payload) + end + end + end + end +end diff --git a/hearth/spec/hearth/event_stream/handler_base_spec.rb b/hearth/spec/hearth/event_stream/handler_base_spec.rb new file mode 100644 index 000000000..5feba4b40 --- /dev/null +++ b/hearth/spec/hearth/event_stream/handler_base_spec.rb @@ -0,0 +1,126 @@ +# frozen_string_literal: true + +module Hearth + module EventStream + describe HandlerBase do + let(:handler1) { proc {} } + let(:handler2) { proc {} } + let(:message_type) { 'event' } + let(:error_code) { nil } + let(:error_message) { nil } + let(:event_type) { nil } + let(:exception_type) { nil } + + let(:message) do + headers = {} + headers[':message-type'] = message_type if message_type + headers[':error-code'] = error_code if error_code + headers[':error-message'] = error_message if error_message + headers[':event-type'] = event_type if event_type + headers[':exception-type'] = exception_type if exception_type + headers.each do |k, v| + headers[k] = HeaderValue.new(value: v, type: 'string') + end + + Message.new( + headers: headers + ) + end + + subject do + Class.new(HandlerBase) do + def parse_event(_, _); end + end.new + end + + describe '#emit_headers' do + let(:headers) { {} } + + it 'calls registered header handlers' do + subject.on_headers(&handler1) + subject.on_headers(&handler2) + expect(handler1).to receive(:call).with(headers) + expect(handler2).to receive(:call).with(headers) + + subject.emit_headers(headers) + end + end + + describe '#emit' do + context 'raised exception' do + let(:exception) { StandardError.new } + it 'calls registered error handlers' do + allow(subject).to receive(:parse_event) + .and_raise(exception) + + subject.on_error(&handler1) + subject.on_error(&handler2) + expect(handler1).to receive(:call).with(exception) + expect(handler2).to receive(:call).with(exception) + + subject.emit(message) + end + end + + context 'error event' do + let(:message_type) { 'error' } + let(:error_code) { 'error_code' } + let(:error_message) { 'error_message' } + + it 'calls registered error_event handlers' do + subject.on_error_event(&handler1) + subject.on_error_event(&handler2) + expect(handler1).to receive(:call).with(error_code, error_message) + expect(handler2).to receive(:call).with(error_code, error_message) + + subject.emit(message) + end + end + + context 'exception event' do + let(:message_type) { 'exception' } + let(:exception_type) { 'MyException' } + let(:exception_class) { Class.new } + let(:exception_event) { exception_class.new } + + it 'calls registered MyException handlers' do + subject.send(:on, exception_class, handler1) + expect(subject).to receive(:parse_event) + .with(exception_type, message) + .and_return(exception_event) + expect(handler1).to receive(:call).with(exception_event) + + subject.emit(message) + end + end + + context 'regular event' do + let(:message_type) { 'event' } + let(:event_type) { 'MyEvent' } + let(:event_class) { Class.new } + let(:event) { event_class.new } + + it 'calls registered MyEvent handlers' do + subject.send(:on, event_class, handler1) + expect(subject).to receive(:parse_event) + .with(event_type, message) + .and_return(event) + expect(handler1).to receive(:call).with(event) + + subject.emit(message) + end + + it 'calls raw message handlers' do + subject.on_raw_event(&handler1) + expect(subject).to receive(:parse_event) + .with(event_type, message) + .and_return(event) + expect(handler1).to receive(:call).with(message) + + subject.emit(message) + end + end + end + end + end +end diff --git a/hearth/spec/hearth/http2/client_spec.rb b/hearth/spec/hearth/http2/client_spec.rb new file mode 100644 index 000000000..e7245e1fb --- /dev/null +++ b/hearth/spec/hearth/http2/client_spec.rb @@ -0,0 +1,204 @@ +# frozen_string_literal: true + +module Hearth + module HTTP2 + describe Client do + let(:debug_output) { true } + let(:client_logger) { double } + let(:logger) { double } + let(:open_timeout) { 1 } + let(:read_timeout) { 2 } + let(:max_concurrent_streams) { 3 } + let(:verify_peer) { false } + let(:ca_file) { 'ca_file' } + let(:ca_path) { 'ca_path' } + let(:cert_store) { 'cert_store' } + let(:enable_alpn) { false } + let(:host_resolver) { double } + + let(:client_options) do + { + logger: client_logger, + debug_output: debug_output, + open_timeout: open_timeout, + read_timeout: read_timeout, + max_concurrent_streams: max_concurrent_streams, + verify_peer: verify_peer, + ca_file: ca_file, + ca_path: ca_path, + cert_store: cert_store, + enable_alpn: enable_alpn, + host_resolver: host_resolver + } + end + + subject { Client.new(**client_options) } + + let(:http_method) { :get } + let(:uri) { URI('http://example.com/some/path?query') } + let(:fields) { HTTP::Fields.new } + let(:body_data) { 'payload' } + let(:request_body) { StringIO.new(body_data) } + let(:keep_open) { true } + + let(:request) do + Request.new( + http_method: http_method, + uri: uri, + fields: fields, + body: request_body, + keep_open: keep_open + ) + end + let(:response) { Response.new } + let(:pool) { double } + let(:connection) { double } + let(:stream) { double(nil?: false) } + let(:callbacks) { {} } + let(:response_headers) do + { 'header1' => 'value1', 'header2' => 'value2' } + end + let(:response_data) { 'data' } + + before(:each) do + allow(ConnectionPool).to receive(:for).and_return(pool) + allow(pool).to receive(:connection_for).and_return(connection) + allow(connection).to receive(:new_stream).and_return(stream) + allow(pool).to receive(:offer) + + allow(stream).to receive(:on) do |name, &block| + callbacks[name] = block + end + + allow(stream).to receive(:headers) + allow(stream).to receive(:data) + allow(client_logger).to receive(:debug) + end + + describe '#initialize' do + it 'raises when given unknown keys' do + expect do + Client.new(unknown: 'foo') + end.to raise_error(ArgumentError, /unknown/) + end + end + + describe '#transmit' do + it 're-uses a connection from the connection pool' do + expect(ConnectionPool).to receive(:for).and_return(pool) + expect(pool).to receive(:connection_for) + .with(uri).and_return(connection) + expect(Hearth::HTTP2::Connection).not_to receive(:new) + expect(pool).to receive(:offer).with(uri, connection) + + subject.transmit(request: request, response: response, logger: logger) + end + + it 'creates a new connection when non are available' do + expect(ConnectionPool).to receive(:for).and_return(pool) + expect(pool).to receive(:connection_for) + .with(uri).and_yield + expect(Hearth::HTTP2::Connection).to receive(:new) + .with(endpoint: uri, **client_options) + .and_return(connection) + expect(pool).to receive(:offer).with(uri, connection) + + subject.transmit(request: request, response: response, logger: logger) + end + + it 'creates a new connection when new_stream is nil' do + over_capacity_conn = double + expect(ConnectionPool).to receive(:for).and_return(pool) + expect(pool).to receive(:connection_for) + .and_return(over_capacity_conn) + expect(over_capacity_conn).to receive(:new_stream).and_return(nil) + expect(pool).to receive(:offer).with(uri, over_capacity_conn) + + expect(Hearth::HTTP2::Connection).to receive(:new) + .with(endpoint: uri, **client_options) + .and_return(connection) + expect(connection).to receive(:new_stream).and_return(stream) + + expect(pool).to receive(:offer).with(uri, connection) + + subject.transmit(request: request, response: response, logger: logger) + end + + it 'closes the connection on error' do + expect(stream).to receive(:headers).and_raise(SocketError) + expect(connection).to receive(:finish) + expect do + subject.transmit( + request: request, response: response, logger: logger + ) + end.to raise_error(SocketError) + end + + it 'adds h2 required headers' do + expected_headers = { + ':authority' => 'example.com', + ':method' => :GET, + ':path' => '/some/path?query', + ':scheme' => 'http' + } + expect(stream).to receive(:headers) + .with(expected_headers, end_stream: false) + + subject.transmit(request: request, response: response, logger: logger) + end + + it 'transmits the body' do + expect(stream).to receive(:data) + .with(body_data, end_stream: !keep_open) + + subject.transmit(request: request, response: response, logger: logger) + end + + it 'sets the stream on the response' do + subject.transmit(request: request, response: response, logger: logger) + expect(response.stream).to eq(stream) + end + + context 'response.body is an EventStream::Decoder' do + before(:each) do + response.body = double + allow(response.body).to receive(:is_a?) + .with(EventStream::Decoder) + .and_return(true) + end + + it 'emits headers using the decoder' do + subject.transmit(request: request, response: response, + logger: logger) + + expect(response.body).to receive(:emit_headers) + .with(response_headers) + callbacks[:headers].call(response_headers) + end + end + + it 'sets headers on response when received' do + subject.transmit(request: request, response: response, logger: logger) + + callbacks[:headers].call(response_headers) + expect(response.headers.to_h).to eq(response_headers) + end + + it 'writes data to the response body when received' do + subject.transmit(request: request, response: response, logger: logger) + + expect(response.body).to receive(:write).with(response_data) + callbacks[:data].call(response_data) + end + + it 'signals close on the response' do + subject.transmit(request: request, response: response, logger: logger) + + callbacks[:close].call + + expect(response.sync_queue.pop).to eq('stream-closed') + end + end + end + end +end diff --git a/hearth/spec/hearth/http2/connection_spec.rb b/hearth/spec/hearth/http2/connection_spec.rb new file mode 100644 index 000000000..a423f0a56 --- /dev/null +++ b/hearth/spec/hearth/http2/connection_spec.rb @@ -0,0 +1,360 @@ +# frozen_string_literal: true + +module Hearth + module HTTP2 + describe Connection do + let(:debug_output) { false } + let(:logger) { nil } + let(:open_timeout) { nil } + let(:read_timeout) { nil } + let(:max_concurrent_streams) { nil } + let(:verify_peer) { false } + let(:ca_file) { nil } + let(:ca_path) { nil } + let(:cert_store) { nil } + let(:enable_alpn) { nil } + let(:host_resolver) { nil } + let(:endpoint) { URI('https://example.com') } + let(:addr) { 'addr' } + + let(:socket) do + double( + setsockopt: nil, connect: nil, + closed?: false, eof?: true, close: nil + ) + end + let(:sockaddr) { double } + let(:thread) { double(kill: nil) } + let(:h2_client) { double } + let(:callbacks) { {} } # used for temp storage + let(:frame) { 'frame_data' } + + subject do + Connection.new( + endpoint: endpoint, + logger: logger, + debug_output: debug_output, + open_timeout: open_timeout, + read_timeout: read_timeout, + max_concurrent_streams: max_concurrent_streams, + verify_peer: verify_peer, + ca_file: ca_file, + ca_path: ca_path, + cert_store: cert_store, + enable_alpn: enable_alpn, + host_resolver: host_resolver + ) + end + + before(:each) do + allow(::Socket).to receive(:new).and_return(socket) + allow(OpenSSL::SSL::SSLSocket).to receive(:new).and_return(socket) + allow(socket).to receive(:connect_nonblock) + allow(socket).to receive(:sync_close=) + allow(socket).to receive(:hostname=) + + allow(Thread).to receive(:new) do |&block| + callbacks[:thread_block] = block + thread + end + allow(thread).to receive(:report_on_exception=) + allow(::Socket).to receive(:getaddrinfo).and_return( + [[addr, addr, addr, addr]] + ) + allow(::Socket).to receive(:sockaddr_in).and_return(sockaddr) + + allow(::HTTP2::Client).to receive(:new).and_return(h2_client) + allow(h2_client).to receive(:on) do |name, &block| + callbacks[name] = block + end + end + + describe '#initialize' do + it 'connects to the endpoint' do + expect(::Socket).to receive(:getaddrinfo) + .with('example.com', nil, ::Socket::AF_INET) + .and_return([[addr, addr, addr, addr]]) + expect(socket).to receive(:connect_nonblock) + .with(sockaddr, exception: false) + + subject + end + + it 'raises when trying to write h2 data to closed' do + subject + + allow(socket).to receive(:closed?).and_return(true) + expect do + callbacks[:frame].call('frame') + end.to raise_error(Hearth::HTTP2::ConnectionClosedError) + end + + it 'prints h2 stream data to the socket' do + subject + + expect(socket).to receive(:print).with(frame) + expect(socket).to receive(:flush) + callbacks[:frame].call(frame) + end + + context 'open_timeout' do + let(:open_timeout) { 1 } + + it 'connects with the open_timeout' do + expect(socket) + .to receive(:connect_nonblock) + .and_return(:wait_writable) + + expect(socket).to receive(:wait_writable) + .with(open_timeout).and_return(true) + + # a second call + expect(socket).to receive(:connect_nonblock) + subject + end + + it 'does not fail when waiting for socket that is already open' do + expect(socket) + .to receive(:connect_nonblock) + .and_return(:wait_writable) + + expect(socket).to receive(:wait_writable) + .with(open_timeout).and_return(true) + # second call raises already connected + expect(socket).to receive(:connect_nonblock) + .and_raise(Errno::EISCONN) + expect(subject.stale?).to be_falsey + end + + it 'raises when open timeout fails' do + expect(socket) + .to receive(:connect_nonblock) + .and_return(:wait_writable) + + expect(socket).to receive(:wait_writable).and_return(false) + expect(socket).to receive(:close) + expect do + subject + end.to raise_error(Hearth::NetworkingError) + end + end + + context 'http' do + let(:endpoint) { URI('http://no-ssl.com') } + + it 'does not create an ssl socket' do + expect(OpenSSL::SSL::SSLSocket).not_to receive(:new) + + subject + end + end + + context 'host_resolver' do + let(:host_resolver) { double } + let(:ipv6_addr) { double(address: 'ipv6') } + let(:ipv4_addr) { double(address: 'ipv4') } + + it 'uses the ipv6 address when returned' do + expect(host_resolver).to receive(:resolve_address) + .with(nodename: endpoint.host) + .and_return([ipv6_addr, ipv4_addr]) + expect(::Socket).to receive(:sockaddr_in) + .with(endpoint.port, ipv6_addr.address) + .and_return(sockaddr) + subject + end + + it 'uses the ipv4 address when no ipv6' do + expect(host_resolver).to receive(:resolve_address) + .with(nodename: endpoint.host) + .and_return([nil, ipv4_addr]) + expect(::Socket).to receive(:sockaddr_in) + .with(endpoint.port, ipv4_addr.address) + .and_return(sockaddr) + subject + end + end + + context 'verify_peer: false' do + let(:verify_peer) { false } + + it 'does not verify peer' do + expect(OpenSSL::SSL::SSLSocket).to receive(:new) do |_tcp, tls| + expect(tls.verify_mode).to eq(OpenSSL::SSL::VERIFY_NONE) + socket + end + + subject + end + end + + context 'verify_peer' do + let(:verify_peer) { true } + + it 'verifies peer' do + expect(OpenSSL::SSL::SSLSocket).to receive(:new) do |_, tls| + expect(tls.verify_mode).to eq(OpenSSL::SSL::VERIFY_PEER) + socket + end + + subject + end + + it 'configures default certs' do + allow(File).to receive(:exist?).and_return(true) + allow(Dir).to receive(:exist?).and_return(true) + + expect(OpenSSL::SSL::SSLSocket).to receive(:new) do |_, tls| + expect(tls.ca_file).to eq(OpenSSL::X509::DEFAULT_CERT_FILE) + expect(tls.ca_path).to eq(OpenSSL::X509::DEFAULT_CERT_DIR) + socket + end + + subject + end + + context 'certs/ca configured' do + let(:ca_file) { 'ca_file' } + let(:ca_path) { 'ca_path' } + let(:cert_store) { double } + + it 'configures certs' do + expect(OpenSSL::SSL::SSLSocket).to receive(:new) do |_, tls| + expect(tls.ca_file).to eq(ca_file) + expect(tls.ca_path).to eq(ca_path) + expect(tls.cert_store).to eq(cert_store) + socket + end + + subject + end + end + end + + context 'debug_output' do + let(:debug_output) { true } + let(:logger) { double } + + it 'logs connections and frame data' do + expect(logger).to receive(:debug) + .with('opening connection to example.com:443') + expect(logger).to receive(:debug) + .with('starting TLS for example.com:443') + subject + + expect(logger).to receive(:debug) + .with('-> frame: "frame_data"') + callbacks[:frame_sent].call(frame) + + expect(logger).to receive(:debug) + .with('<- frame: "frame_data"') + callbacks[:frame_received].call(frame) + end + end + end + + describe '#socket_read_loop' do + it 'closes the socket after eof' do + subject + expect(socket).to receive(:eof?).and_return(true) + expect(subject).to receive(:close) + callbacks[:thread_block].call + end + + it 'writes data to the h2_client' do + subject + expect(socket).to receive(:eof?).and_return(false, true) + expect(socket).to receive(:read_nonblock).and_return(frame) + expect(h2_client).to receive(:<<).with(frame) + expect(subject).to receive(:close) + callbacks[:thread_block].call + end + + context 'read_timeout' do + let(:read_timeout) { 1 } + + it 'waits to read' do + expect(socket).to receive(:eof?).and_return(false, true) + expect(socket).to receive(:read_nonblock) + .and_return(:wait_readable) + expect(socket).to receive(:wait_readable).with(read_timeout) + .and_return(true) + expect(socket).to receive(:read_nonblock).and_return(frame) + expect(h2_client).to receive(:<<).with(frame) + expect(subject).to receive(:close) + callbacks[:thread_block].call + end + + it 'raises after timeout' do + expect(socket).to receive(:eof?).and_return(false) + expect(socket).to receive(:read_nonblock) + .and_return(:wait_readable) + expect(socket).to receive(:wait_readable).with(read_timeout) + .and_return(false) + expect(subject).to receive(:close).at_least(:once) + expect do + callbacks[:thread_block].call + end.to raise_error(NetworkingError) + end + end + end + + describe '#new_stream' do + let(:max_concurrent_streams) { 1 } + let(:stream) { double(id: 1) } + it 'returns nil when stale' do + subject.instance_variable_set(:@healthy, false) + expect(subject.new_stream).to be_nil + end + + it 'creates and returns a stream' do + expect(h2_client).to receive(:new_stream).and_return(stream) + expect(subject.new_stream).to eq(stream) + end + + it 'returns nil when streams > max configured streams' do + expect(h2_client).to receive(:new_stream).and_return(stream) + subject.new_stream + + # try to create a second stream + expect(subject.new_stream).to be_nil + end + + it 'returns nil when server exceeds stream limit' do + expect(h2_client).to receive(:new_stream) + .and_raise(::HTTP2::Error::StreamLimitExceeded) + expect(subject.new_stream).to be_nil + end + end + + describe '#close_stream' do + let(:stream) { double(id: 1) } + + it 'closes the stream' do + allow(h2_client).to receive(:new_stream).and_return(stream) + subject.new_stream + + expect(stream).to receive(:close) + subject.close_stream(stream) + end + end + + describe '#close' do + let(:stream) { double(id: 1, state: :open) } + + it 'closes streams and the socket' do + allow(h2_client).to receive(:new_stream).and_return(stream) + subject.new_stream + + expect(stream).to receive(:close) + expect(thread).to receive(:kill) + expect(socket).to receive(:close) + + subject.close + + expect(subject.stale?).to be_truthy + end + end + end + end +end diff --git a/hearth/spec/hearth/http2/request_spec.rb b/hearth/spec/hearth/http2/request_spec.rb new file mode 100644 index 000000000..49110d0f9 --- /dev/null +++ b/hearth/spec/hearth/http2/request_spec.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module Hearth + module HTTP2 + describe Request do + let(:keep_open) { true } + + subject do + Request.new( + keep_open: keep_open + ) + end + + describe '#initialize' do + it 'defaults keep_open to false' do + request = Request.new + expect(request.keep_open).to be_falsey + end + end + + describe '#keep_open' do + it 'returns keep_open' do + expect(subject.keep_open).to eq(keep_open) + end + end + end + end +end diff --git a/hearth/spec/hearth/http2/response_spec.rb b/hearth/spec/hearth/http2/response_spec.rb new file mode 100644 index 000000000..55e2952ac --- /dev/null +++ b/hearth/spec/hearth/http2/response_spec.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module Hearth + module HTTP2 + describe Response do + subject do + Response.new + end + + describe '#initialize' do + it 'initializes a sync queue' do + expect(subject.sync_queue).to be_a(Thread::Queue) + end + end + end + end +end diff --git a/hearth/spec/hearth/middleware/event_streams_spec.rb b/hearth/spec/hearth/middleware/event_streams_spec.rb new file mode 100644 index 000000000..397c03915 --- /dev/null +++ b/hearth/spec/hearth/middleware/event_streams_spec.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +module Hearth + module Middleware + describe EventStreams do + let(:app) { double('app', call: output) } + + let(:request_events) { false } + let(:response_events) { false } + let(:async_output) { double } + let(:async_output_class) { double(new: async_output) } + let(:event_handler) { double } + let(:message_encoding_module) { EventStream::Binary } + + subject do + EventStreams.new( + app, + request_events: request_events, + response_events: response_events, + async_output_class: async_output_class, + event_handler: event_handler, + message_encoding_module: message_encoding_module + ) + end + + describe '#call' do + let(:input) { double('input') } + let(:error) { nil } + let(:metadata) { {} } + let(:output) { double('output', error: error, metadata: metadata) } + let(:request) { HTTP2::Request.new } + let(:response) { HTTP2::Response.new } + let(:logger) { Logger.new(IO::NULL) } + let(:config) { double('config', logger: logger) } + let(:context) do + Context.new(request: request, response: response, config: config) + end + + context 'request_events' do + let(:request_events) { true } + + it 'sets up the request encoder and output' do + expect(async_output_class).to receive(:new).with( + response: response, + encoder: instance_of(EventStream::Encoder), + metadata: metadata + ) + .and_return(async_output) + + resp = subject.call(input, context) + + expect(resp).to eq(async_output) + expect(request.body).to be_a(EventStream::Encoder) + expect(request.keep_open).to be_truthy + end + + context 'initial response body' do + let(:message) { EventStream::Message.new } + let(:request) { HTTP2::Request.new(body: message) } + + it 'sets up the encoder with the initial message' do + expect(async_output_class).to receive(:new) + expect(EventStream::EncoderWithInitialMessage) + .to receive(:new).with( + message_encoder: + instance_of(EventStream::Binary::MessageEncoder), + initial_event: message + ).and_call_original + + subject.call(input, context) + + expect(request.body) + .to be_a(EventStream::EncoderWithInitialMessage) + end + end + end + + context 'response_events' do + let(:response_events) { true } + + it 'sets up the response decoder' do + expect(EventStream::Decoder).to receive(:new) + .with( + message_decoder: + instance_of(EventStream::Binary::MessageDecoder), + event_handler: event_handler + ).and_call_original + subject.call(input, context) + + expect(response.body).to be_a(EventStream::Decoder) + end + end + end + end + end +end diff --git a/hearth/spec/hearth/middleware/sign_spec.rb b/hearth/spec/hearth/middleware/sign_spec.rb index 51e8aafe9..ed20d209e 100644 --- a/hearth/spec/hearth/middleware/sign_spec.rb +++ b/hearth/spec/hearth/middleware/sign_spec.rb @@ -136,6 +136,40 @@ module Middleware expect(out).to eq(output) end end + + context 'event_stream' do + let(:event_stream) { true } + let(:encoder) { double } + let(:request) { double(body: encoder) } + let(:initial_signature) { 'signature' } + let(:message) { double } + + it 'signs the initial request and sets up event signing' do + expect(signer).to receive(:sign) + .with( + request: request, + identity: identity, + properties: { event_stream: true } + ) + .and_return(initial_signature) + expect(encoder).to receive(:prior_signature=) + .with(initial_signature) + + expect(encoder).to receive(:sign_event=) do |sign| + expect(signer).to receive(:sign_event).with( + message: message, + prior_signature: initial_signature, + event_type: :event, + encoder: encoder, + identity: identity, + properties: {} + ) + sign.call(initial_signature, :event, message, encoder) + end + + subject.call(input, context) + end + end end end end diff --git a/hearth/spec/hearth/signers/anonymous_spec.rb b/hearth/spec/hearth/signers/anonymous_spec.rb index 4a95ec61d..f55db9ad7 100644 --- a/hearth/spec/hearth/signers/anonymous_spec.rb +++ b/hearth/spec/hearth/signers/anonymous_spec.rb @@ -19,6 +19,25 @@ module Signers end end + describe '#sign_event' do + let(:message) { double } + let(:prior_signature) { 'signature' } + let(:encoder) { double } + + it 'does not modify the message' do + signed_message, signature = subject.sign_event( + message: message, + prior_signature: prior_signature, + event_type: :event, + encoder: encoder, + identity: identity, + properties: properties + ) + expect(signed_message).to be(message) + expect(signature).to be(prior_signature) + end + end + describe '#reset' do it 'does not modify the request' do expect do diff --git a/hearth/spec/spec_helper.rb b/hearth/spec/spec_helper.rb index 99b3806fe..2e4065dc0 100644 --- a/hearth/spec/spec_helper.rb +++ b/hearth/spec/spec_helper.rb @@ -3,8 +3,7 @@ require 'simplecov' unless ENV['NO_COVERAGE'] - # TODO: set back to 100 once event stream specs have been completed - SimpleCov.minimum_coverage 90 unless defined?(JRUBY_VERSION) + SimpleCov.minimum_coverage 100 unless defined?(JRUBY_VERSION) SimpleCov.start do add_filter %r{/spec/} end