Skip to content

Commit

Permalink
Use Spark's compression buffer size and fix sorting of large rows
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Aug 21, 2024
1 parent 573f414 commit 81ee413
Show file tree
Hide file tree
Showing 14 changed files with 124 additions and 41 deletions.
2 changes: 2 additions & 0 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
jstring codecJstr,
jstring codecBackendJstr,
jint compressionLevel,
jint compressionBufferSize,
jint compressionThreshold,
jstring compressionModeJstr,
jint sortBufferInitialSize,
Expand Down Expand Up @@ -802,6 +803,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
.startPartitionId = startPartitionId,
.shuffleWriterType = gluten::ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr)),
.sortBufferInitialSize = sortBufferInitialSize,
.compressionBufferSize = compressionBufferSize,
.useRadixSort = static_cast<bool>(useRadixSort)};

// Build PartitionWriterOptions.
Expand Down
2 changes: 2 additions & 0 deletions cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
static constexpr int32_t kDefaultCompressionBufferSize = 32 * 1024;
static const std::string kDefaultCompressionTypeStr = "lz4";
static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;
Expand Down Expand Up @@ -62,6 +63,7 @@ struct ShuffleWriterOptions {

// Sort shuffle writer.
int32_t sortBufferInitialSize = kDefaultSortBufferSize;
int32_t compressionBufferSize = kDefaultCompressionBufferSize;
bool useRadixSort = kDefaultUseRadixSort;
};

Expand Down
9 changes: 3 additions & 6 deletions cpp/core/shuffle/PartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,11 @@ class PartitionWriter : public Reclaimable {
std::shared_ptr<arrow::Buffer> compressed,
bool isFinal) = 0;

arrow::Result<std::shared_ptr<arrow::Buffer>> getCompressedBuffer(
const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
arrow::MemoryPool* pool) {
std::optional<int64_t> getCompressedBufferLength(const std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
if (!codec_) {
return nullptr;
return std::nullopt;
}
auto compressedLength = BlockPayload::maxCompressedLength(buffers, codec_.get());
return arrow::AllocateBuffer(compressedLength, pool);
return BlockPayload::maxCompressedLength(buffers, codec_.get());
}

virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) = 0;
Expand Down
39 changes: 33 additions & 6 deletions cpp/velox/shuffle/VeloxShuffleReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
uint32_t numRows;
GLUTEN_ASSIGN_OR_THROW(
auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_));
if (numRows == 0) {
if (arrowBuffers.empty()) {
// Reach EOS.
return nullptr;
}
Expand All @@ -333,7 +333,7 @@ std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
while (!merged_ || merged_->numRows() < batchSize_) {
GLUTEN_ASSIGN_OR_THROW(
arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_));
if (numRows == 0) {
if (arrowBuffers.empty()) {
reachEos_ = true;
break;
}
Expand Down Expand Up @@ -403,16 +403,43 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
GLUTEN_ASSIGN_OR_THROW(
auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, decompressTime_));

if (numRows == 0) {
if (arrowBuffers.empty()) {
reachEos_ = true;
if (cachedRows_ > 0) {
return deserializeToBatch();
}
return nullptr;
}
auto buffer = std::move(arrowBuffers[0]);
cachedInputs_.emplace_back(numRows, wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
cachedRows_ += numRows;

if (numRows > 0) {
auto buffer = std::move(arrowBuffers[0]);
cachedInputs_.emplace_back(numRows, wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
cachedRows_ += numRows;
} else {
// For a large row, read all segments.
std::vector<std::shared_ptr<arrow::Buffer>> buffers;
auto rowSize = *reinterpret_cast<RowSizeType*>(const_cast<uint8_t*>(arrowBuffers[0]->data()));
RowSizeType bufferSize = arrowBuffers[0]->size();
buffers.emplace_back(std::move(arrowBuffers[0]));
while (bufferSize < rowSize) {
GLUTEN_ASSIGN_OR_THROW(
arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, decompressTime_));
bufferSize += arrowBuffers[0]->size();
buffers.emplace_back(std::move(arrowBuffers[0]));
}
VELOX_CHECK_EQ(bufferSize, rowSize);
// Merge all segments.
GLUTEN_ASSIGN_OR_THROW(std::shared_ptr<arrow::Buffer> rowBuffer, arrow::AllocateBuffer(rowSize, arrowPool_));
RowSizeType bytes = 0;
auto* dst = rowBuffer->mutable_data();
for (const auto& buffer : buffers) {
VELOX_CHECK_NOT_NULL(buffer);
gluten::fastCopy(dst + bytes, buffer->data(), buffer->size());
bytes += buffer->size();
}
cachedInputs_.emplace_back(1, wrapInBufferViewAsOwner(rowBuffer->data(), rowSize, rowBuffer));
cachedRows_++;
}
}
return deserializeToBatch();
}
Expand Down
51 changes: 36 additions & 15 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1;
constexpr uint32_t kPartitionIdStartByteIndex = 5;
constexpr uint32_t kPartitionIdEndByteIndex = 7;

constexpr uint32_t kSortedBufferSize = 1 * 1024 * 1024;

uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t offsetInPage) {
  // Packs three ids into one 64-bit key so rows can be radix/std sorted by partition first.
  // Layout: bits 63..40 = partitionId (24 bits), bits 39..27 = pageNumber (13 bits),
  // bits 26..0 = offsetInPage (27 bits).
  const auto partition = static_cast<uint64_t>(partitionId);
  const auto page = static_cast<uint64_t>(pageNumber);
  const auto offset = static_cast<uint64_t>(offsetInPage);
  return (partition << 40) | (page << 27) | offset;
}
Expand Down Expand Up @@ -106,11 +104,17 @@ arrow::Status VeloxSortShuffleWriter::init() {
options_.partitioning == Partitioning::kSingle,
arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition."));
allocateMinimalArray();
ARROW_ASSIGN_OR_RAISE(sortedBuffer_, arrow::AllocateBuffer(kSortedBufferSize, pool_));
rawBuffer_ = sortedBuffer_->mutable_data();
// In Spark, sortedBuffer_ memory and compressionBuffer_ memory are pre-allocated and not counted into executor
// memory. To align with Spark, we use arrow::default_memory_pool() to avoid counting these memory in Gluten.
ARROW_ASSIGN_OR_RAISE(
compressedBuffer_,
partitionWriter_->getCompressedBuffer({std::make_shared<arrow::Buffer>(rawBuffer_, kSortedBufferSize)}, pool_));
sortedBuffer_, arrow::AllocateBuffer(options_.compressionBufferSize, arrow::default_memory_pool()));
rawBuffer_ = sortedBuffer_->mutable_data();
auto compressedBufferLength = partitionWriter_->getCompressedBufferLength(
{std::make_shared<arrow::Buffer>(rawBuffer_, options_.compressionBufferSize)});
if (compressedBufferLength.has_value()) {
ARROW_ASSIGN_OR_RAISE(
compressionBuffer_, arrow::AllocateBuffer(*compressedBufferLength, arrow::default_memory_pool()));
}
return arrow::Status::OK();
}

