From 188bdc876cef81408a937fd2eb71caf06d76be63 Mon Sep 17 00:00:00 2001 From: Karthik Kumarguru Date: Fri, 13 Nov 2020 10:46:33 -0800 Subject: [PATCH 1/2] Add v1 cortisol code --- build.gradle | 1 + .../tools/cortisol/BulkLoadParams.java | 52 ++++++ .../tools/cortisol/CortisolClient.java | 23 +++ .../tools/cortisol/CortisolHelper.java | 16 ++ .../tools/cortisol/IngestTask.java | 172 ++++++++++++++++++ .../tools/cortisol/SearchLoadParams.java | 7 + .../tools/cortisol/SimpleCortisolClient.java | 57 ++++++ 7 files changed, 328 insertions(+) create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/BulkLoadParams.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/CortisolClient.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/CortisolHelper.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/IngestTask.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/SearchLoadParams.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/SimpleCortisolClient.java diff --git a/build.gradle b/build.gradle index 9dc4bdedf..c62977e4d 100644 --- a/build.gradle +++ b/build.gradle @@ -273,6 +273,7 @@ dependencies { implementation 'io.grpc:grpc-protobuf:1.28.0' implementation 'io.grpc:grpc-stub:1.28.0' implementation 'javax.annotation:javax.annotation-api:1.3.2' + implementation 'org.elasticsearch.client:elasticsearch-rest-high-level-client:7.4.0' // JDK9+ has to run powermock 2+. 
/**
 * Parameters describing a bulk (ingest) stress test.
 *
 * <p>Instances are plain mutable holders. Every setter validates its argument so that an
 * invalid configuration is rejected where it is set, instead of surfacing later inside an
 * ingest worker (for example as a divide-by-zero when the per-thread request rate is derived
 * from {@code docsPerSecond / docsPerRequest}).
 */
public class BulkLoadParams {
    /** How long the workload should run, in seconds. */
    private int durationInSeconds;
    /** Target number of documents indexed per second across all ingest threads. */
    private int docsPerSecond;
    /** Number of documents packed into a single bulk request; always at least 1. */
    private int docsPerRequest = 1;
    /** Index mapping as a JSON string; empty when no mapping was supplied. */
    private String mappingTemplateJson = "";
    /** Path of the file that holds the documents to ingest, one per line. */
    private String trackLocation = "";

    /**
     * @return the configured test duration in seconds
     */
    public int getDurationInSeconds() {
        return durationInSeconds;
    }

    /**
     * @param durationInSeconds test duration in seconds; must not be negative
     * @throws IllegalArgumentException if {@code durationInSeconds} is negative
     */
    public void setDurationInSeconds(int durationInSeconds) {
        this.durationInSeconds = requireAtLeast(durationInSeconds, 0, "durationInSeconds");
    }

    /**
     * @return the target ingest rate in documents per second
     */
    public int getDocsPerSecond() {
        return docsPerSecond;
    }

    /**
     * @param docsPerSecond target ingest rate in documents per second; must not be negative
     * @throws IllegalArgumentException if {@code docsPerSecond} is negative
     */
    public void setDocsPerSecond(int docsPerSecond) {
        this.docsPerSecond = requireAtLeast(docsPerSecond, 0, "docsPerSecond");
    }

    /**
     * @return the number of documents sent in one bulk request
     */
    public int getDocsPerRequest() {
        return docsPerRequest;
    }

    /**
     * @param docsPerRequest number of documents per bulk request; must be at least 1 because
     *                       it is used as a divisor when the request rate is computed
     * @throws IllegalArgumentException if {@code docsPerRequest} is smaller than 1
     */
    public void setDocsPerRequest(int docsPerRequest) {
        this.docsPerRequest = requireAtLeast(docsPerRequest, 1, "docsPerRequest");
    }

    /**
     * @return the mapping JSON, never {@code null}
     */
    public String getMappingTemplateJson() {
        return mappingTemplateJson;
    }

    /**
     * @param mappingTemplateJson mapping JSON for the test index; must not be {@code null}
     * @throws NullPointerException if {@code mappingTemplateJson} is {@code null}
     */
    public void setMappingTemplateJson(String mappingTemplateJson) {
        this.mappingTemplateJson =
                java.util.Objects.requireNonNull(mappingTemplateJson, "mappingTemplateJson");
    }

    /**
     * @return the path of the document file, never {@code null}
     */
    public String getTrackLocation() {
        return trackLocation;
    }

    /**
     * @param trackLocation path of the document file; must not be {@code null}
     * @throws NullPointerException if {@code trackLocation} is {@code null}
     */
    public void setTrackLocation(String trackLocation) {
        this.trackLocation = java.util.Objects.requireNonNull(trackLocation, "trackLocation");
    }

    /** Returns {@code value} when it is at least {@code minimum}, otherwise throws. */
    private static int requireAtLeast(int value, int minimum, String name) {
        if (value < minimum) {
            throw new IllegalArgumentException(name + " must be >= " + minimum + ", got " + value);
        }
        return value;
    }
}
b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/CortisolClient.java new file mode 100644 index 000000000..eb85d0443 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/CortisolClient.java @@ -0,0 +1,23 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol; + +/** + * The client interface for Cortisol. + */ +public interface CortisolClient { + /** + * Runs an ingest workload as specified by the bulkLoadParams object. + * @param bulkLoadParams The object specifying the parameters for the stress test. + */ + void stressBulk(final BulkLoadParams bulkLoadParams); + + /** + * Runs a search workload as specified by the searchLoadParams object. + * @param searchLoadParams The object specifying the parameters for the stress test. + */ + void stressSearch(final SearchLoadParams searchLoadParams); + + /** + * Cleans up the cluster by deleting the indices created for stress testing. 
/**
 * Static helpers shared by the Cortisol stress-test tooling.
 *
 * <p>The class is a pure namespace: it is final and cannot be instantiated.
 */
public final class CortisolHelper {
    private CortisolHelper() {
        // static utility holder, no instances
    }

    /**
     * Builds the JSON document that carries the shard layout of a test index.
     *
     * <p>The result has the fixed form
     * {@code {"settings":{"index":{"number_of_shards":P,"number_of_replicas":R}}}}. It is
     * assembled directly instead of going through a map-backed JSON object so that the key
     * order is deterministic; both values are integers, so no escaping is required.
     *
     * <p>NOTE(review): {@code IngestTask} hands this string to
     * {@code CreateIndexRequest.settings(String, XContentType)}. Confirm that the outer
     * {@code "settings"} wrapper is what that call expects; the wrapper is kept here so the
     * produced document stays equivalent to what this method has always returned.
     *
     * @param nPrimaries number of primary shards; must be at least 1
     * @param nReplicas  number of replica shards per primary; must not be negative
     * @return the settings document as a compact JSON string
     * @throws IllegalArgumentException if {@code nPrimaries < 1} or {@code nReplicas < 0}
     */
    public static String buildIndexSettingsJson(final int nPrimaries, final int nReplicas) {
        if (nPrimaries < 1) {
            throw new IllegalArgumentException("nPrimaries must be >= 1, got " + nPrimaries);
        }
        if (nReplicas < 0) {
            throw new IllegalArgumentException("nReplicas must be >= 0, got " + nReplicas);
        }
        return "{\"settings\":{\"index\":{"
                + "\"number_of_shards\":" + nPrimaries
                + ",\"number_of_replicas\":" + nReplicas
                + "}}}";
    }
}
org.elasticsearch.common.xcontent.XContentType; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class IngestTask implements Runnable { + private static final String CORTISOL_PREFIX = "cort-"; + private final String endpoint; + private final int port; + private final BulkLoadParams params; + private final long endTime; + private final RestHighLevelClient client; + private final Random random; + private final String indexName; + private final TokenBucket tokenBucket; + private final AtomicInteger requestCount; + private final AtomicInteger successCount; + private final AtomicInteger failureCount; + + public IngestTask(final BulkLoadParams bulkLoadParams, final String endpoint, final int port, final int nIngestThreads) { + this.params = bulkLoadParams; + this.endpoint = endpoint; + this.port = port; + long startTime = System.currentTimeMillis(); + this.endTime = startTime + bulkLoadParams.getDurationInSeconds() * 1000; + this.client = buildEsClient(); + this.random = new Random(); + this.requestCount = new AtomicInteger(); + this.successCount = new AtomicInteger(); + this.failureCount = new AtomicInteger(); + this.indexName = CORTISOL_PREFIX + random.nextInt(100); + this.tokenBucket = new TokenBucket(params.getDocsPerSecond() / (params.getDocsPerRequest() * nIngestThreads)); + } + + @Override + public void run() { + try { + BufferedReader br = new BufferedReader(new FileReader(params.getTrackLocation())); + createIndex(params.getMappingTemplateJson(), 3, 1); + + while (System.currentTimeMillis() 
< endTime) { + // take blocks till tokens are available in the token bucket. + // 0 indicates an interrupted exception, so break out and exit. + if (tokenBucket.take() != 0) { + List docs = new ArrayList<>(); + for (int i = 0; i < params.getDocsPerRequest(); ++i) { + String doc = br.readLine(); + if (doc == null) { + // restart the indexing from the top. + br = new BufferedReader(new FileReader(params.getTrackLocation())); + doc = br.readLine(); + } + + docs.add(doc); + } + makeBulkRequest(docs); + } else { + break; + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private RestHighLevelClient buildEsClient() { + final RestClientBuilder builder = RestClient.builder(new HttpHost(endpoint, port, "http")); + return new RestHighLevelClient(builder); + } + + private void createIndex(final String mappingTemplateJson, int nPrimaryShards, int nReplicaShards) throws IOException { + final CreateIndexRequest cir = new CreateIndexRequest(indexName); + cir.mapping(mappingTemplateJson, XContentType.JSON); + cir.settings(CortisolHelper.buildIndexSettingsJson(nPrimaryShards, nReplicaShards), XContentType.JSON); + final CreateIndexResponse response = client.indices().create(cir, RequestOptions.DEFAULT); + assert response.isAcknowledged(); + } + + private void makeBulkRequest(final List bulkDocs) { + final BulkRequest bulkRequest = new BulkRequest(); + for (String bulkDoc : bulkDocs) { + final IndexRequest indexRequest = Requests.indexRequest(indexName); + indexRequest.source(bulkDoc, XContentType.JSON); + bulkRequest.add(indexRequest); + } + client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (!bulkItemResponses.hasFailures()) { + successCount.addAndGet(bulkDocs.size()); + } else { + for (BulkItemResponse response : bulkItemResponses) { + if (response.isFailed()) { + failureCount.incrementAndGet(); + } else { + successCount.incrementAndGet(); + } + } + } + } + + 
@Override + public void onFailure(Exception e) { + failureCount.addAndGet(bulkDocs.size()); + } + }); + requestCount.addAndGet(bulkDocs.size()); + } + + private class TokenBucket { + private final Lock lock = new ReentrantLock(); + private final Condition hasTokens = lock.newCondition(); + private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(); + + private final int maxTokens; + private int nTokens; + + public TokenBucket(final int maxTokens) { + this.maxTokens = maxTokens; + this.ses.schedule(this::refill, 1, TimeUnit.SECONDS); + } + + public int take() { + lock.lock(); + try { + while (nTokens == 0) { + hasTokens.await(); + } + return nTokens--; + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + lock.unlock(); + } + + return 0; + } + + private void refill() { + lock.lock(); + try { + nTokens = maxTokens; + hasTokens.signal(); + } finally { + lock.unlock(); + } + } + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/SearchLoadParams.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/SearchLoadParams.java new file mode 100644 index 000000000..6c67aa8cb --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/SearchLoadParams.java @@ -0,0 +1,7 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol; + +/** + * [TBD] Class containing the metadata for search stress test. 
+ */ +public class SearchLoadParams { +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/SimpleCortisolClient.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/SimpleCortisolClient.java new file mode 100644 index 000000000..7132e8f4a --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/SimpleCortisolClient.java @@ -0,0 +1,57 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol; + +import org.elasticsearch.client.RestHighLevelClient; + +import java.util.concurrent.*; +import java.util.stream.IntStream; + +/** + * A basic implementation of {@link CortisolClient}. + */ +public class SimpleCortisolClient implements CortisolClient { + private final String endpoint; + private final int port; + private final int nThreads; + private final ExecutorService executorService; + + public SimpleCortisolClient(final String endpoint, final int port) { + this.endpoint = endpoint; + this.port = port; + nThreads = Runtime.getRuntime().availableProcessors(); + executorService = Executors.newFixedThreadPool(nThreads); + } + + /** + * Tries to run an ingest load as specified by the docsPerSecond configured in the {@link BulkLoadParams} instance. + * The way it runs the ingest load is by creating multiple threads where each thread will create its own index on + * the cluster and ingests into it. + * @param bulkLoadParams The object specifying the parameters for the stress test. 
+ */ + @Override + public void stressBulk(BulkLoadParams bulkLoadParams) { + setupShutDownHook(bulkLoadParams.getDurationInSeconds()); + IntStream.of(nThreads).forEach(i -> executorService.submit(new IngestTask(bulkLoadParams, endpoint, port, nThreads))); + } + + @Override + public void stressSearch(SearchLoadParams searchLoadParams) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void cleanup() { + try { + executorService.shutdown(); + executorService.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + executorService.shutdownNow(); + } + } + + private void setupShutDownHook(final int duration) { + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + scheduledExecutorService.schedule(this::cleanup, duration, TimeUnit.SECONDS); + System.out.println("Shutdown hook enabled. Will shutdown after"); + } +} From db75be49a2a4a6c99e0f1ad8ccec552dd4dbb6ce Mon Sep 17 00:00:00 2001 From: Karthik Kumarguru Date: Mon, 30 Nov 2020 10:29:34 -0800 Subject: [PATCH 2/2] PR comment fixes --- .../performanceanalyzer/tools/cortisol/IngestTask.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/IngestTask.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/IngestTask.java index 943ce47d9..c843c7ccb 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/IngestTask.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/tools/cortisol/IngestTask.java @@ -81,6 +81,8 @@ public void run() { break; } } + + br.close(); } catch (IOException e) { e.printStackTrace(); } @@ -130,6 +132,9 @@ public void onFailure(Exception e) { requestCount.addAndGet(bulkDocs.size()); } + /** + * Custom token bucket class that lets at most {@code nTokens} be available every 
second. + */ private class TokenBucket { private final Lock lock = new ReentrantLock(); private final Condition hasTokens = lock.newCondition();