Example of reading and writing parquet metadata outside the file #6081

Draft · wants to merge 1 commit into base: master
6 changes: 6 additions & 0 deletions parquet/Cargo.toml
@@ -118,6 +118,12 @@ zstd = ["dep:zstd", "zstd-sys"]
# Display memory in example/write_parquet.rs
sysinfo = ["dep:sysinfo"]


[[example]]
name = "external_metadata"
required-features = ["arrow", "async"]
path = "./examples/external_metadata.rs"

[[example]]
name = "read_parquet"
required-features = ["arrow"]
217 changes: 217 additions & 0 deletions parquet/examples/external_metadata.rs
@@ -0,0 +1,217 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_cast::pretty::pretty_format_batches;
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;

/// This example demonstrates advanced usage of the Parquet metadata APIs.
///
/// # Use case
///
/// 1. Read Parquet metadata from an existing Parquet file.
///
/// 2. Store that metadata somewhere other than the rest of the Parquet data
/// (e.g. in a cache), not necessarily in RAM
///
/// 3. Use the metadata to determine if the file should be read, and if so,
/// read the data stored in that Parquet file without re-reading / re-parsing
/// the metadata.
///
/// Note that Parquet metadata is not necessarily contiguous in the files: part
/// is stored in the footer (the last bytes of the file), but other portions
/// (such as the PageIndex) can be stored elsewhere.
///
/// You can use these APIs to store a copy of the metadata for parquet files
/// stored on remote object storage (e.g. S3) in a local file or an in-memory
/// cache, use a query engine like DataFusion to analyze the metadata to
/// determine which file to read, and then read those files with a single
/// object store request.
///
/// # Specifically, this example:
/// 1. Reads the metadata of a Parquet file
/// 2. Optionally trims column statistics from the metadata to make it smaller
/// (left as a no-op in this draft)
/// 3. Stores the metadata in a separate file
/// 4. Reads the metadata from the separate file and uses that to read the Parquet file
///
/// Without this API, to implement the functionality you would need to implement
/// a conversion of the `ParquetMetaData` and related structures to/from some
/// other structs that can be serialized/deserialized.
#[tokio::main(flavor = "current_thread")]
async fn main() -> parquet::errors::Result<()> {
let tempdir = TempDir::new().unwrap();
println!("data in {tempdir:?}");
let parquet_path = create_parquet_file(&tempdir);
let metadata_path = tempdir.path().join("thrift_metadata.dat");

let metadata = get_metadata_from_parquet_file(&parquet_path).await;
println!(
"Read metadata from Parquet file into memory: {} bytes",
metadata.memory_size()
);
let metadata = prepare_metadata(metadata);
write_metadata_to_file(metadata, &metadata_path);

// now read the metadata from the file and use it to read the Parquet file
let metadata = read_metadata_from_file(&metadata_path);
println!("Read metadata from file: {metadata:#?}");

let batches = read_parquet_file_with_metadata(&parquet_path, metadata);

// display the results
let batches_string = pretty_format_batches(&batches).unwrap().to_string();
let batches_lines: Vec<_> = batches_string.split('\n').collect();

assert_eq!(
batches_lines,
[
"+-----+-------------+",
"| id | description |",
"+-----+-------------+",
"| 100 | oranges |",
"| 200 | apples |",
"| 201 | grapefruit |",
"| 300 | bannanas |",
"| 102 | grames |",
"| 33 | pears |",
"+-----+-------------+",
],
"actual output:\n\n{batches_lines:#?}"
);

Ok(())
}
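Editorial aside (not part of the PR): the cache-style use case described in the doc comment can be sketched with a std-only stand-in, where a plain map plays the role of the external metadata store and the byte values stand in for thrift-encoded `ParquetMetaData`. All names here are hypothetical.

```rust
use std::collections::HashMap;

// Hypothetical in-memory cache: object-store path -> serialized metadata bytes.
struct MetadataCache {
    entries: HashMap<String, Vec<u8>>,
}

impl MetadataCache {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    // Store metadata once, e.g. after a single footer read from object storage.
    fn put(&mut self, path: &str, bytes: Vec<u8>) {
        self.entries.insert(path.to_string(), bytes);
    }

    // Later reads consult the cache instead of re-fetching the footer.
    fn get(&self, path: &str) -> Option<&[u8]> {
        self.entries.get(path).map(|v| v.as_slice())
    }
}

fn main() {
    let mut cache = MetadataCache::new();
    cache.put("s3://bucket/example.parquet", vec![0x15, 0x02, 0x19]);

    // Cache hit: decide from metadata alone whether to read the file at all.
    assert!(cache.get("s3://bucket/example.parquet").is_some());
    // Cache miss: would fall back to reading the footer from object storage.
    assert!(cache.get("s3://bucket/other.parquet").is_none());
    println!("cached files: {}", cache.entries.len());
}
```

In the real example the cached bytes are what `write_metadata_to_file` produces and `read_metadata_from_file` consumes.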

/// Reads the metadata from a parquet file
async fn get_metadata_from_parquet_file(file: impl AsRef<Path>) -> ParquetMetaData {
// pretend we are reading the metadata from a remote object store
let file = std::fs::File::open(file).unwrap();
let file = tokio::fs::File::from_std(file);

// tell the reader to read the page index
let reader_options = ArrowReaderOptions::new().with_page_index(true);

let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, reader_options)
.await
.unwrap();

// The metadata is Arc'd -- so unwrap it after dropping the builder
let metadata = Arc::clone(builder.metadata());
drop(builder);
Arc::try_unwrap(metadata).unwrap()
}
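Editorial aside (not part of the PR): the `Arc::clone` / `drop(builder)` / `Arc::try_unwrap` dance above can be illustrated with a std-only sketch. The helper name is hypothetical; a `String` stands in for `ParquetMetaData`, but the ownership rules are identical.

```rust
use std::sync::Arc;

// Mirrors the pattern in `get_metadata_from_parquet_file`: clone the Arc,
// drop the other owner, then unwrap the sole remaining reference.
fn unwrap_after_drop() -> String {
    let metadata = Arc::new(String::from("metadata")); // like builder.metadata()
    let copy = Arc::clone(&metadata);
    drop(metadata); // like `drop(builder)` releasing the builder's reference
    // Exactly one strong reference remains, so try_unwrap succeeds.
    Arc::try_unwrap(copy).unwrap()
}

fn main() {
    // With two live references, try_unwrap refuses and returns Err instead:
    let a = Arc::new(1);
    let b = Arc::clone(&a);
    assert!(Arc::try_unwrap(b).is_err());

    println!("unwrapped: {}", unwrap_after_drop());
}
```

This is why the builder must be dropped before unwrapping: `Arc::try_unwrap` only yields an owned value when the strong count is exactly one.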

/// Modifies the metadata to reduce its size
fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData {
// Left as a no-op here; a real application might strip per-page
// statistics or prune row groups to shrink the serialized metadata.
metadata
}

/// writes the metadata to a file
///
/// The data is stored using the same thrift format as the Parquet file metadata
fn write_metadata_to_file(metadata: ParquetMetaData, file: impl AsRef<Path>) {
let file = File::create(file).unwrap();
let writer = ParquetMetaDataWriter::new(file, &metadata);
writer.finish().unwrap()
}

/// Reads the metadata from a file
///
/// This function reads the format written by `write_metadata_to_file`
fn read_metadata_from_file(file: impl AsRef<Path>) -> ParquetMetaData {
let file = File::open(file).unwrap();
ParquetMetaDataReader::new()
.with_column_indexes(true)
.with_offset_indexes(true)
.parse_and_finish(&file)
.unwrap()
}

/// Reads the Parquet file using the metadata
///
/// This shows how to read the Parquet file using previously read metadata
/// instead of the metadata in the Parquet file itself. This avoids an IO /
/// having to fetch and decode the metadata from the Parquet file before
/// beginning to read it.
///
/// In this example, we read the results as Arrow record batches
fn read_parquet_file_with_metadata(
file: impl AsRef<Path>,
metadata: ParquetMetaData,
) -> Vec<RecordBatch> {
let file = std::fs::File::open(file).unwrap();
let options = ArrowReaderOptions::new()
// tell the reader to read the page index
.with_page_index(true);
Review discussion on this line:

Contributor Author: this is also kind of awkward -- it would be great if the actual reading of the parquet metadata could do this...

Contributor: Do you mean loading the page index?

Contributor Author: Yeah, what I was trying to get at was that since the ColumnIndex and OffsetIndex (aka the "page index structures") are not stored inline, decode_metadata doesn't read them -- the logic to do so is baked into this reader:

https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index

Contributor Author: I tried to describe this more on #6002 (comment)

Contributor: Can we now get rid of with_page_index? Didn't ParquetMetaDataReader already load it?

Contributor: I believe we can... it appears that options.page_index is only used in ArrowReaderMetadata::load, so it should have no effect.

Contributor Author: I think this is now pretty clear -- we still need to explicitly call for loading the page index, but I think it makes sense. We could potentially change the default to be "read the page index unless told not to".

// create a reader with pre-existing metadata
let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, arrow_reader_metadata)
.build()
.unwrap();

reader.collect::<arrow::error::Result<Vec<_>>>().unwrap()
}
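Editorial aside (not part of the PR): the final `reader.collect::<arrow::error::Result<Vec<_>>>()` relies on collecting an iterator of `Result`s into a single `Result<Vec<_>, _>`. A std-only sketch, with a hypothetical error type standing in for `arrow::error::ArrowError`:

```rust
// Collecting Results short-circuits at the first Err, which is the same
// mechanism the record-batch reader uses above.
fn collect_rows(parts: Vec<Result<i32, String>>) -> Result<Vec<i32>, String> {
    parts.into_iter().collect()
}

fn main() {
    // All Ok: the values are gathered into one Vec.
    assert_eq!(collect_rows(vec![Ok(1), Ok(2)]), Ok(vec![1, 2]));
    // Any Err: the whole collect yields that first error.
    assert_eq!(
        collect_rows(vec![Ok(1), Err("decode error".into()), Ok(3)]),
        Err("decode error".to_string())
    );
    println!("collect short-circuits on the first Err");
}
```

So a single decode failure in any batch surfaces as one `Err` from the whole read, rather than a partially filled `Vec<RecordBatch>`.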

/// Makes a new parquet file in the temporary directory and returns its path
fn create_parquet_file(tmpdir: &TempDir) -> PathBuf {
let path = tmpdir.path().join("example.parquet");
let new_file = File::create(&path).unwrap();

let batch = RecordBatch::try_from_iter(vec![
(
"id",
Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33])) as ArrayRef,
),
(
"description",
Arc::new(StringArray::from(vec![
"oranges",
"apples",
"grapefruit",
"bannanas",
"grames",
"pears",
])),
),
])
.unwrap();

let props = WriterProperties::builder()
// ensure we write the page index level statistics
.set_statistics_enabled(EnabledStatistics::Page)
.build();

let mut writer = ArrowWriter::try_new(new_file, batch.schema(), Some(props)).unwrap();

writer.write(&batch).unwrap();
writer.finish().unwrap();

path
}