Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Mint adapter #272

Merged
merged 90 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
26daa75
remove gun specific code from stub to improve adapter flexilibity
beligante Aug 24, 2022
d8ed8fa
remove maybe_return_headers in gun adapter for clairty
beligante Sep 3, 2022
3be870e
refactor parse_response from gun adapter to improve readability
beligante Sep 3, 2022
a84e576
refactor read_stream from gun adapter to improve readability
beligante Sep 3, 2022
05655c5
remove unecessary response parse from inside with condition
beligante Oct 6, 2022
ed52450
Merge branch 'master' into improve-adapter-flexibility-on-stub
beligante Oct 6, 2022
b3ba6cc
add mint adapter and implement draft for open connection
beligante Sep 3, 2022
358c4d6
add mint adapter and implement draft for open connection
beligante Sep 3, 2022
c6e726e
add simple request and response handlers
beligante Sep 5, 2022
f0d64c5
add server stream connection handling
beligante Sep 5, 2022
1ebc7d5
refactor server stream consuption to use a short lived genserver
beligante Sep 10, 2022
11c8a7f
fix disconnect code for connection_process when brutal killing
beligante Sep 10, 2022
8f3bf67
refactor: use stream response module to handle unary calls
beligante Sep 10, 2022
b2c692d
add todo for compressor headers
beligante Sep 10, 2022
d500634
remove mint adapter code from hello world
beligante Sep 10, 2022
7589280
add client streaming api for mint adapter
beligante Sep 11, 2022
61f9f3d
uncomment route_guide test client requests
beligante Sep 11, 2022
c03f0bf
raise exception for unhandled error cases
beligante Sep 18, 2022
4bdad6d
refact(client/mint_adapter): add cond to handle the four rpc types to…
beligante Oct 1, 2022
064052d
refact(client/mint_adapter): use keyword.get to return headers and tr…
beligante Oct 1, 2022
3b84cc3
chunk and enqueue requests inside connection process
beligante Oct 5, 2022
132045d
remove unused functions from state module
beligante Oct 5, 2022
db5ffb3
fix reference check for response
beligante Oct 6, 2022
a84d803
add consume_error to stream response process for error handling
beligante Oct 6, 2022
301c016
remove comments from route guide examples
beligante Oct 6, 2022
267f6ad
replace case in favor of if/else
beligante Oct 6, 2022
3e0dc09
fix disconnect match on mint adapter
beligante Oct 6, 2022
4193f68
improve error handling for response stream
beligante Oct 6, 2022
47744a0
refact response module to use guards to return headers or trailers
beligante Oct 8, 2022
965a7cc
add factories for channel and stream, remove old factory code to use …
beligante Oct 8, 2022
bb06048
add tests for error and done cast calls
beligante Oct 8, 2022
63141ee
remove empty test
beligante Oct 8, 2022
507083f
add tests to stream producer for response process
beligante Oct 10, 2022
d62926f
Merge branch 'master' into add-mint-adapter
beligante Oct 15, 2022
9f5807c
fix dialyzer issue
beligante Oct 15, 2022
9126d67
improve documentation for connection process and add tests for discon…
beligante Oct 16, 2022
bb5df5c
add tests to stream_body handle_call cases on connection process module
beligante Oct 16, 2022
2384cef
add test for process_request_stream_queue
beligante Oct 22, 2022
468a7d0
add error test cases for connection process
beligante Oct 22, 2022
17aaf2b
add mint adapter to interop tests
beligante Oct 23, 2022
d5bd0c9
add tests to end_stream and cancel callbacks, also improved documenta…
beligante Oct 23, 2022
d16ff13
fix dialyzer issues
beligante Oct 23, 2022
e5e6677
improve documentation and error handling
beligante Oct 23, 2022
94b6037
remove mint from example projects
beligante Oct 23, 2022
9a7c701
remove ex_machina dependency
beligante Oct 23, 2022
b6b3e62
fix: remove ex-machina from test helper
beligante Oct 23, 2022
35564a5
add connection close handling
beligante Oct 25, 2022
d4f2570
add connection close checks and tests
beligante Oct 25, 2022
10e8c47
remove :queue.fold reference to avoid break on old versions of erlang
beligante Oct 25, 2022
2238cc4
send error tuple instead of raise for connect
beligante Oct 26, 2022
ff6409c
Merge branch 'elixir-grpc:master' into add-mint-adapter
beligante Nov 14, 2022
8f08d67
Merge branch 'elixir-grpc:master' into add-mint-adapter
beligante Nov 30, 2022
d3e026f
refact: extract interop test runnet to its module runner
beligante Nov 30, 2022
e704eed
fix: fix code review suggestions
thanabodee-c Dec 2, 2022
96207f9
fix: fix code review suggestions
thanabodee-c Dec 2, 2022
c67d7d4
fix: fix code review suggestions
thanabodee-c Dec 2, 2022
90fad3c
fix: fix code review suggestions
thanabodee-c Dec 2, 2022
a6f87e9
fix: fix code review suggestions
thanabodee-c Dec 2, 2022
49b0af1
fix: fix code review suggestions
thanabodee-c Dec 2, 2022
85f7369
refactor: extract ref at the function argument
thanabodee-c Dec 2, 2022
71eb579
Merge pull request #1 from wingyplus/fix-code-suggestion
beligante Dec 13, 2022
955a15f
remove unecessary sorts
beligante Dec 13, 2022
08a78e7
Merge branch 'master' into add-mint-adapter
beligante Dec 13, 2022
5bbb142
rename disconnect to remove brutal term
beligante Dec 26, 2022
07b61d3
rename bidi_stream atom to bidirectional_stream for clarity
beligante Dec 26, 2022
0825251
refactor response checks for mint adapter to have a single funtion
beligante Dec 26, 2022
2ea4616
Merge branch 'master' into add-mint-adapter
beligante Dec 27, 2022
914d225
refactor response process to use call instead of cast
beligante Dec 27, 2022
a2b4be3
add pattern match for response calls
beligante Dec 27, 2022
37c9a11
unify return headers behavior to match gun behavior
beligante Dec 29, 2022
fe38023
simplify code in favor of use enum module
beligante Dec 29, 2022
17a8e11
refactor variable names for clarity
beligante Dec 29, 2022
71d1d10
add todo for error handling on response process
beligante Dec 29, 2022
6f38f12
add bit lengh for binary manipulation for clarity
beligante Dec 29, 2022
fbdc879
rollback cowboy config
beligante Dec 29, 2022
9e2436d
unify factories to a single module
beligante Dec 29, 2022
ea46a41
add case for chunk_body to avoid raise
beligante Dec 29, 2022
db5a200
Update interop/lib/interop/client.ex
beligante Dec 30, 2022
1e86681
Update interop/lib/interop/client.ex
beligante Dec 30, 2022
55cbc28
improve documentation for GRPC.Stub.recv
beligante Dec 30, 2022
f9fdfff
remove doc for GRPC.Stub.call/5
beligante Dec 30, 2022
7edc063
make connection status check a private function
beligante Jan 3, 2023
736e203
apply PR comments
beligante Jan 4, 2023
b38ec42
apply missing PR comments
beligante Jan 4, 2023
7ae3036
Update lib/grpc/stub.ex
beligante Jan 4, 2023
c99f547
anotate stream response process for genserver callback functions
beligante Jan 4, 2023
1969c46
Merge branch 'master' into add-mint-adapter
beligante Jan 4, 2023
c395055
Update lib/grpc/client/adapters/mint/connection_process/connection_pr…
beligante Jan 5, 2023
b278d05
improve error messages
beligante Jan 7, 2023
00b0b6a
Apply suggestions from code review
polvalente Jan 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 33 additions & 19 deletions interop/lib/interop/client.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
defmodule Interop.Client do

