Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Shallow Snapshot flows to support remote path type & hash algo #12988

Merged
merged 7 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
Expand Down Expand Up @@ -47,6 +48,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -284,7 +286,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {

indexDocuments(client, indexName1, randomIntBetween(5, 10));
ensureGreen(indexName1);
validatePathType(indexName1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(indexName1, PathType.FIXED);

logger.info("--> snapshot");
SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1)));
Expand All @@ -301,7 +303,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
.get();
assertEquals(RestStatus.ACCEPTED, restoreSnapshotResponse.status());
ensureGreen(restoredIndexName1version1);
validatePathType(restoredIndexName1version1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(restoredIndexName1version1, PathType.FIXED);

client(clusterManagerNode).admin()
.cluster()
Expand All @@ -327,16 +329,22 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A);
ashking94 marked this conversation as resolved.
Show resolved Hide resolved

// Validating that custom data has not changed for indexes which were created before the cluster setting got updated
validatePathType(indexName1, PathType.FIXED, PathHashAlgorithm.FNV_1A);
validatePathType(indexName1, PathType.FIXED);
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
}

private void validatePathType(String index, PathType pathType, PathHashAlgorithm pathHashAlgorithm) {
/**
 * Convenience overload for callers that only supply a path type: delegates to the three-argument
 * {@code validatePathType}, passing {@code null} as the hash algorithm.
 *
 * @param index    name of the index to validate
 * @param pathType path type forwarded to the three-argument overload
 */
private void validatePathType(String index, PathType pathType) {
validatePathType(index, pathType, null);
}

/**
 * Asserts that the remote store custom data in the index metadata of {@code index} records the expected
 * path type and, when one is supplied, the expected path hash algorithm.
 *
 * @param index             name of the index whose metadata is inspected
 * @param pathType          path type expected under the {@code PathType.NAME} key
 * @param pathHashAlgorithm hash algorithm expected under the {@code PathHashAlgorithm.NAME} key, or
 *                          {@code null} to skip the hash algorithm assertion
 */
private void validatePathType(String index, PathType pathType, @Nullable PathHashAlgorithm pathHashAlgorithm) {
    ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
    // Validate that the remote_store custom data is present in index metadata for the created index.
    Map<String, String> remoteCustomData = state.metadata().index(index).getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
    assertNotNull(remoteCustomData);
    assertEquals(pathType.name(), remoteCustomData.get(PathType.NAME));
    // pathHashAlgorithm is @Nullable (the two-argument overload passes null), so it may only be
    // dereferenced behind the null check; an unconditional name() call here would throw a NullPointerException.
    if (Objects.nonNull(pathHashAlgorithm)) {
        assertEquals(pathHashAlgorithm.name(), remoteCustomData.get(PathHashAlgorithm.NAME));
    }
}

public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ public MetadataCreateIndexService(

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
remoteStorePathStrategyResolver = isRemoteDataAttributePresent(settings)
? new RemoteStorePathStrategyResolver(clusterService.getClusterSettings())
? new RemoteStorePathStrategyResolver(clusterService.getClusterSettings(), minNodeVersionSupplier)
: null;
}

Expand Down Expand Up @@ -575,22 +576,18 @@ public void addRemoteStorePathStrategyInCustomData(IndexMetadata.Builder tmpImdB
if (remoteStorePathStrategyResolver != null) {
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
// It is possible that remote custom data exists already. In such cases, we need to only update the path type
// in the remote store custom data map.
Map<String, String> existingRemoteCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
Map<String, String> remoteCustomData = existingRemoteCustomData == null
? new HashMap<>()
: new HashMap<>(existingRemoteCustomData);
Map<String, String> existingCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assert assertNullOldType == false || Objects.isNull(existingCustomData);

// Determine the path type for use using the remoteStorePathResolver.
RemoteStorePathStrategy newPathStrategy = remoteStorePathStrategyResolver.get();
String oldPathType = remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name());
String oldHashAlgorithm = remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name());
assert !assertNullOldType || (Objects.isNull(oldPathType) && Objects.isNull(oldHashAlgorithm));
Map<String, String> remoteCustomData = new HashMap<>();
remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name());
if (Objects.nonNull(newPathStrategy.getHashAlgorithm())) {
remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name());
}
logger.trace(
() -> new ParameterizedMessage(
"Added newPathStrategy={}, replaced oldPathType={} oldHashAlgorithm={}",
newPathStrategy,
oldPathType,
oldHashAlgorithm
)
() -> new ParameterizedMessage("Added newStrategy={}, replaced oldStrategy={}", remoteCustomData, existingCustomData)
);
tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData);
}
Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -1910,12 +1911,11 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo

