Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] add concurrent limit on datasource and sessions #2394

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum Key {
QUERY_SIZE_LIMIT("plugins.query.size_limit"),
ENCYRPTION_MASTER_KEY("plugins.query.datasources.encryption.masterkey"),
DATASOURCES_URI_HOSTS_DENY_LIST("plugins.query.datasources.uri.hosts.denylist"),
DATASOURCES_LIMIT("plugins.query.datasources.limit"),

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),
Expand Down
3 changes: 2 additions & 1 deletion datasources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ repositories {
dependencies {
implementation project(':core')
implementation project(':protocol')
implementation project(':opensearch')
implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.opensearch', name: 'opensearch-x-content', version: "${opensearch_version}"
implementation group: 'org.opensearch', name: 'common-utils', version: "${opensearch_build}"
Expand All @@ -35,7 +36,7 @@ dependencies {
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
events "skipped", "failed"
exceptionFormat "full"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.opensearch.sql.datasources.transport;

import static org.opensearch.sql.common.setting.Settings.Key.DATASOURCES_LIMIT;
import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;

import org.opensearch.action.ActionType;
Expand All @@ -30,6 +31,7 @@ public class TransportCreateDataSourceAction
new ActionType<>(NAME, CreateDataSourceActionResponse::new);

private DataSourceService dataSourceService;
private org.opensearch.sql.opensearch.setting.OpenSearchSettings settings;

/**
* TransportCreateDataSourceAction action for creating datasource.
Expand All @@ -42,33 +44,44 @@ public class TransportCreateDataSourceAction
public TransportCreateDataSourceAction(
TransportService transportService,
ActionFilters actionFilters,
DataSourceServiceImpl dataSourceService) {
DataSourceServiceImpl dataSourceService,
org.opensearch.sql.opensearch.setting.OpenSearchSettings settings) {
super(
TransportCreateDataSourceAction.NAME,
transportService,
actionFilters,
CreateDataSourceActionRequest::new);
this.dataSourceService = dataSourceService;
this.settings = settings;
}

@Override
protected void doExecute(
Task task,
CreateDataSourceActionRequest request,
ActionListener<CreateDataSourceActionResponse> actionListener) {
try {
DataSourceMetadata dataSourceMetadata = request.getDataSourceMetadata();
dataSourceService.createDataSource(dataSourceMetadata);
String responseContent =
new JsonResponseFormatter<String>(PRETTY) {
@Override
protected Object buildJsonObject(String response) {
return response;
}
}.format("Created DataSource with name " + dataSourceMetadata.getName());
actionListener.onResponse(new CreateDataSourceActionResponse(responseContent));
} catch (Exception e) {
actionListener.onFailure(e);
int dataSourceLimit = settings.getSettingValue(DATASOURCES_LIMIT);
if (dataSourceService.getDataSourceMetadata(false).size() >= dataSourceLimit) {
actionListener.onFailure(
new IllegalStateException(
String.format(
"domain concurrent datasources can not" + " exceed %d", dataSourceLimit)));
} else {
try {

DataSourceMetadata dataSourceMetadata = request.getDataSourceMetadata();
dataSourceService.createDataSource(dataSourceMetadata);
String responseContent =
new JsonResponseFormatter<String>(PRETTY) {
@Override
protected Object buildJsonObject(String response) {
return response;
}
}.format("Created DataSource with name " + dataSourceMetadata.getName());
actionListener.onResponse(new CreateDataSourceActionResponse(responseContent));
} catch (Exception e) {
actionListener.onFailure(e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package org.opensearch.sql.datasources.transport;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.common.setting.Settings.Key.DATASOURCES_LIMIT;

import java.util.HashSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
Expand All @@ -21,6 +26,7 @@
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand All @@ -29,9 +35,13 @@ public class TransportCreateDataSourceActionTest {

@Mock private TransportService transportService;
@Mock private TransportCreateDataSourceAction action;
@Mock private DataSourceServiceImpl dataSourceService;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private DataSourceServiceImpl dataSourceService;

@Mock private Task task;
@Mock private ActionListener<CreateDataSourceActionResponse> actionListener;
@Mock private OpenSearchSettings settings;

@Captor
private ArgumentCaptor<CreateDataSourceActionResponse>
Expand All @@ -43,7 +53,9 @@ public class TransportCreateDataSourceActionTest {
public void setUp() {
action =
new TransportCreateDataSourceAction(
transportService, new ActionFilters(new HashSet<>()), dataSourceService);
transportService, new ActionFilters(new HashSet<>()), dataSourceService, settings);
when(dataSourceService.getDataSourceMetadata(false).size()).thenReturn(1);
when(settings.getSettingValue(DATASOURCES_LIMIT)).thenReturn(20);
}

@Test
Expand Down Expand Up @@ -79,4 +91,30 @@ public void testDoExecuteWithException() {
Assertions.assertTrue(exception instanceof RuntimeException);
Assertions.assertEquals("Error", exception.getMessage());
}

@Test
public void testDataSourcesLimit() {
DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
dataSourceMetadata.setName("test_datasource");
dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS);
CreateDataSourceActionRequest request = new CreateDataSourceActionRequest(dataSourceMetadata);
when(dataSourceService.getDataSourceMetadata(false).size()).thenReturn(1);
when(settings.getSettingValue(DATASOURCES_LIMIT)).thenReturn(1);

action.doExecute(
task,
request,
new ActionListener<CreateDataSourceActionResponse>() {
@Override
public void onResponse(CreateDataSourceActionResponse createDataSourceActionResponse) {
fail();
}

@Override
public void onFailure(Exception e) {
assertEquals("domain concurrent datasources can not exceed 1", e.getMessage());
}
});
verify(dataSourceService, times(0)).createDataSource(dataSourceMetadata);
}
}
37 changes: 35 additions & 2 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,12 @@ SQL query::
}

plugins.query.executionengine.spark.session.limit
===================================================
==================================================

Description
-----------

Each datasource can have maximum 100 sessions running in parallel by default. You can increase limit by this setting.
Each cluster can have maximum 100 sessions running in parallel by default. You can increase limit by this setting.

1. The default value is 100.
2. This setting is node scope.
Expand Down Expand Up @@ -383,3 +383,36 @@ SQL query::
}
}


plugins.query.datasources.limit
===============================

Description
-----------

Each cluster can have maximum 20 datasources. You can increase limit by this setting.

1. The default value is 20.
2. This setting is node scope.
3. This setting can be updated dynamically.

You can update the setting with a new value like this.

SQL query::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \
... -d '{"transient":{"plugins.query.datasources.limit":25}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"datasources": {
"limit": "25"
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -34,6 +35,11 @@

public class DataSourceAPIsIT extends PPLIntegTestCase {

@After
public void cleanUp() throws IOException {
wipeAllClusterSettings();
}

@AfterClass
protected static void deleteDataSourcesCreated() throws IOException {
Request deleteRequest = getDeleteDataSourceRequest("create_prometheus");
Expand All @@ -51,6 +57,10 @@ protected static void deleteDataSourcesCreated() throws IOException {
deleteRequest = getDeleteDataSourceRequest("Create_Prometheus");
deleteResponse = client().performRequest(deleteRequest);
Assert.assertEquals(204, deleteResponse.getStatusLine().getStatusCode());

deleteRequest = getDeleteDataSourceRequest("duplicate_prometheus");
deleteResponse = client().performRequest(deleteRequest);
Assert.assertEquals(204, deleteResponse.getStatusLine().getStatusCode());
}

@SneakyThrows
Expand Down Expand Up @@ -283,4 +293,45 @@ public void issue2196() {
Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.password"));
Assert.assertEquals("Prometheus Creation for Integ test", dataSourceMetadata.getDescription());
}

@Test
public void datasourceLimitTest() throws InterruptedException, IOException {
DataSourceMetadata d1 = mockDataSourceMetadata("duplicate_prometheus");
Request createRequest = getCreateDataSourceRequest(d1);
Response response = client().performRequest(createRequest);
Assert.assertEquals(201, response.getStatusLine().getStatusCode());
// Datasource is not immediately created. so introducing a sleep of 2s.
Thread.sleep(2000);

updateClusterSettings(new ClusterSetting(TRANSIENT, "plugins.query.datasources.limit", "1"));

DataSourceMetadata d2 = mockDataSourceMetadata("d2");
ResponseException exception =
Assert.assertThrows(
ResponseException.class, () -> client().performRequest(getCreateDataSourceRequest(d2)));
Assert.assertEquals(400, exception.getResponse().getStatusLine().getStatusCode());
String prometheusGetResponseString = getResponseBody(exception.getResponse());
JsonObject errorMessage = new Gson().fromJson(prometheusGetResponseString, JsonObject.class);
Assert.assertEquals(
"domain concurrent datasources can not exceed 1",
errorMessage.get("error").getAsJsonObject().get("details").getAsString());
}

public DataSourceMetadata mockDataSourceMetadata(String name) {
return new DataSourceMetadata(
name,
"Prometheus Creation for Integ test",
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of(
"prometheus.uri",
"https://localhost:9090",
"prometheus.auth.type",
"basicauth",
"prometheus.auth.username",
"username",
"prometheus.auth.password",
"password"),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<?> DATASOURCES_LIMIT_SETTING =
Setting.intSetting(
Key.DATASOURCES_LIMIT.getKeyValue(),
20,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -272,6 +279,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.AUTO_INDEX_MANAGEMENT_ENABLED,
AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
new Updater(Key.AUTO_INDEX_MANAGEMENT_ENABLED));
register(
settingBuilder,
clusterSettings,
Key.DATASOURCES_LIMIT,
DATASOURCES_LIMIT_SETTING,
new Updater(Key.DATASOURCES_LIMIT));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -342,6 +355,7 @@ public static List<Setting<?>> pluginSettings() {
.add(SESSION_INDEX_TTL_SETTING)
.add(RESULT_INDEX_TTL_SETTING)
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
.add(DATASOURCES_LIMIT_SETTING)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
Expand Down Expand Up @@ -258,7 +259,7 @@ public Collection<Object> createComponents(
OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
environment.settings());
return ImmutableList.of(
dataSourceService, asyncQueryExecutorService, clusterManagerEventListener);
dataSourceService, asyncQueryExecutorService, clusterManagerEventListener, pluginSettings);
}

@Override
Expand Down Expand Up @@ -333,7 +334,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client),
client,
new SessionManager(stateStore, emrServerlessClient, pluginSettings));
new SessionManager(stateStore, emrServerlessClient, pluginSettings),
new DefaultLeaseManager(pluginSettings, stateStore));
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService,
sparkQueryDispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import com.amazonaws.services.emrserverless.model.JobRunState;
import org.json.JSONObject;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;

/** Process async query request. */
public abstract class AsyncQueryHandler {
Expand Down Expand Up @@ -45,5 +48,8 @@ protected abstract JSONObject getResponseFromResultIndex(
protected abstract JSONObject getResponseFromExecutor(
AsyncQueryJobMetadata asyncQueryJobMetadata);

abstract String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata);
public abstract String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata);

public abstract DispatchQueryResponse submit(
DispatchQueryRequest request, DispatchQueryContext context);
}
Loading
Loading