Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jul 15, 2024
1 parent cfad7aa commit 29a4cf2
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -150,14 +149,14 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
}

/**
* Create async action for writing one {@code IndexRoutingTable} to remote store
* @param clusterState current cluster state
* @param indexRouting indexRoutingTable to write to remote store
* Async action for writing one {@code IndexRoutingTable} to remote store
*
* @param clusterState current cluster state
* @param indexRouting indexRoutingTable to write to remote store
* @param latchedActionListener listener for handling async action response
* @param clusterBasePath base path for remote file
* @return returns runnable async action
* @param clusterBasePath base path for remote file
*/
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
public void writeAsync(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
Expand Down Expand Up @@ -187,7 +186,7 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
)
);

return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener);
uploadIndex(indexRouting, fileName, blobContainer, completionListener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.index.Index;
Expand Down Expand Up @@ -42,14 +41,13 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
}

@Override
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
public void writeAsync(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
) {
// noop
return () -> {};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -62,7 +61,7 @@ DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>>
RoutingTable after
);

CheckedRunnable<IOException> getIndexRoutingAsyncAction(
void writeAsync(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ UploadedMetadataResults writeMetadataInParallel(
});
indicesRoutingToUpload.forEach(indexRoutingTable -> {
uploadTasks.add(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName());
remoteRoutingTableService.getIndexRoutingAsyncAction(
remoteRoutingTableService.writeAsync(
clusterState,
indexRoutingTable,
listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
Expand All @@ -31,6 +30,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.TestCapturingListener;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.Index;
Expand All @@ -57,6 +57,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -347,23 +348,23 @@ public void testGetIndicesRoutingMapDiffIndexDeleted() {
assertEquals(indexName, diff.getDeletes().get(0));
}

public void testGetIndexRoutingAsyncAction() throws IOException {
public void testWriteAsync() throws IOException, InterruptedException {
String indexName = randomAlphaOfLength(randomIntBetween(1, 50));
ClusterState clusterState = createClusterState(indexName);
BlobPath expectedPath = getPath();

LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> listener = mock(LatchedActionListener.class);
TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
CountDownLatch latch = new CountDownLatch(1);
when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer);

remoteRoutingTableService.start();
CheckedRunnable<IOException> runnable = remoteRoutingTableService.getIndexRoutingAsyncAction(
remoteRoutingTableService.writeAsync(
clusterState,
clusterState.routingTable().getIndicesRouting().get(indexName),
listener,
new LatchedActionListener<>(listener, latch),
basePath
);
assertNotNull(runnable);
runnable.run();
latch.await();

String expectedFilePrefix = String.join(
DELIMITER,
Expand All @@ -372,43 +373,53 @@ public void testGetIndexRoutingAsyncAction() throws IOException {
RemoteStoreUtils.invertLong(clusterState.version())
);
verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true));
verify(listener, times(1)).onResponse(any(ClusterMetadataManifest.UploadedMetadata.class));
assertNull(listener.getFailure());
assertNotNull(listener.getResult());
assertTrue(listener.getResult() instanceof ClusterMetadataManifest.UploadedIndexMetadata);
ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = (ClusterMetadataManifest.UploadedIndexMetadata) listener
.getResult();
assertEquals(indexName, uploadedIndexMetadata.getIndexName());
}

public void testGetIndexRoutingAsyncActionFailureInBlobRepo() throws IOException {
public void testWriteAsync_FailureInBlobRepo() throws IOException, InterruptedException {
String indexName = randomAlphaOfLength(randomIntBetween(1, 50));
ClusterState clusterState = createClusterState(indexName);
BlobPath expectedPath = getPath();

LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> listener = mock(LatchedActionListener.class);
TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
CountDownLatch latch = new CountDownLatch(1);
when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer);
doThrow(new IOException("testing failure")).when(blobContainer).writeBlob(anyString(), any(StreamInput.class), anyLong(), eq(true));
IOException exception = new IOException("testing failure");
doThrow(exception).when(blobContainer).writeBlob(anyString(), any(StreamInput.class), anyLong(), eq(true));

remoteRoutingTableService.start();
CheckedRunnable<IOException> runnable = remoteRoutingTableService.getIndexRoutingAsyncAction(
remoteRoutingTableService.writeAsync(
clusterState,
clusterState.routingTable().getIndicesRouting().get(indexName),
listener,
new LatchedActionListener<>(listener, latch),
basePath
);
assertNotNull(runnable);
runnable.run();
latch.await();
String expectedFilePrefix = String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(clusterState.term()),
RemoteStoreUtils.invertLong(clusterState.version())
);
verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true));
verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class));
assertNull(listener.getResult());
assertNotNull(listener.getFailure());
assertTrue(listener.getFailure() instanceof RemoteStateTransferException);
assertEquals(exception, listener.getFailure().getCause());
}

public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException {
public void testWriteAsync_AsyncRepo() throws IOException, InterruptedException {
String indexName = randomAlphaOfLength(randomIntBetween(1, 50));
ClusterState clusterState = createClusterState(indexName);
BlobPath expectedPath = getPath();

LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> listener = mock(LatchedActionListener.class);
TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
CountDownLatch latch = new CountDownLatch(1);
blobContainer = mock(AsyncMultiStreamBlobContainer.class);
when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer);
ArgumentCaptor<ActionListener<Void>> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class);
Expand All @@ -424,50 +435,59 @@ public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException {
.asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture());

remoteRoutingTableService.start();
CheckedRunnable<IOException> runnable = remoteRoutingTableService.getIndexRoutingAsyncAction(
remoteRoutingTableService.writeAsync(
clusterState,
clusterState.routingTable().getIndicesRouting().get(indexName),
listener,
new LatchedActionListener<>(listener, latch),
basePath
);
assertNotNull(runnable);
runnable.run();

String expectedFilePrefix = String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(clusterState.term()),
RemoteStoreUtils.invertLong(clusterState.version())
);
latch.await();
assertNull(listener.getFailure());
assertNotNull(listener.getResult());
assertTrue(listener.getResult() instanceof ClusterMetadataManifest.UploadedIndexMetadata);
ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = (ClusterMetadataManifest.UploadedIndexMetadata) listener
.getResult();
assertEquals(indexName, uploadedIndexMetadata.getIndexName());
assertEquals(1, actionListenerArgumentCaptor.getAllValues().size());
assertEquals(1, writeContextArgumentCaptor.getAllValues().size());
assertNotNull(capturedWriteContext.get("index_routing"));
assertEquals(capturedWriteContext.get("index_routing").getWritePriority(), WritePriority.URGENT);
assertTrue(capturedWriteContext.get("index_routing").getFileName().startsWith(expectedFilePrefix));
}