Expand Down Expand Up @@ -281,32 +285,49 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]);
addr = pageAddresses_[pageIndex.first] + pageIndex.second;
size = *(RowSizeType*)addr;
if (offset + size > kSortedBufferSize) {
if (offset + size > options_.compressionBufferSize && offset > 0) {
sortTime.stop();
RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset));
RETURN_NOT_OK(evictPartition0(partitionId, index - begin, rawBuffer_, offset));
sortTime.start();
begin = index;
offset = 0;
}
gluten::fastCopy(rawBuffer_ + offset, addr, size);
offset += size;
if (size > options_.compressionBufferSize) {
// Split large rows.
sortTime.stop();
RowSizeType bytes = 0;
auto* buffer = reinterpret_cast<uint8_t*>(addr);
while (bytes < size) {
auto rawLength = std::min<RowSizeType>((uint32_t)options_.compressionBufferSize, size - bytes);
// Use numRows = 0 to represent a part of row.
RETURN_NOT_OK(evictPartition0(partitionId, 0, buffer + bytes, rawLength));
bytes += rawLength;
}
sortTime.start();
} else {
// Copy small rows.
gluten::fastCopy(rawBuffer_ + offset, addr, size);
offset += size;
}
index++;
}
sortTime.stop();
RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset));

if (offset > 0) {
RETURN_NOT_OK(evictPartition0(partitionId, index - begin, rawBuffer_, offset));
}
sortTime_ += sortTime.realTimeUsed();
return arrow::Status::OK();
}

arrow::Status VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength) {
arrow::Status
VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, int32_t numRows, uint8_t* buffer, int64_t rawLength) {
VELOX_CHECK(rawLength > 0);
auto payload = std::make_unique<InMemoryPayload>(
numRows,
nullptr,
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_, rawLength)});
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer, rawLength)});
updateSpillMetrics(payload);
RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload), compressedBuffer_, stopped_));
RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload), compressionBuffer_, stopped_));
return arrow::Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);

arrow::Status evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength);
arrow::Status evictPartition0(uint32_t partitionId, int32_t numRows, uint8_t* buffer, int64_t rawLength);

uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows);

Expand Down Expand Up @@ -109,7 +109,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

std::unique_ptr<arrow::Buffer> sortedBuffer_;
uint8_t* rawBuffer_;
std::shared_ptr<arrow::Buffer> compressedBuffer_;
std::shared_ptr<arrow::Buffer> compressionBuffer_{nullptr};

// Row ID -> Partition ID
// subscript: The index of row in the current input RowVector
Expand Down
20 changes: 12 additions & 8 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,24 +70,28 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};

