Skip to content

Commit

Permalink
Add basic eventstream classes
Browse files Browse the repository at this point in the history
  • Loading branch information
alextwoods committed Jun 26, 2024
1 parent 9b057ad commit 53a7c69
Show file tree
Hide file tree
Showing 29 changed files with 574 additions and 1 deletion.
1 change: 1 addition & 0 deletions hearth/lib/hearth.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

require_relative 'hearth/cbor'
require_relative 'hearth/endpoint_rules'
require_relative 'hearth/event_stream'
require_relative 'hearth/http'
require_relative 'hearth/http2'
require_relative 'hearth/identity_provider'
Expand Down
54 changes: 54 additions & 0 deletions hearth/lib/hearth/event_stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

require_relative 'event_stream/decoder'
require_relative 'event_stream/encoder'
require_relative 'event_stream/header_value'
require_relative 'event_stream/message'
require_relative 'event_stream/binary/types'
require_relative 'event_stream/binary/message_decoder'
require_relative 'event_stream/binary/message_encoder'

module Hearth
  # Namespace for event stream support; this part holds the error classes.
  module EventStream
    # Error for a read whose target offset lies past the end of the buffer.
    class ReadBytesExceedLengthError < RuntimeError
      # @param [Object] target_byte offset the read tried to reach
      # @param [Object] total_len total length of the buffer
      def initialize(target_byte, total_len)
        attempted = "Attempting reading bytes to offset #{target_byte}"
        available = "buffer length of #{total_len}"
        super("#{attempted} exceeds #{available}")
      end
    end

    # Error for a message for which too few bytes were received.
    # Any constructor arguments are ignored; the message text is fixed.
    class IncompleteMessageError < RuntimeError
      def initialize(*)
        super('Not enough bytes for event message')
      end
    end

    # Error for a prelude whose CRC checksum does not match its content.
    # Any constructor arguments are ignored; the message text is fixed.
    class PreludeChecksumError < RuntimeError
      def initialize(*)
        super('Prelude checksum mismatch')
      end
    end

    # Error for a message whose CRC checksum does not match its content.
    # Any constructor arguments are ignored; the message text is fixed.
    class MessageChecksumError < RuntimeError
      def initialize(*)
        super('Message checksum mismatch')
      end
    end

    # Error for a message payload that is larger than the allowed maximum.
    # Any constructor arguments are ignored; the message text is fixed.
    class EventPayloadLengthExceedError < RuntimeError
      def initialize(*)
        super('Payload length of a message should be under 16mb.')
      end
    end

    # Error for encoded message headers that are larger than the allowed
    # maximum.
    # Any constructor arguments are ignored; the message text is fixed.
    class EventHeadersLengthExceedError < RuntimeError
      def initialize(*)
        super('Encoded headers length of a message should be under 128kb.')
      end
    end
  end
end
135 changes: 135 additions & 0 deletions hearth/lib/hearth/event_stream/binary/message_decoder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# frozen_string_literal: true

require 'stringio'
require 'tempfile'
require 'zlib'

