Skip to content

Commit

Permalink
Close ScannerSessiosn on timeout, correctly apply timeout fixes #2574 (
Browse files Browse the repository at this point in the history
…#2578)

* Close ScannerSessiosn on timeout, correctly apply timeout fixes #2574

* javadoc updates fixes #2574

* Clear interrupt flags after checking interrupted state fixes #2574

---------

Co-authored-by: hgklohr <[email protected]>
  • Loading branch information
FineAndDandy and hgklohr committed Sep 30, 2024
1 parent 0c90817 commit d49dfd4
Show file tree
Hide file tree
Showing 14 changed files with 752 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
private int maxIndexBatchSize = 1000;
private boolean allTermsIndexOnly;
private long maxIndexScanTimeMillis = Long.MAX_VALUE;
private long maxAnyFieldScanTimeMillis = Long.MAX_VALUE;

// Allows this query to parse the root uids from TLD uids found in the global shard index. This effectively ignores hits in child documents.
private boolean parseTldUids = false;
private boolean collapseUids = false;
Expand Down Expand Up @@ -550,6 +552,7 @@ public void copyFrom(ShardQueryConfiguration other) {
this.setMaxIndexBatchSize(other.getMaxIndexBatchSize());
this.setAllTermsIndexOnly(other.isAllTermsIndexOnly());
this.setMaxIndexScanTimeMillis(other.getMaxIndexScanTimeMillis());
this.setMaxAnyFieldScanTimeMillis(other.getMaxAnyFieldScanTimeMillis());
this.setCollapseUids(other.getCollapseUids());
this.setCollapseUidsThreshold(other.getCollapseUidsThreshold());
this.setEnforceUniqueTermsWithinExpressions(other.getEnforceUniqueTermsWithinExpressions());
Expand Down Expand Up @@ -3232,4 +3235,12 @@ public List<ScanHintRule<JexlNode>> getQueryTreeScanHintRules() {
public void setQueryTreeScanHintRules(List<ScanHintRule<JexlNode>> queryTreeScanHintRules) {
this.queryTreeScanHintRules = queryTreeScanHintRules;
}

public long getMaxAnyFieldScanTimeMillis() {
return maxAnyFieldScanTimeMillis;
}

public void setMaxAnyFieldScanTimeMillis(long maxAnyFieldScanTimeMillis) {
this.maxAnyFieldScanTimeMillis = maxAnyFieldScanTimeMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

Expand Down Expand Up @@ -41,7 +42,7 @@ protected long getRemainingTimeMillis(long startTimeMillis) {
return Math.max(0L, config.getMaxIndexScanTimeMillis() - (System.currentTimeMillis() - startTimeMillis));
}

protected void timedScanWait(Future<Boolean> future, CountDownLatch startedLatch, CountDownLatch stoppedLatch, long startTimeMillis, long timeout) {
protected void timedScanWait(Future<Boolean> future, CountDownLatch startedLatch, CountDownLatch stoppedLatch, AtomicLong startTimeMillis, long timeout) {
// this ensures that we don't wait for the future response until the task has started
if (startedLatch != null) {
try {
Expand All @@ -68,7 +69,7 @@ protected void timedScanWait(Future<Boolean> future, CountDownLatch startedLatch
// timeout exception and except ( a max lookup specified ) 3) we receive a value under timeout and we break
while (!execService.isShutdown() && !execService.isTerminated()) {
try {
future.get((swallowTimeout) ? maxLookup : getRemainingTimeMillis(startTimeMillis), TimeUnit.MILLISECONDS);
future.get((swallowTimeout) ? maxLookup : getRemainingTimeMillis(startTimeMillis.get()), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (swallowTimeout) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IteratorSetting;
Expand Down Expand Up @@ -50,7 +51,7 @@ public class BoundedRangeIndexLookup extends AsyncIndexLookup {
private final LiteralRange<?> literalRange;

protected Future<Boolean> timedScanFuture;
protected long lookupStartTimeMillis = Long.MAX_VALUE;
protected AtomicLong lookupStartTimeMillis = new AtomicLong(Long.MAX_VALUE);
protected CountDownLatch lookupStartedLatch;
protected CountDownLatch lookupStoppedLatch;

Expand Down Expand Up @@ -182,6 +183,9 @@ public synchronized void submit() {
} catch (IOException e) {
QueryException qe = new QueryException(DatawaveErrorCode.RANGE_CREATE_ERROR, e, MessageFormat.format("{0}", this.literalRange));
log.debug(qe);
if (bs != null) {
scannerFactory.close(bs);
}
throw new IllegalRangeArgumentException(qe);
}
}
Expand Down Expand Up @@ -211,7 +215,7 @@ protected Callable<Boolean> createTimedCallable(final Iterator<Entry<Key,Value>>

return () -> {
try {
lookupStartTimeMillis = System.currentTimeMillis();
lookupStartTimeMillis.set(System.currentTimeMillis());
lookupStartedLatch.countDown();

Text holder = new Text();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datawave.query.jexl.lookups;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -10,6 +12,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
Expand Down Expand Up @@ -37,7 +40,7 @@ public class FieldNameIndexLookup extends AsyncIndexLookup {
protected Set<String> terms;

protected Future<Boolean> timedScanFuture;
protected long lookupStartTimeMillis = Long.MAX_VALUE;
protected AtomicLong lookupStartTimeMillis = new AtomicLong(Long.MAX_VALUE);
protected CountDownLatch lookupStartedLatch;
protected CountDownLatch lookupStoppedLatch;

Expand Down Expand Up @@ -75,7 +78,7 @@ public void submit() {

Iterator<Entry<Key,Value>> iter = Collections.emptyIterator();

ScannerSession bs;
ScannerSession bs = null;

try {
if (!fields.isEmpty()) {
Expand Down Expand Up @@ -104,9 +107,13 @@ public void submit() {
}

timedScanFuture = execService.submit(createTimedCallable(iter));
} catch (TableNotFoundException e) {
} catch (IOException | InvocationTargetException | NoSuchMethodException | InstantiationException | IllegalAccessException | RuntimeException e) {
log.error(e);
} catch (Exception e) {
// ensure the scanner is cleaned up if no longer listening
if (bs != null) {
bs.close();
sessions.remove(bs);
}
throw new RuntimeException(e);
}
}
Expand All @@ -117,7 +124,10 @@ public synchronized IndexLookupMap lookup() {
if (!sessions.isEmpty()) {
try {
// for field name lookups, we wait indefinitely
timedScanWait(timedScanFuture, lookupStartedLatch, lookupStoppedLatch, lookupStartTimeMillis, Long.MAX_VALUE);
// TODO consider if this really should be Long.MAX_VALUE or some time less. Other index scanners are set to config.getMaxIndexScanTimeMillis().
// However the code currently can't handle a failure here, where other index lookup failures can conditionally still allow the query to be
// executed. See UnfieldedIndexExpansionVisitor.expandUnfielded()
timedScanWait(timedScanFuture, lookupStartedLatch, lookupStoppedLatch, lookupStartTimeMillis, config.getMaxAnyFieldScanTimeMillis());
} finally {
for (ScannerSession sesh : sessions) {
scannerFactory.close(sesh);
Expand All @@ -135,13 +145,18 @@ protected Callable<Boolean> createTimedCallable(final Iterator<Entry<Key,Value>>

return () -> {
try {
lookupStartTimeMillis = System.currentTimeMillis();
lookupStartTimeMillis.set(System.currentTimeMillis());
lookupStartedLatch.countDown();

final Text holder = new Text();

try {
while (iter.hasNext()) {
// check for interrupt which may be triggered by closing the batch scanner
if (Thread.interrupted()) {
throw new InterruptedException();
}

Entry<Key,Value> entry = iter.next();
if (log.isTraceEnabled()) {
log.trace("Index entry: " + entry.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
Expand Down Expand Up @@ -266,6 +267,10 @@ protected Callable<Boolean> createTimedCallable(final Iterator<Entry<Key,Value>>
}

while (iter.hasNext()) {
// check if interrupted which may be triggered by closing a batch scanner
if (Thread.interrupted()) {
throw new InterruptedException();
}

Entry<Key,Value> entry = iter.next();

Expand Down Expand Up @@ -359,7 +364,7 @@ private static class RegexLookupData {
private Future<Boolean> timedScanFuture;
private CountDownLatch lookupStartedLatch;
private CountDownLatch lookupStoppedLatch;
private long lookupStartTimeMillis = Long.MAX_VALUE;
private AtomicLong lookupStartTimeMillis = new AtomicLong(Long.MAX_VALUE);

public Collection<ScannerSession> getSessions() {
return sessions;
Expand Down Expand Up @@ -393,12 +398,12 @@ public void setLookupStoppedLatch(CountDownLatch lookupStoppedLatch) {
this.lookupStoppedLatch = lookupStoppedLatch;
}

public long getLookupStartTimeMillis() {
public AtomicLong getLookupStartTimeMillis() {
return lookupStartTimeMillis;
}

public void setLookupStartTimeMillis(long lookupStartTimeMillis) {
this.lookupStartTimeMillis = lookupStartTimeMillis;
this.lookupStartTimeMillis.set(lookupStartTimeMillis);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datawave.query.jexl.lookups;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -438,12 +439,16 @@ public static Range getLiteralRange(String fieldName, String normalizedQueryTerm
* @param limitToUniqueTerms
* check for limiting unique terms
* @return the scanner session
* @throws Exception
* if there are issues
* @throws InvocationTargetException
* @throws NoSuchMethodException
* @throws InstantiationException
* @throws IllegalAccessException
* @throws IOException
* dates can't be formatted
*/
public static ScannerSession configureTermMatchOnly(ShardQueryConfiguration config, ScannerFactory scannerFactory, String tableName,
Collection<Range> ranges, Collection<String> literals, Collection<String> patterns, boolean reverseIndex, boolean limitToUniqueTerms)
throws Exception {
throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException, IOException {

// if we have no ranges, then nothing to scan
if (ranges.isEmpty()) {
Expand Down Expand Up @@ -471,7 +476,7 @@ public static ScannerSession configureTermMatchOnly(ShardQueryConfiguration conf

public static ScannerSession configureLimitedDiscovery(ShardQueryConfiguration config, ScannerFactory scannerFactory, String tableName,
Collection<Range> ranges, Collection<String> literals, Collection<String> patterns, boolean reverseIndex, boolean limitToUniqueTerms)
throws Exception {
throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException, IOException {

// if we have no ranges, then nothing to scan
if (ranges.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datawave.query.tables;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -246,12 +247,14 @@ public BatchScannerSession newQueryScanner(final String tableName, final Set<Aut
* @param wrapper
* a wrapper class
* @return a new scanner session
* @throws Exception
* if there are issues
* @throws NoSuchMethodException
* @throws InvocationTargetException
* @throws InstantiationException
* @throws IllegalAccessException
*
*/
public <T extends ScannerSession> T newLimitedScanner(Class<T> wrapper, final String tableName, final Set<Authorizations> auths, final Query settings)
throws Exception {
throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
Preconditions.checkNotNull(scanQueue);
Preconditions.checkNotNull(wrapper);
Preconditions.checkArgument(open.get(), "Factory has been locked. No New scanners can be created");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ protected void findTop() throws Exception {
}
}

protected int scannerInvariant(final Iterator<Result> iter) {
protected int scannerInvariant(final Iterator<Result> iter) throws InterruptedException {
int retrievalCount = 0;

Result myEntry = null;
Expand All @@ -554,6 +554,19 @@ protected int scannerInvariant(final Iterator<Result> iter) {
// this creates a bottleneck on the resultQueue size, but guarantees no results will be lost
boolean accepted = false;
while (!accepted) {
// this thread exists in between the batch scanner and the other side of the queue, so check both side
// are still running, otherwise terminate
if (!isRunning() || state().equals(State.TERMINATED) || state().equals(State.FAILED)) {
log.info("aborting offer on scanner invariant due to thread no longer running");
throw new InterruptedException("aborting offer on scanner invariant due to thread no longer running");
} else if (uncaughtExceptionHandler.getThrowable() != null) {
log.warn("aborting offer on scanner invariant due to throwable", uncaughtExceptionHandler.getThrowable());
throw new RuntimeException("aborting offer on scanner invariant due to throwable", uncaughtExceptionHandler.getThrowable());
} else if (forceClose) {
log.info("cleaning up scanner due to external close");
throw new InterruptedException("cleaning up scanner due to external close");
}

try {
accepted = resultQueue.offer(myEntry, 200, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2440,6 +2440,14 @@ public void setMaxIndexScanTimeMillis(long maxTime) {
getConfig().setMaxIndexScanTimeMillis(maxTime);
}

public long getMaxAnyFieldScanTimeMillis() {
return getConfig().getMaxAnyFieldScanTimeMillis();
}

public void setMaxAnyFieldScanTimeMillis(long maxAnyFieldScanTimeMillis) {
getConfig().setMaxAnyFieldScanTimeMillis(maxAnyFieldScanTimeMillis);
}

public Function getQueryMacroFunction() {
return queryMacroFunction;
}
Expand Down
Loading

0 comments on commit d49dfd4

Please sign in to comment.