diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index adaadf3..d11f8b6 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -4,6 +4,7 @@ -export([start_link/3, is_ready/1, + get/3, pick/2, pick/3, add_endpoints/2, @@ -57,6 +58,16 @@ start_link(Name, Endpoints, Options) -> is_ready(Name) -> gen_statem:call(?CHANNEL(Name), is_ready). +-spec get(name(), unary | stream, term()) -> + {ok, {pid(), grpcbox_client:interceptor() | undefined}} | + {error, undefined_channel | not_found_endpoint}. +get(Name, CallType, Key) -> + case lists:keyfind(Key, 1, gproc_pool:active_workers(Name)) of + {_, Pid} -> {ok, {Pid, interceptor(Name, CallType)}}; + false -> {error, not_found_endpoint} + end. + + %% @doc Picks a subchannel from a pool using the configured strategy. -spec pick(name(), unary | stream) -> {ok, {pid(), grpcbox_client:interceptor() | undefined}} | diff --git a/src/grpcbox_client.erl b/src/grpcbox_client.erl index 4dd4ffd..e04a78f 100644 --- a/src/grpcbox_client.erl +++ b/src/grpcbox_client.erl @@ -50,6 +50,7 @@ get_channel(Options, Type) -> Key = maps:get(key, Options, undefined), PickStrategy = maps:get(pick_strategy, Options, undefined), case PickStrategy of + specify_worker -> grpcbox_channel:get(Channel, Type, Key); active_worker -> grpcbox_channel:pick({Channel, active}, Type, Key); undefined -> grpcbox_channel:pick(Channel, Type, Key) end. diff --git a/test/grpcbox_channel_SUITE.erl b/test/grpcbox_channel_SUITE.erl index 5b4295f..872ec0b 100644 --- a/test/grpcbox_channel_SUITE.erl +++ b/test/grpcbox_channel_SUITE.erl @@ -6,7 +6,8 @@ add_and_remove_endpoints/1, add_and_remove_endpoints_active_workers/1, pick_worker_strategy/1, - pick_active_worker_strategy/1]). + pick_active_worker_strategy/1, + pick_specify_worker_strategy/1]). -include_lib("eunit/include/eunit.hrl"). @@ -15,7 +16,8 @@ all() -> add_and_remove_endpoints, add_and_remove_endpoints_active_workers, pick_worker_strategy, - pick_active_worker_strategy + pick_active_worker_strategy, + pick_specify_worker_strategy ]. init_per_suite(_Config) -> GrpcOptions = #{service_protos => [route_guide_pb], services => #{'routeguide.RouteGuide' => routeguide_route_guide}}, @@ -95,6 +97,13 @@ pick_active_worker_strategy(_Config) -> ?assertEqual(error, pick_worker({direct_channel, active})), ?assertEqual(error, pick_worker({hash_channel, active})), ok. + +pick_specify_worker_strategy(_Config) -> + ?assertMatch({ok, _} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 18080, #{}})), + ?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(default_channel, stream, {http, "127.0.0.1", 8080, #{}})), + ?assertEqual({error, not_found_endpoint} ,grpcbox_channel:get(channel_xxx, stream, {http, "127.0.0.1", 8080, #{}})), + ok. + pick_worker(Name, N) -> {R, _} = grpcbox_channel:pick(Name, unary, N), R.