feat: reconnect block compression (#2878)
This allows block compression (zstd) in the narrow case of using it for
binary data. A more generalized approach to block compression can be
handled as part of 2.1.
westonpace committed Sep 18, 2024
1 parent 2c13945 commit ef39518
Showing 4 changed files with 160 additions and 11 deletions.
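Block compression is opted into per field through Arrow schema metadata. The following sketch of the user-facing flow is assembled from the tests and benchmarks in this commit; the output path is illustrative, everything else (the "lance-encoding:compression" key, the writer/reader APIs) appears in the diffs below.

import pyarrow as pa
from lance.file import LanceFileReader, LanceFileWriter

# Request zstd block compression for one field via its metadata. This is the
# key read by the encoder (COMPRESSION_META_KEY in rust/lance-encoding/src/encoder.rs).
schema = pa.schema(
    [
        pa.field(
            "strings",
            pa.string(),
            metadata={"lance-encoding:compression": "zstd"},
        )
    ]
)
table = pa.table(
    {"strings": [f"compress_me_please-{i}" for i in range(1024)]}, schema=schema
)

# "/tmp/example.lance" is an illustrative path, not one used by this commit
with LanceFileWriter("/tmp/example.lance", schema) as writer:
    writer.write_batch(table)

# Decompression is transparent on read
print(LanceFileReader("/tmp/example.lance").read_all().to_table().num_rows)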
83 changes: 83 additions & 0 deletions python/python/benchmarks/test_compression.py
@@ -0,0 +1,83 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
import os
import random
from pathlib import Path
from typing import Optional

import pyarrow as pa
import pytest
from lance.file import LanceFileReader, LanceFileWriter

NUM_ROWS = 1000000
NUM_INDICES = 100
PREFIX = """
SOME VERY LONG PREFIX THAT IS COMPRESSIBLE PROBABLY BUT WE WILL ADD A
NUMBER TO THE END OF IT TO MAKE IT NOT DICTIONARY COMPRESSIBLE. THIS
IS A PRETTY IDEAL CASE FOR COMPRESSION
"""


def generate_test_data(compression_scheme: Optional[str]):
    strings = pa.array([f"{PREFIX}-{i}" for i in range(NUM_ROWS)], type=pa.string())

    if compression_scheme is None:
        metadata = None
    else:
        metadata = {"lance-encoding:compression": compression_scheme}

    schema = pa.schema(
        [
            pa.field("strings", pa.string(), metadata=metadata),
        ]
    )
    return pa.table([strings], schema=schema)


# generate NUM_INDICES sorted random indices in [0, NUM_ROWS) for scanning
@pytest.fixture(scope="module")
def random_indices():
    random.seed(42)
    # randint is inclusive on both ends; use NUM_ROWS - 1 so indices stay in range
    random_indices = sorted(random.randint(0, NUM_ROWS - 1) for _ in range(NUM_INDICES))
    return random_indices


def drop_page_cache():
    # Note: this will prompt the user for a password; not ideal, but simple
    os.system('sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches"')


@pytest.mark.benchmark
@pytest.mark.parametrize("compression", [None, "zstd"])
def test_random_access(tmp_path: Path, benchmark, random_indices, compression):
    benchmark.group = f"random-access-{compression}"
    test_data = generate_test_data(compression)
    lance_path = str(tmp_path / "random_access.lance")

    with LanceFileWriter(lance_path) as writer:
        writer.write_batch(test_data)

    def read_lance_file_random(lance_path, random_indices):
        drop_page_cache()
        reader = LanceFileReader(lance_path)
        reader.take_rows(random_indices).to_table()

    benchmark(read_lance_file_random, lance_path, random_indices)


@pytest.mark.benchmark
@pytest.mark.parametrize("compression", [None, "zstd"])
def test_full_scan(tmp_path: Path, benchmark, compression):
    benchmark.group = f"full-scan-{compression}"
    test_data = generate_test_data(compression)
    lance_path = str(tmp_path / "full_scan.lance")

    with LanceFileWriter(lance_path) as writer:
        writer.write_batch(test_data)

    def read_lance_file_full(lance_path):
        drop_page_cache()
        reader = LanceFileReader(lance_path)
        reader.read_all().to_table()

    benchmark(read_lance_file_full, lance_path)
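The benchmark data is deliberately shaped for block compression: every value shares the long PREFIX, so zstd compresses it well, but every value is distinct, which defeats dictionary encoding. A quick way to see the expected effect outside of Lance, using pyarrow's built-in codec interface (a sketch; the exact ratio varies by machine and pyarrow version, and PREFIX is abridged here):

import pyarrow as pa

# abridged copy of the benchmark's PREFIX; any long shared prefix behaves similarly
PREFIX = "SOME VERY LONG PREFIX THAT IS COMPRESSIBLE PROBABLY BUT WE WILL ADD A NUMBER"
raw = "".join(f"{PREFIX}-{i}" for i in range(100_000)).encode()
packed = pa.compress(raw, codec="zstd", asbytes=True)
print(f"{len(raw):,} -> {len(packed):,} bytes")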
32 changes: 32 additions & 0 deletions python/python/tests/test_file.py
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import os

import pyarrow as pa
import pyarrow.parquet as pq
import pytest
@@ -341,3 +343,33 @@ def test_writer_maintains_order(tmp_path):
    reader = LanceFileReader(str(path))
    result = reader.read_all().to_table()
    assert result == table


def test_compression(tmp_path):
    # 10Ki strings, which should be highly compressible but not eligible for
    # dictionary encoding (every value is distinct)
    compressible_strings = [f"compress_me_please-{i}" for i in range(10 * 1024)]
    table_default = pa.table({"compressible_strings": compressible_strings})

    schema_compress = pa.schema(
        [
            pa.field(
                "compressible_strings",
                pa.string(),
                metadata={"lance-encoding:compression": "zstd"},
            )
        ]
    )
    table_compress = pa.table(
        {"compressible_strings": compressible_strings}, schema=schema_compress
    )

    with LanceFileWriter(str(tmp_path / "default.lance")) as writer:
        writer.write_batch(table_default)

    with LanceFileWriter(str(tmp_path / "compress.lance"), schema_compress) as writer:
        writer.write_batch(table_compress)

    size_default = os.path.getsize(tmp_path / "default.lance")
    size_compress = os.path.getsize(tmp_path / "compress.lance")

    assert size_compress < size_default
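Compression changes only the on-disk encoding, so a natural follow-up check (not part of this commit, sketched here using only APIs that already appear in this diff) is that the compressed file round-trips its contents, as test_writer_maintains_order asserts for the uncompressed path:

    reader = LanceFileReader(str(tmp_path / "compress.lance"))
    round_tripped = reader.read_all().to_table()
    # pa.Table equality ignores schema metadata by default, so this compares
    # the values, which compression must preserve
    assert round_tripped == table_compress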
22 changes: 17 additions & 5 deletions rust/lance-encoding/src/encoder.rs
@@ -35,6 +35,8 @@ use crate::{
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use std::collections::hash_map::RandomState;

pub const COMPRESSION_META_KEY: &str = "lance-encoding:compression";

/// An encoded array
///
/// Maps to a single Arrow array
@@ -213,17 +215,25 @@ impl CoreArrayEncodingStrategy {
&& data_size > 4 * 1024 * 1024
}

fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionScheme> {
let compression = field_meta.get(COMPRESSION_META_KEY)?;
Some(compression.parse::<CompressionScheme>().unwrap())
}

fn default_binary_encoder(
arrays: &[ArrayRef],
data_type: &DataType,
field_meta: Option<&HashMap<String, String>>,
data_size: u64,
version: LanceFileVersion,
) -> Result<Box<dyn ArrayEncoder>> {
let bin_indices_encoder =
Self::choose_array_encoder(arrays, &DataType::UInt64, data_size, false, version, None)?;

let bin_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder));
if Self::can_use_fsst(data_type, data_size, version) {
let compression = field_meta.and_then(Self::get_field_compression);

let bin_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, compression));
if compression.is_none() && Self::can_use_fsst(data_type, data_size, version) {
Ok(Box::new(FsstArrayEncoder::new(bin_encoder)))
} else {
Ok(bin_encoder)
@@ -236,7 +246,7 @@ impl CoreArrayEncodingStrategy {
data_size: u64,
use_dict_encoding: bool,
version: LanceFileVersion,
_field_meta: Option<&HashMap<String, String>>,
field_meta: Option<&HashMap<String, String>>,
) -> Result<Box<dyn ArrayEncoder>> {
match data_type {
DataType::FixedSizeList(inner, dimension) => {
@@ -306,10 +316,12 @@ impl CoreArrayEncodingStrategy {
FixedSizeBinaryEncoder::new(bytes_encoder, byte_width as usize),
))))
} else {
Self::default_binary_encoder(arrays, data_type, data_size, version)
Self::default_binary_encoder(
arrays, data_type, field_meta, data_size, version,
)
}
} else {
Self::default_binary_encoder(arrays, data_type, data_size, version)
Self::default_binary_encoder(arrays, data_type, field_meta, data_size, version)
}
}
DataType::Struct(fields) => {
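For reference, the key the new get_field_compression helper looks up is exactly the field-metadata key set from Python in the tests above; PyArrow surfaces it as bytes. Note also that the helper unwraps the parse, so an unrecognized scheme string will panic at write time rather than fall back to no compression. A small sketch of what the encoder sees:

import pyarrow as pa

field = pa.field(
    "strings", pa.string(), metadata={"lance-encoding:compression": "zstd"}
)
# PyArrow stores field metadata as bytes keys and values
print(field.metadata)  # {b'lance-encoding:compression': b'zstd'}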
34 changes: 28 additions & 6 deletions rust/lance-encoding/src/encodings/physical/binary.rs
@@ -27,6 +27,8 @@ use arrow_array::{PrimitiveArray, UInt64Array};
use arrow_schema::DataType;
use lance_core::Result;

use super::block_compress::{BufferCompressor, CompressionScheme, GeneralBufferCompressor};

struct IndicesNormalizer {
indices: Vec<u64>,
validity: BooleanBufferBuilder,
@@ -337,11 +339,22 @@ impl PrimitivePageDecoder for BinaryPageDecoder {
#[derive(Debug)]
pub struct BinaryEncoder {
indices_encoder: Box<dyn ArrayEncoder>,
compression_scheme: Option<CompressionScheme>,
buffer_compressor: Option<Box<dyn BufferCompressor>>,
}

impl BinaryEncoder {
pub fn new(indices_encoder: Box<dyn ArrayEncoder>) -> Self {
Self { indices_encoder }
pub fn new(
indices_encoder: Box<dyn ArrayEncoder>,
compression_scheme: Option<CompressionScheme>,
) -> Self {
let buffer_compressor = compression_scheme
.map(|scheme| GeneralBufferCompressor::get_compressor(&scheme.to_string()));
Self {
indices_encoder,
compression_scheme,
buffer_compressor,
}
}

// In 2.1 we will materialize nulls higher up (in the primitive encoder). Unfortunately,
@@ -438,7 +451,7 @@ impl ArrayEncoder for BinaryEncoder {
data_type: &DataType,
buffer_index: &mut u32,
) -> Result<EncodedArray> {
let (data, nulls) = match data {
let (mut data, nulls) = match data {
DataBlock::Nullable(nullable) => {
let data = nullable.data.as_variable_width().unwrap();
(data, Some(nullable.nulls))
@@ -467,6 +480,12 @@ impl ArrayEncoder for BinaryEncoder {

assert!(encoded_indices_data.bits_per_value <= 64);

if let Some(buffer_compressor) = &self.buffer_compressor {
let mut compressed_data = Vec::with_capacity(data.data.len());
buffer_compressor.compress(&data.data, &mut compressed_data)?;
data.data = LanceBuffer::Owned(compressed_data);
}

let data = DataBlock::VariableWidth(VariableWidthBlock {
bits_per_offset: encoded_indices_data.bits_per_value as u8,
offsets: encoded_indices_data.data,
@@ -476,9 +495,12 @@

let bytes_buffer_index = *buffer_index;
*buffer_index += 1;
// TODO: Do we really need a "nested encoding" here?
let bytes_encoding =
ProtobufUtils::flat_encoding(/*bits_per_value=*/ 8, bytes_buffer_index, None);

let bytes_encoding = ProtobufUtils::flat_encoding(
/*bits_per_value=*/ 8,
bytes_buffer_index,
self.compression_scheme,
);

let encoding =
ProtobufUtils::binary(encoded_indices.encoding, bytes_encoding, null_adjustment);
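Two details worth noting in the hunks above: only the bytes buffer is compressed (the offsets produced by the indices encoder are written as-is), and the chosen scheme is recorded in the page metadata via ProtobufUtils::flat_encoding, which is presumably what lets readers detect and undo the compression; the decode path is not part of this diff. When a scheme is requested, the FSST branch in encoder.rs is skipped, so the two encodings never stack.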
