Skip to content

Commit

Permalink
Use separate isolated streams based on job settings
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Sep 20, 2024
1 parent 3fd80af commit 3085cf0
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,9 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
void setWindmillMessagesBetweenIsReadyChecks(int value);

@Description("If true, a most a single active rpc will be used per channel.")
@Default.Boolean(false)
boolean getUseWindmillIsolatedChannels();
Boolean getUseWindmillIsolatedChannels();

void setUseWindmillIsolatedChannels(boolean value);
void setUseWindmillIsolatedChannels(Boolean value);

@Description(
"If true, separate streaming rpcs will be used for heartbeats instead of sharing streams with state reads.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.api.services.dataflow.model.CounterUpdate;
Expand Down Expand Up @@ -63,7 +62,6 @@
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
Expand All @@ -79,10 +77,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
Expand All @@ -100,7 +95,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -454,7 +448,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
GrpcWindmillStreamFactory windmillStreamFactory;
if (options.isEnableStreamingEngine()) {
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(createStubFactory(options));
GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options));
configFetcher =
StreamingEngineComputationConfigFetcher.create(
options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient);
Expand All @@ -480,7 +474,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
GrpcWindmillServer.create(
options,
windmillStreamFactory,
GrpcDispatcherClient.create(createStubFactory(options)));
GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)));
} else {
windmillStreamFactory = windmillStreamFactoryBuilder.build();
windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport());
Expand Down Expand Up @@ -684,24 +678,6 @@ public static void main(String[] args) throws Exception {
worker.start();
}

private static ChannelCachingStubFactory createStubFactory(
DataflowWorkerHarnessOptions workerOptions) {
Function<WindmillServiceAddress, ManagedChannel> channelFactory =
serviceAddress ->
remoteChannel(
serviceAddress, workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec());
ChannelCache channelCache =
ChannelCache.create(
serviceAddress ->
// IsolationChannel will create and manage separate RPC channels to the same
// serviceAddress via calling the channelFactory, else just directly return the
// RPC channel.
workerOptions.getUseWindmillIsolatedChannels()
? IsolationChannel.create(() -> channelFactory.apply(serviceAddress))
: channelFactory.apply(serviceAddress));
return ChannelCachingRemoteStubFactory.create(workerOptions.getGcpCredential(), channelCache);
}