import ExUnit.Assertions, only: [refute: 1]

require Logger

# To better understand the behavior of streams used in this module
# we suggest you to check the documentation for `GRPC.Stub.recv/2`
# there is some unusual behavior that can be observed.

def connect(host, port, opts \\ []) do
{:ok, ch} = GRPC.Stub.connect(host, port, opts)
ch
Expand Down Expand Up @@ -103,12 +108,9 @@ defmodule Interop.Client do
params = Enum.map([31415, 9, 2653, 58979], &res_param(&1))
req = Grpc.Testing.StreamingOutputCallRequest.new(response_parameters: params)
{:ok, res_enum} = ch |> Grpc.Testing.TestService.Stub.streaming_output_call(req)
result = Enum.map([31415, 9, 2653, 58979], &String.duplicate(<<0>>, &1))
result = Enum.map([9, 2653, 31415, 58979], &String.duplicate(<<0>>, &1))

^result =
Enum.map(res_enum, fn {:ok, res} ->
res.payload.body
end)
^result = res_enum |> Enum.map(fn {:ok, res} -> res.payload.body end) |> Enum.sort()
end

def server_compressed_streaming!(ch) do
Expand All @@ -122,10 +124,7 @@ defmodule Interop.Client do
{:ok, res_enum} = ch |> Grpc.Testing.TestService.Stub.streaming_output_call(req)
result = Enum.map([31415, 92653], &String.duplicate(<<0>>, &1))

