Initial commit for GCS Batch Source plugin metadata feature. #1612

Open · wants to merge 1 commit into base: release/2.9
@@ -209,6 +209,18 @@ public String getPathField() {
     return null;
   }
 
+  @Nullable
+  @Override
+  public String getLengthField() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public String getModificationTimeField() {
+    return null;
+  }
+
   @Override
   public boolean useFilenameAsPath() {
     return false;
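Note on the hunk above: the two new stubs follow the existing getPathField() contract, where returning null means the source exposes no such field and returning a name means a field of that name is appended to each output record. Below is a minimal sketch of a properties class that opts in, assuming that contract; the class and field names are illustrative, not from this PR.

import javax.annotation.Nullable;

// Illustrative sketch only: the opt-in side of the contract stubbed out above.
public class MetadataAwareProperties {

  // Output field that will carry the file size in bytes; null would omit it.
  @Nullable
  public String getLengthField() {
    return "file_length"; // hypothetical field name
  }

  // Output field that will carry the file's modification timestamp; null would omit it.
  @Nullable
  public String getModificationTimeField() {
    return "file_modified_time"; // hypothetical field name
  }
}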
(next file)

@@ -52,7 +52,7 @@
  * Tests for ETLBatch.
  */
 public class ETLMapReduceTestRun extends ETLBatchTestBase {
-  private static final Schema TEXT_SCHEMA = TextInputFormatProvider.getDefaultSchema(null);
+  private static final Schema TEXT_SCHEMA = TextInputFormatProvider.getDefaultSchema(null, null, null);
 
   @Test
   public void testInvalidTransformConfigFailsToDeploy() {
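The only change in this test file is the widened getDefaultSchema signature. Judging by the rest of the PR, the two extra parameters are most likely the length and modification-time field names, but only the call site is visible here, so treat that reading as an assumption.

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.format.text.input.TextInputFormatProvider; // package assumed from the plugin layout

public class DefaultSchemaCall {
  // Assumed parameter order (pathField, lengthField, modificationTimeField);
  // passing all nulls keeps the default text schema free of metadata fields.
  static Schema defaultTextSchema() {
    return TextInputFormatProvider.getDefaultSchema(null, null, null);
  }
}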
(next file)

@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeParseException;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 
@@ -51,11 +52,11 @@ public StructuredRecord transform(GenericRecord genericRecord, Schema structuredSchema)
   }
 
   public StructuredRecord.Builder transform(GenericRecord genericRecord, Schema structuredSchema,
-                                            @Nullable String skipField) throws IOException {
+                                            @Nullable List<String> skipFields) throws IOException {
     StructuredRecord.Builder builder = StructuredRecord.builder(structuredSchema);
     for (Schema.Field field : structuredSchema.getFields()) {
       String fieldName = field.getName();
-      if (!fieldName.equals(skipField)) {
+      if (!skipFields.contains(fieldName)) {
         builder.set(fieldName, convertField(genericRecord.get(fieldName), field));
       }
     }
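The transformer now takes a list of fields to skip instead of a single path field, so callers can exclude every metadata-populated field in one pass. A usage sketch follows; the field names are hypothetical, and note that the new body calls skipFields.contains() without a null check, so passing an empty list rather than null is the safe choice.

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.format.avro.AvroToStructuredTransformer;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class TransformUsageSketch {
  // Skip every field that is filled in from file metadata rather than read
  // from the Avro record itself; the names here are hypothetical.
  static StructuredRecord.Builder convert(GenericRecord record, Schema schema) throws IOException {
    List<String> skipFields = Arrays.asList("path", "length", "modificationTime");
    return new AvroToStructuredTransformer().transform(record, schema, skipFields);
  }
}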
(next file)

@@ -84,66 +84,61 @@ public static class Conf extends PathTrackingConfig {
     @Description(NAME_SCHEMA)
     public String schema;
 
-  }
-
-  @Nullable
-  @Override
-  public Schema getSchema(FormatContext context) {
-    if (conf.containsMacro("schema")) {
-      return super.getSchema(context);
-    }
-    if (!Strings.isNullOrEmpty(conf.schema)) {
-      return super.getSchema(context);
-    }
-    String filePath = conf.getProperties().getProperties().getOrDefault("path", null);
-    if (filePath == null) {
-      return super.getSchema(context);
-    }
-    try {
-      return getDefaultSchema(context);
-    } catch (IOException e) {
-      throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Extract schema from file
-   *
-   * @param context {@link FormatContext}
-   * @return {@link Schema}
-   * @throws IOException raised when error occurs during schema extraction
-   */
-  public Schema getDefaultSchema(@Nullable FormatContext context) throws IOException {
-    String filePath = conf.getProperties().getProperties().getOrDefault("path", null);
-    SeekableInput seekableInput = null;
-    FileReader<GenericRecord> dataFileReader = null;
-    try {
-      Job job = JobUtils.createInstance();
-      Configuration hconf = job.getConfiguration();
-      // set entries here, before FileSystem is used
-      for (Map.Entry<String, String> entry : conf.getFileSystemProperties().entrySet()) {
-        hconf.set(entry.getKey(), entry.getValue());
-      }
-      Path file = conf.getFilePathForSchemaGeneration(filePath, ".+\\.avro", hconf, job);
-      DatumReader<GenericRecord> dataReader = new GenericDatumReader<>();
-      seekableInput = new FsInput(file, hconf);
-      dataFileReader = DataFileReader.openReader(seekableInput, dataReader);
-      GenericRecord firstRecord;
-      if (!dataFileReader.hasNext()) {
-        return null;
-      }
-      firstRecord = dataFileReader.next();
-      return new AvroToStructuredTransformer().convertSchema(firstRecord.getSchema());
-    } catch (IOException e) {
-      context.getFailureCollector().addFailure("Schema parse error", e.getMessage());
-    } finally {
-      if (dataFileReader != null) {
-        dataFileReader.close();
-      }
-      if (seekableInput != null) {
-        seekableInput.close();
-      }
-    }
-    return null;
-  }
+    @Nullable
+    @Override
+    public Schema getSchema() {
+      if (containsMacro("schema")) {
+        return super.getSchema();
+      }
+      if (!Strings.isNullOrEmpty(schema)) {
+        return super.getSchema();
+      }
+      String filePath = getProperties().getProperties().getOrDefault("path", null);
+      if (filePath == null) {
+        return super.getSchema();
+      }
+      try {
+        return getDefaultSchema();
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Invalid schema: " + e.getMessage(), e);
+      }
+    }
+
+    /**
+     * Extract schema from file
+     *
+     * @return {@link Schema}
+     * @throws IOException raised when error occurs during schema extraction
+     */
+    public Schema getDefaultSchema() throws IOException {
+      String filePath = getProperties().getProperties().getOrDefault("path", null);
+      SeekableInput seekableInput = null;
+      FileReader<GenericRecord> dataFileReader = null;
+      try {
+        Job job = JobUtils.createInstance();
+        Configuration hconf = job.getConfiguration();
+        // set entries here, before FileSystem is used
+        for (Map.Entry<String, String> entry : getFileSystemProperties().entrySet()) {
+          hconf.set(entry.getKey(), entry.getValue());
+        }
+        Path file = getFilePathForSchemaGeneration(filePath, ".+\\.avro", hconf, job);
+        DatumReader<GenericRecord> dataReader = new GenericDatumReader<>();
+        seekableInput = new FsInput(file, hconf);
+        dataFileReader = DataFileReader.openReader(seekableInput, dataReader);
+        GenericRecord firstRecord;
+        if (!dataFileReader.hasNext()) {
+          return null;
+        }
+        firstRecord = dataFileReader.next();
+        return new AvroToStructuredTransformer().convertSchema(firstRecord.getSchema());
+      } finally {
+        if (dataFileReader != null) {
+          dataFileReader.close();
+        }
+        if (seekableInput != null) {
+          seekableInput.close();
+        }
+      }
+    }
+  }
 }
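The effect of the hunk above: schema inference moves off the format provider (which carried a FormatContext and reported parse failures through its FailureCollector) onto the config class itself, so IO problems now surface as IllegalArgumentException from getSchema(). A caller-side sketch under that reading, with a stand-in interface for the Conf class shown above:

import io.cdap.cdap.api.data.schema.Schema;

public class SchemaInferenceSketch {
  // Minimal stand-in for the Conf class in the diff above (sketch only).
  interface SchemaConf {
    Schema getSchema();
  }

  // With no explicit schema and no "schema" macro, getSchema() samples the
  // first .avro file under the "path" property and converts the first
  // record's Avro schema; null means the sampled file had no records.
  static Schema inferOrNull(SchemaConf conf) {
    try {
      return conf.getSchema();
    } catch (IllegalArgumentException e) {
      return null; // unreadable file or invalid schema
    }
  }
}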
(next file)

@@ -18,6 +18,7 @@
 
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.format.MetadataField;
 import io.cdap.plugin.format.avro.AvroToStructuredTransformer;
 import io.cdap.plugin.format.input.PathTrackingInputFormat;
 import org.apache.avro.generic.GenericRecord;
@@ -31,7 +32,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 
 /**
@@ -46,7 +49,17 @@ protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(
 
     RecordReader<AvroKey<GenericRecord>, NullWritable> delegate = (new AvroKeyInputFormat<GenericRecord>())
       .createRecordReader(split, context);
-    return new AvroRecordReader(delegate, schema, pathField);
+    return new AvroRecordReader(delegate, schema, pathField, null);
+  }
+
+  @Override
+  protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(
+    FileSplit split, TaskAttemptContext context, @Nullable String pathField, Map<String, MetadataField> metadataFields,
+    @Nullable Schema schema) throws IOException, InterruptedException {
+
+    RecordReader<AvroKey<GenericRecord>, NullWritable> delegate = (new AvroKeyInputFormat<GenericRecord>())
+      .createRecordReader(split, context);
+    return new AvroRecordReader(delegate, schema, pathField, metadataFields);
   }
 
 /**
@@ -56,13 +69,15 @@ static class AvroRecordReader extends RecordReader<NullWritable, StructuredRecord.Builder> {
     private final RecordReader<AvroKey<GenericRecord>, NullWritable> delegate;
     private final AvroToStructuredTransformer recordTransformer;
     private final String pathField;
+    private final Map<String, MetadataField> metadataFields;
     private Schema schema;
 
     AvroRecordReader(RecordReader<AvroKey<GenericRecord>, NullWritable> delegate, @Nullable Schema schema,
-                     @Nullable String pathField) {
+                     @Nullable String pathField, @Nullable Map<String, MetadataField> metadataFields) {
       this.delegate = delegate;
      this.schema = schema;
       this.pathField = pathField;
+      this.metadataFields = metadataFields == null ? Collections.EMPTY_MAP : metadataFields;
       this.recordTransformer = new AvroToStructuredTransformer();
     }
 
@@ -87,18 +102,25 @@ public StructuredRecord.Builder getCurrentValue() throws IOException, InterruptedException {
       // if schema is null, but we're still able to read, that means the file contains the schema information
       // set the schema based on the schema of the record
       if (schema == null) {
-        if (pathField == null) {
+        if (pathField == null && metadataFields.isEmpty()) {
           schema = Schema.parseJson(genericRecord.getSchema().toString());
         } else {
           // if there is a path field, add the path as a field in the schema
           Schema schemaWithoutPath = Schema.parseJson(genericRecord.getSchema().toString());
           List<Schema.Field> fields = new ArrayList<>(schemaWithoutPath.getFields().size() + 1);
           fields.addAll(schemaWithoutPath.getFields());
-          fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING)));
+          if (pathField != null) {
+            fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING)));
+          }
+          for (String fieldName : metadataFields.keySet()) {
+            fields.add(Schema.Field.of(fieldName, Schema.of(metadataFields.get(fieldName).getSchemaType())));
+          }
           schema = Schema.recordOf(schemaWithoutPath.getRecordName(), fields);
         }
       }
-      return recordTransformer.transform(genericRecord, schema, pathField);
+      List<String> fieldsToExclude = new ArrayList<>(metadataFields.keySet());
+      fieldsToExclude.add(pathField);
+      return recordTransformer.transform(genericRecord, schema, fieldsToExclude);
     }
 
     @Override
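For reference, the schema-augmentation step in getCurrentValue() can be read in isolation: when the schema comes from the file itself, the reader appends the path field plus one field per metadata entry before transforming. Below is a self-contained rendering of that step; STRING stands in for MetadataField.getSchemaType(), which this sketch avoids depending on.

import io.cdap.cdap.api.data.schema.Schema;

import java.util.ArrayList;
import java.util.List;

public class MetadataSchemaSketch {
  // Mirrors the logic added above: extend the schema read from the file with
  // the path field and the configured metadata fields.
  static Schema augment(Schema fileSchema, String pathField, List<String> metadataFieldNames) {
    List<Schema.Field> fields = new ArrayList<>(fileSchema.getFields());
    if (pathField != null) {
      fields.add(Schema.Field.of(pathField, Schema.of(Schema.Type.STRING)));
    }
    for (String name : metadataFieldNames) {
      fields.add(Schema.Field.of(name, Schema.of(Schema.Type.STRING))); // real code uses getSchemaType()
    }
    return Schema.recordOf(fileSchema.getRecordName(), fields);
  }
}

One detail worth flagging for review: fieldsToExclude.add(pathField) inserts a null entry when no path field is configured. ArrayList tolerates that and contains() still behaves, but an explicit null check would be tidier.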