Skip to content

Commit

Permalink
Continuous subscription at a specific frequency
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Mar 6, 2024
1 parent 26ba1c4 commit ecb1549
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 11 deletions.
22 changes: 18 additions & 4 deletions kuksa_databroker/databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,11 @@ impl Subscriptions {
self.change_subscriptions.push(subscription)
}

pub fn add_continuous_subscription(&mut self, subscription: ContinuousSubscription) {
pub fn add_continuous_subscription(
&mut self,
subscription: ContinuousSubscription,
frequency: u64,
) {
let local_subscription = subscription.clone();
self.continuous_subscriptions
.lock()
Expand All @@ -627,7 +631,7 @@ impl Subscriptions {
while !local_subscription.sender.is_closed() {
let _ = local_subscription.notify(None).await;
// Simulate some asynchronous work
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_millis(1 / frequency * 1000)).await;
}
});
}
Expand Down Expand Up @@ -1593,6 +1597,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
frequency: Option<u64>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
Expand Down Expand Up @@ -1636,6 +1641,9 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {

let (sender, receiver) = mpsc::channel(10);
if !entries_on_changed.is_empty() {
if frequency.is_some() && entries_continuous.is_empty() {
return Err(SubscriptionError::InvalidInput);
}
let subscription = ChangeSubscription {
entries: entries_on_changed,
sender: sender.clone(),
Expand All @@ -1658,6 +1666,9 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
}

if !entries_continuous.is_empty() {
if frequency.is_none() {
return Err(SubscriptionError::InvalidInput);
}
let subscription_continuous = ContinuousSubscription {
entries: entries_continuous,
sender,
Expand All @@ -1677,7 +1688,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.subscriptions
.write()
.await
.add_continuous_subscription(subscription_continuous);
.add_continuous_subscription(subscription_continuous, frequency.unwrap());
}

let stream = ReceiverStream::new(receiver);
Expand Down Expand Up @@ -3161,7 +3172,10 @@ mod tests {
.expect("Register datapoint should succeed");

let mut stream = broker
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]))
.subscribe(
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
None,
)
.await
.expect("subscription should succeed");

Expand Down
2 changes: 1 addition & 1 deletion kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

match broker.subscribe(entries).await {
match broker.subscribe(entries, request.frequency_hertz).await {
Ok(stream) => {
let stream = convert_to_proto_stream(stream);
Ok(tonic::Response::new(Box::pin(stream)))
Expand Down
15 changes: 12 additions & 3 deletions kuksa_databroker/lib/kuksa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,10 @@ impl KuksaClient {
})
}

let req = proto::v1::SubscribeRequest { entries };
let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
};

match client.subscribe(req).await {
Ok(response) => Ok(response.into_inner()),
Expand Down Expand Up @@ -321,7 +324,10 @@ impl KuksaClient {
})
}

let req = proto::v1::SubscribeRequest { entries };
let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
};

match client.subscribe(req).await {
Ok(response) => Ok(response.into_inner()),
Expand All @@ -346,7 +352,10 @@ impl KuksaClient {
})
}

let req = proto::v1::SubscribeRequest { entries };
let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
};

match client.subscribe(req).await {
Ok(response) => Ok(response.into_inner()),
Expand Down
7 changes: 4 additions & 3 deletions proto/kuksa/val/v1/val.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ message SetResponse {

// Define what to subscribe to
message SubscribeEntry {
string path = 1;
View view = 2;
repeated Field fields = 3;
string path = 1;
View view = 2;
repeated Field fields = 3;
}

// Subscribe to changes in datapoints.
message SubscribeRequest {
repeated SubscribeEntry entries = 1;
optional uint64 frequency_hertz = 2;
}

// A subscription response
Expand Down

0 comments on commit ecb1549

Please sign in to comment.