^result =
Enum.map(res_enum, fn {:ok, res} ->
res.payload.body
end)
^result = res_enum |> Enum.map(fn {:ok, res} -> res.payload.body end) |> Enum.sort()
end

def ping_pong!(ch) do
Expand All @@ -143,15 +142,13 @@ defmodule Interop.Client do
{:ok, res_enum} = GRPC.Stub.recv(stream)
reply = String.duplicate(<<0>>, 31415)

{:ok, %{payload: %{body: ^reply}}} =
Stream.take(res_enum, 1) |> Enum.to_list() |> List.first()
{:ok, %{payload: %{body: ^reply}}} = Enum.at(res_enum, 0)

Enum.each([{9, 8}, {2653, 1828}, {58979, 45904}], fn {res, payload} ->
GRPC.Stub.send_request(stream, req.(res, payload))
reply = String.duplicate(<<0>>, res)

{:ok, %{payload: %{body: ^reply}}} =
Stream.take(res_enum, 1) |> Enum.to_list() |> List.first()
{:ok, %{payload: %{body: ^reply}}} = Enum.at(res_enum, 0)
end)

GRPC.Stub.end_stream(stream)
Expand Down Expand Up @@ -191,19 +188,32 @@ defmodule Interop.Client do
payload: payload(271_828)
)

{:ok, res_enum, %{headers: new_headers}} =
{headers, data, trailers} =
ch
|> Grpc.Testing.TestService.Stub.full_duplex_call(metadata: metadata)
|> GRPC.Stub.send_request(req, end_stream: true)
|> GRPC.Stub.recv(return_headers: true)
|> process_full_duplex_response()

reply = String.duplicate(<<0>>, 314_159)

{:ok, %{payload: %{body: ^reply}}} =
Stream.take(res_enum, 1) |> Enum.to_list() |> List.first()
%{payload: %{body: ^reply}} = data

{:trailers, new_trailers} = Stream.take(res_enum, 1) |> Enum.to_list() |> List.first()
validate_headers!(new_headers, new_trailers)
validate_headers!(headers, trailers)
end

defp process_full_duplex_response({:ok, res_enum, %{headers: new_headers}}) do
{:ok, data} = Enum.at(res_enum, 0)
{:trailers, new_trailers} = Enum.at(res_enum, 0)
{new_headers, data, new_trailers}
end


defp process_full_duplex_response({:ok, res_enum}) do
{:headers, headers} = Enum.at(res_enum, 0)
{:ok, data} = Enum.at(res_enum, 0)
{:trailers, trailers} = Enum.at(res_enum, 0)
{headers, data, trailers}
end

def status_code_and_message!(ch) do
Expand All @@ -226,6 +236,10 @@ defmodule Interop.Client do
|> Grpc.Testing.TestService.Stub.full_duplex_call()
|> GRPC.Stub.send_request(req, end_stream: true)
|> GRPC.Stub.recv()
|> case do
{:ok, stream} -> Enum.at(stream, 0)
error -> error
end
end

def unimplemented_service!(ch) do
Expand Down Expand Up @@ -260,7 +274,7 @@ defmodule Interop.Client do
|> GRPC.Stub.send_request(req)
|> GRPC.Stub.recv()

