From 4b26be9f6178dde6cace2ed87ae3d2ae8b4ac827 Mon Sep 17 00:00:00 2001
From: Prasanna Rajaperumal
Date: Thu, 15 Jun 2017 05:40:59 -0700
Subject: [PATCH] Fixes to RealtimeInputFormat and RealtimeRecordReader and
 update documentation for HiveSyncTool

---
 docs/quickstart.md                           | 22 ++---
 .../common/model/CompactionWriteStat.java    | 12 ++-
 hoodie-hadoop-mr/pom.xml                     |  5 ++
 .../realtime/HoodieRealtimeInputFormat.java  | 85 ++++++++++++-------
 .../realtime/HoodieRealtimeRecordReader.java | 81 ++++++++++++------
 hoodie-hive/pom.xml                          |  2 +-
 .../com/uber/hoodie/hive/HiveSyncConfig.java |  2 +-
 7 files changed, 136 insertions(+), 73 deletions(-)

diff --git a/docs/quickstart.md b/docs/quickstart.md
index ecc31992cf7b..b04accdb2fdf 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -70,21 +70,23 @@ bin/hiveserver2 \
 
 #### Hive Sync Tool
 
-Once Hive is up and running, the sync tool can be used to sync commits done above to a Hive table, as follows.
+The Hive Sync Tool creates or updates the necessary metadata (schema and partitions) in the Hive metastore,
+which allows schemas to evolve and newly written partitions to be added incrementally.
+It syncs incrementally: the last synced commit time is stored in the table's TBLPROPERTIES, and only commits made after that time are synced.
+It can be run as frequently as the ingestion pipeline, so that new partitions and schema changes are reflected immediately.
 
 ```
-java -cp target/hoodie-hive-0.3.1-SNAPSHOT-jar-with-dependencies.jar:target/jars/* com.uber.hoodie.hive.HiveSyncTool \
-  --base-path file:///tmp/hoodie/sample-table/ \
-  --database default \
-  --table hoodie_test \
-  --user hive \
-  --pass hive \
-  --jdbc-url jdbc:hive2://localhost:10010/
+{JAVA8}/bin/java -cp "/etc/hive/conf:./hoodie-hive-0.3.8-SNAPSHOT-jar-with-dependencies.jar:/opt/hadoop/lib/hadoop-mapreduce/*" com.uber.hoodie.hive.HiveSyncTool \
+  --user hive \
+  --pass hive \
+  --database default \
+  --jdbc-url "jdbc:hive2://localhost:10010/" \
+  --base-path file:///tmp/hoodie/sample-table/ \
+  --table hoodie_test \
+  --partitioned-by field1,field2
 ```
 
-{% include callout.html content="Hive sync tools does not yet support Merge-On-Read tables." 
type="info" %} - #### Manually via Beeline diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java index bdac4babb529..1ff704bbbef0 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java @@ -28,11 +28,11 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class CompactionWriteStat implements Serializable { - private final HoodieWriteStat writeStat; + private HoodieWriteStat writeStat; private String partitionPath; - private final long totalLogRecords; - private final long totalLogFiles; - private final long totalRecordsToBeUpdate; + private long totalLogRecords; + private long totalLogFiles; + private long totalRecordsToBeUpdate; public CompactionWriteStat(HoodieWriteStat writeStat, String partitionPath, long totalLogFiles, long totalLogRecords, long totalRecordsToUpdate) { @@ -43,6 +43,10 @@ public CompactionWriteStat(HoodieWriteStat writeStat, String partitionPath, long this.totalRecordsToBeUpdate = totalRecordsToUpdate; } + public CompactionWriteStat() { + // For de-serialization + } + public long getTotalLogRecords() { return totalLogRecords; } diff --git a/hoodie-hadoop-mr/pom.xml b/hoodie-hadoop-mr/pom.xml index 01137dbdd7eb..52429a4f0b72 100644 --- a/hoodie-hadoop-mr/pom.xml +++ b/hoodie-hadoop-mr/pom.xml @@ -75,6 +75,10 @@ org.apache.parquet parquet-avro + + com.twitter + parquet-avro + org.apache.avro avro @@ -108,6 +112,7 @@ com.uber.hoodie:hoodie-common + com.twitter:parquet-avro diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java index 3a60c21873c0..047c6e2850a2 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java @@ -20,9 +20,11 @@ import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieLogFile; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.FSUtils; @@ -66,6 +68,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf // These positions have to be deterministic across all tables public static final int HOODIE_COMMIT_TIME_COL_POS = 0; public static final int HOODIE_RECORD_KEY_COL_POS = 2; + public static final int HOODIE_PARTITION_PATH_COL_POS = 3; @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { @@ -112,9 +115,18 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { List dataFileSplits = groupedInputSplits.get(dataFile.getFileId()); dataFileSplits.forEach(split -> { try { - List logFilePaths = logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); - String maxCommitTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant().get().getTimestamp(); - rtSplits.add(new HoodieRealtimeFileSplit(split, logFilePaths, 
maxCommitTime)); + List logFilePaths = logFiles.stream() + .map(logFile -> logFile.getPath().toString()) + .collect(Collectors.toList()); + // Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table + String maxCommitTime = metaClient.getActiveTimeline() + .getTimelineOfActions( + Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, + HoodieTimeline.COMPACTION_ACTION, + HoodieTimeline.DELTA_COMMIT_ACTION)) + .filterCompletedInstants().lastInstant().get().getTimestamp(); + rtSplits.add( + new HoodieRealtimeFileSplit(split, logFilePaths, maxCommitTime)); } catch (IOException e) { throw new HoodieIOException("Error creating hoodie real time split ", e); } @@ -124,7 +136,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { throw new HoodieIOException("Error obtaining data file/log file grouping: " + partitionPath, e); } }); - + LOG.info("Returning a total splits of " + rtSplits.size()); return rtSplits.toArray(new InputSplit[rtSplits.size()]); } @@ -135,36 +147,49 @@ public FileStatus[] listStatus(JobConf job) throws IOException { return super.listStatus(job); } - - private static Configuration addExtraReadColsIfNeeded(Configuration configuration) { - String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); - String readColIds = configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR); - - if (!readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) { - configuration.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, - readColNames + "," + HoodieRecord.RECORD_KEY_METADATA_FIELD); - configuration.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, - readColIds + "," + HOODIE_RECORD_KEY_COL_POS); - LOG.info(String.format("Adding extra _hoodie_record_key column, to enable log merging cols (%s) ids (%s) ", - configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), - configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR))); + /** + * Add a field to the existing fields projected + */ + private static Configuration addProjectionField(Configuration conf, String fieldName, + int fieldIndex) { + String readColNames = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""); + String readColIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, ""); + + String readColNamesPrefix = readColNames + ","; + if (readColNames == null || readColNames.isEmpty()) { + readColNamesPrefix = ""; + } + String readColIdsPrefix = readColIds + ","; + if (readColIds == null || readColIds.isEmpty()) { + readColIdsPrefix = ""; } - if (!readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) { - configuration.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, - readColNames + "," + HoodieRecord.COMMIT_TIME_METADATA_FIELD); - configuration.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, - readColIds + "," + HOODIE_COMMIT_TIME_COL_POS); - LOG.info(String.format("Adding extra _hoodie_commit_time column, to enable log merging cols (%s) ids (%s) ", - configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), - configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR))); + if (!readColNames.contains(fieldName)) { + // If not already in the list - then add it + conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, + readColNamesPrefix + fieldName); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Adding extra column " + fieldName + + ", to enable 
log merging cols (%s) ids (%s) ", + conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), + conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR))); + } } + return conf; + } + private static Configuration addRequiredProjectionFields(Configuration configuration) { + // Need this to do merge records in HoodieRealtimeRecordReader + configuration = addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, + HOODIE_RECORD_KEY_COL_POS); + configuration = addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, + HOODIE_COMMIT_TIME_COL_POS); + configuration = addProjectionField(configuration, + HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS); return configuration; } - - @Override public RecordReader getRecordReader(final InputSplit split, final JobConf job, @@ -172,17 +197,17 @@ public RecordReader getRecordReader(final InputSplit split, LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); // sanity check Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit, - "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit"); + "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split ); return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job, super.getRecordReader(split, job, reporter)); } @Override public void setConf(Configuration conf) { - this.conf = addExtraReadColsIfNeeded(conf); + this.conf = addRequiredProjectionFields(conf); } @Override public Configuration getConf() { - return addExtraReadColsIfNeeded(conf); + return conf; } } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java index cbd485461b84..ed22c14c2222 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java @@ -18,20 +18,24 @@ package com.uber.hoodie.hadoop.realtime; +import com.google.common.collect.Lists; import com.uber.hoodie.common.model.HoodieAvroPayload; import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner; import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.common.util.ParquetUtils; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.io.ArrayWritable; @@ -45,18 +49,15 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; -import org.apache.parquet.avro.AvroSchemaConverter; -import org.apache.parquet.schema.MessageType; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import 
java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Set; -import java.util.TreeMap; -import java.util.stream.Collectors; +import parquet.avro.AvroSchemaConverter; +import parquet.hadoop.ParquetFileReader; +import parquet.schema.MessageType; /** * Record Reader implementation to merge fresh avro data with base parquet data, to support real time @@ -83,37 +84,54 @@ public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split, LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); try { - baseFileSchema = ParquetUtils.readSchema(split.getPath()); + baseFileSchema = readSchema(jobConf, split.getPath()); readAndCompactLog(); } catch (IOException e) { - throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e); + throw new HoodieIOException( + "Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e); } } + /** + * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the + * twitter parquet to support hive 1.1.0 + */ + private static MessageType readSchema(Configuration conf, Path parquetFilePath) { + try { + return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData() + .getSchema(); + } catch (IOException e) { + throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, + e); + } + } + + /** * Goes through the log files and populates a map with latest version of each key logged, since the base split was written. */ private void readAndCompactLog() throws IOException { Schema writerSchema = new AvroSchemaConverter().convert(baseFileSchema); List projectionFields = orderFields( - jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), - jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), - jobConf.get("partition_columns")); + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), + jobConf.get("partition_columns", "")); // TODO(vc): In the future, the reader schema should be updated based on log files & be able to null out fields not present before Schema readerSchema = generateProjectionSchema(writerSchema, projectionFields); - LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s", + LOG.info( + String.format("About to read compacted logs %s for base split %s, projecting cols %s", split.getDeltaFilePaths(), split.getPath(), projectionFields)); HoodieCompactedLogRecordScanner compactedLogRecordScanner = - new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(), readerSchema); - Iterator> itr = compactedLogRecordScanner.iterator(); + new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(), + readerSchema); // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit // but can return records for completed commits > the commit we are trying to read (if using readCommit() API) - while(itr.hasNext()) { - HoodieRecord hoodieRecord = itr.next(); - GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema).get(); + for (HoodieRecord hoodieRecord : compactedLogRecordScanner) { + GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema) + .get(); String key = hoodieRecord.getRecordKey(); // we assume, a later safe record in the log, is newer than what we have in the map & replace it. 
          ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
@@ -146,22 +164,27 @@ private static String arrayWritableToString(ArrayWritable writable) {
    * @param fieldOrderCsv
    * @return
    */
-  public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, String partitioningFieldsCsv) {
+  public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv,
+      String partitioningFieldsCsv) {
     String[] fieldOrders = fieldOrderCsv.split(",");
-    Set<String> partitioningFields = Arrays.stream(partitioningFieldsCsv.split(",")).collect(Collectors.toSet());
-    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(",")).filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
+    Set<String> partitioningFields = Arrays.stream(partitioningFieldsCsv.split(","))
+        .collect(Collectors.toSet());
+    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
+        .filter(fn -> !partitioningFields.contains(fn)).collect(
+            Collectors.toList());
     // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
     if (fieldNames.size() != fieldOrders.length) {
-      throw new HoodieException(String.format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
-          fieldNames.size(), fieldOrders.length));
+      throw new HoodieException(String.format(
+          "Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
+          fieldNames.size(), fieldOrders.length));
     }
     TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
-    for (int ox=0; ox < fieldOrders.length; ox++) {
+    for (int ox = 0; ox < fieldOrders.length; ox++) {
       orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNames.get(ox));
     }
-    return orderedFieldMap.values().stream().collect(Collectors.toList());
+    return new ArrayList<>(orderedFieldMap.values());
   }
 
   /**
@@ -235,6 +258,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema) {
         return new ArrayWritable(Writable.class, values2);
       case MAP:
         // TODO(vc): Need to add support for complex types
+        return NullWritable.get();
       case UNION:
         List<Schema> types = schema.getTypes();
         if (types.size() != 2) {
@@ -271,7 +295,10 @@ public boolean next(Void aVoid, ArrayWritable arrayWritable) throws IOException
           key, arrayWritableToString(arrayWritable), arrayWritableToString(deltaRecordMap.get(key))));
     }
     if (deltaRecordMap.containsKey(key)) {
-      arrayWritable.set(deltaRecordMap.get(key).get());
+      Writable[] replaceValue = deltaRecordMap.get(key).get();
+      Writable[] originalValue = arrayWritable.get();
+      System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length);
+      arrayWritable.set(originalValue);
     }
     return true;
   }
diff --git a/hoodie-hive/pom.xml b/hoodie-hive/pom.xml
index d9a8a5626e1f..17c10fd39a39 100644
--- a/hoodie-hive/pom.xml
+++ b/hoodie-hive/pom.xml
@@ -167,7 +167,7 @@
-              <mainClass>com.uber.hoodie.hive.example.HoodieHiveSyncExample</mainClass>
+              <mainClass>com.uber.hoodie.hive.HiveSyncTool</mainClass>
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java
index 159f695da681..4f40355de222 100644
--- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java
@@ -49,7 +49,7 @@ public class HiveSyncConfig implements Serializable {
       "--base-path"}, description = "Basepath of hoodie dataset to sync", required = true)
   public String basePath;
 
-  @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
+  @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by", required = true)
   public List<String> partitionFields = new ArrayList<>();
 
   @Parameter(names = "-partition-value-extractor", description = "Class which implements PartitionValueExtractor to extract the partition values from HDFS path")
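
A note on the projection handling in the input format changes above: `setConf()` now calls `addRequiredProjectionFields()` so that the Hoodie metadata columns are always requested from the base parquet reader, which `HoodieRealtimeRecordReader` needs in order to merge in log records. The snippet below is a minimal, standalone sketch of that column-appending logic, not part of the patch: a plain `Map` stands in for Hadoop's `Configuration`, and the literal key names are assumed to mirror Hive's `ColumnProjectionUtils` constants.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Standalone sketch of the projection logic in HoodieRealtimeInputFormat#addProjectionField.
 * A plain Map stands in for Hadoop's Configuration; the key names are assumed to mirror the
 * Hive ColumnProjectionUtils constants and are used here purely for illustration.
 */
public class ProjectionSketch {

  static final String READ_COLUMN_NAMES = "hive.io.file.readcolumn.names";
  static final String READ_COLUMN_IDS = "hive.io.file.readcolumn.ids";

  // Append a field name and its position to the projected columns, unless already present.
  static void addProjectionField(Map<String, String> conf, String fieldName, int fieldIndex) {
    String names = conf.getOrDefault(READ_COLUMN_NAMES, "");
    String ids = conf.getOrDefault(READ_COLUMN_IDS, "");
    if (!names.contains(fieldName)) {
      conf.put(READ_COLUMN_NAMES, names.isEmpty() ? fieldName : names + "," + fieldName);
      conf.put(READ_COLUMN_IDS, ids.isEmpty() ? String.valueOf(fieldIndex) : ids + "," + fieldIndex);
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    // Columns the query itself projects (hypothetical field names and positions)
    conf.put(READ_COLUMN_NAMES, "field1,field2");
    conf.put(READ_COLUMN_IDS, "5,6");

    // The realtime record reader additionally needs the Hoodie metadata columns, at the
    // fixed positions declared in HoodieRealtimeInputFormat (record key = 2, commit time = 0,
    // partition path = 3), to merge log records into the base rows.
    addProjectionField(conf, "_hoodie_record_key", 2);
    addProjectionField(conf, "_hoodie_commit_time", 0);
    addProjectionField(conf, "_hoodie_partition_path", 3);

    // field1,field2,_hoodie_record_key,_hoodie_commit_time,_hoodie_partition_path
    System.out.println(conf.get(READ_COLUMN_NAMES));
    // 5,6,2,0,3
    System.out.println(conf.get(READ_COLUMN_IDS));
  }
}
```

Running the sketch shows the three metadata columns appended to the projected names and ids only when they are not already requested, which is the same idempotent behaviour the patched `addProjectionField` applies to the real `JobConf`.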