Skip to content

Commit

Permalink
Emit warning when Mqtt waiting for connection for extended period of …
Browse files Browse the repository at this point in the history
…time (#32500)

* Emit warning when Matt waiting for connection for extended period of time

* address comment; adjust timeout
  • Loading branch information
Abacn committed Sep 19, 2024
1 parent bb96ac0 commit ec307a5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
Expand All @@ -45,6 +46,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
Expand Down Expand Up @@ -431,8 +433,7 @@ public boolean start() throws IOException {
client = spec.connectionConfiguration().createClient();
LOG.debug("Reader client ID is {}", client.getClientId());
checkpointMark.clientId = client.getClientId().toString();
connection = client.blockingConnection();
connection.connect();
connection = createConnection(client);
connection.subscribe(
new Topic[] {new Topic(spec.connectionConfiguration().getTopic(), QoS.AT_LEAST_ONCE)});
return advance();
Expand Down Expand Up @@ -569,8 +570,7 @@ public void createMqttClient() throws Exception {
LOG.debug("Starting MQTT writer");
client = spec.connectionConfiguration().createClient();
LOG.debug("MQTT writer client ID is {}", client.getClientId());
connection = client.blockingConnection();
connection.connect();
connection = createConnection(client);
}

@ProcessElement
Expand All @@ -590,4 +590,20 @@ public void closeMqttClient() throws Exception {
}
}
}

/** Create a connected MQTT BlockingConnection from given client, aware of connection timeout. */
static BlockingConnection createConnection(MQTT client) throws Exception {
FutureConnection futureConnection = client.futureConnection();
org.fusesource.mqtt.client.Future<Void> connecting = futureConnection.connect();
while (true) {
try {
connecting.await(1, TimeUnit.MINUTES);
} catch (TimeoutException e) {
LOG.warn("Connection to {} pending after waiting for 1 minute", client.getHost());
continue;
}
break;
}
return new BlockingConnection(futureConnection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,15 @@ public void testReadNoClientId() throws Exception {
publisherThread.join();
}

@Test(timeout = 30 * 1000)
@Ignore("https://github.com/apache/beam/issues/19092 Flake Non-deterministic output.")
@Test(timeout = 40 * 1000)
public void testRead() throws Exception {
PCollection<byte[]> output =
pipeline.apply(
MqttIO.read()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "READ_TOPIC")
.withClientId("READ_PIPELINE"))
.withMaxReadTime(Duration.standardSeconds(3)));
.withMaxReadTime(Duration.standardSeconds(5)));
PAssert.that(output)
.containsInAnyOrder(
"This is test 0".getBytes(StandardCharsets.UTF_8),
Expand Down Expand Up @@ -180,12 +179,12 @@ public void testRead() throws Exception {
+ "messages ...");
boolean pipelineConnected = false;
while (!pipelineConnected) {
Thread.sleep(1000);
for (Connection connection : brokerService.getBroker().getClients()) {
if (connection.getConnectionId().startsWith("READ_PIPELINE")) {
pipelineConnected = true;
}
}
Thread.sleep(1000);
}
for (int i = 0; i < 10; i++) {
publishConnection.publish(
Expand Down

0 comments on commit ec307a5

Please sign in to comment.