Skip to content

Commit

Permalink
flow-client: Make sure to remove any potentially expired JWT from the…
Browse files Browse the repository at this point in the history
… client used to exchange a refresh token for an access token in `refresh_authorizations()`

While running down some issues with Dekaf, namely frequent consumer group rebalances, I noticed that these rebalances correlated with Dekaf errors such as:

```
dekaf: error=failed to obtain access token

Caused by:
    Unauthorized: {"code":"PGRST301","details":null,"hint":null,"message":"JWT expired"}
```
  • Loading branch information
jshearer committed Oct 21, 2024
1 parent b9cfdcc commit ed30865
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion crates/flow-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ impl Client {
}
}

pub fn as_anonymous(self) -> Self {
Self {
user_access_token: None,
..self
}
}

/// Build a fresh `gazette::journal::Client` and `gazette::shard::Client`
/// There is a bug that causes these clients to hang under heavy/varied load,
/// so until that bug is found+fixed, this is the work-around.
Expand Down Expand Up @@ -331,10 +338,15 @@ pub async fn refresh_authorizations(
access_token: String,
refresh_token: Option<RefreshToken>, // Set iff the token was single-use.
}
// We either never had an access token, or we had one and it expired,
// in which case the client may have an invalid access token configured.
// The `generate_access_token` RPC only needs the provided refresh token
// for authentication, so we should use an unauthenticated client to make
// the request.
let Response {
access_token,
refresh_token: next_refresh_token,
} = api_exec::<Response>(client.rpc(
} = api_exec::<Response>(client.clone().as_anonymous().rpc(
"generate_access_token",
serde_json::json!({"refresh_token_id": id, "secret": secret}).to_string(),
))
Expand Down

0 comments on commit ed30865

Please sign in to comment.