diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 079ee1c6d7aac..829012a65b991 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -158,13 +158,11 @@ public void startReplication(ActionListener listener) { logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description())); // Get list of files to copy from this checkpoint. state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); - cancellableThreads.checkForCancel(); cancellableThreads.execute(() -> source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener)); checkpointInfoListener.whenComplete(checkpointInfo -> { final List filesToFetch = getFiles(checkpointInfo); state.setStage(SegmentReplicationState.Stage.GET_FILES); - cancellableThreads.checkForCancel(); cancellableThreads.execute( () -> source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener) ); @@ -177,7 +175,6 @@ public void startReplication(ActionListener listener) { } private List getFiles(CheckpointInfoResponse checkpointInfo) throws IOException { - cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FILE_DIFF); final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap()); logger.trace(() -> new ParameterizedMessage("Replication diff for checkpoint {} {}", checkpointInfo.getCheckpoint(), diff)); @@ -203,7 +200,6 @@ private List getFiles(CheckpointInfoResponse checkpointInfo) } private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException { - cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); // Handle empty SegmentInfos bytes for recovering replicas if (checkpointInfoResponse.getInfosBytes() == null) { diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 4e78748cc9845..8622e5944b165 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -10,9 +10,6 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; -import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Version; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; @@ -23,14 +20,11 @@ import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; -import org.opensearch.index.replication.TestReplicationSource; -import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.CheckpointInfoResponse; import org.opensearch.indices.replication.GetSegmentFilesResponse; -import org.opensearch.indices.replication.SegmentReplicationSource; +import org.opensearch.indices.replication.RemoteStoreReplicationSource; import org.opensearch.indices.replication.SegmentReplicationSourceFactory; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -106,66 +100,16 @@ public void testCloseShardWhileGettingCheckpoint() throws Exception { IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); primary.refresh("Test"); - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - - // Create custom replication source in order to trigger shard close operations at specific point of segment replication - // lifecycle - SegmentReplicationSource source = new TestReplicationSource() { - RemoteSegmentStoreDirectory remoteDirectory; - - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - // shard is closing while fetching metadata - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - FilterDirectory remoteStoreDirectory = (FilterDirectory) replica.remoteStore().directory(); - FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); - this.remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate(); - - RemoteSegmentMetadata mdFile = null; - try { - mdFile = remoteDirectory.init(); - final Version version = replica.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); - Map metadataMap = mdFile.getMetadata() - .entrySet() - .stream() - .collect( - Collectors.toMap( - e -> e.getKey(), - e -> new StoreFileMetadata( - e.getValue().getOriginalFilename(), - e.getValue().getLength(), - Store.digestToString(Long.valueOf(e.getValue().getChecksum())), - version, - null - ) - ) - ); - listener.onResponse( - new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes()) - ); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) { - Assert.fail("Unreachable"); - } - }; - when(sourceFactory.get(any())).thenReturn(source); + Runnable beforeCkpSourceCall = () -> targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + Runnable beforeGetFilesSourceCall = () -> Assert.fail("Should not have been executed"); + TestRSReplicationSource testRSReplicationSource = new TestRSReplicationSource( + replica, + beforeCkpSourceCall, + beforeGetFilesSourceCall + ); + when(sourceFactory.get(any())).thenReturn(testRSReplicationSource); startReplicationAndAssertCancellation(replica, primary, targetService); shards.removeReplica(replica); closeShards(replica); @@ -183,67 +127,14 @@ public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception { final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new TestReplicationSource() { - RemoteSegmentStoreDirectory remoteDirectory; - - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - try { - FilterDirectory remoteStoreDirectory = (FilterDirectory) replica.remoteStore().directory(); - FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); - this.remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate(); - - RemoteSegmentMetadata mdFile = remoteDirectory.init(); - final Version version = replica.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); - Map metadataMap = mdFile.getMetadata() - .entrySet() - .stream() - .collect( - Collectors.toMap( - e -> e.getKey(), - e -> new StoreFileMetadata( - e.getValue().getOriginalFilename(), - e.getValue().getLength(), - Store.digestToString(Long.valueOf(e.getValue().getChecksum())), - version, - null - ) - ) - ); - listener.onResponse( - new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes()) - ); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) { - try { - // shard is closing while we are copying files. - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - final Directory storeDirectory = indexShard.store().directory(); - for (StoreFileMetadata fileMetadata : filesToFetch) { - String file = fileMetadata.name(); - storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }; - when(sourceFactory.get(any())).thenReturn(source); + Runnable beforeCkpSourceCall = () -> {}; + Runnable beforeGetFilesSourceCall = () -> targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + TestRSReplicationSource testRSReplicationSource = new TestRSReplicationSource( + replica, + beforeCkpSourceCall, + beforeGetFilesSourceCall + ); + when(sourceFactory.get(any())).thenReturn(testRSReplicationSource); startReplicationAndAssertCancellation(replica, primary, targetService); shards.removeReplica(replica); closeShards(replica); @@ -522,3 +413,42 @@ private void assertSingleSegmentFile(IndexShard shard, String fileName) throws I assertEquals(segmentsFileNames.stream().findFirst().get(), fileName); } } + +class TestRSReplicationSource extends RemoteStoreReplicationSource { + + private final Thread beforeCheckpoint; + private final Thread beforeGetFiles; + + public TestRSReplicationSource(IndexShard indexShard, Runnable beforeCheckpoint, Runnable beforeGetFiles) { + super(indexShard); + this.beforeCheckpoint = new Thread(beforeCheckpoint); + this.beforeGetFiles = new Thread(beforeGetFiles); + } + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + this.beforeCheckpoint.start(); + super.getCheckpointMetadata(replicationId, checkpoint, listener); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + this.beforeGetFiles.start(); + super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, listener); + } + + @Override + public String getDescription() { + return "TestReplicationSource"; + } +}