diff --git a/docs/quickstart.md b/docs/quickstart.md
index ecc31992cf7b..b04accdb2fdf 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -70,21 +70,23 @@ bin/hiveserver2 \
#### Hive Sync Tool
-Once Hive is up and running, the sync tool can be used to sync commits done above to a Hive table, as follows.
+The Hive Sync Tool creates or updates the necessary metadata (schema and partitions) in the Hive metastore.
+This allows the schema to evolve and newly written partitions to be added incrementally.
+It works incrementally: the last commit time synced is stored in the table's TBLPROPERTIES, and only commits made after that time are synced.
+It can be run as frequently as the ingestion pipeline, so that new partitions and schema changes are reflected promptly.
```
-java -cp target/hoodie-hive-0.3.1-SNAPSHOT-jar-with-dependencies.jar:target/jars/* com.uber.hoodie.hive.HiveSyncTool \
- --base-path file:///tmp/hoodie/sample-table/ \
- --database default \
- --table hoodie_test \
- --user hive \
- --pass hive \
- --jdbc-url jdbc:hive2://localhost:10010/
+{JAVA8}/bin/java -cp "/etc/hive/conf:./hoodie-hive-0.3.8-SNAPSHOT-jar-with-dependencies.jar:/opt/hadoop/lib/hadoop-mapreduce/*" com.uber.hoodie.hive.HiveSyncTool \
+  --user hive \
+  --pass hive \
+  --database default \
+  --jdbc-url "jdbc:hive2://localhost:10010/" \
+  --base-path file:///tmp/hoodie/sample-table/ \
+  --table hoodie_test \
+  --partitioned-by field1,field2
```
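+
+Since the last synced commit time lives in TBLPROPERTIES, a quick way to verify a sync is to list
+the table properties via Beeline and look for the sync property (assumed here to be
+`last_commit_time_sync`; confirm the exact key against your version):
+```
+beeline -u jdbc:hive2://localhost:10010/ -n hive -p hive \
+  -e "SHOW TBLPROPERTIES hoodie_test;"
+```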
-{% include callout.html content="Hive sync tools does not yet support Merge-On-Read tables." type="info" %}
-
#### Manually via Beeline
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java
index bdac4babb529..1ff704bbbef0 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionWriteStat.java
@@ -28,11 +28,11 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class CompactionWriteStat implements Serializable {
- private final HoodieWriteStat writeStat;
+ private HoodieWriteStat writeStat;
private String partitionPath;
- private final long totalLogRecords;
- private final long totalLogFiles;
- private final long totalRecordsToBeUpdate;
+ private long totalLogRecords;
+ private long totalLogFiles;
+ private long totalRecordsToBeUpdate;
public CompactionWriteStat(HoodieWriteStat writeStat, String partitionPath, long totalLogFiles, long totalLogRecords,
long totalRecordsToUpdate) {
@@ -43,6 +43,10 @@ public CompactionWriteStat(HoodieWriteStat writeStat, String partitionPath, long
this.totalRecordsToBeUpdate = totalRecordsToUpdate;
}
+ public CompactionWriteStat() {
+ // For de-serialization
+ }
+
public long getTotalLogRecords() {
return totalLogRecords;
}
diff --git a/hoodie-hadoop-mr/pom.xml b/hoodie-hadoop-mr/pom.xml
index 01137dbdd7eb..52429a4f0b72 100644
--- a/hoodie-hadoop-mr/pom.xml
+++ b/hoodie-hadoop-mr/pom.xml
@@ -75,6 +75,10 @@
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-avro</artifactId>
+    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
@@ -108,6 +112,7 @@
                  <include>com.uber.hoodie:hoodie-common</include>
+                  <include>com.twitter:parquet-avro</include>
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
index 3a60c21873c0..047c6e2850a2 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
@@ -20,9 +20,11 @@
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
@@ -66,6 +68,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
// These positions have to be deterministic across all tables
public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
public static final int HOODIE_RECORD_KEY_COL_POS = 2;
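+  // Hoodie metadata columns, in order: 0: _hoodie_commit_time, 1: _hoodie_commit_seqno,
+  // 2: _hoodie_record_key, 3: _hoodie_partition_path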
+ public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
@@ -112,9 +115,18 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
List<FileSplit> dataFileSplits = groupedInputSplits.get(dataFile.getFileId());
dataFileSplits.forEach(split -> {
try {
-          List<String> logFilePaths = logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
- String maxCommitTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
- rtSplits.add(new HoodieRealtimeFileSplit(split, logFilePaths, maxCommitTime));
+          List<String> logFilePaths = logFiles.stream()
+ .map(logFile -> logFile.getPath().toString())
+ .collect(Collectors.toList());
+          // Get the max commit time from the last delta commit, compaction, or plain commit (the
+          // latter occurs when bootstrapped from a COW table)
+ String maxCommitTime = metaClient.getActiveTimeline()
+ .getTimelineOfActions(
+ Sets.newHashSet(HoodieTimeline.COMMIT_ACTION,
+ HoodieTimeline.COMPACTION_ACTION,
+ HoodieTimeline.DELTA_COMMIT_ACTION))
+ .filterCompletedInstants().lastInstant().get().getTimestamp();
+ rtSplits.add(
+ new HoodieRealtimeFileSplit(split, logFilePaths, maxCommitTime));
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
}
@@ -124,7 +136,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
throw new HoodieIOException("Error obtaining data file/log file grouping: " + partitionPath, e);
}
});
-
+ LOG.info("Returning a total splits of " + rtSplits.size());
return rtSplits.toArray(new InputSplit[rtSplits.size()]);
}
@@ -135,36 +147,49 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
return super.listStatus(job);
}
-
- private static Configuration addExtraReadColsIfNeeded(Configuration configuration) {
- String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
- String readColIds = configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
-
- if (!readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
- configuration.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
- readColNames + "," + HoodieRecord.RECORD_KEY_METADATA_FIELD);
- configuration.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
- readColIds + "," + HOODIE_RECORD_KEY_COL_POS);
- LOG.info(String.format("Adding extra _hoodie_record_key column, to enable log merging cols (%s) ids (%s) ",
- configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
- configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
+  /**
+   * Adds a field (by name and position) to the projected fields, if it is not already present
+   */
+ private static Configuration addProjectionField(Configuration conf, String fieldName,
+ int fieldIndex) {
+ String readColNames = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
+ String readColIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "");
+
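+    // Seed the prefix with the existing value plus a comma separator; fall back to an empty
+    // prefix when nothing is projected yet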
+ String readColNamesPrefix = readColNames + ",";
+ if (readColNames == null || readColNames.isEmpty()) {
+ readColNamesPrefix = "";
+ }
+ String readColIdsPrefix = readColIds + ",";
+ if (readColIds == null || readColIds.isEmpty()) {
+ readColIdsPrefix = "";
}
- if (!readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
- configuration.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
- readColNames + "," + HoodieRecord.COMMIT_TIME_METADATA_FIELD);
- configuration.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
- readColIds + "," + HOODIE_COMMIT_TIME_COL_POS);
- LOG.info(String.format("Adding extra _hoodie_commit_time column, to enable log merging cols (%s) ids (%s) ",
- configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
- configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
+ if (!readColNames.contains(fieldName)) {
+ // If not already in the list - then add it
+ conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
+ readColNamesPrefix + fieldName);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Adding extra column " + fieldName
+ + ", to enable log merging cols (%s) ids (%s) ",
+ conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
+ conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
+ }
}
+ return conf;
+  }
+
+  private static Configuration addRequiredProjectionFields(Configuration configuration) {
+ // Need this to do merge records in HoodieRealtimeRecordReader
+ configuration = addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD,
+ HOODIE_RECORD_KEY_COL_POS);
+ configuration = addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ HOODIE_COMMIT_TIME_COL_POS);
+ configuration = addProjectionField(configuration,
+ HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
return configuration;
}
-
-
@Override
public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit split,
final JobConf job,
@@ -172,17 +197,17 @@ public RecordReader getRecordReader(final InputSplit split,
LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
// sanity check
Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
- "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit");
+ "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split );
return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job, super.getRecordReader(split, job, reporter));
}
@Override
public void setConf(Configuration conf) {
- this.conf = addExtraReadColsIfNeeded(conf);
+ this.conf = addRequiredProjectionFields(conf);
}
@Override
public Configuration getConf() {
- return addExtraReadColsIfNeeded(conf);
+ return conf;
}
}
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
index cbd485461b84..ed22c14c2222 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -18,20 +18,24 @@
package com.uber.hoodie.hadoop.realtime;
+import com.google.common.collect.Lists;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieRecord;
-import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
import com.uber.hoodie.common.util.FSUtils;
-import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.ArrayWritable;
@@ -45,18 +49,15 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
+import parquet.avro.AvroSchemaConverter;
+import parquet.hadoop.ParquetFileReader;
+import parquet.schema.MessageType;
/**
* Record Reader implementation to merge fresh avro data with base parquet data, to support real time
@@ -83,37 +84,54 @@ public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split,
LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
try {
- baseFileSchema = ParquetUtils.readSchema(split.getPath());
+ baseFileSchema = readSchema(jobConf, split.getPath());
readAndCompactLog();
} catch (IOException e) {
- throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
+ throw new HoodieIOException(
+ "Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
}
}
+  /**
+   * Reads the schema from the parquet file footer. This differs from ParquetUtils in that it
+   * uses the com.twitter parquet classes, in order to support Hive 1.1.0
+   */
+ private static MessageType readSchema(Configuration conf, Path parquetFilePath) {
+ try {
+ return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData()
+ .getSchema();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath,
+ e);
+ }
+ }
+
/**
* Goes through the log files and populates a map with latest version of each key logged, since the base split was written.
*/
private void readAndCompactLog() throws IOException {
Schema writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
List<String> projectionFields = orderFields(
- jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
- jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR),
- jobConf.get("partition_columns"));
+ jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
+ jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR),
+ jobConf.get("partition_columns", ""));
// TODO(vc): In the future, the reader schema should be updated based on log files & be able to null out fields not present before
Schema readerSchema = generateProjectionSchema(writerSchema, projectionFields);
- LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
+ LOG.info(
+ String.format("About to read compacted logs %s for base split %s, projecting cols %s",
split.getDeltaFilePaths(), split.getPath(), projectionFields));
HoodieCompactedLogRecordScanner compactedLogRecordScanner =
- new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(), readerSchema);
-    Iterator<HoodieRecord<HoodieAvroPayload>> itr = compactedLogRecordScanner.iterator();
+ new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(),
+ readerSchema);
// NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
// but can return records for completed commits > the commit we are trying to read (if using readCommit() API)
- while(itr.hasNext()) {
-      HoodieRecord<HoodieAvroPayload> hoodieRecord = itr.next();
- GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema).get();
+    for (HoodieRecord<HoodieAvroPayload> hoodieRecord : compactedLogRecordScanner) {
+ GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema)
+ .get();
String key = hoodieRecord.getRecordKey();
// we assume, a later safe record in the log, is newer than what we have in the map & replace it.
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
@@ -146,22 +164,27 @@ private static String arrayWritableToString(ArrayWritable writable) {
* @param fieldOrderCsv
* @return
*/
-  public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, String partitioningFieldsCsv) {
+  public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv,
+      String partitioningFieldsCsv) {
String[] fieldOrders = fieldOrderCsv.split(",");
-    Set<String> partitioningFields = Arrays.stream(partitioningFieldsCsv.split(",")).collect(Collectors.toSet());
-    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(",")).filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
+    Set<String> partitioningFields = Arrays.stream(partitioningFieldsCsv.split(","))
+        .collect(Collectors.toSet());
+    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
+        .filter(fn -> !partitioningFields.contains(fn))
+        .collect(Collectors.toList());
// Hive does not provide ids for partitioning fields, so check for lengths excluding that.
if (fieldNames.size() != fieldOrders.length) {
- throw new HoodieException(String.format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
- fieldNames.size(), fieldOrders.length));
+ throw new HoodieException(String.format(
+ "Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
+ fieldNames.size(), fieldOrders.length));
}
TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
- for (int ox=0; ox < fieldOrders.length; ox++) {
+ for (int ox = 0; ox < fieldOrders.length; ox++) {
orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNames.get(ox));
}
- return orderedFieldMap.values().stream().collect(Collectors.toList());
+ return new ArrayList<>(orderedFieldMap.values());
}
/**
@@ -235,6 +258,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema) {
return new ArrayWritable(Writable.class, values2);
case MAP:
// TODO(vc): Need to add support for complex types
+ return NullWritable.get();
case UNION:
List<Schema> types = schema.getTypes();
if (types.size() != 2) {
@@ -271,7 +295,10 @@ public boolean next(Void aVoid, ArrayWritable arrayWritable) throws IOException
key, arrayWritableToString(arrayWritable), arrayWritableToString(deltaRecordMap.get(key))));
}
if (deltaRecordMap.containsKey(key)) {
- arrayWritable.set(deltaRecordMap.get(key).get());
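+          // Overwrite the base record's values in place (rather than swapping in the delta
+          // array) so the ArrayWritable keeps the width downstream consumers expect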
+ Writable[] replaceValue = deltaRecordMap.get(key).get();
+ Writable[] originalValue = arrayWritable.get();
+ System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length);
+ arrayWritable.set(originalValue);
}
return true;
}
diff --git a/hoodie-hive/pom.xml b/hoodie-hive/pom.xml
index d9a8a5626e1f..17c10fd39a39 100644
--- a/hoodie-hive/pom.xml
+++ b/hoodie-hive/pom.xml
@@ -167,7 +167,7 @@
-            <mainClass>com.uber.hoodie.hive.example.HoodieHiveSyncExample</mainClass>
+            <mainClass>com.uber.hoodie.hive.HiveSyncTool</mainClass>
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java
index 159f695da681..4f40355de222 100644
--- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java
@@ -49,7 +49,7 @@ public class HiveSyncConfig implements Serializable {
"--base-path"}, description = "Basepath of hoodie dataset to sync", required = true)
public String basePath;
- @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
+ @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by", required = true)
public List<String> partitionFields = new ArrayList<>();
@Parameter(names = "-partition-value-extractor", description = "Class which implements PartitionValueExtractor to extract the partition values from HDFS path")