Skip to content

Commit

Permalink
abstracted the event writer implementation and implemented a new Kine…
Browse files Browse the repository at this point in the history
…sisEventWriter
  • Loading branch information
twincitiesguy committed Jul 23, 2020
1 parent 3f569ff commit ee0c742
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 54 deletions.
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ apply plugin: "application"
apply plugin: "com.github.johnrengelman.shadow"

group = "com.dellemc.sdp.demo"
version = 1.1
version = 1.2

mainClassName = "com.dellemc.sdp.demo.music.SongPlayGeneratorCli"
mainClassName = "com.dellemc.sdp.demo.music.SongEventGeneratorCli"
archivesBaseName = 'pravega-music-demo'
sourceCompatibility = 1.8
targetCompatibility = 1.8
Expand All @@ -32,6 +32,7 @@ dependencies {
compile "org.slf4j:slf4j-api:${slf4jApiVersion}"
compile "commons-cli:commons-cli:1.4"
compile "ch.qos.logback:logback-classic:1.2.3"
compile "com.amazonaws:amazon-kinesis-client:1.13.3"
testImplementation "org.junit.jupiter:junit-jupiter:5.5.2"
testCompile "io.pravega:pravega-standalone:${pravegaVersion}"
}
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/com/dellemc/sdp/demo/music/KinesisEventWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.dellemc.sdp.demo.music;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KinesisEventWriter implements SongEventGenerator.EventWriter, AutoCloseable {
SongEventGenerator.Config config;
AmazonKinesis kinesisClient;

public KinesisEventWriter(SongEventGenerator.Config config) {
this.config = config;
kinesisClient = AmazonKinesisClientBuilder.standard().withCredentials(new ProfileCredentialsProvider(config.getAwsProfile())).build();
}

@Override
public void writeEvent(String routingKey, String body) {
kinesisClient.putRecord(config.getStream(), ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8)), routingKey);
}

@Override
public synchronized void close() {
try {
if (kinesisClient != null) kinesisClient.shutdown();
} catch (Throwable t) {
t.printStackTrace();
}
kinesisClient = null;
}
}
80 changes: 80 additions & 0 deletions src/main/java/com/dellemc/sdp/demo/music/PravegaEventWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.dellemc.sdp.demo.music;

import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import io.pravega.keycloak.client.PravegaKeycloakCredentials;

import java.net.URI;

public class PravegaEventWriter implements SongEventGenerator.EventWriter, AutoCloseable {
SongEventGenerator.Config config;
EventStreamClientFactory clientFactory;
EventStreamWriter<String> writer;

public PravegaEventWriter(SongEventGenerator.Config config) {
this.config = config;

// create stream
ClientConfig clientConfig = createClientConfig();
createStream(clientConfig);

// create writer
clientFactory = EventStreamClientFactory.withScope(config.getScope(), clientConfig);
writer = clientFactory.createEventWriter(
config.getStream(), new UTF8StringSerializer(), EventWriterConfig.builder().build());
}

@Override
public void writeEvent(String routingKey, String body) {
writer.writeEvent(routingKey, body);
}

@Override
public synchronized void close() {
try {
if (writer != null) writer.close();
} catch (Throwable t) {
t.printStackTrace();
}
writer = null;
try {
if (clientFactory != null) clientFactory.close();
} catch (Throwable t) {
t.printStackTrace();
}
clientFactory = null;
}

ClientConfig createClientConfig() {
ClientConfig.ClientConfigBuilder builder = ClientConfig.builder();
builder.controllerURI(URI.create(config.getControllerEndpoint()));

// Keycloak means we are using Streaming Data Platform
if (config.isUseKeycloak()) {
builder.credentials(new PravegaKeycloakCredentials());
}

return builder.build();
}

void createStream(ClientConfig clientConfig) {
try (StreamManager streamManager = StreamManager.create(clientConfig)) {

// create the scope
if (!config.isUseKeycloak()) // can't create a scope in SDP
streamManager.createScope(config.getScope());

// create the stream
StreamConfiguration streamConfiguration = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.byEventRate(5, 2, 2))
.build();
streamManager.createStream(config.getScope(), config.getStream(), streamConfiguration);
}
}
}
93 changes: 42 additions & 51 deletions src/main/java/com/dellemc/sdp/demo/music/SongEventGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,9 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import io.pravega.keycloak.client.PravegaKeycloakCredentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -59,14 +49,8 @@ public void run() {
players.add(new SongPlayer(i + 1));
}

// create stream
ClientConfig clientConfig = createClientConfig();
createStream(clientConfig);

