From ef39518ae46c02f0c182ff88140ec6defde22aca Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Wed, 18 Sep 2024 15:58:55 -0700
Subject: [PATCH] feat: reconnect block compression (#2878)

This allows block compression (zstd) in the narrow case of using it for
binary data. A more generalized approach to block compression can be
handled as part of 2.1.
---
 python/python/benchmarks/test_compression.py | 83 +++++++++++++++++++
 python/python/tests/test_file.py             | 32 +++++++
 rust/lance-encoding/src/encoder.rs           | 22 +++--
 .../src/encodings/physical/binary.rs         | 34 ++++++--
 4 files changed, 160 insertions(+), 11 deletions(-)
 create mode 100644 python/python/benchmarks/test_compression.py

diff --git a/python/python/benchmarks/test_compression.py b/python/python/benchmarks/test_compression.py
new file mode 100644
index 0000000000..a357119034
--- /dev/null
+++ b/python/python/benchmarks/test_compression.py
@@ -0,0 +1,83 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright The Lance Authors
+import os
+import random
+from pathlib import Path
+from typing import Optional
+
+import pyarrow as pa
+import pytest
+from lance.file import LanceFileReader, LanceFileWriter
+
+NUM_ROWS = 1000000
+NUM_INDICES = 100
+PREFIX = """
+SOME VERY LONG PREFIX THAT IS COMPRESSIBLE PROBABLY BUT WE WILL ADD A
+NUMBER TO THE END OF IT TO MAKE IT NOT DICTIONARY COMPRESSIBLE. THIS
+IS A PRETTY IDEAL CASE FOR COMPRESSION
+"""
+
+
+def generate_test_data(compression_scheme: Optional[str]):
+    strings = pa.array([f"{PREFIX}-{i}" for i in range(NUM_ROWS)], type=pa.string())
+
+    if compression_scheme is None:
+        metadata = None
+    else:
+        metadata = {"lance-encoding:compression": compression_scheme}
+
+    schema = pa.schema(
+        [
+            pa.field("strings", pa.string(), metadata=metadata),
+        ]
+    )
+    return pa.table([strings], schema=schema)
+
+
+# generate NUM_INDICES random indices between 0 and NUM_ROWS - 1 for scanning
+@pytest.fixture(scope="module")
+def random_indices():
+    random.seed(42)
+    random_indices = sorted([random.randint(0, NUM_ROWS - 1) for _ in range(NUM_INDICES)])
+    return random_indices
+
+
+def drop_page_cache():
+    # Note: this will prompt the user for a password; not ideal, but simple
+    os.system('sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches"')
+
+
+@pytest.mark.benchmark
+@pytest.mark.parametrize("compression", [None, "zstd"])
+def test_random_access(tmp_path: Path, benchmark, random_indices, compression):
+    benchmark.group = f"random-access-{compression}"
+    test_data = generate_test_data(compression)
+    lance_path = str(tmp_path / "random_access.lance")
+
+    with LanceFileWriter(lance_path) as writer:
+        writer.write_batch(test_data)
+
+    def read_lance_file_random(lance_path, random_indices):
+        drop_page_cache()
+        reader = LanceFileReader(lance_path)
+        reader.take_rows(random_indices).to_table()
+
+    benchmark(read_lance_file_random, lance_path, random_indices)
+
+
+@pytest.mark.benchmark
+@pytest.mark.parametrize("compression", [None, "zstd"])
+def test_full_scan(tmp_path: Path, benchmark, compression):
+    benchmark.group = f"full-scan-{compression}"
+    test_data = generate_test_data(compression)
+    lance_path = str(tmp_path / "full_scan.lance")
+
+    with LanceFileWriter(lance_path) as writer:
+        writer.write_batch(test_data)
+
+    def read_lance_file_full(lance_path):
+        drop_page_cache()
+        reader = LanceFileReader(lance_path)
+        reader.read_all().to_table()
+
+    benchmark(read_lance_file_full, lance_path)
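For context, the opt-in surface this benchmark exercises is plain Arrow field metadata; nothing else changes in the write path. A minimal sketch, assuming the `lance.file` Python API used in the patch (the path and column values are illustrative):

```python
import pyarrow as pa
from lance.file import LanceFileReader, LanceFileWriter

# Request block compression for one column via field metadata. The key
# matches COMPRESSION_META_KEY ("lance-encoding:compression") in encoder.rs.
schema = pa.schema(
    [pa.field("strings", pa.string(), metadata={"lance-encoding:compression": "zstd"})]
)
table = pa.table({"strings": [f"row-{i}" for i in range(1000)]}, schema=schema)

with LanceFileWriter("/tmp/example.lance", schema) as writer:  # illustrative path
    writer.write_batch(table)

# Decompression happens inside the reader, so reads look identical to the
# uncompressed case and should round-trip the data.
assert LanceFileReader("/tmp/example.lance").read_all().to_table() == table
```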
diff --git a/python/python/tests/test_file.py b/python/python/tests/test_file.py
index 870d51da75..716f10de57 100644
--- a/python/python/tests/test_file.py
+++ b/python/python/tests/test_file.py
@@ -1,6 +1,8 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright The Lance Authors
 
+import os
+
 import pyarrow as pa
 import pyarrow.parquet as pq
 import pytest
@@ -341,3 +343,33 @@ def test_writer_maintains_order(tmp_path):
     reader = LanceFileReader(str(path))
     result = reader.read_all().to_table()
     assert result == table
+
+
+def test_compression(tmp_path):
+    # 10Ki strings, which should be highly compressible but not eligible for dictionary encoding
+    compressible_strings = [f"compress_me_please-{i}" for i in range(10 * 1024)]
+    table_default = pa.table({"compressible_strings": compressible_strings})
+
+    schema_compress = pa.schema(
+        [
+            pa.field(
+                "compressible_strings",
+                pa.string(),
+                metadata={"lance-encoding:compression": "zstd"},
+            )
+        ]
+    )
+    table_compress = pa.table(
+        {"compressible_strings": compressible_strings}, schema=schema_compress
+    )
+
+    with LanceFileWriter(str(tmp_path / "default.lance")) as writer:
+        writer.write_batch(table_default)
+
+    with LanceFileWriter(str(tmp_path / "compress.lance"), schema_compress) as writer:
+        writer.write_batch(table_compress)
+
+    size_default = os.path.getsize(tmp_path / "default.lance")
+    size_compress = os.path.getsize(tmp_path / "compress.lance")
+
+    assert size_compress < size_default
diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs
index cd94c63682..c2afe88eaa 100644
--- a/rust/lance-encoding/src/encoder.rs
+++ b/rust/lance-encoding/src/encoder.rs
@@ -35,6 +35,8 @@ use crate::{
 use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
 use std::collections::hash_map::RandomState;
 
+pub const COMPRESSION_META_KEY: &str = "lance-encoding:compression";
+
 /// An encoded array
 ///
 /// Maps to a single Arrow array
@@ -213,17 +215,25 @@ impl CoreArrayEncodingStrategy {
             && data_size > 4 * 1024 * 1024
     }
 
+    fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionScheme> {
+        let compression = field_meta.get(COMPRESSION_META_KEY)?;
+        Some(compression.parse::<CompressionScheme>().unwrap())
+    }
+
     fn default_binary_encoder(
         arrays: &[ArrayRef],
         data_type: &DataType,
+        field_meta: Option<&HashMap<String, String>>,
         data_size: u64,
         version: LanceFileVersion,
     ) -> Result<Box<dyn ArrayEncoder>> {
         let bin_indices_encoder =
             Self::choose_array_encoder(arrays, &DataType::UInt64, data_size, false, version, None)?;
-        let bin_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder));
-        if Self::can_use_fsst(data_type, data_size, version) {
+
+        let compression = field_meta.and_then(Self::get_field_compression);
+
+        let bin_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, compression));
+        if compression.is_none() && Self::can_use_fsst(data_type, data_size, version) {
             Ok(Box::new(FsstArrayEncoder::new(bin_encoder)))
         } else {
             Ok(bin_encoder)
         }
@@ -236,7 +246,7 @@ impl CoreArrayEncodingStrategy {
         data_size: u64,
         use_dict_encoding: bool,
         version: LanceFileVersion,
-        _field_meta: Option<&HashMap<String, String>>,
+        field_meta: Option<&HashMap<String, String>>,
     ) -> Result<Box<dyn ArrayEncoder>> {
         match data_type {
             DataType::FixedSizeList(inner, dimension) => {
@@ -306,10 +316,12 @@ impl CoreArrayEncodingStrategy {
                         FixedSizeBinaryEncoder::new(bytes_encoder, byte_width as usize),
                     ))))
                 } else {
-                    Self::default_binary_encoder(arrays, data_type, data_size, version)
+                    Self::default_binary_encoder(
+                        arrays, data_type, field_meta, data_size, version,
+                    )
                 }
             } else {
-                Self::default_binary_encoder(arrays, data_type, data_size, version)
+                Self::default_binary_encoder(arrays, data_type, field_meta, data_size, version)
             }
         }
         DataType::Struct(fields) => {
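The encoder.rs change above boils down to one precedence rule: an explicit `lance-encoding:compression` request takes priority over (and disables) the FSST heuristic, rather than stacking both encodings. A behavioral sketch of `default_binary_encoder`'s selection logic in Python — not the actual Rust implementation; `can_use_fsst` is a simplified stand-in for the real predicate:

```python
def can_use_fsst(data_type: str, data_size: int) -> bool:
    # Simplified stand-in: the real check also gates on the file version;
    # the 4 MiB threshold appears in the hunk context above.
    return data_type in ("string", "binary") and data_size > 4 * 1024 * 1024


def choose_binary_encoding(field_meta, data_type: str, data_size: int) -> str:
    # Mirrors default_binary_encoder: explicit compression wins, then the
    # FSST heuristic, then the plain binary encoder.
    compression = (field_meta or {}).get("lance-encoding:compression")
    if compression is not None:
        return f"binary encoder with block compression ({compression})"
    if can_use_fsst(data_type, data_size):
        return "FSST-wrapped binary encoder"
    return "plain binary encoder"


print(choose_binary_encoding({"lance-encoding:compression": "zstd"}, "string", 10))
```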
diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs
index c678830e47..961f34586e 100644
--- a/rust/lance-encoding/src/encodings/physical/binary.rs
+++ b/rust/lance-encoding/src/encodings/physical/binary.rs
@@ -27,6 +27,8 @@ use arrow_array::{PrimitiveArray, UInt64Array};
 use arrow_schema::DataType;
 use lance_core::Result;
 
+use super::block_compress::{BufferCompressor, CompressionScheme, GeneralBufferCompressor};
+
 struct IndicesNormalizer {
     indices: Vec<u64>,
     validity: BooleanBufferBuilder,
@@ -337,11 +339,22 @@ impl PrimitivePageDecoder for BinaryPageDecoder {
 #[derive(Debug)]
 pub struct BinaryEncoder {
     indices_encoder: Box<dyn ArrayEncoder>,
+    compression_scheme: Option<CompressionScheme>,
+    buffer_compressor: Option<Box<dyn BufferCompressor>>,
 }
 
 impl BinaryEncoder {
-    pub fn new(indices_encoder: Box<dyn ArrayEncoder>) -> Self {
-        Self { indices_encoder }
+    pub fn new(
+        indices_encoder: Box<dyn ArrayEncoder>,
+        compression_scheme: Option<CompressionScheme>,
+    ) -> Self {
+        let buffer_compressor = compression_scheme
+            .map(|scheme| GeneralBufferCompressor::get_compressor(&scheme.to_string()));
+        Self {
+            indices_encoder,
+            compression_scheme,
+            buffer_compressor,
+        }
     }
 
     // In 2.1 we will materialize nulls higher up (in the primitive encoder). Unfortunately,
@@ -438,7 +451,7 @@ impl ArrayEncoder for BinaryEncoder {
         data_type: &DataType,
         buffer_index: &mut u32,
     ) -> Result<EncodedArray> {
-        let (data, nulls) = match data {
+        let (mut data, nulls) = match data {
             DataBlock::Nullable(nullable) => {
                 let data = nullable.data.as_variable_width().unwrap();
                 (data, Some(nullable.nulls))
@@ -467,6 +480,12 @@
 
         assert!(encoded_indices_data.bits_per_value <= 64);
 
+        if let Some(buffer_compressor) = &self.buffer_compressor {
+            let mut compressed_data = Vec::with_capacity(data.data.len());
+            buffer_compressor.compress(&data.data, &mut compressed_data)?;
+            data.data = LanceBuffer::Owned(compressed_data);
+        }
+
         let data = DataBlock::VariableWidth(VariableWidthBlock {
             bits_per_offset: encoded_indices_data.bits_per_value as u8,
             offsets: encoded_indices_data.data,
@@ -476,9 +495,12 @@
         let bytes_buffer_index = *buffer_index;
         *buffer_index += 1;
-        // TODO: Do we really need a "nested encoding" here?
-        let bytes_encoding =
-            ProtobufUtils::flat_encoding(/*bits_per_value=*/ 8, bytes_buffer_index, None);
+
+        let bytes_encoding = ProtobufUtils::flat_encoding(
+            /*bits_per_value=*/ 8,
+            bytes_buffer_index,
+            self.compression_scheme,
+        );
         let encoding =
             ProtobufUtils::binary(encoded_indices.encoding, bytes_encoding, null_adjustment);
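One trade-off worth noting, which the new benchmarks are designed to measure: the encoder compresses a page's entire bytes buffer as a single block, so point lookups can end up decompressing far more data than they return, while full scans benefit the most. The read API is unchanged either way; a small sketch, assuming a file written as in the earlier example (path illustrative):

```python
from lance.file import LanceFileReader

# take_rows works the same whether or not the column was block-compressed;
# decompression happens transparently inside the reader.
reader = LanceFileReader("/tmp/example.lance")  # written in the earlier sketch
subset = reader.take_rows([1, 17, 523]).to_table()
print(subset.num_rows)  # -> 3
```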