Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace nonpublic Accumulo DefaultFormatter #2525

Merged
merged 5 commits into from
Sep 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import java.net.InetAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -52,9 +54,11 @@
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.singletons.SingletonReservation;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.format.DateFormatSupplier;
import org.apache.accumulo.core.util.format.DefaultFormatter;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.commons.codec.binary.Base64;
Expand Down Expand Up @@ -94,6 +98,9 @@ public class BulkInputFormat extends InputFormat<Key,Value> {

protected static final Logger log = Logger.getLogger(BulkInputFormat.class);

// Per-thread scratch Date, reused when formatting a key's timestamp for trace logging so a new Date is not
// allocated for every entry.
private static final ThreadLocal<Date> tmpDate = ThreadLocal.withInitial(Date::new);
// Per-thread DateFormat obtained from Accumulo's DateFormatSupplier; used to render key timestamps in trace output.
private static final ThreadLocal<DateFormat> formatter = DateFormatSupplier.createDefaultFormatSupplier();

protected static final String PREFIX = BulkInputFormat.class.getSimpleName();
protected static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
protected static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
Expand Down Expand Up @@ -1322,15 +1329,60 @@ public String toString() {
public RecordReader<Key,Value> createRecordReader(InputSplit split, TaskAttemptContext context) {

return new RecordReaderBase<Key,Value>() {

// helper function for formatting. Rewritten from DefaultFormatter.appendBytes()
/**
 * Appends a human-readable rendering of a byte range to {@code sb}. Rewritten from DefaultFormatter.appendBytes().
 * <p>
 * Each byte is treated as an unsigned value: a backslash is written as two backslashes, printable ASCII (32 through
 * 126) is written as-is, and every other value is written as {@code \x} followed by two uppercase hex digits.
 *
 * @param sb
 *            the builder to append to
 * @param ba
 *            the bytes to render
 * @param offset
 *            index in {@code ba} of the first byte to render
 * @param len
 *            number of bytes to render
 * @return the same {@code sb} instance, to allow chaining
 */
private StringBuilder appendBytes(StringBuilder sb, byte[] ba, int offset, int len) {
    final String hexDigits = "0123456789ABCDEF";
    int remaining = len;
    int pos = offset;
    while (remaining-- > 0) {
        // mask to get the unsigned value of the byte
        final int unsigned = ba[pos++] & 0xff;
        final boolean printable = unsigned >= 32 && unsigned <= 126;
        if (!printable) {
            // two-digit uppercase hex escape for anything outside printable ASCII
            sb.append("\\x").append(hexDigits.charAt(unsigned >>> 4)).append(hexDigits.charAt(unsigned & 0x0f));
            continue;
        }
        if (unsigned == '\\') {
            // the escape character itself must be doubled
            sb.append("\\\\");
            continue;
        }
        sb.append((char) unsigned);
    }
    return sb;
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (scannerIterator.hasNext()) {
++numKeysRead;
Entry<Key,Value> entry = scannerIterator.next();
currentK = currentKey = entry.getKey();
currentV = currentValue = entry.getValue();
if (log.isTraceEnabled())
log.trace("Processing key/value pair: " + DefaultFormatter.formatEntry(entry, true));
if (log.isTraceEnabled()) {

// rewritten from DefaultFormatter.formatEntry()
StringBuilder sb = new StringBuilder();
Text buffer = new Text();

// append row
appendBytes(sb, currentK.getRow(buffer).getBytes(), 0, currentK.getRow(buffer).getLength()).append(" ");

// append column family
appendBytes(sb, currentK.getColumnFamily(buffer).getBytes(), 0, currentK.getColumnFamily(buffer).getLength()).append(":");

// append column qualifier
appendBytes(sb, currentK.getColumnQualifier(buffer).getBytes(), 0, currentK.getColumnQualifier(buffer).getLength()).append(" ");

// append visibility expression
sb.append(new ColumnVisibility(currentK.getColumnVisibility(buffer)));

// append timestamp
tmpDate.get().setTime(entry.getKey().getTimestamp());
sb.append(" ").append(formatter.get().format(tmpDate.get()));

// append value
if (currentV != null && currentV.getSize() > 0) {
sb.append("\t");
appendBytes(sb, currentV.get(), 0, currentV.getSize());
}

log.trace("Processing key/value pair: " + sb);
}

return true;
} else if (numKeysRead < 0) {
numKeysRead = 0;
Expand All @@ -1339,5 +1391,4 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
}
};
}

}
Loading