Skip to content

Commit

Permalink
Add reader and writer for Puffin, indexes and stats file format
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Jun 9, 2022
1 parent a42fbb5 commit 9430d36
Show file tree
Hide file tree
Showing 24 changed files with 1,710 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ project(':iceberg-core') {
exclude group: 'org.tukaani' // xz compression is not supported
}

implementation 'io.airlift:aircompressor'
implementation 'org.apache.httpcomponents.client5:httpclient5'
implementation "com.fasterxml.jackson.core:jackson-databind"
implementation "com.fasterxml.jackson.core:jackson-core"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.google.common.hash.Hashing;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.Files;
import com.google.common.io.Resources;
import com.google.common.primitives.Bytes;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -89,6 +90,7 @@ public class GuavaClasses {
Hashing.class.getName();
Files.class.getName();
Bytes.class.getName();
Resources.class.getName();
MoreExecutors.class.getName();
ThreadFactoryBuilder.class.getName();
Iterables.class.getName();
Expand Down
75 changes: 75 additions & 0 deletions core/src/main/java/org/apache/iceberg/puffin/Blob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.puffin;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

/**
 * Immutable description of a Puffin blob: its type, the input fields it relates to, the blob
 * payload, an optionally requested compression codec and free-form string properties.
 *
 * <p>The {@code inputFields} list and the {@code properties} map are copied into immutable
 * collections on construction. The {@link ByteBuffer} payload is stored by reference and is
 * not copied.
 */
public final class Blob {
  private final String type;
  private final List<Integer> inputFields;
  private final ByteBuffer blobData;
  private final PuffinCompressionCodec requestedCompression;
  private final Map<String, String> properties;

  /**
   * Creates a blob with no requested compression codec and no properties.
   *
   * @param type blob type, must not be null
   * @param inputFields input fields of the blob, must not be null
   * @param blobData blob payload, must not be null
   */
  public Blob(String type, List<Integer> inputFields, ByteBuffer blobData) {
    this(type, inputFields, blobData, null, ImmutableMap.of());
  }

  /**
   * Creates a blob.
   *
   * @param type blob type, must not be null
   * @param inputFields input fields of the blob, must not be null
   * @param blobData blob payload, must not be null
   * @param requestedCompression compression codec requested for this blob, may be null
   * @param properties blob properties, must not be null
   */
  public Blob(
      String type,
      List<Integer> inputFields,
      ByteBuffer blobData,
      @Nullable PuffinCompressionCodec requestedCompression,
      Map<String, String> properties) {
    // Validate every required argument up front, before any state is assigned or copied.
    Preconditions.checkNotNull(type, "type is null");
    Preconditions.checkNotNull(inputFields, "inputFields is null");
    Preconditions.checkNotNull(blobData, "blobData is null");
    Preconditions.checkNotNull(properties, "properties is null");

    // Plain references first, then the defensive immutable copies.
    this.type = type;
    this.blobData = blobData;
    this.requestedCompression = requestedCompression;
    this.inputFields = ImmutableList.copyOf(inputFields);
    this.properties = ImmutableMap.copyOf(properties);
  }

  /** Returns the blob type. */
  public String type() {
    return this.type;
  }

  /** Returns the immutable list of input fields. */
  public List<Integer> inputFields() {
    return this.inputFields;
  }

  /** Returns the blob payload buffer that was passed to the constructor (not a copy). */
  public ByteBuffer blobData() {
    return this.blobData;
  }

  /** Returns the requested compression codec, or null when none was requested. */
  @Nullable
  public PuffinCompressionCodec requestedCompression() {
    return this.requestedCompression;
  }

  /** Returns the immutable map of blob properties. */
  public Map<String, String> properties() {
    return this.properties;
  }
}
81 changes: 81 additions & 0 deletions core/src/main/java/org/apache/iceberg/puffin/BlobMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.puffin;

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

/**
 * Immutable metadata of a single blob: its type, input fields, offset and length in the file,
 * an optional compression codec name and string properties.
 *
 * <p>The {@code inputFields} list and the {@code properties} map are copied into immutable
 * collections on construction.
 */
public class BlobMetadata {
  private final String type;
  private final List<Integer> inputFields;
  private final long offset;
  private final long length;
  private final String compressionCodec;
  private final Map<String, String> properties;

  /**
   * Creates blob metadata.
   *
   * @param type blob type, must not be null
   * @param inputFields input fields of the blob, must not be null
   * @param offset offset in the file
   * @param length length in the file
   * @param compressionCodec compression codec name, may be null
   * @param properties blob properties, must not be null
   */
  public BlobMetadata(
      String type,
      List<Integer> inputFields,
      long offset,
      long length,
      @Nullable String compressionCodec,
      Map<String, String> properties) {
    // Validate every required argument up front, before any state is assigned or copied.
    Preconditions.checkNotNull(type, "type is null");
    Preconditions.checkNotNull(inputFields, "inputFields is null");
    Preconditions.checkNotNull(properties, "properties is null");

    // Scalars and plain references first, then the defensive immutable copies.
    this.type = type;
    this.offset = offset;
    this.length = length;
    this.compressionCodec = compressionCodec;
    this.inputFields = ImmutableList.copyOf(inputFields);
    this.properties = ImmutableMap.copyOf(properties);
  }

  /** Returns the blob type. */
  public String type() {
    return this.type;
  }

  /** Returns the immutable list of input fields. */
  public List<Integer> inputFields() {
    return this.inputFields;
  }

  /**
   * Returns the offset in the file.
   */
  public long offset() {
    return this.offset;
  }

  /**
   * Returns the length in the file.
   */
  public long length() {
    return this.length;
  }

  /** Returns the compression codec name, or null when none is set. */
  @Nullable
  public String compressionCodec() {
    return this.compressionCodec;
  }

  /** Returns the immutable map of blob properties. */
  public Map<String, String> properties() {
    return this.properties;
  }
}
46 changes: 46 additions & 0 deletions core/src/main/java/org/apache/iceberg/puffin/FileMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.puffin;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

/**
 * Immutable file-level metadata: the list of {@link BlobMetadata} entries and a map of string
 * properties. Both collections are copied into immutable collections on construction.
 */
public class FileMetadata {
  private final List<BlobMetadata> blobs;
  private final Map<String, String> properties;

  /**
   * Creates file metadata.
   *
   * @param blobs metadata of the blobs, must not be null
   * @param properties file properties, must not be null
   */
  public FileMetadata(List<BlobMetadata> blobs, Map<String, String> properties) {
    // Validate both arguments before copying either of them.
    Preconditions.checkNotNull(blobs, "blobs is null");
    Preconditions.checkNotNull(properties, "properties is null");

    List<BlobMetadata> blobsCopy = ImmutableList.copyOf(blobs);
    Map<String, String> propertiesCopy = ImmutableMap.copyOf(properties);
    this.blobs = blobsCopy;
    this.properties = propertiesCopy;
  }

  /** Returns the immutable list of blob metadata entries. */
  public List<BlobMetadata> blobs() {
    return this.blobs;
  }

  /** Returns the immutable map of file properties. */
  public Map<String, String> properties() {
    return this.properties;
  }
}
164 changes: 164 additions & 0 deletions core/src/main/java/org/apache/iceberg/puffin/FileMetadataParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.puffin;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.JsonUtil;

/**
 * Converts {@link FileMetadata} (and the {@link BlobMetadata} entries it contains) to and from
 * its JSON representation.
 *
 * <p>This is a static utility class and cannot be instantiated.
 */
public final class FileMetadataParser {

  private FileMetadataParser() {
  }

  // Field names of the file-level JSON object.
  private static final String BLOBS = "blobs";
  private static final String PROPERTIES = "properties";

  // Field names of each blob JSON object ("properties" is shared with the file level).
  private static final String TYPE = "type";
  private static final String FIELDS = "fields";
  private static final String OFFSET = "offset";
  private static final String LENGTH = "length";
  private static final String COMPRESSION_CODEC = "compression-codec";

  /**
   * Serializes the given file metadata to a JSON string.
   *
   * @param fileMetadata metadata to serialize
   * @param pretty whether to use the generator's default pretty printer
   * @return the JSON text
   * @throws UncheckedIOException if the JSON generator fails with an {@link IOException}
   */
  public static String toJson(FileMetadata fileMetadata, boolean pretty) {
    StringWriter writer = new StringWriter();
    // try-with-resources makes sure the generator is closed on both the success and the
    // failure path; the original code never closed it.
    try (JsonGenerator generator = JsonUtil.factory().createGenerator(writer)) {
      if (pretty) {
        generator.useDefaultPrettyPrinter();
      }
      toJson(fileMetadata, generator);
      generator.flush();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to write json for: " + fileMetadata, e);
    }
    // Read the writer only after the generator has been flushed and closed.
    return writer.toString();
  }

  /**
   * Parses file metadata from a JSON string.
   *
   * @param json JSON text
   * @return the parsed file metadata
   * @throws UncheckedIOException if reading the JSON tree fails with an {@link IOException}
   */
  public static FileMetadata fromJson(String json) {
    try {
      return fromJson(JsonUtil.mapper().readValue(json, JsonNode.class));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Parses file metadata from an already-parsed JSON tree. */
  static FileMetadata fromJson(JsonNode json) {
    return fileMetadataFromJson(json);
  }

  /**
   * Writes the file metadata as one JSON object: a "blobs" array followed by a "properties"
   * object. The "properties" object is always written, even when it is empty.
   */
  static void toJson(FileMetadata fileMetadata, JsonGenerator generator) throws IOException {
    generator.writeStartObject();

    generator.writeArrayFieldStart(BLOBS);
    for (BlobMetadata blobMetadata : fileMetadata.blobs()) {
      toJson(blobMetadata, generator);
    }
    generator.writeEndArray();

    writeProperties(fileMetadata.properties(), generator);

    generator.writeEndObject();
  }

  /**
   * Reads file metadata from a JSON object.
   *
   * @param json JSON node expected to hold a "blobs" array and, optionally, "properties"
   * @return the parsed file metadata; properties default to an empty map when absent
   * @throws IllegalArgumentException if "blobs" is missing or is not an array
   */
  static FileMetadata fileMetadataFromJson(JsonNode json) {
    JsonNode blobsJson = json.get(BLOBS);
    Preconditions.checkArgument(blobsJson != null && blobsJson.isArray(),
        "Cannot parse blobs from non-array: %s", blobsJson);

    ImmutableList.Builder<BlobMetadata> blobs = ImmutableList.builder();
    for (JsonNode blobJson : blobsJson) {
      blobs.add(blobMetadataFromJson(blobJson));
    }

    return new FileMetadata(blobs.build(), propertiesFromJson(json));
  }

  /**
   * Writes a single blob's metadata as a JSON object. The "compression-codec" field is written
   * only when a codec is set, and "properties" only when the map is non-empty.
   */
  static void toJson(BlobMetadata blobMetadata, JsonGenerator generator) throws IOException {
    generator.writeStartObject();

    generator.writeStringField(TYPE, blobMetadata.type());

    generator.writeArrayFieldStart(FIELDS);
    for (int field : blobMetadata.inputFields()) {
      generator.writeNumber(field);
    }
    generator.writeEndArray();

    generator.writeNumberField(OFFSET, blobMetadata.offset());
    generator.writeNumberField(LENGTH, blobMetadata.length());

    if (blobMetadata.compressionCodec() != null) {
      generator.writeStringField(COMPRESSION_CODEC, blobMetadata.compressionCodec());
    }

    if (!blobMetadata.properties().isEmpty()) {
      writeProperties(blobMetadata.properties(), generator);
    }

    generator.writeEndObject();
  }

  /**
   * Reads a single blob's metadata from a JSON object.
   *
   * <p>"type", "fields", "offset" and "length" are read through {@code JsonUtil} getters;
   * "compression-codec" is read with {@code JsonUtil.getStringOrNull}; "properties" defaults
   * to an empty map when the field is absent.
   */
  static BlobMetadata blobMetadataFromJson(JsonNode json) {
    String type = JsonUtil.getString(TYPE, json);
    List<Integer> fields = JsonUtil.getIntegerList(FIELDS, json);
    long offset = JsonUtil.getLong(OFFSET, json);
    long length = JsonUtil.getLong(LENGTH, json);
    String compressionCodec = JsonUtil.getStringOrNull(COMPRESSION_CODEC, json);
    Map<String, String> properties = propertiesFromJson(json);

    return new BlobMetadata(
        type,
        fields,
        offset,
        length,
        compressionCodec,
        properties);
  }

  /** Writes the given map as a "properties" JSON object of string fields. */
  private static void writeProperties(Map<String, String> properties, JsonGenerator generator)
      throws IOException {
    generator.writeObjectFieldStart(PROPERTIES);
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      generator.writeStringField(entry.getKey(), entry.getValue());
    }
    generator.writeEndObject();
  }

  /** Reads the optional "properties" field of a node; an absent field yields an empty map. */
  private static Map<String, String> propertiesFromJson(JsonNode json) {
    if (json.get(PROPERTIES) == null) {
      return ImmutableMap.of();
    }
    return JsonUtil.getStringMap(PROPERTIES, json);
  }
}
Loading

0 comments on commit 9430d36

Please sign in to comment.