This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add v1 cortisol code #514

Open · wants to merge 2 commits into main
Changes from 1 commit
1 change: 1 addition & 0 deletions build.gradle
@@ -273,6 +273,7 @@ dependencies {
implementation 'io.grpc:grpc-protobuf:1.28.0'
implementation 'io.grpc:grpc-stub:1.28.0'
implementation 'javax.annotation:javax.annotation-api:1.3.2'
implementation 'org.elasticsearch.client:elasticsearch-rest-high-level-client:7.4.0'

// JDK9+ has to run powermock 2+. https://github.com/powermock/powermock/issues/888
testCompile group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.0'
52 changes: 52 additions & 0 deletions BulkLoadParams.java
@@ -0,0 +1,52 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

/**
* Class containing the metadata for the bulk stress test.
*/
public class BulkLoadParams {
private int durationInSeconds;
private int docsPerSecond;
private int docsPerRequest = 1;
private String mappingTemplateJson = "";
private String trackLocation = "";

public int getDurationInSeconds() {
return durationInSeconds;
}

public void setDurationInSeconds(int durationInSeconds) {
this.durationInSeconds = durationInSeconds;
}

public int getDocsPerSecond() {
return docsPerSecond;
}

public void setDocsPerSecond(int docsPerSecond) {
this.docsPerSecond = docsPerSecond;
}

public int getDocsPerRequest() {
return docsPerRequest;
}

public void setDocsPerRequest(int docsPerRequest) {
this.docsPerRequest = docsPerRequest;
}

public String getMappingTemplateJson() {
return mappingTemplateJson;
}

public void setMappingTemplateJson(String mappingTemplateJson) {
this.mappingTemplateJson = mappingTemplateJson;
}

public String getTrackLocation() {
return trackLocation;
}

public void setTrackLocation(String trackLocation) {
this.trackLocation = trackLocation;
}
}
23 changes: 23 additions & 0 deletions CortisolClient.java
@@ -0,0 +1,23 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

/**
* The client interface for Cortisol.
*/
public interface CortisolClient {
/**
* Runs an ingest workload as specified by the bulkLoadParams object.
* @param bulkLoadParams The object specifying the parameters for the stress test.
*/
void stressBulk(final BulkLoadParams bulkLoadParams);

/**
* Runs a search workload as specified by the searchLoadParams object.
* @param searchLoadParams The object specifying the parameters for the stress test.
*/
void stressSearch(final SearchLoadParams searchLoadParams);

/**
* Cleans up the cluster by deleting the indices created for stress testing.
*/
void cleanup();
}
16 changes: 16 additions & 0 deletions CortisolHelper.java
@@ -0,0 +1,16 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

import org.jooq.tools.json.JSONObject;

public class CortisolHelper {
public static String buildIndexSettingsJson(final int nPrimaries, final int nReplicas) {
JSONObject container = new JSONObject();
JSONObject settings = new JSONObject();
JSONObject index = new JSONObject();
index.put("number_of_shards", nPrimaries);
index.put("number_of_replicas", nReplicas);
settings.put("index", index);
container.put("settings", settings);
return container.toString();
}
}
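
A quick, hypothetical usage sketch (the CortisolHelperDemo class is illustrative, not part of this change) showing the shape of the JSON the helper returns:

package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

public class CortisolHelperDemo {
    public static void main(String[] args) {
        // Prints the settings document for 3 primaries and 1 replica. Key order
        // may vary with the JSON library, but the shape is:
        // {"settings":{"index":{"number_of_shards":3,"number_of_replicas":1}}}
        System.out.println(CortisolHelper.buildIndexSettingsJson(3, 1));
    }
}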
172 changes: 172 additions & 0 deletions IngestTask.java
@@ -0,0 +1,172 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class IngestTask implements Runnable {
private static final String CORTISOL_PREFIX = "cort-";
private final String endpoint;
private final int port;
private final BulkLoadParams params;
private final long endTime;
private final RestHighLevelClient client;
private final Random random;
private final String indexName;
private final TokenBucket tokenBucket;
private final AtomicInteger requestCount;
private final AtomicInteger successCount;
private final AtomicInteger failureCount;

public IngestTask(final BulkLoadParams bulkLoadParams, final String endpoint, final int port, final int nIngestThreads) {
this.params = bulkLoadParams;
this.endpoint = endpoint;
this.port = port;
long startTime = System.currentTimeMillis();
        this.endTime = startTime + bulkLoadParams.getDurationInSeconds() * 1000L;
this.client = buildEsClient();
this.random = new Random();
this.requestCount = new AtomicInteger();
this.successCount = new AtomicInteger();
this.failureCount = new AtomicInteger();
this.indexName = CORTISOL_PREFIX + random.nextInt(100);
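        // Each thread's bucket gets docsPerSecond / (docsPerRequest * nIngestThreads)
        // tokens per second. For example: 6000 docs/sec with 100 docs/request across
        // 4 threads gives each bucket 15 tokens, i.e. 15 bulk requests/sec per thread
        // and 6000 docs/sec in aggregate. Note that the integer division truncates to
        // 0 when docsPerRequest * nIngestThreads exceeds docsPerSecond.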
this.tokenBucket = new TokenBucket(params.getDocsPerSecond() / (params.getDocsPerRequest() * nIngestThreads));
}

@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new FileReader(params.getTrackLocation()));
createIndex(params.getMappingTemplateJson(), 3, 1);

while (System.currentTimeMillis() < endTime) {
// take blocks till tokens are available in the token bucket.
// 0 indicates an interrupted exception, so break out and exit.
if (tokenBucket.take() != 0) {
List<String> docs = new ArrayList<>();
for (int i = 0; i < params.getDocsPerRequest(); ++i) {
                        String doc = br.readLine();
                        if (doc == null) {
                            // Reached the end of the track file; close the reader
                            // and restart the indexing from the top.
                            br.close();
                            br = new BufferedReader(new FileReader(params.getTrackLocation()));
                            doc = br.readLine();
                        }

docs.add(doc);
}
makeBulkRequest(docs);
} else {
break;
}
            }
            br.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
}

private RestHighLevelClient buildEsClient() {
final RestClientBuilder builder = RestClient.builder(new HttpHost(endpoint, port, "http"));
return new RestHighLevelClient(builder);
}

private void createIndex(final String mappingTemplateJson, int nPrimaryShards, int nReplicaShards) throws IOException {
final CreateIndexRequest cir = new CreateIndexRequest(indexName);
cir.mapping(mappingTemplateJson, XContentType.JSON);
cir.settings(CortisolHelper.buildIndexSettingsJson(nPrimaryShards, nReplicaShards), XContentType.JSON);
final CreateIndexResponse response = client.indices().create(cir, RequestOptions.DEFAULT);
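        // Assertions are disabled by default at runtime; this acknowledgement
        // check only fires when the JVM runs with -ea.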
assert response.isAcknowledged();
}

private void makeBulkRequest(final List<String> bulkDocs) {
final BulkRequest bulkRequest = new BulkRequest();
for (String bulkDoc : bulkDocs) {
final IndexRequest indexRequest = Requests.indexRequest(indexName);
indexRequest.source(bulkDoc, XContentType.JSON);
bulkRequest.add(indexRequest);
}
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
if (!bulkItemResponses.hasFailures()) {
successCount.addAndGet(bulkDocs.size());
} else {
for (BulkItemResponse response : bulkItemResponses) {
if (response.isFailed()) {
failureCount.incrementAndGet();
} else {
successCount.incrementAndGet();
}
}
}
}

@Override
public void onFailure(Exception e) {
failureCount.addAndGet(bulkDocs.size());
}
});
requestCount.addAndGet(bulkDocs.size());
}

    /**
     * Works slightly differently from a standard token bucket: rather than a
     * steady drip, the bucket is reset to maxTokens once every second, which
     * allows at most maxTokens take() calls per second.
     */
    private class TokenBucket {
Contributor: You should add a comment to this class since it functions slightly differently from a standard token bucket. I believe this lets you make at most nTokens concurrent take() calls every second.

Author: Done.

private final Lock lock = new ReentrantLock();
private final Condition hasTokens = lock.newCondition();
private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
Contributor: I think you can avoid having a scheduler here. Have calls to take() block, and make sure every take() refills what it took from the bucket once it's complete.

Author: take() already blocks with await(), but I see what you mean here. The idea is to make sure that we make a fixed number of calls every second. If we made the call to makeBulkRequest() block and replenished the tokens once the requests are done, then we wouldn't be able to guarantee a constant request rate. We don't want to replenish too soon (if bulk calls finish faster) or too late (if the bulk calls take longer to finish).

private final int maxTokens;
private int nTokens;

        public TokenBucket(final int maxTokens) {
            this.maxTokens = maxTokens;
            // Refill the bucket once every second. A one-shot schedule() would
            // refill only once and then starve take() after the first second.
            this.ses.scheduleAtFixedRate(this::refill, 1, 1, TimeUnit.SECONDS);
        }

public int take() {
lock.lock();
try {
while (nTokens == 0) {
hasTokens.await();
}
Comment on lines +154 to +156
Contributor: Does this need to be in a while loop?

Author: Yes, to guard against spurious wakeups.

return nTokens--;
            } catch (InterruptedException e) {
                // Restore the interrupt status; returning 0 tells the caller to exit.
                Thread.currentThread().interrupt();
} finally {
lock.unlock();
}

return 0;
}

private void refill() {
lock.lock();
try {
nTokens = maxTokens;
hasTokens.signal();
} finally {
lock.unlock();
}
Comment on lines +167 to +174
Contributor: Why doesn't the take() method just replenish the bucket by 1 once it's done?

Author: The work done after taking a token depends on the time each bulk() call takes, so we wouldn't be able to guarantee a constant request rate. Some more explanation is in the comment above.
}
}
}
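
To illustrate the pacing behavior the threads above discuss, here is a minimal standalone sketch (hypothetical; it swaps the PR's Lock/Condition pair for a Semaphore, and the PacingDemo class name is illustrative) of the same reset-every-second bucket. With permitsPerSecond = 5, roughly five acquisitions complete per one-second window, regardless of how long the work after each acquire takes:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class PacingDemo {
    public static void main(String[] args) throws InterruptedException {
        final int permitsPerSecond = 5;
        final Semaphore bucket = new Semaphore(0);
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        // Mirror refill(): reset the bucket to a fixed size once every second
        // instead of replenishing per completed request.
        ses.scheduleAtFixedRate(() -> {
            bucket.drainPermits();
            bucket.release(permitsPerSecond);
        }, 1, 1, TimeUnit.SECONDS);

        long start = System.currentTimeMillis();
        for (int i = 1; i <= 15; i++) {
            bucket.acquire(); // blocks like take() until the next refill
            System.out.println("request " + i + " at "
                    + (System.currentTimeMillis() - start) + " ms");
        }
        ses.shutdownNow();
    }
}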
7 changes: 7 additions & 0 deletions SearchLoadParams.java
@@ -0,0 +1,7 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

/**
* [TBD] Class containing the metadata for search stress test.
*/
public class SearchLoadParams {
}
57 changes: 57 additions & 0 deletions SimpleCortisolClient.java
@@ -0,0 +1,57 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
* A basic implementation of {@link CortisolClient}.
*/
public class SimpleCortisolClient implements CortisolClient {
private final String endpoint;
private final int port;
private final int nThreads;
private final ExecutorService executorService;

public SimpleCortisolClient(final String endpoint, final int port) {
this.endpoint = endpoint;
this.port = port;
nThreads = Runtime.getRuntime().availableProcessors();
executorService = Executors.newFixedThreadPool(nThreads);
}

/**
* Tries to run an ingest load as specified by the docsPerSecond configured in the {@link BulkLoadParams} instance.
 * The ingest load is run by creating multiple threads, where each thread creates
 * its own index on the cluster and ingests into it.
* @param bulkLoadParams The object specifying the parameters for the stress test.
*/
@Override
public void stressBulk(BulkLoadParams bulkLoadParams) {
setupShutDownHook(bulkLoadParams.getDurationInSeconds());
        // IntStream.of(nThreads) would create a single-element stream and submit just
        // one task; range(0, nThreads) submits one IngestTask per thread.
        IntStream.range(0, nThreads).forEach(i -> executorService.submit(new IngestTask(bulkLoadParams, endpoint, port, nThreads)));
}

@Override
public void stressSearch(SearchLoadParams searchLoadParams) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public void cleanup() {
try {
executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status and force an immediate shutdown.
            Thread.currentThread().interrupt();
            executorService.shutdownNow();
}
}

private void setupShutDownHook(final int duration) {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.schedule(this::cleanup, duration, TimeUnit.SECONDS);
        System.out.println("Shutdown scheduled. Will shut down after " + duration + " seconds.");
}
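
For context, a hypothetical end-to-end driver (the CortisolDemo class, endpoint, port, mapping, and track-file path are all illustrative assumptions, not part of this change) showing how these pieces fit together:

package com.amazon.opendistro.elasticsearch.performanceanalyzer.tools.cortisol;

public class CortisolDemo {
    public static void main(String[] args) {
        BulkLoadParams params = new BulkLoadParams();
        params.setDurationInSeconds(60);   // run the ingest load for one minute
        params.setDocsPerSecond(1000);     // aggregate target across all ingest threads
        params.setDocsPerRequest(50);      // batch 50 docs into each bulk request
        params.setMappingTemplateJson(
                "{\"properties\":{\"message\":{\"type\":\"text\"}}}");
        params.setTrackLocation("/tmp/track.ndjson"); // one JSON doc per line

        // Spawns one IngestTask per available processor, each writing to its own index.
        CortisolClient client = new SimpleCortisolClient("localhost", 9200);
        client.stressBulk(params);
    }
}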
}