Add ability to yield in Ivarators, AndIterator, OrIterator and return metrics before yield (#704)
billoley committed Jun 13, 2024
1 parent 0f34220 commit d91ca47
Showing 43 changed files with 2,529 additions and 713 deletions.

Large diffs are not rendered by default.

@@ -4,15 +4,13 @@
import java.util.List;
import java.util.regex.Pattern;

import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.hadoop.io.Text;

import datawave.data.ColumnFamilyConstants;
import datawave.query.Constants;
import datawave.query.parser.JavaRegexAnalyzer;
import datawave.query.parser.JavaRegexAnalyzer.JavaRegexParseException;
@@ -127,15 +125,11 @@ protected List<Range> buildBoundingFiRanges(Text rowId, Text fiName, Text fieldV
*/
@Override
protected boolean matches(Key k) throws IOException {
boolean matches = false;
String colq = k.getColumnQualifier().toString();

// search backwards for the null bytes to expose the value in value\0datatype\0UID
int index = colq.lastIndexOf('\0');
index = colq.lastIndexOf('\0', index - 1);
matches = (pattern.get().matcher(colq.substring(0, index)).matches());

return matches;
return (pattern.get().matcher(colq.substring(0, index)).matches());
}

}
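
For reference, the simplified matches method above relies on the field-index column qualifier layout of value\0datatype\0UID: the value is everything before the second-to-last null byte. A minimal standalone sketch of that parsing and matching step (the class and method names here are illustrative, not part of the commit):

import java.util.regex.Pattern;

public class FiValueRegexSketch {

    private final Pattern pattern;

    public FiValueRegexSketch(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // Column qualifiers in the field index look like value\0datatype\0UID, so walk
    // backwards past the last two null bytes to isolate the value before matching.
    public boolean matches(String colq) {
        int index = colq.lastIndexOf('\0');
        index = colq.lastIndexOf('\0', index - 1);
        return pattern.matcher(colq.substring(0, index)).matches();
    }
}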
@@ -1,6 +1,8 @@
package datawave.core.iterators;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -14,8 +16,11 @@
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.log4j.Logger;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
@@ -28,36 +33,100 @@ public class IteratorThreadPoolManager {
private static final String EVALUATOR_THREAD_PROP = "tserver.datawave.evaluation.threads";
private static final String EVALUATOR_THREAD_NAME = "DATAWAVE Evaluation";
private static final int DEFAULT_THREAD_POOL_SIZE = 100;
private static final String IVARATOR_MAX_TIME_AFTER_ACCESS_PROP = "tserver.datawave.ivarator.maxTimeAfterAccess";
private static final String IVARATOR_MAX_SCAN_TIMEOUT_PROP = "tserver.datawave.ivarator.maxScanTimeout";
private static final long DEFAULT_IVARATOR_MAX_TIME_AFTER_ACCESS = TimeUnit.HOURS.toMillis(1);
private static final long DEFAULT_IVARATOR_MAX_SCAN_TIMEOUT = TimeUnit.HOURS.toMillis(1);

private Map<String,ExecutorService> threadPools = new TreeMap<>();

private Cache<String,IvaratorFuture> ivaratorFutures;
// If an IvaratorFuture is not accessed within this time, then it will be removed from the cache
private long ivaratorMaxTimeAfterAccess;
// Each Ivarator has its own scanTimeout. This is a system-wide limit that can be
// used to force all Ivarators to terminate if necessary
private long ivaratorMaxScanTimeout;
private static final Object instanceSemaphore = new Object();
private static final String instanceId = Integer.toHexString(instanceSemaphore.hashCode());
private static volatile IteratorThreadPoolManager instance;

private IteratorThreadPoolManager(IteratorEnvironment env) {
final AccumuloConfiguration accumuloConfiguration;
final PluginEnvironment pluginEnv;
if (env != null) {
pluginEnv = env.getPluginEnv();
accumuloConfiguration = env.getConfig();
} else {
pluginEnv = null;
accumuloConfiguration = DefaultConfiguration.getInstance();
}
// create the thread pools
createExecutorService(IVARATOR_THREAD_PROP, IVARATOR_THREAD_NAME, env);
createExecutorService(EVALUATOR_THREAD_PROP, EVALUATOR_THREAD_NAME, env);
ivaratorMaxTimeAfterAccess = getLongPropertyValue(IVARATOR_MAX_TIME_AFTER_ACCESS_PROP, DEFAULT_IVARATOR_MAX_TIME_AFTER_ACCESS, pluginEnv);
log.info("Using " + ivaratorMaxTimeAfterAccess + " ms for " + IVARATOR_MAX_TIME_AFTER_ACCESS_PROP);
ivaratorMaxScanTimeout = getLongPropertyValue(IVARATOR_MAX_SCAN_TIMEOUT_PROP, DEFAULT_IVARATOR_MAX_SCAN_TIMEOUT, pluginEnv);
log.info("Using " + ivaratorMaxScanTimeout + " ms for " + IVARATOR_MAX_SCAN_TIMEOUT_PROP);
// This thread will check for changes to ivaratorMaxTimeAfterAccess and ivaratorMaxScanTimeout
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(accumuloConfiguration).scheduleWithFixedDelay(() -> {
try {
long max = getLongPropertyValue(IVARATOR_MAX_TIME_AFTER_ACCESS_PROP, DEFAULT_IVARATOR_MAX_TIME_AFTER_ACCESS, pluginEnv);
if (ivaratorMaxTimeAfterAccess != max) {
log.info("Changing " + IVARATOR_MAX_TIME_AFTER_ACCESS_PROP + " to " + max + " ms");
ivaratorMaxTimeAfterAccess = max;
}
} catch (Throwable t) {
log.error(t, t);
}
try {
long max = getLongPropertyValue(IVARATOR_MAX_SCAN_TIMEOUT_PROP, DEFAULT_IVARATOR_MAX_SCAN_TIMEOUT, pluginEnv);
if (ivaratorMaxScanTimeout != max) {
log.info("Changing " + IVARATOR_MAX_SCAN_TIMEOUT_PROP + " to " + max + " ms");
ivaratorMaxScanTimeout = max;
}
} catch (Throwable t) {
log.error(t, t);
}
}, 1, 10, TimeUnit.SECONDS);

// If the IvaratorFuture has not been accessed within ivaratorMaxTimeAfterAccess, then cancel it and remove it from the cache
ivaratorFutures = Caffeine.newBuilder().expireAfterAccess(ivaratorMaxTimeAfterAccess, TimeUnit.MILLISECONDS).evictionListener((t, f, removalCause) -> {
((IvaratorFuture) f).cancel(true);
}).build();

// If the Ivarator has been running for longer than either its scanTimeout or the maxScanTimeout,
// then stop the Ivarator and remove the future from the cache
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(accumuloConfiguration).scheduleWithFixedDelay(() -> {
Set<String> tasks = new HashSet<>(ivaratorFutures.asMap().keySet());
long now = System.currentTimeMillis();
for (String taskName : tasks) {
IvaratorFuture future = getIvaratorFuture(taskName, env);
if (future != null) {
DatawaveFieldIndexCachingIteratorJexl ivarator = future.getIvarator();
long elapsed = now - ivarator.getStartTime();
long ivaratorScanTimeout = ivarator.getScanTimeout();
if (future.getIvaratorRunnable().isRunning() && ((elapsed > ivaratorScanTimeout) || (elapsed > ivaratorMaxScanTimeout))) {
removeIvaratorFuture(taskName, env);
}
}
}
}, 1, 60, TimeUnit.SECONDS);
}

private ThreadPoolExecutor createExecutorService(final String prop, final String name, final IteratorEnvironment env) {
final AccumuloConfiguration accumuloConfiguration;
private ThreadPoolExecutor createExecutorService(final String prop, final String name, IteratorEnvironment env) {
final PluginEnvironment pluginEnv;
if (env != null) {
pluginEnv = env.getPluginEnv();
accumuloConfiguration = env.getConfig();
} else {
pluginEnv = null;
accumuloConfiguration = DefaultConfiguration.getInstance();
}
final ThreadPoolExecutor service = createExecutorService(getMaxThreads(prop, pluginEnv), name + " (" + instanceId + ')');
int maxThreads = getIntPropertyValue(prop, DEFAULT_THREAD_POOL_SIZE, pluginEnv);
final ThreadPoolExecutor service = createExecutorService(maxThreads, name + " (" + instanceId + ')');
threadPools.put(name, service);
Executors.newScheduledThreadPool(getMaxThreads(prop, pluginEnv)).scheduleWithFixedDelay(() -> {
Executors.newScheduledThreadPool(maxThreads).scheduleWithFixedDelay(() -> {
try {
// Very important to not use the accumuloConfiguration in this thread and instead use the pluginEnv
// The accumuloConfiguration caches table ids which may no longer exist down the road.
int max = getMaxThreads(prop, pluginEnv);
int max = getIntPropertyValue(prop, DEFAULT_THREAD_POOL_SIZE, pluginEnv);
if (service.getMaximumPoolSize() != max) {
log.info("Changing " + prop + " to " + max);
// if raising the max size, then we need to set the max first before the core
@@ -84,14 +153,26 @@ private ThreadPoolExecutor createExecutorService(int maxThreads, String name) {
return pool;
}

private int getMaxThreads(final String prop, PluginEnvironment pluginEnv) {
if (pluginEnv != null && pluginEnv.getConfiguration() != null) {
private int getIntPropertyValue(final String prop, int defaultValue, PluginEnvironment pluginEnv) {
if (pluginEnv != null) {
Map<String,String> properties = new TreeMap<>();
String value = pluginEnv.getConfiguration().get(prop);
if (value != null) {
return Integer.parseInt(value);
return Integer.parseInt(properties.get(prop));
}
}
return DEFAULT_THREAD_POOL_SIZE;
return defaultValue;
}

private long getLongPropertyValue(final String prop, long defaultValue, PluginEnvironment pluginEnv) {
if (pluginEnv != null) {
Map<String,String> properties = new TreeMap<>();
String value = pluginEnv.getConfiguration().get(prop);
if (value != null) {
return Long.parseLong(properties.get(prop));
}
}
return defaultValue;
}

private static IteratorThreadPoolManager instance(IteratorEnvironment env) {
@@ -117,12 +198,31 @@ private Future<?> execute(String name, final Runnable task, final String taskNam
});
}

public static Future<?> executeIvarator(Runnable task, String taskName, IteratorEnvironment env) {
return instance(env).execute(IVARATOR_THREAD_NAME, task, taskName);
public static IvaratorFuture getIvaratorFuture(String taskName, IteratorEnvironment env) {
return instance(env).ivaratorFutures.getIfPresent(taskName);
}

public static void removeIvaratorFuture(String taskName, IteratorEnvironment env) {
IvaratorFuture future = instance(env).ivaratorFutures.getIfPresent(taskName);
if (future != null) {
if (future.getIvaratorRunnable().isRunning()) {
future.cancel(true);
future.getIvaratorRunnable().waitUntilComplete();
}
instance(env).ivaratorFutures.invalidate(taskName);
}
}

public static IvaratorFuture executeIvarator(IvaratorRunnable ivaratorRunnable, String taskName, IteratorEnvironment env) {
IvaratorFuture future = instance(env).ivaratorFutures.getIfPresent(taskName);
if (future == null) {
future = new IvaratorFuture(instance(env).execute(IVARATOR_THREAD_NAME, ivaratorRunnable, taskName), ivaratorRunnable);
instance(env).ivaratorFutures.put(taskName, future);
}
return future;
}

public static Future<?> executeEvaluation(Runnable task, String taskName, IteratorEnvironment env) {
return instance(env).execute(EVALUATOR_THREAD_NAME, task, taskName);
}

}
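
The caching above follows a standard Caffeine pattern: entries expire after a period with no access, and an eviction listener cancels the underlying work so nothing keeps running after its entry is dropped. A minimal sketch of that pattern in isolation (class and member names are illustrative; the one-hour window mirrors DEFAULT_IVARATOR_MAX_TIME_AFTER_ACCESS):

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class ExpiringFutureCacheSketch {

    // Futures not touched for an hour are evicted; the listener cancels them on the way out.
    private final Cache<String,Future<?>> futures = Caffeine.newBuilder()
                    .expireAfterAccess(TimeUnit.HOURS.toMillis(1), TimeUnit.MILLISECONDS)
                    .evictionListener((key, value, cause) -> ((Future<?>) value).cancel(true))
                    .build();

    public void put(String taskName, Future<?> future) {
        futures.put(taskName, future);
    }

    public Future<?> get(String taskName) {
        // Reading an entry also resets its expire-after-access clock.
        return futures.getIfPresent(taskName);
    }

    public void remove(String taskName) {
        // Explicit invalidation bypasses the eviction listener, so callers cancel the
        // future themselves first, as removeIvaratorFuture does above.
        futures.invalidate(taskName);
    }
}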
@@ -0,0 +1,50 @@
package datawave.core.iterators;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class IvaratorFuture implements Future {

private Future future;
private IvaratorRunnable ivaratorRunnable;

public IvaratorFuture(Future future, IvaratorRunnable ivaratorRunnable) {
this.future = future;
this.ivaratorRunnable = ivaratorRunnable;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return this.future.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return this.future.isCancelled();
}

@Override
public boolean isDone() {
return this.future.isDone();
}

@Override
public Object get() throws InterruptedException, ExecutionException {
return this.future.get();
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return this.future.get(timeout, unit);
}

public DatawaveFieldIndexCachingIteratorJexl getIvarator() {
return ivaratorRunnable.getIvarator();
}

public IvaratorRunnable getIvaratorRunnable() {
return ivaratorRunnable;
}
}
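
Taken together, the submit/lookup/remove methods above give callers a get-or-create handle on a running Ivarator. A hedged usage sketch based only on the signatures shown in this commit (the wrapper class, the cleanup flow, and the assumption that IvaratorRunnable lives in datawave.core.iterators are illustrative):

import org.apache.accumulo.core.iterators.IteratorEnvironment;

import datawave.core.iterators.IteratorThreadPoolManager;
import datawave.core.iterators.IvaratorFuture;
import datawave.core.iterators.IvaratorRunnable;

public class IvaratorSubmissionSketch {

    // Submits the runnable under taskName (or joins an already-cached run), waits for it,
    // then drops the cache entry so the eviction and timeout sweeps have nothing left to do.
    public void runAndCleanUp(IvaratorRunnable runnable, String taskName, IteratorEnvironment env) throws Exception {
        IvaratorFuture future = IteratorThreadPoolManager.executeIvarator(runnable, taskName, env);
        future.get();
        IteratorThreadPoolManager.removeIvaratorFuture(taskName, env);
    }
}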