Skip to content

Commit

Permalink
[feat] Support include topic to the metadata (#836)
Browse files Browse the repository at this point in the history
Special release for #816

### Motivation

See #816

### Modifications

- Add new configuration `includeTopicToMetadata`
  • Loading branch information
RobertIndie committed Dec 15, 2023
1 parent c99964f commit 94620e4
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class BlobStoreAbstractConfig implements Serializable {
private boolean withMetadata;
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;
private boolean withTopicPartitionNumber = true;
private String bytesFormatTypeSeparator = "0x10";
private boolean skipFailedMessages = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class AvroFormat implements Format<GenericRecord> , InitConfiguration<Blo
private boolean useMetadata;
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;
private CodecFactory codecFactory;

@Override
Expand All @@ -65,6 +66,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
this.useMetadata = configuration.isWithMetadata();
this.useHumanReadableMessageId = configuration.isUseHumanReadableMessageId();
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();
String codecName = configuration.getAvroCodec();
if (codecName == null) {
this.codecFactory = CodecFactory.nullCodec();
Expand All @@ -86,7 +88,7 @@ public void initSchema(org.apache.pulsar.client.api.Schema<GenericRecord> schema
rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema);
if (useMetadata){
rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema,
useHumanReadableMessageId, useHumanReadableSchemaVersion);
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata);
}

LOGGER.debug("Using avro schema: {}", rootAvroSchema);
Expand Down Expand Up @@ -124,7 +126,7 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> records) throw
if (useMetadata) {
org.apache.avro.generic.GenericRecord metadataRecord =
MetadataUtil.extractedMetadataRecord(next,
useHumanReadableMessageId, useHumanReadableSchemaVersion);
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata);
writeRecord.put(MetadataUtil.MESSAGE_METADATA_KEY, metadataRecord);
}
fileWriter.append(writeRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class JsonFormat implements Format<GenericRecord>, InitConfiguration<Blob
private boolean useMetadata;
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;

@Override
public String getExtension() {
Expand All @@ -72,6 +73,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
this.useMetadata = configuration.isWithMetadata();
this.useHumanReadableMessageId = configuration.isUseHumanReadableMessageId();
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();
}

@Override
Expand Down Expand Up @@ -106,7 +108,8 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> record) throws
Map<String, Object> writeValue = convertRecordToObject(next.getValue(), schema);
if (useMetadata) {
writeValue.put(MetadataUtil.MESSAGE_METADATA_KEY,
MetadataUtil.extractedMetadata(next, useHumanReadableMessageId, useHumanReadableSchemaVersion));
MetadataUtil.extractedMetadata(next, useHumanReadableMessageId, useHumanReadableSchemaVersion,
includeTopicToMetadata));
}
String recordAsString = JSON_MAPPER.get().writeValueAsString(writeValue);
stringBuilder.append(recordAsString).append("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class ParquetFormat implements Format<GenericRecord>, InitConfiguration<B
private boolean useMetadata;
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;

private CompressionCodecName compressionCodecName = CompressionCodecName.GZIP;

Expand All @@ -76,6 +77,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
this.useMetadata = configuration.isWithMetadata();
this.useHumanReadableMessageId = configuration.isUseHumanReadableMessageId();
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();
this.compressionCodecName = CompressionCodecName.fromConf(configuration.getParquetCodec());
}

Expand Down Expand Up @@ -185,7 +187,7 @@ public void initSchema(org.apache.pulsar.client.api.Schema<GenericRecord> schema
rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema);
if (useMetadata) {
rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema,
useHumanReadableMessageId, useHumanReadableSchemaVersion);
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata);
}
log.info("Using avro schema: {}", rootAvroSchema);
}
Expand Down Expand Up @@ -268,7 +270,9 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> records) throw
if (useMetadata) {
org.apache.avro.generic.GenericRecord metadataRecord =
MetadataUtil.extractedMetadataRecord(next,
useHumanReadableMessageId, useHumanReadableSchemaVersion);
useHumanReadableMessageId,
useHumanReadableSchemaVersion,
includeTopicToMetadata);
writeRecord.put(MESSAGE_METADATA_KEY, metadataRecord);
}
if (parquetWriter != null) {
Expand Down
33 changes: 24 additions & 9 deletions src/main/java/org/apache/pulsar/io/jcloud/util/MetadataUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class MetadataUtil {
public static final String METADATA_PROPERTIES_KEY = "properties";
public static final String METADATA_SCHEMA_VERSION_KEY = "schemaVersion";
public static final String METADATA_MESSAGE_ID_KEY = "messageId";
public static final String METADATA_TOPIC = "topic";
public static final String MESSAGE_METADATA_KEY = "__message_metadata__";
public static final String MESSAGE_METADATA_NAME = "messageMetadata";
public static final Schema MESSAGE_METADATA = buildMetadataSchema();
Expand All @@ -49,11 +50,12 @@ public class MetadataUtil {

public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Record<GenericRecord> next,
boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion) {
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
final Message<GenericRecord> message = next.getMessage().get();

GenericData.Record record = new GenericData.Record(buildMetadataSchema(
useHumanReadableMessageId, useHumanReadableSchemaVersion));
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata));
record.put(METADATA_PROPERTIES_KEY, message.getProperties());
if (useHumanReadableSchemaVersion) {
record.put(METADATA_SCHEMA_VERSION_KEY,
Expand All @@ -66,20 +68,24 @@ public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Reco
} else {
record.put(METADATA_MESSAGE_ID_KEY, ByteBuffer.wrap(message.getMessageId().toByteArray()));
}
if (includeTopicToMetadata) {
record.put(METADATA_TOPIC, message.getTopicName());
}
return record;
}

public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Record<GenericRecord> next) {
return extractedMetadataRecord(next, false, false);
return extractedMetadataRecord(next, false, false, false);
}

