diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 8b7f0991c2dd..0e584d564b5c 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -30,6 +30,7 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -45,6 +46,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; @@ -431,8 +433,7 @@ public boolean start() throws IOException { client = spec.connectionConfiguration().createClient(); LOG.debug("Reader client ID is {}", client.getClientId()); checkpointMark.clientId = client.getClientId().toString(); - connection = client.blockingConnection(); - connection.connect(); + connection = createConnection(client); connection.subscribe( new Topic[] {new Topic(spec.connectionConfiguration().getTopic(), QoS.AT_LEAST_ONCE)}); return advance(); @@ -569,8 +570,7 @@ public void createMqttClient() throws Exception { LOG.debug("Starting MQTT writer"); client = spec.connectionConfiguration().createClient(); LOG.debug("MQTT writer client ID is {}", client.getClientId()); - connection = client.blockingConnection(); - connection.connect(); + connection = createConnection(client); } @ProcessElement @@ -590,4 +590,20 @@ public void closeMqttClient() throws Exception { } } } + + /** Create a connected MQTT BlockingConnection from given client, aware of connection timeout. */ + static BlockingConnection createConnection(MQTT client) throws Exception { + FutureConnection futureConnection = client.futureConnection(); + org.fusesource.mqtt.client.Future connecting = futureConnection.connect(); + while (true) { + try { + connecting.await(1, TimeUnit.MINUTES); + } catch (TimeoutException e) { + LOG.warn("Connection to {} pending after waiting for 1 minute", client.getHost()); + continue; + } + break; + } + return new BlockingConnection(futureConnection); + } } diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 30adad708f8d..7d60d6d65780 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -142,8 +142,7 @@ public void testReadNoClientId() throws Exception { publisherThread.join(); } - @Test(timeout = 30 * 1000) - @Ignore("https://github.com/apache/beam/issues/19092 Flake Non-deterministic output.") + @Test(timeout = 40 * 1000) public void testRead() throws Exception { PCollection output = pipeline.apply( @@ -151,7 +150,7 @@ public void testRead() throws Exception { .withConnectionConfiguration( MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "READ_TOPIC") .withClientId("READ_PIPELINE")) - .withMaxReadTime(Duration.standardSeconds(3))); + .withMaxReadTime(Duration.standardSeconds(5))); PAssert.that(output) .containsInAnyOrder( "This is test 0".getBytes(StandardCharsets.UTF_8), @@ -180,12 +179,12 @@ public void testRead() throws Exception { + "messages ..."); boolean pipelineConnected = false; while (!pipelineConnected) { - Thread.sleep(1000); for (Connection connection : brokerService.getBroker().getClients()) { if (connection.getConnectionId().startsWith("READ_PIPELINE")) { pipelineConnected = true; } } + Thread.sleep(1000); } for (int i = 0; i < 10; i++) { publishConnection.publish(