Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Aug 28, 2024
1 parent 0807ffa commit 945f1a6
Show file tree
Hide file tree
Showing 15 changed files with 413 additions and 289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new RestCreateQueryGroupAction(), new RestGetQueryGroupAction(), new RestDeleteQueryGroupAction(), new RestUpdateQueryGroupAction());
return List.of(
new RestCreateQueryGroupAction(),
new RestGetQueryGroupAction(),
new RestDeleteQueryGroupAction(),
new RestUpdateQueryGroupAction()
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.ResourceType;
import org.joda.time.Instant;
import org.opensearch.wlm.ChangeableQueryGroup;
import org.opensearch.wlm.ChangeableQueryGroup.ResiliencyMode;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -29,55 +28,24 @@
*/
public class UpdateQueryGroupRequest extends ActionRequest {
private final String name;
private final Map<ResourceType, Double> resourceLimits;
private final ResiliencyMode resiliencyMode;
private final long updatedAtInMillis;

/**
* Constructor for UpdateQueryGroupRequest
* @param queryGroup - A {@link QueryGroup} object
*/
public UpdateQueryGroupRequest(QueryGroup queryGroup) {
this.name = queryGroup.getName();
this.resiliencyMode = queryGroup.getResiliencyMode();
this.resourceLimits = queryGroup.getResourceLimits();
this.updatedAtInMillis = queryGroup.getUpdatedAtInMillis();
}
private final ChangeableQueryGroup changeableQueryGroup;

/**
* Constructor for UpdateQueryGroupRequest
* @param name - QueryGroup name for UpdateQueryGroupRequest
* @param resiliencyMode - QueryGroup mode for UpdateQueryGroupRequest
* @param resourceLimits - QueryGroup resourceLimits for UpdateQueryGroupRequest
* @param updatedAtInMillis - QueryGroup updated time in millis for UpdateQueryGroupRequest
* @param changeableQueryGroup - ChangeableQueryGroup for UpdateQueryGroupRequest
*/
public UpdateQueryGroupRequest(
String name,
ResiliencyMode resiliencyMode,
Map<ResourceType, Double> resourceLimits,
long updatedAtInMillis
) {
public UpdateQueryGroupRequest(String name, ChangeableQueryGroup changeableQueryGroup) {
this.name = name;
this.resiliencyMode = resiliencyMode;
this.resourceLimits = resourceLimits;
this.updatedAtInMillis = updatedAtInMillis;
this.changeableQueryGroup = changeableQueryGroup;
}

/**
* Constructor for UpdateQueryGroupRequest
* @param in - A {@link StreamInput} object
*/
public UpdateQueryGroupRequest(StreamInput in) throws IOException {
super(in);
name = in.readString();
if (in.readBoolean()) {
resourceLimits = in.readMap((i) -> ResourceType.fromName(i.readString()), StreamInput::readDouble);
} else {
resourceLimits = new HashMap<>();
}
String updatedResiliencyMode = in.readOptionalString();
resiliencyMode = updatedResiliencyMode == null ? null : ResiliencyMode.fromName(updatedResiliencyMode);
updatedAtInMillis = in.readLong();
this(in.readString(), new ChangeableQueryGroup(in));
}

/**
Expand All @@ -87,16 +55,12 @@ public UpdateQueryGroupRequest(StreamInput in) throws IOException {
*/
public static UpdateQueryGroupRequest fromXContent(XContentParser parser, String name) throws IOException {
QueryGroup.Builder builder = QueryGroup.Builder.fromXContent(parser);
return new UpdateQueryGroupRequest(name, builder.getResiliencyMode(), builder.getResourceLimits(), Instant.now().getMillis());
return new UpdateQueryGroupRequest(name, builder.getChangeableQueryGroup());
}

@Override
public ActionRequestValidationException validate() {
QueryGroup.validateName(name);
if (resourceLimits != null) {
QueryGroup.validateResourceLimits(resourceLimits);
}
assert QueryGroup.isValid(updatedAtInMillis);
return null;
}

Expand All @@ -108,37 +72,30 @@ public String getName() {
}

/**
* ResourceLimits getter
* resourceLimits getter
*/
public Map<ResourceType, Double> getResourceLimits() {
return resourceLimits;
return getChangeableQueryGroup().getResourceLimits();
}

/**
* resiliencyMode getter
*/
public ResiliencyMode getResiliencyMode() {
return resiliencyMode;
return getChangeableQueryGroup().getResiliencyMode();
}

/**
* updatedAtInMillis getter
* changeableQueryGroup getter
*/
public long getUpdatedAtInMillis() {
return updatedAtInMillis;
public ChangeableQueryGroup getChangeableQueryGroup() {
return changeableQueryGroup;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
if (resourceLimits == null || resourceLimits.isEmpty()) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeDouble);
}
out.writeOptionalString(resiliencyMode == null ? null : resiliencyMode.getName());
out.writeLong(updatedAtInMillis);
changeableQueryGroup.writeTo(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler.ThrottlingKey;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
Expand All @@ -33,14 +32,16 @@
import org.opensearch.plugin.wlm.action.UpdateQueryGroupRequest;
import org.opensearch.plugin.wlm.action.UpdateQueryGroupResponse;
import org.opensearch.search.ResourceType;
import org.opensearch.wlm.ChangeableQueryGroup;
import org.opensearch.wlm.ChangeableQueryGroup.ResiliencyMode;
import org.joda.time.Instant;

import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.HashMap;

/**
* This class defines the functions for QueryGroup persistence
Expand Down Expand Up @@ -244,7 +245,7 @@ ClusterState deleteQueryGroupInClusterState(final String name, final ClusterStat
.orElseThrow(() -> new ResourceNotFoundException("No QueryGroup exists with the provided name: " + name));

return ClusterState.builder(currentClusterState).metadata(Metadata.builder(metadata).remove(queryGroupToRemove).build()).build();
}
}

/**
* Modify cluster state to update the QueryGroup
Expand Down Expand Up @@ -315,9 +316,8 @@ ClusterState updateQueryGroupInClusterState(UpdateQueryGroupRequest updateQueryG
final QueryGroup updatedGroup = new QueryGroup(
name,
existingGroup.get_id(),
mode,
updatedResourceLimits,
updateQueryGroupRequest.getUpdatedAtInMillis()
new ChangeableQueryGroup(mode, updatedResourceLimits),
Instant.now().getMillis()
);
return ClusterState.builder(currentState)
.metadata(Metadata.builder(metadata).remove(existingGroup).put(updatedGroup).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.search.ResourceType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.ChangeableQueryGroup;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -32,7 +33,6 @@
import java.util.Set;

import static org.opensearch.cluster.metadata.QueryGroup.builder;
import static org.opensearch.search.ResourceType.fromName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand All @@ -44,21 +44,17 @@ public class QueryGroupTestUtils {
public static final String _ID_ONE = "AgfUO5Ja9yfsYlONlYi3TQ==";
public static final String _ID_TWO = "G5iIqHy4g7eK1qIAAAAIH53=1";
public static final String NAME_NONE_EXISTED = "query_group_none_existed";
public static final String MEMORY_STRING = "memory";
public static final String MONITOR_STRING = "monitor";
public static final long TIMESTAMP_ONE = 4513232413L;
public static final long TIMESTAMP_TWO = 4513232415L;
public static final QueryGroup queryGroupOne = builder().name(NAME_ONE)
._id(_ID_ONE)
.mode(MONITOR_STRING)
.resourceLimits(Map.of(fromName(MEMORY_STRING), 0.3))
.changeableQueryGroup(new ChangeableQueryGroup(ChangeableQueryGroup.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.3)))
.updatedAt(TIMESTAMP_ONE)
.build();

public static final QueryGroup queryGroupTwo = builder().name(NAME_TWO)
._id(_ID_TWO)
.mode(MONITOR_STRING)
.resourceLimits(Map.of(fromName(MEMORY_STRING), 0.6))
.changeableQueryGroup(new ChangeableQueryGroup(ChangeableQueryGroup.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.6)))
.updatedAt(TIMESTAMP_TWO)
.build();

Expand Down Expand Up @@ -140,14 +136,27 @@ public static void assertEqualResourceLimits(
assertTrue(resourceLimitMapOne.values().containsAll(resourceLimitMapTwo.values()));
}

public static void assertEqualQueryGroups(Collection<QueryGroup> collectionOne, Collection<QueryGroup> collectionTwo) {
public static void assertEqualQueryGroups(
Collection<QueryGroup> collectionOne,
Collection<QueryGroup> collectionTwo,
boolean assertUpdateAt
) {
assertEquals(collectionOne.size(), collectionTwo.size());
List<QueryGroup> listOne = new ArrayList<>(collectionOne);
List<QueryGroup> listTwo = new ArrayList<>(collectionTwo);
listOne.sort(Comparator.comparing(QueryGroup::getName));
listTwo.sort(Comparator.comparing(QueryGroup::getName));
for (int i = 0; i < listOne.size(); i++) {
assertTrue(listOne.get(i).equals(listTwo.get(i)));
if (assertUpdateAt) {
QueryGroup one = listOne.get(i);
QueryGroup two = listTwo.get(i);
assertEquals(one.getName(), two.getName());
assertEquals(one.getResourceLimits(), two.getResourceLimits());
assertEquals(one.getResiliencyMode(), two.getResiliencyMode());
assertEquals(one.get_id(), two.get_id());
} else {
assertEquals(listOne.get(i), listTwo.get(i));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ public void testSerialization() throws IOException {
List<QueryGroup> list2 = new ArrayList<>();
list1.add(queryGroupOne);
list2.add(otherRequest.getQueryGroup());
assertEqualQueryGroups(list1, list2);
assertEqualQueryGroups(list1, list2, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testSerialization() throws IOException {
List<QueryGroup> listTwo = new ArrayList<>();
listOne.add(responseGroup);
listTwo.add(otherResponseGroup);
QueryGroupTestUtils.assertEqualQueryGroups(listOne, listTwo);
QueryGroupTestUtils.assertEqualQueryGroups(listOne, listTwo, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void testSerializationSingleQueryGroup() throws IOException {

GetQueryGroupResponse otherResponse = new GetQueryGroupResponse(streamInput);
assertEquals(response.getRestStatus(), otherResponse.getRestStatus());
QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups());
QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups(), false);
}

/**
Expand All @@ -58,7 +58,7 @@ public void testSerializationMultipleQueryGroup() throws IOException {
GetQueryGroupResponse otherResponse = new GetQueryGroupResponse(streamInput);
assertEquals(response.getRestStatus(), otherResponse.getRestStatus());
assertEquals(2, otherResponse.getQueryGroups().size());
QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups());
QueryGroupTestUtils.assertEqualQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups(), false);
}

/**
Expand Down
Loading

0 comments on commit 945f1a6

Please sign in to comment.