Skip to content

Commit

Permalink
Invoke teardown when DoFn throws in portable runners
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Sep 20, 2024
1 parent f2d0558 commit c0fe7fc
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test"
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 0
"modification": 1
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_PVR_Samza.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 0
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 0
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 0
"modification": 1
}
1 change: 0 additions & 1 deletion runners/flink/job-server/flink_job_server.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
Expand Down
2 changes: 1 addition & 1 deletion runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def commonLegacyExcludeCategories = [
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesMultimapState',
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesParDoLifecycle',
'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner
'org.apache.beam.sdk.testing.UsesMetricsPusher',
'org.apache.beam.sdk.testing.UsesBundleFinalizer',
]
Expand Down
3 changes: 2 additions & 1 deletion runners/samza/job-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ def portableValidatesRunnerTask(String name, boolean docker) {
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
Expand Down Expand Up @@ -127,6 +126,8 @@ def portableValidatesRunnerTask(String name, boolean docker) {
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testParDoShiftTimestampInvalid'
// TODO(https://github.com/apache/beam/issues/21144)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testParDoShiftTimestampInvalidZeroAllowed'
// TODO(https://github.com/apache/beam/issues/32520)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionIn*Stateful'
// TODO(https://github.com/apache/beam/issues/21145)
excludeTestsMatching 'org.apache.beam.sdk.transforms.DeduplicateTest.testEventTime'
// TODO(https://github.com/apache/beam/issues/21146)
Expand Down
2 changes: 0 additions & 2 deletions runners/spark/job-server/spark_job_server.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker,
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
Expand Down Expand Up @@ -187,7 +186,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker,
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.Instruction
request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor);
return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
} catch (Exception e) {
// Make sure we clean-up from the active set of bundle processors.
// Make sure we clean up from the active set of bundle processors.
bundleProcessorCache.discard(bundleProcessor);
throw e;
}
Expand Down Expand Up @@ -1168,13 +1168,27 @@ void discard() {
if (this.bundleCache != null) {
this.bundleCache.clear();
}
// setupFunction called in createBundleProcessor when BundleProcessorCache.get returns null.
// call teardownFunction here as the BundleProcessor is already removed from cache and isn't
// going to be re-used.
for (ThrowingRunnable teardownFunction : Lists.reverse(this.getTearDownFunctions())) {
try {
teardownFunction.run();
} catch (Throwable e) {
LOG.error(
"Exceptions are thrown from DoFn.teardown method. Note that it will not fail the"
+ " pipeline execution,",
e);
}
}
getMetricsEnvironmentStateForBundle().discard();
for (BeamFnDataOutboundAggregator aggregator : getOutboundAggregators().values()) {
aggregator.discard();
}
}
}

// this is called in cachedBundleProcessors removal listener
void shutdown() {
for (ThrowingRunnable tearDownFunction : getTearDownFunctions()) {
LOG.debug("Tearing down function {}", tearDownFunction);
Expand Down

0 comments on commit c0fe7fc

Please sign in to comment.