Skip to content

Commit

Permalink
Use scanTimeout in DatawaveFieldIndexCachingIteratorJexl as well as i…
Browse files Browse the repository at this point in the history
…n IteratorThreadPoolManager
  • Loading branch information
billoley committed Oct 3, 2023
1 parent 1935337 commit 4fa6746
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ public B withIvaratorSourcePool(GenericObjectPool<SortedKeyValueIterator<Key,Val
// timeout for the building of the cache. Default 1 hour
private volatile long scanTimeout = 1000L * 60 * 60;

// have we timed out
private volatile boolean timedOut = false;

// The max number of results that can be returned from this iterator.
private final long maxResults;

Expand Down Expand Up @@ -834,6 +837,14 @@ else if (key.compareTo(this.lastRangeSeeked.getStartKey()) < 0) {

if (this.setControl.isCancelledQuery()) {
this.topKey = null;
}

if (isTimedOut()) {
log.error("Ivarator query timed out");
throw new IvaratorException("Ivarator query timed out");
}

if (this.setControl.isCancelledQuery()) {
log.debug("Ivarator query was cancelled");
throw new IterationInterruptedException("Ivarator query was cancelled");
}
Expand All @@ -849,9 +860,15 @@ else if (key.compareTo(this.lastRangeSeeked.getStartKey()) < 0) {
}

if (this.setControl.isCancelledQuery()) {
log.debug("Ivarator query was cancelled");
throw new IterationInterruptedException("Ivarator query was cancelled");
if (isTimedOut()) {
log.error("Ivarator query timed out");
throw new IvaratorException("Ivarator query timed out");
} else {
log.debug("Ivarator query was cancelled");
throw new IterationInterruptedException("Ivarator query was cancelled");
}
}

}
}

Expand Down Expand Up @@ -933,6 +950,8 @@ private void fillSortedSets() throws IOException {
try {
// wait for all of the threads to complete
for (IvaratorFuture future : futures) {
checkTiming();

if (!failed && !this.setControl.isCancelledQuery()) {
try {
result = future.get(this.waitWindowObserver.remainingTimeMs(), TimeUnit.MILLISECONDS);
Expand All @@ -950,6 +969,9 @@ private void fillSortedSets() throws IOException {
this.setControl.setCancelled();
}
}
if (this.setControl.isCancelledQuery()) {
break;
}
}
} finally {
if (yieldKey == null) {
Expand Down Expand Up @@ -1035,6 +1057,27 @@ protected void startTiming() {
startTime = System.currentTimeMillis();
}

/**
* Check if the scan timeout has been reached. Mark as timed out and cancel the query if so.
*/
protected void checkTiming() {
if (System.currentTimeMillis() > (startTime + scanTimeout)) {
// mark as timed out
this.timedOut = true;
// and cancel the query
this.setControl.setCancelled();
}
}

/**
* Was the timed out flag set.
*
* @return a boolean if timed out
*/
protected boolean isTimedOut() {
return this.timedOut;
}

/**
* Get a source copy. This is only used when retrieving unsorted values.
*
Expand Down Expand Up @@ -1184,6 +1227,8 @@ protected IvaratorFuture fillSet(final Range boundingFiRange, final TotalResults
: null;

while (source.hasTop()) {
checkTiming();

Key top = source.getTopKey();

// if we are setup for composite seeking, seek if we are out of range
Expand Down Expand Up @@ -1355,6 +1400,7 @@ protected boolean copyRowBasedHdfsBackedSetFromPreviousCall() {
this.threadSafeSet = previousIvarator.threadSafeSet;
this.set = previousIvarator.set;
this.setControl = previousIvarator.setControl;
this.startTime = previousIvarator.startTime;
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private IteratorThreadPoolManager(IteratorEnvironment env) {
DatawaveFieldIndexCachingIteratorJexl ivarator = future.getIvarator();
long elapsed = now - ivarator.getStartTime();
long ivaratorScanTimeout = ivarator.getScanTimeout();
if (!ivarator.isRunning() && ((elapsed > ivaratorScanTimeout) || (elapsed > ivaratorMaxScanTimeout))) {
if (ivarator.isRunning() && ((elapsed > ivaratorScanTimeout) || (elapsed > ivaratorMaxScanTimeout))) {
removeIvaratorFuture(taskName, env);
}
}
Expand Down

0 comments on commit 4fa6746

Please sign in to comment.