// create client factory and event writer
try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(config.getScope(), clientConfig);
EventStreamWriter<String> writer = clientFactory.createEventWriter(
config.getStream(), new UTF8StringSerializer(), EventWriterConfig.builder().build())) {
// create event writer
try (EventWriter eventWriter = createEventWriter(config)) {

// loop until stopped
while (running.get()) {
Expand All @@ -86,7 +70,7 @@ public void run() {
events.sort((o1, o2) -> (int) (o1.timestamp - o2.timestamp));

// submit events to be emitted on time
taskService.submit(new TimedEventWriterTask(writer, events));
taskService.submit(new TimedEventWriterTask(eventWriter, events));

// sleep for a while
Thread.sleep(futureEventThresholdMS / 5);
Expand All @@ -98,52 +82,37 @@ public void run() {
}
}

public void stop() {
running.set(false);
}

ClientConfig createClientConfig() {
ClientConfig.ClientConfigBuilder builder = ClientConfig.builder();
builder.controllerURI(URI.create(config.getControllerEndpoint()));

// Keycloak means we are using Streaming Data Platform
if (config.isUseKeycloak()) {
builder.credentials(new PravegaKeycloakCredentials());
EventWriter createEventWriter(Config config) {
if (config.isUseKinesis()) {
return new KinesisEventWriter(config);
} else {
return new PravegaEventWriter(config);
}

return builder.build();
}

void createStream(ClientConfig clientConfig) {
try (StreamManager streamManager = StreamManager.create(clientConfig)) {

// create the scope
if (!config.isUseKeycloak()) // can't create a scope in SDP
streamManager.createScope(config.getScope());

// create the stream
StreamConfiguration streamConfiguration = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.byEventRate(5, 2, 2))
.build();
streamManager.createStream(config.getScope(), config.getStream(), streamConfiguration);
}
public void stop() {
running.set(false);
}

static class Config {
String controllerEndpoint;
String scope;
String stream;
boolean useKeycloak;
boolean useKinesis;
String awsProfile;
int playerCount = DEFAULT_PLAYER_COUNT;

public Config() {
}

public Config(String controllerEndpoint, String scope, String stream, boolean useKeycloak, int playerCount) {
public Config(String controllerEndpoint, String scope, String stream, boolean useKeycloak, boolean useKinesis, String awsProfile, int playerCount) {
setControllerEndpoint(controllerEndpoint);
setScope(scope);
setStream(stream);
setUseKeycloak(useKeycloak);
setUseKinesis(useKinesis);
setAwsProfile(awsProfile);
setPlayerCount(playerCount);
}

Expand All @@ -152,8 +121,6 @@ public String getControllerEndpoint() {
}

public void setControllerEndpoint(String controllerEndpoint) {
if (controllerEndpoint == null || controllerEndpoint.trim().length() == 0)
throw new IllegalArgumentException("controller endpoint is required");
this.controllerEndpoint = controllerEndpoint;
}

Expand All @@ -162,7 +129,6 @@ public String getScope() {
}

public void setScope(String scope) {
if (scope == null || scope.trim().length() == 0) throw new IllegalArgumentException("scope is required");
this.scope = scope;
}

Expand All @@ -183,6 +149,22 @@ public void setUseKeycloak(boolean useKeycloak) {
this.useKeycloak = useKeycloak;
}

public boolean isUseKinesis() {
return useKinesis;
}

public void setUseKinesis(boolean useKinesis) {
this.useKinesis = useKinesis;
}

public String getAwsProfile() {
return awsProfile;
}

public void setAwsProfile(String awsProfile) {
this.awsProfile = awsProfile;
}

public int getPlayerCount() {
return playerCount;
}
Expand All @@ -199,19 +181,21 @@ public String toString() {
", scope='" + scope + '\'' +
", stream='" + stream + '\'' +
", useKeycloak=" + useKeycloak +
", useKinesis=" + useKinesis +
", awsProfile=" + awsProfile +
", playerCount=" + playerCount +
'}';
}
}

class TimedEventWriterTask implements Runnable {
private EventStreamWriter<String> writer;
private EventWriter writer;
private List<SongEvent> events;

/**
* @param events a *sorted* list of events to write (sorted by future emission time)
*/
public TimedEventWriterTask(EventStreamWriter<String> writer, List<SongEvent> events) {
public TimedEventWriterTask(EventWriter writer, List<SongEvent> events) {
this.writer = writer;
this.events = events;
}
Expand Down Expand Up @@ -244,4 +228,11 @@ public void run() {
}
}
}

public interface EventWriter extends AutoCloseable {
void writeEvent(String routingKey, String body);

@Override
void close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ static Options options() {
options.addOption(Option.builder("v").longOpt("verbose").desc("Verbose logging").build());
options.addOption(Option.builder("d").longOpt("debug").desc("Debug logging").build());

options.addOption(Option.builder("z").longOpt("kinesis").desc("Write to Kinesis instead of Pravega (for testing)").build());
options.addOption(Option.builder().longOpt("aws-profile").desc("When writing to Kinesis, the AWS CLI profile to use (configuration must be set for this profile)")
.hasArg().argName("aws-profile").build());

options.addOption(Option.builder("h").longOpt("help").desc("Print this help text").build());
return options;
}
Expand All @@ -41,6 +45,8 @@ static SongEventGenerator.Config parseConfig(CommandLine commandLine) {
config.setScope(commandLine.getOptionValue('x'));
config.setStream(commandLine.getOptionValue('s'));
config.setUseKeycloak(commandLine.hasOption('k'));
config.setUseKinesis(commandLine.hasOption('z'));
config.setAwsProfile(commandLine.getOptionValue("aws-profile"));

return config;
}
Expand All @@ -50,7 +56,7 @@ public static void main(String[] args) throws Exception {

// help text
if (commandLine.hasOption('h')) {
System.out.println("\n" + SongEventGenerator.class.getSimpleName() + " - generates random song plays and writes them to a Pravega stream\n");
System.out.println("\n" + SongEventGenerator.class.getSimpleName() + " - generates random song plays and writes them to a stream\n");
HelpFormatter hf = new HelpFormatter();
hf.printHelp(SongEventGenerator.class.getSimpleName(), options(), true);
System.out.println();
Expand Down

0 comments on commit ee0c742

Please sign in to comment.