Adding test coverage
Signed-off-by: Manasvini B S <[email protected]>
manasvinibs committed Sep 23, 2024
1 parent edbc586 commit 770ae41
Showing 10 changed files with 813 additions and 93 deletions.
@@ -22,22 +22,17 @@

public abstract class Paginate extends BatchPhysicalOperator<SearchHit> {

/** Request to submit to OpenSearch to scroll over */
/** Request to submit to OpenSearch to scan over */
protected final TableInJoinRequestBuilder request;

/** Page size to scroll over index */
protected final int pageSize;

/** Client connection to ElasticSearch */
protected Client client;

/** Currently undergoing scan */
protected SearchResponse searchResponse;

/** Time out */
protected Integer timeout;

/** Resource monitor manager */
protected ResourceManager resourceMgr;

public Paginate(TableInJoinRequestBuilder request, int pageSize) {
@@ -36,6 +36,8 @@ protected void loadFirstBatch() {
pit = new PointInTimeHandlerImpl(client, request.getOriginalSelect().getIndexArr());
pit.create();
pitId = pit.getPitId();

LOG.info("Loading first batch of response using Point In Time");
searchResponse =
request
.getRequestBuilder()
@@ -44,7 +46,6 @@ protected void loadFirstBatch() {
.setTimeout(TimeValue.timeValueSeconds(timeout))
.setPointInTime(new PointInTimeBuilder(pitId))
.get();
LOG.info("Loading first batch of response using Point In Time");
}

@Override
@@ -57,6 +58,7 @@ protected void loadNextBatch() {
.getHits()
.getHits()[searchResponse.getHits().getHits().length - 1]
.getSortValues();

LOG.info("Loading next batch of response using Point In Time. - " + pitId);
searchResponse =
request
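For context, the loadFirstBatch/loadNextBatch hunks above page through results with a Point In Time id plus the sort values of the last hit (search_after). Below is a minimal sketch of that loop against the transport Client API, not the plugin's operator code; pitId, pageSize, and timeoutSeconds are assumed to be supplied by the caller, and the "_doc" sort is an illustrative choice rather than something taken from this commit.

import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortOrder;

public class PitPagingSketch {

  /** Scans every page for an already-created PIT id and returns the number of hits seen. */
  public static long scanAllPages(Client client, String pitId, int pageSize, int timeoutSeconds) {
    Object[] searchAfter = null;
    long total = 0;
    while (true) {
      SearchRequestBuilder builder =
          client
              .prepareSearch() // indices are implied by the PIT, so none are set on the request
              .setSize(pageSize)
              .setTimeout(TimeValue.timeValueSeconds(timeoutSeconds))
              .setPointInTime(new PointInTimeBuilder(pitId))
              .addSort("_doc", SortOrder.ASC); // stable sort so each hit carries sort values
      if (searchAfter != null) {
        builder.searchAfter(searchAfter); // resume after the last hit of the previous page
      }
      SearchResponse response = builder.get();
      SearchHit[] hits = response.getHits().getHits();
      if (hits == null || hits.length == 0) {
        return total; // an empty page marks the end of the scan
      }
      total += hits.length;
      searchAfter = hits[hits.length - 1].getSortValues();
    }
  }
}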
@@ -30,7 +30,6 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexSettings;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
@@ -179,7 +178,7 @@ public void cleanup(OpenSearchRequest request) {
"Failed to clean up resources for search request " + request, e);
}
});
} else if (request instanceof OpenSearchQueryRequest) {
} else {
request.clean(
pitId -> {
DeletePitRequest deletePitRequest = new DeletePitRequest(pitId);
@@ -31,7 +31,6 @@
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
@@ -190,7 +189,7 @@ public void cleanup(OpenSearchRequest request) {
"Failed to clean up resources for search request " + request, e);
}
});
} else if (request instanceof OpenSearchQueryRequest) {
} else {
request.clean(
pitId -> {
DeletePitRequest deletePitRequest = new DeletePitRequest(pitId);
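For context, both client implementations above now route every non-scroll request through a PIT delete callback. Here is a minimal sketch of that delete call against the Client API, using the same DeletePitAction/DeletePitRequest types exercised by the new tests below; the RuntimeException wrapping is an assumption that mirrors those tests rather than the plugin's exact error handling.

import java.util.concurrent.ExecutionException;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.client.Client;
import org.opensearch.common.action.ActionFuture;

public class PitCleanupSketch {

  /** Deletes one PIT id, blocking until the server has released it. */
  public static void deletePit(Client client, String pitId) {
    DeletePitRequest deletePitRequest = new DeletePitRequest(pitId);
    ActionFuture<DeletePitResponse> future =
        client.execute(DeletePitAction.INSTANCE, deletePitRequest);
    try {
      future.get(); // wait for the delete to complete server-side
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while deleting point in time " + pitId, e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Failed to delete point in time " + pitId, e);
    }
  }
}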
@@ -25,6 +25,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.PointInTimeBuilder;
@@ -117,6 +118,41 @@ public OpenSearchQueryRequest(
this.pitId = pitId;
}

/**
* Constructs OpenSearchQueryRequest from serialized representation.
*
* @param in stream to read data from.
* @param engine OpenSearchSqlEngine to get node-specific context.
* @throws IOException thrown if reading from input {@code in} fails.
*/
public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) throws IOException {
// Deserialize the SearchSourceBuilder from the string representation
String sourceBuilderString = in.readString();

NamedXContentRegistry xContentRegistry =
new NamedXContentRegistry(
new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());
XContentParser parser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, IGNORE_DEPRECATIONS, sourceBuilderString);
this.sourceBuilder = SearchSourceBuilder.fromXContent(parser);

cursorKeepAlive = in.readTimeValue();
pitId = in.readString();
includes = in.readStringList();
indexName = new IndexName(in);

int length = in.readVInt();
this.searchAfter = new Object[length];
for (int i = 0; i < length; i++) {
this.searchAfter[i] = in.readGenericValue();
}

OpenSearchIndex index = (OpenSearchIndex) engine.getTable(null, indexName.toString());
exprValueFactory = new OpenSearchExprValueFactory(index.getFieldOpenSearchTypes());
}

@Override
public OpenSearchResponse search(
Function<SearchRequest, SearchResponse> searchAction,
@@ -161,14 +197,9 @@ public OpenSearchResponse searchWithPIT(Function<SearchRequest, SearchResponse>

needClean = openSearchResponse.isEmpty();
searchDone = openSearchResponse.isEmpty();
if (!needClean
&& this.searchResponse.getHits().getHits() != null
&& this.searchResponse.getHits().getHits().length > 0) {
searchAfter =
this.searchResponse
.getHits()
.getHits()[this.searchResponse.getHits().getHits().length - 1]
.getSortValues();
SearchHit[] searchHits = this.searchResponse.getHits().getHits();
if (searchHits != null && searchHits.length > 0) {
searchAfter = searchHits[searchHits.length - 1].getSortValues();
this.sourceBuilder.searchAfter(searchAfter);
}
}
@@ -179,10 +210,9 @@ public OpenSearchResponse searchWithPIT(Function<SearchRequest, SearchResponse>
public void clean(Consumer<String> cleanAction) {
try {
// clean on the last page only, to prevent deleting the PitId in the middle of paging.
if (needClean && this.pitId != null) {
if (this.pitId != null && needClean) {
cleanAction.accept(this.pitId);
searchDone = true;
this.pitId = null;
}
} finally {
this.pitId = null;
@@ -209,49 +239,16 @@ public void writeTo(StreamOutput out) throws IOException {
indexName.writeTo(out);

// Serialize the searchAfter array
out.writeVInt(searchAfter.length);
for (Object obj : searchAfter) {
out.writeGenericValue(obj);
if (searchAfter != null) {
out.writeVInt(searchAfter.length);
for (Object obj : searchAfter) {
out.writeGenericValue(obj);
}
}
} else {
// OpenSearch Query request without PIT for single page requests
throw new UnsupportedOperationException(
"OpenSearchQueryRequest serialization is not implemented.");
}
}

/**
* Constructs OpenSearchQueryRequest from serialized representation.
*
* @param in stream to read data from.
* @param engine OpenSearchSqlEngine to get node-specific context.
* @throws IOException thrown if reading from input {@code in} fails.
*/
public OpenSearchQueryRequest(StreamInput in, OpenSearchStorageEngine engine) throws IOException {
// Deserialize the SearchSourceBuilder from the string representation
String sourceBuilderString = in.readString();

NamedXContentRegistry xContentRegistry =
new NamedXContentRegistry(
new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());
XContentParser parser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, IGNORE_DEPRECATIONS, sourceBuilderString);
this.sourceBuilder = SearchSourceBuilder.fromXContent(parser);

cursorKeepAlive = in.readTimeValue();
pitId = in.readString();
includes = in.readStringList();
indexName = new IndexName(in);

int length = in.readVInt();
this.searchAfter = new Object[length];
for (int i = 0; i < length; i++) {
this.searchAfter[i] = in.readGenericValue();
}

OpenSearchIndex index = (OpenSearchIndex) engine.getTable(null, indexName.toString());
exprValueFactory = new OpenSearchExprValueFactory(index.getFieldOpenSearchTypes());
}
}
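For context, the deserializing constructor above rebuilds the SearchSourceBuilder from the JSON string written by writeTo. Below is a minimal round-trip sketch of just that XContent step, assuming the same SearchModule-backed registry the constructor uses; the class and method names are illustrative and not part of the plugin.

import java.io.IOException;
import java.util.Collections;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;

public class SourceBuilderRoundTripSketch {

  /** Renders a SearchSourceBuilder as JSON and parses it back into an equivalent builder. */
  public static SearchSourceBuilder roundTrip(SearchSourceBuilder original) throws IOException {
    String json = original.toString(); // SearchSourceBuilder#toString() renders the JSON body

    NamedXContentRegistry registry =
        new NamedXContentRegistry(
            new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());
    try (XContentParser parser =
        XContentType.JSON
            .xContent()
            .createParser(registry, DeprecationHandler.IGNORE_DEPRECATIONS, json)) {
      return SearchSourceBuilder.fromXContent(parser);
    }
  }
}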
@@ -13,8 +13,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -31,6 +30,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.SneakyThrows;
import org.apache.commons.lang3.reflect.FieldUtils;
@@ -51,15 +51,16 @@
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.search.ClearScrollRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.*;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
@@ -74,6 +75,7 @@
import org.opensearch.sql.opensearch.data.type.OpenSearchTextType;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
@@ -393,6 +395,65 @@ void cleanup_rethrows_exception() {
assertThrows(IllegalStateException.class, () -> client.cleanup(request));
}

@Test
@SneakyThrows
void cleanup_pit_request() {
OpenSearchQueryRequest request =
new OpenSearchQueryRequest(
new OpenSearchRequest.IndexName("test"),
new SearchSourceBuilder(),
factory,
List.of(),
TimeValue.timeValueMinutes(1L),
"samplePitId");
// Enforce cleaning by setting a private field.
FieldUtils.writeField(request, "needClean", true, true);
client.cleanup(request);
verify(nodeClient).execute(any(), any());
}

@Test
@SneakyThrows
void cleanup_pit_request_throw_exception() {
DeletePitRequest deletePitRequest = new DeletePitRequest("samplePitId");
ActionFuture<DeletePitResponse> actionFuture = mock(ActionFuture.class);
when(actionFuture.get()).thenThrow(new ExecutionException("Execution failed", new Throwable()));
when(nodeClient.execute(eq(DeletePitAction.INSTANCE), any(DeletePitRequest.class)))
.thenReturn(actionFuture);
assertThrows(RuntimeException.class, () -> client.deletePit(deletePitRequest));
}

@Test
@SneakyThrows
void create_pit() {
CreatePitRequest createPitRequest =
new CreatePitRequest(TimeValue.timeValueMinutes(5), false, Strings.EMPTY_ARRAY);
ActionFuture<CreatePitResponse> actionFuture = mock(ActionFuture.class);
CreatePitResponse createPitResponse = mock(CreatePitResponse.class);
when(createPitResponse.getId()).thenReturn("samplePitId");
when(actionFuture.get()).thenReturn(createPitResponse);
when(nodeClient.execute(eq(CreatePitAction.INSTANCE), any(CreatePitRequest.class)))
.thenReturn(actionFuture);

String pitId = client.createPit(createPitRequest);
assertEquals("samplePitId", pitId);

verify(nodeClient).execute(CreatePitAction.INSTANCE, createPitRequest);
verify(actionFuture).get();
}

@Test
@SneakyThrows
void create_pit_request_throw_exception() {
CreatePitRequest createPitRequest =
new CreatePitRequest(TimeValue.timeValueMinutes(5), false, Strings.EMPTY_ARRAY);
ActionFuture<CreatePitResponse> actionFuture = mock(ActionFuture.class);
when(actionFuture.get()).thenThrow(new ExecutionException("Execution failed", new Throwable()));
when(nodeClient.execute(eq(CreatePitAction.INSTANCE), any(CreatePitRequest.class)))
.thenReturn(actionFuture);
assertThrows(RuntimeException.class, () -> client.createPit(createPitRequest));
}

@Test
void get_indices() {
AliasMetadata aliasMetadata = mock(AliasMetadata.class);