diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index b43a7dee3e838..d1d9aa7b285a9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -8,13 +8,8 @@ package org.opensearch.gateway.remote; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsResponse; import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.Client; @@ -22,17 +17,15 @@ import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; -import org.opensearch.core.action.ActionResponse; import org.opensearch.gateway.GatewayMetaState; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.discovery.DiscoveryStats; -import org.opensearch.gateway.GatewayMetaState; import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.ReloadableFsRepository; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import org.opensearch.test.OpenSearchIntegTestCase.Scope; import org.junit.Before; @@ -44,6 +37,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; @@ -68,13 +62,13 @@ public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase { private static String INDEX_NAME = "test-index"; private boolean isRemoteStateEnabled = true; - private String isRemotePublicationEnabled = "true"; + private boolean isRemotePublicationEnabled = true; @Before public void setup() { asyncUploadMockFsRepo = false; isRemoteStateEnabled = true; - isRemotePublicationEnabled = "true"; + isRemotePublicationEnabled = true; } @Override @@ -96,7 +90,7 @@ protected Settings nodeSettings(int nodeOrdinal) { .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName) .put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE) .put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath) - .put(REMOTE_PUBLICATION_EXPERIMENTAL, true) + .put(REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled) .build(); } @@ -194,7 +188,9 @@ public void testMasterReElectionUsesIncrementalUpload() throws IOException { ensureStableCluster(4); persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class); - CoordinationState.PersistedState persistedStateAfterElection = persistedStateRegistry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE); + CoordinationState.PersistedState persistedStateAfterElection = persistedStateRegistry.getPersistedState( + PersistedStateRegistry.PersistedStateType.REMOTE + ); ClusterMetadataManifest manifestAfterElection = persistedStateAfterElection.getLastAcceptedManifest(); // coordination metadata is updated, it will be unequal @@ -209,39 +205,29 @@ public void testMasterReElectionUsesIncrementalUpload() throws IOException { assertEquals(manifest.getIndicesRouting(), manifestAfterElection.getIndicesRouting()); } - public void testVotingConfigAreCommitted() throws ExecutionException, InterruptedException { prepareCluster(3, 2, INDEX_NAME, 1, 2); ensureStableCluster(5); ensureGreen(INDEX_NAME); // add two new nodes to the cluster, to update the voting config - List newCMNodes = internalCluster().startClusterManagerOnlyNodes(2, Settings.EMPTY); + internalCluster().startClusterManagerOnlyNodes(2, Settings.EMPTY); ensureStableCluster(7); - internalCluster().getInstances(PersistedStateRegistry.class).forEach( - persistedStateRegistry -> { - CoordinationState.PersistedState localState = persistedStateRegistry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL); - CoordinationState.PersistedState remoteState = persistedStateRegistry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE); - if (remoteState != null) { - assertEquals(localState.getLastAcceptedState().getLastCommittedConfiguration(), remoteState.getLastAcceptedState().getLastCommittedConfiguration()); - assertEquals(5, remoteState.getLastAcceptedState().getLastCommittedConfiguration().getNodeIds().size()); - } - } - ); - - // remove the newly added nodes from the voting config - client().admin().cluster().execute(AddVotingConfigExclusionsAction.INSTANCE, new AddVotingConfigExclusionsRequest(newCMNodes.get(0), newCMNodes.get(1))).get(); - - internalCluster().getInstances(PersistedStateRegistry.class).forEach( - persistedStateRegistry -> { - CoordinationState.PersistedState localState = persistedStateRegistry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL); - CoordinationState.PersistedState remoteState = persistedStateRegistry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE); - if (remoteState != null) { - assertEquals(localState.getLastAcceptedState().getLastCommittedConfiguration(), remoteState.getLastAcceptedState().getLastCommittedConfiguration()); - assertEquals(3, remoteState.getLastAcceptedState().getLastCommittedConfiguration().getNodeIds().size()); - } + internalCluster().getInstances(PersistedStateRegistry.class).forEach(persistedStateRegistry -> { + CoordinationState.PersistedState localState = persistedStateRegistry.getPersistedState( + PersistedStateRegistry.PersistedStateType.LOCAL + ); + CoordinationState.PersistedState remoteState = persistedStateRegistry.getPersistedState( + PersistedStateRegistry.PersistedStateType.REMOTE + ); + if (remoteState != null) { + assertEquals( + localState.getLastAcceptedState().getLastCommittedConfiguration(), + remoteState.getLastAcceptedState().getLastCommittedConfiguration() + ); + assertEquals(5, remoteState.getLastAcceptedState().getLastCommittedConfiguration().getNodeIds().size()); } - ); + }); } private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index d08f64be0bc0f..9990bc8940297 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -461,11 +461,9 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) { clusterState.term() ); persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState); - // if Remote publication is enabled, we only need to update the accepted state in followers node as elected cluster manager would - // have already updated the last accepted state + if (shouldUpdateRemotePersistedState(publishRequest)) { - persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState); - persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(publishRequest.getManifest()); + updateRemotePersistedStateOnPublishRequest(publishRequest); } assert getLastAcceptedState() == clusterState; @@ -629,10 +627,21 @@ public void close() throws IOException { private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) { return isRemotePublicationEnabled - && publishRequest.getManifest() != null && localNode.isClusterManagerNode() - && publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false - && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null; + && publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false; + } + + private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) { + if (publishRequest instanceof PublishWithManifestRequest) { + assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null; + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(publishRequest.getAcceptedState()); + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) + .setLastAcceptedManifest(((PublishWithManifestRequest) publishRequest).getAcceptedManifest()); + } else { + // We will end up here if PublishRequest was sent not using Remote Store even with remotePublication enabled on this node + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(null); + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(null); + } } private boolean shouldCommitRemotePersistedState() { @@ -642,7 +651,9 @@ private boolean shouldCommitRemotePersistedState() { .getLastAcceptedState() .getNodes() .isLocalNodeElectedClusterManager() == false - && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null; + && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null + && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState() != null + && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null; } /** diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index ae339d1292793..d247af9ff0747 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -324,7 +324,10 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterM return handlePublishRequest.apply(publishRequest); } } - return handlePublishRequest.apply(new PublishRequest(incomingState, manifest)); + if (manifest != null) { + return handlePublishRequest.apply(new PublishWithManifestRequest(incomingState, manifest)); + } + return handlePublishRequest.apply(new PublishRequest(incomingState)); } private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishRequest remotePublishRequest) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java index 0fedcf59c28e5..e7c3e2d2c965b 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java @@ -32,7 +32,6 @@ package org.opensearch.cluster.coordination; import org.opensearch.cluster.ClusterState; -import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.util.Objects; @@ -45,26 +44,15 @@ public class PublishRequest { private final ClusterState acceptedState; - private final ClusterMetadataManifest manifest; public PublishRequest(ClusterState acceptedState) { this.acceptedState = acceptedState; - this.manifest = null; - } - - public PublishRequest(ClusterState acceptedState, ClusterMetadataManifest manifest) { - this.acceptedState = acceptedState; - this.manifest = manifest; } public ClusterState getAcceptedState() { return acceptedState; } - public ClusterMetadataManifest getManifest() { - return manifest; - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -72,25 +60,16 @@ public boolean equals(Object o) { PublishRequest that = (PublishRequest) o; - if (!Objects.equals(manifest, that.manifest)) return false; - return acceptedState.term() == that.acceptedState.term() && acceptedState.version() == that.acceptedState.version(); } @Override public int hashCode() { - return Objects.hash(acceptedState.term(), acceptedState.version(), manifest); + return Objects.hash(acceptedState.term(), acceptedState.version()); } @Override public String toString() { - return "PublishRequest{term=" - + acceptedState.term() - + ", version=" - + acceptedState.version() - + ", state=" - + acceptedState - + (manifest != null ? ", manifest=" + manifest : "") - + '}'; + return "PublishRequest{term=" + acceptedState.term() + ", version=" + acceptedState.version() + ", state=" + acceptedState + '}'; } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublishWithManifestRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/PublishWithManifestRequest.java new file mode 100644 index 0000000000000..9c33f0de69004 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublishWithManifestRequest.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.gateway.remote.ClusterMetadataManifest; + +import java.util.Objects; + +/** + * This class represents a publish request that includes a {@link ClusterMetadataManifest}. + * It extends the {@link PublishRequest} class and adds a field for the accepted manifest. + */ +public class PublishWithManifestRequest extends PublishRequest { + + private final ClusterMetadataManifest acceptedManifest; + + public PublishWithManifestRequest(ClusterState acceptedState, ClusterMetadataManifest manifest) { + super(acceptedState); + this.acceptedManifest = manifest; + } + + public ClusterMetadataManifest getAcceptedManifest() { + return acceptedManifest; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + + PublishWithManifestRequest that = (PublishWithManifestRequest) o; + + return acceptedManifest.equals(that.acceptedManifest); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), acceptedManifest); + } + + @Override + public String toString() { + return "PublishWithManifestRequest{term=" + + getAcceptedState().term() + + ", version=" + + getAcceptedState().version() + + ", state=" + + getAcceptedState() + + "manifest=" + + acceptedManifest + + "}"; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index cb7053379ac08..b3836edcd7d6c 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -705,7 +705,7 @@ public ClusterMetadataManifest getLastAcceptedManifest() { @Override public void setLastAcceptedState(ClusterState clusterState) { // for non leader node, update the lastAcceptedClusterState - if (clusterState.getNodes().isLocalNodeElectedClusterManager() == false) { + if (clusterState == null || clusterState.getNodes().isLocalNodeElectedClusterManager() == false) { lastAcceptedState = clusterState; return; } @@ -780,22 +780,14 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, private boolean shouldWriteFullClusterState(ClusterState clusterState, int codecVersion) { assert lastAcceptedManifest == null || lastAcceptedManifest.getCodecVersion() <= codecVersion; - if (remoteClusterStateService.isRemotePublicationEnabled()) { - // If Remote Publication is enabled, we just need to ensure that we have the state/manifest available. - // As in case of publication enabled, we add the lastAcceptedState as part of publication quorum votes. - return lastAcceptedState == null - || lastAcceptedManifest == null - || lastAcceptedManifest.getCodecVersion() != codecVersion - || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT; - } else { - // in case Remote Publication is disabled, we still need to upload the full state on every term update - // As we can't guarantee that nodes will have the latest state as accepted state - return lastAcceptedState == null - || lastAcceptedManifest == null - || lastAcceptedState.term() != clusterState.term() - || lastAcceptedManifest.getCodecVersion() != codecVersion - || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT; + if (lastAcceptedState == null + || lastAcceptedManifest == null + || (remoteClusterStateService.isRemotePublicationEnabled() == false && lastAcceptedState.term() != clusterState.term()) + || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT + || lastAcceptedManifest.getCodecVersion() != codecVersion) { + return true; } + return false; } @Override @@ -805,17 +797,18 @@ public void markLastAcceptedStateAsCommitted() { assert lastAcceptedManifest != null : "Last accepted manifest is not present"; ClusterState clusterState = lastAcceptedState; boolean shouldCommitVotingConfig = shouldCommitVotingConfig(); - Metadata.Builder metadataBuilder; - if (shouldCommitVotingConfig) { - metadataBuilder = commitVotingConfiguration(lastAcceptedState); - } else { - metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); - } - if (lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false - && lastAcceptedState.metadata().clusterUUIDCommitted() == false) { - metadataBuilder.clusterUUIDCommitted(true); + boolean isClusterUUIDUnknown = lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID); + boolean isClusterUUIDCommitted = lastAcceptedState.metadata().clusterUUIDCommitted(); + if (shouldCommitVotingConfig || (isClusterUUIDUnknown == false && isClusterUUIDCommitted == false)) { + Metadata.Builder metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); + if (shouldCommitVotingConfig) { + metadataBuilder = commitVotingConfiguration(lastAcceptedState); + } + if (isClusterUUIDUnknown == false && isClusterUUIDCommitted == false) { + metadataBuilder.clusterUUIDCommitted(true); + } + clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build(); } - clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build(); if (clusterState.getNodes().isLocalNodeElectedClusterManager()) { final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted( clusterState, diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index f9640223fb7cd..5535b048c09d0 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -68,6 +68,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.mockito.Mockito.spy; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.common.util.FeatureFlags.initializeFeatureFlags; import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; @@ -929,7 +930,7 @@ public void testSafety() { public void testHandlePrePublishAndCommitWhenRemoteStateDisabled() { final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1); - final PersistedStateRegistry persistedStateRegistrySpy = Mockito.spy(persistedStateRegistry); + final PersistedStateRegistry persistedStateRegistrySpy = spy(persistedStateRegistry); final CoordinationState coordinationState = createCoordinationState(persistedStateRegistrySpy, node1, Settings.EMPTY); final VotingConfiguration initialConfig = VotingConfiguration.of(node1); final ClusterState clusterState = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L); @@ -1006,15 +1007,6 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep public void testHandlePublishRequestOnFollowerWhenRemotePublicationEnabled() { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); - DiscoveryNode nodeWithPub = createNode( - "nodeWithPub", - Map.of( - REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, - "", - REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, - "" - ) - ); // cluster manager is node1 and node2 is a follower node VotingConfiguration initialConfig = VotingConfiguration.of(node1); ClusterState state1 = clusterState( @@ -1074,18 +1066,18 @@ public void testHandlePublishRequestOnFollowerWhenRemotePublicationEnabled() { .clusterUUIDCommitted(true) .build(); - PublishResponse publishResponse = coordinationState.handlePublishRequest(new PublishRequest(state2, manifest)); + PublishResponse publishResponse = coordinationState.handlePublishRequest(new PublishWithManifestRequest(state2, manifest)); assertEquals(state2.term(), publishResponse.getTerm()); assertEquals(state2.version(), publishResponse.getVersion()); verifyNoInteractions(remoteClusterStateService); assertEquals(state2, persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState()); assertEquals( manifest, - ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)).getLastAcceptedManifest() + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() ); } - public void testHandleCommitOnFollowerNodeWhenRemoteStateEnabled() { + public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabled() { RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); VotingConfiguration initialConfig = VotingConfiguration.of(node1, nodeWithPub); ClusterState state1 = clusterState( @@ -1152,7 +1144,7 @@ public void testHandleCommitOnFollowerNodeWhenRemoteStateEnabled() { .build(); PublishRequest publishRequest = coordinationState.handleClientValue(state2); - coordinationState.handlePublishRequest(new PublishRequest(publishRequest.getAcceptedState(), manifest)); + coordinationState.handlePublishRequest(new PublishWithManifestRequest(publishRequest.getAcceptedState(), manifest)); ApplyCommitRequest applyCommitRequest = new ApplyCommitRequest(node2, state2.term(), state2.version()); coordinationState.handleCommit(applyCommitRequest); verifyNoInteractions(remoteClusterStateService); @@ -1160,12 +1152,129 @@ public void testHandleCommitOnFollowerNodeWhenRemoteStateEnabled() { persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState().metadata().clusterUUIDCommitted() ); assertTrue( - ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)).getLastAcceptedManifest() - .isCommitted() + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest().isCommitted() ); assertEquals(coordinationState.getLastCommittedConfiguration(), newConfig); } + public void testRemotePersistedStateResetsForPublicationEnabledAfterLocalPublication() { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + // cluster manager is node1 and nodeWithPub is a follower node + VotingConfiguration initialConfig = VotingConfiguration.of(node1); + ClusterState state1 = clusterState( + 0L, + 0L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + initialConfig, + 42L + ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialStateNode2)); + persistedStateRegistry.addPersistedState( + PersistedStateType.REMOTE, + new RemotePersistedState(remoteClusterStateService, state1.metadata().clusterUUID()) + ); + + final CoordinationState coordinationState = createCoordinationState( + persistedStateRegistry, + nodeWithPub, + remotePublicationSettings() + ); + coordinationState.setInitialState(state1); + long newTerm = randomLongBetween(1, 10); + StartJoinRequest startJoinRequest = new StartJoinRequest(nodeWithPub, newTerm); + + coordinationState.handleStartJoin(startJoinRequest); + + ClusterState state2 = setValue( + ClusterState.builder(state1) + .metadata( + Metadata.builder(state1.metadata()) + .coordinationMetadata(CoordinationMetadata.builder(state1.coordinationMetadata()).term(newTerm).build()) + .build() + ) + .version(randomLongBetween(1, 10)) + .build(), + 43L + ); + + PublishResponse publishResponse = coordinationState.handlePublishRequest(new PublishRequest(state2)); + assertEquals(state2.term(), publishResponse.getTerm()); + assertEquals(state2.version(), publishResponse.getVersion()); + verifyNoInteractions(remoteClusterStateService); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState()); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest()); + } + + public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabledWithNullRemotePersistedState() { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + VotingConfiguration initialConfig = VotingConfiguration.of(node1, nodeWithPub); + ClusterState state1 = clusterState( + 0L, + 0L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + initialConfig, + 42L + ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialStateNode2)); + persistedStateRegistry.addPersistedState( + PersistedStateType.REMOTE, + new RemotePersistedState(remoteClusterStateService, state1.metadata().clusterUUID()) + ); + + final CoordinationState coordinationState = createCoordinationState( + persistedStateRegistry, + nodeWithPub, + remotePublicationSettings() + ); + coordinationState.setInitialState(state1); + long newTerm = randomLongBetween(1, 10); + StartJoinRequest startJoinRequest = new StartJoinRequest(nodeWithPub, newTerm); + + Join v1 = cs1.handleStartJoin(startJoinRequest); + Join v2 = coordinationState.handleStartJoin(startJoinRequest); + assertTrue(coordinationState.handleJoin(v1)); + assertTrue(coordinationState.handleJoin(v2)); + assertTrue(coordinationState.electionWon()); + VotingConfiguration newConfig = VotingConfiguration.of(node1, nodeWithPub, node3); + ClusterState state2 = clusterState( + startJoinRequest.getTerm(), + 2L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + newConfig, + 7L + ); + + PublishRequest publishRequest = coordinationState.handleClientValue(state2); + coordinationState.handlePublishRequest(new PublishRequest(publishRequest.getAcceptedState())); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState()); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest()); + ApplyCommitRequest applyCommitRequest = new ApplyCommitRequest(node2, state2.term(), state2.version()); + PersistedState spyRPS = spy(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)); + coordinationState.handleCommit(applyCommitRequest); + verifyNoInteractions(spyRPS); + verifyNoInteractions(remoteClusterStateService); + } + public void testIsRemotePublicationEnabled_WithInconsistentSettings() { // create settings with remote state disabled but publication enabled Settings settings = Settings.builder() diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index e9a45fc230fa0..9753fe5e564fd 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -209,6 +209,10 @@ public void testSetCurrentTerm() throws IOException { } } + private ClusterState createClusterState(long version, Metadata metadata) { + return createClusterState(version, metadata, false); + } + private ClusterState createClusterState(long version, Metadata metadata, boolean isClusterManagerNode) { DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()); if (isClusterManagerNode) { @@ -225,7 +229,7 @@ private ClusterState createClusterStateWithNodes(long version, Metadata metadata Sets.newHashSet(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), Version.V_2_13_0 ); - DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).add(oldNode).build(); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).clusterManagerNodeId(localNode.getId()).add(oldNode).build(); return ClusterState.builder(clusterName).nodes(discoveryNodes).version(version).metadata(metadata).build(); } @@ -282,7 +286,7 @@ public void testSetLastAcceptedState() throws IOException { .coordinationMetadata(createCoordinationMetadata(term)) .put(indexMetadata, false) .build(); - ClusterState state = createClusterState(version, metadata, false); + ClusterState state = createClusterState(version, metadata); gateway.setLastAcceptedState(state); gateway = maybeNew(gateway); @@ -307,8 +311,7 @@ public void testSetLastAcceptedStateTermChanged() throws IOException { final IndexMetadata indexMetadata = createIndexMetadata(indexName, numberOfShards, version); final ClusterState state = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build(), - false + Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build() ); gateway.setLastAcceptedState(state); @@ -318,8 +321,7 @@ public void testSetLastAcceptedStateTermChanged() throws IOException { final IndexMetadata newIndexMetadata = createIndexMetadata(indexName, newNumberOfShards, version); final ClusterState newClusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(createCoordinationMetadata(newTerm)).put(newIndexMetadata, false).build(), - false + Metadata.builder().coordinationMetadata(createCoordinationMetadata(newTerm)).put(newIndexMetadata, false).build() ); gateway.setLastAcceptedState(newClusterState); @@ -342,8 +344,7 @@ public void testCurrentTermAndTermAreDifferent() throws IOException { gateway.setLastAcceptedState( createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(term).build()).build(), - false + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(term).build()).build() ) ); @@ -368,8 +369,7 @@ public void testMarkAcceptedConfigAsCommitted() throws IOException { ClusterState state = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(coordinationMetadata).clusterUUID(randomAlphaOfLength(10)).build(), - false + Metadata.builder().coordinationMetadata(coordinationMetadata).clusterUUID(randomAlphaOfLength(10)).build() ); gateway.setLastAcceptedState(state); @@ -415,8 +415,7 @@ public void testStatePersistedOnLoad() throws IOException { ); final ClusterState state = createClusterState( randomNonNegativeLong(), - Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build(), - false + Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build() ); try ( GatewayMetaState.LucenePersistedState ignored = new GatewayMetaState.LucenePersistedState( @@ -540,8 +539,7 @@ public void testDataOnlyNodePersistence() throws Exception { ClusterState state = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(coordinationMetadata).clusterUUID(randomAlphaOfLength(10)).build(), - false + Metadata.builder().coordinationMetadata(coordinationMetadata).clusterUUID(randomAlphaOfLength(10)).build() ); persistedState.setCurrentTerm(state.term()); persistedState.setLastAcceptedState(state); @@ -606,8 +604,7 @@ public void testDataOnlyNodePersistence() throws Exception { final IndexMetadata indexMetadata = createIndexMetadata(indexName, numberOfShards, i); state = createClusterState( state.version() + 1, - Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build(), - false + Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build() ); persistedState.setLastAcceptedState(state); } @@ -652,11 +649,7 @@ Directory createDirectory(Path path) { return wrapper; } }; - ClusterState state = createClusterState( - randomNonNegativeLong(), - Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build(), - false - ); + ClusterState state = createClusterState(randomNonNegativeLong(), Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build()); long currentTerm = 42L; try ( GatewayMetaState.LucenePersistedState persistedState = new GatewayMetaState.LucenePersistedState( @@ -670,8 +663,7 @@ Directory createDirectory(Path path) { if (randomBoolean()) { final ClusterState newState = createClusterState( randomNonNegativeLong(), - Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build(), - false + Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build() ); persistedState.setLastAcceptedState(newState); state = newState; @@ -700,7 +692,7 @@ Directory createDirectory(Path path) { .coordinationMetadata(createCoordinationMetadata(1L)) .put(indexMetadata, false) .build(); - state = createClusterState(version, metadata, false); + state = createClusterState(version, metadata); persistedState.setLastAcceptedState(state); } else { currentTerm += 1; @@ -832,7 +824,7 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx ClusterState clusterState2 = createClusterState( randomNonNegativeLong(), Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build(), - false + true ); final ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder() .clusterTerm(1L) @@ -848,7 +840,7 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx ClusterState clusterState3 = createClusterState( randomNonNegativeLong(), Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build(), - false + true ); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) .thenReturn(new RemoteClusterStateManifestInfo(manifest2, "path/to/manifest3")); diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index ec88002317284..baaf17aec0907 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2164,6 +2164,10 @@ public String getClusterManagerName(@Nullable String viaNode) { } } + public Set getClusterManagers() { + return nodes.entrySet().stream().filter(entry -> CLUSTER_MANAGER_NODE_PREDICATE.test(entry.getValue())).map(Map.Entry::getKey).collect(Collectors.toSet()); + } + /** * Returns the name of the current cluster-manager node in the cluster. *