Skip to content

Commit

Permalink
Fix snapshotting searchable snapshot indexes (opensearch-project#7247)
Browse files Browse the repository at this point in the history
Two changes here: First, when snapshotting a searchable snapshot index
(or snapshotting a full cluster that contains searchable snapshot
indices) then we will snapshot the index metadata which includes the
pointer to the original snapshot, but skip copying the data since it
already exists in the source snapshot. Second, when restoring an index
from a snapshot that is itself a searchable snapshot index, then it must
be handled as such and restored as a searchable snapshot index.

Resolves opensearch-project#7204

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross authored Apr 19, 2023
1 parent 9f6c067 commit e1da84d
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
import org.opensearch.action.index.IndexRequestBuilder;
Expand Down Expand Up @@ -106,6 +107,74 @@ public void testCreateSearchableSnapshot() throws Exception {
assertIndexDirectoryDoesNotExist(restoredIndexName1, restoredIndexName2);
}

public void testSnapshottingSearchableSnapshots() throws Exception {
final String repoName = "test-repo";
final String indexName = "test-idx";
final Client client = client();

// create an index, add data, snapshot it, then delete it
internalCluster().ensureAtLeastNumDataNodes(1);
createIndexWithDocsAndEnsureGreen(0, 100, indexName);
createRepositoryWithSettings(null, repoName);
takeSnapshot(client, "initial-snapshot", repoName, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

// restore the index as a searchable snapshot
internalCluster().ensureAtLeastNumSearchNodes(1);
client.admin()
.cluster()
.prepareRestoreSnapshot(repoName, "initial-snapshot")
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy-0")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.setWaitForCompletion(true)
.execute()
.actionGet();
ensureGreen();
assertDocCount(indexName + "-copy-0", 100L);
assertIndexDirectoryDoesNotExist(indexName + "-copy-0");

// Test that the searchable snapshot index can continue to be snapshotted and restored
for (int i = 0; i < 4; i++) {
final String repeatedSnapshotName = "test-repeated-snap-" + i;
takeSnapshot(client, repeatedSnapshotName, repoName);
deleteIndicesAndEnsureGreen(client, "_all");
client.admin()
.cluster()
.prepareRestoreSnapshot(repoName, repeatedSnapshotName)
.setRenamePattern("([a-z-]+).*")
.setRenameReplacement("$1" + (i + 1))
.setWaitForCompletion(true)
.execute()
.actionGet();
ensureGreen();
final String restoredIndexName = indexName + "-copy-" + (i + 1);
assertDocCount(restoredIndexName, 100L);
assertIndexDirectoryDoesNotExist(restoredIndexName);
}
// Assert all the snapshots exist. Note that AbstractSnapshotIntegTestCase::assertRepoConsistency
// will run after this test (and all others) and assert on the consistency of the data in the repo.
final GetSnapshotsResponse response = client.admin().cluster().prepareGetSnapshots(repoName).execute().actionGet();
final Map<String, List<String>> snapshotInfoMap = response.getSnapshots()
.stream()
.collect(Collectors.toMap(s -> s.snapshotId().getName(), SnapshotInfo::indices));
assertEquals(
Map.of(
"initial-snapshot",
List.of("test-idx"),
"test-repeated-snap-0",
List.of("test-idx-copy-0"),
"test-repeated-snap-1",
List.of("test-idx-copy-1"),
"test-repeated-snap-2",
List.of("test-idx-copy-2"),
"test-repeated-snap-3",
List.of("test-idx-copy-3")
),
snapshotInfoMap
);
}

/**
* Tests a chunked repository scenario for searchable snapshots by creating an index,
* taking a snapshot, restoring it as a searchable snapshot index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3559,6 +3559,13 @@ public boolean isRemoteTranslogEnabled() {
return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled();
}

/**
* @return True if settings indicate this shard is backed by a remote snapshot, false otherwise.
*/
public boolean isRemoteSnapshot() {
return indexSettings != null && indexSettings.isRemoteSnapshot();
}

/**
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,16 +444,16 @@ public ClusterState execute(ClusterState currentState) {
request.indexSettings(),
request.ignoreIndexSettings()
);
final boolean isSearchableSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(
request.storageType().toString()
);
if (isSearchableSnapshot) {
if (IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString())) {
snapshotIndexMetadata = addSnapshotToIndexSettings(
snapshotIndexMetadata,
snapshot,
repositoryData.resolveIndexId(index)
);
}
final boolean isSearchableSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(
snapshotIndexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())
);
final SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(
restoreUUID,
snapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.IndexService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexEventListener;
Expand Down Expand Up @@ -275,42 +276,59 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map<ShardId, IndexS
final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.version(), new ActionListener<String>() {
@Override
public void onResponse(String newGeneration) {
assert newGeneration != null;
assert newGeneration.equals(snapshotStatus.generation());
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug(
"snapshot [{}] completed to [{}] with [{}] at generation [{}]",
snapshot,
snapshot.getRepository(),
lastSnapshotStatus,
snapshotStatus.generation()
);
if (isRemoteSnapshot(shardId)) {
// If the source of the data is another remote snapshot (i.e. searchable snapshot)
// then no need to snapshot the shard and can immediately notify success.
notifySuccessfulSnapshotShard(snapshot, shardId, snapshotStatus.generation());
} else {
snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.version(), new ActionListener<>() {
@Override
public void onResponse(String newGeneration) {
assert newGeneration != null;
assert newGeneration.equals(snapshotStatus.generation());
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug(
"snapshot [{}] completed to [{}] with [{}] at generation [{}]",
snapshot,
snapshot.getRepository(),
lastSnapshotStatus,
snapshotStatus.generation()
);
}
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
}
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
}

@Override
public void onFailure(Exception e) {
final String failure;
if (e instanceof AbortedSnapshotException) {
failure = "aborted";
logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e);
} else {
failure = summarizeFailure(e);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
@Override
public void onFailure(Exception e) {
final String failure;
if (e instanceof AbortedSnapshotException) {
failure = "aborted";
logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e);
} else {
failure = summarizeFailure(e);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
}
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
notifyFailedSnapshotShard(snapshot, shardId, failure);
}
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
notifyFailedSnapshotShard(snapshot, shardId, failure);
}
});
});
}
}
});
}

private boolean isRemoteSnapshot(ShardId shardId) {
final IndexService indexService = indicesService.indexService(shardId.getIndex());
if (indexService != null) {
final IndexShard shard = indexService.getShardOrNull(shardId.id());
if (shard != null) {
return shard.isRemoteSnapshot();
}
}
return false;
}

// package private for testing
static String summarizeFailure(Throwable t) {
if (t.getCause() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.IndexModule;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryData;
import org.opensearch.repositories.ShardGenerations;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotMissingException;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -86,6 +88,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.anEmptyMap;
import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress;
import static org.opensearch.test.OpenSearchTestCase.randomIntBetween;
import static org.hamcrest.Matchers.containsInAnyOrder;
Expand Down Expand Up @@ -140,7 +143,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex
}
assertIndexUUIDs(repository, repositoryData);
assertSnapshotUUIDs(repository, repositoryData);
assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
assertShardIndexGenerations(repository, blobContainer, repositoryData);
return null;
} catch (AssertionError e) {
return e;
Expand All @@ -164,23 +167,31 @@ private static void assertIndexGenerations(BlobContainer repoRoot, long latestGe
assertTrue(indexGenerations.length <= 2);
}

private static void assertShardIndexGenerations(BlobContainer repoRoot, ShardGenerations shardGenerations) throws IOException {
private static void assertShardIndexGenerations(BlobStoreRepository repository, BlobContainer repoRoot, RepositoryData repositoryData)
throws IOException {
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
final BlobContainer indicesContainer = repoRoot.children().get("indices");
for (IndexId index : shardGenerations.indices()) {
final List<String> gens = shardGenerations.getGens(index);
if (gens.isEmpty() == false) {
final BlobContainer indexContainer = indicesContainer.children().get(index.getId());
final Map<String, BlobContainer> shardContainers = indexContainer.children();
for (int i = 0; i < gens.size(); i++) {
final String generation = gens.get(i);
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) {
final String shardId = Integer.toString(i);
assertThat(shardContainers, hasKey(shardId));
assertThat(
shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX),
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
);
if (isRemoteSnapshot(repository, repositoryData, index)) {
// If the source of the data is another snapshot (i.e. searchable snapshot)
// then assert that there is no shard data (because it exists in the source snapshot)
assertThat(shardContainers, anEmptyMap());
} else {
for (int i = 0; i < gens.size(); i++) {
final String generation = gens.get(i);
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) {
final String shardId = Integer.toString(i);
assertThat(shardContainers, hasKey(shardId));
assertThat(
shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX),
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
);
}
}
}
}
Expand Down Expand Up @@ -439,4 +450,24 @@ private static ClusterService mockClusterService(ClusterState initialState) {
when(clusterApplierService.threadPool()).thenReturn(threadPool);
return clusterService;
}

private static boolean isRemoteSnapshot(BlobStoreRepository repository, RepositoryData repositoryData, IndexId indexId)
throws IOException {
final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
for (SnapshotId snapshotId : snapshotIds) {
try {
if (isRemoteSnapshot(repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId))) {
return true;
}
} catch (SnapshotMissingException e) {
// Index does not exist in this snapshot so continue looping
}
}
return false;
}

private static boolean isRemoteSnapshot(IndexMetadata metadata) {
final String storeType = metadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey());
return storeType != null && storeType.equals(IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey());
}
}

0 comments on commit e1da84d

Please sign in to comment.