Fixes to RealtimeInputFormat and RealtimeRecordReader and update documentation for HiveSyncTool
prasannarajaperumal authored and prazanna committed Jun 16, 2017
1 parent 521555c commit 4b26be9
Showing 7 changed files with 136 additions and 73 deletions.
22 changes: 12 additions & 10 deletions docs/quickstart.md
@@ -70,21 +70,23 @@ bin/hiveserver2 \

#### Hive Sync Tool

Once Hive is up and running, the Hive Sync Tool can be used to sync the commits made above into a Hive table, as follows.
The tool creates or updates the necessary metadata (schema and partitions) in the Hive metastore.
This allows for schema evolution and for incrementally registering newly written partitions.
It works incrementally: the last synced commit time is stored in the table's TBLPROPERTIES, and only commits after that time are synced on the next run.
It can therefore be run as frequently as the ingestion pipeline, so that new partitions and schema changes are reflected promptly.

```
java -cp target/hoodie-hive-0.3.1-SNAPSHOT-jar-with-dependencies.jar:target/jars/* com.uber.hoodie.hive.HiveSyncTool \
--base-path file:///tmp/hoodie/sample-table/ \
--database default \
--table hoodie_test \
--user hive \
--pass hive \
--jdbc-url jdbc:hive2://localhost:10010/
{JAVA8}/bin/java -cp "/etc/hive/conf:./hoodie-hive-0.3.8-SNAPSHOT-jar-with-dependencies.jar:/opt/hadoop/lib/hadoop-mapreduce/*" com.uber.hoodie.hive.HiveSyncTool \
  --user hive \
  --pass hive \
  --database default \
  --jdbc-url "jdbc:hive2://localhost:10010/" \
  --base-path tmp/hoodie/sample-table/ \
  --table hoodie_test \
  --partitioned-by field1,field2
```

{% include callout.html content="The Hive Sync Tool does not yet support Merge-On-Read tables." type="info" %}
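
The incremental bookkeeping described above reduces to comparing commit times against the value stored in TBLPROPERTIES. A minimal, self-contained sketch of that idea follows; the property key and helper names are illustrative assumptions, not the tool's actual API.

```
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class IncrementalSyncSketch {

  // Hypothetical TBLPROPERTIES key; the actual key written by HiveSyncTool may differ.
  static final String LAST_SYNC_PROP = "last_commit_time_sync";

  // Return only the commits made after the last synced commit time stored on the table.
  static List<String> commitsToSync(Map<String, String> tblProperties, List<String> allCommitTimes) {
    String lastSynced = tblProperties.get(LAST_SYNC_PROP);
    return allCommitTimes.stream()
        // Hoodie commit times are timestamp strings, so lexical comparison matches time order.
        .filter(commit -> lastSynced == null || commit.compareTo(lastSynced) > 0)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(LAST_SYNC_PROP, "20170616120000");
    List<String> commits = Arrays.asList("20170616110000", "20170616120000", "20170616130000");
    System.out.println(commitsToSync(props, commits)); // [20170616130000]
  }
}
```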



#### Manually via Beeline
CompactionWriteStat.java
@@ -28,11 +28,11 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class CompactionWriteStat implements Serializable {

private final HoodieWriteStat writeStat;
private HoodieWriteStat writeStat;
private String partitionPath;
private final long totalLogRecords;
private final long totalLogFiles;
private final long totalRecordsToBeUpdate;
private long totalLogRecords;
private long totalLogFiles;
private long totalRecordsToBeUpdate;

public CompactionWriteStat(HoodieWriteStat writeStat, String partitionPath, long totalLogFiles, long totalLogRecords,
long totalRecordsToUpdate) {
@@ -43,6 +43,10 @@ public CompactionWriteStat(HoodieWriteStat writeStat, String partitionPath, long
this.totalRecordsToBeUpdate = totalRecordsToUpdate;
}

public CompactionWriteStat() {
// For de-serialization
}

public long getTotalLogRecords() {
return totalLogRecords;
}
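
For context on why the fields lose `final` and a no-arg constructor appears: Jackson databind instantiates the bean first and then populates its fields during deserialization. A minimal round-trip sketch with a simplified stand-in class (not the actual CompactionWriteStat):

```
import com.fasterxml.jackson.databind.ObjectMapper;

public class DeserializationSketch {

  // Simplified stand-in: non-final public fields plus a no-arg constructor let Jackson populate the bean.
  public static class Stat {
    public long totalLogRecords;
    public long totalLogFiles;

    public Stat() {
      // For de-serialization
    }

    public Stat(long totalLogRecords, long totalLogFiles) {
      this.totalLogRecords = totalLogRecords;
      this.totalLogFiles = totalLogFiles;
    }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(new Stat(100, 3));
    Stat roundTripped = mapper.readValue(json, Stat.class);
    System.out.println(roundTripped.totalLogRecords + " records across " + roundTripped.totalLogFiles + " log files");
  }
}
```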
5 changes: 5 additions & 0 deletions hoodie-hadoop-mr/pom.xml
@@ -75,6 +75,10 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
@@ -108,6 +112,7 @@
<artifactSet>
<includes>
<include>com.uber.hoodie:hoodie-common</include>
<include>com.twitter:parquet-avro</include>
</includes>
</artifactSet>
</configuration>
HoodieRealtimeInputFormat.java
@@ -20,9 +20,11 @@

import com.google.common.base.Preconditions;

import com.google.common.collect.Sets;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
@@ -66,6 +68,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
// These positions have to be deterministic across all tables
public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
public static final int HOODIE_RECORD_KEY_COL_POS = 2;
public static final int HOODIE_PARTITION_PATH_COL_POS = 3;

@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
@@ -112,9 +115,18 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
List<FileSplit> dataFileSplits = groupedInputSplits.get(dataFile.getFileId());
dataFileSplits.forEach(split -> {
try {
List<String> logFilePaths = logFiles.stream().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
String maxCommitTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
rtSplits.add(new HoodieRealtimeFileSplit(split, logFilePaths, maxCommitTime));
List<String> logFilePaths = logFiles.stream()
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList());
// Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table
String maxCommitTime = metaClient.getActiveTimeline()
.getTimelineOfActions(
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
rtSplits.add(
new HoodieRealtimeFileSplit(split, logFilePaths, maxCommitTime));
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
}
@@ -124,7 +136,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
throw new HoodieIOException("Error obtaining data file/log file grouping: " + partitionPath, e);
}
});

LOG.info("Returning a total splits of " + rtSplits.size());
return rtSplits.toArray(new InputSplit[rtSplits.size()]);
}
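
The rewritten block above no longer looks only at delta commits: it takes the last completed instant across commit, compaction, and delta-commit actions, so a realtime table bootstrapped from a copy-on-write table (whose timeline holds only plain commits) still yields a valid max commit time. A self-contained sketch of that selection, using plain collections instead of the HoodieTimeline API (the action strings are illustrative stand-ins for HoodieTimeline's constants):

```
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class MaxCommitTimeSketch {

  // A completed instant: an action type plus its commit timestamp.
  static class Instant {
    final String action;
    final String timestamp;

    Instant(String action, String timestamp) {
      this.action = action;
      this.timestamp = timestamp;
    }
  }

  // Latest timestamp among completed instants whose action is in the allowed set.
  static Optional<String> maxCommitTime(List<Instant> completed, Set<String> actions) {
    return completed.stream()
        .filter(instant -> actions.contains(instant.action))
        .map(instant -> instant.timestamp)
        .max(Comparator.naturalOrder());
  }

  public static void main(String[] args) {
    List<Instant> timeline = Arrays.asList(
        new Instant("commit", "20170616100000"),       // from the bootstrapped COW table
        new Instant("deltacommit", "20170616110000"),
        new Instant("compaction", "20170616120000"));
    Set<String> actions = new HashSet<>(Arrays.asList("commit", "compaction", "deltacommit"));
    System.out.println(maxCommitTime(timeline, actions)); // Optional[20170616120000]
  }
}
```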

@@ -135,54 +147,67 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
return super.listStatus(job);
}


private static Configuration addExtraReadColsIfNeeded(Configuration configuration) {
String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
String readColIds = configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);

if (!readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
configuration.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
readColNames + "," + HoodieRecord.RECORD_KEY_METADATA_FIELD);
configuration.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
readColIds + "," + HOODIE_RECORD_KEY_COL_POS);
LOG.info(String.format("Adding extra _hoodie_record_key column, to enable log merging cols (%s) ids (%s) ",
configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
/**
* Add a field to the existing fields projected
*/
private static Configuration addProjectionField(Configuration conf, String fieldName,
int fieldIndex) {
String readColNames = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
String readColIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "");

String readColNamesPrefix = readColNames + ",";
if (readColNames == null || readColNames.isEmpty()) {
readColNamesPrefix = "";
}
String readColIdsPrefix = readColIds + ",";
if (readColIds == null || readColIds.isEmpty()) {
readColIdsPrefix = "";
}

if (!readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)) {
configuration.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
readColNames + "," + HoodieRecord.COMMIT_TIME_METADATA_FIELD);
configuration.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
readColIds + "," + HOODIE_COMMIT_TIME_COL_POS);
LOG.info(String.format("Adding extra _hoodie_commit_time column, to enable log merging cols (%s) ids (%s) ",
configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
if (!readColNames.contains(fieldName)) {
// If not already in the list - then add it
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
readColNamesPrefix + fieldName);
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex);
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Adding extra column " + fieldName
+ ", to enable log merging cols (%s) ids (%s) ",
conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
}
}
return conf;
}

private static Configuration addRequiredProjectionFields(Configuration configuration) {
// Need this to do merge records in HoodieRealtimeRecordReader
configuration = addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD,
HOODIE_RECORD_KEY_COL_POS);
configuration = addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD,
HOODIE_COMMIT_TIME_COL_POS);
configuration = addProjectionField(configuration,
HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
return configuration;
}
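
To make the effect of addRequiredProjectionFields concrete: it appends the Hoodie metadata columns and their fixed positions to Hive's projection lists only when they are not already requested. The sketch below mimics that logic on a plain map; the literal conf keys stand in for ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR and READ_COLUMN_IDS_CONF_STR and should be treated as assumptions:

```
import java.util.HashMap;
import java.util.Map;

public class ProjectionFieldSketch {

  // Assumed Hive conf keys backing the ColumnProjectionUtils constants.
  static final String READ_COLUMN_NAMES = "hive.io.file.readcolumn.names";
  static final String READ_COLUMN_IDS = "hive.io.file.readcolumn.ids";

  // Append a field name/position pair unless the field is already projected.
  static void addProjectionField(Map<String, String> conf, String fieldName, int fieldIndex) {
    String names = conf.getOrDefault(READ_COLUMN_NAMES, "");
    String ids = conf.getOrDefault(READ_COLUMN_IDS, "");
    if (!names.contains(fieldName)) {
      conf.put(READ_COLUMN_NAMES, names.isEmpty() ? fieldName : names + "," + fieldName);
      conf.put(READ_COLUMN_IDS, ids.isEmpty() ? String.valueOf(fieldIndex) : ids + "," + fieldIndex);
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put(READ_COLUMN_NAMES, "field1,field2");
    conf.put(READ_COLUMN_IDS, "5,6");
    addProjectionField(conf, "_hoodie_record_key", 2);
    addProjectionField(conf, "_hoodie_commit_time", 0);
    addProjectionField(conf, "_hoodie_partition_path", 3);
    // field1,field2,_hoodie_record_key,_hoodie_commit_time,_hoodie_partition_path / 5,6,2,0,3
    System.out.println(conf.get(READ_COLUMN_NAMES) + " / " + conf.get(READ_COLUMN_IDS));
  }
}
```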



@Override
public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit split,
final JobConf job,
final Reporter reporter) throws IOException {
LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
// sanity check
Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
"HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit");
"HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split );
return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job, super.getRecordReader(split, job, reporter));
}

@Override
public void setConf(Configuration conf) {
this.conf = addExtraReadColsIfNeeded(conf);
this.conf = addRequiredProjectionFields(conf);
}

@Override
public Configuration getConf() {
return addExtraReadColsIfNeeded(conf);
return conf;
}
}
HoodieRealtimeRecordReader.java
@@ -18,20 +18,24 @@

package com.uber.hoodie.hadoop.realtime;

import com.google.common.collect.Lists;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.ArrayWritable;
@@ -45,18 +49,15 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import parquet.avro.AvroSchemaConverter;
import parquet.hadoop.ParquetFileReader;
import parquet.schema.MessageType;

/**
* Record Reader implementation to merge fresh avro data with base parquet data, to support real time
@@ -83,37 +84,54 @@ public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split,

LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
try {
baseFileSchema = ParquetUtils.readSchema(split.getPath());
baseFileSchema = readSchema(jobConf, split.getPath());
readAndCompactLog();
} catch (IOException e) {
throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
throw new HoodieIOException(
"Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
}
}

/**
* Reads the schema from the parquet file. This is different from ParquetUtils as it uses the
* com.twitter parquet classes to support Hive 1.1.0.
*/
private static MessageType readSchema(Configuration conf, Path parquetFilePath) {
try {
return ParquetFileReader.readFooter(conf, parquetFilePath).getFileMetaData()
.getSchema();
} catch (IOException e) {
throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath,
e);
}
}


/**
* Goes through the log files and populates a map with latest version of each key logged, since the base split was written.
*/
private void readAndCompactLog() throws IOException {
Schema writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
List<String> projectionFields = orderFields(
jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR),
jobConf.get("partition_columns"));
jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR),
jobConf.get("partition_columns", ""));
// TODO(vc): In the future, the reader schema should be updated based on log files & be able to null out fields not present before
Schema readerSchema = generateProjectionSchema(writerSchema, projectionFields);

LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
LOG.info(
String.format("About to read compacted logs %s for base split %s, projecting cols %s",
split.getDeltaFilePaths(), split.getPath(), projectionFields));

HoodieCompactedLogRecordScanner compactedLogRecordScanner =
new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(), readerSchema);
Iterator<HoodieRecord<HoodieAvroPayload>> itr = compactedLogRecordScanner.iterator();
new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(),
readerSchema);

// NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
// but can return records for completed commits > the commit we are trying to read (if using readCommit() API)
while(itr.hasNext()) {
HoodieRecord<HoodieAvroPayload> hoodieRecord = itr.next();
GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema).get();
for (HoodieRecord<HoodieAvroPayload> hoodieRecord : compactedLogRecordScanner) {
GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema)
.get();
String key = hoodieRecord.getRecordKey();
// we assume, a later safe record in the log, is newer than what we have in the map & replace it.
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
@@ -146,22 +164,27 @@ private static String arrayWritableToString(ArrayWritable writable) {
* @param fieldOrderCsv
* @return
*/
public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, String partitioningFieldsCsv) {
public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv,
String partitioningFieldsCsv) {

String[] fieldOrders = fieldOrderCsv.split(",");
Set<String> partitioningFields = Arrays.stream(partitioningFieldsCsv.split(",")).collect(Collectors.toSet());
List<String> fieldNames = Arrays.stream(fieldNameCsv.split(",")).filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
Set<String> partitioningFields = Arrays.stream(partitioningFieldsCsv.split(","))
.collect(Collectors.toSet());
List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
.filter(fn -> !partitioningFields.contains(fn)).collect(
Collectors.toList());

// Hive does not provide ids for partitioning fields, so check for lengths excluding that.
if (fieldNames.size() != fieldOrders.length) {
throw new HoodieException(String.format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
fieldNames.size(), fieldOrders.length));
throw new HoodieException(String.format(
"Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
fieldNames.size(), fieldOrders.length));
}
TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
for (int ox=0; ox < fieldOrders.length; ox++) {
for (int ox = 0; ox < fieldOrders.length; ox++) {
orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNames.get(ox));
}
return orderedFieldMap.values().stream().collect(Collectors.toList());
return new ArrayList<>(orderedFieldMap.values());
}
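
A usage sketch for the public static orderFields helper, with made-up column names and positions (it assumes hoodie-hadoop-mr is on the classpath). Hive supplies projected column names including partition columns but position ids only for stored columns, which is why the partition column is filtered out before names are paired with positions:

```
import com.uber.hoodie.hadoop.realtime.HoodieRealtimeRecordReader;
import java.util.List;

public class OrderFieldsExample {
  public static void main(String[] args) {
    // Hypothetical projection: three stored columns (positions 2, 0, 5)
    // plus the partition column "datestr", which has no storage position.
    List<String> ordered = HoodieRealtimeRecordReader.orderFields(
        "_hoodie_record_key,_hoodie_commit_time,field1,datestr", // fieldNameCsv
        "2,0,5",                                                  // fieldOrderCsv
        "datestr");                                               // partitioningFieldsCsv
    // Sorted by position: [_hoodie_commit_time, _hoodie_record_key, field1]
    System.out.println(ordered);
  }
}
```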

/**
@@ -235,6 +258,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema) {
return new ArrayWritable(Writable.class, values2);
case MAP:
// TODO(vc): Need to add support for complex types
return NullWritable.get();
case UNION:
List<Schema> types = schema.getTypes();
if (types.size() != 2) {
@@ -271,7 +295,10 @@ public boolean next(Void aVoid, ArrayWritable arrayWritable) throws IOException
key, arrayWritableToString(arrayWritable), arrayWritableToString(deltaRecordMap.get(key))));
}
if (deltaRecordMap.containsKey(key)) {
arrayWritable.set(deltaRecordMap.get(key).get());
Writable[] replaceValue = deltaRecordMap.get(key).get();
Writable[] originalValue = arrayWritable.get();
System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length);
arrayWritable.set(originalValue);
}
return true;
}
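
One reading of the System.arraycopy change above: the merged row keeps the width of the projected base record, so a delta record materialized with more fields than Hive asked for cannot widen the ArrayWritable handed back. A standalone sketch of that behaviour using Hadoop writables (values are made up):

```
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MergeWidthSketch {
  public static void main(String[] args) {
    // Base row as Hive projected it: 2 columns.
    ArrayWritable base = new ArrayWritable(Writable.class,
        new Writable[]{new Text("20170616100000"), new Text("old-value")});
    // Delta record materialized with more fields than the projection.
    Writable[] delta = new Writable[]{
        new Text("20170616130000"), new Text("new-value"), new Text("extra-field")};

    // Copy only as many values as the base row holds, keeping its width stable.
    Writable[] original = base.get();
    System.arraycopy(delta, 0, original, 0, original.length);
    base.set(original);

    System.out.println(base.get().length); // still 2
  }
}
```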
