diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java index 2209e24488691..c86490552f2f2 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java @@ -70,7 +70,12 @@ public List getRestHandlers( IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster ) { - return List.of(new RestCreateQueryGroupAction(), new RestGetQueryGroupAction(), new RestDeleteQueryGroupAction(), new RestUpdateQueryGroupAction()); + return List.of( + new RestCreateQueryGroupAction(), + new RestGetQueryGroupAction(), + new RestDeleteQueryGroupAction(), + new RestUpdateQueryGroupAction() + ); } @Override diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java index d7850fa5cb5a2..5c457c406061e 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java @@ -11,15 +11,14 @@ import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.cluster.metadata.QueryGroup; -import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.search.ResourceType; -import org.joda.time.Instant; +import org.opensearch.wlm.ChangeableQueryGroup; +import org.opensearch.wlm.ChangeableQueryGroup.ResiliencyMode; import java.io.IOException; -import java.util.HashMap; import java.util.Map; /** @@ -29,38 +28,16 @@ */ public class UpdateQueryGroupRequest extends ActionRequest { private final String name; - private final Map resourceLimits; - private final ResiliencyMode resiliencyMode; - private final long updatedAtInMillis; - - /** - * Constructor for UpdateQueryGroupRequest - * @param queryGroup - A {@link QueryGroup} object - */ - public UpdateQueryGroupRequest(QueryGroup queryGroup) { - this.name = queryGroup.getName(); - this.resiliencyMode = queryGroup.getResiliencyMode(); - this.resourceLimits = queryGroup.getResourceLimits(); - this.updatedAtInMillis = queryGroup.getUpdatedAtInMillis(); - } + private final ChangeableQueryGroup changeableQueryGroup; /** * Constructor for UpdateQueryGroupRequest * @param name - QueryGroup name for UpdateQueryGroupRequest - * @param resiliencyMode - QueryGroup mode for UpdateQueryGroupRequest - * @param resourceLimits - QueryGroup resourceLimits for UpdateQueryGroupRequest - * @param updatedAtInMillis - QueryGroup updated time in millis for UpdateQueryGroupRequest + * @param changeableQueryGroup - ChangeableQueryGroup for UpdateQueryGroupRequest */ - public UpdateQueryGroupRequest( - String name, - ResiliencyMode resiliencyMode, - Map resourceLimits, - long updatedAtInMillis - ) { + public UpdateQueryGroupRequest(String name, ChangeableQueryGroup changeableQueryGroup) { this.name = name; - this.resiliencyMode = resiliencyMode; - this.resourceLimits = resourceLimits; - this.updatedAtInMillis = updatedAtInMillis; + this.changeableQueryGroup = changeableQueryGroup; } /** @@ -68,16 +45,7 @@ public UpdateQueryGroupRequest( * @param in - A {@link StreamInput} object */ public UpdateQueryGroupRequest(StreamInput in) throws IOException { - super(in); - name = in.readString(); - if (in.readBoolean()) { - resourceLimits = in.readMap((i) -> ResourceType.fromName(i.readString()), StreamInput::readDouble); - } else { - resourceLimits = new HashMap<>(); - } - String updatedResiliencyMode = in.readOptionalString(); - resiliencyMode = updatedResiliencyMode == null ? null : ResiliencyMode.fromName(updatedResiliencyMode); - updatedAtInMillis = in.readLong(); + this(in.readString(), new ChangeableQueryGroup(in)); } /** @@ -87,16 +55,12 @@ public UpdateQueryGroupRequest(StreamInput in) throws IOException { */ public static UpdateQueryGroupRequest fromXContent(XContentParser parser, String name) throws IOException { QueryGroup.Builder builder = QueryGroup.Builder.fromXContent(parser); - return new UpdateQueryGroupRequest(name, builder.getResiliencyMode(), builder.getResourceLimits(), Instant.now().getMillis()); + return new UpdateQueryGroupRequest(name, builder.getChangeableQueryGroup()); } @Override public ActionRequestValidationException validate() { QueryGroup.validateName(name); - if (resourceLimits != null) { - QueryGroup.validateResourceLimits(resourceLimits); - } - assert QueryGroup.isValid(updatedAtInMillis); return null; } @@ -108,37 +72,30 @@ public String getName() { } /** - * ResourceLimits getter + * resourceLimits getter */ public Map getResourceLimits() { - return resourceLimits; + return getChangeableQueryGroup().getResourceLimits(); } /** * resiliencyMode getter */ public ResiliencyMode getResiliencyMode() { - return resiliencyMode; + return getChangeableQueryGroup().getResiliencyMode(); } /** - * updatedAtInMillis getter + * changeableQueryGroup getter */ - public long getUpdatedAtInMillis() { - return updatedAtInMillis; + public ChangeableQueryGroup getChangeableQueryGroup() { + return changeableQueryGroup; } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(name); - if (resourceLimits == null || resourceLimits.isEmpty()) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeDouble); - } - out.writeOptionalString(resiliencyMode == null ? null : resiliencyMode.getName()); - out.writeLong(updatedAtInMillis); + changeableQueryGroup.writeTo(out); } } diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java index df4df5e67e692..5f7c2576c04b9 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java @@ -18,7 +18,6 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; -import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode; import org.opensearch.cluster.service.ClusterManagerTaskThrottler.ThrottlingKey; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; @@ -33,6 +32,9 @@ import org.opensearch.plugin.wlm.action.UpdateQueryGroupRequest; import org.opensearch.plugin.wlm.action.UpdateQueryGroupResponse; import org.opensearch.search.ResourceType; +import org.opensearch.wlm.ChangeableQueryGroup; +import org.opensearch.wlm.ChangeableQueryGroup.ResiliencyMode; +import org.joda.time.Instant; import java.util.Collection; import java.util.EnumMap; @@ -40,7 +42,6 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import java.util.HashMap; /** * This class defines the functions for QueryGroup persistence @@ -244,7 +245,7 @@ ClusterState deleteQueryGroupInClusterState(final String name, final ClusterStat .orElseThrow(() -> new ResourceNotFoundException("No QueryGroup exists with the provided name: " + name)); return ClusterState.builder(currentClusterState).metadata(Metadata.builder(metadata).remove(queryGroupToRemove).build()).build(); - } + } /** * Modify cluster state to update the QueryGroup @@ -315,9 +316,8 @@ ClusterState updateQueryGroupInClusterState(UpdateQueryGroupRequest updateQueryG final QueryGroup updatedGroup = new QueryGroup( name, existingGroup.get_id(), - mode, - updatedResourceLimits, - updateQueryGroupRequest.getUpdatedAtInMillis() + new ChangeableQueryGroup(mode, updatedResourceLimits), + Instant.now().getMillis() ); return ClusterState.builder(currentState) .metadata(Metadata.builder(metadata).remove(existingGroup).put(updatedGroup).build()) diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java index 7d0f686285533..ea08f2be09cfc 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/QueryGroupTestUtils.java @@ -22,6 +22,7 @@ import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; import org.opensearch.search.ResourceType; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.ChangeableQueryGroup; import java.util.ArrayList; import java.util.Collection; @@ -32,7 +33,6 @@ import java.util.Set; import static org.opensearch.cluster.metadata.QueryGroup.builder; -import static org.opensearch.search.ResourceType.fromName; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -44,21 +44,17 @@ public class QueryGroupTestUtils { public static final String _ID_ONE = "AgfUO5Ja9yfsYlONlYi3TQ=="; public static final String _ID_TWO = "G5iIqHy4g7eK1qIAAAAIH53=1"; public static final String NAME_NONE_EXISTED = "query_group_none_existed"; - public static final String MEMORY_STRING = "memory"; - public static final String MONITOR_STRING = "monitor"; public static final long TIMESTAMP_ONE = 4513232413L; public static final long TIMESTAMP_TWO = 4513232415L; public static final QueryGroup queryGroupOne = builder().name(NAME_ONE) ._id(_ID_ONE) - .mode(MONITOR_STRING) - .resourceLimits(Map.of(fromName(MEMORY_STRING), 0.3)) + .changeableQueryGroup(new ChangeableQueryGroup(ChangeableQueryGroup.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.3))) .updatedAt(TIMESTAMP_ONE) .build(); public static final QueryGroup queryGroupTwo = builder().name(NAME_TWO) ._id(_ID_TWO) - .mode(MONITOR_STRING) - .resourceLimits(Map.of(fromName(MEMORY_STRING), 0.6)) + .changeableQueryGroup(new ChangeableQueryGroup(ChangeableQueryGroup.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.6))) .updatedAt(TIMESTAMP_TWO) .build(); @@ -140,14 +136,27 @@ public static void assertEqualResourceLimits( assertTrue(resourceLimitMapOne.values().containsAll(resourceLimitMapTwo.values())); } - public static void assertEqualQueryGroups(Collection collectionOne, Collection collectionTwo) { + public static void assertEqualQueryGroups( + Collection collectionOne, + Collection collectionTwo, + boolean assertUpdateAt + ) { assertEquals(collectionOne.size(), collectionTwo.size()); List listOne = new ArrayList<>(collectionOne); List listTwo = new ArrayList<>(collectionTwo); listOne.sort(Comparator.comparing(QueryGroup::getName)); listTwo.sort(Comparator.comparing(QueryGroup::getName)); for (int i = 0; i < listOne.size(); i++) { - assertTrue(listOne.get(i).equals(listTwo.get(i))); + if (assertUpdateAt) { + QueryGroup one = listOne.get(i); + QueryGroup two = listTwo.get(i); + assertEquals(one.getName(), two.getName()); + assertEquals(one.getResourceLimits(), two.getResourceLimits()); + assertEquals(one.getResiliencyMode(), two.getResiliencyMode()); + assertEquals(one.get_id(), two.get_id()); + } else { + assertEquals(listOne.get(i), listTwo.get(i)); + } } } } diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java index b0fa96a46df80..dd9de4bf8fb1a 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java @@ -35,6 +35,6 @@ public void testSerialization() throws IOException { List list2 = new ArrayList<>(); list1.add(queryGroupOne); list2.add(otherRequest.getQueryGroup()); - assertEqualQueryGroups(list1, list2); + assertEqualQueryGroups(list1, list2, false); } } diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java index ecb9a6b2dc0d2..713c62088b9ca 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java @@ -42,7 +42,7 @@ public void testSerialization() throws IOException { List listTwo = new ArrayList<>(); listOne.add(responseGroup); listTwo.add(otherResponseGroup); - QueryGroupTestUtils.assertEqualQueryGroups(listOne, listTwo); + QueryGroupTestUtils.assertEqualQueryGroups(listOne, listTwo, false); } /** diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java index 774f4b2d8db52..d9192dbb18fe4 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/GetQueryGroupResponseTests.java @@ -41,7 +41,7 @@ public void testSerializationSingleQueryGroup() throws IOException { GetQueryGroupResponse otherResponse = new GetQueryGroupResponse(streamInput); assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); - QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups()); + QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups(), false); } /** @@ -58,7 +58,7 @@ public void testSerializationMultipleQueryGroup() throws IOException { GetQueryGroupResponse otherResponse = new GetQueryGroupResponse(streamInput); assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); assertEquals(2, otherResponse.getQueryGroups().size()); - QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups()); + QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups(), false); } /** diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequestTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequestTests.java index d804fb7569ba2..147759bceebfa 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequestTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequestTests.java @@ -8,20 +8,18 @@ package org.opensearch.plugin.wlm.action; -import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.search.ResourceType; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.wlm.ChangeableQueryGroup; +import org.opensearch.wlm.ChangeableQueryGroup.ResiliencyMode; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import static org.opensearch.plugin.wlm.QueryGroupTestUtils.MEMORY_STRING; -import static org.opensearch.plugin.wlm.QueryGroupTestUtils.MONITOR_STRING; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_ONE; -import static org.opensearch.plugin.wlm.QueryGroupTestUtils.TIMESTAMP_ONE; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.assertEqualResourceLimits; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupOne; @@ -31,7 +29,7 @@ public class UpdateQueryGroupRequestTests extends OpenSearchTestCase { * Test case to verify the serialization and deserialization of UpdateQueryGroupRequest. */ public void testSerialization() throws IOException { - UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(queryGroupOne); + UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(NAME_ONE, queryGroupOne.getChangeableQueryGroup()); BytesStreamOutput out = new BytesStreamOutput(); request.writeTo(out); StreamInput streamInput = out.bytes().streamInput(); @@ -40,14 +38,13 @@ public void testSerialization() throws IOException { assertEquals(request.getResourceLimits().size(), otherRequest.getResourceLimits().size()); assertEquals(request.getResiliencyMode(), otherRequest.getResiliencyMode()); assertEqualResourceLimits(request.getResourceLimits(), otherRequest.getResourceLimits()); - assertEquals(request.getUpdatedAtInMillis(), otherRequest.getUpdatedAtInMillis()); } /** * Test case to verify the serialization and deserialization of UpdateQueryGroupRequest with only name field. */ public void testSerializationOnlyName() throws IOException { - UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(NAME_ONE, null, new HashMap<>(), TIMESTAMP_ONE); + UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(NAME_ONE, new ChangeableQueryGroup(null, new HashMap<>())); BytesStreamOutput out = new BytesStreamOutput(); request.writeTo(out); StreamInput streamInput = out.bytes().streamInput(); @@ -55,7 +52,6 @@ public void testSerializationOnlyName() throws IOException { assertEquals(request.getName(), otherRequest.getName()); assertEquals(request.getResourceLimits(), otherRequest.getResourceLimits()); assertEquals(request.getResiliencyMode(), otherRequest.getResiliencyMode()); - assertEquals(request.getUpdatedAtInMillis(), otherRequest.getUpdatedAtInMillis()); } /** @@ -64,9 +60,7 @@ public void testSerializationOnlyName() throws IOException { public void testSerializationOnlyResourceLimit() throws IOException { UpdateQueryGroupRequest request = new UpdateQueryGroupRequest( NAME_ONE, - null, - Map.of(ResourceType.fromName(MEMORY_STRING), 0.4), - TIMESTAMP_ONE + new ChangeableQueryGroup(null, Map.of(ResourceType.MEMORY, 0.4)) ); BytesStreamOutput out = new BytesStreamOutput(); request.writeTo(out); @@ -76,7 +70,6 @@ public void testSerializationOnlyResourceLimit() throws IOException { assertEquals(request.getResourceLimits().size(), otherRequest.getResourceLimits().size()); assertEqualResourceLimits(request.getResourceLimits(), otherRequest.getResourceLimits()); assertEquals(request.getResiliencyMode(), otherRequest.getResiliencyMode()); - assertEquals(request.getUpdatedAtInMillis(), otherRequest.getUpdatedAtInMillis()); } /** @@ -87,9 +80,7 @@ public void testInvalidResourceLimitList() { IllegalArgumentException.class, () -> new UpdateQueryGroupRequest( NAME_ONE, - QueryGroup.ResiliencyMode.fromName(MONITOR_STRING), - Map.of(ResourceType.fromName("memory"), 0.3, ResourceType.fromName(MONITOR_STRING), 0.4), - TIMESTAMP_ONE + new ChangeableQueryGroup(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.3, ResourceType.fromName("random"), 0.4)) ) ); } @@ -102,9 +93,7 @@ public void testInvalidEnforcement() { IllegalArgumentException.class, () -> new UpdateQueryGroupRequest( NAME_ONE, - QueryGroup.ResiliencyMode.fromName("random"), - Map.of(ResourceType.fromName("memory"), 0.3), - TIMESTAMP_ONE + new ChangeableQueryGroup(ResiliencyMode.fromName("random"), Map.of(ResourceType.fromName("memory"), 0.3)) ) ); } diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponseTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponseTests.java index fe3d92763866c..650e0bdc061ce 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponseTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupResponseTests.java @@ -43,7 +43,7 @@ public void testSerialization() throws IOException { List list2 = new ArrayList<>(); list1.add(responseGroup); list2.add(otherResponseGroup); - QueryGroupTestUtils.assertEqualQueryGroups(list1, list2); + QueryGroupTestUtils.assertEqualQueryGroups(list1, list2, false); } /** diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java index fb6f34fa94035..08a727089ed34 100644 --- a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceServiceTests.java @@ -16,7 +16,6 @@ import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.QueryGroup; -import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.ClusterSettings; @@ -30,6 +29,8 @@ import org.opensearch.search.ResourceType; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.ChangeableQueryGroup; +import org.opensearch.wlm.ChangeableQueryGroup.ResiliencyMode; import java.util.ArrayList; import java.util.Collection; @@ -43,8 +44,6 @@ import org.mockito.ArgumentCaptor; import static org.opensearch.cluster.metadata.QueryGroup.builder; -import static org.opensearch.plugin.wlm.QueryGroupTestUtils.MEMORY_STRING; -import static org.opensearch.plugin.wlm.QueryGroupTestUtils.MONITOR_STRING; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_NONE_EXISTED; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_ONE; import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_TWO; @@ -87,7 +86,7 @@ public void testCreateQueryGroup() { List listTwo = new ArrayList<>(); listOne.add(queryGroupOne); listTwo.add(updatedGroupsMap.get(_ID_ONE)); - assertEqualQueryGroups(listOne, listTwo); + assertEqualQueryGroups(listOne, listTwo, false); } /** @@ -103,7 +102,7 @@ public void testCreateAnotherQueryGroup() { assertEquals(2, updatedGroups.size()); assertTrue(updatedGroups.containsKey(_ID_TWO)); Collection values = updatedGroups.values(); - assertEqualQueryGroups(queryGroupList(), new ArrayList<>(values)); + assertEqualQueryGroups(queryGroupList(), new ArrayList<>(values), false); } /** @@ -115,8 +114,7 @@ public void testCreateQueryGroupDuplicateName() { ClusterState clusterState = setup.v2(); QueryGroup toCreate = builder().name(NAME_ONE) ._id("W5iIqHyhgi4K1qIAAAAIHw==") - .mode(MONITOR_STRING) - .resourceLimits(Map.of(ResourceType.fromName(MEMORY_STRING), 0.3)) + .changeableQueryGroup(new ChangeableQueryGroup(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.3))) .updatedAt(1690934400000L) .build(); assertThrows(RuntimeException.class, () -> queryGroupPersistenceService1.saveQueryGroupInClusterState(toCreate, clusterState)); @@ -130,8 +128,7 @@ public void testCreateQueryGroupOverflowAllocation() { Tuple setup = preparePersistenceServiceSetup(Map.of(_ID_TWO, queryGroupTwo)); QueryGroup toCreate = builder().name(NAME_ONE) ._id("W5iIqHyhgi4K1qIAAAAIHw==") - .mode(MONITOR_STRING) - .resourceLimits(Map.of(ResourceType.fromName(MEMORY_STRING), 0.41)) + .changeableQueryGroup(new ChangeableQueryGroup(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.41))) .updatedAt(1690934400000L) .build(); @@ -147,8 +144,7 @@ public void testCreateQueryGroupOverflowAllocation() { public void testCreateQueryGroupOverflowCount() { QueryGroup toCreate = builder().name(NAME_NONE_EXISTED) ._id("W5iIqHyhgi4K1qIAAAAIHw==") - .mode(MONITOR_STRING) - .resourceLimits(Map.of(ResourceType.fromName(MEMORY_STRING), 0.5)) + .changeableQueryGroup(new ChangeableQueryGroup(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.5))) .updatedAt(1690934400000L) .build(); Metadata metadata = Metadata.builder().queryGroups(Map.of(_ID_ONE, queryGroupOne, _ID_TWO, queryGroupTwo)).build(); @@ -271,7 +267,7 @@ public void testGetSingleQueryGroup() { List listTwo = new ArrayList<>(); listOne.add(QueryGroupTestUtils.queryGroupOne); listTwo.add(queryGroup); - QueryGroupTestUtils.assertEqualQueryGroups(listOne, listTwo); + QueryGroupTestUtils.assertEqualQueryGroups(listOne, listTwo, false); } /** @@ -285,7 +281,7 @@ public void testGetAllQueryGroups() { Set currentNAME = res.stream().map(QueryGroup::getName).collect(Collectors.toSet()); assertTrue(currentNAME.contains(QueryGroupTestUtils.NAME_ONE)); assertTrue(currentNAME.contains(QueryGroupTestUtils.NAME_TWO)); - QueryGroupTestUtils.assertEqualQueryGroups(QueryGroupTestUtils.queryGroupList(), res); + QueryGroupTestUtils.assertEqualQueryGroups(QueryGroupTestUtils.queryGroupList(), res, false); } /** @@ -311,7 +307,6 @@ public void testMaxQueryGroupCount() { } /** -<<<<<<< HEAD * Tests delete a single QueryGroup */ public void testDeleteSingleQueryGroup() { @@ -321,7 +316,7 @@ public void testDeleteSingleQueryGroup() { assertEquals(1, afterDeletionGroups.size()); List oldQueryGroups = new ArrayList<>(); oldQueryGroups.add(queryGroupOne); - assertEqualQueryGroups(new ArrayList<>(afterDeletionGroups.values()), oldQueryGroups); + assertEqualQueryGroups(new ArrayList<>(afterDeletionGroups.values()), oldQueryGroups, false); } /** @@ -363,23 +358,15 @@ public void testDeleteInClusterStateMetadata() throws Exception { } /** -======= ->>>>>>> 68348211caf (rebase) * Tests updating a QueryGroup with all fields */ public void testUpdateQueryGroupAllFields() { QueryGroup updated = builder().name(NAME_ONE) ._id(_ID_ONE) - .mode("enforced") - .resourceLimits(Map.of(ResourceType.fromName(MEMORY_STRING), 0.15)) + .changeableQueryGroup(new ChangeableQueryGroup(ResiliencyMode.ENFORCED, Map.of(ResourceType.MEMORY, 0.15))) .updatedAt(1690934400000L) .build(); - UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest( - NAME_ONE, - ResiliencyMode.fromName("enforced"), - Map.of(ResourceType.fromName(MEMORY_STRING), 0.15), - 1690934400000L - ); + UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest(NAME_ONE, updated.getChangeableQueryGroup()); ClusterState newClusterState = queryGroupPersistenceService().updateQueryGroupInClusterState( updateQueryGroupRequest, clusterState() @@ -389,7 +376,7 @@ public void testUpdateQueryGroupAllFields() { List expectedList = new ArrayList<>(); expectedList.add(queryGroupTwo); expectedList.add(updated); - assertEqualQueryGroups(expectedList, updatedQueryGroups); + assertEqualQueryGroups(expectedList, updatedQueryGroups, true); } /** @@ -398,16 +385,10 @@ public void testUpdateQueryGroupAllFields() { public void testUpdateQueryGroupResourceLimitsOnly() { QueryGroup updated = builder().name(NAME_ONE) ._id(_ID_ONE) - .mode(MONITOR_STRING) - .resourceLimits(Map.of(ResourceType.fromName(MEMORY_STRING), 0.15)) + .changeableQueryGroup(new ChangeableQueryGroup(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.15))) .updatedAt(1690934400000L) .build(); - UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest( - NAME_ONE, - ResiliencyMode.fromName(MONITOR_STRING), - Map.of(ResourceType.fromName(MEMORY_STRING), 0.15), - 1690934400000L - ); + UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest(NAME_ONE, updated.getChangeableQueryGroup()); ClusterState newClusterState = queryGroupPersistenceService().updateQueryGroupInClusterState( updateQueryGroupRequest, clusterState() @@ -432,7 +413,7 @@ public void testUpdateQueryGroupResourceLimitsOnly() { list1.add(updated); List list2 = new ArrayList<>(); list2.add(findUpdatedGroupOne.get()); - assertEqualQueryGroups(list1, list2); + assertEqualQueryGroups(list1, list2, true); } /** @@ -442,9 +423,7 @@ public void testUpdateQueryGroupNonExistedName() { QueryGroupPersistenceService queryGroupPersistenceService = queryGroupPersistenceService(); UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest( NAME_NONE_EXISTED, - ResiliencyMode.fromName(MONITOR_STRING), - Map.of(ResourceType.fromName(MEMORY_STRING), 0.15), - 1690934400000L + new ChangeableQueryGroup(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.15)) ); assertThrows( RuntimeException.class, @@ -457,7 +436,7 @@ public void testUpdateQueryGroupNonExistedName() { List expectedList = new ArrayList<>(); expectedList.add(queryGroupTwo); expectedList.add(queryGroupOne); - assertEqualQueryGroups(expectedList, updatedQueryGroups); + assertEqualQueryGroups(expectedList, updatedQueryGroups, true); } /** @@ -490,9 +469,7 @@ public void testUpdateInClusterStateMetadataInner() { ); UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest( NAME_TWO, - ResiliencyMode.SOFT, - new HashMap<>(), - 2435465879685L + new ChangeableQueryGroup(ResiliencyMode.SOFT, new HashMap<>()) ); ArgumentCaptor captor = ArgumentCaptor.forClass(ClusterStateUpdateTask.class); queryGroupPersistenceService.updateInClusterStateMetadata(updateQueryGroupRequest, listener); @@ -523,9 +500,7 @@ public void testUpdateInClusterStateMetadataFailure() { ); UpdateQueryGroupRequest updateQueryGroupRequest = new UpdateQueryGroupRequest( NAME_TWO, - ResiliencyMode.SOFT, - new HashMap<>(), - 2435465879685L + new ChangeableQueryGroup(ResiliencyMode.SOFT, new HashMap<>()) ); doAnswer(invocation -> { ClusterStateUpdateTask task = invocation.getArgument(1); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java index ed6fabb2e1fb6..95f0beddde6fa 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java @@ -18,10 +18,11 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.search.ResourceType; +import org.opensearch.wlm.ChangeableQueryGroup; +import org.opensearch.wlm.ChangeableQueryGroup.ResiliencyMode; import org.joda.time.Instant; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -30,7 +31,8 @@ * { * "_id": "fafjafjkaf9ag8a9ga9g7ag0aagaga", * "resource_limits": { - * "memory": 0.4 + * "memory": 0.4, + * "cpu": 0.2 * }, * "resiliency_mode": "enforced", * "name": "analytics", @@ -42,40 +44,35 @@ public class QueryGroup extends AbstractDiffable implements ToXConte public static final String _ID_STRING = "_id"; public static final String NAME_STRING = "name"; - public static final String RESILIENCY_MODE_STRING = "resiliency_mode"; public static final String UPDATED_AT_STRING = "updated_at"; - public static final String RESOURCE_LIMITS_STRING = "resource_limits"; private static final int MAX_CHARS_ALLOWED_IN_NAME = 50; private final String name; private final String _id; - private final ResiliencyMode resiliencyMode; // It is an epoch in millis private final long updatedAtInMillis; - private final Map resourceLimits; + private final ChangeableQueryGroup changeableQueryGroup; - public QueryGroup(String name, ResiliencyMode resiliencyMode, Map resourceLimits) { - this(name, UUIDs.randomBase64UUID(), resiliencyMode, resourceLimits, Instant.now().getMillis()); + public QueryGroup(String name, ChangeableQueryGroup changeableQueryGroup) { + this(name, UUIDs.randomBase64UUID(), changeableQueryGroup, Instant.now().getMillis()); } - public QueryGroup(String name, String _id, ResiliencyMode resiliencyMode, Map resourceLimits, long updatedAt) { + public QueryGroup(String name, String _id, ChangeableQueryGroup changeableQueryGroup, long updatedAt) { Objects.requireNonNull(name, "QueryGroup.name can't be null"); - Objects.requireNonNull(resourceLimits, "QueryGroup.resourceLimits can't be null"); - Objects.requireNonNull(resiliencyMode, "QueryGroup.resiliencyMode can't be null"); + Objects.requireNonNull(changeableQueryGroup.getResourceLimits(), "QueryGroup.resourceLimits can't be null"); + Objects.requireNonNull(changeableQueryGroup.getResiliencyMode(), "QueryGroup.resiliencyMode can't be null"); Objects.requireNonNull(_id, "QueryGroup._id can't be null"); validateName(name); - if (resourceLimits.isEmpty()) { + if (changeableQueryGroup.getResourceLimits().isEmpty()) { throw new IllegalArgumentException("QueryGroup.resourceLimits should at least have 1 resource limit"); } - validateResourceLimits(resourceLimits); if (!isValid(updatedAt)) { throw new IllegalArgumentException("QueryGroup.updatedAtInMillis is not a valid epoch"); } this.name = name; this._id = _id; - this.resiliencyMode = resiliencyMode; - this.resourceLimits = resourceLimits; + this.changeableQueryGroup = changeableQueryGroup; this.updatedAtInMillis = updatedAt; } @@ -90,21 +87,14 @@ public static boolean isValid(long updatedAt) { } public QueryGroup(StreamInput in) throws IOException { - this( - in.readString(), - in.readString(), - ResiliencyMode.fromName(in.readString()), - in.readMap((i) -> ResourceType.fromName(i.readString()), StreamInput::readDouble), - in.readLong() - ); + this(in.readString(), in.readString(), new ChangeableQueryGroup(in), in.readLong()); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(name); out.writeString(_id); - out.writeString(resiliencyMode.getName()); - out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeDouble); + changeableQueryGroup.writeTo(out); out.writeLong(updatedAtInMillis); } @@ -114,34 +104,15 @@ public static void validateName(String name) { } } - public static void validateResourceLimits(Map resourceLimits) { - for (Map.Entry resource : resourceLimits.entrySet()) { - Double threshold = resource.getValue(); - Objects.requireNonNull(resource.getKey(), "resourceName can't be null"); - Objects.requireNonNull(threshold, "resource limit threshold for" + resource.getKey().getName() + " : can't be null"); - - if (Double.compare(threshold, 0.0) <= 0 || Double.compare(threshold, 1.0) > 0) { - throw new IllegalArgumentException("resource value should be greater than 0 and less or equal to 1.0"); - } - } - } - @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); builder.field(_ID_STRING, _id); builder.field(NAME_STRING, name); - builder.field(RESILIENCY_MODE_STRING, resiliencyMode.getName()); - builder.field(UPDATED_AT_STRING, updatedAtInMillis); - // write resource limits - builder.startObject(RESOURCE_LIMITS_STRING); - for (ResourceType resourceType : ResourceType.values()) { - if (resourceLimits.containsKey(resourceType)) { - builder.field(resourceType.getName(), resourceLimits.get(resourceType)); - } + for (String fieldName : ChangeableQueryGroup.acceptedFieldNames) { + changeableQueryGroup.writeField(builder, fieldName); } - builder.endObject(); - + builder.field(UPDATED_AT_STRING, updatedAtInMillis); builder.endObject(); return builder; } @@ -160,27 +131,30 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; QueryGroup that = (QueryGroup) o; return Objects.equals(name, that.name) - && Objects.equals(resiliencyMode, that.resiliencyMode) - && Objects.equals(resourceLimits, that.resourceLimits) + && Objects.equals(changeableQueryGroup, that.changeableQueryGroup) && Objects.equals(_id, that._id) && updatedAtInMillis == that.updatedAtInMillis; } @Override public int hashCode() { - return Objects.hash(name, resourceLimits, updatedAtInMillis, _id); + return Objects.hash(name, changeableQueryGroup, updatedAtInMillis, _id); } public String getName() { return name; } + public ChangeableQueryGroup getChangeableQueryGroup() { + return changeableQueryGroup; + } + public ResiliencyMode getResiliencyMode() { - return resiliencyMode; + return getChangeableQueryGroup().getResiliencyMode(); } public Map getResourceLimits() { - return resourceLimits; + return getChangeableQueryGroup().getResourceLimits(); } public String get_id() { @@ -199,37 +173,6 @@ public static Builder builder() { return new Builder(); } - /** - * This enum models the different QueryGroup resiliency modes - * SOFT - means that this query group can consume more than query group resource limits if node is not in duress - * ENFORCED - means that it will never breach the assigned limits and will cancel as soon as the limits are breached - * MONITOR - it will not cause any cancellation but just log the eligible task cancellations - */ - @ExperimentalApi - public enum ResiliencyMode { - SOFT("soft"), - ENFORCED("enforced"), - MONITOR("monitor"); - - private final String name; - - ResiliencyMode(String mode) { - this.name = mode; - } - - public String getName() { - return name; - } - - public static ResiliencyMode fromName(String s) { - for (ResiliencyMode mode : values()) { - if (mode.getName().equalsIgnoreCase(s)) return mode; - - } - throw new IllegalArgumentException("Invalid value for QueryGroupMode: " + s); - } - } - /** * Builder class for {@link QueryGroup} */ @@ -237,9 +180,8 @@ public static ResiliencyMode fromName(String s) { public static class Builder { private String name; private String _id; - private ResiliencyMode resiliencyMode; + private ChangeableQueryGroup changeableQueryGroup; private long updatedAt; - private Map resourceLimits; private Builder() {} @@ -257,8 +199,7 @@ public static Builder fromXContent(XContentParser parser) throws IOException { } String fieldName = ""; - // Map to hold resources - final Map resourceLimits = new HashMap<>(); + ChangeableQueryGroup changeableQueryGroup1 = new ChangeableQueryGroup(); while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { fieldName = parser.currentName(); @@ -267,32 +208,21 @@ public static Builder fromXContent(XContentParser parser) throws IOException { builder._id(parser.text()); } else if (fieldName.equals(NAME_STRING)) { builder.name(parser.text()); - } else if (fieldName.equals(RESILIENCY_MODE_STRING)) { - builder.mode(parser.text()); + } else if (ChangeableQueryGroup.shouldParse(fieldName)) { + changeableQueryGroup1.parseField(parser, fieldName); } else if (fieldName.equals(UPDATED_AT_STRING)) { builder.updatedAt(parser.longValue()); } else { throw new IllegalArgumentException(fieldName + " is not a valid field in QueryGroup"); } } else if (token == XContentParser.Token.START_OBJECT) { - - if (!fieldName.equals(RESOURCE_LIMITS_STRING)) { - throw new IllegalArgumentException( - "QueryGroup.resourceLimits is an object and expected token was { " + " but found " + token - ); - } - - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - fieldName = parser.currentName(); - } else { - resourceLimits.put(ResourceType.fromName(fieldName), parser.doubleValue()); - } + if (!ChangeableQueryGroup.shouldParse(fieldName)) { + throw new IllegalArgumentException(fieldName + " is not a valid object in QueryGroup"); } - + changeableQueryGroup1.parseField(parser, fieldName); } } - return builder.resourceLimits(resourceLimits); + return builder.changeableQueryGroup(changeableQueryGroup1); } public Builder name(String name) { @@ -305,8 +235,8 @@ public Builder _id(String _id) { return this; } - public Builder mode(String mode) { - this.resiliencyMode = ResiliencyMode.fromName(mode); + public Builder changeableQueryGroup(ChangeableQueryGroup changeableQueryGroup) { + this.changeableQueryGroup = changeableQueryGroup; return this; } @@ -315,25 +245,12 @@ public Builder updatedAt(long updatedAt) { return this; } - public Builder resourceLimits(Map resourceLimits) { - this.resourceLimits = resourceLimits; - return this; - } - public QueryGroup build() { - return new QueryGroup(name, _id, resiliencyMode, resourceLimits, updatedAt); - } - - public ResiliencyMode getResiliencyMode() { - return resiliencyMode; - } - - public long getUpdatedAt() { - return updatedAt; + return new QueryGroup(name, _id, changeableQueryGroup, updatedAt); } - public Map getResourceLimits() { - return resourceLimits; + public ChangeableQueryGroup getChangeableQueryGroup() { + return changeableQueryGroup; } } } diff --git a/server/src/main/java/org/opensearch/wlm/ChangeableQueryGroup.java b/server/src/main/java/org/opensearch/wlm/ChangeableQueryGroup.java new file mode 100644 index 0000000000000..fad653d4fcbf9 --- /dev/null +++ b/server/src/main/java/org/opensearch/wlm/ChangeableQueryGroup.java @@ -0,0 +1,202 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +import org.opensearch.cluster.AbstractDiffable; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.search.ResourceType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; + +/** + * Class to hold the fields that can be updated in a QueryGroup + */ +@ExperimentalApi +public class ChangeableQueryGroup extends AbstractDiffable { + + public static final String RESILIENCY_MODE_STRING = "resiliency_mode"; + public static final String RESOURCE_LIMITS_STRING = "resource_limits"; + private ResiliencyMode resiliencyMode; + private Map resourceLimits; + + public static final List acceptedFieldNames = List.of(RESILIENCY_MODE_STRING, RESOURCE_LIMITS_STRING); + private final Map> fromXContentMap = Map.of(RESILIENCY_MODE_STRING, (parser) -> { + try { + setResiliencyMode(ResiliencyMode.fromName(parser.text())); + return null; + } catch (IOException e) { + throw new IllegalArgumentException("parsing error encountered for the field " + RESILIENCY_MODE_STRING); + } + }, RESOURCE_LIMITS_STRING, (parser) -> { + try { + String fieldName = ""; + XContentParser.Token token; + final Map resourceLimits = new HashMap<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else { + resourceLimits.put(ResourceType.fromName(fieldName), parser.doubleValue()); + } + } + setResourceLimits(resourceLimits); + return null; + } catch (IOException e) { + throw new IllegalArgumentException("parsing error encountered for the object " + RESOURCE_LIMITS_STRING); + } + }); + + private final Map> toXContentMap = Map.of(RESILIENCY_MODE_STRING, (builder) -> { + try { + builder.field(RESILIENCY_MODE_STRING, resiliencyMode.getName()); + return null; + } catch (IOException e) { + throw new IllegalStateException("writing error encountered for the field " + RESILIENCY_MODE_STRING); + } + }, RESOURCE_LIMITS_STRING, (builder) -> { + try { + builder.startObject(RESOURCE_LIMITS_STRING); + for (ResourceType resourceType : ResourceType.values()) { + if (resourceLimits.containsKey(resourceType)) { + builder.field(resourceType.getName(), resourceLimits.get(resourceType)); + } + } + builder.endObject(); + return null; + } catch (IOException e) { + throw new IllegalStateException("writing error encountered for the field " + RESOURCE_LIMITS_STRING); + } + }); + + public ChangeableQueryGroup() {} + + public ChangeableQueryGroup(ResiliencyMode resiliencyMode, Map resourceLimits) { + validateResourceLimits(resourceLimits); + this.resiliencyMode = resiliencyMode; + this.resourceLimits = resourceLimits; + } + + public ChangeableQueryGroup(StreamInput in) throws IOException { + if (in.readBoolean()) { + resourceLimits = in.readMap((i) -> ResourceType.fromName(i.readString()), StreamInput::readDouble); + } else { + resourceLimits = new HashMap<>(); + } + String updatedResiliencyMode = in.readOptionalString(); + resiliencyMode = updatedResiliencyMode == null ? null : ResiliencyMode.fromName(updatedResiliencyMode); + } + + public static boolean shouldParse(String field) { + return acceptedFieldNames.contains(field); + } + + public void parseField(XContentParser parser, String field) { + fromXContentMap.get(field).apply(parser); + } + + public void writeField(XContentBuilder builder, String field) { + toXContentMap.get(field).apply(builder); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (resourceLimits == null || resourceLimits.isEmpty()) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeDouble); + } + out.writeOptionalString(resiliencyMode == null ? null : resiliencyMode.getName()); + } + + public static void validateResourceLimits(Map resourceLimits) { + if (resourceLimits == null) { + return; + } + for (Map.Entry resource : resourceLimits.entrySet()) { + Double threshold = resource.getValue(); + Objects.requireNonNull(resource.getKey(), "resourceName can't be null"); + Objects.requireNonNull(threshold, "resource limit threshold for" + resource.getKey().getName() + " : can't be null"); + + if (Double.compare(threshold, 0.0) <= 0 || Double.compare(threshold, 1.0) > 0) { + throw new IllegalArgumentException("resource value should be greater than 0 and less or equal to 1.0"); + } + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ChangeableQueryGroup that = (ChangeableQueryGroup) o; + return Objects.equals(resiliencyMode, that.resiliencyMode) && Objects.equals(resourceLimits, that.resourceLimits); + } + + @Override + public int hashCode() { + return Objects.hash(resiliencyMode, resourceLimits); + } + + public ResiliencyMode getResiliencyMode() { + return resiliencyMode; + } + + public Map getResourceLimits() { + return resourceLimits; + } + + /** + * This enum models the different QueryGroup resiliency modes + * SOFT - means that this query group can consume more than query group resource limits if node is not in duress + * ENFORCED - means that it will never breach the assigned limits and will cancel as soon as the limits are breached + * MONITOR - it will not cause any cancellation but just log the eligible task cancellations + */ + @ExperimentalApi + public enum ResiliencyMode { + SOFT("soft"), + ENFORCED("enforced"), + MONITOR("monitor"); + + private final String name; + + ResiliencyMode(String mode) { + this.name = mode; + } + + public String getName() { + return name; + } + + public static ResiliencyMode fromName(String s) { + for (ResiliencyMode mode : values()) { + if (mode.getName().equalsIgnoreCase(s)) return mode; + + } + throw new IllegalArgumentException("Invalid value for QueryGroupMode: " + s); + } + } + + public void setResiliencyMode(ResiliencyMode resiliencyMode) { + this.resiliencyMode = resiliencyMode; + } + + public void setResourceLimits(Map resourceLimits) { + validateResourceLimits(resourceLimits); + this.resourceLimits = resourceLimits; + } +} diff --git a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java index 06734b8e0bac2..980791cbb0bd7 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupMetadataTests.java @@ -16,6 +16,7 @@ import org.opensearch.core.xcontent.XContentParser; import org.opensearch.search.ResourceType; import org.opensearch.test.AbstractDiffableSerializationTestCase; +import org.opensearch.wlm.ChangeableQueryGroup; import java.io.IOException; import java.util.Collections; @@ -33,8 +34,7 @@ public void testToXContent() throws IOException { new QueryGroup( "test", "ajakgakg983r92_4242", - QueryGroup.ResiliencyMode.ENFORCED, - Map.of(ResourceType.MEMORY, 0.5), + new ChangeableQueryGroup(ChangeableQueryGroup.ResiliencyMode.ENFORCED, Map.of(ResourceType.MEMORY, 0.5)), updatedAt ) ) diff --git a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java index 884b364fb26b8..1a0ee534d3569 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/QueryGroupTests.java @@ -16,6 +16,8 @@ import org.opensearch.core.xcontent.XContentParser; import org.opensearch.search.ResourceType; import org.opensearch.test.AbstractSerializingTestCase; +import org.opensearch.wlm.ChangeableQueryGroup; +import org.opensearch.wlm.ChangeableQueryGroup.ResiliencyMode; import org.joda.time.Instant; import java.io.IOException; @@ -26,20 +28,16 @@ public class QueryGroupTests extends AbstractSerializingTestCase { - private static final List allowedModes = List.of( - QueryGroup.ResiliencyMode.SOFT, - QueryGroup.ResiliencyMode.ENFORCED, - QueryGroup.ResiliencyMode.MONITOR - ); + private static final List allowedModes = List.of(ResiliencyMode.SOFT, ResiliencyMode.ENFORCED, ResiliencyMode.MONITOR); static QueryGroup createRandomQueryGroup(String _id) { String name = randomAlphaOfLength(10); Map resourceLimit = new HashMap<>(); resourceLimit.put(ResourceType.MEMORY, randomDoubleBetween(0.0, 0.80, false)); - return new QueryGroup(name, _id, randomMode(), resourceLimit, Instant.now().getMillis()); + return new QueryGroup(name, _id, new ChangeableQueryGroup(randomMode(), resourceLimit), Instant.now().getMillis()); } - private static QueryGroup.ResiliencyMode randomMode() { + private static ResiliencyMode randomMode() { return allowedModes.get(randomIntBetween(0, allowedModes.size() - 1)); } @@ -74,37 +72,50 @@ protected QueryGroup createTestInstance() { public void testNullName() { assertThrows( NullPointerException.class, - () -> new QueryGroup(null, "_id", randomMode(), Collections.emptyMap(), Instant.now().getMillis()) + () -> new QueryGroup(null, "_id", new ChangeableQueryGroup(randomMode(), Collections.emptyMap()), Instant.now().getMillis()) ); } public void testNullId() { assertThrows( NullPointerException.class, - () -> new QueryGroup("Dummy", null, randomMode(), Collections.emptyMap(), Instant.now().getMillis()) + () -> new QueryGroup("Dummy", null, new ChangeableQueryGroup(randomMode(), Collections.emptyMap()), Instant.now().getMillis()) ); } public void testNullResourceLimits() { - assertThrows(NullPointerException.class, () -> new QueryGroup("analytics", "_id", randomMode(), null, Instant.now().getMillis())); + assertThrows( + NullPointerException.class, + () -> new QueryGroup("analytics", "_id", new ChangeableQueryGroup(randomMode(), null), Instant.now().getMillis()) + ); } public void testEmptyResourceLimits() { assertThrows( IllegalArgumentException.class, - () -> new QueryGroup("analytics", "_id", randomMode(), Collections.emptyMap(), Instant.now().getMillis()) + () -> new QueryGroup( + "analytics", + "_id", + new ChangeableQueryGroup(randomMode(), Collections.emptyMap()), + Instant.now().getMillis() + ) ); } public void testIllegalQueryGroupMode() { assertThrows( NullPointerException.class, - () -> new QueryGroup("analytics", "_id", null, Map.of(ResourceType.MEMORY, 0.4), Instant.now().getMillis()) + () -> new QueryGroup( + "analytics", + "_id", + new ChangeableQueryGroup(null, Map.of(ResourceType.MEMORY, 0.4)), + Instant.now().getMillis() + ) ); } public void testQueryGroupInitiation() { - QueryGroup queryGroup = new QueryGroup("analytics", randomMode(), Map.of(ResourceType.MEMORY, 0.4)); + QueryGroup queryGroup = new QueryGroup("analytics", new ChangeableQueryGroup(randomMode(), Map.of(ResourceType.MEMORY, 0.4))); assertNotNull(queryGroup.getName()); assertNotNull(queryGroup.get_id()); assertNotNull(queryGroup.getResourceLimits()); @@ -117,12 +128,9 @@ public void testQueryGroupInitiation() { public void testIllegalQueryGroupName() { assertThrows( NullPointerException.class, - () -> new QueryGroup("a".repeat(51), "_id", null, Map.of(ResourceType.MEMORY, 0.4), Instant.now().getMillis()) - ); - assertThrows( - NullPointerException.class, - () -> new QueryGroup("", "_id", null, Map.of(ResourceType.MEMORY, 0.4), Instant.now().getMillis()) + () -> new QueryGroup("a".repeat(51), "_id", new ChangeableQueryGroup(), Instant.now().getMillis()) ); + assertThrows(NullPointerException.class, () -> new QueryGroup("", "_id", new ChangeableQueryGroup(), Instant.now().getMillis())); } @@ -132,8 +140,7 @@ public void testInvalidResourceLimitWhenInvalidSystemResourceValueIsGiven() { () -> new QueryGroup( "analytics", "_id", - randomMode(), - Map.of(ResourceType.MEMORY, randomDoubleBetween(1.1, 1.8, false)), + new ChangeableQueryGroup(randomMode(), Map.of(ResourceType.MEMORY, randomDoubleBetween(1.1, 1.8, false))), Instant.now().getMillis() ) ); @@ -143,8 +150,7 @@ public void testValidQueryGroup() { QueryGroup queryGroup = new QueryGroup( "analytics", "_id", - randomMode(), - Map.of(ResourceType.MEMORY, randomDoubleBetween(0.01, 0.8, false)), + new ChangeableQueryGroup(randomMode(), Map.of(ResourceType.MEMORY, randomDoubleBetween(0.01, 0.8, false))), Instant.ofEpochMilli(1717187289).getMillis() ); @@ -163,8 +169,7 @@ public void testToXContent() throws IOException { QueryGroup queryGroup = new QueryGroup( "TestQueryGroup", queryGroupId, - QueryGroup.ResiliencyMode.ENFORCED, - Map.of(ResourceType.CPU, 0.30, ResourceType.MEMORY, 0.40), + new ChangeableQueryGroup(ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.30, ResourceType.MEMORY, 0.40)), currentTimeInMillis ); XContentBuilder builder = JsonXContent.contentBuilder(); diff --git a/server/src/test/java/org/opensearch/wlm/ChangeableQueryGroupTests.java b/server/src/test/java/org/opensearch/wlm/ChangeableQueryGroupTests.java new file mode 100644 index 0000000000000..77aaa3633ae96 --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/ChangeableQueryGroupTests.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.search.ResourceType; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class ChangeableQueryGroupTests extends OpenSearchTestCase { + + public void testSerializationDeserialization() throws IOException { + Map resourceLimits = new HashMap<>(); + resourceLimits.put(ResourceType.CPU, 0.5); + resourceLimits.put(ResourceType.MEMORY, 0.75); + ChangeableQueryGroup changeableQueryGroup = new ChangeableQueryGroup(ChangeableQueryGroup.ResiliencyMode.SOFT, resourceLimits); + BytesStreamOutput out = new BytesStreamOutput(); + changeableQueryGroup.writeTo(out); + StreamInput in = out.bytes().streamInput(); + ChangeableQueryGroup deserializedGroup = new ChangeableQueryGroup(in); + assertEquals(changeableQueryGroup, deserializedGroup); + } + + public void testSerializationDeserializationWithNull() throws IOException { + ChangeableQueryGroup changeableQueryGroup = new ChangeableQueryGroup(); + BytesStreamOutput out = new BytesStreamOutput(); + changeableQueryGroup.writeTo(out); + StreamInput in = out.bytes().streamInput(); + ChangeableQueryGroup deserializedGroup = new ChangeableQueryGroup(in); + assertEquals(0, deserializedGroup.getResourceLimits().size()); + assertEquals(changeableQueryGroup.getResiliencyMode(), deserializedGroup.getResiliencyMode()); + } + + public void testValidateResourceLimits() { + Map invalidLimits = new HashMap<>(); + invalidLimits.put(ResourceType.CPU, 1.5); + Exception exception = assertThrows( + IllegalArgumentException.class, + () -> { ChangeableQueryGroup.validateResourceLimits(invalidLimits); } + ); + String expectedMessage = "resource value should be greater than 0 and less or equal to 1.0"; + String actualMessage = exception.getMessage(); + assertTrue(actualMessage.contains(expectedMessage)); + } + + public void testSetMethodsWithNullAndEmptyValues() { + ChangeableQueryGroup queryGroup = new ChangeableQueryGroup(); + queryGroup.setResiliencyMode(null); + assertNull(queryGroup.getResiliencyMode()); + queryGroup.setResourceLimits(null); + assertNull(queryGroup.getResourceLimits()); + queryGroup.setResourceLimits(new HashMap<>()); + assertEquals(0, queryGroup.getResourceLimits().size()); + } +}