Skip to content

Commit

Permalink
Add optional start and stop parameters to schema query API
Browse files Browse the repository at this point in the history
  • Loading branch information
aprimadi committed Jan 30, 2024
1 parent 1da2730 commit 091127f
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 58 deletions.
11 changes: 7 additions & 4 deletions examples/measurement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,23 @@ async fn main() -> Result<(), Box<dyn Error>> {

let client = influxdb2::Client::new(influx_url, org, token);

let measurements = client.list_measurements(bucket, Some(365)).await.unwrap();
let measurements = client
.list_measurements(bucket, Some("-365d"), Some("-1d"))
.await
.unwrap();
println!("measurements: {:?}", measurements);

for m in measurements.iter() {
let field_keys = client
.list_measurement_field_keys(bucket, &m, Some(365))
.list_measurement_field_keys(bucket, &m, Some("-365d"), Some("now()"))
.await
.unwrap();
println!("field keys: {:?}", field_keys);
}

for m in measurements.iter() {
let tag_values = client
.list_measurement_tag_values(bucket, &m, "host", Some(365))
.list_measurement_tag_values(bucket, &m, "host", Some("-365d"), None)
.await;
println!(
"tag values for measurement {} and tag {}: {:?}",
Expand All @@ -32,7 +35,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

for m in measurements.iter() {
let tag_values = client
.list_measurement_tag_keys(bucket, &m, Some(365))
.list_measurement_tag_keys(bucket, &m, Some("-365d"), None)
.await;
println!(
"tag values for measurement {} and tag {}: {:?}",
Expand Down
139 changes: 85 additions & 54 deletions src/api/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,106 +251,137 @@ impl Client {
}

/// Returns bucket measurements
///
/// # Arguments
///
/// * `bucket` - The bucket name
/// * `start` - Optional start time. Default is `-30d`
/// * `stop` - Optional stop time. Default is `now()`
pub async fn list_measurements(
&self,
bucket: &str,
days_ago: Option<i64>,
start: Option<&str>,
stop: Option<&str>,
) -> Result<Vec<String>, RequestError> {
let mut params = vec![];
params.push(format!(r#"bucket: "{bucket}""#));
if let Some(start) = start {
params.push(format!("start: {start}"));
}
if let Some(stop) = stop {
params.push(format!("stop: {stop}"));
}
let params = params.join(", ");

let query = Query::new(format!(
r#"import "influxdata/influxdb/schema"
schema.measurements(bucket: "{bucket}"{}) "#,
match days_ago {
Some(days_ago) => {
format!(", start: -{}d", days_ago)
}
None => {
String::from("")
}
}
schema.measurements({params})"#
));
self.exec_schema_query(query).await
}

/// List field keys for measurement
///
/// # Arguments
///
/// * `bucket` - The bucket name
/// * `measurement` - The measurement name
/// * `start` - Optional start time. Default is `-30d`
/// * `stop` - Optional stop time. Default is `now()`
pub async fn list_measurement_field_keys(
&self,
bucket: &str,
measurement: &str,
days_ago: Option<i64>,
start: Option<&str>,
stop: Option<&str>,
) -> Result<Vec<String>, RequestError> {
let mut params = vec![];
params.push(format!(r#"bucket: "{bucket}""#));
params.push(format!(r#"measurement: "{measurement}""#));
if let Some(start) = start {
params.push(format!("start: {start}"));
}
if let Some(stop) = stop {
params.push(format!("stop: {stop}"));
}
let params = params.join(", ");

let query = Query::new(format!(
r#"import "influxdata/influxdb/schema"
schema.measurementFieldKeys(
bucket: "{bucket}",
measurement: "{measurement}",
{}
)"#,
match days_ago {
Some(days_ago) => {
format!("start: -{}d", days_ago)
}
None => {
String::from("")
}
}
schema.measurementFieldKeys({params})"#,
));
self.exec_schema_query(query).await
}

/// List all tag values for measurement tag
///
/// # Arguments
///
/// * `bucket` - The bucket name
/// * `measurement` - The measurement name
/// * `tag` - The tag name
/// * `start` - Optional start time. Default is `-30d`
/// * `stop` - Optional stop time. Default is `now()`
pub async fn list_measurement_tag_values(
&self,
bucket: &str,
measurement: &str,
tag: &str,
days_ago: Option<i64>,
start: Option<&str>,
stop: Option<&str>,
) -> Result<Vec<String>, RequestError> {
let mut params = vec![];
params.push(format!(r#"bucket: "{bucket}""#));
params.push(format!(r#"measurement: "{measurement}""#));
params.push(format!(r#"tag: "{tag}""#));
if let Some(start) = start {
params.push(format!("start: {start}"));
}
if let Some(stop) = stop {
params.push(format!("stop: {stop}"));
}
let params = params.join(", ");

let query = Query::new(format!(
r#"import "influxdata/influxdb/schema"
schema.measurementTagValues(
bucket: "{bucket}",
measurement: "{measurement}",
tag: "{tag}",
{}
)"#,
match days_ago {
Some(days_ago) => {
format!("start: -{}d", days_ago)
}
None => {
String::from("")
}
}
schema.measurementTagValues({params})"#,
));
self.exec_schema_query(query).await
}

/// List all tag keys for measurement
///
/// # Arguments
///
/// * `bucket` - The bucket name
/// * `measurement` - The measurement name
/// * `start` - Optional start time. Default is `-30d`
/// * `stop` - Optional stop time. Default is `now()`
pub async fn list_measurement_tag_keys(
&self,
bucket: &str,
measurement: &str,
days_ago: Option<i64>,
start: Option<&str>,
stop: Option<&str>,
) -> Result<Vec<String>, RequestError> {
let mut params = vec![];
params.push(format!(r#"bucket: "{bucket}""#));
params.push(format!(r#"measurement: "{measurement}""#));
if let Some(start) = start {
params.push(format!("start: {start}"));
}
if let Some(stop) = stop {
params.push(format!("stop: {stop}"));
}
let params = params.join(", ");

let query = Query::new(format!(
r#"import "influxdata/influxdb/schema"
schema.measurementTagKeys(
bucket: "{bucket}",
measurement: "{measurement}",
{}
)"#,
match days_ago {
Some(days_ago) => {
format!("start: -{}d", days_ago)
}
None => {
String::from("")
}
}
schema.measurementTagKeys({params})"#,
));
self.exec_schema_query(query).await
}
Expand Down

0 comments on commit 091127f

Please sign in to comment.