diff --git a/build.gradle b/build.gradle index 25bf761b242c..d2b1ccccedfb 100644 --- a/build.gradle +++ b/build.gradle @@ -217,6 +217,7 @@ project(':iceberg-core') { exclude group: 'org.tukaani' // xz compression is not supported } + implementation 'io.airlift:aircompressor' implementation 'org.apache.httpcomponents.client5:httpclient5' implementation "com.fasterxml.jackson.core:jackson-databind" implementation "com.fasterxml.jackson.core:jackson-core" diff --git a/core/src/main/java/org/apache/iceberg/stats/BlobMetadata.java b/core/src/main/java/org/apache/iceberg/stats/BlobMetadata.java new file mode 100644 index 000000000000..5a16aae041bb --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/stats/BlobMetadata.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.stats; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; + +public class BlobMetadata { + private final String type; + private final Set columns; + private final long offset; + private final long length; + private final String compressionCodec; + + @JsonCreator + public BlobMetadata( + @JsonProperty("type") String type, + @JsonProperty("columns") Set columns, + @JsonProperty("offset") long offset, + @JsonProperty("length") long length, + @JsonProperty("compression_codec") @Nullable String compressionCodec) { + this.type = Objects.requireNonNull(type, "type is null"); + this.columns = ImmutableSet.copyOf(Objects.requireNonNull(columns, "columns is null")); + this.offset = offset; + this.length = length; + this.compressionCodec = compressionCodec; + } + + public String type() { + return type; + } + + public Set columns() { + return columns; + } + + /** + * Offset in the file + */ + public long offset() { + return offset; + } + + /** + * Length in the file + */ + public long length() { + return length; + } + + @Nullable + public String compressionCodec() { + return compressionCodec; + } +} diff --git a/core/src/main/java/org/apache/iceberg/stats/FileMetadata.java b/core/src/main/java/org/apache/iceberg/stats/FileMetadata.java new file mode 100644 index 000000000000..f59344e54f37 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/stats/FileMetadata.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.stats; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +public class FileMetadata { + private final List blobs; + private final Map properties; + + @JsonCreator + public FileMetadata( + @JsonProperty("blobs") List blobs, + @JsonProperty("properties") Map properties) { + this.blobs = ImmutableList.copyOf(Objects.requireNonNull(blobs, "blobs is null")); + this.properties = ImmutableMap.copyOf(Objects.requireNonNull(properties, "properties is null")); + } + + public List blobs() { + return blobs; + } + + public Map properties() { + return properties; + } +} diff --git a/core/src/main/java/org/apache/iceberg/stats/FileMetadataParser.java b/core/src/main/java/org/apache/iceberg/stats/FileMetadataParser.java new file mode 100644 index 000000000000..a69dd9e2bbbb --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/stats/FileMetadataParser.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.stats; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.io.StringWriter; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.JsonUtil; + +public final class FileMetadataParser { + + private FileMetadataParser() { + } + + private static final String BLOBS = "blobs"; + private static final String PROPERTIES = "properties"; + + private static final String TYPE = "type"; + private static final String COLUMNS = "columns"; + private static final String OFFSET = "offset"; + private static final String LENGTH = "length"; + private static final String COMPRESSION_CODEC = "compression_codec"; + + public static String toJson(FileMetadata fileMetadata) { + try { + StringWriter writer = new StringWriter(); + JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + generator.useDefaultPrettyPrinter(); + toJson(fileMetadata, generator); + generator.flush(); + 
return writer.toString(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to write json for: " + fileMetadata, e); + } + } + + public static FileMetadata fromJson(String json) { + try { + return fromJson(JsonUtil.mapper().readValue(json, JsonNode.class)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static FileMetadata fromJson(JsonNode json) { + return fileMetadataFromJson(json); + } + + static void toJson(FileMetadata fileMetadata, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + + generator.writeArrayFieldStart(BLOBS); + for (BlobMetadata blobMetadata : fileMetadata.blobs()) { + toJson(blobMetadata, generator); + } + generator.writeEndArray(); + + generator.writeObjectFieldStart(PROPERTIES); + for (Map.Entry entry : fileMetadata.properties().entrySet()) { + generator.writeStringField(entry.getKey(), entry.getValue()); + } + generator.writeEndObject(); + + generator.writeEndObject(); + } + + static FileMetadata fileMetadataFromJson(JsonNode json) { + + ImmutableList.Builder blobs = ImmutableList.builder(); + JsonNode blobsJson = json.get(BLOBS); + Preconditions.checkArgument(blobsJson != null && blobsJson.isArray(), + "Cannot parse blobs from non-array: %s", blobsJson); + for (JsonNode blobJson : blobsJson) { + blobs.add(blobMetadataFromJson(blobJson)); + } + + Map properties = ImmutableMap.of(); + JsonNode propertiesJson = json.get(PROPERTIES); + if (propertiesJson != null) { + properties = JsonUtil.getStringMap(PROPERTIES, json); + } + + return new FileMetadata( + blobs.build(), + properties); + } + + static void toJson(BlobMetadata blobMetadata, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + + generator.writeStringField(TYPE, blobMetadata.type()); + + generator.writeArrayFieldStart(COLUMNS); + for (int column : blobMetadata.columns()) { + generator.writeNumber(column); + } + generator.writeEndArray(); + + 
generator.writeNumberField(OFFSET, blobMetadata.offset()); + generator.writeNumberField(LENGTH, blobMetadata.length()); + + if (blobMetadata.compressionCodec() != null) { + generator.writeStringField(COMPRESSION_CODEC, blobMetadata.compressionCodec()); + } + + generator.writeEndObject(); + } + + static BlobMetadata blobMetadataFromJson(JsonNode json) { + String type = JsonUtil.getString(TYPE, json); + Set columns = JsonUtil.getIntegerSet(COLUMNS, json); + long offset = JsonUtil.getLong(OFFSET, json); + long length = JsonUtil.getLong(LENGTH, json); + String compressionCodec = JsonUtil.getStringOrNull(COMPRESSION_CODEC, json); + + return new BlobMetadata( + type, + columns, + offset, + length, + compressionCodec); + } +} diff --git a/core/src/main/java/org/apache/iceberg/stats/StandardBlobTypes.java b/core/src/main/java/org/apache/iceberg/stats/StandardBlobTypes.java new file mode 100644 index 000000000000..7a76bca94a13 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/stats/StandardBlobTypes.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Names of the standard blob types. This class only holds constants and cannot be instantiated.
 */
