Skip to content

Commit

Permalink
Add snapshot shard blobs with hashed prefix
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Aug 27, 2024
1 parent 5e44976 commit f1a145f
Show file tree
Hide file tree
Showing 14 changed files with 865 additions and 150 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ private RemoteRestoreResult executeRestore(
.build();
}

IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID());
// This instance of IndexId is not related to Snapshot Restore. Hence, we are using the ctor without pathType.
IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID(), IndexId.DEFAULT_SHARD_PATH_TYPE);

if (metadataFromRemoteStore == false) {
Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap = currentState.routingTable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener {

public static final String DELIMITER = "#";
public static final ConfigBlobStoreFormat<RemoteIndexPath> REMOTE_INDEX_PATH_FORMAT = new ConfigBlobStoreFormat<>(
RemoteIndexPath.FILE_NAME_FORMAT
RemoteIndexPath.FILE_NAME_FORMAT,
RemoteIndexPath::fromXContent
);

private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public enum PathHashAlgorithm {
@Override
String hash(PathInput pathInput) {
StringBuilder input = new StringBuilder();
for (String path : pathInput.fixedSubPath().toArray()) {
for (String path : pathInput.hashPath().toArray()) {
input.append(path);
}
long hash = FNV1a.hash64(input.toString());
Expand All @@ -222,7 +222,7 @@ String hash(PathInput pathInput) {
@Override
String hash(PathInput pathInput) {
StringBuilder input = new StringBuilder();
for (String path : pathInput.fixedSubPath().toArray()) {
for (String path : pathInput.hashPath().toArray()) {
input.append(path);
}
long hash = FNV1a.hash64(input.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.index.remote.RemoteStoreEnums.DataType;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.util.Objects;

Expand Down Expand Up @@ -100,6 +101,10 @@ BlobPath fixedSubPath() {
return BlobPath.cleanPath().add(indexUUID);
}

BlobPath hashPath() {
return BlobPath.cleanPath().add(indexUUID);
}

/**
* Returns a new builder for {@link PathInput}.
*/
Expand Down Expand Up @@ -127,7 +132,7 @@ public T basePath(BlobPath basePath) {
return self();
}

public Builder indexUUID(String indexUUID) {
public T indexUUID(String indexUUID) {
this.indexUUID = indexUUID;
return self();
}
Expand All @@ -142,6 +147,61 @@ public PathInput build() {
}
}

/**
* A subclass of {@link PathInput} that represents the input required to generate a path
* for a shard in a snapshot. It includes the base path, index UUID, and shard ID.
*
* @opensearch.internal
*/
public static class SnapshotShardPathInput extends PathInput {
private final String shardId;

public SnapshotShardPathInput(SnapshotShardPathInput.Builder builder) {
super(builder);
this.shardId = Objects.requireNonNull(builder.shardId);
}

@Override
BlobPath fixedSubPath() {
return BlobPath.cleanPath().add(BlobStoreRepository.INDICES_DIR).add(super.fixedSubPath()).add(shardId);
}

@Override
BlobPath hashPath() {
return BlobPath.cleanPath().add(shardId).add(indexUUID());
}

/**
* Returns a new builder for {@link SnapshotShardPathInput}.
*/
public static SnapshotShardPathInput.Builder builder() {
return new SnapshotShardPathInput.Builder();
}

/**
* Builder for {@link SnapshotShardPathInput}.
*
* @opensearch.internal
*/
public static class Builder extends PathInput.Builder<SnapshotShardPathInput.Builder> {
private String shardId;

public SnapshotShardPathInput.Builder shardId(String shardId) {
this.shardId = shardId;
return this;
}

@Override
protected SnapshotShardPathInput.Builder self() {
return this;
}

public SnapshotShardPathInput build() {
return new SnapshotShardPathInput(this);
}
}
}

/**
* Wrapper class for the data aware path input required to generate path for remote store uploads. This input is
* composed of the parent inputs, shard id, data category and data type.
Expand Down Expand Up @@ -204,16 +264,6 @@ public static class Builder extends PathInput.Builder<Builder> {
private DataCategory dataCategory;
private DataType dataType;

public Builder basePath(BlobPath basePath) {
super.basePath = basePath;
return this;
}

public Builder indexUUID(String indexUUID) {
super.indexUUID = indexUUID;
return this;
}

public Builder shardId(String shardId) {
this.shardId = shardId;
return this;
Expand Down
38 changes: 32 additions & 6 deletions server/src/main/java/org/opensearch/repositories/IndexId.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.repositories;

import org.opensearch.Version;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -40,6 +41,7 @@
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.remote.RemoteStoreEnums;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -51,23 +53,36 @@
*/
@PublicApi(since = "1.0.0")
public final class IndexId implements Writeable, ToXContentObject {
protected static final String NAME = "name";
protected static final String ID = "id";
static final String NAME = "name";
static final String ID = "id";
static final String SHARD_PATH_TYPE = "shard_path_type";
public static final int DEFAULT_SHARD_PATH_TYPE = RemoteStoreEnums.PathType.FIXED.getCode();

private final String name;
private final String id;
private final int shardPathType;
private final int hashCode;

// Used for testing only
public IndexId(final String name, final String id) {
this(name, id, DEFAULT_SHARD_PATH_TYPE);
}

public IndexId(String name, String id, int shardPathType) {
this.name = name;
this.id = id;
this.shardPathType = shardPathType;
this.hashCode = computeHashCode();

}

public IndexId(final StreamInput in) throws IOException {
this.name = in.readString();
this.id = in.readString();
if (in.getVersion().onOrAfter(Version.CURRENT)) {
this.shardPathType = in.readVInt();
} else {
this.shardPathType = DEFAULT_SHARD_PATH_TYPE;
}
this.hashCode = computeHashCode();
}

Expand All @@ -93,9 +108,16 @@ public String getId() {
return id;
}

/**
* The storage path type in remote store for the indexes having the underlying index ids.
*/
public int getShardPathType() {
return shardPathType;
}

@Override
public String toString() {
return "[" + name + "/" + id + "]";
return "[" + name + "/" + id + "/" + shardPathType + "]";
}

@Override
Expand All @@ -107,7 +129,7 @@ public boolean equals(Object o) {
return false;
}
IndexId that = (IndexId) o;
return Objects.equals(name, that.name) && Objects.equals(id, that.id);
return Objects.equals(name, that.name) && Objects.equals(id, that.id) && Objects.equals(this.shardPathType, that.shardPathType);
}

@Override
Expand All @@ -116,20 +138,24 @@ public int hashCode() {
}

private int computeHashCode() {
return Objects.hash(name, id);
return Objects.hash(name, id, shardPathType);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(id);
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeVInt(shardPathType);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(NAME, name);
builder.field(ID, id);
builder.field(SHARD_PATH_TYPE, shardPathType);
builder.endObject();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,15 +517,15 @@ public List<IndexId> resolveIndices(final List<String> indices) {
* @param indicesToResolve names of indices to resolve
* @param inFlightIds name to index mapping for currently in-flight snapshots not yet in the repository data to fall back to
*/
public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String, IndexId> inFlightIds) {
public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String, IndexId> inFlightIds, int pathType) {
List<IndexId> snapshotIndices = new ArrayList<>();
for (String index : indicesToResolve) {
IndexId indexId = indices.get(index);
if (indexId == null) {
indexId = inFlightIds.get(index);
}
if (indexId == null) {
indexId = new IndexId(index, UUIDs.randomBase64UUID());
indexId = new IndexId(index, UUIDs.randomBase64UUID(), pathType);
}
snapshotIndices.add(indexId);
}
Expand All @@ -544,10 +544,16 @@ public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String
private static final String VERSION = "version";
private static final String MIN_VERSION = "min_version";

// Visible for testing only
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion) throws IOException {
return snapshotsToXContent(builder, repoMetaVersion, Version.CURRENT);
}

/**
* Writes the snapshots metadata and the related indices metadata to x-content.
*/
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion) throws IOException {
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion, final Version minNodeVersion)
throws IOException {
builder.startObject();
// write the snapshots list
builder.startArray(SNAPSHOTS);
Expand Down Expand Up @@ -578,6 +584,9 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
for (final IndexId indexId : getIndices().values()) {
builder.startObject(indexId.getName());
builder.field(INDEX_ID, indexId.getId());
if (minNodeVersion.onOrAfter(Version.CURRENT)) {
builder.field(IndexId.SHARD_PATH_TYPE, indexId.getShardPathType());
}
builder.startArray(SNAPSHOTS);
List<SnapshotId> snapshotIds = indexSnapshots.get(indexId);
assert snapshotIds != null;
Expand Down Expand Up @@ -765,14 +774,20 @@ private static void parseIndices(
final List<SnapshotId> snapshotIds = new ArrayList<>();
final List<String> gens = new ArrayList<>();

String id = null;
int pathType = IndexId.DEFAULT_SHARD_PATH_TYPE;
IndexId indexId = null;

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
final String indexMetaFieldName = parser.currentName();
final XContentParser.Token currentToken = parser.nextToken();
switch (indexMetaFieldName) {
case INDEX_ID:
indexId = new IndexId(indexName, parser.text());
id = parser.text();
break;
case IndexId.SHARD_PATH_TYPE:
pathType = parser.intValue();
break;
case SNAPSHOTS:
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, currentToken, parser);
Expand All @@ -795,7 +810,7 @@ private static void parseIndices(
// different versions create or delete snapshot in the same repository.
throw new OpenSearchParseException(
"Detected a corrupted repository, index "
+ indexId
+ new IndexId(indexName, id, pathType)
+ " references an unknown snapshot uuid ["
+ uuid
+ "]"
Expand All @@ -812,9 +827,10 @@ private static void parseIndices(
break;
}
}
assert indexId != null;
assert id != null;
indexId = new IndexId(indexName, id, pathType);
indexSnapshots.put(indexId, Collections.unmodifiableList(snapshotIds));
indexLookup.put(indexId.getId(), indexId);
indexLookup.put(id, indexId);
for (int i = 0; i < gens.size(); i++) {
String parsedGen = gens.get(i);
if (parsedGen != null) {
Expand Down
Loading

0 comments on commit f1a145f

Please sign in to comment.