Skip to content

Commit

Permalink
Separate Remote State and Publication enabled and configured methods
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Sep 25, 2024
1 parent 12dadcf commit 60fbfea
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public CoordinationState(
// ToDo: revisit this check while making the setting dynamic
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& REMOTE_PUBLICATION_SETTING.get(settings)
&& localNode.isRemoteStatePublicationEnabled();
&& localNode.isRemoteStatePublicationConfigured();
}

public boolean isRemotePublicationEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final NodeHealthService nodeHealthService;
private final PersistedStateRegistry persistedStateRegistry;
private final RemoteStoreNodeService remoteStoreNodeService;
private final RemoteClusterStateService remoteClusterStateService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -310,6 +311,7 @@ public Coordinator(
this.persistedStateRegistry = persistedStateRegistry;
this.localNodeCommissioned = true;
this.remoteStoreNodeService = remoteStoreNodeService;
this.remoteClusterStateService = remoteClusterStateService;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -903,9 +905,9 @@ public DiscoveryStats stats() {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getFullDownloadStats());
stats.add(publicationHandler.getDiffDownloadStats());
if (remoteClusterStateService != null) {
stats.add(remoteClusterStateService.getFullDownloadStats());
stats.add(remoteClusterStateService.getDiffDownloadStats());
}
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,10 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi

assert existingNodes.isEmpty() == false;
Optional<DiscoveryNode> remotePublicationNode = existingNodes.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.filter(DiscoveryNode::isRemoteStatePublicationConfigured)
.findFirst();

if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationConfigured()) {
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,6 @@ public PublishClusterStateStats stats() {
);
}

public PersistedStateStats getFullDownloadStats() {
return remoteClusterStateService.getFullDownloadStats();
}

public PersistedStateStats getDiffDownloadStats() {
return remoteClusterStateService.getDiffDownloadStats();
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
ClusterState incomingState;
Expand Down Expand Up @@ -356,7 +348,7 @@ public PublicationContext newPublicationContext(
) {
if (isRemotePublicationEnabled == true) {
if (allNodesRemotePublicationEnabled.get() == false) {
if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
if (validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
allNodesRemotePublicationEnabled.set(true);
}
}
Expand All @@ -374,11 +366,11 @@ public PublicationContext newPublicationContext(
return publicationContext;
}

private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) {
private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) {
assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0;
for (DiscoveryNode node : discoveryNodes.getNodes().values()) {
// if a node is non-remote then created local publication context
if (node.isRemoteStatePublicationEnabled() == false) {
if (node.isRemoteStatePublicationConfigured() == false) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,10 @@ public boolean isRemoteStoreNode() {
}

/**
* Returns whether remote cluster state publication is enabled on this node
* Returns whether settings required for remote cluster state publication is configured
* @return true if the node contains remote cluster state node attribute and remote routing table node attribute
*/
public boolean isRemoteStatePublicationEnabled() {
public boolean isRemoteStatePublicationConfigured() {
return this.getAttributes()
.keySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -256,7 +257,7 @@ public RemoteClusterStateService(
List<IndexMetadataUploadListener> indexMetadataUploadListeners,
NamedWriteableRegistry namedWriteableRegistry
) {
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
assert isRemoteClusterStateAttributePresent(settings) : "Remote cluster state is not enabled";
this.nodeId = nodeId;
this.repositoriesService = repositoriesService;
this.settings = settings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH;
import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH;
import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStorePathStrategy;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory}
Expand Down Expand Up @@ -235,7 +235,7 @@ private Repository validateAndGetRepository(String repoSetting) {
}

public void start() {
assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
assert isRemoteClusterStateAttributePresent(settings) == true : "Remote cluster state is not configured";
if (isRemoteDataAttributePresent == false) {
// If remote store data attributes are not present than we skip this.
return;
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,9 @@
import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
Expand Down Expand Up @@ -796,7 +796,7 @@ protected Node(
final RemoteClusterStateService remoteClusterStateService;
final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
final RemoteIndexPathUploader remoteIndexPathUploader;
if (isRemoteStoreClusterStateEnabled(settings)) {
if (isRemoteClusterStateAttributePresent(settings)) {
remoteIndexPathUploader = new RemoteIndexPathUploader(
threadPool,
settings,
Expand Down

0 comments on commit 60fbfea

Please sign in to comment.