Skip to content

Commit

Permalink
Pick the specify worker
Browse files Browse the repository at this point in the history
  • Loading branch information
heshaoqiong committed Apr 14, 2023
1 parent 86ad40d commit 9012472
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
11 changes: 11 additions & 0 deletions src/grpcbox_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

-export([start_link/3,
is_ready/1,
get/3,
pick/2,
pick/3,
add_endpoints/2,
Expand Down Expand Up @@ -57,6 +58,16 @@ start_link(Name, Endpoints, Options) ->
is_ready(Name) ->
gen_statem:call(?CHANNEL(Name), is_ready).

-spec get(name(), unary | stream, term()) ->
{ok, {pid(), grpcbox_client:interceptor() | undefined}} |
{error, undefined_channel | not_found_endpoint}.
get(Name, CallType, Key) ->
case lists:keyfind(Key, 1, gproc_pool:active_workers(Name)) of
{_, Pid} -> {ok, {Pid, interceptor(Name, CallType)}};
false -> {error, not_found_endpoint}
end.


%% @doc Picks a subchannel from a pool using the configured strategy.
-spec pick(name(), unary | stream) ->
{ok, {pid(), grpcbox_client:interceptor() | undefined}} |
Expand Down
1 change: 1 addition & 0 deletions src/grpcbox_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ get_channel(Options, Type) ->
Key = maps:get(key, Options, undefined),
PickStrategy = maps:get(pick_strategy, Options, undefined),
case PickStrategy of
specify_worker -> grpcbox_channel:get(Channel, Type, Key);
active_worker -> grpcbox_channel:pick({Channel, active}, Type, Key);
undefined -> grpcbox_channel:pick(Channel, Type, Key)
end.
Expand Down
13 changes: 11 additions & 2 deletions test/grpcbox_channel_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
add_and_remove_endpoints/1,
add_and_remove_endpoints_active_workers/1,
pick_worker_strategy/1,
pick_active_worker_strategy/1]).
pick_active_worker_strategy/1,
pick_specify_worker_strategy/1]).

-include_lib("eunit/include/eunit.hrl").

Expand All @@ -15,7 +16,8 @@ all() ->
add_and_remove_endpoints,
add_and_remove_endpoints_active_workers,
pick_worker_strategy,
pick_active_worker_strategy
pick_active_worker_strategy,
pick_specify_worker_strategy
].
init_per_suite(_Config) ->
GrpcOptions = #{service_protos => [route_guide_pb], services => #{'routeguide.RouteGuide' => routeguide_route_guide}},
Expand Down Expand Up @@ -95,6 +97,13 @@ pick_active_worker_strategy(_Config) ->
?assertEqual(error, pick_worker({direct_channel, active})),
?assertEqual(error, pick_worker({hash_channel, active})),
ok.

pick_specify_worker_strategy(_Config) ->
?assertMatch({ok, _} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 18080, #{}})),
?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 8080, #{}})),
?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(channel_xxx, stream, {http, "127.0.0.1", 8080, #{}})),
ok.

pick_worker(Name, N) ->
{R, _} = grpcbox_channel:pick(Name, unary, N),
R.
Expand Down

0 comments on commit 9012472

Please sign in to comment.