module Hearth
  module EventStream
    module Binary
      # Decodes binary event stream data into messages. Bytes that do not
      # yet form a complete message are kept in an internal buffer between
      # calls to {#decode}.
      class MessageDecoder
        # Payloads of up to this many bytes are returned as a StringIO;
        # larger payloads are written to a Tempfile.
        ONE_MEGABYTE = 1024 * 1024

        # bytes of prelude part, including 4 bytes of
        # total message length, headers length and crc checksum of prelude
        PRELUDE_LENGTH = 12

        # 4 bytes message crc checksum
        CRC32_LENGTH = 4

        # Smallest total length a message can declare: a prelude followed
        # directly by the message checksum (no headers and no payload).
        MIN_MESSAGE_LENGTH = PRELUDE_LENGTH + CRC32_LENGTH

        # Creates a decoder with an empty message buffer.
        def initialize
          @message_buffer = ''
        end

        # Decodes a single message from a chunk of string
        #
        # @param [String] chunk A chunk of string to be decoded,
        #   chunk can contain partial event message to multiple event messages
        #   When not provided, decode data from #message_buffer
        #
        # @return [Array<Message|nil, Boolean>] Returns single decoded message
        #   and boolean pair, the boolean flag indicates whether this chunk
        #   has been fully consumed, unused data is tracked at #message_buffer.
        #   The message is nil (and the flag true) while the buffer does not
        #   yet hold a complete message.
        #
        # @raise [PreludeChecksumError] when the prelude checksum mismatches
        # @raise [MessageChecksumError] when the message checksum mismatches
        # @raise [ArgumentError] when the prelude declares impossible lengths
        def decode(chunk = nil)
          @message_buffer = [@message_buffer, chunk].pack('a*a*') if chunk
          decode_message(@message_buffer)
        end

        private

        # exposed via object.send for testing
        attr_reader :message_buffer

        # Decodes the first message in raw_message and stores whatever
        # follows it as the new buffer.
        def decode_message(raw_message)
          # incomplete message prelude received
          return [nil, true] if raw_message.bytesize < PRELUDE_LENGTH

          prelude, content = raw_message.unpack("a#{PRELUDE_LENGTH}a*")
          total_length, header_length = decode_prelude(prelude)
          validate_lengths(total_length, header_length)

          # incomplete message received, leave it in the buffer
          return [nil, true] if raw_message.bytesize < total_length

          body_length = total_length - MIN_MESSAGE_LENGTH
          content, checksum, remaining = content.unpack("a#{body_length}Na*")
          # The message checksum covers prelude + content; chaining the CRC
          # avoids building a copy of both just to checksum them.
          unless Zlib.crc32(content, Zlib.crc32(prelude)) == checksum
            raise MessageChecksumError
          end

          headers, payload = decode_context(content, header_length)
          @message_buffer = remaining

          [Message.new(headers: headers, payload: payload), remaining.empty?]
        end

        # Verifies the prelude checksum and returns
        # [total_length, header_length].
        def decode_prelude(prelude)
          # prelude contains length of message and headers,
          # followed with CRC checksum of itself
          content, checksum = prelude.unpack("a#{PRELUDE_LENGTH - CRC32_LENGTH}N")
          raise PreludeChecksumError unless Zlib.crc32(content, 0) == checksum

          content.unpack('N*')
        end

        # Rejects declared lengths that cannot describe a message, so they
        # are reported clearly instead of producing an invalid unpack
        # template or a mis-sliced headers section.
        def validate_lengths(total_length, header_length)
          if total_length < MIN_MESSAGE_LENGTH
            raise ArgumentError,
                  "Invalid message length #{total_length}, " \
                  "expected at least #{MIN_MESSAGE_LENGTH} bytes"
          end
          return if header_length <= total_length - MIN_MESSAGE_LENGTH

          raise ArgumentError,
                "Invalid headers length #{header_length} for " \
                "message length #{total_length}"
        end

        # Splits content into its headers and payload sections and
        # decodes each.
        def decode_context(content, header_length)
          encoded_header, encoded_payload = content.unpack("a#{header_length}a*")
          [
            extract_headers(encoded_header),
            extract_payload(encoded_payload)
          ]
        end

        # Reads key / type / value entries from buffer until it is
        # exhausted and returns a Hash of key => HeaderValue.
        def extract_headers(buffer)
          scanner = buffer
          headers = {}
          until scanner.empty?
            # header key
            key_length, scanner = scanner.unpack('Ca*')
            key, scanner = scanner.unpack("a#{key_length}a*")

            # header value
            type_index, scanner = scanner.unpack('Ca*')
            value_type = TYPES[type_index]
            value, scanner = extract_header_value(value_type, scanner)

            headers[key] = HeaderValue.new(value: value, type: value_type)
          end
          headers
        end

        # Reads one header value of value_type from the front of scanner.
        # Returns [value, rest_of_scanner].
        def extract_header_value(value_type, scanner)
          unpack_pattern, value_length = Types::PATTERN[value_type]
          # boolean types carry no value bytes: the pattern entry itself
          # (true / false) is the value
          return [unpack_pattern, scanner] if [true, false].include?(unpack_pattern)

          # types without a fixed size are prefixed with a 2 byte length
          value_length, scanner = scanner.unpack('S>a*') unless value_length
          scanner.unpack("#{unpack_pattern || "a#{value_length}"}a*")
        end

        # Wraps the payload in a StringIO, or in a Tempfile when it is
        # larger than ONE_MEGABYTE.
        def extract_payload(encoded)
          if encoded.bytesize <= ONE_MEGABYTE
            payload_stringio(encoded)
          else
            payload_tempfile(encoded)
          end
        end

        def payload_stringio(encoded)
          StringIO.new(encoded)
        end

        # Writes the payload to a binary-mode Tempfile rewound to its start.
        def payload_tempfile(encoded)
          payload = Tempfile.new
          payload.binmode
          payload.write(encoded)
          payload.rewind
          payload
        end
      end
    end
  end
end
111 changes: 111 additions & 0 deletions hearth/lib/hearth/event_stream/binary/message_encoder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# frozen_string_literal: true

require 'zlib'

module Hearth
  module EventStream
    module Binary
      # class for Encoding EventStream::Message into the EventStream
      # binary format (application/vnd.amazon.eventstream)
      class MessageEncoder
        # bytes of total overhead in a message, including prelude
        # and 4 bytes total message crc checksum
        OVERHEAD_LENGTH = 16

        # Maximum header length allowed (after encode) 128kb
        MAX_HEADERS_LENGTH = 131_072

        # Maximum payload length allowed (after encode) 16mb
        MAX_PAYLOAD_LENGTH = 16_777_216

        # Largest header key size that fits the 1 byte key length field
        MAX_HEADER_KEY_LENGTH = 255

        # Largest value size that fits the 2 byte length prefix written
        # for types without a fixed size
        MAX_HEADER_VALUE_LENGTH = 65_535

        # Encodes EventStream::Message to encoded binary string.
        #
        # @param [Hearth::EventStream::Message] message
        #
        # @return [String] encoded binary string
        def encode(message)
          encode_message(message)
        end

        # Encodes an Hearth::EventStream::Message
        # into String
        #
        # @param [Hearth::EventStream::Message] message
        #
        # @return [String] prelude, headers and payload followed by the
        #   CRC checksum of all three
        #
        # @raise [EventHeadersLengthExceedError] when the encoded headers
        #   are larger than MAX_HEADERS_LENGTH
        # @raise [EventPayloadLengthExceedError] when the payload length
        #   is larger than MAX_PAYLOAD_LENGTH
        def encode_message(message)
          encoded_header = encode_headers(message)
          header_length = encoded_header.bytesize
          if message.payload.length > MAX_PAYLOAD_LENGTH
            raise EventPayloadLengthExceedError
          end

          encoded_payload = message.payload.read
          total_length = header_length + encoded_payload.bytesize + OVERHEAD_LENGTH

          # prelude section followed by message context (headers, payload)
          encoded_content = [
            encode_prelude(total_length, header_length),
            encoded_header,
            encoded_payload
          ].pack('a*a*a*')
          # append message checksum
          [encoded_content, Zlib.crc32(encoded_content, 0)].pack('a*N')
        end

        # Encodes headers part of an Hearth::EventStream::Message
        # into String
        #
        # @param [Hearth::EventStream::Message] message
        #
        # @return [String]
        #
        # @raise [EventHeadersLengthExceedError] when the encoded headers
        #   are larger than MAX_HEADERS_LENGTH
        # @raise [ArgumentError] when a header key or a length-prefixed
        #   header value is too large for its length field
        def encode_headers(message)
          encoded_header = message.headers.map do |key, value|
            encode_header(key, value)
          end.join
          if encoded_header.bytesize > MAX_HEADERS_LENGTH
            raise EventHeadersLengthExceedError
          end

          encoded_header
        end

        private

        # Encodes one header entry: key length, key, type index and, for
        # non-boolean types, the value (length-prefixed when the type has
        # no fixed size).
        def encode_header(key, value)
          # a longer key would silently wrap in the 1 byte length field
          if key.bytesize > MAX_HEADER_KEY_LENGTH
            raise ArgumentError,
                  "Header key is #{key.bytesize} bytes, " \
                  "maximum is #{MAX_HEADER_KEY_LENGTH}"
          end

          pattern, value_length, type_index = Types::PATTERN[value.type]
          encoded = [key.bytesize, key, type_index].pack('Ca*C')
          # boolean types doesn't need to specify value
          return encoded if [true, false].include?(pattern)

          unless value_length
            encoded = [encoded, variable_length(value.value)].pack('a*S>')
          end

          [
            encoded,
            pattern ? [value.value].pack(pattern) : value.value
          ].pack('a*a*')
        end

        # Returns the byte size of a length-prefixed value, rejecting sizes
        # that would silently wrap in the 2 byte length prefix.
        def variable_length(raw_value)
          length = raw_value.bytesize
          return length if length <= MAX_HEADER_VALUE_LENGTH

          raise ArgumentError,
                "Header value is #{length} bytes, " \
                "maximum is #{MAX_HEADER_VALUE_LENGTH}"
        end

        # Encodes total and headers length followed by their CRC checksum.
        def encode_prelude(total_length, headers_length)
          prelude_body = [total_length, headers_length].pack('NN')
          checksum = Zlib.crc32(prelude_body, 0)
          [prelude_body, checksum].pack('a*N')
        end
      end
    end
  end
end
38 changes: 38 additions & 0 deletions hearth/lib/hearth/event_stream/binary/types.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

module Hearth
  module EventStream
    module Binary
      # Header value type names, ordered so that a name's position in this
      # list equals the type idx recorded for it in Types::PATTERN.
      TYPES = %w[
        bool_true
        bool_false
        byte
        short
        integer
        long
        bytes
        string
        timestamp
        uuid
      ].freeze

      # Message Header Value Types
      module Types
        # pack/unpack pattern, byte size, type idx
        #
        # For the boolean types the pattern slot holds the boolean itself.
        # A nil pattern means the value is handled as raw bytes, and a nil
        # byte size means the type has no fixed size.
        # Each row is frozen as well as the Hash, so the shared table
        # cannot be modified in place.
        PATTERN = {
          'bool_true' => [true, 0, 0],
          'bool_false' => [false, 0, 1],
          'byte' => ['c', 1, 2],
          'short' => ['s>', 2, 3],
          'integer' => ['l>', 4, 4],
          'long' => ['q>', 8, 5],
          'bytes' => [nil, nil, 6],
          'string' => [nil, nil, 7],
          'timestamp' => ['q>', 8, 8],
          'uuid' => [nil, 16, 9]
        }.each_value(&:freeze).freeze
      end
    end
  end
end
14 changes: 14 additions & 0 deletions hearth/lib/hearth/event_stream/decoder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

module Hearth
module EventStream
# Placeholder for the event stream decoder; the class has no behavior yet.
#
# TODO: planned design (not implemented here):
# - Set as the body on the Response. It is IO-like: it has a write method,
#   and the stream is set up to write bytes to it.
# - Combines logic from v3 EventStreamDecoder and EventParser.
# - Has a (protocol specific) MessageDecoder.
# - Has a reference to the EventStreamHandler, which it will signal
#   events on.
class Decoder
end
end
end
17 changes: 17 additions & 0 deletions hearth/lib/hearth/event_stream/encoder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module Hearth
# Module for EventStreams.
module EventStream
# Placeholder for the event stream encoder; the class has no behavior yet.
class Encoder
# TODO: planned design (not implemented here):
# - Set as the body on the request; input events are written to the
#   body, which sends them to the stream.
# - Uses the MessageBuilder classes. Must be configured with
#   a signer (and must maintain signer state).
# - Handles the initial request body (if required by protocol).
# - Allows the H2 Client to set the open stream on it.
end
end
end
Loading

0 comments on commit 53a7c69

Please sign in to comment.