diff --git a/pom.xml b/pom.xml index 986901ba..1771d0e5 100644 --- a/pom.xml +++ b/pom.xml @@ -107,7 +107,7 @@ org.lz4 lz4-java 1.8.0 - test + diff --git a/src/main/java/io/airlift/compress/lz4/Lz4FrameCompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4FrameCompressor.java new file mode 100644 index 00000000..85a2ddbb --- /dev/null +++ b/src/main/java/io/airlift/compress/lz4/Lz4FrameCompressor.java @@ -0,0 +1,137 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lz4; + +import io.airlift.compress.Compressor; + +import java.nio.Buffer; +import java.nio.ByteBuffer; + +import static io.airlift.compress.lz4.Lz4RawCompressor.MAX_TABLE_SIZE; +import static io.airlift.compress.lz4.UnsafeUtil.getAddress; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET; + +/** + * This class is not thread-safe + */ +public class Lz4FrameCompressor + implements Compressor +{ + private final int[] table = new int[MAX_TABLE_SIZE]; + + @Override + public int maxCompressedLength(int uncompressedSize) + { + return Lz4FrameRawCompressor.maxCompressedLength(uncompressedSize); + } + + @Override + public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + { + verifyRange(input, inputOffset, inputLength); + verifyRange(output, outputOffset, maxOutputLength); + + long inputAddress = ARRAY_BYTE_BASE_OFFSET + inputOffset; + long outputAddress = ARRAY_BYTE_BASE_OFFSET + outputOffset; + + return Lz4FrameRawCompressor.compress( + input, + inputAddress, + inputLength, + output, + outputAddress, + maxOutputLength, + table); + } + + @Override + public void compress(ByteBuffer inputBuffer, ByteBuffer outputBuffer) + { + if (true) { + // TODO support byte buffers, see disabled tests + throw new UnsupportedOperationException("This is disabled, does not work with direct buffers yet"); + } + + // Java 9+ added an overload of various methods in ByteBuffer. When compiling with Java 11+ and targeting Java 8 bytecode + // the resulting signatures are invalid for JDK 8, so accesses below result in NoSuchMethodError. Accessing the + // methods through the interface class works around the problem + // Sidenote: we can't target "javac --release 8" because Unsafe is not available in the signature data for that profile + Buffer input = inputBuffer; + Buffer output = outputBuffer; + + Object inputBase; + long inputAddress; + int inputLimit; + if (input.isDirect()) { + inputBase = null; + long address = getAddress(input); + inputAddress = address + input.position(); + inputLimit = input.limit(); + } + else if (input.hasArray()) { + inputBase = input.array(); + inputAddress = ARRAY_BYTE_BASE_OFFSET + input.arrayOffset() + input.position(); + inputLimit = input.limit(); + } + else { + throw new IllegalArgumentException("Unsupported input ByteBuffer implementation " + input.getClass().getName()); + } + + Object outputBase; + long outputAddress; + int outputLimit; + if (output.isDirect()) { + outputBase = null; + long address = getAddress(output); + outputAddress = address + output.position(); + outputLimit = output.limit(); + } + else if (output.hasArray()) { + outputBase = output.array(); + outputAddress = ARRAY_BYTE_BASE_OFFSET + output.arrayOffset() + output.position(); + outputLimit = output.limit(); + } + else { + throw new IllegalArgumentException("Unsupported output ByteBuffer implementation " + output.getClass().getName()); + } + + // HACK: Assure JVM does not collect Slice wrappers while compressing, since the + // collection may trigger freeing of the underlying memory resulting in a segfault + // There is no other known way to signal to the JVM that an object should not be + // collected in a block, and technically, the JVM is allowed to eliminate these locks. + synchronized (input) { + synchronized (output) { + int written = Lz4FrameRawCompressor.compress( + inputBase, + inputAddress, + inputLimit, + outputBase, + outputAddress, + outputLimit, + table); + output.position(output.position() + written); + } + } + } + + private static void verifyRange(byte[] data, int offset, int length) + { + requireNonNull(data, "data is null"); + if (offset < 0 || length < 0 || offset + length > data.length) { + throw new IllegalArgumentException(format("Invalid offset or length (%s, %s) in array of length %s", offset, length, data.length)); + } + } +} diff --git a/src/main/java/io/airlift/compress/lz4/Lz4FrameDecompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4FrameDecompressor.java new file mode 100644 index 00000000..d3b3b3a9 --- /dev/null +++ b/src/main/java/io/airlift/compress/lz4/Lz4FrameDecompressor.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lz4; + +import io.airlift.compress.Decompressor; +import io.airlift.compress.MalformedInputException; + +import java.nio.Buffer; +import java.nio.ByteBuffer; + +import static io.airlift.compress.lz4.UnsafeUtil.getAddress; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET; + +public class Lz4FrameDecompressor + implements Decompressor +{ + @Override + public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + throws MalformedInputException + { + verifyRange(input, inputOffset, inputLength); + verifyRange(output, outputOffset, maxOutputLength); + + return Lz4FrameRawDecompressor.decompress( + input, + ARRAY_BYTE_BASE_OFFSET + inputOffset, + inputLength, + output, + ARRAY_BYTE_BASE_OFFSET + outputOffset, + maxOutputLength); + } + + @Override + public void decompress(ByteBuffer inputBuffer, ByteBuffer outputBuffer) + throws MalformedInputException + { + if (true) { + // TODO support byte buffers, see disabled tests + throw new UnsupportedOperationException("This is disabled, does not work with direct buffers yet"); + } + + // Java 9+ added an overload of various methods in ByteBuffer. When compiling with Java 11+ and targeting Java 8 bytecode + // the resulting signatures are invalid for JDK 8, so accesses below result in NoSuchMethodError. Accessing the + // methods through the interface class works around the problem + // Sidenote: we can't target "javac --release 8" because Unsafe is not available in the signature data for that profile + Buffer input = inputBuffer; + Buffer output = outputBuffer; + + Object inputBase; + long inputAddress; + int inputLimit; + if (input.isDirect()) { + inputBase = null; + long address = getAddress(input); + inputAddress = address + input.position(); + inputLimit = input.limit(); + } + else if (input.hasArray()) { + inputBase = input.array(); + inputAddress = ARRAY_BYTE_BASE_OFFSET + input.arrayOffset() + input.position(); + inputLimit = input.limit(); + } + else { + throw new IllegalArgumentException("Unsupported input ByteBuffer implementation " + input.getClass().getName()); + } + + Object outputBase; + long outputAddress; + int outputLimit; + if (output.isDirect()) { + outputBase = null; + long address = getAddress(output); + outputAddress = address + output.position(); + outputLimit = output.limit(); + } + else if (output.hasArray()) { + outputBase = output.array(); + outputAddress = ARRAY_BYTE_BASE_OFFSET + output.arrayOffset() + output.position(); + outputLimit = output.limit(); + } + else { + throw new IllegalArgumentException("Unsupported output ByteBuffer implementation " + output.getClass().getName()); + } + + // HACK: Assure JVM does not collect Slice wrappers while decompressing, since the + // collection may trigger freeing of the underlying memory resulting in a segfault + // There is no other known way to signal to the JVM that an object should not be + // collected in a block, and technically, the JVM is allowed to eliminate these locks. + synchronized (input) { + synchronized (output) { + int written = Lz4FrameRawDecompressor.decompress(inputBase, inputAddress, inputLimit, outputBase, outputAddress, outputLimit); + output.position(output.position() + written); + } + } + } + + public static long getDecompressedSize(byte[] input, int offset, int length) + { + int baseAddress = ARRAY_BYTE_BASE_OFFSET + offset; + return Lz4FrameRawDecompressor.getDecompressedSize(input, baseAddress, length); + } + + private static void verifyRange(byte[] data, int offset, int length) + { + requireNonNull(data, "data is null"); + if (offset < 0 || length < 0 || offset + length > data.length) { + throw new IllegalArgumentException(format("Invalid offset or length (%s, %s) in array of length %s", offset, length, data.length)); + } + } +} diff --git a/src/main/java/io/airlift/compress/lz4/Lz4FrameRawCompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4FrameRawCompressor.java new file mode 100644 index 00000000..48e71961 --- /dev/null +++ b/src/main/java/io/airlift/compress/lz4/Lz4FrameRawCompressor.java @@ -0,0 +1,134 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lz4; + +import net.jpountz.xxhash.XXHash32; +import net.jpountz.xxhash.XXHashFactory; + +import static io.airlift.compress.lz4.UnsafeUtil.UNSAFE; +import static java.lang.Math.toIntExact; +import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET; + +/** + * Implementation of LZ4 Frame format. + */ +final class Lz4FrameRawCompressor +{ + private Lz4FrameRawCompressor() {} + + private static final byte[] MAGIC = {0x04, 0x22, 0x4D, 0x18}; + + private static final int FRAME_DESCRIPTOR_SIZE = + 2 + // FLG byte, BD byte + 8 + // content size + 1; // HC (Header Checksum) + + private static final int FRAME_START_SIZE = 4 /* magic */ + FRAME_DESCRIPTOR_SIZE; + private static final int FRAME_END_SIZE = 4 /* EndMark */; + + private static final int BLOCK_MAX_SIZE = 4 * 1024 * 1024; + private static final int BLOCK_MAX_SIZE_MARKER = 7; // per "Block Maximum Size" spec, 7 means 4 MB + + public static int maxCompressedLength(int uncompressedSize) + { + return FRAME_START_SIZE + Lz4RawCompressor.maxCompressedLength(uncompressedSize) + FRAME_END_SIZE + + // block sizes + 4 * (uncompressedSize / BLOCK_MAX_SIZE + 2); + } + + public static int compress( + Object inputBase, + long inputAddress, + int inputLength, + Object outputBase, + long outputAddress, + int maxOutputLength, + int[] table) + { + long originalOutputAddress = outputAddress; + + if (maxOutputLength < maxCompressedLength(inputLength)) { + throw new IllegalArgumentException("Max output length must be larger than " + maxCompressedLength(inputLength)); + } + + UNSAFE.copyMemory(MAGIC, ARRAY_BYTE_BASE_OFFSET, outputBase, outputAddress, MAGIC.length); + outputAddress += MAGIC.length; + maxOutputLength -= MAGIC.length; + + byte[] frameDescriptor = new byte[FRAME_DESCRIPTOR_SIZE]; + + // FLG byte + frameDescriptor[0] = + 0b01 << 6 | // Version: 1 + 1 << 5 | // B.Indep: blocks are independent + 0 << 4 | // B.Checksum: no checksum + 1 << 3 | // C.Size: content size present + 0 << 2 | // C.Checksum: no checksum + 0 << 1 | // Reserved + 0 << 0; // DictID: no dictionary + + // BD byte + frameDescriptor[1] = (BLOCK_MAX_SIZE_MARKER << 4); + + // content size + UNSAFE.putLong(frameDescriptor, ARRAY_BYTE_BASE_OFFSET + 2L, inputLength); + + // HC (Header Checksum) + XXHash32 xxHash32 = XXHashFactory.fastestInstance().hash32(); + byte hc = (byte) ((xxHash32.hash(frameDescriptor, 0, frameDescriptor.length - 1, 0) >> 8) & 0xFF); + frameDescriptor[frameDescriptor.length - 1] = hc; + UNSAFE.copyMemory(frameDescriptor, ARRAY_BYTE_BASE_OFFSET, outputBase, outputAddress, frameDescriptor.length); + outputAddress += frameDescriptor.length; + maxOutputLength -= frameDescriptor.length; + + while (inputLength > 0) { + int blockSize = Math.min(inputLength, BLOCK_MAX_SIZE); + int blockHeaderSize = 4; + int compressedSize = Lz4RawCompressor.compress( + inputBase, + inputAddress, + blockSize, + outputBase, + outputAddress + blockHeaderSize, + maxOutputLength - blockHeaderSize, + table); + int uncompressed; + if (compressedSize >= blockSize) { + // incompressible data + uncompressed = 1; + compressedSize = blockSize; + UNSAFE.copyMemory(inputBase, inputAddress, outputBase, outputAddress + blockHeaderSize, blockSize); + UNSAFE.putInt(outputBase, outputAddress, (1 << 31) | blockSize); + } + else { + // compressed data, already written to the output + uncompressed = 0; + } + + UNSAFE.putInt(outputBase, outputAddress, (uncompressed << 31) | compressedSize); + outputAddress += blockHeaderSize + compressedSize; + maxOutputLength -= blockHeaderSize + compressedSize; + + inputAddress += blockSize; + inputLength -= blockSize; + } + + // EndMark + UNSAFE.putInt(outputBase, outputAddress, 0); + outputAddress += 4; + maxOutputLength -= 4; + + return toIntExact(outputAddress - originalOutputAddress); + } +} diff --git a/src/main/java/io/airlift/compress/lz4/Lz4FrameRawDecompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4FrameRawDecompressor.java new file mode 100644 index 00000000..22ea1b6f --- /dev/null +++ b/src/main/java/io/airlift/compress/lz4/Lz4FrameRawDecompressor.java @@ -0,0 +1,146 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lz4; + +import static io.airlift.compress.lz4.UnsafeUtil.UNSAFE; +import static java.lang.Math.toIntExact; + +final class Lz4FrameRawDecompressor +{ + private Lz4FrameRawDecompressor() {} + + private static final int SUPPORTED_FLAGS = 0b01101000; + private static final int SUPPORTED_BD = 0b01110000; + + static int decompress( + Object inputBase, + long inputAddress, + int inputLimit, + Object outputBase, + long outputAddress, + int outputLimit) + { + long originalOutputAddress = outputAddress; + + checkArgument(inputLimit >= 6, "Not enough input bytes"); + checkArgument(UNSAFE.getInt(inputBase, inputAddress) == 0x184D2204, "Invalid magic number"); + inputAddress += 4; + inputLimit -= 4; + + byte flags = UNSAFE.getByte(inputBase, inputAddress); + byte bd = UNSAFE.getByte(inputBase, inputAddress + 1); + inputAddress += 2; + inputLimit -= 2; + + checkArgument(getVersion(flags) == 1, "Unsupported version"); + long contentSize = -1; + if (hasContentSize(flags)) { + contentSize = UNSAFE.getLong(inputBase, inputAddress); + inputAddress += 8; + inputLimit -= 8; + checkArgument(contentSize <= outputLimit, "Output buffer too small"); + } + + checkArgument(inputLimit >= 1, "Not enough input bytes"); + byte hc = UNSAFE.getByte(inputBase, inputAddress); // header checksum + inputAddress += 1; + inputLimit -= 1; + + checkArgument((flags & ~SUPPORTED_FLAGS) == 0 && areBlocksIndependent(flags), "Unsupported flags"); + checkArgument((bd & ~SUPPORTED_BD) == 0, "Invalid BD byte"); + + while (true) { + checkArgument(inputLimit >= 4, "Not enough input bytes"); + int blockSize = UNSAFE.getInt(inputBase, inputAddress); + inputAddress += 4; + inputLimit -= 4; + + if (blockSize == 0) { + // EndMark + break; + } + + if ((blockSize & (1 << 31)) != 0) { + // uncompressed + blockSize = blockSize & Integer.MAX_VALUE; + checkArgument(inputLimit >= blockSize, "Not enough input bytes"); + checkArgument(outputLimit >= blockSize, "Output buffer too small"); + UNSAFE.copyMemory(inputBase, inputAddress, outputBase, outputAddress, blockSize); + inputAddress += blockSize; + inputLimit -= blockSize; + outputAddress += blockSize; + outputLimit -= blockSize; + } + else { + // compressed + checkArgument(inputLimit >= blockSize, "Not enough input bytes"); + int decompressed = Lz4RawDecompressor.decompress( + inputBase, + inputAddress, + inputAddress + blockSize, + outputBase, + outputAddress, + outputAddress + outputLimit); + inputAddress += blockSize; + inputLimit -= blockSize; + outputAddress += decompressed; + outputLimit -= decompressed; + } + } + + checkArgument(inputLimit == 0, "Some input not consumed"); + int decompressed = toIntExact(outputAddress - originalOutputAddress); + checkArgument(contentSize == -1 || decompressed == contentSize, "Decompressed wrong number of bytes"); + return decompressed; + } + + public static long getDecompressedSize(Object inputBase, long inputAddress, int inputLimit) + { + checkArgument(inputLimit >= 6, "Not enough input bytes"); + checkArgument(UNSAFE.getInt(inputBase, inputAddress) == 0x184D2204, "Invalid magic number"); + inputAddress += 4; + inputLimit -= 4; + + byte flags = UNSAFE.getByte(inputBase, inputAddress); + // BD byte not read + inputAddress += 2; + inputLimit -= 2; + checkArgument(hasContentSize(flags), "Content size (C.Size) not present"); + + checkArgument(inputLimit >= 8, "Not enough input bytes"); + return UNSAFE.getLong(inputBase, inputAddress); + } + + private static int getVersion(byte flags) + { + return flags >> 6; + } + + private static boolean hasContentSize(byte flags) + { + return (flags & (1 << 3)) != 0; + } + + private static boolean areBlocksIndependent(byte flags) + { + return (flags & (1 << 5)) != 0; + } + + private static void checkArgument(boolean condition, String message) + { + if (!condition) { + throw new IllegalArgumentException(message); + } + } +} diff --git a/src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java index f17163c0..9205dd2b 100644 --- a/src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java +++ b/src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java @@ -40,7 +40,7 @@ public final class Lz4RawCompressor private static final int RUN_BITS = 8 - ML_BITS; private static final int RUN_MASK = (1 << RUN_BITS) - 1; - private static final int MAX_DISTANCE = ((1 << 16) - 1); + static final int MAX_DISTANCE = ((1 << 16) - 1); private static final int SKIP_TRIGGER = 6; /* Increase this value ==> compression run slower on incompressible data */ diff --git a/src/test/java/io/airlift/compress/lz4/TestLz4Frame.java b/src/test/java/io/airlift/compress/lz4/TestLz4Frame.java new file mode 100644 index 00000000..50c4a871 --- /dev/null +++ b/src/test/java/io/airlift/compress/lz4/TestLz4Frame.java @@ -0,0 +1,81 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.lz4; + +import io.airlift.compress.AbstractTestCompression; +import io.airlift.compress.Compressor; +import io.airlift.compress.Decompressor; +import io.airlift.compress.benchmark.DataSet; +import io.airlift.compress.thirdparty.JPountzLz4FrameCompressor; +import io.airlift.compress.thirdparty.JPountzLz4FrameDecompressor; +import net.jpountz.lz4.LZ4Factory; +import org.testng.annotations.Test; + +import java.util.Arrays; + +import static org.testng.Assert.assertEquals; + +public class TestLz4Frame + extends AbstractTestCompression +{ + @Override + protected Compressor getCompressor() + { + return new Lz4FrameCompressor(); + } + + @Override + protected Decompressor getDecompressor() + { + return new Lz4FrameDecompressor(); + } + + @Override + protected boolean isByteBufferSupported() + { + // TODO support byte buffer + return false; + } + + @Override + protected Compressor getVerifyCompressor() + { + return new JPountzLz4FrameCompressor(LZ4Factory.fastestInstance()); + } + + @Override + protected Decompressor getVerifyDecompressor() + { + return new JPountzLz4FrameDecompressor(LZ4Factory.fastestInstance()); + } + + // test over data sets, should the result depend on input size or its compressibility + @Test(dataProvider = "data") + public void testGetDecompressedSize(DataSet dataSet) + { + Compressor compressor = getCompressor(); + byte[] originalUncompressed = dataSet.getUncompressed(); + byte[] compressed = new byte[compressor.maxCompressedLength(originalUncompressed.length)]; + + int compressedLength = compressor.compress(originalUncompressed, 0, originalUncompressed.length, compressed, 0, compressed.length); + + assertEquals(Lz4FrameDecompressor.getDecompressedSize(compressed, 0, compressedLength), originalUncompressed.length); + + int padding = 10; + byte[] compressedWithPadding = new byte[compressedLength + padding]; + Arrays.fill(compressedWithPadding, (byte) 42); + System.arraycopy(compressed, 0, compressedWithPadding, padding, compressedLength); + assertEquals(Lz4FrameDecompressor.getDecompressedSize(compressedWithPadding, padding, compressedLength), originalUncompressed.length); + } +} diff --git a/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameCompressor.java b/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameCompressor.java new file mode 100644 index 00000000..ecc25724 --- /dev/null +++ b/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameCompressor.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.thirdparty; + +import io.airlift.compress.Compressor; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FrameOutputStream; +import net.jpountz.xxhash.XXHashFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; + +import static java.lang.String.format; + +public class JPountzLz4FrameCompressor + implements Compressor +{ + private final LZ4Compressor compressor; + + public JPountzLz4FrameCompressor(LZ4Factory factory) + { + compressor = factory.fastCompressor(); + } + + @Override + public int maxCompressedLength(int uncompressedSize) + { + int maxHeaderLength; + try { + Field maxHeaderLengthField = LZ4FrameOutputStream.class.getDeclaredField("LZ4_MAX_HEADER_LENGTH"); + maxHeaderLengthField.setAccessible(true); + maxHeaderLength = maxHeaderLengthField.getInt(null); + } + catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + return maxHeaderLength + compressor.maxCompressedLength(uncompressedSize); + } + + @Override + public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + try (LZ4FrameOutputStream compressingOutputStream = new LZ4FrameOutputStream( + outputStream, + LZ4FrameOutputStream.BLOCKSIZE.SIZE_64KB, + inputLength, + compressor, + XXHashFactory.fastestInstance().hash32(), + LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE, + LZ4FrameOutputStream.FLG.Bits.CONTENT_SIZE)) { + compressingOutputStream.write(input, inputOffset, inputLength); + } + byte[] compressed = outputStream.toByteArray(); + if (compressed.length > maxOutputLength) { + throw new IllegalArgumentException(format("Output buffer too small, provided capacity %s, compressed data size %s", maxOutputLength, compressed.length)); + } + System.arraycopy(compressed, 0, output, outputOffset, compressed.length); + return compressed.length; + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) + { + throw new UnsupportedOperationException(); + } +} diff --git a/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameDecompressor.java b/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameDecompressor.java new file mode 100644 index 00000000..c123fa76 --- /dev/null +++ b/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameDecompressor.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.compress.thirdparty; + +import io.airlift.compress.Decompressor; +import io.airlift.compress.MalformedInputException; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FrameInputStream; +import net.jpountz.lz4.LZ4SafeDecompressor; +import net.jpountz.xxhash.XXHashFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; + +import static com.google.common.io.ByteStreams.read; + +public class JPountzLz4FrameDecompressor + implements Decompressor +{ + private final LZ4SafeDecompressor decompressor; + + public JPountzLz4FrameDecompressor(LZ4Factory factory) + { + decompressor = factory.safeDecompressor(); + } + + @Override + public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + throws MalformedInputException + { + try (ByteArrayInputStream inputStream = new ByteArrayInputStream(input, inputOffset, inputLength); + LZ4FrameInputStream decompressingInputStream = new LZ4FrameInputStream(inputStream, decompressor, XXHashFactory.fastestInstance().hash32())) { + return read(decompressingInputStream, output, outputOffset, maxOutputLength); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void decompress(ByteBuffer input, ByteBuffer output) + throws MalformedInputException + { + throw new UnsupportedOperationException(); + } +}