Skip to content

Commit

Permalink
flowctl: allow specifying --network in raw commands
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee authored and jgraettinger committed Sep 20, 2023
1 parent 70c7d22 commit eb99dcd
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
11 changes: 7 additions & 4 deletions crates/flowctl/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn inspect(image: &str) -> anyhow::Result<Output> {

const CONNECTOR_INIT_PORT: u16 = 49092;

pub fn docker_spawn(image: &str, args: &[&str]) -> anyhow::Result<(Child, TempDir, u16)> {
pub fn docker_spawn(image: &str, args: &[&str], network: &str) -> anyhow::Result<(Child, TempDir, u16)> {
pull(image).context(format!("pulling {image}"))?;

let inspect_output = inspect(image).context(format!("inspecting {image}"))?;
Expand Down Expand Up @@ -58,6 +58,8 @@ pub fn docker_spawn(image: &str, args: &[&str]) -> anyhow::Result<(Child, TempDi
),
"--mount",
&format!("type=bind,source={host_inspect_str},target={target_inspect}"),
"--network",
network,
"--publish",
&format!("0.0.0.0:{}:{}/tcp", port, CONNECTOR_INIT_PORT),
image,
Expand Down Expand Up @@ -86,8 +88,8 @@ async fn connector_client(port: u16) -> anyhow::Result<ConnectorClient<tonic::tr
}
}

pub async fn docker_run(image: &str, req: Request) -> anyhow::Result<Response> {
let (_child, _dir, port) = docker_spawn(image, &[])?;
pub async fn docker_run(image: &str, network: &str, req: Request) -> anyhow::Result<Response> {
let (_child, _dir, port) = docker_spawn(image, &[], network)?;

let mut client = connector_client(port).await?;

Expand All @@ -100,11 +102,12 @@ pub async fn docker_run(image: &str, req: Request) -> anyhow::Result<Response> {

pub async fn docker_run_stream(
image: &str,
network: &str,
stream: Pin<Box<dyn Stream<Item = Request> + Send + Sync>>,
) -> anyhow::Result<
Pin<Box<dyn TryStream<Item = anyhow::Result<Response>, Ok = Response, Error = anyhow::Error>>>,
> {
let (_child, _dir, port) = docker_spawn(image, &[])?;
let (_child, _dir, port) = docker_spawn(image, &[], network)?;
let mut client = connector_client(port).await?;
let response_stream = client.capture(stream).await?;

Expand Down
8 changes: 7 additions & 1 deletion crates/flowctl/src/raw/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ pub struct Capture {
/// Print the reduced checkpoint of the connector as it gets updated
#[clap(long, action)]
print_checkpoint: bool,

/// Docker network to run the connector
#[clap(long, default_value="bridge")]
network: String,
}

#[derive(Deserialize)]
Expand All @@ -40,6 +44,7 @@ pub async fn do_capture(
ctx: &mut crate::CliContext,
Capture {
source,
network,
print_checkpoint,
}: &Capture,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -80,7 +85,7 @@ pub async fn do_capture(
..Default::default()
};

let apply_output = docker_run(&cfg.image, apply)
let apply_output = docker_run(&cfg.image, &network, apply)
.await
.context("connector apply")?;

Expand Down Expand Up @@ -159,6 +164,7 @@ pub async fn do_capture(
});
let mut out_stream = docker_run_stream(
&cfg.image,
&network,
Box::pin(stream::once(async { open }).chain(in_stream)),
)
.await
Expand Down
7 changes: 7 additions & 0 deletions crates/flowctl/src/raw/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub struct Discover {
/// Should specs be written to the single specification file, or written in the canonical layout?
#[clap(long)]
flat: bool,

/// Docker network to run the connector
#[clap(long, default_value="bridge")]
network: String,
}

pub async fn do_discover(
Expand All @@ -42,6 +46,7 @@ pub async fn do_discover(
prefix,
overwrite,
flat,
network,
}: &Discover,
) -> anyhow::Result<()> {
let connector_name = image
Expand Down Expand Up @@ -70,6 +75,7 @@ pub async fn do_discover(
if let Some(config) = cfg {
let discover_output = docker_run(
image,
&network,
Request {
discover: Some(request::Discover {
connector_type: ConnectorType::Image.into(),
Expand Down Expand Up @@ -150,6 +156,7 @@ pub async fn do_discover(
// Otherwise send a Spec RPC and use that to write a sample config file
let spec_output = docker_run(
image,
&network,
Request {
spec: Some(request::Spec {
connector_type: ConnectorType::Image.into(),
Expand Down

0 comments on commit eb99dcd

Please sign in to comment.