diff --git a/Cargo.lock b/Cargo.lock index d6ed927..3655c11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -963,7 +963,7 @@ dependencies = [ [[package]] name = "mq-workload-generator" -version = "0.2.1" +version = "0.2.2" dependencies = [ "async-trait", "clap", diff --git a/Cargo.toml b/Cargo.toml index 0f11df8..09b87bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mq-workload-generator" -version = "0.2.1" +version = "0.2.2" edition = "2021" authors = [ "SSpirits ", diff --git a/README.md b/README.md index 41dcc07..debf6e8 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,8 @@ Options: Number of the client [env: PARALLELISM=] [default: 1] -q, --qps Send tps of the sum of all producers [env: QPS=] [default: 100] + --key-size + Random key with specified size for each record sent by producer [env: KEY_SIZE=] --min-payload-size Minimum message payload size, measured in bytes [env: MIN_PAYLOAD_SIZE=] --max-payload-size diff --git a/deployment-consumer.yaml b/deployment-consumer.yaml index 6c42ed2..ff551ec 100644 --- a/deployment-consumer.yaml +++ b/deployment-consumer.yaml @@ -24,11 +24,9 @@ spec: - name: MODE value: "consumer" - name: PARALLELISM - value: "3" - - name: QPS - value: "5000" + value: "1" - name: ACCESS_POINT - value: "localhost:9092" + value: "10.0.16.180:9092" - name: TOPIC value: "test" - name: GROUP diff --git a/deployment-producer.yaml b/deployment-producer.yaml index f3ec513..562bf17 100644 --- a/deployment-producer.yaml +++ b/deployment-producer.yaml @@ -26,12 +26,12 @@ spec: - name: PARALLELISM value: "1" - name: QPS - value: "10000" + value: "1000" - name: ACCESS_POINT value: "localhost:9092" - name: TOPIC value: "test" - - name: GROUP - value: "automq_workload_generator" + - name: KEY_SIZE + value: "128" imagePullPolicy: Always restartPolicy: Always \ No newline at end of file diff --git a/src/common.rs b/src/common.rs index a508efa..d6571e3 100644 --- a/src/common.rs +++ b/src/common.rs @@ -34,6 +34,10 @@ pub struct WorkloadOption { #[arg(short, long, env, default_value_t = 100)] pub qps: usize, + /// Random key with specified size for each record sent by producer + #[arg(long, env)] + pub key_size: Option, + /// Minimum message payload size, measured in bytes #[arg(long, env)] pub min_payload_size: Option, @@ -68,17 +72,17 @@ impl WorkloadOption { let default = self.payload_size..self.payload_size + 1; if let (Some(min_payload_size), Some(max_payload_size)) = (self.min_payload_size, self.max_payload_size) { if max_payload_size > min_payload_size { - return min_payload_size..max_payload_size + return min_payload_size..max_payload_size; } } default } - + pub(crate) fn access_point(&self) -> &str { if let Some(access_point) = &self.access_point { access_point - } else { - match self.driver.as_str() { + } else { + match self.driver.as_str() { "rocketmq" => "localhost:8081", "kafka" => "localhost:9092", _ => panic!("Unsupported driver") diff --git a/src/kafka.rs b/src/kafka.rs index a631bd2..0c028e0 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -30,13 +30,23 @@ pub(crate) async fn start_producer(logger: &Logger, option: &WorkloadOption, rat let logger_clone = logger.clone(); let topic = option.topic.to_string(); let verbose = option.verbose; + let key_size = option.key_size; let payload_size_range_clone = payload_size_range.clone(); tokio::spawn(async move { let payload = gen_payload(thread_rng(), payload_size_range_clone); - let message: FutureRecord<'_, (), [u8]> = FutureRecord::to(&topic) + + let message: FutureRecord<'_, [u8], [u8]> = FutureRecord::to(&topic) .payload(payload.to_bytes()); - let result = producer_clone.send(message, Duration::from_secs(0)).await; + + let result = if let Some(size) = key_size { + let key = gen_payload(thread_rng(), size..size + 1); + let message = message.key(key.to_bytes()); + producer_clone.send(message, Duration::from_secs(0)).await + } else { + producer_clone.send(message, Duration::from_secs(0)).await + }; + if let Err(error) = result { if verbose { error!(logger_clone, "send message failed: {:?}", error)