change default driver from rocketmq to kafka
ShadowySpirits committed Mar 1, 2024
1 parent 847c911 commit 20ed4b4
Showing 6 changed files with 34 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default, so the Cargo.lock diff is not shown.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "mq-workload-generator"
-version = "0.2.0"
+version = "0.2.1"
 edition = "2021"
 authors = [
     "SSpirits <[email protected]>",
12 changes: 7 additions & 5 deletions README.md
@@ -25,9 +25,11 @@ Download the binary from [release page](https://github.com/ShadowySpirits/mq-wor
 send and receive 100 messages per second to the topic `test`:
 
 ```shell
+# To set a parameter, choose either the CLI or ENV mode.
+export ACCESS_POINT=localhost:9092
 mq-workload-generator --topic test --qps 100
 # example output:
-# this tool will print the current send and receive tps every second
+# this tool will print the current send and receive tps every second.
 Jul 25 10:38:44.203 INFO[src/main.rs:32:5] Begin generating workload and wait for the server to come online...
 Jul 25 10:38:44.204 INFO[src/main.rs:82:17] current send tps: 100, receive tps: 100
 Jul 25 10:38:45.206 INFO[src/main.rs:82:17] current send tps: 100, receive tps: 100
@@ -44,9 +46,9 @@ Usage: mq-workload-generator [OPTIONS] --topic <TOPIC>
 
 Options:
 -d, --driver <DRIVER>
-Work load driver, available option: rocketmq, kafka [env: DRIVER=] [default: rocketmq]
+Work load driver, available option: rocketmq, kafka [env: DRIVER=] [default: kafka]
 -a, --access-point <ACCESS_POINT>
-Access point of the mq cluster [env: ACCESS_POINT=] [default: localhost:8081]
+Access point of the mq cluster, default is localhost:8081 for RocketMQ and localhost:9092 for Kafka [env: ACCESS_POINT=]
 -t, --topic <TOPIC>
 Target topic [env: TOPIC=]
 -g, --group <GROUP>
@@ -87,8 +89,8 @@ kubectl apply -f deployment-producer.yaml
 
 ## TODO
 
-- [x] Add more options: user-specific consumer group, consume time, lag message count, etc.
-- [x] Support more platform: Apache Kafka, etc.
+- [x] Add more options: user-specific consumer group, payload size, consume time, lag message count, etc.
+- [x] Support more platform: Apache Kafka.
 - [ ] Add more metrics: send/receive latency, etc.
 - [ ] Add more test cases: send/receive with large message or delay/transaction message.
 
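Note on the example output above: main.rs is not touched by this commit, so the snippet below is only an illustrative, broker-free sketch of how a per-second TPS reporter of that shape could be wired up, not the project's actual code. The counter names mirror the SEND_COUNT used later in this diff, the workload task that bumps them is purely hypothetical, and tokio's rt-multi-thread, macros, and time features are assumed.

```rust
// Illustrative sketch only: a guess at the shape of the per-second TPS log
// shown in the README example output, not the code in main.rs.
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

static SEND_COUNT: AtomicU64 = AtomicU64::new(0);
static RECEIVE_COUNT: AtomicU64 = AtomicU64::new(0);

#[tokio::main]
async fn main() {
    // Stand-in for the real producer/consumer tasks: bump both counters ~100/s.
    tokio::spawn(async {
        loop {
            SEND_COUNT.fetch_add(1, Ordering::Relaxed);
            RECEIVE_COUNT.fetch_add(1, Ordering::Relaxed);
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    });

    let mut ticker = tokio::time::interval(Duration::from_secs(1));
    ticker.tick().await; // the first tick completes immediately
    loop {
        ticker.tick().await;
        // swap(0) reads and resets the counter, yielding a per-second rate.
        let send_tps = SEND_COUNT.swap(0, Ordering::Relaxed);
        let receive_tps = RECEIVE_COUNT.swap(0, Ordering::Relaxed);
        println!("current send tps: {send_tps}, receive tps: {receive_tps}");
    }
}
```

Resetting the counters with swap(0) on every tick means each log line reflects only the last second's traffic rather than a running total.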
21 changes: 16 additions & 5 deletions src/common.rs
@@ -11,12 +11,12 @@ use slog_async::OverflowStrategy;
 #[command(author, version, about, long_about = None)]
 pub struct WorkloadOption {
     /// Work load driver, available option: rocketmq, kafka
-    #[arg(short, long, env, default_value = "rocketmq")]
+    #[arg(short, long, env, default_value = "kafka")]
     pub driver: String,
 
-    /// Access point of the mq cluster
-    #[arg(short, long, env, default_value = "localhost:8081")]
-    pub access_point: String,
+    /// Access point of the mq cluster, default is localhost:8081 for RocketMQ and localhost:9092 for Kafka
+    #[arg(short, long, env)]
+    pub access_point: Option<String>,
 
     /// Target topic
     #[arg(short, long, env)]
@@ -73,6 +73,18 @@ impl WorkloadOption {
         }
         default
     }
+
+    pub(crate) fn access_point(&self) -> &str {
+        if let Some(access_point) = &self.access_point {
+            access_point
+        } else {
+            match self.driver.as_str() {
+                "rocketmq" => "localhost:8081",
+                "kafka" => "localhost:9092",
+                _ => panic!("Unsupported driver")
+            }
+        }
+    }
 }
 
 pub(crate) fn terminal_logger() -> Logger {
@@ -89,7 +101,6 @@ pub(crate) fn terminal_logger() -> Logger {
     Logger::root(drain, o!())
 }
 
-
 pub(crate) fn gen_payload(mut rng: ThreadRng, payload_size_range: Range<usize>) -> Vec<u8> {
     let body_size = rng.gen_range(payload_size_range);
     let mut body = vec![0u8; body_size];
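The sketch below condenses the new defaults from src/common.rs into a self-contained program: the default driver is now kafka, and when --access-point (or ACCESS_POINT) is not given, the driver picks the endpoint. It assumes clap 4 with the derive and env features, trims WorkloadOption down to the two fields touched here, and its assertions only hold when the DRIVER and ACCESS_POINT environment variables are unset; a CLI flag overrides the env var, which overrides the default.

```rust
// Minimal sketch of the behaviour introduced above (assumes clap 4 with the
// "derive" and "env" features; the real WorkloadOption has more fields).
use clap::Parser;

#[derive(Parser)]
struct Opt {
    /// Work load driver, available option: rocketmq, kafka
    #[arg(short, long, env, default_value = "kafka")]
    driver: String,

    /// Access point of the mq cluster
    #[arg(short, long, env)]
    access_point: Option<String>,
}

impl Opt {
    // Same fallback logic as the new WorkloadOption::access_point above.
    fn access_point(&self) -> &str {
        match (&self.access_point, self.driver.as_str()) {
            (Some(ap), _) => ap,
            (None, "rocketmq") => "localhost:8081",
            (None, "kafka") => "localhost:9092",
            _ => panic!("Unsupported driver"),
        }
    }
}

fn main() {
    // Assumes DRIVER and ACCESS_POINT are not set in the environment.
    let opt = Opt::parse_from(["demo"]);
    assert_eq!(opt.driver, "kafka"); // new default driver
    assert_eq!(opt.access_point(), "localhost:9092");

    let rmq = Opt::parse_from(["demo", "--driver", "rocketmq"]);
    assert_eq!(rmq.access_point(), "localhost:8081");
}
```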
8 changes: 5 additions & 3 deletions src/kafka.rs
@@ -15,7 +15,7 @@ use crate::common::{gen_payload, WorkloadOption};
 
 pub(crate) async fn start_producer(logger: &Logger, option: &WorkloadOption, rate_limiter: &RateLimiter) {
     let producer: FutureProducer = ClientConfig::new()
-        .set("bootstrap.servers", &option.access_point)
+        .set("bootstrap.servers", option.access_point())
         .set("linger.ms", "5")
         .set("batch.size", "1048576") // 1MB
         .create()
@@ -25,13 +25,13 @@ pub(crate) async fn start_producer(logger: &Logger, option: &WorkloadOption, rat
 
     loop {
         rate_limiter.acquire(1).await;
-        SEND_COUNT.fetch_add(1, Ordering::Relaxed);
 
         let producer_clone = producer.clone();
         let logger_clone = logger.clone();
         let topic = option.topic.to_string();
         let verbose = option.verbose;
         let payload_size_range_clone = payload_size_range.clone();
+
         tokio::spawn(async move {
             let payload = gen_payload(thread_rng(), payload_size_range_clone);
             let message: FutureRecord<'_, (), [u8]> = FutureRecord::to(&topic)
@@ -41,6 +41,8 @@ pub(crate) async fn start_producer(logger: &Logger, option: &WorkloadOption, rat
                 if verbose {
                     error!(logger_clone, "send message failed: {:?}", error)
                 }
+            } else {
+                SEND_COUNT.fetch_add(1, Ordering::Relaxed);
             }
         });
     }
@@ -49,7 +51,7 @@ pub(crate) async fn start_producer(logger: &Logger, option: &WorkloadOption, rat
 pub(crate) async fn start_consumer(logger: &Logger, option: &WorkloadOption) {
     let consumer: StreamConsumer = ClientConfig::new()
         .set("group.id", &option.group)
-        .set("bootstrap.servers", &option.access_point)
+        .set("bootstrap.servers", option.access_point())
         .set("enable.partition.eof", "false")
         .set("session.timeout.ms", "6000")
         .set("enable.auto.commit", "true")
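The producer change above (mirrored in src/rmq.rs below) moves the SEND_COUNT increment from the top of the send loop into the success branch, so the reported send TPS only counts messages the broker actually accepted. The broker-free sketch below shows the same pattern; send_one is a hypothetical stand-in for producer.send, and an Arc-wrapped counter replaces the real static SEND_COUNT so the example stays self-contained. It assumes tokio's rt-multi-thread and macros features.

```rust
// Broker-free sketch of the counting change: the counter is bumped only after
// a successful send, not when the send is scheduled.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

async fn send_one(i: u64) -> Result<(), String> {
    // Pretend every 10th send is rejected by the broker.
    if i % 10 == 0 { Err("broker unavailable".into()) } else { Ok(()) }
}

#[tokio::main]
async fn main() {
    let send_count = Arc::new(AtomicU64::new(0));
    let mut handles = Vec::new();

    for i in 0..100u64 {
        let counter = Arc::clone(&send_count);
        handles.push(tokio::spawn(async move {
            // Before this commit the counter was bumped here, before sending.
            if send_one(i).await.is_ok() {
                // After this commit only successful sends are counted.
                counter.fetch_add(1, Ordering::Relaxed);
            }
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
    // 10 of the 100 simulated sends failed, so only 90 are counted.
    assert_eq!(send_count.load(Ordering::Relaxed), 90);
}
```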
7 changes: 4 additions & 3 deletions src/rmq.rs
@@ -16,7 +16,7 @@ pub(crate) async fn start_producer(logger: &Logger, option: &WorkloadOption, rat
     producer_option.set_topics(vec![option.topic.clone()]);
 
     let mut client_option = ClientOption::default();
-    client_option.set_access_url(&option.access_point);
+    client_option.set_access_url(option.access_point());
     client_option.set_enable_tls(false);
     client_option.set_access_key(&option.access_key);
     client_option.set_secret_key(&option.secret_key);
@@ -30,7 +30,6 @@ pub(crate) async fn start_producer(logger: &Logger, option: &WorkloadOption, rat
 
     loop {
         rate_limiter.acquire(1).await;
-        SEND_COUNT.fetch_add(1, Ordering::Relaxed);
 
         let payload = gen_payload(thread_rng(), payload_size_range.clone());
 
@@ -44,6 +43,8 @@ pub(crate) async fn start_producer(logger: &Logger, option: &WorkloadOption, rat
             if option.verbose {
                 error!(logger, "send message failed: {:?}", error)
             }
+        } else {
+            SEND_COUNT.fetch_add(1, Ordering::Relaxed);
         }
     }
 }
@@ -55,7 +56,7 @@ pub(crate) async fn start_consumer(logger: &Logger, option: &WorkloadOption) {
     consumer_option.set_topics(vec![option.topic.clone()]);
 
     let mut client_option = ClientOption::default();
-    client_option.set_access_url(&option.access_point);
+    client_option.set_access_url(option.access_point());
     client_option.set_enable_tls(false);
     client_option.set_access_key(&option.access_key);
     client_option.set_secret_key(&option.secret_key);
