Skip to content

Commit

Permalink
feat: add parameter key-size
Browse files Browse the repository at this point in the history
  • Loading branch information
ShadowySpirits committed Mar 19, 2024
1 parent 20ed4b4 commit c6387e7
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mq-workload-generator"
version = "0.2.1"
version = "0.2.2"
edition = "2021"
authors = [
"SSpirits <[email protected]>",
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ Options:
Number of the client [env: PARALLELISM=] [default: 1]
-q, --qps <QPS>
Send tps of the sum of all producers [env: QPS=] [default: 100]
--key-size <KEY_SIZE>
Random key with specified size for each record sent by producer [env: KEY_SIZE=]
--min-payload-size <MIN_PAYLOAD_SIZE>
Minimum message payload size, measured in bytes [env: MIN_PAYLOAD_SIZE=]
--max-payload-size <MAX_PAYLOAD_SIZE>
Expand Down
6 changes: 2 additions & 4 deletions deployment-consumer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ spec:
- name: MODE
value: "consumer"
- name: PARALLELISM
value: "3"
- name: QPS
value: "5000"
value: "1"
- name: ACCESS_POINT
value: "localhost:9092"
value: "10.0.16.180:9092"
- name: TOPIC
value: "test"
- name: GROUP
Expand Down
6 changes: 3 additions & 3 deletions deployment-producer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ spec:
- name: PARALLELISM
value: "1"
- name: QPS
value: "10000"
value: "1000"
- name: ACCESS_POINT
value: "localhost:9092"
- name: TOPIC
value: "test"
- name: GROUP
value: "automq_workload_generator"
- name: KEY_SIZE
value: "128"
imagePullPolicy: Always
restartPolicy: Always
12 changes: 8 additions & 4 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub struct WorkloadOption {
#[arg(short, long, env, default_value_t = 100)]
pub qps: usize,

/// Random key with specified size for each record sent by producer
#[arg(long, env)]
pub key_size: Option<usize>,

/// Minimum message payload size, measured in bytes
#[arg(long, env)]
pub min_payload_size: Option<usize>,
Expand Down Expand Up @@ -68,17 +72,17 @@ impl WorkloadOption {
let default = self.payload_size..self.payload_size + 1;
if let (Some(min_payload_size), Some(max_payload_size)) = (self.min_payload_size, self.max_payload_size) {
if max_payload_size > min_payload_size {
return min_payload_size..max_payload_size
return min_payload_size..max_payload_size;
}
}
default
}

pub(crate) fn access_point(&self) -> &str {
if let Some(access_point) = &self.access_point {
access_point
} else {
match self.driver.as_str() {
} else {
match self.driver.as_str() {
"rocketmq" => "localhost:8081",
"kafka" => "localhost:9092",
_ => panic!("Unsupported driver")
Expand Down
14 changes: 12 additions & 2 deletions src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,23 @@ pub(crate) async fn start_producer(logger: &Logger, option: &WorkloadOption, rat
let logger_clone = logger.clone();
let topic = option.topic.to_string();
let verbose = option.verbose;
let key_size = option.key_size;
let payload_size_range_clone = payload_size_range.clone();

tokio::spawn(async move {
let payload = gen_payload(thread_rng(), payload_size_range_clone);
let message: FutureRecord<'_, (), [u8]> = FutureRecord::to(&topic)

let message: FutureRecord<'_, [u8], [u8]> = FutureRecord::to(&topic)
.payload(payload.to_bytes());
let result = producer_clone.send(message, Duration::from_secs(0)).await;

let result = if let Some(size) = key_size {
let key = gen_payload(thread_rng(), size..size + 1);
let message = message.key(key.to_bytes());
producer_clone.send(message, Duration::from_secs(0)).await
} else {
producer_clone.send(message, Duration::from_secs(0)).await
};

if let Err(error) = result {
if verbose {
error!(logger_clone, "send message failed: {:?}", error)
Expand Down

0 comments on commit c6387e7

Please sign in to comment.