public void testGetIndexRoutingAsyncActionAsyncRepoFailureInRepo() throws IOException {
public void testWriteAsync_AsyncRepoFailureInRepo() throws IOException, InterruptedException {
String indexName = randomAlphaOfLength(randomIntBetween(1, 50));
ClusterState clusterState = createClusterState(indexName);
BlobPath expectedPath = getPath();

LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> listener = mock(LatchedActionListener.class);
TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
CountDownLatch latch = new CountDownLatch(1);
blobContainer = mock(AsyncMultiStreamBlobContainer.class);
when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer);

doThrow(new IOException("Testing failure")).when((AsyncMultiStreamBlobContainer) blobContainer)
IOException exception = new IOException("Testing failure");
doThrow(exception).when((AsyncMultiStreamBlobContainer) blobContainer)
.asyncBlobUpload(any(WriteContext.class), any(ActionListener.class));

remoteRoutingTableService.start();
CheckedRunnable<IOException> runnable = remoteRoutingTableService.getIndexRoutingAsyncAction(
remoteRoutingTableService.writeAsync(
clusterState,
clusterState.routingTable().getIndicesRouting().get(indexName),
listener,
new LatchedActionListener<>(listener, latch),
basePath
);
assertNotNull(runnable);
runnable.run();
verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class));
latch.await();
assertNull(listener.getResult());
assertNotNull(listener.getFailure());
assertTrue(listener.getFailure() instanceof RemoteStateTransferException);
assertEquals(exception, listener.getFailure().getCause());
}

public void testGetAllUploadedIndicesRouting() {
Expand Down

0 comments on commit 29a4cf2

Please sign in to comment.