Skip to content

Commit

Permalink
feat: migrate event serialization to protobuf (#15417)
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Petrovic <[email protected]>
  • Loading branch information
lpetrovic05 committed Sep 18, 2024
1 parent 60b03c5 commit 0fea101
Show file tree
Hide file tree
Showing 19 changed files with 293 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static com.swirlds.common.io.streams.SerializableStreamConstants.NULL_VERSION;
import static com.swirlds.common.io.streams.SerializableStreamConstants.SERIALIZATION_PROTOCOL_VERSION;

import com.hedera.pbj.runtime.Codec;
import com.hedera.pbj.runtime.ParseException;
import com.hedera.pbj.runtime.io.ReadableSequentialData;
import com.hedera.pbj.runtime.io.stream.ReadableStreamingData;
import com.swirlds.base.function.CheckedFunction;
Expand All @@ -33,8 +35,11 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -64,16 +69,6 @@ public SerializableDataInputStream(final InputStream in) {
readableSequentialData = new ReadableStreamingData(in);
}

/**
* While transitioning serialization from {@link SelfSerializable} to protobuf, this stream will support both
* serialization methods by providing a separate instance to deserialize protobuf objects.
*
* @return the readable sequential data stream
*/
public @NonNull ReadableSequentialData getReadableSequentialData() {
return readableSequentialData;
}

/**
* Reads the protocol version written by {@link SerializableDataOutputStream#writeProtocolVersion()} and saves it
* internally. From this point on, it will use this version number to deserialize.
Expand Down Expand Up @@ -586,4 +581,32 @@ private static <T extends SelfSerializable> T registryConstructor(final long cla
}
return rc;
}

/**
* Reads a PBJ record from the stream.
*
* @param codec the codec to use to parse the record
* @param <T> the type of the record
* @return the parsed record
* @throws IOException if an IO error occurs
*/
public @NonNull <T extends Record> T readPbjRecord(@NonNull final Codec<T> codec) throws IOException {
final int size = readInt();
readableSequentialData.limit(readableSequentialData.position() + size);
try {
final T parsed = codec.parse(readableSequentialData);
if (readableSequentialData.position() != readableSequentialData.limit()) {
throw new EOFException("PBJ record was not fully read");
}
return parsed;
} catch (final ParseException e) {
if (e.getCause() instanceof BufferOverflowException || e.getCause() instanceof BufferUnderflowException) {
// PBJ Codec can throw these exceptions if it does not read enough bytes
final EOFException eofException = new EOFException("Buffer underflow while reading PBJ record");
eofException.addSuppressed(e);
throw eofException;
}
throw new IOException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static com.swirlds.common.io.streams.SerializableStreamConstants.SERIALIZATION_PROTOCOL_VERSION;
import static com.swirlds.common.io.streams.SerializableStreamConstants.VERSION_BYTES;

import com.hedera.pbj.runtime.Codec;
import com.hedera.pbj.runtime.io.WritableSequentialData;
import com.hedera.pbj.runtime.io.stream.WritableStreamingData;
import com.swirlds.common.io.FunctionalSerialize;
Expand Down Expand Up @@ -61,16 +62,6 @@ public SerializableDataOutputStream(OutputStream out) {
writableSequentialData = new WritableStreamingData(out);
}

/**
* While transitioning serialization from {@link SelfSerializable} to protobuf, this stream will support both
* serialization methods by providing a separate instance to serialize protobuf objects.
*
* @return the writable sequential data stream
*/
public @NonNull WritableSequentialData getWritableSequentialData() {
return writableSequentialData;
}

/**
* Write the serialization protocol version number to the stream. Should be used when serializing to a file that
* can be read by future versions.
Expand Down Expand Up @@ -295,4 +286,22 @@ protected void writeClassIdVersion(final SerializableDet serializable, final boo
}
this.writeInt(serializable.getVersion());
}

/**
* Write a PBJ record to the stream
*
* @param record
* the record to write
* @param codec
* the codec to use to write the record
* @param <T>
* the type of the record
* @throws IOException
* thrown if any IO problems occur
*/
public <T extends Record> void writePbjRecord(@NonNull final T record, @NonNull final Codec<T> codec)
throws IOException {
writeInt(codec.measureRecord(record));
codec.write(record, writableSequentialData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.common.io.exceptions.InvalidVersionException;
import com.swirlds.common.test.fixtures.io.InputOutputStream;
import com.swirlds.common.test.fixtures.io.SelfSerializableExample;
Expand Down Expand Up @@ -73,27 +72,4 @@ void deserializeInvalidVersions() throws IOException {
assertThrows(InvalidVersionException.class, () -> io2.getInput()
.readSerializable(true, SelfSerializableExample::new));
}

@Test
void pbjSupportTest() throws IOException {
final SelfSerializableExample serializable = new SelfSerializableExample(666, "Not a PBJ object");
final byte[] byteArray = {1, 2, 3};
final Bytes bytes = Bytes.wrap(byteArray);

try (final InputOutputStream io = new InputOutputStream()) {
io.getOutput().writeSerializable(serializable, true);
bytes.writeTo(io.getOutput().getWritableSequentialData());
io.getOutput().writeSerializable(serializable, false);

io.startReading();

final SelfSerializable readSer1 = io.getInput().readSerializable(true, SelfSerializableExample::new);
final Bytes readBytes = io.getInput().getReadableSequentialData().readBytes(byteArray.length);
final SelfSerializable readSer2 = io.getInput().readSerializable(false, SelfSerializableExample::new);

assertEquals(serializable, readSer1, "the serializable object should be the same as the one written");
assertEquals(bytes, readBytes, "the bytes should be the same as the ones written");
assertEquals(serializable, readSer2, "the serializable object should be the same as the one written");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package com.swirlds.platform.core.jmh;

import com.hedera.hapi.platform.event.GossipEvent;
import com.swirlds.common.constructable.ConstructableRegistry;
import com.swirlds.common.constructable.ConstructableRegistryException;
import com.swirlds.common.io.streams.MerkleDataInputStream;
import com.swirlds.common.io.streams.MerkleDataOutputStream;
import com.swirlds.platform.event.EventSerializationUtils;
import com.swirlds.platform.event.PlatformEvent;
import com.swirlds.platform.event.hashing.EventHasher;
import com.swirlds.platform.event.hashing.PbjStreamHasher;
Expand Down Expand Up @@ -96,8 +96,8 @@ public void serializeDeserialize(final Blackhole bh) throws IOException {
//
// Benchmark (seed) Mode Cnt Score Error Units
// EventSerialization.serializeDeserialize 0 thrpt 3 962.486 ± 29.252 ops/ms
EventSerializationUtils.serializePlatformEvent(outStream, event, true);
bh.consume(EventSerializationUtils.deserializePlatformEvent(inStream, true));
outStream.writePbjRecord(event.getGossipEvent(), GossipEvent.PROTOBUF);
bh.consume(inStream.readPbjRecord(GossipEvent.PROTOBUF));
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

package com.swirlds.platform.event;

import com.hedera.hapi.platform.event.EventCore;
import com.hedera.hapi.platform.event.EventTransaction;
import com.hedera.hapi.platform.event.EventTransaction.TransactionOneOfType;
import com.hedera.hapi.platform.event.GossipEvent;
import com.hedera.hapi.platform.event.StateSignatureTransaction;
import com.hedera.hapi.util.HapiUtils;
import com.hedera.pbj.runtime.OneOf;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.common.crypto.SignatureType;
Expand All @@ -31,9 +30,7 @@
import com.swirlds.platform.system.StaticSoftwareVersion;
import com.swirlds.platform.system.events.EventDescriptorWrapper;
import com.swirlds.platform.system.events.UnsignedEvent;
import com.swirlds.platform.util.TransactionUtils;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -59,115 +56,6 @@ private EventSerializationUtils() {
// Utility class
}

/**
* Serialize a unsigned event to the output stream {@code out}.
*
* @param out the stream to which this object is to be written
* @param softwareVersion the software version
* @param eventCore the event core
* @param selfParent the self parent
* @param otherParents the other parents
* @param eventTransactions the event transactions
*
* @throws IOException if an I/O error occurs
*/
private static void serializeUnsignedEvent(
@NonNull final SerializableDataOutputStream out,
@NonNull final SoftwareVersion softwareVersion,
@NonNull final EventCore eventCore,
@Nullable final EventDescriptorWrapper selfParent,
@NonNull final List<EventDescriptorWrapper> otherParents,
@NonNull final List<EventTransaction> eventTransactions)
throws IOException {
out.writeInt(UNSIGNED_EVENT_VERSION);
out.writeSerializable(softwareVersion, true);
out.writeInt(NodeId.ClassVersion.ORIGINAL);
out.writeLong(eventCore.creatorNodeId());
EventDescriptorWrapper.serialize(out, selfParent);
EventDescriptorWrapper.serializeList(out, otherParents);
out.writeLong(eventCore.birthRound());
out.writeInstant(HapiUtils.asInstant(eventCore.timeCreated()));

// write serialized length of transaction array first, so during the deserialization proces
// it is possible to skip transaction array and move on to the next object
out.writeInt(TransactionUtils.getLegacyObjectSize(eventTransactions));
// transactions may include both system transactions and application transactions
// so writeClassId set to true and allSameClass set to false
out.writeInt(eventTransactions.size());
if (!eventTransactions.isEmpty()) {
final boolean allSameClass = false;
out.writeBoolean(allSameClass);
}
for (final EventTransaction transaction : eventTransactions) {
switch (transaction.transaction().kind()) {
case APPLICATION_TRANSACTION:
serializeApplicationTransaction(out, transaction);
break;
case STATE_SIGNATURE_TRANSACTION:
serializeStateSignatureTransaction(out, transaction);
break;
default:
throw new IOException("Unknown transaction type: "
+ transaction.transaction().kind());
}
}
}

private static void serializeApplicationTransaction(
@NonNull final SerializableDataOutputStream out, @NonNull final EventTransaction transaction)
throws IOException {
out.writeLong(APPLICATION_TRANSACTION_CLASS_ID);
out.writeInt(APPLICATION_TRANSACTION_VERSION);
final Bytes bytes = transaction.transaction().as();
out.writeInt((int) bytes.length());
bytes.writeTo(out);
}

private static void serializeStateSignatureTransaction(
@NonNull final SerializableDataOutputStream out, @NonNull final EventTransaction transaction)
throws IOException {
final StateSignatureTransaction stateSignatureTransaction =
transaction.transaction().as();

out.writeLong(STATE_SIGNATURE_CLASS_ID);
out.writeInt(STATE_SIGNATURE_VERSION);
out.writeInt((int) stateSignatureTransaction.signature().length());
stateSignatureTransaction.signature().writeTo(out);

out.writeInt((int) stateSignatureTransaction.hash().length());
stateSignatureTransaction.hash().writeTo(out);

out.writeLong(stateSignatureTransaction.round());
out.writeInt(Integer.MIN_VALUE); // epochHash is always null
}

/**
* Serialize the given {@link PlatformEvent} to the output stream {@code out}
*
* @param out the stream to which this object is to be written
* @param event the event to serialize
* @param writeVersion if true, the event version number will be written to the stream
* @throws IOException if an I/O error occurs
*/
public static void serializePlatformEvent(
@NonNull final SerializableDataOutputStream out,
@NonNull final PlatformEvent event,
final boolean writeVersion)
throws IOException {
if (writeVersion) {
out.writeInt(PLATFORM_EVENT_VERSION);
}
serializeUnsignedEvent(
out,
event.getOldSoftwareVersion(),
event.getEventCore(),
event.getSelfParent(),
event.getOtherParents(),
event.getEventTransactions());
out.writeInt((int) event.getSignature().length());
event.getSignature().writeTo(out);
}

/**
* Deserialize the event as {@link UnsignedEvent}.
*
Expand Down Expand Up @@ -281,11 +169,11 @@ public static PlatformEvent serializeDeserializePlatformEvent(@NonNull final Pla
throws IOException {
try (final ByteArrayOutputStream io = new ByteArrayOutputStream()) {
final SerializableDataOutputStream out = new SerializableDataOutputStream(io);
serializePlatformEvent(out, original, true);
out.writePbjRecord(original.getGossipEvent(), GossipEvent.PROTOBUF);
out.flush();
final SerializableDataInputStream in =
new SerializableDataInputStream(new ByteArrayInputStream(io.toByteArray()));
return deserializePlatformEvent(in, true);
return new PlatformEvent(in.readPbjRecord(GossipEvent.PROTOBUF));
}
}
}
Loading

0 comments on commit 0fea101

Please sign in to comment.