public RemoteStorePathStrategy getRemoteStorePathStrategy() {
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
if (remoteCustomData != null
&& remoteCustomData.containsKey(PathType.NAME)
&& remoteCustomData.containsKey(PathHashAlgorithm.NAME)) {
if (remoteCustomData != null && remoteCustomData.containsKey(PathType.NAME)) {
PathType pathType = PathType.parseString(remoteCustomData.get(PathType.NAME));
PathHashAlgorithm pathHashAlgorithm = PathHashAlgorithm.parseString(remoteCustomData.get(PathHashAlgorithm.NAME));
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
String hashAlgoStr = remoteCustomData.get(PathHashAlgorithm.NAME);
PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) ? PathHashAlgorithm.parseString(hashAlgoStr) : null;
return new RemoteStorePathStrategy(pathType, hashAlgorithm);
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
}
return new RemoteStorePathStrategy(PathType.FIXED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
import org.opensearch.common.hash.FNV1a;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;

Expand Down Expand Up @@ -78,9 +82,10 @@ public String getName() {
*/
@PublicApi(since = "2.14.0")
public enum PathType {
FIXED {
FIXED(0) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.isNull(hashAlgorithm) : "hashAlgorithm is expected to be null with fixed remote store path type";
// Hash algorithm is not used in FIXED path type
return pathInput.basePath()
.add(pathInput.indexUUID())
Expand All @@ -94,7 +99,7 @@ boolean requiresHashAlgorithm() {
return false;
}
},
HASHED_PREFIX {
HASHED_PREFIX(1) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
// TODO - We need to implement this, keeping the same path as Fixed for sake of multiple tests that can fail otherwise.
Expand All @@ -112,6 +117,33 @@ boolean requiresHashAlgorithm() {
}
};

/** Integer code assigned to this constant through its constructor argument. */
private final int code;

/**
 * @param code integer code to associate with the constant being declared
 */
PathType(int code) {
this.code = code;
}

/**
 * Returns the integer code this constant was declared with.
 */
public int getCode() {
return code;
}

// Unmodifiable lookup table from integer code to PathType constant; populated once when the enum is initialized.
private static final Map<Integer, PathType> CODE_TO_ENUM;
static {
    final PathType[] pathTypes = values();
    final Map<Integer, PathType> byCode = new HashMap<>(pathTypes.length);
    for (int i = 0; i < pathTypes.length; i++) {
        byCode.put(pathTypes[i].code, pathTypes[i]);
    }
    CODE_TO_ENUM = unmodifiableMap(byCode);
}

/**
 * Looks up the {@link PathType} registered under the given integer code.
 *
 * @param code the integer code, as returned by {@link #getCode()}
 * @return the matching {@link PathType}, or {@code null} if no constant is registered under that code
 */
public static PathType fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

/**
* This method generates the path for the given path input which constitutes multiple fields and characteristics
* of the data.
Expand All @@ -131,7 +163,7 @@ public BlobPath path(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
return generatePath(pathInput, hashAlgorithm);
}

abstract BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm);
protected abstract BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm);

abstract boolean requiresHashAlgorithm();

Expand All @@ -158,7 +190,7 @@ public static PathType parseString(String pathType) {
@PublicApi(since = "2.14.0")
public enum PathHashAlgorithm {

FNV_1A {
FNV_1A(0) {
@Override
long hash(PathInput pathInput) {
String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType()
Expand All @@ -167,6 +199,33 @@ long hash(PathInput pathInput) {
}
};

private final int code;
ashking94 marked this conversation as resolved.
Show resolved Hide resolved

/**
 * @param code integer code to associate with the constant being declared
 */
PathHashAlgorithm(int code) {
this.code = code;
}

/**
 * Returns the integer code this constant was declared with.
 */
public int getCode() {
return code;
}

// Unmodifiable lookup table from integer code to PathHashAlgorithm constant; populated once when the enum is initialized.
private static final Map<Integer, PathHashAlgorithm> CODE_TO_ENUM;
static {
    final PathHashAlgorithm[] algorithms = values();
    final Map<Integer, PathHashAlgorithm> byCode = new HashMap<>(algorithms.length);
    for (int i = 0; i < algorithms.length; i++) {
        byCode.put(algorithms[i].code, algorithms[i]);
    }
    CODE_TO_ENUM = unmodifiableMap(byCode);
}

/**
 * Looks up the {@link PathHashAlgorithm} registered under the given integer code.
 *
 * @param code the integer code, as returned by {@link #getCode()}
 * @return the matching {@link PathHashAlgorithm}, or {@code null} if no constant is registered under that code
 */
public static PathHashAlgorithm fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

abstract long hash(PathInput pathInput);

public static PathHashAlgorithm parseString(String pathHashAlgorithm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,16 @@ public RemoteStorePathStrategy(PathType type) {
}

public RemoteStorePathStrategy(PathType type, PathHashAlgorithm hashAlgorithm) {
assert type.requiresHashAlgorithm() == false || Objects.nonNull(hashAlgorithm);
assert isCompatible(type, hashAlgorithm);
this.type = Objects.requireNonNull(type);
this.hashAlgorithm = hashAlgorithm;
}

/**
 * Checks that the presence of a hash algorithm matches what the path type needs.
 *
 * @param type          the path type to check
 * @param hashAlgorithm the hash algorithm, possibly {@code null}
 * @return {@code true} when the type requires a hash algorithm and one is given, or when it requires none
 *         and none is given; {@code false} otherwise
 */
public static boolean isCompatible(PathType type, PathHashAlgorithm hashAlgorithm) {
    final boolean algorithmRequired = type.requiresHashAlgorithm();
    final boolean algorithmPresent = Objects.nonNull(hashAlgorithm);
    return algorithmRequired == algorithmPresent;
}

/**
 * Returns the path type held by this strategy.
 */
public PathType getType() {
return type;
}
Expand All @@ -55,7 +60,7 @@ public String toString() {
}

public BlobPath generatePath(PathInput pathInput) {
return type.generatePath(pathInput, hashAlgorithm);
return type.path(pathInput, hashAlgorithm);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@

package org.opensearch.index.remote;

import org.opensearch.Version;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.IndicesService;

import java.util.function.Supplier;

/**
* Determines the {@link RemoteStorePathStrategy} at the time of index metadata creation.
*
Expand All @@ -22,13 +25,22 @@ public class RemoteStorePathStrategyResolver {

private volatile PathType type;

public RemoteStorePathStrategyResolver(ClusterSettings clusterSettings) {
private final Supplier<Version> minNodeVersionSupplier;

public RemoteStorePathStrategyResolver(ClusterSettings clusterSettings, Supplier<Version> minNodeVersionSupplier) {
this.minNodeVersionSupplier = minNodeVersionSupplier;
type = clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING);
clusterSettings.addSettingsUpdateConsumer(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, this::setType);
}

public RemoteStorePathStrategy get() {
return new RemoteStorePathStrategy(type, PathHashAlgorithm.FNV_1A);
PathType pathType;
PathHashAlgorithm pathHashAlgorithm;
// Min node version check ensures that we are enabling the new prefix type only when all the nodes understand it.
pathType = Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 ? type : PathType.FIXED;
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
// If the path type is fixed, hash algorithm is not applicable.
pathHashAlgorithm = pathType == PathType.FIXED ? null : PathHashAlgorithm.FNV_1A;
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
}

private void setType(PathType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
Expand Down Expand Up @@ -412,8 +410,7 @@
remoteStoreRepository,
indexUUID,
shardId,
new RemoteStorePathStrategy(PathType.FIXED)
// TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot
shallowCopyShardMetadata.getRemoteStorePathStrategy()

Check warning on line 413 in server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java#L413

Added line #L413 was not covered by tests
);
sourceRemoteDirectory.initializeToSpecificCommit(
primaryTerm,
Expand Down
Loading
Loading