Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#32498][prism] Add split / progress back off + catch-up. #32526

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

lostluck
Copy link
Contributor

@lostluck lostluck commented Sep 20, 2024

Prism splits too aggressively in low latency scenarios, which for textio can cause splitting to single byte inputs most of which don't output anything.

While it would be nice to make this configurable, that is more plumbing than I'd like to do right now. Instead we add some per stage back off and catchup.

  • Give each stage an atomic progress interval value for its "ticks".
  • If no progress has been made in the interval (via no PCollections element count or channel index changes), split.
  • If we successfully split, back off the interval for all instances of the stage.
    • This back off is clamped to a 30 second interval maximum.
    • Back off is multiplicative (by a factor of 4).
    • New bundles for this stage use that value.
    • Reduces noisy per bundle log printouts by splitting less.
  • If bundles begin to complete without having any progress ticks, reduce the interval to catch up.
    • This catch up is clamped to a minimum interval of 100 milliseconds.
    • The catch up is additive (where we subtract the minimum interval from the current value).
    • This allows us to get to a clear "rate" per stage, where it should settle on getting at least one progress tick.

The stages are all independant, so different stages will get different progress intervals over time. If bundles are always making progress, splitting behavior is unchanged (biased to not split).

This allows for larger latencies to first input, while not hampering separation harness tests, or otherwise overdoing it.

There still remains the same bad case, of time between elements being larger than 2* maximumProgressInterval, but to resolve that likely requires a bit more global state awareness, and not splitting when maximum parallelism has been reached. But that would be a different change.

Fixes #32498.

Tested against the Wordcount example with tightened minimal timings, and sleeps. Once this is in the following command should be able to validate that it's fixed the issue for the higher latency cases.

go run github.com/apache/beam/sdks/v2/go/examples/wordcount@master --input "gs://apache-beam-samples/shakespeare/kinglear.txt" --output counts
head -n 30 count

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@lostluck
Copy link
Contributor Author

R: @damondouglas

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

Copy link

codecov bot commented Sep 20, 2024

Codecov Report

Attention: Patch coverage is 81.48148% with 5 lines in your changes missing coverage. Please review.

Project coverage is 58.89%. Comparing base (f2d0558) to head (cf52998).
Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
sdks/go/pkg/beam/runners/prism/internal/stage.go 81.48% 4 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff            @@
##             master   #32526   +/-   ##
=========================================
  Coverage     58.88%   58.89%           
  Complexity     3071     3071           
=========================================
  Files          1129     1129           
  Lines        173992   174018   +26     
  Branches       3329     3329           
=========================================
+ Hits         102463   102490   +27     
- Misses        68182    68185    +3     
+ Partials       3347     3343    -4     
Flag Coverage Δ
go 34.67% <81.48%> (+0.02%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@@ -220,12 +248,28 @@ progress:
Data: residuals,
})
}

// Any split means we're processing slower than desired, but splitting should increase
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non blocking question. How does split translate to processing slower than desired?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question.

That can be answered by looking at the issue being fixed, and seeing the behavior.

In this case, the file was being opened repeatedly and endlessly*.

By splitting too quickly, we end up doing more work, serializing and deserializing the elements in the set of elements to be processed by the SDK. So we weren't letting the SDK actually get any work done

This meant that because we were slow to open the file, we opened the file, again and again in different bundles.

*Eventually there would be nothing left to split, and lines would be emitted, but it would have been very wasteful.

Copy link
Contributor

@damondouglas damondouglas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug]: minimal wordcount golang example is freezeing on gcs reading
2 participants