for (const auto& compression : compressions) {
for (auto useRadixSort : {true, false}) {
params.push_back(ShuffleTestParams{
ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, compression, 0, 0, useRadixSort});
for (const auto compressionBufferSize : {1, 56, 32 * 1024}) {
for (auto useRadixSort : {true, false}) {
params.push_back(ShuffleTestParams{
.shuffleWriterType = ShuffleWriterType::kSortShuffle,
.partitionWriterType = PartitionWriterType::kLocal,
.compressionType = compression,
.compressionBufferSize = compressionBufferSize,
.useRadixSort = useRadixSort});
}
}
params.push_back(
ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression, 0, 0, false});
params.push_back(ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression});
for (const auto compressionThreshold : compressionThresholds) {
for (const auto mergeBufferSize : mergeBufferSizes) {
params.push_back(ShuffleTestParams{
ShuffleWriterType::kHashShuffle,
PartitionWriterType::kLocal,
compression,
compressionThreshold,
mergeBufferSize,
false /* unused */});
mergeBufferSize});
}
params.push_back(ShuffleTestParams{
ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold, 0});
ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold});
}
}

Expand Down
11 changes: 7 additions & 4 deletions cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,17 @@ struct ShuffleTestParams {
ShuffleWriterType shuffleWriterType;
PartitionWriterType partitionWriterType;
arrow::Compression::type compressionType;
int32_t compressionThreshold;
int32_t mergeBufferSize;
bool useRadixSort;
int32_t compressionThreshold{0};
int32_t mergeBufferSize{0};
int32_t compressionBufferSize{0};
bool useRadixSort{false};

std::string toString() const {
std::ostringstream out;
out << "shuffleWriterType = " << shuffleWriterType << ", partitionWriterType = " << partitionWriterType
<< ", compressionType = " << compressionType << ", compressionThreshold = " << compressionThreshold
<< ", mergeBufferSize = " << mergeBufferSize << ", useRadixSort = " << (useRadixSort ? "true" : "false");
<< ", mergeBufferSize = " << mergeBufferSize << ", compressionBufferSize = " << compressionBufferSize
<< ", useRadixSort = " << (useRadixSort ? "true" : "false");
return out.str();
}
};
Expand Down Expand Up @@ -255,6 +257,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams

ShuffleTestParams params = GetParam();
shuffleWriterOptions_.useRadixSort = params.useRadixSort;
shuffleWriterOptions_.compressionBufferSize = params.compressionBufferSize;
partitionWriterOptions_.compressionType = params.compressionType;
switch (partitionWriterOptions_.compressionType) {
case arrow::Compression::UNCOMPRESSED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ abstract class CelebornColumnarShuffleWriter[K, V](
customizedCompressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)

protected val compressionBufferSize: Int =
GlutenShuffleUtils.getCompressionBufferSize(conf, customizedCompressionCodec)

protected val bufferCompressThreshold: Int =
GlutenConfig.getConf.columnarShuffleCompressionThreshold

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
nativeBufferSize,
customizedCompressionCodec,
compressionLevel,
compressionBufferSize,
bufferCompressThreshold,
GlutenConfig.getConf.columnarShuffleCompressionMode,
conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ object GlutenShuffleUtils {
}
}

def getCompressionBufferSize(conf: SparkConf, codec: String): Int = {
  // Mirror Spark's per-codec compression block sizes; any other (or null) codec
  // falls back to a 32 KiB buffer.
  codec match {
    case "lz4" => conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt
    case "zstd" => conf.get(IO_COMPRESSION_ZSTD_BUFFERSIZE).toInt
    case _ => 32 * 1024
  }
}

def getReaderParam[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public long make(
String codec,
String codecBackend,
int compressionLevel,
int compressionBufferSize,
int bufferCompressThreshold,
String compressionMode,
int sortBufferInitialSize,
Expand All @@ -80,6 +81,7 @@ public long make(
codec,
codecBackend,
compressionLevel,
compressionBufferSize,
bufferCompressThreshold,
compressionMode,
sortBufferInitialSize,
Expand Down Expand Up @@ -111,6 +113,7 @@ public long makeForRSS(
int bufferSize,
String codec,
int compressionLevel,
int compressionBufferSize,
int bufferCompressThreshold,
String compressionMode,
int sortBufferInitialSize,
Expand All @@ -133,6 +136,7 @@ public long makeForRSS(
codec,
null,
compressionLevel,
compressionBufferSize,
bufferCompressThreshold,
compressionMode,
sortBufferInitialSize,
Expand Down Expand Up @@ -160,6 +164,7 @@ public native long nativeMake(
String codec,
String codecBackend,
int compressionLevel,
int compressionBufferSize,
int bufferCompressThreshold,
String compressionMode,
int sortBufferInitialSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ class ColumnarShuffleWriter[K, V](
private val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec, compressionCodecBackend)

private val compressionBufferSize =
GlutenShuffleUtils.getCompressionBufferSize(conf, compressionCodec)

private val bufferCompressThreshold =
GlutenConfig.getConf.columnarShuffleCompressionThreshold

Expand Down Expand Up @@ -149,6 +152,7 @@ class ColumnarShuffleWriter[K, V](
compressionCodec,
compressionCodecBackend,
compressionLevel,
compressionBufferSize,
bufferCompressThreshold,
GlutenConfig.getConf.columnarShuffleCompressionMode,
conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
Expand Down
Loading

0 comments on commit 81ee413

Please sign in to comment.