Skip to content

Commit

Permalink
Fixes #2039 (#2040)
Browse files Browse the repository at this point in the history
* Fixes #2039
- Restored ability to configure AccumuloOutputFormat's accumulo client via script
- Cleanup of IngestJob's CBMutationOutputFormatter setup for live ingest

* RE: #2039 - Ensure that parent class configure method is used
  • Loading branch information
keith-ratcliffe committed Jul 18, 2023
1 parent a90449a commit 2df441b
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,20 @@ public static void setInstanceName(Configuration conf, String instanceName) {
/**
 * Stores the comma-separated ZooKeeper host list under the {@code ZOOKEEPERS}
 * key in the given Hadoop configuration.
 *
 * @param conf Hadoop configuration to update
 * @param zooKeepers ZooKeeper connection string to store
 */
public static void setZooKeepers(Configuration conf, String zooKeepers) {
conf.set(ZOOKEEPERS, zooKeepers);
}

/**
 * Returns the Accumulo username stored under the {@code USERNAME} key, or
 * {@code null} if the key is not present in the configuration.
 *
 * @param conf Hadoop configuration to read
 * @return configured username, or {@code null} if unset
 */
public static String getUsername(Configuration conf) {
return conf.get(USERNAME);
}

/**
 * Returns the raw Accumulo password bytes, Base64-decoded from the value
 * stored under the {@code PASSWORD} key in the given configuration.
 * <p>
 * NOTE(review): {@code ConfigurationHelper.isNull} appears to fetch the value
 * while enforcing that it is present — confirm its failure behavior when the
 * key is missing.
 *
 * @param conf Hadoop configuration holding the Base64-encoded password
 * @return decoded password bytes
 */
public static byte[] getPassword(Configuration conf) {
    // Use an explicit charset: Base64 text is ASCII-safe, but the no-arg
    // String.getBytes() depends on the platform-default charset and is
    // therefore non-portable. Fully qualified to avoid touching this file's
    // import block, which is not visible in this diff.
    return Base64.decodeBase64(
            ConfigurationHelper.isNull(conf, PASSWORD, String.class).getBytes(java.nio.charset.StandardCharsets.UTF_8));
}

/**
 * Returns the Accumulo instance name stored under the {@code INSTANCE_NAME}
 * key, or {@code null} if the key is not present in the configuration.
 *
 * @param conf Hadoop configuration to read
 * @return configured instance name, or {@code null} if unset
 */
public static String getInstanceName(Configuration conf) {
return conf.get(INSTANCE_NAME);
}

/**
 * Returns the ZooKeeper connection string stored under the {@code ZOOKEEPERS}
 * key, or {@code null} if the key is not present in the configuration.
 *
 * @param conf Hadoop configuration to read
 * @return configured ZooKeeper host list, or {@code null} if unset
 */
public static String getZooKeepers(Configuration conf) {
return conf.get(ZOOKEEPERS);
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,50 @@
package datawave.ingest.mapreduce.job;

import java.io.IOException;
import java.util.Properties;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.hadoop.mapreduce.OutputFormatBuilder;
import org.apache.accumulo.hadoopImpl.mapreduce.OutputFormatBuilderImpl;
import org.apache.accumulo.hadoopImpl.mapreduce.lib.OutputConfigurator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

import datawave.ingest.data.config.ingest.AccumuloHelper;
import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler;

public class CBMutationOutputFormatter extends AccumuloOutputFormat {

private static final Logger log = Logger.getLogger(CBMutationOutputFormatter.class);

/**
 * Obtains the parent AccumuloOutputFormat's record writer and wraps it in a
 * {@link CBRecordWriter} so mutations can be intercepted before delegation.
 *
 * @param attempt current task attempt context
 * @return wrapping record writer that delegates to the parent's writer
 * @throws IOException if the underlying record writer cannot be created
 */
@Override
public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException {
    RecordWriter<Text,Mutation> delegate = super.getRecordWriter(attempt);
    return new CBRecordWriter(delegate, attempt);
}

/**
 * Assembles the complete AccumuloClient property set for this output format.
 * Begins with any client-property customizations already present in the job
 * configuration (e.g., batch writer overrides) and then overlays the DW
 * connection settings (instance name, zookeepers, credentials), which take
 * precedence on any conflicting key.
 *
 * @param conf job configuration carrying DW connection keys
 * @return merged client properties suitable for AccumuloOutputFormat
 */
public static Properties getClientProperties(Configuration conf) {
    // Connection properties derived from DW's own configuration keys.
    //@formatter:off
    Properties connectionProps = Accumulo.newClientProperties()
            .to(AccumuloHelper.getInstanceName(conf), AccumuloHelper.getZooKeepers(conf))
            .as(AccumuloHelper.getUsername(conf), new PasswordToken(new String(AccumuloHelper.getPassword(conf))))
            .build();
    //@formatter:on
    // Start from any pre-existing customizations; connection settings win on conflict.
    Properties merged = OutputConfigurator.getClientProperties(AccumuloOutputFormat.class, conf);
    merged.putAll(connectionProps);
    return merged;
}

/**
 * Creates an AccumuloOutputFormat builder pre-populated with the client
 * properties derived from the given configuration, explicitly delegating to
 * the parent class's {@code configure()} entry point.
 *
 * @param conf job configuration carrying DW connection keys
 * @return output options builder ready for further customization (e.g., createTables) and storing on a Job
 */
public static OutputFormatBuilder.OutputOptions<Job> configure(Configuration conf) {
    Properties clientProps = getClientProperties(conf);
    return AccumuloOutputFormat.configure().clientProperties(clientProps);
}

public static class CBRecordWriter extends RecordWriter<Text,Mutation> {
private RecordWriter<Text,Mutation> delegate;
private String eventTable = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,10 @@
import java.util.Set;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.NamespaceExistsException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NamespaceOperations;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyValue;
Expand Down Expand Up @@ -1027,9 +1019,7 @@ protected void configureJob(Job job, Configuration conf, Path workDirPath, FileS
// Setup the Output
job.setWorkingDirectory(workDirPath);
if (outputMutations) {
CBMutationOutputFormatter.configure()
.clientProperties(Accumulo.newClientProperties().to(instanceName, zooKeepers).as(userName, new PasswordToken(password)).build())
.createTables(true).store(job);
CBMutationOutputFormatter.configure(job.getConfiguration()).createTables(true).store(job);
job.setOutputFormatClass(CBMutationOutputFormatter.class);
} else {
FileOutputFormat.setOutputPath(job, new Path(workDirPath, "mapFiles"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ TIMEOUT=600000
# Positional args: $1 = input file list, $2 = reducer count; everything after
# is passed through to the job verbatim.
INPUT_FILES=$1
REDUCERS=$2
EXTRA_OPTS=${@:3}
# Customize accumulo client properties for AccumuloOutputFormat (newline-separated, one property per line)
BATCHWRITER_OPTS="-AccumuloOutputFormat.ClientOpts.ClientProps=
batch.writer.memory.max=100000000B
batch.writer.threads.max=4
"
# Hadoop/MapReduce tuning for the bulk ingest job (memory, sort, compression,
# queue). NOTE(review): assumes MAP_MEMORY_MB, REDUCE_MEMORY_MB, BULK_* and
# PART_ARG/MAPRED_INGEST_OPTS are defined earlier in this script — not visible here.
MAPRED_OPTS="-mapreduce.map.memory.mb=$MAP_MEMORY_MB -mapreduce.reduce.memory.mb=$REDUCE_MEMORY_MB -mapreduce.job.reduces=$REDUCERS -mapreduce.task.io.sort.mb=${BULK_CHILD_IO_SORT_MB} -mapreduce.task.io.sort.factor=100 -bulk.ingest.mapper.threads=0 -bulk.ingest.mapper.workqueue.size=10000 -io.file.buffer.size=1048576 -dfs.bytes-per-checksum=4096 -io.sort.record.percent=.10 -mapreduce.map.output.compress=${BULK_MAP_OUTPUT_COMPRESS} -mapreduce.map.output.compress.codec=${BULK_MAP_OUTPUT_COMPRESSION_CODEC} -mapreduce.output.fileoutputformat.compress.type=${BULK_MAP_OUTPUT_COMPRESSION_TYPE} $PART_ARG -mapreduce.task.timeout=$TIMEOUT -markerFileReducePercentage 0.33 -context.writer.max.cache.size=2500 -mapreduce.job.queuename=bulkIngestQueue $MAPRED_INGEST_OPTS"
#-mapreduce.map.sort.spill.percent=.50

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ TIMEOUT=600000
# Positional args: $1 = input file list, $2 = reducer count; everything after
# is passed through to the job verbatim.
INPUT_FILES=$1
REDUCERS=$2
EXTRA_OPTS=${@:3}
# Customize accumulo client properties for AccumuloOutputFormat (newline-separated, one property per line)
BATCHWRITER_OPTS="-AccumuloOutputFormat.ClientOpts.ClientProps=
batch.writer.memory.max=100000000B
batch.writer.threads.max=8
"
# Hadoop/MapReduce tuning for the live ingest job (memory, sort, compression,
# queue). NOTE(review): assumes MAP_MEMORY_MB, REDUCE_MEMORY_MB, LIVE_* and
# PART_ARG/MAPRED_INGEST_OPTS are defined earlier in this script — not visible here.
MAPRED_OPTS="-mapreduce.map.memory.mb=$MAP_MEMORY_MB -mapreduce.reduce.memory.mb=$REDUCE_MEMORY_MB -mapreduce.job.reduces=$REDUCERS -mapreduce.task.io.sort.mb=${LIVE_CHILD_IO_SORT_MB} -mapreduce.task.io.sort.factor=100 -bulk.ingest.mapper.threads=0 -bulk.ingest.mapper.workqueue.size=10000 -io.file.buffer.size=1048576 -dfs.bytes-per-checksum=4096 -io.sort.record.percent=.10 -mapreduce.map.sort.spill.percent=.50 -mapreduce.map.output.compress=${LIVE_MAP_OUTPUT_COMPRESS} -mapreduce.map.output.compress.codec=${LIVE_MAP_OUTPUT_COMPRESSION_CODEC} -mapreduce.output.fileoutputformat.compress.type=${LIVE_MAP_OUTPUT_COMPRESSION_TYPE} $PART_ARG -mapreduce.task.timeout=$TIMEOUT -markerFileReducePercentage 0.33 -context.writer.max.cache.size=2500 -mapreduce.job.queuename=liveIngestQueue $MAPRED_INGEST_OPTS"

export HADOOP_CLASSPATH=$CLASSPATH
Expand Down

0 comments on commit 2df441b

Please sign in to comment.