From b9cfdcc73c705bcde9fd81709a6c8b89b8b2d714 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Sun, 20 Oct 2024 11:21:54 -0500 Subject: [PATCH] agent: add timeout() routine and use it for data-plane operations Ensure we apply top-level timeouts to actions undertaken within a data-plane: - Starting a connector proxy environment. - Awaiting a connector unary response. - Activating built catalog specifications into a data-plane. - Deleting built specifications from a data-plane. We want to preserve a firm boundary that data-planes are fallible and control-plane operations must protect themselves from unavailability or undesired behavior of a data-plane. --- crates/agent/src/controllers/mod.rs | 17 +++-- .../src/controllers/publication_status.rs | 15 ++-- crates/agent/src/lib.rs | 20 +++++ crates/agent/src/proxy_connectors.rs | 73 ++++++++++++++----- 4 files changed, 93 insertions(+), 32 deletions(-) diff --git a/crates/agent/src/controllers/mod.rs b/crates/agent/src/controllers/mod.rs index ba35b67f0f..b90f459641 100644 --- a/crates/agent/src/controllers/mod.rs +++ b/crates/agent/src/controllers/mod.rs @@ -322,16 +322,19 @@ impl Status { // resources, and then notify dependent controllers, to make // sure that they can respond. The controller job row will be // deleted automatically after we return. 
- control_plane - .data_plane_delete( + crate::timeout( + std::time::Duration::from_secs(60), + control_plane.data_plane_delete( state.catalog_name.clone(), catalog_type, state.data_plane_id, - ) - .await - .context("deleting from data-plane") - .with_retry(backoff_data_plane_activate(state.failures))?; - tracing::info!("deleted from data-plane"); + ), + || "Timeout while deleting from data-plane", + ) + .await + .context("failed to delete from data-plane") + .with_retry(backoff_data_plane_activate(state.failures))?; + control_plane .notify_dependents(state.catalog_name.clone()) .await diff --git a/crates/agent/src/controllers/publication_status.rs b/crates/agent/src/controllers/publication_status.rs index 9e219f6fa3..459c61cd51 100644 --- a/crates/agent/src/controllers/publication_status.rs +++ b/crates/agent/src/controllers/publication_status.rs @@ -51,11 +51,16 @@ impl ActivationStatus { if state.last_build_id > self.last_activated { let name = state.catalog_name.clone(); let built_spec = state.built_spec.as_ref().expect("built_spec must be Some"); - control_plane - .data_plane_activate(name, built_spec, state.data_plane_id) - .await - .with_retry(backoff_data_plane_activate(state.failures)) - .context("failed to activate")?; + + crate::timeout( + std::time::Duration::from_secs(60), + control_plane.data_plane_activate(name, built_spec, state.data_plane_id), + || "Timeout while activating into data-plane", + ) + .await + .with_retry(backoff_data_plane_activate(state.failures)) + .context("failed to activate into data-plane")?; + tracing::debug!(last_activated = %state.last_build_id, "activated"); self.last_activated = state.last_build_id; } diff --git a/crates/agent/src/lib.rs b/crates/agent/src/lib.rs index 48b5698d80..fec010fb8f 100644 --- a/crates/agent/src/lib.rs +++ b/crates/agent/src/lib.rs @@ -74,3 +74,23 @@ pub fn next_name(current_name: &str) -> String { // complexity. 
format!("{current_name}_v2") } + +// timeout is a convienence for tokio::time::timeout which merges +// its error with the Future's nested anyhow::Result Output. +async fn timeout( + dur: std::time::Duration, + fut: Fut, + with_context: WC, +) -> anyhow::Result +where + C: std::fmt::Display + Send + Sync + 'static, + Fut: std::future::Future>, + WC: FnOnce() -> C, +{ + use anyhow::Context; + + match tokio::time::timeout(dur, fut).await { + Ok(result) => result, + Err(err) => Err(anyhow::anyhow!(err)).with_context(with_context), + } +} diff --git a/crates/agent/src/proxy_connectors.rs b/crates/agent/src/proxy_connectors.rs index ea963f8845..642c6f9dad 100644 --- a/crates/agent/src/proxy_connectors.rs +++ b/crates/agent/src/proxy_connectors.rs @@ -73,15 +73,25 @@ impl ProxyConnectors { task: ops::ShardRef, request: capture::Request, ) -> anyhow::Result { - let (channel, metadata, logs) = self.dial_proxy(data_plane, task).await?; + let (channel, metadata, logs) = crate::timeout( + DIAL_PROXY_TIMEOUT, + self.dial_proxy(data_plane, task), + || dial_proxy_timeout_msg(data_plane), + ) + .await?; + let mut client = proto_grpc::capture::connector_client::ConnectorClient::with_interceptor( channel, metadata, ) .max_decoding_message_size(runtime::MAX_MESSAGE_SIZE); - Self::drive_unary_response( - client.capture(futures::stream::once(async move { request })), - logs, + crate::timeout( + CONNECTOR_TIMEOUT, + Self::drive_unary_response( + client.capture(futures::stream::once(async move { request })), + logs, + ), + || CONNECTOR_TIMEOUT_MSG, ) .await } @@ -96,15 +106,25 @@ impl ProxyConnectors { task: ops::ShardRef, request: derive::Request, ) -> anyhow::Result { - let (channel, metadata, logs) = self.dial_proxy(data_plane, task).await?; + let (channel, metadata, logs) = crate::timeout( + DIAL_PROXY_TIMEOUT, + self.dial_proxy(data_plane, task), + || dial_proxy_timeout_msg(data_plane), + ) + .await?; + let mut client = 
proto_grpc::derive::connector_client::ConnectorClient::with_interceptor( channel, metadata, ) .max_decoding_message_size(runtime::MAX_MESSAGE_SIZE); - Self::drive_unary_response( - client.derive(futures::stream::once(async move { request })), - logs, + crate::timeout( + CONNECTOR_TIMEOUT, + Self::drive_unary_response( + client.derive(futures::stream::once(async move { request })), + logs, + ), + || CONNECTOR_TIMEOUT_MSG, ) .await } @@ -119,16 +139,26 @@ impl ProxyConnectors { task: ops::ShardRef, request: materialize::Request, ) -> anyhow::Result { - let (channel, metadata, logs) = self.dial_proxy(data_plane, task).await?; + let (channel, metadata, logs) = crate::timeout( + DIAL_PROXY_TIMEOUT, + self.dial_proxy(data_plane, task), + || dial_proxy_timeout_msg(data_plane), + ) + .await?; + let mut client = proto_grpc::materialize::connector_client::ConnectorClient::with_interceptor( channel, metadata, ) .max_decoding_message_size(runtime::MAX_MESSAGE_SIZE); - Self::drive_unary_response( - client.materialize(futures::stream::once(async move { request })), - logs, + crate::timeout( + CONNECTOR_TIMEOUT, + Self::drive_unary_response( + client.materialize(futures::stream::once(async move { request })), + logs, + ), + || CONNECTOR_TIMEOUT_MSG, ) .await } @@ -236,18 +266,21 @@ impl ProxyConnectors { } .map_err(runtime::status_to_anyhow); - let response = tokio::time::timeout(CONNECTOR_TIMEOUT, response); - - match futures::join!(response, log_loop) { - (Err(_timeout), _) => Err(anyhow::anyhow!("Timeout while waiting for the connector's response. 
Please verify any network configuration and retry.")), - (Ok(Err(response_err)), _) => Err(response_err), - (Ok(Ok(_)), Err(log_err)) => Err(log_err), - (Ok(Ok(response)), Ok(())) => Ok(response), - } + futures::try_join!(response, log_loop).map(|(response, ())| response) } } +const DIAL_PROXY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); + +fn dial_proxy_timeout_msg(data_plane: &tables::DataPlane) -> String { + format!( + "Timeout starting remote proxy for connector in data-plane {}", + data_plane.data_plane_name + ) +} + const CONNECTOR_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); // Five minutes. +const CONNECTOR_TIMEOUT_MSG: &'static str = "Timeout while waiting for the connector's response. Please verify any network configuration and retry."; /*