From f1a145fc30b93b4776eb44e960526ed3c5f9a769 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 23 Aug 2024 23:29:24 +0530 Subject: [PATCH] Add snapshot shard blobs with hashed prefix Signed-off-by: Ashish Singh --- CHANGELOG.md | 1 + .../recovery/RemoteStoreRestoreService.java | 3 +- .../index/remote/RemoteIndexPathUploader.java | 3 +- .../index/remote/RemoteStoreEnums.java | 4 +- .../index/remote/RemoteStorePathStrategy.java | 72 ++- .../org/opensearch/repositories/IndexId.java | 38 +- .../repositories/RepositoryData.java | 30 +- .../blobstore/BaseBlobStoreFormat.java | 58 +- .../blobstore/BlobStoreRepository.java | 535 +++++++++++++++--- .../blobstore/ChecksumBlobStoreFormat.java | 40 +- .../blobstore/ConfigBlobStoreFormat.java | 26 +- .../snapshots/SnapshotShardPaths.java | 95 ++++ .../snapshots/SnapshotsService.java | 7 +- .../snapshots/SnapshotShardPathsTests.java | 103 ++++ 14 files changed, 865 insertions(+), 150 deletions(-) create mode 100644 server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java create mode 100644 server/src/test/java/org/opensearch/snapshots/SnapshotShardPathsTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index a9469846b1648..18b30361dc57d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325)) - Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895)) - Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336)) +- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java index d3c6fc9d1f3bf..03d841d13b7f7 100644 --- a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java +++ b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java @@ -227,7 +227,8 @@ private RemoteRestoreResult executeRestore( .build(); } - IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID()); + // This instance of IndexId is not related to Snapshot Restore. Hence, we are using the ctor without pathType. 
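+        // Note: IndexId.DEFAULT_SHARD_PATH_TYPE resolves to PathType.FIXED.getCode(), so restore-time
+        // IndexId instances keep the fixed (non-hashed) shard path layout.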
+ IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID(), IndexId.DEFAULT_SHARD_PATH_TYPE); if (metadataFromRemoteStore == false) { Map indexShardRoutingTableMap = currentState.routingTable() diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java index d5617bdfd94a7..5457fe80d6e2d 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java @@ -64,7 +64,8 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener { public static final String DELIMITER = "#"; public static final ConfigBlobStoreFormat REMOTE_INDEX_PATH_FORMAT = new ConfigBlobStoreFormat<>( - RemoteIndexPath.FILE_NAME_FORMAT + RemoteIndexPath.FILE_NAME_FORMAT, + RemoteIndexPath::fromXContent ); private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s"; diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java index b0376c97e6994..0af0d07e11597 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java @@ -207,7 +207,7 @@ public enum PathHashAlgorithm { @Override String hash(PathInput pathInput) { StringBuilder input = new StringBuilder(); - for (String path : pathInput.fixedSubPath().toArray()) { + for (String path : pathInput.hashPath().toArray()) { input.append(path); } long hash = FNV1a.hash64(input.toString()); @@ -222,7 +222,7 @@ String hash(PathInput pathInput) { @Override String hash(PathInput pathInput) { StringBuilder input = new StringBuilder(); - for (String path : pathInput.fixedSubPath().toArray()) { + for (String path : pathInput.hashPath().toArray()) { input.append(path); } long hash = FNV1a.hash64(input.toString()); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java index d0250790068f7..bf5e1e71e8ff2 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java @@ -17,6 +17,7 @@ import org.opensearch.index.remote.RemoteStoreEnums.DataType; import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.util.Objects; @@ -100,6 +101,10 @@ BlobPath fixedSubPath() { return BlobPath.cleanPath().add(indexUUID); } + BlobPath hashPath() { + return BlobPath.cleanPath().add(indexUUID); + } + /** * Returns a new builder for {@link PathInput}. */ @@ -127,7 +132,7 @@ public T basePath(BlobPath basePath) { return self(); } - public Builder indexUUID(String indexUUID) { + public T indexUUID(String indexUUID) { this.indexUUID = indexUUID; return self(); } @@ -142,6 +147,61 @@ public PathInput build() { } } + /** + * A subclass of {@link PathInput} that represents the input required to generate a path + * for a shard in a snapshot. It includes the base path, index UUID, and shard ID. 
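+     * <p>
+     * A usage sketch with hypothetical values; the final layout depends on the {@code PathType}
+     * the input is resolved with:
+     * <pre>{@code
+     * SnapshotShardPathInput input = SnapshotShardPathInput.builder()
+     *     .basePath(repositoryBasePath)
+     *     .indexUUID("9LpZ8eXuTJ2zifOmkZFM1Q")
+     *     .shardId("0")
+     *     .build();
+     * // FIXED         -> [base_path]/indices/[index_uuid]/[shard_id]/
+     * // HASHED_PREFIX -> hash([shard_id], [index_uuid])/[base_path]/indices/[index_uuid]/[shard_id]/
+     * }</pre>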
+ * + * @opensearch.internal + */ + public static class SnapshotShardPathInput extends PathInput { + private final String shardId; + + public SnapshotShardPathInput(SnapshotShardPathInput.Builder builder) { + super(builder); + this.shardId = Objects.requireNonNull(builder.shardId); + } + + @Override + BlobPath fixedSubPath() { + return BlobPath.cleanPath().add(BlobStoreRepository.INDICES_DIR).add(super.fixedSubPath()).add(shardId); + } + + @Override + BlobPath hashPath() { + return BlobPath.cleanPath().add(shardId).add(indexUUID()); + } + + /** + * Returns a new builder for {@link SnapshotShardPathInput}. + */ + public static SnapshotShardPathInput.Builder builder() { + return new SnapshotShardPathInput.Builder(); + } + + /** + * Builder for {@link SnapshotShardPathInput}. + * + * @opensearch.internal + */ + public static class Builder extends PathInput.Builder { + private String shardId; + + public SnapshotShardPathInput.Builder shardId(String shardId) { + this.shardId = shardId; + return this; + } + + @Override + protected SnapshotShardPathInput.Builder self() { + return this; + } + + public SnapshotShardPathInput build() { + return new SnapshotShardPathInput(this); + } + } + } + /** * Wrapper class for the data aware path input required to generate path for remote store uploads. This input is * composed of the parent inputs, shard id, data category and data type. @@ -204,16 +264,6 @@ public static class Builder extends PathInput.Builder { private DataCategory dataCategory; private DataType dataType; - public Builder basePath(BlobPath basePath) { - super.basePath = basePath; - return this; - } - - public Builder indexUUID(String indexUUID) { - super.indexUUID = indexUUID; - return this; - } - public Builder shardId(String shardId) { this.shardId = shardId; return this; diff --git a/server/src/main/java/org/opensearch/repositories/IndexId.java b/server/src/main/java/org/opensearch/repositories/IndexId.java index 87a0063e8c21b..238dffbb46bde 100644 --- a/server/src/main/java/org/opensearch/repositories/IndexId.java +++ b/server/src/main/java/org/opensearch/repositories/IndexId.java @@ -32,6 +32,7 @@ package org.opensearch.repositories; +import org.opensearch.Version; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -40,6 +41,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.remote.RemoteStoreEnums; import java.io.IOException; import java.util.Objects; @@ -51,23 +53,36 @@ */ @PublicApi(since = "1.0.0") public final class IndexId implements Writeable, ToXContentObject { - protected static final String NAME = "name"; - protected static final String ID = "id"; + static final String NAME = "name"; + static final String ID = "id"; + static final String SHARD_PATH_TYPE = "shard_path_type"; + public static final int DEFAULT_SHARD_PATH_TYPE = RemoteStoreEnums.PathType.FIXED.getCode(); private final String name; private final String id; + private final int shardPathType; private final int hashCode; + // Used for testing only public IndexId(final String name, final String id) { + this(name, id, DEFAULT_SHARD_PATH_TYPE); + } + + public IndexId(String name, String id, int shardPathType) { this.name = name; this.id = id; + this.shardPathType = shardPathType; this.hashCode = computeHashCode(); - } public IndexId(final StreamInput in) throws 
IOException { this.name = in.readString(); this.id = in.readString(); + if (in.getVersion().onOrAfter(Version.CURRENT)) { + this.shardPathType = in.readVInt(); + } else { + this.shardPathType = DEFAULT_SHARD_PATH_TYPE; + } this.hashCode = computeHashCode(); } @@ -93,9 +108,16 @@ public String getId() { return id; } + /** + * The storage path type in remote store for the indexes having the underlying index ids. + */ + public int getShardPathType() { + return shardPathType; + } + @Override public String toString() { - return "[" + name + "/" + id + "]"; + return "[" + name + "/" + id + "/" + shardPathType + "]"; } @Override @@ -107,7 +129,7 @@ public boolean equals(Object o) { return false; } IndexId that = (IndexId) o; - return Objects.equals(name, that.name) && Objects.equals(id, that.id); + return Objects.equals(name, that.name) && Objects.equals(id, that.id) && Objects.equals(this.shardPathType, that.shardPathType); } @Override @@ -116,13 +138,16 @@ public int hashCode() { } private int computeHashCode() { - return Objects.hash(name, id); + return Objects.hash(name, id, shardPathType); } @Override public void writeTo(final StreamOutput out) throws IOException { out.writeString(name); out.writeString(id); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeVInt(shardPathType); + } } @Override @@ -130,6 +155,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par builder.startObject(); builder.field(NAME, name); builder.field(ID, id); + builder.field(SHARD_PATH_TYPE, shardPathType); builder.endObject(); return builder; } diff --git a/server/src/main/java/org/opensearch/repositories/RepositoryData.java b/server/src/main/java/org/opensearch/repositories/RepositoryData.java index ea48d9b1a49fe..09cb654604c06 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoryData.java @@ -517,7 +517,7 @@ public List resolveIndices(final List indices) { * @param indicesToResolve names of indices to resolve * @param inFlightIds name to index mapping for currently in-flight snapshots not yet in the repository data to fall back to */ - public List resolveNewIndices(List indicesToResolve, Map inFlightIds) { + public List resolveNewIndices(List indicesToResolve, Map inFlightIds, int pathType) { List snapshotIndices = new ArrayList<>(); for (String index : indicesToResolve) { IndexId indexId = indices.get(index); @@ -525,7 +525,7 @@ public List resolveNewIndices(List indicesToResolve, Map resolveNewIndices(List indicesToResolve, Map snapshotIds = indexSnapshots.get(indexId); assert snapshotIds != null; @@ -765,14 +774,20 @@ private static void parseIndices( final List snapshotIds = new ArrayList<>(); final List gens = new ArrayList<>(); + String id = null; + int pathType = IndexId.DEFAULT_SHARD_PATH_TYPE; IndexId indexId = null; + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { final String indexMetaFieldName = parser.currentName(); final XContentParser.Token currentToken = parser.nextToken(); switch (indexMetaFieldName) { case INDEX_ID: - indexId = new IndexId(indexName, parser.text()); + id = parser.text(); + break; + case IndexId.SHARD_PATH_TYPE: + pathType = parser.intValue(); break; case SNAPSHOTS: XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, currentToken, parser); @@ -795,7 +810,7 @@ private static void parseIndices( // 
different versions create or delete snapshot in the same repository. throw new OpenSearchParseException( "Detected a corrupted repository, index " - + indexId + + new IndexId(indexName, id, pathType) + " references an unknown snapshot uuid [" + uuid + "]" @@ -812,9 +827,10 @@ private static void parseIndices( break; } } - assert indexId != null; + assert id != null; + indexId = new IndexId(indexName, id, pathType); indexSnapshots.put(indexId, Collections.unmodifiableList(snapshotIds)); - indexLookup.put(indexId.getId(), indexId); + indexLookup.put(id, indexId); for (int i = 0; i < gens.size(); i++) { String parsedGen = gens.get(i); if (parsedGen != null) { diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BaseBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/BaseBlobStoreFormat.java index 262a32fa1e74d..524effdd41ad8 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BaseBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BaseBlobStoreFormat.java @@ -9,19 +9,34 @@ package org.opensearch.repositories.blobstore; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersIndexInput; +import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.OutputStreamIndexOutput; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.CheckedFunction; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.lucene.store.IndexOutputOutputStream; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.gateway.CorruptStateException; import java.io.IOException; import java.io.OutputStream; +import java.util.Arrays; import java.util.Locale; import java.util.Objects; @@ -30,7 +45,7 @@ * * @opensearch.internal */ -public class BaseBlobStoreFormat { +public abstract class BaseBlobStoreFormat { private static final int BUFFER_SIZE = 4096; @@ -91,7 +106,7 @@ public BytesReference serialize( try (BytesStreamOutput outputStream = new BytesStreamOutput()) { try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( - "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")", + "BaseBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")", blobName, outputStream, BUFFER_SIZE @@ -127,4 +142,43 @@ public void close() { protected String getBlobNameFormat() { return blobNameFormat; } + + public T deserialize( + String blobName, + NamedXContentRegistry namedXContentRegistry, + BytesReference bytes, + XContentType xContentType, + String codec, + Integer version + ) throws IOException { + assert skipHeaderFooter || (Objects.nonNull(codec) && Objects.nonNull(version)); + final 
String resourceDesc = "BaseBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; + try { + final IndexInput indexInput = bytes.length() > 0 + ? new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc) + : new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES); + if (skipHeaderFooter == false) { + CodecUtil.checksumEntireFile(indexInput); + CodecUtil.checkHeader(indexInput, codec, version, version); + } + int footerLength = skipHeaderFooter ? 0 : CodecUtil.footerLength(); + long filePointer = indexInput.getFilePointer(); + long contentSize = indexInput.length() - footerLength - filePointer; + try ( + XContentParser parser = XContentHelper.createParser( + namedXContentRegistry, + LoggingDeprecationHandler.INSTANCE, + bytes.slice((int) filePointer, (int) contentSize), + xContentType + ) + ) { + return reader().apply(parser); + } + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + // we trick this into a dedicated exception with the original stacktrace + throw new CorruptStateException(ex); + } + } + + abstract CheckedFunction reader(); } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 3e6a75565891f..64f8bca92dc9f 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -108,8 +108,11 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; +import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput; +import org.opensearch.index.remote.RemoteStorePathStrategy.SnapshotShardPathInput; import org.opensearch.index.snapshots.IndexShardRestoreFailedException; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; @@ -144,6 +147,7 @@ import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.snapshots.SnapshotMissingException; +import org.opensearch.snapshots.SnapshotShardPaths; import org.opensearch.snapshots.SnapshotsService; import org.opensearch.threadpool.ThreadPool; @@ -155,6 +159,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -177,7 +183,6 @@ import java.util.stream.Stream; import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1; -import static org.opensearch.index.remote.RemoteStoreEnums.PathType.HASHED_PREFIX; import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS; @@ -223,6 +228,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private static final String UPLOADED_DATA_BLOB_PREFIX = "__"; + public static final String INDICES_DIR = "indices"; + /** * Prefix used for the 
identifiers of data blobs that were not actually written to the repository physically because their contents are * already stored in the metadata referencing them, i.e. in {@link BlobStoreIndexShardSnapshot} and @@ -266,6 +273,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final Setting REMOTE_STORE_INDEX_SHALLOW_COPY = Setting.boolSetting("remote_store_index_shallow_copy", false); + public static final Setting SHARD_PATH_TYPE = new Setting<>( + "shard_path_type", + PathType.FIXED.toString(), + PathType::parseString + ); + /** * Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion. * For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation @@ -379,6 +392,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp BlobStoreIndexShardSnapshots::fromXContent ); + public static final ConfigBlobStoreFormat SNAPSHOT_SHARD_PATHS_FORMAT = new ConfigBlobStoreFormat<>( + SnapshotShardPaths.FILE_NAME_FORMAT, + SnapshotShardPaths::fromXContent + ); + private volatile boolean readOnly; private final boolean isSystemRepository; @@ -389,6 +407,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final SetOnce blobContainer = new SetOnce<>(); + private final SetOnce rootBlobContainer = new SetOnce<>(); + + private final SetOnce snapshotShardPathBlobContainer = new SetOnce<>(); + private final SetOnce blobStore = new SetOnce<>(); protected final ClusterService clusterService; @@ -779,6 +801,16 @@ BlobContainer getBlobContainer() { return blobContainer.get(); } + // package private, only use for testing + BlobContainer getRootBlobContainer() { + return rootBlobContainer.get(); + } + + // package private, only use for testing + public SetOnce getSnapshotShardPathBlobContainer() { + return snapshotShardPathBlobContainer; + } + // for test purposes only protected BlobStore getBlobStore() { return blobStore.get(); @@ -804,10 +836,47 @@ protected BlobContainer blobContainer() { } } } - return blobContainer; } + /** + * maintains single lazy instance of {@link BlobContainer} + */ + protected BlobContainer rootBlobContainer() { + assertSnapshotOrGenericThread(); + + BlobContainer rootBlobContainer = this.rootBlobContainer.get(); + if (rootBlobContainer == null) { + synchronized (lock) { + rootBlobContainer = this.rootBlobContainer.get(); + if (rootBlobContainer == null) { + rootBlobContainer = blobStore().blobContainer(BlobPath.cleanPath()); + this.rootBlobContainer.set(rootBlobContainer); + } + } + } + return rootBlobContainer; + } + + /** + * maintains single lazy instance of {@link BlobContainer} + */ + protected BlobContainer snapshotShardPathBlobContainer() { + assertSnapshotOrGenericThread(); + + BlobContainer snapshotShardPathBlobContainer = this.snapshotShardPathBlobContainer.get(); + if (snapshotShardPathBlobContainer == null) { + synchronized (lock) { + snapshotShardPathBlobContainer = this.snapshotShardPathBlobContainer.get(); + if (snapshotShardPathBlobContainer == null) { + snapshotShardPathBlobContainer = blobStore().blobContainer(basePath().add(SnapshotShardPaths.DIR)); + this.snapshotShardPathBlobContainer.set(snapshotShardPathBlobContainer); + } + } + } + return snapshotShardPathBlobContainer; + } + /** * Maintains single lazy instance of {@link BlobStore}. * Public for testing. 
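    /*
     * Why a root container: with hashed-prefix shard paths, shard blobs are addressed from the blob
     * store root, since the computed hash prefix precedes basePath(). Illustrative layout, assuming
     * basePath() == "base", index id "idx1", shard 0:
     *
     *   FIXED         -> base/indices/idx1/0/<blob>
     *   HASHED_PREFIX -> <fnv1a-prefix>/base/indices/idx1/0/<blob>
     *
     * Stale shard blob deletes therefore move from blobContainer() to rootBlobContainer() in the
     * hunks below, carrying full root-relative paths.
     */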
@@ -1099,7 +1168,7 @@ private void asyncCleanupUnlinkedShardLevelBlobs( RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { - final List filesToDelete = resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults); + final List> filesToDelete = resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults); if (filesToDelete.isEmpty()) { listener.onResponse(null); return; @@ -1107,10 +1176,10 @@ private void asyncCleanupUnlinkedShardLevelBlobs( try { AtomicInteger counter = new AtomicInteger(); - Collection> subList = filesToDelete.stream() + Collection>> subList = filesToDelete.stream() .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch)) .values(); - final BlockingQueue> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList); + final BlockingQueue>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList); final GroupedActionListener groupedListener = new GroupedActionListener<>( ActionListener.wrap(r -> { listener.onResponse(null); }, listener::onFailure), @@ -1213,28 +1282,31 @@ protected void releaseRemoteStoreLockAndCleanup( // When remoteStoreLockManagerFactory is non-null, while deleting the files, lock files are also released before deletion of respective // shallow-snap-UUID files. And if it is null, we just delete the stale shard blobs. private void executeStaleShardDelete( - BlockingQueue> staleFilesToDeleteInBatch, + BlockingQueue>> staleFilesToDeleteInBatch, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, GroupedActionListener listener ) throws InterruptedException { - List filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS); + List> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS); if (filesToDelete != null) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { try { // filtering files for which remote store lock release and cleanup succeeded, // remaining files for which it failed will be retried in next snapshot delete run. 
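+                    // For each blob the owning index and shard are recovered from the tail of its path,
+                    // e.g. (illustrative) .../indices/<indexId>/<shardId>/shallow-snap-<uuid>.dat; taking
+                    // the last two path components works for both fixed and hashed-prefix layouts.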
List eligibleFilesToDelete = new ArrayList<>(); - for (String fileToDelete : filesToDelete) { - if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) { - String[] fileToDeletePath = fileToDelete.split("/"); - String indexId = fileToDeletePath[1]; - String shardId = fileToDeletePath[2]; - String shallowSnapBlob = fileToDeletePath[3]; - String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow(); - BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId)); + for (Tuple fileToDelete : filesToDelete) { + BlobPath blobPath = fileToDelete.v1(); + String blobName = fileToDelete.v2(); + String fullPath = blobPath.buildAsString() + blobName; + if (blobName.startsWith(SHALLOW_SNAPSHOT_PREFIX)) { + String snapshotUUID = extractShallowSnapshotUUID(blobName).orElseThrow(); + String[] pathComponents = blobPath.toArray(); + int numComponents = pathComponents.length; + String indexId = pathComponents[numComponents - 2]; + String shardId = pathComponents[numComponents - 1]; + BlobContainer shardContainer = blobStore().blobContainer(blobPath); try { releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory); - eligibleFilesToDelete.add(fileToDelete); + eligibleFilesToDelete.add(fullPath); } catch (Exception e) { logger.error( "Failed to release lock or cleanup shard for indexID {}, shardID {} " + "and snapshot {}", @@ -1244,11 +1316,11 @@ private void executeStaleShardDelete( ); } } else { - eligibleFilesToDelete.add(fileToDelete); + eligibleFilesToDelete.add(fullPath); } } // Deleting the shard blobs - deleteFromContainer(blobContainer(), eligibleFilesToDelete); + deleteFromContainer(rootBlobContainer(), eligibleFilesToDelete); l.onResponse(null); } catch (Exception e) { logger.warn( @@ -1405,26 +1477,30 @@ public void onFailure(Exception ex) { } } - private List resolveFilesToDelete( + /** + * Resolves the list of files that should be deleted during a snapshot deletion operation. + * This method combines files to be deleted from shard-level metadata and index-level metadata. 
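+     * Results are returned as (path, blob name) tuples rather than base-path-relative strings so that
+     * deletions can be issued against the root blob container; shard blobs written under a hashed prefix
+     * are not guaranteed to live below the repository base path.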
+ * + * @param oldRepositoryData The repository data before the snapshot deletion + * @param snapshotIds The IDs of the snapshots being deleted + * @param deleteResults The results of removing snapshots from shard-level metadata + * @return A list of tuples, each containing a blob path and the name of a blob to be deleted + */ + private List> resolveFilesToDelete( RepositoryData oldRepositoryData, Collection snapshotIds, Collection deleteResults ) { - final String basePath = basePath().buildAsString(); - final int basePathLen = basePath.length(); final Map> indexMetaGenerations = oldRepositoryData.indexMetaDataToRemoveAfterRemovingSnapshots( snapshotIds ); return Stream.concat(deleteResults.stream().flatMap(shardResult -> { - final String shardPath = shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString(); - return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); + final BlobPath shardPath = shardPath(shardResult.indexId, shardResult.shardId); + return shardResult.blobsToDelete.stream().map(blob -> Tuple.tuple(shardPath, blob)); }), indexMetaGenerations.entrySet().stream().flatMap(entry -> { - final String indexContainerPath = indexContainer(entry.getKey()).path().buildAsString(); - return entry.getValue().stream().map(id -> indexContainerPath + INDEX_METADATA_FORMAT.blobName(id)); - })).map(absolutePath -> { - assert absolutePath.startsWith(basePath); - return absolutePath.substring(basePathLen); - }).collect(Collectors.toList()); + final BlobPath indexPath = indexPath(entry.getKey()); + return entry.getValue().stream().map(id -> Tuple.tuple(indexPath, INDEX_METADATA_FORMAT.blobName(id))); + })).collect(Collectors.toList()); } /** @@ -1472,10 +1548,20 @@ private void cleanupStaleBlobs( if (foundIndices.keySet().equals(survivingIndexIds)) { groupedListener.onResponse(DeleteResult.ZERO); } else { - cleanupStaleIndices(foundIndices, survivingIndexIds, remoteStoreLockManagerFactory, groupedListener); + Map snapshotShardPaths = getSnapshotShardPaths(); + cleanupStaleIndices(foundIndices, survivingIndexIds, remoteStoreLockManagerFactory, groupedListener, snapshotShardPaths); } } + private Map getSnapshotShardPaths() { + try { + return snapshotShardPathBlobContainer().listBlobs(); + } catch (IOException ex) { + logger.warn(new ParameterizedMessage("Repository [{}] Failed to get the snapshot shard paths", metadata.name()), ex); + } + return Collections.emptyMap(); + } + /** * Runs cleanup actions on the repository. Increments the repository state id by one before executing any modifications on the * repository. 
If remoteStoreLockManagerFactory is not null, remote store lock files are released when deleting the respective @@ -1619,7 +1705,8 @@ private void cleanupStaleIndices( Map foundIndices, Set survivingIndexIds, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, - GroupedActionListener listener + GroupedActionListener listener, + Map snapshotShardPaths ) { final GroupedActionListener groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> { DeleteResult deleteResult = DeleteResult.ZERO; @@ -1643,7 +1730,7 @@ private void cleanupStaleIndices( foundIndices.size() - survivingIndexIds.size() ); for (int i = 0; i < workers; ++i) { - executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, groupedListener); + executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, groupedListener, snapshotShardPaths); } } catch (Exception e) { // TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream. @@ -1663,55 +1750,185 @@ private static boolean isIndexPresent(ClusterService clusterService, String inde return false; } + /** + * Executes the deletion of a single stale index. + * + * @param staleIndicesToDelete Queue of stale indices to delete + * @param remoteStoreLockManagerFactory Factory for creating remote store lock managers + * @param listener Listener for grouped delete actions + * @param snapshotShardPaths Map of snapshot shard paths and their metadata + * @throws InterruptedException if the thread is interrupted while waiting + */ private void executeOneStaleIndexDelete( BlockingQueue> staleIndicesToDelete, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, - GroupedActionListener listener + GroupedActionListener listener, + Map snapshotShardPaths ) throws InterruptedException { Map.Entry indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS); - if (indexEntry != null) { - final String indexSnId = indexEntry.getKey(); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { - DeleteResult deleteResult = DeleteResult.ZERO; - try { - logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); - if (remoteStoreLockManagerFactory != null) { - final Map shardBlobs = indexEntry.getValue().children(); - for (Map.Entry shardBlob : shardBlobs.entrySet()) { - for (String blob : shardBlob.getValue().listBlobs().keySet()) { - final Optional snapshotUUID = extractShallowSnapshotUUID(blob); - if (snapshotUUID.isPresent()) { - releaseRemoteStoreLockAndCleanup( - shardBlob.getKey(), - snapshotUUID.get(), - shardBlob.getValue(), - remoteStoreLockManagerFactory - ); - } - } - } - } - // Deleting the index folder - deleteResult = indexEntry.getValue().delete(); - logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); - } catch (IOException e) { - logger.warn( - () -> new ParameterizedMessage( - "[{}] index {} is no longer part of any snapshots in the repository, " - + "but failed to clean up their index folders", - metadata.name(), - indexSnId - ), - e - ); - } catch (Exception e) { - assert false : e; - logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e); + if (indexEntry == null) { + return; + } + final String indexSnId = indexEntry.getKey(); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { + try { + logger.debug("[{}] Found stale index [{}]. 
Cleaning it up", metadata.name(), indexSnId); + List matchingShardPaths = findMatchingShardPaths(indexSnId, snapshotShardPaths); + Optional highestGenShardPaths = findHighestGenerationShardPaths(matchingShardPaths); + + if (remoteStoreLockManagerFactory != null) { + cleanupRemoteStoreLocks(indexEntry, highestGenShardPaths, remoteStoreLockManagerFactory); } - executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, listener); + DeleteResult deleteResult = deleteShardData(highestGenShardPaths, matchingShardPaths, snapshotShardPaths); + deleteResult = deleteResult.add(indexEntry.getValue().delete()); // Deleting the index folder + logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); return deleteResult; - })); + } catch (IOException e) { + logger.warn( + () -> new ParameterizedMessage( + "[{}] index {} is no longer part of any snapshots in the repository, " + + "but failed to clean up their index folders", + metadata.name(), + indexSnId + ), + e + ); + return DeleteResult.ZERO; + } catch (Exception e) { + assert false : e; + logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e); + return DeleteResult.ZERO; + } finally { + executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, listener, snapshotShardPaths); + } + })); + } + + /** + * Finds and returns a list of shard paths that match the given index ID. + * + * @param indexId The ID of the index to match + * @param snapshotShardPaths Map of snapshot shard paths and their metadata + * @return List of matching shard paths + */ + private List findMatchingShardPaths(String indexId, Map snapshotShardPaths) { + return snapshotShardPaths.keySet().stream().filter(s -> s.startsWith(indexId)).collect(Collectors.toList()); + } + + /** + * Finds the shard path with the highest generation number from the given list of matching shard paths. + * + * @param matchingShardPaths List of shard paths that match a specific criteria + * @return An Optional containing the shard path with the highest generation number, or empty if the list is empty + */ + private Optional findHighestGenerationShardPaths(List matchingShardPaths) { + return matchingShardPaths.stream() + .map(s -> s.split(SnapshotShardPaths.DELIMITER)) + .sorted((a, b) -> Integer.parseInt(b[2]) - Integer.parseInt(a[2])) + .map(parts -> String.join(SnapshotShardPaths.DELIMITER, parts)) + .findFirst(); + } + + /** + * Cleans up remote store locks for a given index entry. 
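+     * If no snapshot-shard-paths blob exists for the index (fixed-path layout), shard containers are
+     * discovered by listing the index folder's children; otherwise the highest-generation shard paths
+     * entry supplies the shard count and path type used to address each shard container.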
+ * + * @param indexEntry The index entry containing the blob container + * @param highestGenShardPaths The optional highest generation shard path + * @param remoteStoreLockManagerFactory Factory for creating remote store lock managers + * @throws IOException If an I/O error occurs during the cleanup process + */ + private void cleanupRemoteStoreLocks( + Map.Entry indexEntry, + Optional highestGenShardPaths, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory + ) throws IOException { + if (highestGenShardPaths.isEmpty()) { + releaseRemoteStoreLocksAndCleanup(indexEntry.getValue().children(), remoteStoreLockManagerFactory); + } else { + ShardInfo shardInfo = parseShardPath(highestGenShardPaths.get()); + Map shardContainers = new HashMap<>(shardInfo.shardCount); + for (int i = 0; i < shardInfo.shardCount; i++) { + shardContainers.put(String.valueOf(i), shardContainer(shardInfo.indexId, i)); + } + releaseRemoteStoreLocksAndCleanup(shardContainers, remoteStoreLockManagerFactory); + } + } + + /** + * Releases remote store locks and performs cleanup for each shard blob. + * + * @param shardBlobs Map of shard IDs to their corresponding BlobContainers + * @param remoteStoreLockManagerFactory Factory for creating remote store lock managers + * @throws IOException If an I/O error occurs during the release and cleanup process + */ + private void releaseRemoteStoreLocksAndCleanup( + Map shardBlobs, + RemoteStoreLockManagerFactory remoteStoreLockManagerFactory + ) throws IOException { + for (Map.Entry shardBlob : shardBlobs.entrySet()) { + for (String blob : shardBlob.getValue().listBlobs().keySet()) { + final Optional snapshotUUID = extractShallowSnapshotUUID(blob); + if (snapshotUUID.isPresent()) { + releaseRemoteStoreLockAndCleanup( + shardBlob.getKey(), + snapshotUUID.get(), + shardBlob.getValue(), + remoteStoreLockManagerFactory + ); + } + } + } + } + + /** + * Deletes shard data for the highest generation shard paths. + * + * @param highestGenShardPaths The optional highest generation shard path + * @param matchingShardPaths List of shard paths that match a specific criteria + * @param snapshotShardPaths Map of snapshot shard paths and their metadata + * @throws IOException If an I/O error occurs during the deletion process + */ + private DeleteResult deleteShardData( + Optional highestGenShardPaths, + List matchingShardPaths, + Map snapshotShardPaths + ) throws IOException { + if (highestGenShardPaths.isEmpty()) { + return DeleteResult.ZERO; + } + + ShardInfo shardInfo = parseShardPath(highestGenShardPaths.get()); + DeleteResult deleteResult = DeleteResult.ZERO; + + for (int i = 0; i < shardInfo.shardCount; i++) { + deleteResult = deleteResult.add(shardContainer(shardInfo.indexId, i).delete()); + } + + deleteFromContainer(snapshotShardPathBlobContainer(), matchingShardPaths); + long totalBytes = matchingShardPaths.stream().mapToLong(s -> snapshotShardPaths.get(s).length()).sum(); + return deleteResult.add(matchingShardPaths.size(), totalBytes); + } + + /** + * Parses a shard path string and extracts relevant shard information. + * + * @param shardPath The shard path string to parse. Expected format is: + * [index_id]#[index_name]#[shard_count]#[path_type_code]#[path_hash_algorithm_code] + * @return A {@link ShardInfo} object containing the parsed index ID and shard count. + * @throws IllegalArgumentException if the shard path format is invalid or cannot be parsed. 
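+     *
+     * <p>Example with hypothetical values: {@code 9LpZ8eXuTJ2zifOmkZFM1Q#my-index#3#1#1} parses to
+     * index id {@code 9LpZ8eXuTJ2zifOmkZFM1Q}, index name {@code my-index}, shard count 3,
+     * path type code 1 and path hash algorithm code 1.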
+ */ + private ShardInfo parseShardPath(String shardPath) { + String[] parts = shardPath.split(SnapshotShardPaths.DELIMITER); + if (parts.length != 5) { + throw new IllegalArgumentException("Invalid shard path format: " + shardPath); + } + try { + IndexId indexId = new IndexId(parts[1], parts[0], Integer.parseInt(parts[3])); + int shardCount = Integer.parseInt(parts[2]); + return new ShardInfo(indexId, shardCount); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid shard path format: " + shardPath, e); } } @@ -1754,6 +1971,8 @@ public void finalizeSnapshot( indexMetas, indexMetaIdentifiers ); + Set updatedIndexIds = writeNewIndexShardPaths(existingRepositoryData, updatedRepositoryData, snapshotId); + cleanupStaleSnapshotShardPaths(updatedIndexIds); writeIndexGen( updatedRepositoryData, repositoryStateId, @@ -1804,21 +2023,130 @@ public void finalizeSnapshot( }, onUpdateFailure); } + private void cleanupStaleSnapshotShardPaths(Set updatedShardPathsIndexIds) { + Set updatedIndexIds = updatedShardPathsIndexIds.stream() + .map(s -> s.split(SnapshotShardPaths.DELIMITER)[0]) + .collect(Collectors.toSet()); + Set indexIdShardPaths = getSnapshotShardPaths().keySet(); + List staleShardPaths = indexIdShardPaths.stream().filter(s -> updatedShardPathsIndexIds.contains(s) == false).filter(s -> { + String indexId = s.split(SnapshotShardPaths.DELIMITER)[0]; + return updatedIndexIds.contains(indexId); + }).collect(Collectors.toList()); + try { + deleteFromContainer(snapshotShardPathBlobContainer(), staleShardPaths); + } catch (IOException e) { + logger.warn( + new ParameterizedMessage( + "Repository [{}] Exception during snapshot stale index deletion {}", + metadata.name(), + staleShardPaths + ), + e + ); + } + } + + private Set writeNewIndexShardPaths( + RepositoryData existingRepositoryData, + RepositoryData updatedRepositoryData, + SnapshotId snapshotId + ) { + Set updatedIndexIds = new HashSet<>(); + Set indicesToUpdate = new HashSet<>(updatedRepositoryData.getIndices().values()); + for (IndexId indexId : indicesToUpdate) { + if (indexId.getShardPathType() == PathType.FIXED.getCode()) { + continue; + } + int oldShardCount = existingRepositoryData.shardGenerations().getGens(indexId).size(); + int newShardCount = updatedRepositoryData.shardGenerations().getGens(indexId).size(); + if (newShardCount > oldShardCount) { + String shardPathsBlobName = writeIndexShardPaths(indexId, snapshotId, newShardCount); + if (Objects.nonNull(shardPathsBlobName)) { + updatedIndexIds.add(shardPathsBlobName); + } + } + } + return updatedIndexIds; + } + + private String writeIndexShardPaths(IndexId indexId, SnapshotId snapshotId, int shardCount) { + try { + List paths = getShardPaths(indexId, shardCount); + String pathType = String.valueOf(indexId.getShardPathType()); + String pathHashAlgorithm = String.valueOf(FNV_1A_COMPOSITE_1.getCode()); + String blobName = String.join( + SnapshotShardPaths.DELIMITER, + indexId.getId(), + indexId.getName(), + String.valueOf(shardCount), + pathType, + pathHashAlgorithm + ); + SnapshotShardPaths shardPaths = new SnapshotShardPaths(paths); + SNAPSHOT_SHARD_PATHS_FORMAT.writeAsyncWithUrgentPriority( + shardPaths, + snapshotShardPathBlobContainer(), + blobName, + getSnapshotShardListener(indexId, snapshotId) + ); + return blobName; + } catch (IOException e) { + logShardPathsOperationWarning(indexId, snapshotId, e); + } + return null; + } + + private List getShardPaths(IndexId indexId, int shardCount) { + List paths = new ArrayList<>(); + for (int shardId = 0; 
shardId < shardCount; shardId++) { + BlobPath shardPath = shardPath(indexId, shardId); + paths.add(shardPath.buildAsString()); + } + return paths; + } + + private ActionListener getSnapshotShardListener(IndexId indexId, SnapshotId snapshotId) { + return ActionListener.wrap( + resp -> logShardPathsOperationSuccess(indexId, snapshotId), + ex -> logShardPathsOperationWarning(indexId, snapshotId, ex) + ); + } + + private void logShardPathsOperationSuccess(IndexId indexId, SnapshotId snapshotId) { + logger.trace( + () -> new ParameterizedMessage( + "Repository [{}] successfully wrote shard paths for index [{}] in snapshot [{}]", + metadata.name(), + indexId.getName(), + snapshotId.getName() + ) + ); + } + + private void logShardPathsOperationWarning(IndexId indexId, SnapshotId snapshotId, @Nullable Exception e) { + logger.warn( + () -> new ParameterizedMessage( + "Repository [{}] Failed to write shard paths for index [{}] in snapshot [{}]", + metadata.name(), + indexId.getName(), + snapshotId.getName() + ), + e + ); + } + // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) { final List toDelete = new ArrayList<>(); - final int prefixPathLen = basePath().buildAsString().length(); updatedRepositoryData.shardGenerations() .obsoleteShardGenerations(existingRepositoryData.shardGenerations()) .forEach( (indexId, gens) -> gens.forEach( - (shardId, oldGen) -> toDelete.add( - shardContainer(indexId, shardId).path().buildAsString().substring(prefixPathLen) + INDEX_FILE_PREFIX + oldGen - ) + (shardId, oldGen) -> toDelete.add(shardPath(indexId, shardId).buildAsString() + INDEX_FILE_PREFIX + oldGen) ) ); try { - deleteFromContainer(blobContainer(), toDelete); + deleteFromContainer(rootBlobContainer(), toDelete); } catch (Exception e) { logger.warn("Failed to clean up old shard generation blobs", e); } @@ -1865,11 +2193,15 @@ private void deleteFromContainer(BlobContainer container, List blobs) th } private BlobPath indicesPath() { - return basePath().add("indices"); + return basePath().add(INDICES_DIR); } private BlobContainer indexContainer(IndexId indexId) { - return blobStore().blobContainer(indicesPath().add(indexId.getId())); + return blobStore().blobContainer(indexPath(indexId)); + } + + private BlobPath indexPath(IndexId indexId) { + return indicesPath().add(indexId.getId()); } private BlobContainer shardContainer(IndexId indexId, ShardId shardId) { @@ -1877,7 +2209,17 @@ private BlobContainer shardContainer(IndexId indexId, ShardId shardId) { } public BlobContainer shardContainer(IndexId indexId, int shardId) { - return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId))); + return blobStore().blobContainer(shardPath(indexId, shardId)); + } + + private BlobPath shardPath(IndexId indexId, int shardId) { + PathType pathType = PathType.fromCode(indexId.getShardPathType()); + SnapshotShardPathInput shardPathInput = new SnapshotShardPathInput.Builder().basePath(basePath()) + .indexUUID(indexId.getId()) + .shardId(String.valueOf(shardId)) + .build(); + PathHashAlgorithm pathHashAlgorithm = pathType != PathType.FIXED ? 
FNV_1A_COMPOSITE_1 : null; + return pathType.path(shardPathInput, pathHashAlgorithm); } /** @@ -1964,7 +2306,7 @@ private BlobContainer testContainer(String seed) { BlobPath testBlobPath; if (prefixModeVerification == true) { PathInput pathInput = PathInput.builder().basePath(basePath()).indexUUID(seed).build(); - testBlobPath = HASHED_PREFIX.path(pathInput, FNV_1A_COMPOSITE_1); + testBlobPath = PathType.HASHED_PREFIX.path(pathInput, FNV_1A_COMPOSITE_1); } else { testBlobPath = basePath(); } @@ -2052,11 +2394,12 @@ private void doGetRepositoryData(ActionListener listener) { loaded = repositoryDataFromCachedEntry(cached); } else { loaded = getRepositoryData(genToLoad); + Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion(); // We can cache serialized in the most recent version here without regard to the actual repository metadata version // since we're only caching the information that we just wrote and thus won't accidentally cache any information that // isn't safe cacheRepositoryData( - BytesReference.bytes(loaded.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT)), + BytesReference.bytes(loaded.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT, minNodeVersion)), genToLoad ); } @@ -2444,8 +2787,9 @@ public void onFailure(Exception e) { } final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); + Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion(); final BytesReference serializedRepoData = BytesReference.bytes( - newRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version) + newRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version, minNodeVersion) ); writeAtomic(blobContainer(), indexBlob, serializedRepoData, true); maybeWriteIndexLatest(newGen); @@ -3655,4 +3999,27 @@ public String toString() { return name; } } + + /** + * Represents parsed information from a shard path. + * This class encapsulates the index ID and shard count extracted from a shard path string. + */ + private static class ShardInfo { + /** The ID of the index associated with this shard. */ + final IndexId indexId; + + /** The total number of shards for this index. */ + final int shardCount; + + /** + * Constructs a new ShardInfo instance. + * + * @param indexId The ID of the index associated with this shard. + * @param shardCount The total number of shards for this index. 
+ */ + ShardInfo(IndexId indexId, int shardCount) { + this.indexId = indexId; + this.shardCount = shardCount; + } + } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 3a49fed4be282..e6f2218086e78 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -31,14 +31,7 @@ package org.opensearch.repositories.blobstore; -import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.store.ByteBuffersIndexInput; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.util.BytesRef; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.CheckedFunction; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; @@ -48,8 +41,6 @@ import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.io.Streams; import org.opensearch.common.lucene.store.ByteArrayIndexInput; -import org.opensearch.common.xcontent.LoggingDeprecationHandler; -import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; @@ -57,12 +48,10 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.gateway.CorruptStateException; import org.opensearch.index.store.exception.ChecksumCombinationException; import org.opensearch.snapshots.SnapshotInfo; import java.io.IOException; -import java.util.Arrays; import java.util.HashMap; import java.util.Locale; import java.util.Map; @@ -124,29 +113,7 @@ public String blobName(String name) { } public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException { - final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; - try { - final IndexInput indexInput = bytes.length() > 0 - ? 
new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc) - : new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES); - CodecUtil.checksumEntireFile(indexInput); - CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION); - long filePointer = indexInput.getFilePointer(); - long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer; - try ( - XContentParser parser = XContentHelper.createParser( - namedXContentRegistry, - LoggingDeprecationHandler.INSTANCE, - bytes.slice((int) filePointer, (int) contentSize), - XContentType.SMILE - ) - ) { - return reader.apply(parser); - } - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { - // we trick this into a dedicated exception with the original stacktrace - throw new CorruptStateException(ex); - } + return deserialize(blobName, namedXContentRegistry, bytes, XContentType.SMILE, codec, VERSION); } /** @@ -260,4 +227,9 @@ public BytesReference serialize(final T obj, final String blobName, final Compre throws IOException { return serialize(obj, blobName, compressor, params, XContentType.SMILE, codec, VERSION); } + + @Override + CheckedFunction reader() { + return reader; + } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java index 8127bf8c2a2a2..48855b18a5981 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ConfigBlobStoreFormat.java @@ -8,17 +8,21 @@ package org.opensearch.repositories.blobstore; +import org.opensearch.common.CheckedFunction; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; +import org.opensearch.common.io.Streams; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentParser; import java.io.IOException; @@ -35,11 +39,14 @@ */ public class ConfigBlobStoreFormat extends BaseBlobStoreFormat { + private final CheckedFunction reader; + /** * @param blobNameFormat format of the blobname in {@link String#format} format */ - public ConfigBlobStoreFormat(String blobNameFormat) { + public ConfigBlobStoreFormat(String blobNameFormat, CheckedFunction reader) { super(blobNameFormat, true); + this.reader = reader; } public void writeAsyncWithUrgentPriority(T obj, BlobContainer blobContainer, String name, ActionListener listener) @@ -76,4 +83,21 @@ public void writeAsyncWithUrgentPriority(T obj, BlobContainer blobContainer, Str ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener); } } + + public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry) throws IOException { + String blobName = blobName(name); + 
return deserialize( + blobName, + namedXContentRegistry, + Streams.readFully(blobContainer.readBlob(blobName)), + XContentType.JSON, + null, + null + ); + } + + @Override + CheckedFunction reader() { + return reader; + } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java new file mode 100644 index 0000000000000..1b0403886e69a --- /dev/null +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.snapshots; + +import com.fasterxml.jackson.core.JsonParseException; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Snapshot Shard path information. + * + * @opensearch.internal + */ +public class SnapshotShardPaths implements ToXContent { + + public static final String DIR = "snapshot_shard_paths"; + + public static final String DELIMITER = "#"; + + public static final String FILE_NAME_FORMAT = "%s"; + + private static final String PATHS_FIELD = "paths"; + + private final List paths; + + public SnapshotShardPaths(List paths) { + this.paths = Collections.unmodifiableList(paths); + } + + public List getPaths() { + return paths; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(PATHS_FIELD); + for (String path : paths) { + builder.value(path); + } + builder.endArray(); + return builder; + } + + public static SnapshotShardPaths fromXContent(XContentParser parser) throws IOException { + List paths = new ArrayList<>(); + + try { + XContentParser.Token token = parser.currentToken(); + if (token == null) { + token = parser.nextToken(); + } + + if (token != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException("Expected a start object"); + } + + token = parser.nextToken(); + if (token == XContentParser.Token.END_OBJECT) { + throw new OpenSearchParseException("Missing [" + PATHS_FIELD + "] field"); + } + + while (token != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + if (PATHS_FIELD.equals(fieldName)) { + if (parser.nextToken() != XContentParser.Token.START_ARRAY) { + throw new OpenSearchParseException("Expected an array for field [" + PATHS_FIELD + "]"); + } + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + paths.add(parser.text()); + } + } else { + throw new OpenSearchParseException("Unexpected field [" + fieldName + "]"); + } + token = parser.nextToken(); + } + } catch (JsonParseException e) { + throw new OpenSearchParseException("Failed to parse SnapshotIndexIdPaths", e); + } + return new SnapshotShardPaths(paths); + } +} diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 5e49208465dbb..4e526bdf290a1 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -136,6 +136,7 @@ import static 
diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
index 5e49208465dbb..4e526bdf290a1 100644
--- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
@@ -136,6 +136,7 @@
 import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
 import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
 import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY;
+import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHARD_PATH_TYPE;
 import static org.opensearch.snapshots.SnapshotUtils.validateSnapshotsBackingAnyIndex;
 
 /**
@@ -320,9 +321,13 @@ public ClusterState execute(ClusterState currentState) {
 
                 logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
 
+                int pathType = clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)
+                    ? SHARD_PATH_TYPE.get(repository.getMetadata().settings()).getCode()
+                    : IndexId.DEFAULT_SHARD_PATH_TYPE;
                 final List<IndexId> indexIds = repositoryData.resolveNewIndices(
                     indices,
-                    getInFlightIndexIds(runningSnapshots, repositoryName)
+                    getInFlightIndexIds(runningSnapshots, repositoryName),
+                    pathType
                 );
                 final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null);
                 final Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(
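The version gate above means the hashed shard-path layout is only selected once every node is on the current version; mixed-version clusters fall back to IndexId.DEFAULT_SHARD_PATH_TYPE, the fixed layout. A sketch of opting a repository into the new layout, assuming the SHARD_PATH_TYPE setting is registered under the key "shard_path_type" and that HASHED_PREFIX is one of the RemoteStoreEnums.PathType values (neither detail is visible in this diff, so verify against the actual setting registration):

    import org.opensearch.client.Client;
    import org.opensearch.common.settings.Settings;

    class HashedPrefixRepoExample {
        static void createRepo(Client client) {
            // "shard_path_type" and "HASHED_PREFIX" are assumptions, see note above.
            client.admin()
                .cluster()
                .preparePutRepository("test-repo")
                .setType("fs")
                .setSettings(
                    Settings.builder().put("location", "/mnt/snapshots").put("shard_path_type", "HASHED_PREFIX")
                )
                .get();
        }
    }
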
"{\"paths\":[]}"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) { + SnapshotShardPaths indexIdPaths = SnapshotShardPaths.fromXContent(parser); + assertEquals(Collections.emptyList(), indexIdPaths.getPaths()); + } + } + + public void testFromXContentMissingPaths() throws IOException { + String json = "{}"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) { + OpenSearchParseException exception = expectThrows( + OpenSearchParseException.class, + () -> SnapshotShardPaths.fromXContent(parser) + ); + assertEquals("Missing [paths] field", exception.getMessage()); + } + } + + public void testFromXContentInvalidJson() throws IOException { + String json = "{\"paths\":\"invalid\"}"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) { + OpenSearchParseException exception = expectThrows( + OpenSearchParseException.class, + () -> SnapshotShardPaths.fromXContent(parser) + ); + assertEquals("Expected an array for field [paths]", exception.getMessage()); + } + } + + public void testFromXContentInvalidJsonObject() throws IOException { + String json = "invalid"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) { + OpenSearchParseException exception = expectThrows( + OpenSearchParseException.class, + () -> SnapshotShardPaths.fromXContent(parser) + ); + assertEquals("Failed to parse SnapshotIndexIdPaths", exception.getMessage()); + } + } + + public void testFromXContentNotAnObject() throws IOException { + String json = "[]"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) { + OpenSearchParseException exception = expectThrows( + OpenSearchParseException.class, + () -> SnapshotShardPaths.fromXContent(parser) + ); + assertEquals("Expected a start object", exception.getMessage()); + } + } +}