Streaming Traits
This wiki contains a mapping between Smithy Streaming traits and generated Ruby code.
The @streaming trait indicates that the data represented by the shape needs to be streamed. The trait can be applied to either a blob or a union. When applied to a blob, it signals that the data should not be held in memory or that its size is unknown. When applied to a union, the shape represents an event stream.
@http(method: "POST", uri: "/streaming")
operation StreamingOperation {
    input: StreamingOperationInputOutput,
    output: StreamingOperationInputOutput,
}

structure StreamingOperationInputOutput {
    @httpPayload
    stream: StreamingBlob,
}

@streaming
blob StreamingBlob

// example of using @requiresLength trait in the input structure
structure StreamingWithLengthInput {
    @httpPayload
    stream: FiniteStreamingBlob,
}

@streaming
@requiresLength
blob FiniteStreamingBlob
For streaming input, the protocol builder does the following:
- The Transfer-Encoding header is set to chunked unless the @requiresLength trait is used.
- Then, the HTTP request body is set to the input member. Hearth::HTTP::Client sets its body to Net::HTTPGenericRequest#body_stream.
- The body_stream method is used as the source for IO.copy_stream.
- IO.copy_stream will attempt to open and read from a file if the input is a String, otherwise it will read from the IO-like object (has a read or readpartial method). For input that is not IO-like, such as a String, the input is wrapped as a StringIO.
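The difference between an IO-like source and a String source can be seen with the Ruby standard library alone. The following is a minimal sketch that does not use Hearth; the temp file prefix and the payloads are made up for the example.

```ruby
require 'stringio'
require 'tempfile'

# An IO-like source (responds to read) is read directly.
dst = StringIO.new
copied = IO.copy_stream(StringIO.new('hello'), dst)
from_io = dst.string

# A String source is treated as a file path, not as the payload itself.
from_path = Tempfile.create('streaming-example') do |file|
  file.write('from disk')
  file.flush
  out = StringIO.new
  IO.copy_stream(file.path, out)
  out.string
end
```

This is why a String payload has to be wrapped in a StringIO before it reaches IO.copy_stream: passed as-is, it would be interpreted as a file name.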
Params ensures that String and nil inputs are converted to StringIO:
# params.rb
class StreamingOperationInput
  def self.build(params, context:)
    Hearth::Validator.validate_types!(params, ::Hash, Types::StreamingOperationInput, context: context)
    type = Types::StreamingOperationInput.new
    Hearth::Validator.validate_unknown!(type, params, context: context) if params.is_a?(Hash)
    io = params[:stream] || StringIO.new
    unless io.respond_to?(:read) || io.respond_to?(:readpartial)
      io = StringIO.new(io)
    end
    type.stream = io
    type
  end
end
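The normalization step can be tried in isolation. The sketch below pulls the same logic into a free-standing method; the name to_readable is hypothetical and is not part of the generated code or of Hearth.

```ruby
require 'stringio'

# Hypothetical helper mirroring the stream normalization in Params:
# nil becomes an empty StringIO, non-IO values are wrapped in StringIO,
# and IO-like objects are passed through untouched.
def to_readable(value)
  io = value || StringIO.new
  io = StringIO.new(io) unless io.respond_to?(:read) || io.respond_to?(:readpartial)
  io
end

existing = StringIO.new('already an IO')
from_nil = to_readable(nil)
from_string = to_readable('abc')
passthrough = to_readable(existing)
```

Passing the IO-like object through unchanged matters for large inputs such as File objects, since wrapping them would force the data into memory.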
Validators ensure that the input is IO-like (readable). If @requiresLength is set, they also validate that size can be called:
# validators.rb
class StreamingOperationInput
  def self.validate!(input, context:)
    Hearth::Validator.validate_types!(input, Types::StreamingOperationInput, context: context)
    unless input[:stream].respond_to?(:read) || input[:stream].respond_to?(:readpartial)
      raise ArgumentError, "Expected #{context} to be an IO like object, got #{input[:stream].class}"
    end
  end
end

class StreamingWithLengthInput
  def self.validate!(input, context:)
    Hearth::Validator.validate_types!(input, Types::StreamingWithLengthInput, context: context)
    unless input[:stream].respond_to?(:read) || input[:stream].respond_to?(:readpartial)
      raise ArgumentError, "Expected #{context} to be an IO like object, got #{input[:stream].class}"
    end
    unless input[:stream].respond_to?(:size)
      raise ArgumentError, "Expected #{context} to respond_to(:size)"
    end
  end
end
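To see which inputs pass and which are rejected, the two checks can be reproduced without Hearth. This is a sketch only: validate_stream!, its requires_length keyword, and the UnsizedReader class are hypothetical names introduced for the example.

```ruby
require 'stringio'

# Hypothetical stand-alone version of the duck-type checks shown above.
def validate_stream!(stream, context:, requires_length: false)
  unless stream.respond_to?(:read) || stream.respond_to?(:readpartial)
    raise ArgumentError, "Expected #{context} to be an IO like object, got #{stream.class}"
  end
  return unless requires_length && !stream.respond_to?(:size)

  raise ArgumentError, "Expected #{context} to respond_to(:size)"
end

# A readable object with no known size, similar to a pipe or socket wrapper.
class UnsizedReader
  def read(*)
    nil
  end
end

# Collect the error message (or nil) for a few representative inputs.
def error_for(stream, requires_length:)
  validate_stream!(stream, context: 'input[:stream]', requires_length: requires_length)
  nil
rescue ArgumentError => e
  e.message
end

sized_ok = error_for(StringIO.new('abc'), requires_length: true)
unsized_ok = error_for(UnsizedReader.new, requires_length: false)
unsized_error = error_for(UnsizedReader.new, requires_length: true)
string_error = error_for('plain string', requires_length: false)
```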
Builders set the HTTP request body, the Transfer-Encoding header (unless @requiresLength is used), and the appropriate Content-Type header:
# builders.rb
class StreamingOperation
  def self.build(http_req, input:)
    http_req.body = input[:stream]
    http_req.headers['Transfer-Encoding'] = 'chunked'
    http_req.headers['Content-Type'] = 'application/octet-stream'
  end
end
For streaming output, an output stream object needs to be provided. In the client, an output_stream method is used, defaulting to StringIO. When an object that responds to write is passed as the output_stream option, it is used as the body for the HTTP response. When a block is given instead, it is wrapped in a Hearth::BlockIO. The Hearth::HTTP::Client will write the data to this object. The parser will also assign this object to the output object.
# client.rb - in operation
def streaming_operation
  response_body = output_stream(options, &block)
  ...
  context = Hearth::Context.new(
    ...
    response: Hearth::HTTP::Response.new(body: response_body),
    ...
  )
  output = stack.run(input, context)
  ...
end

# client.rb - private method
def output_stream(options = {}, &block)
  return options.delete(:output_stream) if options[:output_stream]
  return Hearth::BlockIO.new(block) if block

  ::StringIO.new
end
# parsers.rb
class StreamingOperation
  def self.parse(http_resp)
    data = Types::StreamingOperationOutput.new
    data.stream = http_resp.body
    data
  end
end
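The precedence used when choosing the response body (explicit option, then block, then StringIO) can be exercised on its own. The sketch below is not the Hearth implementation: ExampleBlockIO is a simplified stand-in for Hearth::BlockIO written for this example, and pick_output_stream is a hypothetical name.

```ruby
require 'stringio'

# Simplified stand-in for Hearth::BlockIO, for illustration only.
class ExampleBlockIO
  def initialize(block)
    @block = block
  end

  # Forward each chunk to the block and report the number of bytes handled.
  def write(chunk)
    @block.call(chunk)
    chunk.bytesize
  end
end

# Same precedence as output_stream above: option, then block, then StringIO.
def pick_output_stream(options = {}, &block)
  return options.delete(:output_stream) if options[:output_stream]
  return ExampleBlockIO.new(block) if block

  StringIO.new
end

# With a block, every chunk written to the body is yielded as it arrives.
chunks = []
sink = pick_output_stream { |chunk| chunks << chunk }
sink.write('part 1')
sink.write('part 2')

explicit = StringIO.new
chosen = pick_output_stream({ output_stream: explicit })
default = pick_output_stream
```

The block form suits responses that should be processed incrementally, while the default StringIO buffers the whole response in memory.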
Assuming a Client with a streaming_operation operation, the SDK usage is:
# the input structure modeled above has a single member, stream
params = { stream: 'This is some text' }
File.open('some-file.txt', 'w') do |f|
  resp = client.streaming_operation(params, { output_stream: f })
end
See Smithy Documentation for more details.
The @requiresLength trait indicates that the streaming blob MUST be finite and have a known size. This trait applies only to shapes with the @streaming trait. If this trait is present, the Content-Length header with the body's size is sent if it can be determined. An error is raised if the IO object's size cannot be calculated.
@streaming
@requiresLength
blob FiniteStreamingBlob
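The choice between Content-Length and chunked transfer encoding can be sketched as follows. This is an assumption-laden illustration, not generated code: the source does not show where Hearth sets Content-Length, and FakeRequest and apply_stream_headers are hypothetical names used only here.

```ruby
require 'stringio'

# Hypothetical stand-in for an HTTP request object; not the Hearth class.
FakeRequest = Struct.new(:headers, :body)

# Sketch: send Content-Length for finite streams, chunked encoding otherwise.
def apply_stream_headers(request, stream, requires_length:)
  request.body = stream
  if requires_length
    raise ArgumentError, 'stream must respond to size' unless stream.respond_to?(:size)

    request.headers['Content-Length'] = stream.size.to_s
  else
    request.headers['Transfer-Encoding'] = 'chunked'
  end
  request.headers['Content-Type'] = 'application/octet-stream'
  request
end

finite = apply_stream_headers(FakeRequest.new({}, nil), StringIO.new('12345'), requires_length: true)
chunked = apply_stream_headers(FakeRequest.new({}, nil), StringIO.new('12345'), requires_length: false)
```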
See Smithy Documentation for more details.
An event stream is an abstraction that allows multiple messages to be sent asynchronously between a client and server. Event streams support both duplex and simplex streaming. See Smithy Documentation for more details.
Response (output) events are asynchronous and are handled by registering handlers when calling the operation. Request (input) events are signaled (sent) by calling the corresponding signal methods on the output object returned from the operation.
Assuming a start_event_stream operation with duplex streaming, the client usage is:
client = MyService::Client.new
handler = MyService::EventStream::StartEventStreamHandler.new
# register various handlers
handler.on_my_event { |event| handle_my_event(event) }
stream = client.start_event_stream(initial_request_params, event_stream_handler: handler)
stream.signal_my_event(my_event_params)
stream.join # close our end of the stream and wait for the server to close gracefully
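The handler registration pattern used above can be illustrated with a toy class. This is not the generated handler: ExampleHandler and its emit method are hypothetical, and the example plays the role of the transport by emitting events itself.

```ruby
# Toy stand-in for a generated event stream handler; all names are hypothetical.
class ExampleHandler
  def initialize
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  # Register a callback to run for each :my_event received.
  def on_my_event(&block)
    @callbacks[:my_event] << block
  end

  # Dispatch an incoming event to the callbacks registered for its type.
  def emit(type, event)
    @callbacks[type].each { |callback| callback.call(event) }
  end
end

received = []
handler = ExampleHandler.new
handler.on_my_event { |event| received << event }

# Only events with a registered callback reach user code.
handler.emit(:my_event, { message: 'hello' })
handler.emit(:other_event, { message: 'ignored' })
```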