Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed May 13, 2022
1 parent bd0105c commit 6ceaf61
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
params.withIndicesOptions(searchRequest.indicesOptions());
}
params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT));
if (searchRequest.pointInTimeBuilder() == null) {
if (searchRequest.pointInTimeBuilder() != null) {
params.putParam("ccs_minimize_roundtrips", "false");
} else {
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void executeCreatePit(StepListener<SearchResponse> createPitListener, Act
* Phase 2 of create PIT where we update pit id in pit contexts
*/
createPitListener.whenComplete(
searchResponse -> { executeUpdatePitId(request, searchResponse, updatePitIdListener); },
searchResponse -> { executeUpdatePitId(request, searchRequest, searchResponse, updatePitIdListener); },
updatePitIdListener::onFailure
);
}
Expand Down Expand Up @@ -148,6 +148,7 @@ public void executeOnShardTarget(
*/
void executeUpdatePitId(
CreatePitRequest request,
SearchRequest searchRequest,
SearchResponse searchResponse,
ActionListener<CreatePitResponse> updatePitIdListener
) {
Expand All @@ -161,7 +162,13 @@ void executeUpdatePitId(
* store the create time ( same create time for all PIT contexts across shards ) to be used
* for list PIT api
*/
final long creationTime = System.currentTimeMillis();
final long relativeStartNanos = System.nanoTime();
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
searchRequest.getOrCreateAbsoluteStartMillis(),
relativeStartNanos,
System::nanoTime
);
final long creationTime = timeProvider.getAbsoluteStartMillis();
CreatePitResponse createPITResponse = new CreatePitResponse(
searchResponse.pointInTimeId(),
creationTime,
Expand Down Expand Up @@ -212,27 +219,17 @@ void executeUpdatePitId(
}
}
}, updatePitIdListener::onFailure);

}

private StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLookupListener(SearchContextId contextId) {
ClusterState state = clusterService.state();

final Set<String> clusters = contextId.shards()
.values()
.stream()
.filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false)
.map(SearchContextIdForNode::getClusterAlias)
.collect(Collectors.toSet());

final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = new StepListener<>();

if (clusters.isEmpty()) {
lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId));
} else {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
}
return lookupListener;
return SearchUtils.getConnectionLookupListener(searchTransportService, state, clusters);
}

private ActionListener<UpdatePitContextResponse> getGroupedListener(
Expand Down
42 changes: 42 additions & 0 deletions server/src/main/java/org/opensearch/action/search/SearchUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.StepListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;

import java.util.Set;
import java.util.function.BiFunction;

/**
* Helper class for common search functions
*/
public class SearchUtils {

public SearchUtils() {}

/**
* Get connection lookup listener for list of clusters passed
*/
public static StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLookupListener(
SearchTransportService searchTransportService,
ClusterState state,
Set<String> clusters
) {
final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = new StepListener<>();

if (clusters.isEmpty()) {
lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId));
} else {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
}
return lookupListener;
}
}
18 changes: 6 additions & 12 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -872,10 +872,7 @@ public void createPitReaderContext(ShardId shardId, TimeValue keepAlive, ActionL
Releasable decreasePitContexts = null;
Engine.SearcherSupplier searcherSupplier = null;
ReaderContext readerContext = null;
boolean success = false;
try {
// use this when reader context is freed
decreasePitContexts = openPitContexts::decrementAndGet;
if (openPitContexts.incrementAndGet() > maxOpenPitContext) {
throw new OpenSearchRejectedExecutionException(
"Trying to create too many Point In Time contexts. Must be less than or equal to: ["
Expand All @@ -894,6 +891,9 @@ public void createPitReaderContext(ShardId shardId, TimeValue keepAlive, ActionL

searchOperationListener.onNewReaderContext(readerContext);
searchOperationListener.onNewPitContext(finalReaderContext);

// use this when reader context is freed
decreasePitContexts = openPitContexts::decrementAndGet;
readerContext.addOnClose(decreasePitContexts);
decreasePitContexts = null;

Expand All @@ -905,15 +905,9 @@ public void createPitReaderContext(ShardId shardId, TimeValue keepAlive, ActionL
putReaderContext(readerContext);
readerContext = null;
listener.onResponse(finalReaderContext.id());
success = true;
} catch (Exception exc) {
Releasables.closeWhileHandlingException(searcherSupplier, readerContext, decreasePitContexts);
listener.onFailure(exc);
} finally {
if (success) {
Releasables.close(readerContext, searcherSupplier, decreasePitContexts);
} else {
Releasables.closeWhileHandlingException(searcherSupplier, readerContext, decreasePitContexts);
}
}
});
}
Expand Down Expand Up @@ -1043,14 +1037,14 @@ public void updatePitIdAndKeepAlive(UpdatePitContextRequest request, ActionListe
Releasable updatePit = null;
try {
updatePit = readerContext.updatePitIdAndKeepAlive(request.getKeepAlive(), request.getPitId(), request.getCreationTime());
updatePit.close();
listener.onResponse(new UpdatePitContextResponse(request.getPitId(), request.getCreationTime(), request.getKeepAlive()));
} catch (Exception e) {
freeReaderContext(readerContext.id());
listener.onFailure(e);
} finally {
if (updatePit != null) {
updatePit.close();
}
listener.onFailure(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void onFailure(Exception e) {
clusterServiceMock = mock(ClusterService.class);
ClusterState state = mock(ClusterState.class);

final Settings keepAliveSettings = Settings.builder().put(CreatePitController.PIT_CREATE_PHASE_KEEP_ALIVE.getKey(), 30000).build();
final Settings keepAliveSettings = Settings.builder().put(CreatePitController.PIT_INIT_KEEP_ALIVE.getKey(), 30000).build();
when(clusterServiceMock.getSettings()).thenReturn(keepAliveSettings);

when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA);
Expand Down Expand Up @@ -177,7 +177,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod

CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true);
request.setIndices(new String[] { "index" });

CreatePitController controller = new CreatePitController(
request,
searchTransportService,
Expand Down

0 comments on commit 6ceaf61

Please sign in to comment.