From b6454ab2c047e318d361ccdabbba2a3e67ffcaee Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 26 Sep 2024 10:40:47 -0400 Subject: [PATCH] Example of reading and writing parquet metadata outside the file --- parquet/Cargo.toml | 6 + parquet/examples/external_metadata.rs | 217 ++++++++++++++++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 parquet/examples/external_metadata.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index b97b2a57164..3541888b947 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -118,6 +118,12 @@ zstd = ["dep:zstd", "zstd-sys"] # Display memory in example/write_parquet.rs sysinfo = ["dep:sysinfo"] + +[[example]] +name = "external_metadata" +required-features = ["arrow", "async"] +path = "./examples/external_metadata.rs" + [[example]] name = "read_parquet" required-features = ["arrow"] diff --git a/parquet/examples/external_metadata.rs b/parquet/examples/external_metadata.rs new file mode 100644 index 00000000000..72927e0a791 --- /dev/null +++ b/parquet/examples/external_metadata.rs @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use arrow_cast::pretty::pretty_format_batches; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, +}; +use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder}; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter}; +use parquet::file::properties::{EnabledStatistics, WriterProperties}; +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tempfile::TempDir; + +/// This example demonstrates advanced usage of the Parquet metadata APIs. +/// +/// # Usecase +/// +/// 1. Read Parquet metadata from an existing Parquet file. +/// +/// 2. Store that metadata somewhere other than the rest of the parquet data +/// (e.g. a cache) that is not necessarily in RAM +/// +/// 3. Use the metadata to determine of the file should be read, and if so, +/// read the data stored in that Parquet file, without re-reading / reparsing +/// the metadata. +/// +/// Note that Parquet metadata is not necessarily contiguous in the files: part +/// is stored in the footer (the last bytes of the file), but other portions +/// (such as the PageIndex) can be stored elsewhere, +/// +/// You can use the these APIs to store a copy of the metadata for parquet files +/// stored on remote object storage (e.g. S3) in a local file or an in-memory +/// cache, use a query engine like DataFusion to analyze the metadata to +/// determine which file to read, and then read those files with a single +/// object store request. +/// +/// # Specifically, this example: +/// 1. Reads the metadata of a Parquet file +/// 2. Removes some column statistics from the metadata (to make them smaller) +/// 3. Stores the metadata in a separate file +/// 4. Reads the metadata from the separate file and uses that to read the Parquet file +/// +/// Without this API, to implement the functionality you would need to implement +/// a conversion of the `ParquetMetaData` and related structures to/from some +/// other structs that can be serialized/deserialized. + +#[tokio::main(flavor = "current_thread")] +async fn main() -> parquet::errors::Result<()> { + let tempdir = TempDir::new().unwrap(); + println!("data in {tempdir:?}"); + let parquet_path = create_parquet_file(&tempdir); + let metadata_path = tempdir.path().join("thrift_metadata.dat"); + // temp: don't clean up tempdir + std::mem::forget(tempdir); + + let metadata = get_metadata_from_parquet_file(&parquet_path).await; + println!( + "Read metadata from Parquet file into memory: {} bytes", + metadata.memory_size() + ); + let metadata = prepare_metadata(metadata); + write_metadata_to_file(metadata, &metadata_path); + + // now read the metadata from the file and use it to read the Parquet file + let metadata = read_metadata_from_file(&metadata_path); + println!("Read metadata from file: {metadata:#?}"); + + let batches = read_parquet_file_with_metadata(&parquet_path, metadata); + + // display the results + let batches_string = pretty_format_batches(&batches).unwrap().to_string(); + let batches_lines: Vec<_> = batches_string.split('\n').collect(); + + assert_eq!( + batches_lines, + [ + "+-----+-------------+", + "| id | description |", + "+-----+-------------+", + "| 100 | oranges |", + "| 200 | apples |", + "| 201 | grapefruit |", + "| 300 | bannanas |", + "| 102 | grames |", + "| 33 | pears |", + "+-----+-------------+", + ], + "actual output:\n\n{batches_lines:#?}" + ); + + Ok(()) +} + +/// Reads the metadata from a parquet file +async fn get_metadata_from_parquet_file(file: impl AsRef) -> ParquetMetaData { + // pretend we are reading the metadata from a remote object store + let file = std::fs::File::open(file).unwrap(); + let file = tokio::fs::File::from_std(file); + + // tell the reader to read the page index + let reader_options = ArrowReaderOptions::new().with_page_index(true); + + let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, reader_options) + .await + .unwrap(); + + // The metadata is Arc'd -- so unwrap it after dropping the builder + let metadata = Arc::clone(builder.metadata()); + drop(builder); + Arc::try_unwrap(metadata).unwrap() +} + +/// modifies the metadata to reduce its size +fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData { + // maybe we will do this + metadata +} + +/// writes the metadata to a file +/// +/// The data is stored using the same thrift format as the Parquet file metadata +fn write_metadata_to_file(metadata: ParquetMetaData, file: impl AsRef) { + let file = File::create(file).unwrap(); + let writer = ParquetMetaDataWriter::new(file, &metadata); + writer.finish().unwrap() +} + +/// Reads the metadata from a file +/// +/// This function reads the format written by `write_metadata_to_file` +fn read_metadata_from_file(file: impl AsRef) -> ParquetMetaData { + let file = File::open(file).unwrap(); + ParquetMetaDataReader::new() + .with_column_indexes(true) + .with_offset_indexes(true) + .parse_and_finish(&file) + .unwrap() +} + +/// Reads the Parquet file using the metadata +/// +/// This shows how to read the Parquet file using previously read metadata +/// instead of the metadata in the Parquet file itself. This avoids an IO / +/// having to fetch and decode the metadata from the Parquet file before +/// beginning to read it. +/// +/// In this example, we read the results as Arrow record batches +fn read_parquet_file_with_metadata( + file: impl AsRef, + metadata: ParquetMetaData, +) -> Vec { + let file = std::fs::File::open(file).unwrap(); + let options = ArrowReaderOptions::new() + // tell the reader to read the page index + .with_page_index(true); + // create a reader with pre-existing metadata + let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap(); + let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, arrow_reader_metadata) + .build() + .unwrap(); + + reader.collect::>>().unwrap() +} + +/// Make a new parquet file in the temporary directory, and returns the path +fn create_parquet_file(tmpdir: &TempDir) -> PathBuf { + let path = tmpdir.path().join("example.parquet"); + let new_file = File::create(&path).unwrap(); + + let batch = RecordBatch::try_from_iter(vec![ + ( + "id", + Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33])) as ArrayRef, + ), + ( + "description", + Arc::new(StringArray::from(vec![ + "oranges", + "apples", + "grapefruit", + "bannanas", + "grames", + "pears", + ])), + ), + ]) + .unwrap(); + + let props = WriterProperties::builder() + // ensure we write the page index level statistics + .set_statistics_enabled(EnabledStatistics::Page) + .build(); + + let mut writer = ArrowWriter::try_new(new_file, batch.schema(), Some(props)).unwrap(); + + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + + path +}