Skip to content

Commit

Permalink
Use separate heartbeat streams based on job settings
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Sep 20, 2024
1 parent aeead3f commit 3fd80af
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow.options;

import java.util.Optional;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
Expand Down Expand Up @@ -132,10 +133,8 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

@Description(
"If true, separate streaming rpcs will be used for heartbeats instead of sharing streams with state reads.")
@Default.Boolean(false)
boolean getUseSeparateWindmillHeartbeatStreams();

void setUseSeparateWindmillHeartbeatStreams(boolean value);
Boolean getUseSeparateWindmillHeartbeatStreams();
void setUseSeparateWindmillHeartbeatStreams(Boolean value);

@Description("The number of streams to use for GetData requests.")
@Default.Integer(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public final class StreamingDataflowWorker {
private static final int DEFAULT_STATUS_PORT = 8081;
private static final Random CLIENT_ID_GENERATOR = new Random();
private static final String CHANNELZ_PATH = "/channelz";
public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =
"streaming_engine_use_job_settings_for_heartbeat_pool";

private final WindmillStateCache stateCache;
private final StreamingWorkerStatusPages statusPages;
Expand Down Expand Up @@ -253,12 +255,29 @@ private StreamingDataflowWorker(
GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
heartbeatSender =
new StreamPoolHeartbeatSender(
options.getUseSeparateWindmillHeartbeatStreams()
? WindmillStreamPool.create(
1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream)
: getDataStreamPool);
// Experiment gates the logic till backend changes are rollback safe
if (DataflowRunner.hasExperiment(
options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)) {
heartbeatSender =
// If the setting is explicitly passed in via PipelineOptions use it,
// else rely on the global config
options.getUseSeparateWindmillHeartbeatStreams() != null
? new StreamPoolHeartbeatSender(
options.getUseSeparateWindmillHeartbeatStreams()
? separateHeartbeatPool(windmillServer)
: getDataStreamPool)
: new StreamPoolHeartbeatSender(
separateHeartbeatPool(windmillServer),
getDataStreamPool,
configFetcher.getGlobalConfigHandle());
} else {
heartbeatSender =
new StreamPoolHeartbeatSender(
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
? separateHeartbeatPool(windmillServer)
: getDataStreamPool);
}

stuckCommitDurationMillis =
options.getStuckCommitDurationMillis() > 0 ? options.getStuckCommitDurationMillis() : 0;
statusPagesBuilder
Expand Down Expand Up @@ -326,6 +345,11 @@ private StreamingDataflowWorker(
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
}

/**
 * Builds a GetData stream pool holding a single stream, for use by the heartbeat sender when
 * heartbeats should not share streams with the regular GetData pool.
 *
 * @param windmillServer server stub whose {@code getDataStream} supplies the pooled stream.
 * @return a new pool of one stream that uses {@code GET_DATA_STREAM_TIMEOUT}.
 */
private static WindmillStreamPool<GetDataStream> separateHeartbeatPool(
    WindmillServerStub windmillServer) {
  final int heartbeatStreamCount = 1;
  return WindmillStreamPool.create(
      heartbeatStreamCount, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream);
}

public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
long clientId = CLIENT_ID_GENERATOR.nextLong();
MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
Expand Down Expand Up @@ -834,6 +858,7 @@ private static ConfigFetcherComputationStateCacheAndWindmillClient create(
*/
@AutoValue
abstract static class BackgroundMemoryMonitor {

private static BackgroundMemoryMonitor create(MemoryMonitor memoryMonitor) {
return new AutoValue_StreamingDataflowWorker_BackgroundMemoryMonitor(
memoryMonitor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;

import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
Expand All @@ -27,19 +30,45 @@
/** StreamingEngine stream pool based implementation of {@link HeartbeatSender}. */
@Internal
public final class StreamPoolHeartbeatSender implements HeartbeatSender {

private static final Logger LOG = LoggerFactory.getLogger(StreamPoolHeartbeatSender.class);

private final WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool;
@Nonnull
private final AtomicReference<WindmillStreamPool<WindmillStream.GetDataStream>>
heartbeatStreamPool = new AtomicReference<>();

public StreamPoolHeartbeatSender(
WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool) {
this.heartbeatStreamPool = heartbeatStreamPool;
this.heartbeatStreamPool.set(heartbeatStreamPool);
}

/**
 * Creates a StreamPoolHeartbeatSender that switches between the two supplied stream pools
 * according to the global config.
 *
 * @param heartbeatStreamPool stream pool to use when separate heartbeat streams are enabled.
 * @param getDataPool stream pool to use when separate heartbeat streams are disabled.
 * @param configHandle handle on which a config observer is registered; each observed config
 *     re-selects the pool used for heartbeats.
 */
public StreamPoolHeartbeatSender(
    @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool,
    @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
    @Nonnull StreamingGlobalConfigHandle configHandle) {
  // Start on the shared GetData pool. The observer registered below switches to the separate
  // pool when the global config enables it.
  // NOTE(review): this relies on the first config callback arriving before any elements are
  // processed -- confirm against the config handle's contract.
  this.heartbeatStreamPool.set(getDataPool);
  configHandle.registerConfigObserver(
      globalConfig -> {
        boolean useSeparateStreams =
            globalConfig.userWorkerJobSettings().getUseSeparateWindmillHeartbeatStreams();
        this.heartbeatStreamPool.set(useSeparateStreams ? heartbeatStreamPool : getDataPool);
      });
}

@Override
public void sendHeartbeats(Heartbeats heartbeats) {
try (CloseableStream<WindmillStream.GetDataStream> closeableStream =
heartbeatStreamPool.getCloseableStream()) {
heartbeatStreamPool.get().getCloseableStream()) {
closeableStream.stream().refreshActiveWork(heartbeats.heartbeatRequests().asMap());
} catch (Exception e) {
LOG.warn("Error occurred sending heartbeats=[{}].", heartbeats, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;

import static org.junit.Assert.assertEquals;

import java.util.Optional;
import org.apache.beam.runners.dataflow.worker.FakeWindmillServer;
import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link StreamPoolHeartbeatSender}. */
@RunWith(JUnit4.class)
public class StreamPoolHeartbeatSenderTest {

  /** Timeout handed to every stream pool created by these tests. */
  private static final Duration STREAM_TIMEOUT = Duration.standardSeconds(10);

  /** Creates a fake Windmill server with a fresh error collector and no computation configs. */
  private static FakeWindmillServer newFakeServer() {
    return new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty());
  }

  /** Builds a {@link Heartbeats} holding one heartbeat request under the key {@code "key"}. */
  private static Heartbeats singleHeartbeat() {
    Heartbeats.Builder builder = Heartbeats.builder();
    builder
        .heartbeatRequestsBuilder()
        .put("key", HeartbeatRequest.newBuilder().setWorkToken(123).build());
    return builder.build();
  }

  /**
   * Creates a sender that chooses between a dedicated pool and a GetData pool from a fixed global
   * config.
   *
   * @param dedicatedServer server backing the dedicated heartbeat pool.
   * @param getDataServer server backing the shared GetData pool.
   * @param useSeparateStreams value of the separate-heartbeat-streams job setting in the config.
   */
  private static StreamPoolHeartbeatSender configDrivenSender(
      FakeWindmillServer dedicatedServer,
      FakeWindmillServer getDataServer,
      boolean useSeparateStreams) {
    UserWorkerRunnerV1Settings jobSettings =
        UserWorkerRunnerV1Settings.newBuilder()
            .setUseSeparateWindmillHeartbeatStreams(useSeparateStreams)
            .build();
    FixedGlobalConfigHandle configHandle =
        new FixedGlobalConfigHandle(
            StreamingGlobalConfig.builder().setUserWorkerJobSettings(jobSettings).build());
    return new StreamPoolHeartbeatSender(
        WindmillStreamPool.create(1, STREAM_TIMEOUT, dedicatedServer::getDataStream),
        WindmillStreamPool.create(1, STREAM_TIMEOUT, getDataServer::getDataStream),
        configHandle);
  }

  @Test
  public void sendsHeartbeatsOnStream() {
    FakeWindmillServer server = newFakeServer();
    StreamPoolHeartbeatSender heartbeatSender =
        new StreamPoolHeartbeatSender(
            WindmillStreamPool.create(1, STREAM_TIMEOUT, server::getDataStream));

    heartbeatSender.sendHeartbeats(singleHeartbeat());

    assertEquals(1, server.getGetDataRequests().size());
  }

  @Test
  public void sendsHeartbeatsOnDedicatedStream() {
    FakeWindmillServer dedicatedServer = newFakeServer();
    FakeWindmillServer getDataServer = newFakeServer();
    StreamPoolHeartbeatSender heartbeatSender =
        configDrivenSender(dedicatedServer, getDataServer, true);

    heartbeatSender.sendHeartbeats(singleHeartbeat());

    // The config enables separate streams, so only the dedicated server sees the request.
    assertEquals(1, dedicatedServer.getGetDataRequests().size());
    assertEquals(0, getDataServer.getGetDataRequests().size());
  }

  @Test
  public void sendsHeartbeatsOnGetDataStream() {
    FakeWindmillServer dedicatedServer = newFakeServer();
    FakeWindmillServer getDataServer = newFakeServer();
    StreamPoolHeartbeatSender heartbeatSender =
        configDrivenSender(dedicatedServer, getDataServer, false);

    heartbeatSender.sendHeartbeats(singleHeartbeat());

    // The config disables separate streams, so the request goes to the GetData server.
    assertEquals(0, dedicatedServer.getGetDataRequests().size());
    assertEquals(1, getDataServer.getGetDataRequests().size());
  }
}

0 comments on commit 3fd80af

Please sign in to comment.