Skip to content

Commit

Permalink
Modify mixed mode behaviour in Remote Publication Enabled
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Sep 2, 2024
1 parent 2a828af commit 985d25b
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,24 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.coordination.CoordinationState;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.junit.Before;
Expand All @@ -44,6 +37,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -68,13 +62,13 @@ public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {

private static String INDEX_NAME = "test-index";
private boolean isRemoteStateEnabled = true;
private String isRemotePublicationEnabled = "true";
private boolean isRemotePublicationEnabled = true;

@Before
public void setup() {
asyncUploadMockFsRepo = false;
isRemoteStateEnabled = true;
isRemotePublicationEnabled = "true";
isRemotePublicationEnabled = true;
}

@Override
Expand All @@ -96,7 +90,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(REMOTE_PUBLICATION_EXPERIMENTAL, true)
.put(REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled)
.build();
}

Expand Down Expand Up @@ -194,7 +188,9 @@ public void testMasterReElectionUsesIncrementalUpload() throws IOException {
ensureStableCluster(4);

persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
CoordinationState.PersistedState persistedStateAfterElection = persistedStateRegistry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
CoordinationState.PersistedState persistedStateAfterElection = persistedStateRegistry.getPersistedState(
PersistedStateRegistry.PersistedStateType.REMOTE
);
ClusterMetadataManifest manifestAfterElection = persistedStateAfterElection.getLastAcceptedManifest();

// coordination metadata is updated, it will be unequal
Expand All @@ -209,39 +205,29 @@ public void testMasterReElectionUsesIncrementalUpload() throws IOException {
assertEquals(manifest.getIndicesRouting(), manifestAfterElection.getIndicesRouting());
}


public void testVotingConfigAreCommitted() throws ExecutionException, InterruptedException {
prepareCluster(3, 2, INDEX_NAME, 1, 2);
ensureStableCluster(5);
ensureGreen(INDEX_NAME);
// add two new nodes to the cluster, to update the voting config
List<String> newCMNodes = internalCluster().startClusterManagerOnlyNodes(2, Settings.EMPTY);
internalCluster().startClusterManagerOnlyNodes(2, Settings.EMPTY);
ensureStableCluster(7);

internalCluster().getInstances(PersistedStateRegistry.class).forEach(
persistedStateRegistry -> {
CoordinationState.PersistedState localState = persistedStateRegistry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL);
CoordinationState.PersistedState remoteState = persistedStateRegistry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
if (remoteState != null) {
assertEquals(localState.getLastAcceptedState().getLastCommittedConfiguration(), remoteState.getLastAcceptedState().getLastCommittedConfiguration());
assertEquals(5, remoteState.getLastAcceptedState().getLastCommittedConfiguration().getNodeIds().size());
}
}
);

// remove the newly added nodes from the voting config
client().admin().cluster().execute(AddVotingConfigExclusionsAction.INSTANCE, new AddVotingConfigExclusionsRequest(newCMNodes.get(0), newCMNodes.get(1))).get();

internalCluster().getInstances(PersistedStateRegistry.class).forEach(
persistedStateRegistry -> {
CoordinationState.PersistedState localState = persistedStateRegistry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL);
CoordinationState.PersistedState remoteState = persistedStateRegistry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
if (remoteState != null) {
assertEquals(localState.getLastAcceptedState().getLastCommittedConfiguration(), remoteState.getLastAcceptedState().getLastCommittedConfiguration());
assertEquals(3, remoteState.getLastAcceptedState().getLastCommittedConfiguration().getNodeIds().size());
}
internalCluster().getInstances(PersistedStateRegistry.class).forEach(persistedStateRegistry -> {
CoordinationState.PersistedState localState = persistedStateRegistry.getPersistedState(
PersistedStateRegistry.PersistedStateType.LOCAL
);
CoordinationState.PersistedState remoteState = persistedStateRegistry.getPersistedState(
PersistedStateRegistry.PersistedStateType.REMOTE
);
if (remoteState != null) {
assertEquals(
localState.getLastAcceptedState().getLastCommittedConfiguration(),
remoteState.getLastAcceptedState().getLastCommittedConfiguration()
);
assertEquals(5, remoteState.getLastAcceptedState().getLastCommittedConfiguration().getNodeIds().size());
}
);
});
}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,9 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
clusterState.term()
);
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
// if Remote publication is enabled, we only need to update the accepted state in followers node as elected cluster manager would
// have already updated the last accepted state

if (shouldUpdateRemotePersistedState(publishRequest)) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState);
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(publishRequest.getManifest());
updateRemotePersistedStateOnPublishRequest(publishRequest);
}
assert getLastAcceptedState() == clusterState;

Expand Down Expand Up @@ -629,10 +627,21 @@ public void close() throws IOException {

private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) {
return isRemotePublicationEnabled
&& publishRequest.getManifest() != null
&& localNode.isClusterManagerNode()
&& publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null;
&& publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false;
}

private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) {
if (publishRequest instanceof PublishWithManifestRequest) {
assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null;
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(publishRequest.getAcceptedState());
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)
.setLastAcceptedManifest(((PublishWithManifestRequest) publishRequest).getAcceptedManifest());
} else {
// We will end up here if PublishRequest was sent not using Remote Store even with remotePublication enabled on this node
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(null);
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(null);
}
}

private boolean shouldCommitRemotePersistedState() {
Expand All @@ -642,7 +651,9 @@ private boolean shouldCommitRemotePersistedState() {
.getLastAcceptedState()
.getNodes()
.isLocalNodeElectedClusterManager() == false
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null;
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState() != null
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,10 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterM
return handlePublishRequest.apply(publishRequest);
}
}
return handlePublishRequest.apply(new PublishRequest(incomingState, manifest));
if (manifest != null) {
return handlePublishRequest.apply(new PublishWithManifestRequest(incomingState, manifest));
}
return handlePublishRequest.apply(new PublishRequest(incomingState));
}