{:ok, _} = Stream.take(res_enum, 1) |> Enum.to_list() |> List.first()
{:ok, _} = Enum.at(res_enum, 0)
stream = GRPC.Stub.cancel(stream)
{:error, %GRPC.RPCError{status: 1}} = GRPC.Stub.recv(stream)
end
Expand Down
2 changes: 2 additions & 0 deletions interop/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"grpc_prometheus": {:hex, :grpc_prometheus, "0.1.0", "a2f45ca83018c4ae59e4c293b7455634ac09e38c36cba7cc1fb8affdf462a6d5", [:mix], [{:grpc, ">= 0.0.0", [hex: :grpc, repo: "hexpm", optional: true]}, {:prometheus, "~> 4.0", [hex: :prometheus, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm", "8b9ab3098657e7daec0b3edc78e1d02418bc0871618d8ca89b51b74a8086bb71"},
"grpc_statsd": {:hex, :grpc_statsd, "0.1.0", "a95ae388188486043f92a3c5091c143f5a646d6af80c9da5ee616546c4d8f5ff", [:mix], [{:grpc, ">= 0.0.0", [hex: :grpc, repo: "hexpm", optional: true]}, {:statix, ">= 0.0.0", [hex: :statix, repo: "hexpm", optional: true]}], "hexpm", "de0c05db313c7b3ffeff345855d173fd82fec3de16591a126b673f7f698d9e74"},
"gun": {:hex, :grpc_gun, "2.0.1", "221b792df3a93e8fead96f697cbaf920120deacced85c6cd3329d2e67f0871f8", [:rebar3], [{:cowlib, "~> 2.11", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "795a65eb9d0ba16697e6b0e1886009ce024799e43bb42753f0c59b029f592831"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"mint": {:hex, :mint, "1.4.2", "50330223429a6e1260b2ca5415f69b0ab086141bc76dc2fbf34d7c389a6675b2", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "ce75a5bbcc59b4d7d8d70f8b2fc284b1751ffb35c7b6a6302b5192f8ab4ddd80"},
"prometheus": {:hex, :prometheus, "4.2.2", "a830e77b79dc6d28183f4db050a7cac926a6c58f1872f9ef94a35cd989aceef8", [:mix, :rebar3], [], "hexpm", "b479a33d4aa4ba7909186e29bb6c1240254e0047a8e2a9f88463f50c0089370e"},
"prometheus_ex": {:hex, :prometheus_ex, "3.0.5", "fa58cfd983487fc5ead331e9a3e0aa622c67232b3ec71710ced122c4c453a02f", [:mix], [{:prometheus, "~> 4.0", [hex: :prometheus, repo: "hexpm", optional: false]}], "hexpm", "9fd13404a48437e044b288b41f76e64acd9735fb8b0e3809f494811dfa66d0fb"},
"prometheus_httpd": {:hex, :prometheus_httpd, "2.1.11", "f616ed9b85b536b195d94104063025a91f904a4cfc20255363f49a197d96c896", [:rebar3], [{:accept, "~> 0.3", [hex: :accept, repo: "hexpm", optional: false]}, {:prometheus, "~> 4.2", [hex: :prometheus, repo: "hexpm", optional: false]}], "hexpm", "0bbe831452cfdf9588538eb2f570b26f30c348adae5e95a7d87f35a5910bcf92"},
Expand Down
77 changes: 38 additions & 39 deletions interop/script/run.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,49 @@ Logger.configure(level: level)

Logger.info("Rounds: #{rounds}; concurrency: #{concurrency}; port: #{port}")

alias GRPC.Client.Adapters.Gun
alias GRPC.Client.Adapters.Mint
alias Interop.Client

{:ok, _pid, port} = GRPC.Server.start_endpoint(Interop.Endpoint, port)

1..concurrency
|> Task.async_stream(fn _cli ->
ch = Client.connect("127.0.0.1", port, interceptors: [GRPCPrometheus.ClientInterceptor, GRPC.Client.Interceptors.Logger])

for _ <- 1..rounds do
Client.empty_unary!(ch)
Client.cacheable_unary!(ch)
Client.large_unary!(ch)
Client.large_unary2!(ch)
Client.client_compressed_unary!(ch)
Client.server_compressed_unary!(ch)
Client.client_streaming!(ch)
Client.client_compressed_streaming!(ch)
Client.server_streaming!(ch)
Client.server_compressed_streaming!(ch)
Client.ping_pong!(ch)
Client.empty_stream!(ch)
Client.custom_metadata!(ch)
Client.status_code_and_message!(ch)
Client.unimplemented_service!(ch)
Client.cancel_after_begin!(ch)
Client.cancel_after_first_response!(ch)
Client.timeout_on_sleeping_server!(ch)
defmodule InteropTestRunner do
def run(_cli, adapter, port, rounds) do
opts = [interceptors: [GRPCPrometheus.ClientInterceptor, GRPC.Client.Interceptors.Logger], adapter: adapter]
ch = Client.connect("127.0.0.1", port, opts)

for _ <- 1..rounds do
Client.empty_unary!(ch)
Client.cacheable_unary!(ch)
Client.large_unary!(ch)
Client.large_unary2!(ch)
Client.client_compressed_unary!(ch)
Client.server_compressed_unary!(ch)
Client.client_streaming!(ch)
Client.client_compressed_streaming!(ch)
Client.server_streaming!(ch)
Client.server_compressed_streaming!(ch)
Client.ping_pong!(ch)
Client.empty_stream!(ch)
Client.custom_metadata!(ch)
Client.status_code_and_message!(ch)
Client.unimplemented_service!(ch)
Client.cancel_after_begin!(ch)
Client.cancel_after_first_response!(ch)
Client.timeout_on_sleeping_server!(ch)
end
:ok
end
:ok
end, max_concurrency: concurrency, ordered: false, timeout: :infinity)
|> Enum.to_list()

# defmodule Helper do
# def flush() do
# receive do
# msg ->
# IO.inspect(msg)
# flush()
# after
# 0 -> :ok
# end
# end
# end
# Helper.flush()
end

for adapter <- [Gun, Mint] do
Logger.info("Starting run for adapter: #{adapter}")
args = [adapter, port, rounds]
stream_opts = [max_concurrency: concurrency, ordered: false, timeout: :infinity]
1..concurrency
|> Task.async_stream(InteropTestRunner, :run, args, stream_opts)
|> Enum.to_list()
end

Logger.info("Succeed!")
:ok = GRPC.Server.stop_endpoint(Interop.Endpoint)
26 changes: 26 additions & 0 deletions lib/grpc/client/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,30 @@ defmodule GRPC.Client.Adapter do
"""
@callback receive_data(stream :: Stream.t(), opts :: keyword()) ::
GRPC.Stub.receive_data_return() | {:error, any()}

@doc """
This callback is used to open a stream connection to the server.
Mostly used when the payload for this request is streamed.
To send data using the open stream request, you should use `send_data/3`
"""
@callback send_headers(stream :: Stream.t(), opts :: keyword()) :: Stream.t()

@doc """
This callback will be responsible to send data to the server on a stream
request is open using `send_headers/2`
Opts:
- :send_end_stream (optional) - ends the request stream
"""
@callback send_data(stream :: Stream.t(), message :: binary(), opts :: keyword()) :: Stream.t()

@doc """
Similarly to the option sent on `send_data/2` - :send_end_stream -
this callback will end request stream
"""
@callback end_stream(stream :: Stream.t()) :: Stream.t()

@doc """
Cancel a stream in a streaming client.
"""
@callback cancel(stream :: Stream.t()) :: :ok | {:error, any()}
end
11 changes: 10 additions & 1 deletion lib/grpc/client/adapters/gun.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ defmodule GRPC.Client.Adapters.Gun do
:gun.post(conn_pid, path, headers, data)
end

@impl true
def send_headers(
%{channel: %{adapter_payload: %{conn_pid: conn_pid}}, path: path} = stream,
opts
Expand All @@ -116,6 +117,7 @@ defmodule GRPC.Client.Adapters.Gun do
GRPC.Client.Stream.put_payload(stream, :stream_ref, stream_ref)
end

@impl true
def send_data(%{channel: channel, payload: %{stream_ref: stream_ref}} = stream, message, opts) do
conn_pid = channel.adapter_payload[:conn_pid]
fin = if opts[:send_end_stream], do: :fin, else: :nofin
Expand All @@ -124,13 +126,20 @@ defmodule GRPC.Client.Adapters.Gun do
stream
end

@impl true
def end_stream(%{channel: channel, payload: %{stream_ref: stream_ref}} = stream) do
conn_pid = channel.adapter_payload[:conn_pid]
:gun.data(conn_pid, stream_ref, :fin, "")
stream
end

def cancel(%{conn_pid: conn_pid}, %{stream_ref: stream_ref}) do
@impl true
def cancel(stream) do
%{
channel: %{adapter_payload: %{conn_pid: conn_pid}},
payload: %{stream_ref: stream_ref}
} = stream

:gun.cancel(conn_pid, stream_ref)
end

Expand Down
Loading