From ed30865286c1733689c55f40e5d6be227962159d Mon Sep 17 00:00:00 2001
From: Joseph Shearer
Date: Mon, 21 Oct 2024 12:34:15 -0400
Subject: [PATCH] flow-client: Make sure to remove any potentially expired JWT from the client used to exchange a refresh token for an access token in `refresh_authorizations()`

While running down some issues with Dekaf, namely frequent consumer group rebalances, I noticed that these rebalances correlated with Dekaf errors such as:
```
dekaf: error=failed to obtain access token Caused by: Unauthorized: {"code":"PGRST301","details":null,"hint":null,"message":"JWT expired"}
```
---
 crates/flow-client/src/client.rs | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/crates/flow-client/src/client.rs b/crates/flow-client/src/client.rs
index de5a2d72c6..1f5663bbf5 100644
--- a/crates/flow-client/src/client.rs
+++ b/crates/flow-client/src/client.rs
@@ -66,6 +66,13 @@ impl Client {
 }
 }
 
+ pub fn as_anonymous(self) -> Self {
+ Self {
+ user_access_token: None,
+ ..self
+ }
+ }
+
 /// Build a fresh `gazette::journal::Client` and `gazette::shard::Client`
 /// There is a bug that causes these clients to hang under heavy/varied load,
 /// so until that bug is found+fixed, this is the work-around.
@@ -331,10 +338,15 @@ pub async fn refresh_authorizations(
 access_token: String,
 refresh_token: Option, // Set iff the token was single-use.
 }
+ // We either never had an access token, or we had one and it expired,
+ // in which case the client may have an invalid access token configured.
+ // The `generate_access_token` RPC only needs the provided refresh token
+ // for authentication, so we should use an unauthenticated client to make
+ // the request.
 let Response {
 access_token,
 refresh_token: next_refresh_token,
- } = api_exec::(client.rpc(
+ } = api_exec::(client.clone().as_anonymous().rpc(
 "generate_access_token",
 serde_json::json!({"refresh_token_id": id, "secret": secret}).to_string(),
 ))
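Review bot: `as_anonymous` in `impl Client` is a new public method with no doc comment, and the name alone does not say which credential it drops. The struct-update `..self` carries every other field over, so only `user_access_token` is cleared; if `Client` holds any other authentication state (the struct definition is outside this hunk) it stays on the returned value. I would add `/// Returns this client with its user access token cleared; every other field is carried over unchanged.` and put `#[must_use]` on the method, since it consumes `self` and a discarded result does nothing useful.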
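Review bot: In `refresh_authorizations`, `client.clone().as_anonymous()` spells out a full clone at the call site only to clear one field, and any later caller holding a borrowed client will have to repeat that pairing. Letting the method borrow keeps the call to `client.as_anonymous().rpc(` and puts the clone in one place: `pub fn as_anonymous(&self) -> Self { Self { user_access_token: None, ..self.clone() } }`. That `client` is a reference here is inferred from the `.clone()`; the function's signature and the rest of its body lie outside the hunk.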