Skip to content

Commit

Permalink
Changes from MR feedback, handle infinite startKey (#704)
Browse files Browse the repository at this point in the history
  • Loading branch information
billoley committed Oct 2, 2023
1 parent 65f7862 commit 6b8b123
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,11 @@ public void seek(Range r, Collection<ByteSequence> columnFamilies, boolean inclu
if (!sortedUIDs) {
String cq = WaitWindowObserver.removeMarkers(startKey.getColumnQualifier()).toString();
if (r.getStartKey().getColumnFamily().getLength() > 0 && cq.length() > 0) {
String cqSuffix = WaitWindowObserver.hasBeginMarker(startKey.getColumnQualifier()) ? "" : "\0";
int fieldnameIndex = cq.indexOf('\0');
if (fieldnameIndex >= 0) {
// If startKey colQual has YIELD_AT_BEGIN marker then we want to include keys with the
// fieldName / fieldValue in the key, otherwise we seek past this key by adding a \0
String cqSuffix = WaitWindowObserver.hasBeginMarker(startKey.getColumnQualifier()) ? "" : "\0";
String cf = startKey.getColumnFamily().toString();
lastFiKey = new Key(startKey.getRow().toString(), "fi\0" + cq.substring(0, fieldnameIndex),
cq.substring(fieldnameIndex + 1) + '\0' + cf + cqSuffix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,4 @@ public WaitWindowOverrunException(Key yieldKey) {
/**
 * @return the {@code yieldKey} value carried by this exception
 */
public Key getYieldKey() {
return yieldKey;
}

// disallow other constructors that don't include a yieldKey
private WaitWindowOverrunException() {
super();
}

// disallow constructors that don't include a yieldKey
private WaitWindowOverrunException(String message) {
super(message);
}

// disallow constructors that don't include a yieldKey
private WaitWindowOverrunException(String message, Throwable cause) {
super(message, cause);
}

// disallow constructors that don't include a yieldKey
private WaitWindowOverrunException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,16 @@ protected void seek(SortedKeyValueIterator<Key,Value> source, Range r) throws IO
*/
protected Range buildIndexRange(Range r) {
// include startKey if we yielded to the beginning of a document range
boolean includeStartKey = WaitWindowObserver.hasBeginMarker(r.getStartKey().getColumnQualifier()) ? true : r.isStartKeyInclusive();
Key startKey = permuteRangeKey(r.getStartKey(), includeStartKey);
Key endKey = permuteRangeKey(r.getEndKey(), r.isEndKeyInclusive());
Key startKey = r.getStartKey();
Key endKey = r.getEndKey();
boolean startKeyInclusive = r.isStartKeyInclusive();
if (startKey != null && WaitWindowObserver.hasBeginMarker(startKey.getColumnQualifier())) {
startKeyInclusive = true;
}

return new Range(startKey, includeStartKey, endKey, r.isEndKeyInclusive());
startKey = permuteRangeKey(startKey, startKeyInclusive);
endKey = permuteRangeKey(endKey, r.isEndKeyInclusive());
return new Range(startKey, startKeyInclusive, endKey, r.isEndKeyInclusive());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@
public class WaitWindowObserver {

private static final Logger log = Logger.getLogger(WaitWindowObserver.class);
private static Timer timer = null;
public static final String WAIT_WINDOW_OVERRUN = "WAIT_WINDOW_OVERRUN";
public static final String YIELD_AT_BEGIN_STR = "!YIELD_AT_BEGIN";
public static final Text YIELD_AT_BEGIN = new Text(YIELD_AT_BEGIN_STR);
public static final String YIELD_AT_END_STR = "\uffffYIELD_AT_END";
public static final Text YIELD_AT_END = new Text(YIELD_AT_END_STR);
private static Timer timer = null;
private boolean readyToYield = false;

public static final Comparator<Comparable> keyComparator = (o1, o2) -> {
if (o1 instanceof Key) {
Expand All @@ -57,12 +56,13 @@ public class WaitWindowObserver {
return o2.compareTo(o1);
};

protected boolean readyToYield = false;
protected YieldCallback yieldCallback = null;
// When the wait window is over. Set during the initial seek
protected long endOfWaitWindow;
// Remaining time in the wait window. Updated by the timerTask
protected AtomicLong remainingTimeMs = new AtomicLong();
protected TimerTask timerTask = new WaitWindowTimerTask();
protected TimerTask timerTask = null;
// How often the timerTask gets run
protected long checkPeriod = 50;
// Seek range of the QueryIterator. Used to ensure that yieldKey is in the range.
Expand Down Expand Up @@ -114,21 +114,41 @@ private static Timer getTimer() {
return WaitWindowObserver.timer;
}

/*
 * A null key stands for an infinite (unbounded) start key. Replace it with a concrete Key built from an empty Text and the YIELD_AT_BEGIN marker; any
 * non-null key is returned untouched.
 */
private Key convertInfiniteStartKey(Key key) {
    return (key == null) ? new Key(new Text(), YIELD_AT_BEGIN) : key;
}

/*
* Set seekRange, remainingTimeMs, endOfWaitWindow and start the Timer
*/
public void start(Range seekRange, long yieldThresholdMs) {
this.seekRange = seekRange;
Key startKey = seekRange.getStartKey();
if (startKey == null) {
startKey = convertInfiniteStartKey(startKey);
this.seekRange = new Range(startKey, seekRange.isStartKeyInclusive(), seekRange.getEndKey(), seekRange.isEndKeyInclusive());
} else {
this.seekRange = seekRange;
}
this.remainingTimeMs.set(yieldThresholdMs);
this.endOfWaitWindow = yieldThresholdMs + System.currentTimeMillis();
WaitWindowObserver.getTimer().schedule(this.timerTask, this.checkPeriod, this.checkPeriod);
if (this.timerTask == null) {
this.timerTask = new WaitWindowTimerTask();
WaitWindowObserver.getTimer().schedule(this.timerTask, this.checkPeriod, this.checkPeriod);
}
}

/*
* Ensure that the WaitWindowTimerTask is cancelled. Called from QueryIterator.hasTop.
*/
public void stop() {
this.timerTask.cancel();
if (this.timerTask != null) {
this.timerTask.cancel();
this.timerTask = null;
}
}

/*
Expand Down Expand Up @@ -207,6 +227,10 @@ public void yieldOnOverrun() {
* Create a yield key with YIELD_AT_BEGIN or YIELD_AT_END marker
*/
public Key createYieldKey(Key yieldKey, boolean yieldToBeginning) {
if (yieldKey == null) {
// handle infinite (null) key which may be passed as a range startKey
yieldKey = convertInfiniteStartKey(yieldKey);
}
if (isShardKey(yieldKey)) {
return createShardYieldKey(yieldKey, yieldToBeginning);
} else {
Expand Down Expand Up @@ -338,50 +362,50 @@ public Key getYieldKey() {
*/
static public boolean isShardKey(Key key) {
    // The null check must come before any dereference of key; checking it only in the
    // return expression (after key.getColumnFamily() has been called) would still throw
    // a NullPointerException for a null (infinite) key.
    if (key == null) {
        return false;
    }
    Text colFam = key.getColumnFamily();
    // an empty colFam, or a colFam that starts with the YIELD_AT_BEGIN marker, is treated as a shard key
    return colFam.equals(new Text()) || hasBeginMarker(colFam);
}

/*
* Check if YIELD_AT_BEGIN or YIELD_AT_END is in either the colFam or colQual
*/
static public boolean hasMarker(Key key) {
return hasBeginMarker(key) || hasEndMarker(key);
return key != null && (hasBeginMarker(key) || hasEndMarker(key));
}

/*
* Check if YIELD_AT_BEGIN or YIELD_AT_END is in this Text
*/
static public boolean hasMarker(Text text) {
return hasBeginMarker(text) || hasEndMarker(text);
return text != null && (hasBeginMarker(text) || hasEndMarker(text));
}

/*
* Check if YIELD_AT_BEGIN is in either the colFam or colQual
*/
static public boolean hasBeginMarker(Key key) {
return hasBeginMarker(key.getColumnFamily()) || hasBeginMarker(key.getColumnQualifier());
return key != null && (hasBeginMarker(key.getColumnFamily()) || hasBeginMarker(key.getColumnQualifier()));
}

/*
* YIELD_AT_BEGIN will always be at the beginning of the Text
*/
static public boolean hasBeginMarker(Text text) {
return text.toString().startsWith(YIELD_AT_BEGIN_STR);
return text != null && (text.toString().startsWith(YIELD_AT_BEGIN_STR));
}

/*
* Check if YIELD_AT_END is in either the colFam or colQual
*/
static public boolean hasEndMarker(Key key) {
return hasEndMarker(key.getColumnFamily()) || hasEndMarker(key.getColumnQualifier());
return key != null && (hasEndMarker(key.getColumnFamily()) || hasEndMarker(key.getColumnQualifier()));
}

/*
* Check if YIELD_AT_END is contained in the Text. There are cases where one or more null characters get added to the end of the colFam or colQual, so we
* can not check for endsWith
*/
static public boolean hasEndMarker(Text text) {
return text.toString().contains(YIELD_AT_END_STR);
return text != null && (text.toString().contains(YIELD_AT_END_STR));
}

static public Text removeMarkers(Text text) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public KeyAdjudicator() {
@Override
public Entry<Key,T> apply(Entry<Key,T> entry) {
final Key entryKey = entry.getKey();
// if the key has a YIELD_AT_BEGIN or YIELD_AT_END marker, then don't modify the key
// because doing so will adversely affect the subsequent yield and re-seek position
if (WaitWindowObserver.hasMarker(entryKey)) {
return entry;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,16 @@ public void indexOnly_trailingRegex_shardRange_secondEvent_test() throws IOExcep
indexOnly_test(seekRange, query, false, addEvent("123.345.457"), Arrays.asList(secondEvent));
}

@Test
public void indexOnly_trailingRegex_infiniteRange_secondEvent_test() throws IOException {
    // null start and end keys give an unbounded range; this verifies that the
    // wait window / yielding framework copes with an infinite startKey
    Range seekRange = new Range(null, true, (Key) null, true);
    String query = "((_Value_ = true) && (INDEX_ONLY_FIELD1 =~ 'ap.*'))";

    // only the second event is expected to match the trailing regex
    Map.Entry<Key,Map<String,List<String>>> expectedEvent = getBaseExpectedEvent("123.345.457");
    expectedEvent.getValue().put("INDEX_ONLY_FIELD1", Arrays.asList("apple"));

    indexOnly_test(seekRange, query, false, addEvent("123.345.457"), Arrays.asList(expectedEvent));
}

@Test
public void indexOnly_leadingRegex_documentSpecific_test() throws IOException {
// build the seek range for a document specific pull
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package datawave.query.iterator;

import static datawave.query.iterator.QueryOptions.COLLECT_TIMING_DETAILS;
import static datawave.query.iterator.QueryOptions.MAX_EVALUATION_PIPELINES;
import static datawave.query.iterator.QueryOptions.SERIAL_EVALUATION_PIPELINE;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -78,8 +76,8 @@ public WaitWindowQueryIterator() {
}

// Custom WaitWindowObserver that allows checksBeforeYield checks
// before either yielding or reyurning a WAIT_WINDOW_OERRUN
// It will also limit the number of reseeks on the same startKey using maxNulls
// before either yielding or returning a WAIT_WINDOW_OVERRUN
// It will also limit the number of re-seeks on the same startKey using maxNulls
static public class TestWaitWindowObserver extends WaitWindowObserver {

private AtomicLong checksBeforeYield = new AtomicLong(2);
Expand All @@ -92,9 +90,11 @@ public TestWaitWindowObserver() {
@Override
public void start(Range seekRange, long yieldThresholdMs) {
super.start(seekRange, Long.MAX_VALUE);
// use the seekRange from the parent class since it may have been
// modified if it was an infinite startKey
Key startKey = this.seekRange.getStartKey();
// limit the number of times that the test yields in the
// same place by counting the final null characters
Key startKey = seekRange.getStartKey();
String s = null;
if (WaitWindowObserver.hasMarker(startKey.getColumnFamily())) {
s = startKey.getColumnFamily().toString();
Expand Down

0 comments on commit 6b8b123

Please sign in to comment.