
[GLUTEN-6863][VL] Pre-alloc and reuse compress buffer to avoid OOM in spill #6869

Merged: 7 commits into apache:main, Sep 5, 2024

Conversation

@marin-ma (Contributor) commented Aug 15, 2024

During sort-shuffle spill, allocating the compression buffer can trigger another spill and lead to OOM. Because the sort buffer has a fixed size, the maximum compressed buffer size can be computed up front, and the compression buffer can be pre-allocated and reused across spills (a sketch of the idea follows the list below).

  • Use the configurations spark.io.compression.lz4.blockSize and spark.io.compression.zstd.bufferSize to align with Spark. Allocate the sort buffer and compression buffer from the default memory pool, since Spark counts this part of the allocation toward memory overhead.
  • Fix the issue where the input data contains a row larger than the compression buffer size.
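A minimal sketch of the pre-allocation idea, assuming Arrow's codec API; the helper name and parameters below are illustrative, not the actual Gluten symbols:

// Illustrative only: compute the worst-case compressed size for the fixed-size
// sort buffer once, so spilling never has to allocate and cannot trigger another spill.
#include <arrow/buffer.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>
#include <arrow/util/compression.h>
#include <cstdint>
#include <memory>

arrow::Result<std::unique_ptr<arrow::ResizableBuffer>> preAllocateCompressionBuffer(
    arrow::util::Codec* codec, int64_t sortBufferSize, arrow::MemoryPool* pool) {
  // The sort buffer capacity is fixed, so the compression bound is known up front.
  // For LZ4/ZSTD the bound typically depends only on the input length, not the data.
  const int64_t maxCompressedLength = codec->MaxCompressedLen(sortBufferSize, nullptr);
  return arrow::AllocateResizableBuffer(maxCompressedLength, pool);
}

The returned buffer would be kept for the lifetime of the writer and reused on every spill instead of being allocated inside the spill path.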

@github-actions bot added the VELOX label Aug 15, 2024

#6863

@marin-ma changed the title from "[GLUTEN-6863][VL] Pre-alloc and reused compress buffer to avoid OOM in spill" to "[GLUTEN-6863][VL] Pre-alloc and reuse compress buffer to avoid OOM in spill" on Aug 16, 2024
@github-actions bot added the CORE (works for Gluten Core) and RSS labels Aug 21, 2024
Run Gluten Clickhouse CI

1 similar comment

@FelixYBW (Contributor)

Should we allocate the buffer using the global allocator, which is counted into overhead memory?

Run Gluten Clickhouse CI

sortedBuffer_ = facebook::velox::AlignedBuffer::allocate<char>(kSortedBufferSize, veloxPool_.get());
rawBuffer_ = sortedBuffer_->asMutable<uint8_t>();
// In Spark, the memory for sortedBuffer_ and compressionBuffer_ is pre-allocated and counted into executor
// memory overhead. To align with Spark, we use arrow::default_memory_pool() to avoid counting this memory in Gluten.
Contributor Author

@FelixYBW arrow::default_memory_pool is used to allocate the sort buffer and compress buffer.

Contributor

@zhztheplayer Can you help take a look here? Thanks!

Contributor

Do we need to add a function defaultArrowMemoryPool to VeloxMemoryManager to unify the memory pool usage?

Member

I think the code looks fine now, as we don't have a mechanism to count Arrow's global allocations into Spark overhead memory.

In the future, we may report both Arrow's and Velox's global pool usage to a single counter, which will require some design work. So far we don't have that.
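For context, a minimal sketch of the distinction being discussed; the helper names are illustrative, and the real pools are wired up inside Gluten's memory manager:

// Illustrative only: buffers meant to behave like Spark's executor memory overhead come
// from Arrow's global default pool, while query-tracked allocations go through a
// task-scoped pool that is accounted for (and can trigger spilling when exhausted).
#include <arrow/buffer.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>
#include <cstdint>
#include <memory>

arrow::Result<std::unique_ptr<arrow::ResizableBuffer>> allocateOverheadBuffer(int64_t size) {
  // Not counted against the task's Spark-managed memory budget.
  return arrow::AllocateResizableBuffer(size, arrow::default_memory_pool());
}

arrow::Result<std::unique_ptr<arrow::ResizableBuffer>> allocateTrackedBuffer(
    int64_t size, arrow::MemoryPool* taskScopedPool) {
  // Counted against the task's memory budget.
  return arrow::AllocateResizableBuffer(size, taskScopedPool);
}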

@FelixYBW (Contributor)

@jinchengchenghh can you take a look?

@jinchengchenghh (Contributor) left a comment

Thanks! Added some comments.

@@ -548,42 +543,14 @@ arrow::Status LocalPartitionWriter::finishSpill(bool close) {
return arrow::Status::OK();
}

-arrow::Status LocalPartitionWriter::evict(
+arrow::Status LocalPartitionWriter::hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
Contributor

Looks like we don't need evictType.

Contributor Author

hashEvict needs this parameter to know whether the evict source is a spill. If it is a spill, the partition writer writes the payload to disk immediately; otherwise it caches the payload.
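A simplified sketch of that branch, using stand-in types rather than the actual LocalPartitionWriter members:

// Illustrative only: how the evict-type flag decides between spilling to disk and caching.
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

struct Payload {};  // stand-in for InMemoryPayload

enum class EvictSource { kCache, kSpill };

class PartitionWriterSketch {
 public:
  void hashEvict(uint32_t partitionId, std::unique_ptr<Payload> payload, EvictSource source) {
    if (source == EvictSource::kSpill) {
      // Evict triggered by a spill: write the payload to disk immediately to release memory.
      writeToSpillFile(partitionId, std::move(payload));
    } else {
      // Normal evict: cache the payload; it is flushed when the writer finishes.
      cached_[partitionId].push_back(std::move(payload));
    }
  }

 private:
  void writeToSpillFile(uint32_t /*partitionId*/, std::unique_ptr<Payload> /*payload*/) {}
  std::unordered_map<uint32_t, std::vector<std::unique_ptr<Payload>>> cached_;
};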

"Compressed buffer length < maxCompressedLength. (", compressed->size(), " vs ", maxLength, ")"));
output = const_cast<uint8_t*>(compressed->data());
} else {
ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool));
Contributor

Can we reuse the buffer for uncompressed payload type?

Contributor Author

We hold the original evicted buffer for the uncompressed payload, so there is no extra copy.

@@ -329,6 +329,21 @@ int64_t BlockPayload::rawSize() {
return getBufferSize(buffers_);
}

int64_t BlockPayload::maxCompressedLength(
Contributor

Can we move it to an anonymous namespace?

Contributor Author

It's a public API of BlockPayload and is used by other components.

@@ -314,7 +314,7 @@ std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
uint32_t numRows;
GLUTEN_ASSIGN_OR_THROW(
auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_));
-if (numRows == 0) {
+if (arrowBuffers.empty()) {
Contributor

Why do we have this change?

Contributor Author

Before this PR, numRows was set to zero in BlockPayload::deserialize once EOS was reached. This PR removes that logic and instead uses numRows = 0 to represent a segment of a large row that cannot be compressed within one block.
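A simplified reader-side sketch of that convention, using hypothetical types rather than the actual deserializer:

// Illustrative only: a block whose numRows is 0 carries one segment of a row that was
// larger than the compression block; its bytes are accumulated until the row is complete.
#include <cstdint>
#include <vector>

struct Block {
  uint32_t numRows = 0;        // 0 => segment of a large, split row
  std::vector<uint8_t> bytes;  // decompressed block contents
};

void consume(const Block& block,
             std::vector<uint8_t>& largeRowScratch,
             std::vector<Block>& readyBlocks) {
  if (block.numRows > 0) {
    // Ordinary block: contains whole rows and can be handed to the reader directly.
    readyBlocks.push_back(block);
    return;
  }
  // Segment of a large row: append and keep reading segments until the row's
  // encoded length (from its size prefix) has been satisfied.
  largeRowScratch.insert(largeRowScratch.end(), block.bytes.begin(), block.bytes.end());
}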

cachedInputs_.emplace_back(numRows, wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
cachedRows_ += numRows;
} else {
// For a large row, read all segments.
Contributor

Can you explain a bit more? I don't quite follow the context here.

Contributor

Add some comments here to indicate that this case only occurs in the sort buffer writer, and that numRows is 0. Do we have a friendlier way to indicate a large row that has been split?


@@ -266,6 +273,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
}

arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) {
VELOX_CHECK(begin < end);
Contributor

VELOX_DCHECK

for (auto useRadixSort : {true, false}) {
params.push_back(ShuffleTestParams{
ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, compression, 0, 0, useRadixSort});
for (const auto compressionBufferSize : {4, 56, 32 * 1024}) {
Contributor

Do we have a test for splitting a large row?

Contributor Author

Yes. A large row is split when its size exceeds compressionBufferSize. When compressionBufferSize is 4, most of the rows will be split.

Contributor

Do we need to support this case, or should we require the compression buffer to be at least as large as one row and throw an exception otherwise? I think we should have a check for the minimum config value. @FelixYBW

Contributor Author

Spark doesn't throw an exception. It copies the row into a default 32k buffer for compression.

Contributor

OK, it's fine to align with Spark's behavior here.

if ("lz4" == codec) {
Math.max(
conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt,
GlutenConfig.GLUTEN_SHUFFLE_COMPRESSION_BUFFER_MIN_SIZE)
Contributor

Can we support setting the config GLUTEN_SHUFFLE_COMPRESSION_BUFFER_MIN_SIZE?

Contributor

Looks like the default value 64 is much smaller than the 32 * 1024 default used for the other compression kinds.

Contributor Author

64 is not the default value; it only takes effect if the user sets IO_COMPRESSION_LZ4_BLOCKSIZE to a very small size.

Contributor

We could set a more reasonable value, maybe 32 * 1024?

Contributor Author

Per the discussion, we will throw an exception if IO_COMPRESSION_LZ4_BLOCKSIZE < 4. Each serialized row stores its size in a 4-byte prefix, so 4 bytes is the minimum acceptable compression block size in Gluten.
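A minimal sketch of the row layout that explains the 4-byte minimum (illustrative helper, not the Gluten serializer):

// Illustrative only: each serialized row is written as a 4-byte length prefix followed
// by the row payload, so a compression block smaller than 4 bytes cannot even hold the prefix.
#include <cstdint>
#include <cstring>

using RowSizeType = uint32_t;  // 4-byte row-length prefix

inline uint8_t* writeRow(uint8_t* dst, const uint8_t* rowData, RowSizeType rowSize) {
  std::memcpy(dst, &rowSize, sizeof(RowSizeType));           // length prefix
  std::memcpy(dst + sizeof(RowSizeType), rowData, rowSize);  // row payload
  return dst + sizeof(RowSizeType) + rowSize;                // next write position
}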

For reference, the exceptions Spark itself throws:

  • lz4: spark.io.compression.lz4.blockSize=0
Caused by: java.lang.IllegalArgumentException: blockSize must be >= 64, got 0
  at net.jpountz.lz4.LZ4BlockOutputStream.compressionLevel(LZ4BlockOutputStream.java:60)
  at net.jpountz.lz4.LZ4BlockOutputStream.<init>(LZ4BlockOutputStream.java:101)
  at org.apache.spark.io.LZ4CompressionCodec.compressedOutputStream(CompressionCodec.scala:151)
  at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$2(TorrentBroadcast.scala:361)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:361)
  at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:161)
  at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
  at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
  at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
  at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1662)
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1644)
  at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1585)
  at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1402)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1337)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3003)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  • zstd: spark.io.compression.zstd.bufferSize=0
Caused by: java.lang.IllegalArgumentException: Buffer size <= 0
  at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:74)
  at org.apache.spark.io.ZStdCompressionCodec.compressedOutputStream(CompressionCodec.scala:237)
  at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$2(TorrentBroadcast.scala:361)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:361)
  at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:161)
  at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
  at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
  at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
  at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1662)
  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1644)
  at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1585)
  at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1402)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1337)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3003)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

github-actions bot commented Sep 5, 2024

Run Gluten Clickhouse CI

@marin-ma (Author) commented Sep 5, 2024

@jinchengchenghh Do you have further comments? Thanks!

cachedInputs_.emplace_back(numRows, wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
cachedRows_ += numRows;
} else {
// numRows = 0 indicates a segment of a large row.
Contributor

Can we extract the numRows = 0 logic to a function to make code more readable?

RowSizeType bytes = 0;
auto* dst = rowBuffer->mutable_data();
for (const auto& buffer : buffers) {
VELOX_CHECK_NOT_NULL(buffer);
Contributor

Use VELOX_DCHECK; checks on internal code logic should use DCHECK.

github-actions bot commented Sep 5, 2024

Run Gluten Clickhouse CI

@marin-ma (Author) commented Sep 5, 2024

@jinchengchenghh Do you have further comments? Thanks!

@jinchengchenghh (Contributor)

Thanks!

@marin-ma merged commit eeb0ca1 into apache:main Sep 5, 2024
45 checks passed
dcoliversun pushed a commit to dcoliversun/gluten that referenced this pull request Sep 11, 2024
Labels: CORE (works for Gluten Core), RSS, VELOX
4 participants