Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Aug 22, 2023
1 parent 79e7210 commit 4fc4af6
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,11 @@ public void startReplication(ActionListener<Void> listener) {
logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description()));
// Get list of files to copy from this checkpoint.
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
cancellableThreads.checkForCancel();
cancellableThreads.execute(() -> source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener));

checkpointInfoListener.whenComplete(checkpointInfo -> {
final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
state.setStage(SegmentReplicationState.Stage.GET_FILES);
cancellableThreads.checkForCancel();
cancellableThreads.execute(
() -> source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener)
);
Expand All @@ -177,7 +175,6 @@ public void startReplication(ActionListener<Void> listener) {
}

private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
logger.trace(() -> new ParameterizedMessage("Replication diff for checkpoint {} {}", checkpointInfo.getCheckpoint(), diff));
Expand All @@ -203,7 +200,6 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
// Handle empty SegmentInfos bytes for recovering replicas
if (checkpointInfoResponse.getInfosBytes() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@

import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
Expand All @@ -23,14 +20,11 @@
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.replication.CheckpointInfoResponse;
import org.opensearch.indices.replication.GetSegmentFilesResponse;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.RemoteStoreReplicationSource;
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
Expand Down Expand Up @@ -106,66 +100,16 @@ public void testCloseShardWhileGettingCheckpoint() throws Exception {
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);
primary.refresh("Test");

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);

// Create custom replication source in order to trigger shard close operations at specific point of segment replication
// lifecycle
SegmentReplicationSource source = new TestReplicationSource() {
RemoteSegmentStoreDirectory remoteDirectory;

@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
// shard is closing while fetching metadata
targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY);
FilterDirectory remoteStoreDirectory = (FilterDirectory) replica.remoteStore().directory();
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
this.remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate();

RemoteSegmentMetadata mdFile = null;
try {
mdFile = remoteDirectory.init();
final Version version = replica.getSegmentInfosSnapshot().get().getCommitLuceneVersion();
Map<String, StoreFileMetadata> metadataMap = mdFile.getMetadata()
.entrySet()
.stream()
.collect(
Collectors.toMap(
e -> e.getKey(),
e -> new StoreFileMetadata(
e.getValue().getOriginalFilename(),
e.getValue().getLength(),
Store.digestToString(Long.valueOf(e.getValue().getChecksum())),
version,
null
)
)
);
listener.onResponse(
new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes())
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
Assert.fail("Unreachable");
}
};
when(sourceFactory.get(any())).thenReturn(source);
Runnable beforeCkpSourceCall = () -> targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY);
Runnable beforeGetFilesSourceCall = () -> Assert.fail("Should not have been executed");
TestRSReplicationSource testRSReplicationSource = new TestRSReplicationSource(
replica,
beforeCkpSourceCall,
beforeGetFilesSourceCall
);
when(sourceFactory.get(any())).thenReturn(testRSReplicationSource);
startReplicationAndAssertCancellation(replica, primary, targetService);
shards.removeReplica(replica);
closeShards(replica);
Expand All @@ -183,67 +127,14 @@ public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception {

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
SegmentReplicationSource source = new TestReplicationSource() {
RemoteSegmentStoreDirectory remoteDirectory;

@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
try {
FilterDirectory remoteStoreDirectory = (FilterDirectory) replica.remoteStore().directory();
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
this.remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate();

RemoteSegmentMetadata mdFile = remoteDirectory.init();
final Version version = replica.getSegmentInfosSnapshot().get().getCommitLuceneVersion();
Map<String, StoreFileMetadata> metadataMap = mdFile.getMetadata()
.entrySet()
.stream()
.collect(
Collectors.toMap(
e -> e.getKey(),
e -> new StoreFileMetadata(
e.getValue().getOriginalFilename(),
e.getValue().getLength(),
Store.digestToString(Long.valueOf(e.getValue().getChecksum())),
version,
null
)
)
);
listener.onResponse(
new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes())
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
try {
// shard is closing while we are copying files.
targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY);
final Directory storeDirectory = indexShard.store().directory();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
when(sourceFactory.get(any())).thenReturn(source);
Runnable beforeCkpSourceCall = () -> {};
Runnable beforeGetFilesSourceCall = () -> targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY);
TestRSReplicationSource testRSReplicationSource = new TestRSReplicationSource(
replica,
beforeCkpSourceCall,
beforeGetFilesSourceCall
);
when(sourceFactory.get(any())).thenReturn(testRSReplicationSource);
startReplicationAndAssertCancellation(replica, primary, targetService);
shards.removeReplica(replica);
closeShards(replica);
Expand Down Expand Up @@ -522,3 +413,42 @@ private void assertSingleSegmentFile(IndexShard shard, String fileName) throws I
assertEquals(segmentsFileNames.stream().findFirst().get(), fileName);
}
}

class TestRSReplicationSource extends RemoteStoreReplicationSource {

private final Thread beforeCheckpoint;
private final Thread beforeGetFiles;

public TestRSReplicationSource(IndexShard indexShard, Runnable beforeCheckpoint, Runnable beforeGetFiles) {
super(indexShard);
this.beforeCheckpoint = new Thread(beforeCheckpoint);
this.beforeGetFiles = new Thread(beforeGetFiles);
}

@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
this.beforeCheckpoint.start();
super.getCheckpointMetadata(replicationId, checkpoint, listener);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
this.beforeGetFiles.start();
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, listener);
}

@Override
public String getDescription() {
return "TestReplicationSource";
}
}

0 comments on commit 4fc4af6

Please sign in to comment.