private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishRequest remotePublishRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
package org.opensearch.cluster.coordination;

import org.opensearch.cluster.ClusterState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.util.Objects;

Expand All @@ -45,52 +44,32 @@
public class PublishRequest {

private final ClusterState acceptedState;
private final ClusterMetadataManifest manifest;

public PublishRequest(ClusterState acceptedState) {
this.acceptedState = acceptedState;
this.manifest = null;
}

public PublishRequest(ClusterState acceptedState, ClusterMetadataManifest manifest) {
this.acceptedState = acceptedState;
this.manifest = manifest;
}

public ClusterState getAcceptedState() {
return acceptedState;
}

public ClusterMetadataManifest getManifest() {
return manifest;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof PublishRequest)) return false;

PublishRequest that = (PublishRequest) o;

if (!Objects.equals(manifest, that.manifest)) return false;

return acceptedState.term() == that.acceptedState.term() && acceptedState.version() == that.acceptedState.version();
}

@Override
public int hashCode() {
return Objects.hash(acceptedState.term(), acceptedState.version(), manifest);
return Objects.hash(acceptedState.term(), acceptedState.version());
}

@Override
public String toString() {
return "PublishRequest{term="
+ acceptedState.term()
+ ", version="
+ acceptedState.version()
+ ", state="
+ acceptedState
+ (manifest != null ? ", manifest=" + manifest : "")
+ '}';
return "PublishRequest{term=" + acceptedState.term() + ", version=" + acceptedState.version() + ", state=" + acceptedState + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.cluster.ClusterState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.util.Objects;

/**
* This class represents a publish request that includes a {@link ClusterMetadataManifest}.
* It extends the {@link PublishRequest} class and adds a field for the accepted manifest.
*/
public class PublishWithManifestRequest extends PublishRequest {

private final ClusterMetadataManifest acceptedManifest;

public PublishWithManifestRequest(ClusterState acceptedState, ClusterMetadataManifest manifest) {
super(acceptedState);
this.acceptedManifest = manifest;
}

public ClusterMetadataManifest getAcceptedManifest() {
return acceptedManifest;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;

PublishWithManifestRequest that = (PublishWithManifestRequest) o;

return acceptedManifest.equals(that.acceptedManifest);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), acceptedManifest);
}

@Override
public String toString() {
return "PublishWithManifestRequest{term="
+ getAcceptedState().term()
+ ", version="
+ getAcceptedState().version()
+ ", state="
+ getAcceptedState()
+ "manifest="
+ acceptedManifest
+ "}";
}
}
45 changes: 19 additions & 26 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ public ClusterMetadataManifest getLastAcceptedManifest() {
@Override
public void setLastAcceptedState(ClusterState clusterState) {
// for non leader node, update the lastAcceptedClusterState
if (clusterState.getNodes().isLocalNodeElectedClusterManager() == false) {
if (clusterState == null || clusterState.getNodes().isLocalNodeElectedClusterManager() == false) {
lastAcceptedState = clusterState;
return;
}
Expand Down Expand Up @@ -780,22 +780,14 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest,

private boolean shouldWriteFullClusterState(ClusterState clusterState, int codecVersion) {
assert lastAcceptedManifest == null || lastAcceptedManifest.getCodecVersion() <= codecVersion;
if (remoteClusterStateService.isRemotePublicationEnabled()) {
// If Remote Publication is enabled, we just need to ensure that we have the state/manifest available.
// As in case of publication enabled, we add the lastAcceptedState as part of publication quorum votes.
return lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedManifest.getCodecVersion() != codecVersion
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT;
} else {
// in case Remote Publication is disabled, we still need to upload the full state on every term update
// As we can't guarantee that nodes will have the latest state as accepted state
return lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.term() != clusterState.term()
|| lastAcceptedManifest.getCodecVersion() != codecVersion
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT;
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| (remoteClusterStateService.isRemotePublicationEnabled() == false && lastAcceptedState.term() != clusterState.term())
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT
|| lastAcceptedManifest.getCodecVersion() != codecVersion) {
return true;
}
return false;
}

@Override
Expand All @@ -805,17 +797,18 @@ public void markLastAcceptedStateAsCommitted() {
assert lastAcceptedManifest != null : "Last accepted manifest is not present";
ClusterState clusterState = lastAcceptedState;
boolean shouldCommitVotingConfig = shouldCommitVotingConfig();
Metadata.Builder metadataBuilder;
if (shouldCommitVotingConfig) {
metadataBuilder = commitVotingConfiguration(lastAcceptedState);
} else {
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
}
if (lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
&& lastAcceptedState.metadata().clusterUUIDCommitted() == false) {
metadataBuilder.clusterUUIDCommitted(true);
boolean isClusterUUIDUnknown = lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID);
boolean isClusterUUIDCommitted = lastAcceptedState.metadata().clusterUUIDCommitted();
if (shouldCommitVotingConfig || (isClusterUUIDUnknown == false && isClusterUUIDCommitted == false)) {
Metadata.Builder metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
if (shouldCommitVotingConfig) {
metadataBuilder = commitVotingConfiguration(lastAcceptedState);
}
if (isClusterUUIDUnknown == false && isClusterUUIDCommitted == false) {
metadataBuilder.clusterUUIDCommitted(true);
}
clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build();
}
clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build();
if (clusterState.getNodes().isLocalNodeElectedClusterManager()) {
final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted(
clusterState,
Expand Down
Loading

0 comments on commit 985d25b

Please sign in to comment.