From e1da84d9d2e648066f3948906f935ce06495aac4 Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Wed, 19 Apr 2023 13:34:23 -0500 Subject: [PATCH] Fix snapshotting searchable snapshot indexes (#7247) Two changes here: First, when snapshotting a searchable snapshot index (or snapshotting a full cluster that contains searchable snapshot indices) then we will snapshot the index metadata which includes the pointer to the original snapshot, but skip copying the data since it already exists in the source snapshot. Second, when restoring an index from a snapshot that is itself a searchable snapshot index, then it must be handled as such and restored as a searchable snapshot index. Resolves #7204 Signed-off-by: Andrew Ross --- .../snapshots/SearchableSnapshotIT.java | 69 +++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 7 ++ .../opensearch/snapshots/RestoreService.java | 8 +- .../snapshots/SnapshotShardsService.java | 76 ++++++++++++------- .../blobstore/BlobStoreTestUtil.java | 55 +++++++++++--- 5 files changed, 170 insertions(+), 45 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index 4493137ca81e7..e759485c95b72 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -12,6 +12,7 @@ import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; import org.opensearch.action.index.IndexRequestBuilder; @@ -106,6 +107,74 @@ public void testCreateSearchableSnapshot() throws Exception { assertIndexDirectoryDoesNotExist(restoredIndexName1, restoredIndexName2); } + public void testSnapshottingSearchableSnapshots() throws Exception { + final String repoName = "test-repo"; + final String indexName = "test-idx"; + final Client client = client(); + + // create an index, add data, snapshot it, then delete it + internalCluster().ensureAtLeastNumDataNodes(1); + createIndexWithDocsAndEnsureGreen(0, 100, indexName); + createRepositoryWithSettings(null, repoName); + takeSnapshot(client, "initial-snapshot", repoName, indexName); + deleteIndicesAndEnsureGreen(client, indexName); + + // restore the index as a searchable snapshot + internalCluster().ensureAtLeastNumSearchNodes(1); + client.admin() + .cluster() + .prepareRestoreSnapshot(repoName, "initial-snapshot") + .setRenamePattern("(.+)") + .setRenameReplacement("$1-copy-0") + .setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT) + .setWaitForCompletion(true) + .execute() + .actionGet(); + ensureGreen(); + assertDocCount(indexName + "-copy-0", 100L); + assertIndexDirectoryDoesNotExist(indexName + "-copy-0"); + + // Test that the searchable snapshot index can continue to be snapshotted and restored + for (int i = 0; i < 4; i++) { + final String repeatedSnapshotName = "test-repeated-snap-" + i; + takeSnapshot(client, repeatedSnapshotName, repoName); + deleteIndicesAndEnsureGreen(client, "_all"); + client.admin() + .cluster() + .prepareRestoreSnapshot(repoName, repeatedSnapshotName) + .setRenamePattern("([a-z-]+).*") + .setRenameReplacement("$1" + (i + 1)) + .setWaitForCompletion(true) + .execute() + .actionGet(); + ensureGreen(); + final String restoredIndexName = indexName + "-copy-" + (i + 1); + assertDocCount(restoredIndexName, 100L); + assertIndexDirectoryDoesNotExist(restoredIndexName); + } + // Assert all the snapshots exist. Note that AbstractSnapshotIntegTestCase::assertRepoConsistency + // will run after this test (and all others) and assert on the consistency of the data in the repo. + final GetSnapshotsResponse response = client.admin().cluster().prepareGetSnapshots(repoName).execute().actionGet(); + final Map> snapshotInfoMap = response.getSnapshots() + .stream() + .collect(Collectors.toMap(s -> s.snapshotId().getName(), SnapshotInfo::indices)); + assertEquals( + Map.of( + "initial-snapshot", + List.of("test-idx"), + "test-repeated-snap-0", + List.of("test-idx-copy-0"), + "test-repeated-snap-1", + List.of("test-idx-copy-1"), + "test-repeated-snap-2", + List.of("test-idx-copy-2"), + "test-repeated-snap-3", + List.of("test-idx-copy-3") + ), + snapshotInfoMap + ); + } + /** * Tests a chunked repository scenario for searchable snapshots by creating an index, * taking a snapshot, restoring it as a searchable snapshot index. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index be21a8ce54c14..e8b2c61d0f177 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3559,6 +3559,13 @@ public boolean isRemoteTranslogEnabled() { return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled(); } + /** + * @return True if settings indicate this shard is backed by a remote snapshot, false otherwise. + */ + public boolean isRemoteSnapshot() { + return indexSettings != null && indexSettings.isRemoteSnapshot(); + } + /** * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index 1ef1bddda7252..07fbbadaff29e 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -444,16 +444,16 @@ public ClusterState execute(ClusterState currentState) { request.indexSettings(), request.ignoreIndexSettings() ); - final boolean isSearchableSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match( - request.storageType().toString() - ); - if (isSearchableSnapshot) { + if (IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString())) { snapshotIndexMetadata = addSnapshotToIndexSettings( snapshotIndexMetadata, snapshot, repositoryData.resolveIndexId(index) ); } + final boolean isSearchableSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match( + snapshotIndexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()) + ); final SnapshotRecoverySource recoverySource = new SnapshotRecoverySource( restoreUUID, snapshot, diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index 1aeda91629d4f..4c37af46bd101 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -54,6 +54,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.index.IndexService; import org.opensearch.index.engine.Engine; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexEventListener; @@ -275,42 +276,59 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map() { - @Override - public void onResponse(String newGeneration) { - assert newGeneration != null; - assert newGeneration.equals(snapshotStatus.generation()); - if (logger.isDebugEnabled()) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); - logger.debug( - "snapshot [{}] completed to [{}] with [{}] at generation [{}]", - snapshot, - snapshot.getRepository(), - lastSnapshotStatus, - snapshotStatus.generation() - ); + if (isRemoteSnapshot(shardId)) { + // If the source of the data is another remote snapshot (i.e. searchable snapshot) + // then no need to snapshot the shard and can immediately notify success. + notifySuccessfulSnapshotShard(snapshot, shardId, snapshotStatus.generation()); + } else { + snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.version(), new ActionListener<>() { + @Override + public void onResponse(String newGeneration) { + assert newGeneration != null; + assert newGeneration.equals(snapshotStatus.generation()); + if (logger.isDebugEnabled()) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + logger.debug( + "snapshot [{}] completed to [{}] with [{}] at generation [{}]", + snapshot, + snapshot.getRepository(), + lastSnapshotStatus, + snapshotStatus.generation() + ); + } + notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); } - notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); - } - @Override - public void onFailure(Exception e) { - final String failure; - if (e instanceof AbortedSnapshotException) { - failure = "aborted"; - logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e); - } else { - failure = summarizeFailure(e); - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + @Override + public void onFailure(Exception e) { + final String failure; + if (e instanceof AbortedSnapshotException) { + failure = "aborted"; + logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e); + } else { + failure = summarizeFailure(e); + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + } + snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); + notifyFailedSnapshotShard(snapshot, shardId, failure); } - snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); - notifyFailedSnapshotShard(snapshot, shardId, failure); - } - }); + }); + } } }); } + private boolean isRemoteSnapshot(ShardId shardId) { + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + if (indexService != null) { + final IndexShard shard = indexService.getShardOrNull(shardId.id()); + if (shard != null) { + return shard.isRemoteSnapshot(); + } + } + return false; + } + // package private for testing static String summarizeFailure(Throwable t) { if (t.getCause() == null) { diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java index 0aa6bb402149d..b89656b1d2c77 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java @@ -57,12 +57,14 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.IndexModule; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryData; import org.opensearch.repositories.ShardGenerations; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; +import org.opensearch.snapshots.SnapshotMissingException; import org.opensearch.test.InternalTestCluster; import org.opensearch.threadpool.ThreadPool; @@ -86,6 +88,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.anEmptyMap; import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress; import static org.opensearch.test.OpenSearchTestCase.randomIntBetween; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -140,7 +143,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex } assertIndexUUIDs(repository, repositoryData); assertSnapshotUUIDs(repository, repositoryData); - assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations()); + assertShardIndexGenerations(repository, blobContainer, repositoryData); return null; } catch (AssertionError e) { return e; @@ -164,23 +167,31 @@ private static void assertIndexGenerations(BlobContainer repoRoot, long latestGe assertTrue(indexGenerations.length <= 2); } - private static void assertShardIndexGenerations(BlobContainer repoRoot, ShardGenerations shardGenerations) throws IOException { + private static void assertShardIndexGenerations(BlobStoreRepository repository, BlobContainer repoRoot, RepositoryData repositoryData) + throws IOException { + final ShardGenerations shardGenerations = repositoryData.shardGenerations(); final BlobContainer indicesContainer = repoRoot.children().get("indices"); for (IndexId index : shardGenerations.indices()) { final List gens = shardGenerations.getGens(index); if (gens.isEmpty() == false) { final BlobContainer indexContainer = indicesContainer.children().get(index.getId()); final Map shardContainers = indexContainer.children(); - for (int i = 0; i < gens.size(); i++) { - final String generation = gens.get(i); - assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN)); - if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) { - final String shardId = Integer.toString(i); - assertThat(shardContainers, hasKey(shardId)); - assertThat( - shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX), - hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation) - ); + if (isRemoteSnapshot(repository, repositoryData, index)) { + // If the source of the data is another snapshot (i.e. searchable snapshot) + // then assert that there is no shard data (because it exists in the source snapshot) + assertThat(shardContainers, anEmptyMap()); + } else { + for (int i = 0; i < gens.size(); i++) { + final String generation = gens.get(i); + assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN)); + if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) { + final String shardId = Integer.toString(i); + assertThat(shardContainers, hasKey(shardId)); + assertThat( + shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX), + hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation) + ); + } } } } @@ -439,4 +450,24 @@ private static ClusterService mockClusterService(ClusterState initialState) { when(clusterApplierService.threadPool()).thenReturn(threadPool); return clusterService; } + + private static boolean isRemoteSnapshot(BlobStoreRepository repository, RepositoryData repositoryData, IndexId indexId) + throws IOException { + final Collection snapshotIds = repositoryData.getSnapshotIds(); + for (SnapshotId snapshotId : snapshotIds) { + try { + if (isRemoteSnapshot(repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId))) { + return true; + } + } catch (SnapshotMissingException e) { + // Index does not exist in this snapshot so continue looping + } + } + return false; + } + + private static boolean isRemoteSnapshot(IndexMetadata metadata) { + final String storeType = metadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()); + return storeType != null && storeType.equals(IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()); + } }