Skip to content

Commit

Permalink
Stop yielding on the same key after maxYields
Browse files Browse the repository at this point in the history
  • Loading branch information
billoley committed Oct 3, 2023
1 parent 6b8b123 commit 1935337
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,9 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
// preserve the original range for use with the Final Document tracking iterator because it is placed after the ResultCountingIterator
// so the FinalDocumentTracking iterator needs the start key with the count already appended
this.originalRange = range;
this.waitWindowObserver.start(range, yieldThresholdMs);
if (WaitWindowObserver.getNumYields(range.getStartKey(), collectTimingDetails) < maxYields) {
this.waitWindowObserver.start(range, yieldThresholdMs);
}
getActiveQueryLog().get(getQueryId()).beginCall(this.originalRange, ActiveQuery.CallType.SEEK);
ActiveQueryLog.getInstance().get(getQueryId()).beginCall(this.originalRange, ActiveQuery.CallType.SEEK);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public class QueryOptions implements OptionDescriber {
public static final String START_TIME = "start.time";
public static final String END_TIME = "end.time";
public static final String YIELD_THRESHOLD_MS = "yield.threshold.ms";
public static final String MAX_YIELDS = "max.yields";

public static final String FILTER_MASKED_VALUES = "filter.masked.values";
public static final String INCLUDE_DATATYPE = "include.datatype";
Expand Down Expand Up @@ -355,6 +356,7 @@ public class QueryOptions implements OptionDescriber {
protected long maxIvaratorResults = -1;

protected long yieldThresholdMs = Long.MAX_VALUE;
protected long maxYields = 10;

protected Predicate<Key> fieldIndexKeyDataTypeFilter = KeyIdentity.Function;
protected Predicate<Key> eventEntryKeyDataTypeFilter = KeyIdentity.Function;
Expand Down Expand Up @@ -486,6 +488,7 @@ public void deepCopy(QueryOptions other) {
this.maxIvaratorResults = other.maxIvaratorResults;

this.yieldThresholdMs = other.yieldThresholdMs;
this.maxYields = other.maxYields;

this.compressResults = other.compressResults;
this.limitFieldsMap = other.limitFieldsMap;
Expand Down Expand Up @@ -1162,6 +1165,7 @@ public IteratorOptions describeOptions() {
" The maximum number of sources to use for ivarators across all ivarated terms within the query. Note the thread pool size is controlled via an accumulo property.");
options.put(YIELD_THRESHOLD_MS,
"The threshold in milliseconds that the query iterator will evaluate consecutive documents to false before yielding the scan.");
options.put(MAX_YIELDS, "The maximum number of times to yield on the same startKey without making progress.");
options.put(COMPRESS_SERVER_SIDE_RESULTS, "GZIP compress the serialized Documents before returning to the webserver");
options.put(MAX_EVALUATION_PIPELINES, "The max number of evaluation pipelines");
options.put(SERIAL_EVALUATION_PIPELINE, "Forces us to use the serial pipeline. Allows us to still have a single thread for evaluation");
Expand Down Expand Up @@ -1616,6 +1620,10 @@ public boolean validateOptions(Map<String,String> options) {
this.setYieldThresholdMs(Long.parseLong(options.get(YIELD_THRESHOLD_MS)));
}

if (options.containsKey(MAX_YIELDS)) {
this.setMaxYields(Long.parseLong(options.get(MAX_YIELDS)));
}

if (options.containsKey(COMPRESS_SERVER_SIDE_RESULTS)) {
this.setCompressResults(Boolean.parseBoolean(options.get(COMPRESS_SERVER_SIDE_RESULTS)));
}
Expand Down Expand Up @@ -2075,6 +2083,14 @@ public void setYieldThresholdMs(long yieldThresholdMs) {
this.yieldThresholdMs = yieldThresholdMs;
}

public long getMaxYields() {
return maxYields;
}

public void setMaxYields(long maxYields) {
this.maxYields = maxYields;
}

public int getFiFieldSeek() {
return fiFieldSeek;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class WaitWindowObserver {
// When the wait window is over. Set during the initial seek
protected long endOfWaitWindow;
// Remaining time in the wait window. Updated by the timerTask
protected AtomicLong remainingTimeMs = new AtomicLong();
protected AtomicLong remainingTimeMs = new AtomicLong(Long.MAX_VALUE);
protected TimerTask timerTask = null;
// How often the timerTask gets run
protected long checkPeriod = 50;
Expand Down Expand Up @@ -433,6 +433,27 @@ static public Text removeMarkers(Text text) {
}
}

static public int getNumYields(Key startKey, boolean collectTimingDetails) {
int numNulls = 0;
if (startKey != null) {
String s = null;
if (WaitWindowObserver.hasMarker(startKey.getColumnFamily())) {
s = startKey.getColumnFamily().toString();
} else if (WaitWindowObserver.hasMarker(startKey.getColumnQualifier())) {
s = startKey.getColumnQualifier().toString();
}
if (s != null && s.endsWith("\0")) {
for (int x = s.length() - 1; x > 0 && s.charAt(x) == '\0'; x--) {
numNulls++;
}
}
if (collectTimingDetails) {
numNulls = numNulls / 2;
}
}
return numNulls;
}

/*
* Convenience method to produce a document containing a WAIT_WINDOW_OVERRUN attribute, This document gets returned before a yield when
* collectTimingDetails=true so that the FinalDocumenTrackingIterator can add timing details and metrics befoer returning the Document.
Expand Down

0 comments on commit 1935337

Please sign in to comment.