Replace nonpublic Accumulo IterationInterruptedException #2539

Open
wants to merge 9 commits into base: integration
@@ -0,0 +1,17 @@
package datawave.iterators;

/**
* This code was repurposed from org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException. It was not part of the public API, so we've
* created a DataWave equivalent. This exception should be used in place of {@link org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException}
* when thrown from an iterator.
*/
public class IterationInterruptException extends RuntimeException {

private static final long serialVersionUID = 1L;

public IterationInterruptException() {}

public IterationInterruptException(String msg) {
super(msg);
}
}
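
For context, a minimal sketch of the intended usage pattern; the iterator and field names here (ExampleInterruptibleIterator, interruptFlag) are hypothetical and only illustrate how the new exception replaces Accumulo's internal IterationInterruptedException.

import java.util.concurrent.atomic.AtomicBoolean;

import datawave.iterators.IterationInterruptException;

public class ExampleInterruptibleIterator {

    // hypothetical flag flipped by tserver-side plumbing when the client stops listening
    private final AtomicBoolean interruptFlag = new AtomicBoolean(false);

    public void next() {
        // throw the DataWave exception rather than Accumulo's non-public IterationInterruptedException
        if (interruptFlag.get()) {
            throw new IterationInterruptException("scan interrupted by client going away");
        }
        // ... advance the underlying source here ...
    }
}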
@@ -14,7 +14,6 @@
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;

import com.google.common.collect.TreeMultimap;

@@ -70,7 +69,7 @@ public void next() throws IOException {
throw new IllegalStateException();

if (interruptFlag != null && interruptCheckCount++ % 100 == 0 && interruptFlag.get())
throw new IterationInterruptedException();
throw new IterationInterruptException();

if (iter.hasNext()) {
entry = iter.next();
@@ -86,7 +85,7 @@ public void next() throws IOException {
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {

if (interruptFlag != null && interruptFlag.get())
throw new IterationInterruptedException();
throw new IterationInterruptException();

this.range = range;

@@ -287,7 +287,7 @@ public void seek(Range r, Collection<ByteSequence> columnFamilies, boolean inclu

Key pStartKey = parentRange.getStartKey();

// Check if we are recovering from IterationInterruptedException
// Check if we are recovering from IterationInterruptException
if (null != pStartKey && null != pStartKey.getRow() && null != pStartKey.getColumnFamily() && !pStartKey.getColumnFamily().toString().isEmpty()
&& null != pStartKey.getColumnQualifier() && !pStartKey.getColumnQualifier().toString().isEmpty()
&& !parentRange.isStartKeyInclusive()) {
@@ -19,7 +19,6 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
@@ -30,6 +29,7 @@
import datawave.data.hash.UID;
import datawave.data.type.NumberType;
import datawave.data.type.util.NumericalEncoder;
import datawave.iterators.IterationInterruptException;
import datawave.query.attributes.Attribute;
import datawave.query.attributes.Document;
import datawave.query.iterator.SourcedOptions;
@@ -693,7 +693,7 @@ private boolean validate(final Text row, final String dataType, final String uid
isValid = validate(range);
}
}
} catch (final IterationInterruptedException e) {
} catch (final IterationInterruptException e) {
// Re-throw iteration interrupted as-is since this is an expected event from
// a client going away. Re-throwing as-is will let the
// tserver catch and ignore it as intended.
@@ -19,7 +19,6 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -28,6 +27,7 @@

import datawave.data.hash.UID;
import datawave.data.hash.UIDConstants;
import datawave.iterators.IterationInterruptException;
import datawave.query.Constants;
import datawave.query.iterator.QueryOptions;
import datawave.query.util.Tuple3;
@@ -325,7 +325,7 @@ private int getCountByEventScan(final Range seekRange, final Text row, final Str

// and return the count
return uids.size();
} catch (IterationInterruptedException e) {
} catch (IterationInterruptException e) {
// Re-throw iteration interrupted as-is since this is an expected event from
// a client going away. Re-throwing as an IOException will cause the tserver
// to catch the exception and log a warning. Re-throwing as-is will let the
@@ -423,7 +423,7 @@ private CountResult getCountByFieldIndexScan(final Range seekRange, final Text r
final CountResult result = new CountResult(numberOfImmediateChildren, numberOfDescendants);
result.setSkippedDescendants(skippedSomeDescendants);
return result;
} catch (IterationInterruptedException e) {
} catch (IterationInterruptException e) {
// Re-throw iteration interrupted as-is since this is an expected event from
// a client going away. Re-throwing as an IOException will cause the tserver
// to catch the exception and log a warning. Re-throwing as-is will let the
@@ -29,7 +29,6 @@
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.YieldCallback;
import org.apache.accumulo.core.iterators.YieldingKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.tserver.tablet.TabletClosedException;
import org.apache.commons.collections4.iterators.EmptyIterator;
import org.apache.commons.jexl3.JexlArithmetic;
@@ -59,6 +58,7 @@
import datawave.data.type.Type;
import datawave.data.type.util.NumericalEncoder;
import datawave.ingest.data.config.ingest.CompositeIngest;
import datawave.iterators.IterationInterruptException;
import datawave.marking.MarkingFunctionsFactory;
import datawave.query.Constants;
import datawave.query.attributes.AttributeKeepFilter;
@@ -538,7 +538,7 @@ else if (documentRange != null && (!this.isContainsIndexOnlyTerms() && this.getT
}

/**
* Handle an exception returned from seek or next. This will silently ignore IterationInterruptedException as that happens when the underlying iterator was
* Handle an exception returned from seek or next. This will silently ignore IterationInterruptException as that happens when the underlying iterator was
* interrupted because the client is no longer listening.
*
* @param e
@@ -552,14 +552,14 @@ private void handleException(Exception e) throws IOException {
// We need to pass IOException, IterationInterruptException, and TabletClosedExceptions up to the Tablet as they are
// handled specially to ensure that the client will retry the scan elsewhere
IOException ioe = null;
IterationInterruptedException iie = null;
IterationInterruptException iie = null;
QueryIteratorYieldingException qiy = null;
TabletClosedException tce = null;
if (reason instanceof IOException) {
ioe = (IOException) reason;
}
if (reason instanceof IterationInterruptedException) {
iie = (IterationInterruptedException) reason;
if (reason instanceof IterationInterruptException) {
iie = (IterationInterruptException) reason;
}
if (reason instanceof QueryIteratorYieldingException) {
qiy = (QueryIteratorYieldingException) reason;
@@ -574,8 +574,8 @@ private void handleException(Exception e) throws IOException {
if (reason instanceof IOException) {
ioe = (IOException) reason;
}
if (reason instanceof IterationInterruptedException) {
iie = (IterationInterruptedException) reason;
if (reason instanceof IterationInterruptException) {
iie = (IterationInterruptException) reason;
}
if (reason instanceof QueryIteratorYieldingException) {
qiy = (QueryIteratorYieldingException) reason;
@@ -15,11 +15,11 @@

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.log4j.Logger;

import com.google.common.collect.TreeMultimap;

import datawave.iterators.IterationInterruptException;
import datawave.query.attributes.Document;
import datawave.query.exceptions.DatawaveFatalQueryException;
import datawave.query.exceptions.QueryIteratorYieldingException;
@@ -262,7 +262,7 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
if (itr instanceof SeekableIterator) {
try {
((SeekableIterator) itr).seek(range, columnFamilies, inclusive);
} catch (IterationInterruptedException e2) {
} catch (IterationInterruptException e2) {
// throw IterationInterrupted exceptions as-is with no modifications so the QueryIterator can handle it
throw e2;
} catch (Exception e2) {
@@ -282,11 +282,11 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
}
} catch (QueryIteratorYieldingException qye) {
throw qye;
} catch (IterationInterruptedException iie) {
} catch (IterationInterruptException iie) {
throw iie;
} catch (Exception e) {
include.remove();
if (includes.isEmpty() || e instanceof DatawaveFatalQueryException || e instanceof IterationInterruptedException) {
if (includes.isEmpty() || e instanceof DatawaveFatalQueryException || e instanceof IterationInterruptException) {
throw e;
} else {
log.warn("Lookup of event field failed, precision of query reduced.");
@@ -406,7 +406,7 @@ protected TreeMultimap<T,NestedIterator<T>> advanceIterators(T key) {
}
} catch (QueryIteratorYieldingException qe) {
throw qe;
} catch (IterationInterruptedException ie) {
} catch (IterationInterruptException ie) {
throw ie;
} catch (Exception e) {
seenException = true;
@@ -280,12 +280,12 @@ public void seek(Range r, Collection<ByteSequence> columnFamilies, boolean inclu

} else { // We have a valid start key and row

// Check if we are recovering from IterationInterruptedException (verify that we have the right key parts
// Check if we are recovering from IterationInterruptException (verify that we have the right key parts
// and that the start key is NOT inclusive).
if (null != pStartKey.getColumnFamily() && !pStartKey.getColumnFamily().toString().trim().isEmpty() && null != pStartKey.getColumnQualifier()
&& !pStartKey.getColumnQualifier().toString().trim().isEmpty() && !parentRange.isStartKeyInclusive()) {

// Iteration interrupted case, need to seek to the end of this FN:FV range. IterationInterruptedException
// Iteration interrupted case, need to seek to the end of this FN:FV range. IterationInterruptException
// should always seek with the previously returned top key but with the inclusivity bit set to false.
// i.e. Key-> Row:000 CFAM:fi\x00COLOR CQ:red, inclusive:False
// we want to seek to the end of 'red' to the next unknown value, so CQ: red\u0001 should get us there.
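
A rough sketch of the re-seek this comment describes (not the PR's exact code, which sits in the collapsed part of this hunk): the resume range is built from the previously returned key with \u0001 appended to the column qualifier, using the example key parts above as assumptions.

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.io.Text;

final class ReseekSketch {
    // Given a parent range whose non-inclusive start key was the last key returned before the
    // interrupt (e.g. Row:000 CF:fi\x00COLOR CQ:red), resume just past that FN:FV value.
    static Range resumeAfterInterrupt(Range parentRange) {
        Key pStartKey = parentRange.getStartKey();
        Text nextCq = new Text(pStartKey.getColumnQualifier().toString() + "\u0001");
        Key resumeKey = new Key(pStartKey.getRow(), pStartKey.getColumnFamily(), nextCq);
        return new Range(resumeKey, true, parentRange.getEndKey(), parentRange.isEndKeyInclusive());
    }
}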
@@ -27,7 +27,6 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;

@@ -37,6 +36,7 @@
import datawave.accumulo.inmemory.InMemoryScanner;
import datawave.accumulo.inmemory.InMemoryScannerBase;
import datawave.accumulo.inmemory.ScannerRebuilder;
import datawave.iterators.IterationInterruptException;
import datawave.query.attributes.Document;
import datawave.query.function.deserializer.KryoDocumentDeserializer;
import datawave.query.iterator.profile.FinalDocumentTrackingIterator;
@@ -122,7 +122,7 @@ public boolean hasTop() {
@Override
public void next() throws IOException {
if (initialized && interruptListener != null && interruptListener.interrupt(source.getTopKey())) {
throw new IterationInterruptedException("testing next interrupt");
throw new IterationInterruptException("testing next interrupt");
}

source.next();
@@ -131,7 +131,7 @@ public void next() throws IOException {
@Override
public void seek(Range range, Collection<ByteSequence> collection, boolean inclusive) throws IOException {
if (interruptListener != null && interruptListener.interrupt(null)) {
throw new IterationInterruptedException("testing seek interrupt");
throw new IterationInterruptException("testing seek interrupt");
}

source.seek(range, collection, inclusive);
@@ -403,7 +403,7 @@ private void findNext() {
}
// reset interrupted flag
interrupted = false;
} catch (IterationInterruptedException e) {
} catch (IterationInterruptException e) {
interrupted = true;
interruptListener.processedInterrupt(true);
interruptCount++;
@@ -16,11 +16,11 @@

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
import org.junit.jupiter.api.Test;

import com.google.common.collect.Sets;

import datawave.iterators.IterationInterruptException;
import datawave.query.attributes.Document;
import datawave.query.iterator.NestedIterator;

@@ -90,7 +90,7 @@ void testIterationInterruptedOnInitialSeekOfEventField() throws IOException {
// "Lookup of event field failed, precision of query reduced."

SortedSet<String> uids = new TreeSet<>(uidsA);
assertThrows(IterationInterruptedException.class, () -> driveIterator(itr, uids));
assertThrows(IterationInterruptException.class, () -> driveIterator(itr, uids));
}

/**
Expand All @@ -104,7 +104,7 @@ void testIterationInterruptedOnInitialSeekOfIndexOnlyField() {
includes.add(IndexIteratorBridgeTest.createInterruptibleIndexIteratorBridge("FIELD_B", uidsB, true, 1));

AndIterator itr = new AndIterator<>(includes);
assertThrows(IterationInterruptedException.class, () -> itr.seek(new Range(), Collections.emptyList(), false));
assertThrows(IterationInterruptException.class, () -> itr.seek(new Range(), Collections.emptyList(), false));
}

// 1) handles the next() -> advanceIterators() path with an event field
Expand All @@ -120,7 +120,7 @@ void testIterationInterruptedOnNextCallEventField() throws IOException {
includes.add(IndexIteratorBridgeTest.createIndexIteratorBridge("FIELD_C", uidsC, false));

AndIterator itr = new AndIterator<>(includes);
assertThrows(IterationInterruptedException.class, () -> driveIterator(itr, intersectUids(uidsA, uidsB, uidsC)));
assertThrows(IterationInterruptException.class, () -> driveIterator(itr, intersectUids(uidsA, uidsB, uidsC)));
}

@Test
Expand All @@ -136,7 +136,7 @@ void testIterationInterruptedOnNextCallAllIteratorsFail() {

AndIterator itr = new AndIterator<>(includes);
SortedSet<String> uids = intersectUids(uidsA, uidsB, uidsC);
assertThrows(IterationInterruptedException.class, () -> driveIterator(itr, uids));
assertThrows(IterationInterruptException.class, () -> driveIterator(itr, uids));
}

// 1) handles the next() -> advanceIterators() path with an index only term
Expand All @@ -156,7 +156,7 @@ void testIterationInterruptedOnNextCallIndexOnlyField() {
Set<String> indexOnlyFields = Sets.newHashSet("FIELD_A", "FIELD_B", "FIELD_C");
Set<String> droppedFields = Collections.singleton("FIELD_C");
SortedSet<String> uids = intersectUids(uidsA, uidsB, uidsC);
assertThrows(IterationInterruptedException.class, () -> driveIterator(itr, uids, indexOnlyFields, droppedFields));
assertThrows(IterationInterruptException.class, () -> driveIterator(itr, uids, indexOnlyFields, droppedFields));
}

// 2) handles the next() -> advanceIterators() path when a negation is in play
Expand All @@ -178,7 +178,7 @@ void testIterationInterruptedOnNextCallWithNegation() {
Set<String> indexOnlyFields = Sets.newHashSet("FIELD_A", "FIELD_B");
Set<String> droppedFields = Collections.singleton("FIELD_B");
SortedSet<String> uids = intersectUids(uidsA, uidsB);
assertThrows(IterationInterruptedException.class, () -> driveIterator(itr, uids, indexOnlyFields, droppedFields));
assertThrows(IterationInterruptException.class, () -> driveIterator(itr, uids, indexOnlyFields, droppedFields));
}

// 3) applyContextRequired -> contextIncludes are uneven and there's no high key
Expand Down Expand Up @@ -213,7 +213,7 @@ void testIterationExceptionDuringApplyContextRequired() throws IOException {
Set<String> indexOnlyFields = Sets.newHashSet("FIELD_A", "FIELD_B");
Set<String> droppedFields = Collections.singleton("FIELD_B");
SortedSet<String> uids = intersectUids(uidsA, uidsB);
assertThrows(IterationInterruptedException.class, () -> driveIterator(itr, uids, indexOnlyFields, droppedFields));
assertThrows(IterationInterruptException.class, () -> driveIterator(itr, uids, indexOnlyFields, droppedFields));
}

@Test