public static Map<String, Object> extractedMetadata(Record<GenericRecord> next) {
return extractedMetadata(next, false, false);
return extractedMetadata(next, false, false, false);
}

public static Map<String, Object> extractedMetadata(Record<GenericRecord> next,
boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion) {
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
Map<String, Object> metadata = new HashMap<>();
final Message<GenericRecord> message = next.getMessage().get();
metadata.put(METADATA_PROPERTIES_KEY, message.getProperties());
Expand All @@ -94,6 +100,9 @@ public static Map<String, Object> extractedMetadata(Record<GenericRecord> next,
} else {
metadata.put(METADATA_MESSAGE_ID_KEY, ByteBuffer.wrap(message.getMessageId().toByteArray()));
}
if (includeTopicToMetadata) {
metadata.put(METADATA_TOPIC, message.getTopicName());
}
return metadata;
}

Expand All @@ -114,14 +123,15 @@ public static Schema setMetadataSchema(Schema schema) {

public static Schema setMetadataSchema(Schema schema,
boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion) {
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
final List<Schema.Field> fieldWithMetadata = schemaFieldThreadLocal.get();
fieldWithMetadata.clear();
schema.getFields().forEach(f -> {
fieldWithMetadata.add(new Schema.Field(f, f.schema()));
});
fieldWithMetadata.add(new Schema.Field(MESSAGE_METADATA_KEY, buildMetadataSchema(
useHumanReadableMessageId, useHumanReadableSchemaVersion)));
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata)));
return Schema.createRecord(schema.getName(),
schema.getDoc(),
schema.getNamespace(),
Expand All @@ -131,11 +141,12 @@ public static Schema setMetadataSchema(Schema schema,
}

private static Schema buildMetadataSchema(){
return buildMetadataSchema(false, false);
return buildMetadataSchema(false, false, false);
}

private static Schema buildMetadataSchema(boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion) {
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
List<Schema.Field> fields = new ArrayList<>();
fields.add(new Schema.Field(METADATA_PROPERTIES_KEY,
Schema.createUnion(
Expand All @@ -157,6 +168,10 @@ private static Schema buildMetadataSchema(boolean useHumanReadableMessageId,
fields.add(new Schema.Field(METADATA_MESSAGE_ID_KEY, Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES))));
}
if (includeTopicToMetadata) {
fields.add(new Schema.Field(METADATA_TOPIC, Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))));
}
return Schema.createRecord(MESSAGE_METADATA_NAME,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void testExtractedMetadata() throws IOException {
String messageIdString = "1:2:3:4";
byte[] schemaVersionBytes = new byte[]{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A};
byte[] messageIdBytes = new byte[]{0x00, 0x01, 0x02, 0x03};
String topicName = "test-topic";
Record<GenericRecord> mockRecord = mock(Record.class);
Message<GenericRecord> mockMessage = mock(Message.class);
MessageId mockMessageId = mock(MessageId.class);
Expand All @@ -53,23 +54,25 @@ public void testExtractedMetadata() throws IOException {
when(mockMessage.getSchemaVersion()).thenReturn(schemaVersionBytes);
when(mockMessageId.toString()).thenReturn(messageIdString);
when(mockMessageId.toByteArray()).thenReturn(messageIdBytes);
when(mockMessage.getTopicName()).thenReturn(topicName);

Map<String, Object> metadataWithHumanReadableMetadata =
MetadataUtil.extractedMetadata(mockRecord, true, true);
MetadataUtil.extractedMetadata(mockRecord, true, true, true);
Assert.assertEquals(metadataWithHumanReadableMetadata.get(METADATA_MESSAGE_ID_KEY), messageIdString);
Assert.assertNotEquals(metadataWithHumanReadableMetadata.get(METADATA_MESSAGE_ID_KEY),
ByteBuffer.wrap(messageIdBytes));
Assert.assertEquals(metadataWithHumanReadableMetadata.get(METADATA_SCHEMA_VERSION_KEY),
MetadataUtil.parseSchemaVersionFromBytes(schemaVersionBytes));
Assert.assertEquals(metadataWithHumanReadableMetadata.get(MetadataUtil.METADATA_TOPIC), topicName);

Map<String, Object> metadataWithHumanReadableMessageId =
MetadataUtil.extractedMetadata(mockRecord, true, false);
MetadataUtil.extractedMetadata(mockRecord, true, false, false);
Assert.assertEquals(metadataWithHumanReadableMessageId.get(METADATA_MESSAGE_ID_KEY), messageIdString);
Assert.assertNotEquals(metadataWithHumanReadableMessageId.get(METADATA_MESSAGE_ID_KEY),
ByteBuffer.wrap(messageIdBytes));


Map<String, Object> metadata = MetadataUtil.extractedMetadata(mockRecord, false, false);
Map<String, Object> metadata = MetadataUtil.extractedMetadata(mockRecord, false, false, false);
Assert.assertEquals(metadata.get(METADATA_MESSAGE_ID_KEY), ByteBuffer.wrap(messageIdBytes));
Assert.assertEquals(metadata.get(METADATA_SCHEMA_VERSION_KEY), schemaVersionBytes);
Assert.assertNotEquals(metadata.get(METADATA_MESSAGE_ID_KEY), messageIdString);
Expand Down

0 comments on commit 94620e4

Please sign in to comment.