private static int chooseMaxThreads(DataflowWorkerHarnessOptions options) {
if (options.getNumberOfWorkerHarnessThreads() != 0) {
return options.getNumberOfWorkerHarnessThreads();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub;
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
Expand All @@ -49,7 +52,6 @@
public class GrpcDispatcherClient {

private static final Logger LOG = LoggerFactory.getLogger(GrpcDispatcherClient.class);
private final WindmillStubFactory windmillStubFactory;
private final CountDownLatch onInitializedEndpoints;

/**
Expand All @@ -62,31 +64,52 @@ public class GrpcDispatcherClient {
@GuardedBy("this")
private final Random rand;

private final WindmillStubFactoryFactory windmillStubFactoryFactory;

private AtomicReference<WindmillStubFactory> windmillStubFactory = new AtomicReference<>();

private final AtomicBoolean useIsolatedChannels = new AtomicBoolean();
private final boolean reactToIsolatedChannelsJobSetting;

private GrpcDispatcherClient(
WindmillStubFactory windmillStubFactory,
DataflowWorkerHarnessOptions options,
WindmillStubFactoryFactory windmillStubFactoryFactory,
DispatcherStubs initialDispatcherStubs,
Random rand) {
this.windmillStubFactory = windmillStubFactory;
this.windmillStubFactoryFactory = windmillStubFactoryFactory;
if (options.getUseWindmillIsolatedChannels() != null) {
this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
this.reactToIsolatedChannelsJobSetting = false;
} else {
this.useIsolatedChannels.set(false);
this.reactToIsolatedChannelsJobSetting = true;
}
this.windmillStubFactory.set(
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get()));
this.rand = rand;
this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
this.onInitializedEndpoints = new CountDownLatch(1);
}

public static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
return new GrpcDispatcherClient(windmillStubFactory, DispatcherStubs.empty(), new Random());
public static GrpcDispatcherClient create(
DataflowWorkerHarnessOptions options, WindmillStubFactoryFactory windmillStubFactoryFactory) {
return new GrpcDispatcherClient(
options, windmillStubFactoryFactory, DispatcherStubs.empty(), new Random());
}

@VisibleForTesting
public static GrpcDispatcherClient forTesting(
WindmillStubFactory windmillGrpcStubFactory,
DataflowWorkerHarnessOptions options,
WindmillStubFactoryFactory windmillStubFactoryFactory,
List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
List<CloudWindmillMetadataServiceV1Alpha1Stub> windmillMetadataServiceStubs,
Set<HostAndPort> dispatcherEndpoints) {
Preconditions.checkArgument(
dispatcherEndpoints.size() == windmillServiceStubs.size()
&& windmillServiceStubs.size() == windmillMetadataServiceStubs.size());
return new GrpcDispatcherClient(
windmillGrpcStubFactory,
options,
windmillStubFactoryFactory,
DispatcherStubs.create(
dispatcherEndpoints, windmillServiceStubs, windmillMetadataServiceStubs),
new Random());
Expand Down Expand Up @@ -153,17 +176,31 @@ public void onJobConfig(StreamingGlobalConfig config) {
LOG.warn("Dispatcher client received empty windmill service endpoints from global config");
return;
}
consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints());
boolean forceRecreateStubs = false;
if (reactToIsolatedChannelsJobSetting) {
boolean useIsolatedChannels = config.userWorkerJobSettings().getUseWindmillIsolatedChannels();
if (this.useIsolatedChannels.getAndSet(useIsolatedChannels) != useIsolatedChannels) {
windmillStubFactory.set(
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels));
forceRecreateStubs = true;
}
}
consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints(), forceRecreateStubs);
}

public synchronized void consumeWindmillDispatcherEndpoints(
ImmutableSet<HostAndPort> dispatcherEndpoints) {
consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /*forceRecreateStubs=*/ false);
}

private synchronized void consumeWindmillDispatcherEndpoints(
ImmutableSet<HostAndPort> dispatcherEndpoints, boolean forceRecreateStubs) {
ImmutableSet<HostAndPort> currentDispatcherEndpoints =
dispatcherStubs.get().dispatcherEndpoints();
Preconditions.checkArgument(
dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
"Cannot set dispatcher endpoints to nothing.");
if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
if (!forceRecreateStubs && currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
// The endpoints are equal don't recreate the stubs.
return;
}
Expand All @@ -174,7 +211,7 @@ public synchronized void consumeWindmillDispatcherEndpoints(
}

LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", dispatcherEndpoints);
dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, windmillStubFactory));
dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, windmillStubFactory.get()));
onInitializedEndpoints.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
Expand Down Expand Up @@ -154,7 +154,7 @@ static GrpcWindmillServer newTestInstance(
String name,
List<String> experiments,
long clientId,
WindmillStubFactory windmillStubFactory) {
WindmillStubFactoryFactory windmillStubFactoryFactory) {
ManagedChannel inProcessChannel = inProcessChannel(name);
CloudWindmillServiceV1Alpha1Stub stub =
CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel);
Expand All @@ -164,16 +164,18 @@ static GrpcWindmillServer newTestInstance(
List<CloudWindmillMetadataServiceV1Alpha1Stub> windmillMetadataServiceStubs =
Lists.newArrayList(metadataStub);

DataflowWorkerHarnessOptions testOptions =
testOptions(/* enableStreamingEngine= */ true, experiments);

Set<HostAndPort> dispatcherEndpoints = Sets.newHashSet(HostAndPort.fromHost(name));
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.forTesting(
windmillStubFactory,
testOptions,
windmillStubFactoryFactory,
windmillServiceStubs,
windmillMetadataServiceStubs,
dispatcherEndpoints);

DataflowWorkerHarnessOptions testOptions =
testOptions(/* enableStreamingEngine= */ true, experiments);
boolean sendKeyedGetDataRequests =
!testOptions.isEnableStreamingEngine()
|| DataflowRunner.hasExperiment(
Expand All @@ -190,15 +192,15 @@ static GrpcWindmillServer newTestInstance(

@VisibleForTesting
static GrpcWindmillServer newApplianceTestInstance(
Channel channel, WindmillStubFactory windmillStubFactory) {
Channel channel, WindmillStubFactoryFactory windmillStubFactoryFactory) {
DataflowWorkerHarnessOptions options =
testOptions(/* enableStreamingEngine= */ false, new ArrayList<>());
GrpcWindmillServer testServer =
new GrpcWindmillServer(
options,
GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(),
// No-op, Appliance does not use Dispatcher to call Streaming Engine.
GrpcDispatcherClient.create(windmillStubFactory));
GrpcDispatcherClient.create(options, windmillStubFactoryFactory));
testServer.syncApplianceStub = createWindmillApplianceStubWithDeadlineInterceptor(channel);
return testServer;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;

import org.apache.beam.sdk.annotations.Internal;

@Internal
public interface WindmillStubFactoryFactory {
WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;

import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;

import com.google.auth.Credentials;
import java.util.function.Function;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;

public class WindmillStubFactoryFactoryImpl implements WindmillStubFactoryFactory {

private final int windmillServiceRpcChannelAliveTimeoutSec;
private final Credentials gcpCredential;

public WindmillStubFactoryFactoryImpl(DataflowWorkerHarnessOptions workerOptions) {
this.gcpCredential = workerOptions.getGcpCredential();
this.windmillServiceRpcChannelAliveTimeoutSec =
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec();
}

@Override
public WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels) {
Function<WindmillServiceAddress, ManagedChannel> channelFactory =
serviceAddress -> remoteChannel(serviceAddress, windmillServiceRpcChannelAliveTimeoutSec);
ChannelCache channelCache =
ChannelCache.create(
serviceAddress ->
// IsolationChannel will create and manage separate RPC channels to the same
// serviceAddress via calling the channelFactory, else just directly return the
// RPC channel.
useIsolatedChannels
? IsolationChannel.create(() -> channelFactory.apply(serviceAddress))
: channelFactory.apply(serviceAddress));
return ChannelCachingRemoteStubFactory.create(gcpCredential, channelCache);
}
}
Loading

0 comments on commit 3085cf0

Please sign in to comment.