public final class StandardBlobTypes {
  /**
   * An 8-byte integer, stored little-endian, that represents the number of distinct values
   */
  public static final String NDV_LONG_LITTLE_ENDIAN = "ndv-long-little-endian";

  /**
   * The serialized form of a "compact" Theta sketch, as produced by the Apache DataSketches library
   */
  public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";

  private StandardBlobTypes() {
  }
}
/**
 * Compression codecs known to the stats file format, each identified by a codec name.
 */
public enum StatsCompressionCodec {
  /**
   * LZ4 single compression frame with content size present
   */
  LZ4("lz4"),

  /**
   * Zstandard single compression frame with content size present
   */
  ZSTD("zstd");

  private final String codecName;

  StatsCompressionCodec(String codecName) {
    this.codecName = codecName;
  }

  /**
   * Returns the name under which this codec is identified.
   */
  public String getCodecName() {
    return codecName;
  }

  /**
   * Looks up a codec by its exact (case-sensitive) name.
   *
   * @param codecName name to look up, must not be null
   * @return the codec carrying that name
   * @throws NullPointerException if {@code codecName} is null
   * @throws IllegalArgumentException if no codec has that name
   */
  public static StatsCompressionCodec forName(String codecName) {
    Objects.requireNonNull(codecName, "codecName is null");
    StatsCompressionCodec[] candidates = values();
    for (int i = 0; i < candidates.length; i++) {
      StatsCompressionCodec candidate = candidates[i];
      if (codecName.equals(candidate.codecName)) {
        return candidate;
      }
    }
    throw new IllegalArgumentException("Unrecognized codec name " + codecName);
  }
}
+ */ + +package org.apache.iceberg.stats; + +import io.airlift.compress.Compressor; +import io.airlift.compress.zstd.ZstdCompressor; +import io.airlift.compress.zstd.ZstdDecompressor; +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +final class StatsFormat { + private StatsFormat() { + } + + static final int CURRENT_FORMAT_VERSION = 1; + + static final int MAGIC_AS_NUMBER_LE = new BigInteger(swap(getMagic())).intValueExact(); + + static final int SUPPORTED_FLAGS = 0b1; + static final int FLAG_COMPRESSED = 0b1; + + static byte[] getMagic() { + return new byte[] {0x50, 0x46, 0x49, 0x53}; + } + + static void writeIntegerLittleEndian(OutputStream outputStream, int value) throws IOException { + outputStream.write(0xFF & value); + outputStream.write(0xFF & value >> 8); + outputStream.write(0xFF & value >> 16); + outputStream.write(0xFF & value >> 24); + } + + static int readIntegerLittleEndian(byte[] data, int offset) { + return Byte.toUnsignedInt(data[offset]) | + (Byte.toUnsignedInt(data[offset + 1]) << 8) | + (Byte.toUnsignedInt(data[offset + 2]) << 16) | + (Byte.toUnsignedInt(data[offset + 3]) << 24); + } + + static ByteBuffer compressFooterPayload(ByteBuffer payload) { + return compress(StatsCompressionCodec.LZ4, payload); + } + + static ByteBuffer decompressFooterPayload(ByteBuffer footer) { + return decompress(StatsCompressionCodec.LZ4, footer); + } + + static ByteBuffer compressBlob(StatsCompressionCodec codec, ByteBuffer data) { + return compress(codec, data); + } + + static ByteBuffer decompressBlob(StatsCompressionCodec codec, ByteBuffer data) { + return decompress(codec, data); + } + + private static ByteBuffer compress(StatsCompressionCodec codec, ByteBuffer input) { + Compressor compressor = getCompressor(codec); + ByteBuffer output = ByteBuffer.allocate(compressor.maxCompressedLength(input.remaining())); + 
compressor.compress(input, output); + output.flip(); + return output; + } + + private static Compressor getCompressor(StatsCompressionCodec codec) { + switch (codec) { + case LZ4: + // TODO currently not supported + break; + case ZSTD: + return new ZstdCompressor(); + } + throw new UnsupportedOperationException("Unsupported codec: " + codec); + } + + private static ByteBuffer decompress(StatsCompressionCodec codec, ByteBuffer input) { + switch (codec) { + case LZ4: + // TODO requires LZ4 frame decompressor, e.g. https://github.com/airlift/aircompressor/pull/142 + break; + + case ZSTD: { + byte[] inputBytes; + int inputOffset; + int inputLength; + if (input.hasArray()) { + inputBytes = input.array(); + inputOffset = input.arrayOffset(); + inputLength = input.remaining(); + } else { + // TODO implement ZstdDecompressor.getDecompressedSize for ByteBuffer to avoid copying + inputBytes = new byte[input.remaining()]; + input.get(inputBytes); + inputOffset = 0; + inputLength = inputBytes.length; + } + byte[] decompressed = + new byte[Math.toIntExact(ZstdDecompressor.getDecompressedSize(inputBytes, inputOffset, inputLength))]; + int decompressedLength = + new ZstdDecompressor().decompress( + inputBytes, + inputOffset, + inputLength, + decompressed, + 0, + decompressed.length); + Preconditions.checkState(decompressedLength == decompressed.length, "Invalid decompressed length"); + return ByteBuffer.wrap(decompressed); + } + } + + throw new UnsupportedOperationException("Unsupported codec: " + codec); + } + + private static byte[] swap(byte[] bytes) { + byte[] swapped = new byte[bytes.length]; + for (int i = 0; i < swapped.length; i++) { + swapped[i] = bytes[swapped.length - i - 1]; + } + return swapped; + } +} diff --git a/core/src/main/java/org/apache/iceberg/stats/StatsReader.java b/core/src/main/java/org/apache/iceberg/stats/StatsReader.java new file mode 100644 index 000000000000..7344d34ccf0f --- /dev/null +++ 
b/core/src/main/java/org/apache/iceberg/stats/StatsReader.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.stats; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.AbstractMap.SimpleEntry; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; + +public class StatsReader implements Closeable { + + private final long fileLength; + private final SeekableInputStream input; + private Optional knownFooterSize; + private Optional knownFileMetadata = Optional.empty(); + + public StatsReader(InputFile inputFile, Optional footerSize) { + this.fileLength = inputFile.getLength(); + this.input = Objects.requireNonNull(inputFile, "inputFile is null").newStream(); + 
this.knownFooterSize = Objects.requireNonNull(footerSize, "footerSize is null"); + footerSize.ifPresent(size -> + Preconditions.checkArgument(0 < size && size <= fileLength - StatsFormat.getMagic().length, + "Invalid footer size: %s", size)); + } + + public FileMetadata getFileMetadata() throws IOException { + if (!knownFileMetadata.isPresent()) { + int footerSize = getFooterSize(); + byte[] footer = readInput(fileLength - footerSize, footerSize); + + checkMagic(footer, 0); + checkMagic(footer, footerSize - 4); + + int fileFormatVersion = StatsFormat.readIntegerLittleEndian(footer, footerSize - 8); + Preconditions.checkState( + fileFormatVersion == StatsFormat.CURRENT_FORMAT_VERSION, + "Unsupported format version %s, %s is the latest supported", + fileFormatVersion, + StatsFormat.CURRENT_FORMAT_VERSION); + + int flags = StatsFormat.readIntegerLittleEndian(footer, footerSize - 12); + Preconditions.checkState((flags & ~StatsFormat.SUPPORTED_FLAGS) == 0, "Unsupported flags: %s", flags); + boolean compressed = isFlagSet(flags, StatsFormat.FLAG_COMPRESSED); + + int reserved = StatsFormat.readIntegerLittleEndian(footer, footerSize - 16); + Preconditions.checkState(reserved == 0, "Unexpected reserved bytes value: %s", reserved); + + int footerPayloadSize = StatsFormat.readIntegerLittleEndian(footer, footerSize - 20); + Preconditions.checkState(footerPayloadSize == footerSize - 24, + "Unexpected footer payload size value %s for footer size %s", footerPayloadSize, footerSize); + + FileMetadata readFileMetadata; + ByteBuffer footerPayload = ByteBuffer.wrap(footer, 4, footerPayloadSize); + if (!compressed) { + readFileMetadata = parseFileMetadata(footerPayload); + } else { + ByteBuffer footerJson = StatsFormat.decompressFooterPayload(footerPayload); + readFileMetadata = parseFileMetadata(footerJson); + } + this.knownFileMetadata = Optional.of(readFileMetadata); + } + return knownFileMetadata.get(); + } + + /** + * @throws IOException when I/O error occurs + * @throws 
UncheckedIOException when I/O error occurs + */ + // Note: The method is marked as throwing IOException to allow future implementation evolution. + // Currently, the exception isn't being thrown. + public Stream> readAll(List blobs) throws IOException { + if (blobs.isEmpty()) { + return Stream.empty(); + } + + // TODO inspect blob offsets and coalesce read regions close to each other + + return blobs.stream() + .sorted(Comparator.comparingLong(BlobMetadata::offset)) + .map((BlobMetadata blobMetadata) -> { + try { + input.seek(blobMetadata.offset()); + byte[] bytes = new byte[Math.toIntExact(blobMetadata.length())]; + ByteStreams.readFully(input, bytes); + ByteBuffer data = ByteBuffer.wrap(bytes); + if (blobMetadata.compressionCodec() != null) { + StatsCompressionCodec codec = StatsCompressionCodec.forName(blobMetadata.compressionCodec()); + data = StatsFormat.decompressBlob(codec, data); + } + return new SimpleEntry<>(blobMetadata, data); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + private static void checkMagic(byte[] data, int offset) { + int read = StatsFormat.readIntegerLittleEndian(data, offset); + if (read != StatsFormat.MAGIC_AS_NUMBER_LE) { + throw new IllegalStateException(String.format( + "Invalid file: expected magic at offset %s (%s i.e. 
%s as integer) but got %s", + offset, Arrays.toString(StatsFormat.getMagic()), StatsFormat.MAGIC_AS_NUMBER_LE, read)); + } + } + + private static boolean isFlagSet(int setFlags, int testedFlag) { + return (setFlags & testedFlag) == testedFlag; + } + + private int getFooterSize() throws IOException { + if (!knownFooterSize.isPresent()) { + int footerTailSize = Math.toIntExact(Math.min(fileLength, 20)); + byte[] footerTail = readInput(fileLength - footerTailSize, footerTailSize); + + checkMagic(footerTail, footerTailSize - 4); + + int fileFormatVersion = StatsFormat.readIntegerLittleEndian(footerTail, footerTailSize - 8); + Preconditions.checkState( + fileFormatVersion == StatsFormat.CURRENT_FORMAT_VERSION, + "Unsupported format version %s, %s is the latest supported", + fileFormatVersion, + StatsFormat.CURRENT_FORMAT_VERSION); + + int footerPayloadSize = StatsFormat.readIntegerLittleEndian(footerTail, footerTailSize - 20); + knownFooterSize = Optional.of(footerPayloadSize + 24); + } + return knownFooterSize.get(); + } + + private byte[] readInput(long offset, int length) throws IOException { + input.seek(offset); + byte[] data = new byte[length]; + ByteStreams.readFully(input, data); + return data; + } + + private static FileMetadata parseFileMetadata(ByteBuffer data) { + String footerJson = StandardCharsets.UTF_8.decode(data).toString(); + return FileMetadataParser.fromJson(footerJson); + } + + @Override + public void close() throws IOException { + input.close(); + knownFooterSize = Optional.empty(); + knownFileMetadata = Optional.empty(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/stats/StatsWriter.java b/core/src/main/java/org/apache/iceberg/stats/StatsWriter.java new file mode 100644 index 000000000000..58773a8cd5aa --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/stats/StatsWriter.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.stats; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class StatsWriter implements Closeable { + + private final PositionOutputStream outputStream; + + private final Map properties = Maps.newHashMap(); + private final List blobs = Lists.newArrayList(); + private boolean compressFooter = false; // TODO compress footer by default when LZ4 support is added + + private boolean headerWritten; + private boolean finished; + private Optional footerSize = Optional.empty(); + + public StatsWriter(OutputFile outputFile) { + Objects.requireNonNull(outputFile, "outputFile is null"); + this.outputStream = outputFile.create(); + } + + public void addFileProperty(String name, String value) { + Objects.requireNonNull(name, "name is null"); + 
Objects.requireNonNull(value, "value is null"); + + if (properties.putIfAbsent(name, value) != null) { + throw new IllegalStateException(String.format("Property '%s' already set", name)); + } + } + public void append( + String type, + Set columnsCovered, + ByteBuffer blobData, + Optional compression) throws IOException { + checkNotFinished(); + writeHeaderIfNeeded(); + + Objects.requireNonNull(type, "type is null"); + long fileOffset = outputStream.getPos(); + ByteBuffer data; + data = compression.map(codec -> StatsFormat.compressBlob(codec, blobData)) + .orElse(blobData); + int length = data.remaining(); + Channels.newChannel(outputStream).write(data); + @Nullable String codecName = compression.map(StatsCompressionCodec::getCodecName).orElse(null); + blobs.add(new BlobMetadata(type, columnsCovered, fileOffset, length, codecName)); + } + + public void setCompressFooter(boolean compressFooter) { + this.compressFooter = compressFooter; + } + + @Override + public void close() throws IOException { + if (!finished) { + finish(); + } + + outputStream.close(); + } + + private void writeHeaderIfNeeded() throws IOException { + if (headerWritten) { + return; + } + + outputStream.write(StatsFormat.getMagic()); + headerWritten = true; + } + + public void finish() throws IOException { + checkNotFinished(); + writeHeaderIfNeeded(); + if (footerSize.isPresent()) { + throw new IllegalStateException("footerSize already set"); + } + + FileMetadata fileMetadata = new FileMetadata(blobs, properties); + ByteBuffer footerJson = ByteBuffer.wrap(FileMetadataParser.toJson(fileMetadata).getBytes(StandardCharsets.UTF_8)); + ByteBuffer footerPayload = compressFooter ? 
StatsFormat.compressFooterPayload(footerJson) : footerJson; + long footerOffset = outputStream.getPos(); + byte[] magic = StatsFormat.getMagic(); + outputStream.write(magic); + int footerPayloadLength = Channels.newChannel(outputStream).write(footerPayload); + StatsFormat.writeIntegerLittleEndian(outputStream, footerPayloadLength); + StatsFormat.writeIntegerLittleEndian(outputStream, 0); // Reserved + StatsFormat.writeIntegerLittleEndian(outputStream, getFileFlags()); + StatsFormat.writeIntegerLittleEndian(outputStream, StatsFormat.CURRENT_FORMAT_VERSION); + outputStream.write(magic); + + footerSize = Optional.of(Math.toIntExact(outputStream.getPos() - footerOffset)); + finished = true; + } + + public long getFooterSize() { + return footerSize.orElseThrow(() -> new IllegalStateException("Footer not written yet")); + } + + private int getFileFlags() { + int flags = 0; + if (compressFooter) { + flags |= StatsFormat.FLAG_COMPRESSED; + } + return flags; + } + + private void checkNotFinished() { + if (finished) { + throw new IllegalStateException("Writer already finished"); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java index 249b07992ce6..14bb1c27a2f0 100644 --- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java @@ -162,7 +162,10 @@ public static Set getIntegerSetOrNull(String property, JsonNode node) { if (!node.has(property) || node.get(property).isNull()) { return null; } + return getIntegerSet(property, node); + } + public static Set getIntegerSet(String property, JsonNode node) { return ImmutableSet.builder() .addAll(new JsonIntegerArrayIterator(property, node)) .build(); diff --git a/core/src/test/java/org/apache/iceberg/stats/StatsFormatTestUtil.java b/core/src/test/java/org/apache/iceberg/stats/StatsFormatTestUtil.java new file mode 100644 index 000000000000..254143f30341 --- /dev/null +++ 
b/core/src/test/java/org/apache/iceberg/stats/StatsFormatTestUtil.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.stats; + +import java.io.InputStream; +import java.net.URL; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.io.ByteStreams; + +public final class StatsFormatTestUtil { + private StatsFormatTestUtil() { + } + + // footer size for v1/sample-metric-data-uncompressed.bin + public static final int STATS_FILE_EMPTY_UNCOMPRESSED_FOOTER_SIZE = 65; + + static byte[] readTestResource(String resourceName) throws Exception { + // TODO simplify using com.google.common.io.Resources once it's available in org.apache.iceberg.relocated.* + // Resources.toByteArray(Resources.getResource(StatsFormatTestUtil.class, resourceName)) + + URL url = StatsFormatTestUtil.class.getResource(resourceName); + Preconditions.checkState(url != null, "Resource %s not found", resourceName); + try (InputStream inputStream = url.openStream()) { + return ByteStreams.toByteArray(inputStream); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/stats/TestFileMetadataParser.java 
b/core/src/test/java/org/apache/iceberg/stats/TestFileMetadataParser.java new file mode 100644 index 000000000000..db68bb8e7807 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/stats/TestFileMetadataParser.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.stats; + +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestFileMetadataParser { + @Test + public void testJson() { + // minimal + testJsonSerialization( + new FileMetadata(ImmutableList.of(), ImmutableMap.of()), + "{\n" + + " \"blobs\" : [ ],\n" + + " \"properties\" : { }\n" + + "}"); + + testJsonSerialization( + new FileMetadata( + ImmutableList.of( + new BlobMetadata("type-a", ImmutableSet.of(1), 4, 16, null), + new BlobMetadata("type-bbb", ImmutableSet.of(2, 3, 4), Integer.MAX_VALUE * 10000L, 79834, null)), + ImmutableMap.of("a property", "a property value", "another one", "also with value")), + "{\n" + + " \"blobs\" : [ {\n" + + " \"type\" : \"type-a\",\n" + + " \"columns\" : [ 1 ],\n" + + " \"offset\" : 4,\n" + + " \"length\" : 16\n" + + " }, {\n" + + " \"type\" : \"type-bbb\",\n" + + " \"columns\" : [ 2, 3, 4 ],\n" + + " \"offset\" : 21474836470000,\n" + + " \"length\" : 79834\n" + + " } ],\n" + + " \"properties\" : {\n" + + " \"a property\" : \"a property value\",\n" + + " \"another one\" : \"also with value\"\n" + + " }\n" + + "}"); + } + + private void testJsonSerialization(FileMetadata fileMetadata, String json) { + assertThat(FileMetadataParser.toJson(fileMetadata)) + .isEqualTo(json); + + // Test round-trip. 
Note that FileMetadata doesn't implement equals() + FileMetadata parsed = FileMetadataParser.fromJson(json); + assertThat(FileMetadataParser.toJson(parsed)) + .isEqualTo(json); + } +} diff --git a/core/src/test/java/org/apache/iceberg/stats/TestStatsFormat.java b/core/src/test/java/org/apache/iceberg/stats/TestStatsFormat.java new file mode 100644 index 000000000000..1649eedee305 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/stats/TestStatsFormat.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.stats; + +import java.io.ByteArrayOutputStream; +import org.junit.Test; + +import static org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument; +import static org.apache.iceberg.stats.StatsFormat.readIntegerLittleEndian; +import static org.apache.iceberg.stats.StatsFormat.writeIntegerLittleEndian; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestStatsFormat { + @Test + public void testWriteIntegerLittleEndian() throws Exception { + testWriteIntegerLittleEndian(0, bytes(0, 0, 0, 0)); + testWriteIntegerLittleEndian(42, bytes(42, 0, 0, 0)); + testWriteIntegerLittleEndian(Integer.MAX_VALUE - 5, bytes(0xFa, 0xFF, 0xFF, 0x7F)); + testWriteIntegerLittleEndian(-7, bytes(0xF9, 0xFF, 0xFF, 0xFF)); + } + + private void testWriteIntegerLittleEndian(int value, byte[] expected) throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + writeIntegerLittleEndian(outputStream, value); + assertThat(outputStream.toByteArray()).isEqualTo(expected); + } + + @Test + public void testReadIntegerLittleEndian() { + assertThat(readIntegerLittleEndian(bytes(0, 0, 0, 0), 0)).isEqualTo(0); + assertThat(readIntegerLittleEndian(bytes(42, 0, 0, 0), 0)).isEqualTo(42); + assertThat(readIntegerLittleEndian(bytes(13, 42, 0, 0, 0, 14), 1)).isEqualTo(42); + assertThat(readIntegerLittleEndian(bytes(13, 0xFa, 0xFF, 0xFF, 0x7F, 14), 1)).isEqualTo(Integer.MAX_VALUE - 5); + assertThat(readIntegerLittleEndian(bytes(13, 0xF9, 0xFF, 0xFF, 0xFF, 14), 1)).isEqualTo(-7); + } + + private byte[] bytes(int... 
unsignedBytes) { + byte[] bytes = new byte[unsignedBytes.length]; + for (int i = 0; i < unsignedBytes.length; i++) { + int value = unsignedBytes[i]; + checkArgument(0 <= value && value <= 0xFF, "Invalid value: %s", value); + bytes[i] = (byte) value; + } + return bytes; + } +} diff --git a/core/src/test/java/org/apache/iceberg/stats/TestStatsReader.java b/core/src/test/java/org/apache/iceberg/stats/TestStatsReader.java new file mode 100644 index 000000000000..75f968494bab --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/stats/TestStatsReader.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.stats; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import org.apache.iceberg.io.InMemoryInputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.junit.Test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.toImmutableMap; +import static org.apache.iceberg.stats.StatsCompressionCodec.ZSTD; +import static org.apache.iceberg.stats.StatsFormatTestUtil.STATS_FILE_EMPTY_UNCOMPRESSED_FOOTER_SIZE; +import static org.apache.iceberg.stats.StatsFormatTestUtil.readTestResource; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestStatsReader { + @Test + public void testEmptyFooterUncompressed() throws Exception { + testEmpty("v1/empty-stats-uncompressed.bin", Optional.of(STATS_FILE_EMPTY_UNCOMPRESSED_FOOTER_SIZE)); + } + + @Test + public void testEmptyWithUnknownFooterSize() throws Exception { + testEmpty("v1/empty-stats-uncompressed.bin", Optional.empty()); + } + + private void testEmpty(String resourceName, Optional footerSize) throws Exception { + InMemoryInputFile inputFile = new InMemoryInputFile(readTestResource(resourceName)); + try (StatsReader reader = new StatsReader(inputFile, footerSize)) { + FileMetadata fileMetadata = reader.getFileMetadata(); + assertThat(fileMetadata.properties()).as("file properties") + .isEqualTo(ImmutableMap.of()); + assertThat(fileMetadata.blobs()).as("blob list") + .isEmpty(); + } + } + + @Test + public void testReadMetricDataUncompressed() throws Exception { + testReadMetricData("v1/sample-metric-data-uncompressed.bin", Optional.empty()); + } + + @Test + public void testReadMetricDataCompressedZstd() throws Exception { + 
testReadMetricData("v1/sample-metric-data-compressed-zstd.bin", Optional.of(ZSTD)); + } + + private void testReadMetricData(String resourceName, Optional expectedCodec) throws Exception { + InMemoryInputFile inputFile = new InMemoryInputFile(readTestResource(resourceName)); + try (StatsReader reader = new StatsReader(inputFile, Optional.empty())) { + FileMetadata fileMetadata = reader.getFileMetadata(); + assertThat(fileMetadata.properties()).as("file properties") + .isEqualTo(ImmutableMap.of("writer.version", "1234")); + assertThat(fileMetadata.blobs()).as("blob list") + .hasSize(2); + + BlobMetadata blob0Metadata = fileMetadata.blobs().get(0); + assertThat(blob0Metadata.type()).as("type").isEqualTo("some-blob"); + assertThat(blob0Metadata.columns()).as("columns").isEqualTo(ImmutableSet.of(1)); + assertThat(blob0Metadata.offset()).as("offset").isEqualTo(4); + assertThat(blob0Metadata.compressionCodec()).as("compression codec") + .isEqualTo(expectedCodec.map(StatsCompressionCodec::getCodecName).orElse(null)); + + BlobMetadata blob1Metadata = fileMetadata.blobs().get(1); + assertThat(blob1Metadata.type()).as("type").isEqualTo("some-other-blob"); + assertThat(blob1Metadata.columns()).as("columns").isEqualTo(ImmutableSet.of(2)); + assertThat(blob1Metadata.offset()).as("offset") + .isEqualTo(blob0Metadata.offset() + blob0Metadata.length()); + assertThat(blob1Metadata.compressionCodec()).as("compression codec") + .isEqualTo(expectedCodec.map(StatsCompressionCodec::getCodecName).orElse(null)); + + Map read = reader.readAll(ImmutableList.of(blob0Metadata, blob1Metadata)) + .collect(toImmutableMap(Entry::getKey, entry -> getBytes(entry.getValue()))); + + assertThat(read).as("read") + .containsOnlyKeys(blob0Metadata, blob1Metadata) + .containsEntry(blob0Metadata, "abcdefghi".getBytes(UTF_8)) + .containsEntry( + blob1Metadata, + "some blob \u0000 binary data 🤯 that is not very very very very very very long, is it?".getBytes(UTF_8)); + } + } + + private static byte[] 
getBytes(ByteBuffer byteBuffer) { + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes; + } +} diff --git a/core/src/test/java/org/apache/iceberg/stats/TestStatsWriter.java b/core/src/test/java/org/apache/iceberg/stats/TestStatsWriter.java new file mode 100644 index 000000000000..d657cc90c9b3 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/stats/TestStatsWriter.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.stats; + +import java.nio.ByteBuffer; +import java.util.Optional; +import org.apache.iceberg.io.InMemoryOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.junit.Test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.iceberg.stats.StatsCompressionCodec.ZSTD; +import static org.apache.iceberg.stats.StatsFormatTestUtil.STATS_FILE_EMPTY_UNCOMPRESSED_FOOTER_SIZE; +import static org.apache.iceberg.stats.StatsFormatTestUtil.readTestResource; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestStatsWriter { + @Test + public void testEmptyFooterCompressed() { + InMemoryOutputFile outputFile = new InMemoryOutputFile(); + StatsWriter writer = new StatsWriter(outputFile); + writer.setCompressFooter(true); + assertThatThrownBy(writer::getFooterSize) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Footer not written yet"); + assertThatThrownBy(writer::finish) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Unsupported codec: LZ4"); + assertThatThrownBy(writer::close) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Unsupported codec: LZ4"); + } + + @Test + public void testEmptyFooterUncompressed() throws Exception { + InMemoryOutputFile outputFile = new InMemoryOutputFile(); + StatsWriter writer = new StatsWriter(outputFile); + writer.setCompressFooter(false); + assertThatThrownBy(writer::getFooterSize) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Footer not written yet"); + writer.finish(); + assertThat(writer.getFooterSize()).isEqualTo(STATS_FILE_EMPTY_UNCOMPRESSED_FOOTER_SIZE); + writer.close(); + assertThat(outputFile.toByteArray()) + .isEqualTo(readTestResource("v1/empty-stats-uncompressed.bin")); + // getFooterSize is still accessible after close() + 
assertThat(writer.getFooterSize()).isEqualTo(STATS_FILE_EMPTY_UNCOMPRESSED_FOOTER_SIZE); + } + + @Test + public void testImplicitFinish() throws Exception { + InMemoryOutputFile outputFile = new InMemoryOutputFile(); + StatsWriter writer = new StatsWriter(outputFile); + writer.setCompressFooter(false); + writer.close(); + assertThat(outputFile.toByteArray()) + .isEqualTo(readTestResource("v1/empty-stats-uncompressed.bin")); + assertThat(writer.getFooterSize()).isEqualTo(STATS_FILE_EMPTY_UNCOMPRESSED_FOOTER_SIZE); + } + + @Test + public void testWriteMetricDataUncompressed() throws Exception { + testWriteMetric(Optional.empty(), "v1/sample-metric-data-uncompressed.bin"); + } + + @Test + public void testWriteMetricDataCompressedZstd() throws Exception { + testWriteMetric(Optional.of(ZSTD), "v1/sample-metric-data-compressed-zstd.bin"); + } + + private void testWriteMetric(Optional compression, String expectedResource) throws Exception { + InMemoryOutputFile outputFile = new InMemoryOutputFile(); + try (StatsWriter writer = new StatsWriter(outputFile)) { + writer.addFileProperty("writer.version", "1234"); + + writer.append("some-blob", ImmutableSet.of(1), ByteBuffer.wrap("abcdefghi".getBytes(UTF_8)), compression); + + // "xxx"s are stripped away by data offsets + byte[] bytes = + "xxx some blob \u0000 binary data 🤯 that is not very very very very very very long, is it? 
xxx".getBytes( + UTF_8); + writer.append("some-other-blob", ImmutableSet.of(2), ByteBuffer.wrap(bytes, 4, bytes.length - 8), compression); + } + + byte[] expected = readTestResource(expectedResource); + assertThat(outputFile.toByteArray()) + .isEqualTo(expected); + } +} diff --git a/core/src/test/resources/org/apache/iceberg/stats/v1/empty-stats-uncompressed.bin b/core/src/test/resources/org/apache/iceberg/stats/v1/empty-stats-uncompressed.bin new file mode 100644 index 000000000000..c5d8c9c22359 Binary files /dev/null and b/core/src/test/resources/org/apache/iceberg/stats/v1/empty-stats-uncompressed.bin differ diff --git a/core/src/test/resources/org/apache/iceberg/stats/v1/sample-metric-data-compressed-zstd.bin b/core/src/test/resources/org/apache/iceberg/stats/v1/sample-metric-data-compressed-zstd.bin new file mode 100644 index 000000000000..f7a6450bed23 Binary files /dev/null and b/core/src/test/resources/org/apache/iceberg/stats/v1/sample-metric-data-compressed-zstd.bin differ diff --git a/core/src/test/resources/org/apache/iceberg/stats/v1/sample-metric-data-uncompressed.bin b/core/src/test/resources/org/apache/iceberg/stats/v1/sample-metric-data-uncompressed.bin new file mode 100644 index 000000000000..43eb435a6b67 Binary files /dev/null and b/core/src/test/resources/org/apache/iceberg/stats/v1/sample-metric-data-uncompressed.bin differ diff --git a/dev/.rat-excludes b/dev/.rat-excludes index 14d4d287c4a7..3cdad4e44d99 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -21,6 +21,7 @@ gradle/* .*\.svg .*\.lock .*\.json +.*\.bin package-list sitemap.xml derby.log diff --git a/versions.props b/versions.props index 72605f3f82d6..19229b097ae9 100644 --- a/versions.props +++ b/versions.props @@ -17,6 +17,7 @@ com.github.ben-manes.caffeine:caffeine = 2.8.4 org.apache.arrow:arrow-vector = 7.0.0 org.apache.arrow:arrow-memory-netty = 7.0.0 org.roaringbitmap:RoaringBitmap = 0.9.22 +io.airlift:aircompressor = 0.21 io.netty:netty-buffer = 4.1.68.Final 
com.github.stephenc.findbugs:findbugs-annotations = 1.3.9-1 com.aliyun.oss:aliyun-sdk-oss = 3.10.2