Skip to content

Commit

Permalink
Add ability to yield in Ivarators, AndIterator, OrIterator and return …
Browse files Browse the repository at this point in the history
…metrics before yield (#704)
  • Loading branch information
billoley committed Jul 21, 2023
1 parent 2df441b commit 64a4062
Show file tree
Hide file tree
Showing 40 changed files with 2,023 additions and 580 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package datawave.core.iterators;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand All @@ -16,6 +17,9 @@
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.log4j.Logger;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

/**
*
*/
Expand All @@ -26,31 +30,94 @@ public class IteratorThreadPoolManager {
private static final String EVALUATOR_THREAD_PROP = "tserver.datawave.evaluation.threads";
private static final String EVALUATOR_THREAD_NAME = "DATAWAVE Evaluation";
private static final int DEFAULT_THREAD_POOL_SIZE = 100;
// Configuration property names and defaults (one hour each, in milliseconds) for the ivarator future cache
private static final String IVARATOR_MAX_TIME_AFTER_ACCESS_PROP = "tserver.datawave.ivarator.maxTimeAfterAccess";
private static final String IVARATOR_MAX_SCAN_TIMEOUT_PROP = "tserver.datawave.ivarator.maxScanTimeout";
private static final long DEFAULT_IVARATOR_MAX_TIME_AFTER_ACCESS = TimeUnit.HOURS.toMillis(1);
private static final long DEFAULT_IVARATOR_MAX_SCAN_TIMEOUT = TimeUnit.HOURS.toMillis(1);

// Executor services keyed by thread pool name
private Map<String,ExecutorService> threadPools = new TreeMap<>();

// Futures of submitted ivarator tasks keyed by task name
private Cache<String,IvaratorFuture> ivaratorFutures;
// If an IvaratorFuture is not accessed within this time, then it will be removed from the cache.
// volatile: reassigned by a scheduled background task and read from other threads
private volatile long ivaratorMaxTimeAfterAccess;
// Each Ivarator has a scanTimeout. This is a system-wide limit which could be
// useful in causing all Ivarators to terminate if necessary.
// volatile: reassigned by a scheduled background task and read from other threads
private volatile long ivaratorMaxScanTimeout;
private static final Object instanceSemaphore = new Object();
// Hex identifier derived from the semaphore's hash code; appended to thread pool names
private static final String instanceId = Integer.toHexString(instanceSemaphore.hashCode());
private static volatile IteratorThreadPoolManager instance;

/**
 * Creates the ivarator and evaluation thread pools, the cache of ivarator futures, and the background tasks that keep the ivarator limits in sync with the
 * configuration and sweep timed-out ivarator futures out of the cache.
 *
 * @param env
 *            the iterator environment supplying the configuration; when null the default configuration is used
 */
private IteratorThreadPoolManager(IteratorEnvironment env) {
    final AccumuloConfiguration config = (env != null) ? env.getConfig() : DefaultConfiguration.getInstance();

    // create the thread pools
    createExecutorService(IVARATOR_THREAD_PROP, IVARATOR_THREAD_NAME, env);
    createExecutorService(EVALUATOR_THREAD_PROP, EVALUATOR_THREAD_NAME, env);

    ivaratorMaxTimeAfterAccess = getLongPropertyValue(config, IVARATOR_MAX_TIME_AFTER_ACCESS_PROP, DEFAULT_IVARATOR_MAX_TIME_AFTER_ACCESS);
    log.info("Using " + ivaratorMaxTimeAfterAccess + " ms for " + IVARATOR_MAX_TIME_AFTER_ACCESS_PROP);
    ivaratorMaxScanTimeout = getLongPropertyValue(config, IVARATOR_MAX_SCAN_TIMEOUT_PROP, DEFAULT_IVARATOR_MAX_SCAN_TIMEOUT);
    log.info("Using " + ivaratorMaxScanTimeout + " ms for " + IVARATOR_MAX_SCAN_TIMEOUT_PROP);

    // If the IvaratorFuture has not been accessed in ivaratorMaxTimeAfterAccess, then cancel and remove from the cache.
    // The cache is built before any background task is scheduled so that those tasks never observe a null cache.
    ivaratorFutures = Caffeine.newBuilder().expireAfterAccess(ivaratorMaxTimeAfterAccess, TimeUnit.MILLISECONDS).evictionListener((t, f, removalCause) -> {
        ((IvaratorFuture) f).cancel(true);
    }).build();

    // This thread will check for changes to ivaratorMaxTimeAfterAccess and ivaratorMaxScanTimeout
    ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(config).scheduleWithFixedDelay(() -> {
        try {
            long max = getLongPropertyValue(config, IVARATOR_MAX_TIME_AFTER_ACCESS_PROP, DEFAULT_IVARATOR_MAX_TIME_AFTER_ACCESS);
            if (ivaratorMaxTimeAfterAccess != max) {
                log.info("Changing " + IVARATOR_MAX_TIME_AFTER_ACCESS_PROP + " to " + max + " ms");
                // the expiration was fixed when the cache was built, so the new value must be pushed into the cache policy as well
                ivaratorFutures.policy().expireAfterAccess().ifPresent(expiration -> expiration.setExpiresAfter(max, TimeUnit.MILLISECONDS));
                ivaratorMaxTimeAfterAccess = max;
            }
        } catch (Throwable t) {
            log.error(t, t);
        }
        try {
            long max = getLongPropertyValue(config, IVARATOR_MAX_SCAN_TIMEOUT_PROP, DEFAULT_IVARATOR_MAX_SCAN_TIMEOUT);
            if (ivaratorMaxScanTimeout != max) {
                log.info("Changing " + IVARATOR_MAX_SCAN_TIMEOUT_PROP + " to " + max + " ms");
                ivaratorMaxScanTimeout = max;
            }
        } catch (Throwable t) {
            log.error(t, t);
        }
    }, 1, 10, TimeUnit.SECONDS);

    // Periodically remove the future of any Ivarator that is no longer running and whose elapsed time is greater than
    // either its own scanTimeout or the maxScanTimeout.
    // NOTE(review): only ivarators that report !isRunning() are selected here, so this sweep never stops a running
    // ivarator even though removeIvaratorFuture is able to cancel one - confirm that this is the intended condition.
    ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(config).scheduleWithFixedDelay(() -> {
        // an exception escaping a scheduleWithFixedDelay task suppresses every later execution, so catch everything
        try {
            long now = System.currentTimeMillis();
            // Copy the entries out through the map view instead of calling getIfPresent per task name. A cache read
            // resets the expire-after-access timer, which would keep every entry alive for as long as this sweep runs.
            Map<String,IvaratorFuture> snapshot = new TreeMap<>(ivaratorFutures.asMap());
            for (Map.Entry<String,IvaratorFuture> entry : snapshot.entrySet()) {
                DatawaveFieldIndexCachingIteratorJexl ivarator = entry.getValue().getIvarator();
                long elapsed = now - ivarator.getStartTime();
                long ivaratorScanTimeout = ivarator.getScanTimeout();
                if (!ivarator.isRunning() && ((elapsed > ivaratorScanTimeout) || (elapsed > ivaratorMaxScanTimeout))) {
                    removeIvaratorFuture(entry.getKey(), env);
                }
            }
        } catch (Throwable t) {
            log.error(t, t);
        }
    }, 1, 60, TimeUnit.SECONDS);
}

private ThreadPoolExecutor createExecutorService(final String prop, final String name, IteratorEnvironment env) {
final AccumuloConfiguration accumuloConfiguration;
final AccumuloConfiguration config;
if (env != null) {
accumuloConfiguration = env.getConfig();
config = env.getConfig();
} else {
accumuloConfiguration = DefaultConfiguration.getInstance();
config = DefaultConfiguration.getInstance();
}
final ThreadPoolExecutor service = createExecutorService(getMaxThreads(prop, accumuloConfiguration), name + " (" + instanceId + ')');
final ThreadPoolExecutor service = createExecutorService(getIntPropertyValue(config, prop, DEFAULT_THREAD_POOL_SIZE), name + " (" + instanceId + ')');
threadPools.put(name, service);
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(accumuloConfiguration).scheduleWithFixedDelay(() -> {
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(config).scheduleWithFixedDelay(() -> {
try {
int max = getMaxThreads(prop, accumuloConfiguration);
int max = getIntPropertyValue(config, prop, DEFAULT_THREAD_POOL_SIZE);
if (service.getMaximumPoolSize() != max) {
log.info("Changing " + prop + " to " + max);
service.setMaximumPoolSize(max);
Expand All @@ -70,15 +137,23 @@ private ThreadPoolExecutor createExecutorService(int maxThreads, String name) {
return pool;
}

private int getMaxThreads(final String prop, AccumuloConfiguration conf) {
private int getIntPropertyValue(AccumuloConfiguration conf, final String prop, int defaultValue) {
if (conf != null) {
Map<String,String> properties = new TreeMap<>();
conf.getProperties(properties, k -> Objects.equals(k, prop));
if (properties.containsKey(prop)) {
return Integer.parseInt(properties.get(prop));
}
}
return DEFAULT_THREAD_POOL_SIZE;
return defaultValue;
}

/**
 * Reads a property from the configuration as a long.
 *
 * @param config
 *            the configuration to query
 * @param prop
 *            the property name
 * @param defaultValue
 *            the value returned when the configuration has no value for the property
 * @return the parsed property value, or defaultValue when the property is not set
 * @throws NumberFormatException
 *             if the configured value cannot be parsed as a long
 */
private long getLongPropertyValue(AccumuloConfiguration config, final String prop, long defaultValue) {
    final String configured = config.get(prop);
    return (configured == null) ? defaultValue : Long.parseLong(configured);
}

private static IteratorThreadPoolManager instance(IteratorEnvironment env) {
Expand All @@ -104,12 +179,31 @@ private Future<?> execute(String name, final Runnable task, final String taskNam
});
}

public static Future<?> executeIvarator(Runnable task, String taskName, IteratorEnvironment env) {
return instance(env).execute(IVARATOR_THREAD_NAME, task, taskName);
/**
 * Looks up the future registered for an ivarator task.
 *
 * @param taskName
 *            the name the task was registered under
 * @param env
 *            the iterator environment used to obtain the manager instance
 * @return the cached future, or null when none is present for the task name
 */
public static IvaratorFuture getIvaratorFuture(String taskName, IteratorEnvironment env) {
    final IvaratorFuture cached = instance(env).ivaratorFutures.getIfPresent(taskName);
    return cached;
}

/**
 * Removes the future registered for an ivarator task from the cache. When the ivarator is still running it is cancelled first and this call waits until it
 * has completed. Does nothing when no future is cached for the task name.
 *
 * @param taskName
 *            the name the task was registered under
 * @param env
 *            the iterator environment used to obtain the manager instance
 */
public static void removeIvaratorFuture(String taskName, IteratorEnvironment env) {
    final IvaratorFuture cached = instance(env).ivaratorFutures.getIfPresent(taskName);
    if (cached == null) {
        // nothing is registered under this name
        return;
    }

    final boolean stillRunning = cached.getIvarator().isRunning();
    if (stillRunning) {
        cached.cancel(true);
        cached.getIvarator().waitUntilComplete();
    }
    instance(env).ivaratorFutures.invalidate(taskName);
}

/**
 * Returns the future already cached for the task name, or submits the task to the ivarator thread pool and caches the resulting future.
 *
 * @param ivarator
 *            the ivarator associated with the task
 * @param task
 *            the work to run
 * @param taskName
 *            the name used both for execution and as the cache key
 * @param env
 *            the iterator environment used to obtain the manager instance
 * @return the existing future for the task name if one is cached, otherwise the newly created one
 */
public static IvaratorFuture executeIvarator(DatawaveFieldIndexCachingIteratorJexl ivarator, Runnable task, String taskName, IteratorEnvironment env) {
    final IvaratorFuture existing = instance(env).ivaratorFutures.getIfPresent(taskName);
    if (existing != null) {
        return existing;
    }

    final IvaratorFuture created = new IvaratorFuture(instance(env).execute(IVARATOR_THREAD_NAME, task, taskName), ivarator);
    instance(env).ivaratorFutures.put(taskName, created);
    return created;
}

/**
 * Submits a task to the evaluation thread pool.
 *
 * @param task
 *            the work to run
 * @param taskName
 *            the name passed along with the task
 * @param env
 *            the iterator environment used to obtain the manager instance
 * @return the future of the submitted task
 */
public static Future<?> executeEvaluation(Runnable task, String taskName, IteratorEnvironment env) {
    final Future<?> pending = instance(env).execute(EVALUATOR_THREAD_NAME, task, taskName);
    return pending;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package datawave.core.iterators;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A {@link Future} that delegates every operation to a wrapped future and additionally carries the ivarator the wrapped future belongs to, so that holders
 * of the future can reach the ivarator.
 */
public class IvaratorFuture implements Future<Object> {

    // the future all Future operations are delegated to
    private final Future<?> future;
    // the ivarator associated with the wrapped future
    private final DatawaveFieldIndexCachingIteratorJexl ivarator;

    /**
     * @param future
     *            the future to delegate to
     * @param ivarator
     *            the ivarator associated with the future
     */
    public IvaratorFuture(Future<?> future, DatawaveFieldIndexCachingIteratorJexl ivarator) {
        this.future = future;
        this.ivarator = ivarator;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return this.future.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return this.future.isCancelled();
    }

    @Override
    public boolean isDone() {
        return this.future.isDone();
    }

    @Override
    public Object get() throws InterruptedException, ExecutionException {
        return this.future.get();
    }

    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return this.future.get(timeout, unit);
    }

    /**
     * @return the ivarator associated with this future
     */
    public DatawaveFieldIndexCachingIteratorJexl getIvarator() {
        return ivarator;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package datawave.query.attributes;

/**
 * Metadata marker indicating that the scan session was ended after exceeding the wait window.
 */
public class WaitWindowExceededMetadata extends Metadata {

    /**
     * @return always 1
     */
    @Override
    public int size() {
        final int fixedSize = 1;
        return fixedSize;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package datawave.query.exceptions;

import org.apache.accumulo.core.data.Key;

/**
 * Thrown when the wait window has been overrun. Carries the key to yield at.
 * <p>
 * The constructor taking a yield key is the only constructor. Because a constructor is declared explicitly, no default constructor is generated, so an
 * instance cannot be created without going through it.
 */
public class WaitWindowOverrunException extends RuntimeException {

    // the key to yield at; set once at construction
    private final Key yieldKey;

    /**
     * @param yieldKey
     *            the key to yield at
     */
    public WaitWindowOverrunException(Key yieldKey) {
        this.yieldKey = yieldKey;
    }

    /**
     * @return the key supplied at construction
     */
    public Key getYieldKey() {
        return yieldKey;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public Entry<DocumentData,Document> apply(Entry<Key,Document> from) {
logStop(keyRange.getStartKey());
return Maps.immutableEntry(new DocumentData(from.getKey(), docKeys, attrs, false), from.getValue());
} catch (IOException e) {
log.error("Unable to collection document attributes for evaluation: " + keyRange, e);
log.error("Unable to collect document attributes for evaluation: " + keyRange, e);
QueryException qe = new QueryException(DatawaveErrorCode.DOCUMENT_EVALUATION_ERROR, e);
throw new DatawaveFatalQueryException(qe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.base.Predicate;

import datawave.query.attributes.Document;
import datawave.query.iterator.waitwindow.WaitWindowObserver;

/**
*
Expand Down Expand Up @@ -51,7 +52,7 @@ protected Key nextStartKey(Key key) {
@Override
public Key move(Key minimum) {
if (totalRange != null) {
Range newRange = totalRange;
Range newRange;
if (totalRange.contains(minimum)) {
newRange = new Range(minimum, true, totalRange.getEndKey(), totalRange.isEndKeyInclusive());
} else {
Expand Down Expand Up @@ -112,7 +113,7 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
this.inclusive = inclusive;

// determine if we have been torn down and rebuilt
if (!range.isInfiniteStartKey() && !range.isStartKeyInclusive()) {
if (!range.isInfiniteStartKey() && !range.isStartKeyInclusive() && !WaitWindowObserver.hasMarker(range.getStartKey())) {
move(nextStartKey(range.getStartKey()));
} else {
source.seek(range, columnFamilies, inclusive);
Expand Down
Loading

0 comments on commit 64a4062

Please sign in to comment.