From 7582320470e32ec613c226f8a52cc14477bb7fa6 Mon Sep 17 00:00:00 2001 From: Bill Oley Date: Wed, 25 Oct 2023 17:15:21 -0400 Subject: [PATCH] Move code from DatawaveFieldIndexCachingIteratorJexl.fillSet() to IvaratorRunnable and suspend/restart processing on yield --- ...DatawaveFieldIndexCachingIteratorJexl.java | 318 ++++++------------ .../DatawaveFieldIndexRegexIteratorJexl.java | 8 +- .../iterators/IteratorThreadPoolManager.java | 10 +- .../core/iterators/IvaratorFuture.java | 14 +- .../core/iterators/IvaratorRunnable.java | 281 ++++++++++++++++ 5 files changed, 397 insertions(+), 234 deletions(-) create mode 100644 warehouse/query-core/src/main/java/datawave/core/iterators/IvaratorRunnable.java diff --git a/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexCachingIteratorJexl.java b/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexCachingIteratorJexl.java index e3af3acc8f..14c39eedc1 100644 --- a/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexCachingIteratorJexl.java +++ b/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexCachingIteratorJexl.java @@ -14,7 +14,6 @@ import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -37,13 +36,11 @@ import com.google.common.base.Objects; import com.google.common.base.Predicate; -import com.google.common.collect.Multimap; import datawave.core.iterators.querylock.QueryLock; import datawave.query.Constants; import datawave.query.composite.CompositeMetadata; import datawave.query.composite.CompositeSeeker.FieldIndexCompositeSeeker; -import datawave.query.exceptions.DatawaveIvaratorMaxResultsException; import datawave.query.exceptions.WaitWindowOverrunException; import datawave.query.iterator.CachingIterator; import datawave.query.iterator.ivarator.IvaratorCacheDir; @@ -378,7 +375,6 @@ public B withIvaratorSourcePool(GenericObjectPool> ivaratorSourcePool = null; - private AtomicBoolean running = new AtomicBoolean(false); // ------------------------------------------------------------------------- // ------------- Constructors @@ -725,6 +721,10 @@ public Text getFiName() { return fiName; } + public Text getFiRow() { + return fiRow; + } + public PartialKey getReturnKeyType() { return returnKeyType; } @@ -872,51 +872,20 @@ else if (key.compareTo(this.lastRangeSeeked.getStartKey()) < 0) { } } - /** - * A class to keep track of the total result size across all of the bounding ranges - */ - public static class TotalResults { - - private final long maxResults; - private AtomicLong size = new AtomicLong(); - - public TotalResults(long maxResults) { - this.maxResults = maxResults; - } - - public boolean increment() { - if (maxResults <= 0) { - return true; - } - return size.incrementAndGet() <= maxResults; - } - - public boolean add(long val) { - if (maxResults <= 0) { - return true; - } - return size.addAndGet(val) <= maxResults; - } - } - private void fillSortedSets() throws IOException { String sourceRow = this.fiRow.toString(); // if we are running fillSortedSets as port of a re-seek after yield, then the fillSet threads would not have // completed when a WaitWindowOverrunException was thrown and therefore the set would not have been marked as // complete when setupRowBasedHdfsBackedSet was called at the top of this method. We will try to copy the - // RowBasedHdfsBackedSet from the previously used Ivarator since there may be unpersisted results or - // a long-running Ivarator may still be filling the RowBasedHdfsBackedSet + // RowBasedHdfsBackedSet from the previously used Ivarator and resume processing. // If this re-seek is from an Ivarator yielding, the startKey will have a yield marker in either the // colFam (for a shard range) or colQual (for a document range) boolean usePreviousSortedSet = WaitWindowObserver.hasMarker(lastRangeSeeked.getStartKey()); if (usePreviousSortedSet) { - if (!copyRowBasedHdfsBackedSetFromPreviousCall()) { - // if copying from the previous call fails, then ensure that the previous IvaratorFutures are removed - // and set usePreviousSortedSet to false to force setup of a new HDFS backed set + if (!resumeFromPreviousCall()) { + // if resuming from the previous call fails, then set usePreviousSortedSet + // to false to force setup of a new HDFS backed set usePreviousSortedSet = false; - for (Range r : boundingFiRanges) { - IteratorThreadPoolManager.removeIvaratorFuture(getIvaratorTaskName(r), initEnv); - } } } if (!usePreviousSortedSet) { @@ -933,7 +902,7 @@ private void fillSortedSets() throws IOException { log.debug("Processing " + boundingFiRanges + " for " + this); } - TotalResults totalResults = new TotalResults(maxResults); + IvaratorRunnable.TotalResults totalResults = new IvaratorRunnable.TotalResults(maxResults); for (Range range : boundingFiRanges) { if (log.isTraceEnabled()) { @@ -980,7 +949,16 @@ private void fillSortedSets() throws IOException { // in which case we want the IvaratorFutures to remain so that we can reconnect the HDFSBackedSortedSet // on the next call for (Range range : boundingFiRanges) { - IteratorThreadPoolManager.removeIvaratorFuture(getIvaratorTaskName(range), this.initEnv); + IteratorThreadPoolManager.removeIvaratorFuture(getTaskName(range), this.initEnv); + } + } else { + // We can't use the source anymore since it is issued by Accumulo Tablet. We suspend + // the IvaratorRunnable until the new call when it will be resumed with a new source + for (IvaratorFuture future : futures) { + // this will cause the run method save state and exit to free up the executor thread + // we will be able to retrieve the state and start a new IvaratorRunnable with a + // source from the new IvaratorPool + future.getIvaratorRunnable().suspend(); } } } @@ -1044,7 +1022,7 @@ private void getNextUnsortedKey() throws IOException { } if (this.fiSource.hasTop()) { - addKey(this.fiSource.getTopKey(), this.fiSource.getTopValue()); + addKey(this.fiSource.getTopKey()); this.fiSource.next(); } } @@ -1125,13 +1103,11 @@ protected void returnPoolSource(SortedKeyValueIterator source) { * * @param topFiKey * the top index key - * @param value - * the value * @return true if it matched * @throws IOException * for issues with read/write */ - protected boolean addKey(Key topFiKey, Value value) throws IOException { + protected boolean addKey(Key topFiKey) throws IOException { if (log.isTraceEnabled()) { log.trace("addKey evaluating " + topFiKey); } @@ -1160,10 +1136,6 @@ protected boolean addKey(Key topFiKey, Value value) throws IOException { return false; } - public boolean isRunning() { - return running.get(); - } - public long getScanTimeout() { return scanTimeout; } @@ -1172,20 +1144,6 @@ public long getStartTime() { return startTime; } - public void waitUntilComplete() { - if (running.get()) { - synchronized (running) { - while (running.get()) { - try { - running.wait(); - } catch (InterruptedException e) { - - } - } - } - } - } - /** * This method will asynchronously fill the set with matches from within the specified bounding FI range. * @@ -1195,144 +1153,22 @@ public void waitUntilComplete() { * total results * @return the Future */ - protected IvaratorFuture fillSet(final Range boundingFiRange, final TotalResults totalResults) { - - // this will block until an ivarator source becomes available - final SortedKeyValueIterator source = takePoolSource(); + protected IvaratorFuture fillSet(final Range boundingFiRange, final IvaratorRunnable.TotalResults totalResults) { // create runnable - Runnable runnable = () -> { - running.set(true); - if (log.isDebugEnabled()) { - log.debug("Starting fillSet(" + boundingFiRange + ')'); - } - int scanned = 0; - int matched = 0; - QuerySpan querySpan = null; - Key nextSeekKey = null; - int nextCount = 0; - try { - if (collectTimingDetails && source instanceof SourceTrackingIterator) { - querySpan = ((SourceTrackingIterator) source).getQuerySpan(); - } - - // seek the source to a range covering the entire row....the bounding box will dictate the actual scan - source.seek(boundingFiRange, EMPTY_CFS, false); - scanned++; - DatawaveFieldIndexCachingIteratorJexl.this.scannedKeys.incrementAndGet(); - - // if this is a range iterator, build the composite-safe Fi range - Range compositeSafeFiRange = (this instanceof DatawaveFieldIndexRangeIteratorJexl) - ? ((DatawaveFieldIndexRangeIteratorJexl) this).buildCompositeSafeFiRange(fiRow, fiName, fieldValue) - : null; - - while (source.hasTop()) { - checkTiming(); - - Key top = source.getTopKey(); - - // if we are setup for composite seeking, seek if we are out of range - if (compositeSeeker != null && compositeSafeFiRange != null) { - String colQual = top.getColumnQualifier().toString(); - String ingestType = colQual.substring(colQual.indexOf('\0') + 1, colQual.lastIndexOf('\0')); - String colFam = top.getColumnFamily().toString(); - String fieldName = colFam.substring(colFam.indexOf('\0') + 1); - - Collection componentFields = null; - String separator = null; - Multimap compositeToFieldMap = compositeMetadata.getCompositeFieldMapByType().get(ingestType); - Map compositeSeparatorMap = compositeMetadata.getCompositeFieldSeparatorsByType().get(ingestType); - if (compositeToFieldMap != null && compositeSeparatorMap != null) { - componentFields = compositeToFieldMap.get(fieldName); - separator = compositeSeparatorMap.get(fieldName); - } - - if (componentFields != null && separator != null && !compositeSeeker.isKeyInRange(top, compositeSafeFiRange, separator)) { - boolean shouldSeek = false; - - // top key precedes nextSeekKey - if (nextSeekKey != null && top.compareTo(nextSeekKey) < 0) { - // if we hit the seek threshold, seek - if (nextCount >= compositeSeekThreshold) - shouldSeek = true; - } - // top key exceeds nextSeekKey, or nextSeekKey unset - else { - nextCount = 0; - nextSeekKey = null; - - // get a new seek key - Key newStartKey = compositeSeeker.nextSeekKey(new ArrayList<>(componentFields), top, compositeSafeFiRange, separator); - if (newStartKey != boundingFiRange.getStartKey() && newStartKey.compareTo(boundingFiRange.getStartKey()) > 0 - && newStartKey.compareTo(boundingFiRange.getEndKey()) <= 0) { - nextSeekKey = newStartKey; - - // if we hit the seek threshold (i.e. if it is set to 0), seek - if (nextCount >= compositeSeekThreshold) - shouldSeek = true; - } - } - - if (shouldSeek) { - source.seek(new Range(nextSeekKey, boundingFiRange.isStartKeyInclusive(), boundingFiRange.getEndKey(), - boundingFiRange.isEndKeyInclusive()), EMPTY_CFS, false); - - // reset next count and seek key - nextSeekKey = null; - nextCount = 0; - } else { - nextCount++; - source.next(); - } - - scanned++; - continue; - } - } - - // terminate if timed out or cancelled - if (DatawaveFieldIndexCachingIteratorJexl.this.setControl.isCancelledQuery()) { - break; - } - - if (addKey(top, source.getTopValue())) { - matched++; - if (!totalResults.increment()) { - throw new DatawaveIvaratorMaxResultsException("Exceeded the maximum set size"); - } - } - - source.next(); - scanned++; - DatawaveFieldIndexCachingIteratorJexl.this.scannedKeys.incrementAndGet(); - } - } catch (Exception e) { - // throw the exception up which will be available via the Future - log.error("Failed to complete fillSet(" + boundingFiRange + ")", e); - throw new RuntimeException(e); - } finally { - // return the ivarator source back to the pool. - returnPoolSource(source); - if (log.isDebugEnabled()) { - StringBuilder builder = new StringBuilder(); - builder.append("Matched ").append(matched).append(" out of ").append(scanned).append(" for ").append(boundingFiRange).append(": ") - .append(DatawaveFieldIndexCachingIteratorJexl.this); - log.debug(builder.toString()); - } - if (collectTimingDetails && querySpanCollector != null && querySpan != null) { - querySpanCollector.addQuerySpan(querySpan); - } - synchronized (running) { - running.set(false); - running.notify(); - } - } - }; - - return IteratorThreadPoolManager.executeIvarator(this, runnable, getIvaratorTaskName(boundingFiRange), this.initEnv); + String taskName = getTaskName(boundingFiRange); + IvaratorFuture future = IteratorThreadPoolManager.getIvaratorFuture(taskName, this.initEnv); + if (future == null) { + // no future exists, so get a source and create/execute a new IvaratorRunnable + // this will block until an ivarator source becomes available + SortedKeyValueIterator source = takePoolSource(); + IvaratorRunnable ivaratorRunnable = new IvaratorRunnable(this, source, boundingFiRange, boundingFiRange, totalResults); + future = IteratorThreadPoolManager.executeIvarator(ivaratorRunnable, taskName, this.initEnv); + } + return future; } - public String getIvaratorTaskName(Range boundingFiRange) { + public String getTaskName(Range boundingFiRange) { StringBuilder sb = new StringBuilder(); sb.append(getClass().getSimpleName()).append(" in "); sb.append(queryId); @@ -1366,42 +1202,62 @@ protected void clearRowBasedHdfsBackedSet() throws IOException { this.set = null; } - protected boolean copyRowBasedHdfsBackedSetFromPreviousCall() { + protected boolean resumeFromPreviousCall() { + boolean canResume = true; // Only copy previous Ivarator's RowBasedHdfsBackedSet if all futures are available // and the Ivarator in the IvaratorFuture is the same object DatawaveFieldIndexCachingIteratorJexl previousIvarator = null; + List suspended = new ArrayList<>(); for (Range r : boundingFiRanges) { - IvaratorFuture f = IteratorThreadPoolManager.getIvaratorFuture(getIvaratorTaskName(r), initEnv); + IvaratorFuture f = IteratorThreadPoolManager.getIvaratorFuture(getTaskName(r), initEnv); if (f == null) { - return false; + canResume = false; + break; } else { + if (f.getIvaratorRunnable().isSupended()) { + suspended.add(f); + } if (previousIvarator == null) { previousIvarator = f.getIvarator(); - } else { - if (previousIvarator != f.getIvarator()) { - return false; - } + } else if (previousIvarator != f.getIvarator()) { + canResume = false; + break; } } } // ivaratorCacheDirs is declared final, but must be the same - Set currentUriSet = this.ivaratorCacheDirs.stream().map(d -> d.getPathURI()).collect(Collectors.toSet()); - Set previousUriSet = previousIvarator.ivaratorCacheDirs.stream().map(d -> d.getPathURI()).collect(Collectors.toSet()); - if (!currentUriSet.equals(previousUriSet)) { - return false; + if (canResume) { + Set currentUriSet = this.ivaratorCacheDirs.stream().map(d -> d.getPathURI()).collect(Collectors.toSet()); + Set previousUriSet = previousIvarator.ivaratorCacheDirs.stream().map(d -> d.getPathURI()).collect(Collectors.toSet()); + if (currentUriSet.equals(previousUriSet)) { + this.currentRow = previousIvarator.currentRow; + this.threadSafeSet = previousIvarator.threadSafeSet; + this.set = previousIvarator.set; + this.setControl = previousIvarator.setControl; + this.startTime = previousIvarator.startTime; + // resume processing of each suspended IvaratorRunnable + for (IvaratorFuture f : suspended) { + IvaratorRunnable resumingRunnable = f.getIvaratorRunnable(); + resumingRunnable.prepareForResume(this); + String taskName = resumingRunnable.getTaskName(); + // remove the previous IvaratorFuture from IteratorThreadPoolManager + IteratorThreadPoolManager.removeIvaratorFuture(taskName, this.initEnv); + // execute new IvaratorRunnable and add it to IteratorThreadPoolManager + IteratorThreadPoolManager.executeIvarator(resumingRunnable, taskName, initEnv); + } + } else { + canResume = false; + } } - // collect timing details from the previous Ivarator and add them to this one - if (this.collectTimingDetails) { - this.querySpanCollector.addQuerySpan(previousIvarator.querySpanCollector.getCombinedQuerySpan(null)); + if (!canResume) { + // if can not resume from the previous Ivarator, then ensure that the previous IvaratorFutures are removed + for (Range r : boundingFiRanges) { + IteratorThreadPoolManager.removeIvaratorFuture(getTaskName(r), initEnv); + } } - this.currentRow = previousIvarator.currentRow; - this.threadSafeSet = previousIvarator.threadSafeSet; - this.set = previousIvarator.set; - this.setControl = previousIvarator.setControl; - this.startTime = previousIvarator.startTime; - return true; + return canResume; } /** @@ -1786,7 +1642,35 @@ public void setCollectTimingDetails(boolean collectTimingDetails) { this.collectTimingDetails = collectTimingDetails; } + public boolean getCollectTimingDetails() { + return collectTimingDetails; + } + public void setQuerySpanCollector(QuerySpanCollector querySpanCollector) { this.querySpanCollector = querySpanCollector; } + + public AtomicLong getScannedKeys() { + return scannedKeys; + } + + public QuerySpanCollector getQuerySpanCollector() { + return querySpanCollector; + } + + public HdfsBackedControl getSetControl() { + return setControl; + } + + public FieldIndexCompositeSeeker getCompositeSeeker() { + return compositeSeeker; + } + + public CompositeMetadata getCompositeMetadata() { + return compositeMetadata; + } + + public int getCompositeSeekThreshold() { + return compositeSeekThreshold; + } } diff --git a/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexRegexIteratorJexl.java b/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexRegexIteratorJexl.java index 03271386f0..39c266ebfe 100644 --- a/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexRegexIteratorJexl.java +++ b/warehouse/query-core/src/main/java/datawave/core/iterators/DatawaveFieldIndexRegexIteratorJexl.java @@ -4,7 +4,6 @@ import java.util.List; import java.util.regex.Pattern; -import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -12,7 +11,6 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.hadoop.io.Text; -import datawave.data.ColumnFamilyConstants; import datawave.query.Constants; import datawave.query.parser.JavaRegexAnalyzer; import datawave.query.parser.JavaRegexAnalyzer.JavaRegexParseException; @@ -127,15 +125,11 @@ protected List buildBoundingFiRanges(Text rowId, Text fiName, Text fieldV */ @Override protected boolean matches(Key k) throws IOException { - boolean matches = false; String colq = k.getColumnQualifier().toString(); - // search backwards for the null bytes to expose the value in value\0datatype\0UID int index = colq.lastIndexOf('\0'); index = colq.lastIndexOf('\0', index - 1); - matches = (pattern.get().matcher(colq.substring(0, index)).matches()); - - return matches; + return (pattern.get().matcher(colq.substring(0, index)).matches()); } } diff --git a/warehouse/query-core/src/main/java/datawave/core/iterators/IteratorThreadPoolManager.java b/warehouse/query-core/src/main/java/datawave/core/iterators/IteratorThreadPoolManager.java index 944f84aed7..8716143b32 100644 --- a/warehouse/query-core/src/main/java/datawave/core/iterators/IteratorThreadPoolManager.java +++ b/warehouse/query-core/src/main/java/datawave/core/iterators/IteratorThreadPoolManager.java @@ -98,7 +98,7 @@ private IteratorThreadPoolManager(IteratorEnvironment env) { DatawaveFieldIndexCachingIteratorJexl ivarator = future.getIvarator(); long elapsed = now - ivarator.getStartTime(); long ivaratorScanTimeout = ivarator.getScanTimeout(); - if (ivarator.isRunning() && ((elapsed > ivaratorScanTimeout) || (elapsed > ivaratorMaxScanTimeout))) { + if (future.getIvaratorRunnable().isRunning() && ((elapsed > ivaratorScanTimeout) || (elapsed > ivaratorMaxScanTimeout))) { removeIvaratorFuture(taskName, env); } } @@ -180,18 +180,18 @@ public static IvaratorFuture getIvaratorFuture(String taskName, IteratorEnvironm public static void removeIvaratorFuture(String taskName, IteratorEnvironment env) { IvaratorFuture future = instance(env).ivaratorFutures.getIfPresent(taskName); if (future != null) { - if (future.getIvarator().isRunning()) { + if (future.getIvaratorRunnable().isRunning()) { future.cancel(true); - future.getIvarator().waitUntilComplete(); + future.getIvaratorRunnable().waitUntilComplete(); } instance(env).ivaratorFutures.invalidate(taskName); } } - public static IvaratorFuture executeIvarator(DatawaveFieldIndexCachingIteratorJexl ivarator, Runnable task, String taskName, IteratorEnvironment env) { + public static IvaratorFuture executeIvarator(IvaratorRunnable ivaratorRunnable, String taskName, IteratorEnvironment env) { IvaratorFuture future = instance(env).ivaratorFutures.getIfPresent(taskName); if (future == null) { - future = new IvaratorFuture(instance(env).execute(IVARATOR_THREAD_NAME, task, taskName), ivarator); + future = new IvaratorFuture(instance(env).execute(IVARATOR_THREAD_NAME, ivaratorRunnable, taskName), ivaratorRunnable); instance(env).ivaratorFutures.put(taskName, future); } return future; diff --git a/warehouse/query-core/src/main/java/datawave/core/iterators/IvaratorFuture.java b/warehouse/query-core/src/main/java/datawave/core/iterators/IvaratorFuture.java index ea3c35401a..98e166cbdc 100644 --- a/warehouse/query-core/src/main/java/datawave/core/iterators/IvaratorFuture.java +++ b/warehouse/query-core/src/main/java/datawave/core/iterators/IvaratorFuture.java @@ -7,12 +7,12 @@ public class IvaratorFuture implements Future { - final private Future future; - final private DatawaveFieldIndexCachingIteratorJexl ivarator; + private Future future; + private IvaratorRunnable ivaratorRunnable; - public IvaratorFuture(Future future, DatawaveFieldIndexCachingIteratorJexl ivarator) { + public IvaratorFuture(Future future, IvaratorRunnable ivaratorRunnable) { this.future = future; - this.ivarator = ivarator; + this.ivaratorRunnable = ivaratorRunnable; } @Override @@ -41,6 +41,10 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec } public DatawaveFieldIndexCachingIteratorJexl getIvarator() { - return ivarator; + return ivaratorRunnable.getIvarator(); + } + + public IvaratorRunnable getIvaratorRunnable() { + return ivaratorRunnable; } } diff --git a/warehouse/query-core/src/main/java/datawave/core/iterators/IvaratorRunnable.java b/warehouse/query-core/src/main/java/datawave/core/iterators/IvaratorRunnable.java new file mode 100644 index 0000000000..b604757954 --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/core/iterators/IvaratorRunnable.java @@ -0,0 +1,281 @@ +package datawave.core.iterators; + +import static datawave.core.iterators.DatawaveFieldIndexCachingIteratorJexl.EMPTY_CFS; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.log4j.Logger; + +import com.google.common.collect.Multimap; + +import datawave.query.exceptions.DatawaveIvaratorMaxResultsException; +import datawave.query.iterator.profile.QuerySpan; +import datawave.query.iterator.profile.SourceTrackingIterator; + +public class IvaratorRunnable implements Runnable { + + private static final Logger log = Logger.getLogger(IvaratorRunnable.class); + private final String taskName; + private final Range boundingFiRange; + private final TotalResults totalResults; + private DatawaveFieldIndexCachingIteratorJexl ivarator; + private boolean suspended = false; + private AtomicBoolean suspendRequested = new AtomicBoolean(false); + private AtomicBoolean running = new AtomicBoolean(false); + private Key restartKey = null; + private SortedKeyValueIterator source; + private Range seekRange; + private QuerySpan querySpan = null; + private int scanned = 0; + private int matched = 0; + private int nextCount = 0; + + public IvaratorRunnable(DatawaveFieldIndexCachingIteratorJexl ivarator, SortedKeyValueIterator source, final Range boundingFiRange, + final Range seekRange, final TotalResults totalResults) { + this.ivarator = ivarator; + this.source = source; + this.boundingFiRange = boundingFiRange; + this.seekRange = seekRange; + this.totalResults = totalResults; + this.taskName = ivarator.getTaskName(boundingFiRange); + } + + public void prepareForResume(DatawaveFieldIndexCachingIteratorJexl resumingIvarator) { + // IvaratorRunnable runnable; + if (!suspended || restartKey == null) { + throw new IllegalStateException("IvaratorRunnable not suspended. Can not prepareForResume"); + } + ivarator = resumingIvarator; + // use a new seek range that starts from restartKey of the suspended IvaratorRunnable + seekRange = new Range(restartKey, true, boundingFiRange.getEndKey(), boundingFiRange.isEndKeyInclusive()); + // this will block until an ivarator source becomes available + source = ivarator.takePoolSource(); + suspended = false; + suspendRequested.set(false); + } + + public String getTaskName() { + return taskName; + } + + public void suspend() { + if (running.get()) { + suspendRequested.set(true); + // the run method should see suspended == true, save state, and exit + // the state will be available via the IvaratorFuture during a subsequent call + waitUntilComplete(); + } + } + + public boolean isSupended() { + return suspended; + } + + public boolean isRunning() { + return running.get(); + } + + public void waitUntilComplete() { + if (running.get()) { + synchronized (running) { + while (running.get()) { + try { + running.wait(); + } catch (InterruptedException e) { + + } + } + } + } + } + + private boolean suspendNow(Key key) { + boolean suspendNow = suspendRequested.get(); + if (suspendNow) { + restartKey = key; + suspended = true; + } + return suspendNow; + } + + public DatawaveFieldIndexCachingIteratorJexl getIvarator() { + return ivarator; + } + + @Override + public void run() { + running.set(true); + if (log.isDebugEnabled()) { + if (seekRange.equals(boundingFiRange)) { + log.debug(String.format("Starting IvaratorRunnable.run() for range %s", boundingFiRange)); + } else { + log.debug(String.format("Resuming IvaratorRunnable.run() for range %s at key %s", boundingFiRange, restartKey)); + } + } + Key nextSeekKey = null; + try { + if (ivarator.getCollectTimingDetails() && source instanceof SourceTrackingIterator) { + querySpan = ((SourceTrackingIterator) source).getQuerySpan(); + } + + // seek the source to a range covering the entire row....the bounding box will dictate the actual scan + // if we are resuming the ivarator, then we will be seeking to where we left off when suspended + source.seek(seekRange, EMPTY_CFS, false); + scanned++; + ivarator.getScannedKeys().incrementAndGet(); + + // if this is a range iterator, build the composite-safe Fi range + Range compositeSafeFiRange = (ivarator instanceof DatawaveFieldIndexRangeIteratorJexl) + ? ((DatawaveFieldIndexRangeIteratorJexl) ivarator).buildCompositeSafeFiRange(ivarator.getFiRow(), ivarator.getFiName(), + ivarator.getFieldValue()) + : null; + + while (source.hasTop()) { + Key top = source.getTopKey(); + // if suspended, set the restartKey and exit the run method + if (suspendNow(top)) { + break; + } + ivarator.checkTiming(); + + // if we are setup for composite seeking, seek if we are out of range + if (ivarator.getCompositeSeeker() != null && compositeSafeFiRange != null) { + String colQual = top.getColumnQualifier().toString(); + String ingestType = colQual.substring(colQual.indexOf('\0') + 1, colQual.lastIndexOf('\0')); + String colFam = top.getColumnFamily().toString(); + String fieldName = colFam.substring(colFam.indexOf('\0') + 1); + + Collection componentFields = null; + String separator = null; + Multimap compositeToFieldMap = ivarator.getCompositeMetadata().getCompositeFieldMapByType().get(ingestType); + Map compositeSeparatorMap = ivarator.getCompositeMetadata().getCompositeFieldSeparatorsByType().get(ingestType); + if (compositeToFieldMap != null && compositeSeparatorMap != null) { + componentFields = compositeToFieldMap.get(fieldName); + separator = compositeSeparatorMap.get(fieldName); + } + + if (componentFields != null && separator != null && !ivarator.getCompositeSeeker().isKeyInRange(top, compositeSafeFiRange, separator)) { + boolean shouldSeek = false; + + // top key precedes nextSeekKey + if (nextSeekKey != null && top.compareTo(nextSeekKey) < 0) { + // if we hit the seek threshold, seek + if (nextCount >= ivarator.getCompositeSeekThreshold()) + shouldSeek = true; + } + // top key exceeds nextSeekKey, or nextSeekKey unset + else { + nextCount = 0; + nextSeekKey = null; + + // get a new seek key + Key newStartKey = ivarator.getCompositeSeeker().nextSeekKey(new ArrayList<>(componentFields), top, compositeSafeFiRange, separator); + if (newStartKey != boundingFiRange.getStartKey() && newStartKey.compareTo(boundingFiRange.getStartKey()) > 0 + && newStartKey.compareTo(boundingFiRange.getEndKey()) <= 0) { + nextSeekKey = newStartKey; + + // if we hit the seek threshold (i.e. if it is set to 0), seek + if (nextCount >= ivarator.getCompositeSeekThreshold()) + shouldSeek = true; + } + } + + if (shouldSeek) { + source.seek(new Range(nextSeekKey, boundingFiRange.isStartKeyInclusive(), boundingFiRange.getEndKey(), + boundingFiRange.isEndKeyInclusive()), EMPTY_CFS, false); + + // reset next count and seek key + nextSeekKey = null; + nextCount = 0; + } else { + nextCount++; + source.next(); + } + + scanned++; + continue; + } + } + + // terminate if timed out or cancelled + if (ivarator.getSetControl().isCancelledQuery()) { + break; + } + + if (suspendNow(top)) { + break; + } + + if (ivarator.addKey(top)) { + matched++; + if (!totalResults.increment()) { + throw new DatawaveIvaratorMaxResultsException("Exceeded the maximum set size"); + } + } + + source.next(); + scanned++; + ivarator.getScannedKeys().incrementAndGet(); + } + if (suspended && log.isDebugEnabled()) { + log.debug(String.format("Suspended IvaratorRunnable.run() for range %s at key %s", boundingFiRange, restartKey)); + } + } catch (Exception e) { + // throw the exception up which will be available via the Future + log.error("Failed to complete fillSet(" + boundingFiRange + ")", e); + throw new RuntimeException(e); + } finally { + // return the ivarator source back to the pool. + ivarator.returnPoolSource(source); + source = null; + if (log.isDebugEnabled()) { + StringBuilder builder = new StringBuilder(); + builder.append("Matched ").append(matched).append(" out of ").append(scanned).append(" for ").append(boundingFiRange).append(": ") + .append(ivarator); + log.debug(builder.toString()); + } + if (ivarator.getCollectTimingDetails() && ivarator.getQuerySpanCollector() != null && querySpan != null) { + ivarator.getQuerySpanCollector().addQuerySpan(querySpan); + } + synchronized (running) { + running.set(false); + running.notify(); + } + } + } + + /** + * A class to keep track of the total result size across all of the bounding ranges + */ + public static class TotalResults { + + private final long maxResults; + private AtomicLong size = new AtomicLong(); + + public TotalResults(long maxResults) { + this.maxResults = maxResults; + } + + public boolean increment() { + if (maxResults <= 0) { + return true; + } + return size.incrementAndGet() <= maxResults; + } + + public boolean add(long val) { + if (maxResults <= 0) { + return true; + } + return size.addAndGet(val) <= maxResults; + } + } +}