From 5116fe63c484dd82ec05b74ffd943d222a15b434 Mon Sep 17 00:00:00 2001 From: "Ruiz-Lucena Rafael (ETAS-E2E/XPC-Fe)" Date: Mon, 15 Jul 2024 19:41:57 +0200 Subject: [PATCH] Initial publish request values implementation for OpenProviderStream --- databroker/src/broker.rs | 2 +- .../src/grpc/kuksa_val_v1/conversions.rs | 2 + .../src/grpc/kuksa_val_v2/conversions.rs | 450 +++++------------- databroker/src/grpc/kuksa_val_v2/mod.rs | 2 +- databroker/src/grpc/kuksa_val_v2/val.rs | 345 ++++++++++++-- .../src/grpc/sdv_databroker_v1/conversions.rs | 6 + databroker/src/main.rs | 2 +- databroker/src/types.rs | 11 + databroker/src/viss/v2/conversions.rs | 1 + 9 files changed, 449 insertions(+), 372 deletions(-) diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 4e098067..90c3fb86 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -15,7 +15,7 @@ use crate::permissions::{PermissionError, Permissions}; pub use crate::types; use crate::query; -pub use crate::types::{ChangeType, DataType, DataValue, EntryType}; +pub use crate::types::{ChangeType, DataType, DataValue, EntryType, ValueFailure}; use tokio::sync::{broadcast, mpsc, RwLock}; use tokio_stream::wrappers::ReceiverStream; diff --git a/databroker/src/grpc/kuksa_val_v1/conversions.rs b/databroker/src/grpc/kuksa_val_v1/conversions.rs index d9b972d1..a4df19d1 100644 --- a/databroker/src/grpc/kuksa_val_v1/conversions.rs +++ b/databroker/src/grpc/kuksa_val_v1/conversions.rs @@ -143,6 +143,7 @@ impl From for Option { })), timestamp: Some(from.ts.into()), }), + broker::DataValue::ValueFailure(_) => None, } } } @@ -231,6 +232,7 @@ impl From for Option { })), timestamp: None, }), + broker::DataValue::ValueFailure(_) => None, } } } diff --git a/databroker/src/grpc/kuksa_val_v2/conversions.rs b/databroker/src/grpc/kuksa_val_v2/conversions.rs index 67942aa1..498bb830 100644 --- a/databroker/src/grpc/kuksa_val_v2/conversions.rs +++ b/databroker/src/grpc/kuksa_val_v2/conversions.rs @@ -1,5 +1,5 @@ // /******************************************************************************** -// * Copyright (c) 2022 Contributors to the Eclipse Foundation +// * Copyright (c) 2024 Contributors to the Eclipse Foundation // * // * See the NOTICE file(s) distributed with this work for additional // * information regarding copyright ownership. @@ -10,331 +10,139 @@ // * // * SPDX-License-Identifier: Apache-2.0 // ********************************************************************************/ +use databroker_proto::kuksa::val::v2 as proto; +use proto::datapoint::{ValueState::Failure, ValueState::Value}; -// use databroker_proto::kuksa::val::v1 as proto; +use crate::broker; -// use crate::broker; +use std::time::SystemTime; -// use std::convert::TryFrom; -// use std::time::SystemTime; +impl From<&proto::Datapoint> for broker::Datapoint { + fn from(datapoint: &proto::Datapoint) -> Self { + let value = broker::DataValue::from(datapoint); + let ts = SystemTime::now(); -// impl From<&broker::EntryType> for proto::EntryType { -// fn from(from: &broker::EntryType) -> Self { -// match from { -// broker::EntryType::Sensor => proto::EntryType::Sensor, -// broker::EntryType::Attribute => proto::EntryType::Attribute, -// broker::EntryType::Actuator => proto::EntryType::Actuator, -// } -// } -// } + match &datapoint.timestamp { + Some(source_timestamp) => { + let source: Option = match source_timestamp.clone().try_into() { + Ok(source) => Some(source), + Err(_) => None, + }; + broker::Datapoint { + ts, + source_ts: source, + value, + } + } + None => broker::Datapoint { + ts, + source_ts: None, + value, + }, + } + } +} -// impl From for proto::DataType { -// fn from(from: broker::DataType) -> Self { -// match from { -// broker::DataType::String => proto::DataType::String, -// broker::DataType::Bool => proto::DataType::Boolean, -// broker::DataType::Int8 => proto::DataType::Int8, -// broker::DataType::Int16 => proto::DataType::Int16, -// broker::DataType::Int32 => proto::DataType::Int32, -// broker::DataType::Int64 => proto::DataType::Int64, -// broker::DataType::Uint8 => proto::DataType::Uint8, -// broker::DataType::Uint16 => proto::DataType::Uint16, -// broker::DataType::Uint32 => proto::DataType::Uint32, -// broker::DataType::Uint64 => proto::DataType::Uint64, -// broker::DataType::Float => proto::DataType::Float, -// broker::DataType::Double => proto::DataType::Double, -// broker::DataType::StringArray => proto::DataType::StringArray, -// broker::DataType::BoolArray => proto::DataType::BooleanArray, -// broker::DataType::Int8Array => proto::DataType::Int8Array, -// broker::DataType::Int16Array => proto::DataType::Int16Array, -// broker::DataType::Int32Array => proto::DataType::Int32Array, -// broker::DataType::Int64Array => proto::DataType::Int64Array, -// broker::DataType::Uint8Array => proto::DataType::Uint8Array, -// broker::DataType::Uint16Array => proto::DataType::Uint16Array, -// broker::DataType::Uint32Array => proto::DataType::Uint32Array, -// broker::DataType::Uint64Array => proto::DataType::Uint64Array, -// broker::DataType::FloatArray => proto::DataType::FloatArray, -// broker::DataType::DoubleArray => proto::DataType::DoubleArray, -// } -// } -// } +fn from_i32(value: i32) -> proto::ValueFailure { + // Use a match statement to convert the i32 to the corresponding enum variant + match value { + 1 => proto::ValueFailure::InvalidValue, + 2 => proto::ValueFailure::NotProvided, + 3 => proto::ValueFailure::UnknownSignal, + 4 => proto::ValueFailure::AccessDenied, + 5 => proto::ValueFailure::InternalError, + _ => proto::ValueFailure::Unspecified, + } +} -// impl From for Option { -// fn from(from: broker::Datapoint) -> Self { -// match from.value { -// broker::DataValue::NotAvailable => None, -// broker::DataValue::Bool(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Bool(value)), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::String(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::String(value)), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::Int32(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Int32(value)), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::Int64(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Int64(value)), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::Uint32(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Uint32(value)), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::Uint64(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Uint64(value)), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::Float(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Float(value)), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::Double(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Double(value)), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::BoolArray(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::BoolArray(proto::BoolArray { -// values, -// })), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::StringArray(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::StringArray(proto::StringArray { -// values, -// })), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::Int32Array(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Int32Array(proto::Int32Array { -// values, -// })), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::Int64Array(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Int64Array(proto::Int64Array { -// values, -// })), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::Uint32Array(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Uint32Array(proto::Uint32Array { -// values, -// })), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::Uint64Array(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Uint64Array(proto::Uint64Array { -// values, -// })), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::FloatArray(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::FloatArray(proto::FloatArray { -// values, -// })), -// timestamp: Some(from.ts.into()), -// }), -// broker::DataValue::DoubleArray(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::DoubleArray(proto::DoubleArray { -// values, -// })), -// timestamp: Some(from.ts.into()), -// }), -// } -// } -// } +impl From<&proto::ValueFailure> for broker::ValueFailure { + fn from(value_failure: &proto::ValueFailure) -> Self { + match value_failure { + proto::ValueFailure::Unspecified => broker::ValueFailure::Unspecified, + proto::ValueFailure::InvalidValue => broker::ValueFailure::InvalidValue, + proto::ValueFailure::NotProvided => broker::ValueFailure::NotProvided, + proto::ValueFailure::UnknownSignal => broker::ValueFailure::UnknownSignal, + proto::ValueFailure::AccessDenied => broker::ValueFailure::AccessDenied, + proto::ValueFailure::InternalError => broker::ValueFailure::InternalError, + } + } +} -// impl From for Option { -// fn from(from: broker::DataValue) -> Self { -// match from { -// broker::DataValue::NotAvailable => None, -// broker::DataValue::Bool(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Bool(value)), -// timestamp: None, -// }), -// broker::DataValue::String(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::String(value)), -// timestamp: None, -// }), -// broker::DataValue::Int32(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Int32(value)), -// timestamp: None, -// }), -// broker::DataValue::Int64(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Int64(value)), -// timestamp: None, -// }), -// broker::DataValue::Uint32(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Uint32(value)), -// timestamp: None, -// }), -// broker::DataValue::Uint64(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Uint64(value)), -// timestamp: None, -// }), -// broker::DataValue::Float(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Float(value)), -// timestamp: None, -// }), -// broker::DataValue::Double(value) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Double(value)), -// timestamp: None, -// }), -// broker::DataValue::BoolArray(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::BoolArray(proto::BoolArray { -// values, -// })), -// timestamp: None, -// }), -// broker::DataValue::StringArray(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::StringArray(proto::StringArray { -// values, -// })), -// timestamp: None, -// }), -// broker::DataValue::Int32Array(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Int32Array(proto::Int32Array { -// values, -// })), -// timestamp: None, -// }), -// broker::DataValue::Int64Array(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Int64Array(proto::Int64Array { -// values, -// })), -// timestamp: None, -// }), -// broker::DataValue::Uint32Array(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Uint32Array(proto::Uint32Array { -// values, -// })), -// timestamp: None, -// }), -// broker::DataValue::Uint64Array(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::Uint64Array(proto::Uint64Array { -// values, -// })), -// timestamp: None, -// }), -// broker::DataValue::FloatArray(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::FloatArray(proto::FloatArray { -// values, -// })), -// timestamp: None, -// }), -// broker::DataValue::DoubleArray(values) => Some(proto::Datapoint { -// value: Some(proto::datapoint::Value::DoubleArray(proto::DoubleArray { -// values, -// })), -// timestamp: None, -// }), -// } -// } -// } +impl From<&proto::Datapoint> for broker::DataValue { + fn from(datapoint: &proto::Datapoint) -> Self { + match &datapoint.value_state { + Some(Value(value)) => match &value.typed_value { + Some(proto::value::TypedValue::String(value)) => { + broker::DataValue::String(value.to_owned()) + } + Some(proto::value::TypedValue::Bool(value)) => broker::DataValue::Bool(*value), + Some(proto::value::TypedValue::Int32(value)) => broker::DataValue::Int32(*value), + Some(proto::value::TypedValue::Int64(value)) => broker::DataValue::Int64(*value), + Some(proto::value::TypedValue::Uint32(value)) => broker::DataValue::Uint32(*value), + Some(proto::value::TypedValue::Uint64(value)) => broker::DataValue::Uint64(*value), + Some(proto::value::TypedValue::Float(value)) => broker::DataValue::Float(*value), + Some(proto::value::TypedValue::Double(value)) => broker::DataValue::Double(*value), + Some(proto::value::TypedValue::StringArray(array)) => { + broker::DataValue::StringArray(array.values.clone()) + } + Some(proto::value::TypedValue::BoolArray(array)) => { + broker::DataValue::BoolArray(array.values.clone()) + } + Some(proto::value::TypedValue::Int32Array(array)) => { + broker::DataValue::Int32Array(array.values.clone()) + } + Some(proto::value::TypedValue::Int64Array(array)) => { + broker::DataValue::Int64Array(array.values.clone()) + } + Some(proto::value::TypedValue::Uint32Array(array)) => { + broker::DataValue::Uint32Array(array.values.clone()) + } + Some(proto::value::TypedValue::Uint64Array(array)) => { + broker::DataValue::Uint64Array(array.values.clone()) + } + Some(proto::value::TypedValue::FloatArray(array)) => { + broker::DataValue::FloatArray(array.values.clone()) + } + Some(proto::value::TypedValue::DoubleArray(array)) => { + broker::DataValue::DoubleArray(array.values.clone()) + } + None => todo!(), + }, + Some(Failure(value)) => { + broker::DataValue::ValueFailure(broker::ValueFailure::from(&from_i32(*value))) + } + None => broker::DataValue::NotAvailable, + } + } +} -// impl From> for broker::DataValue { -// fn from(from: Option) -> Self { -// match from { -// Some(value) => match value { -// proto::datapoint::Value::String(value) => broker::DataValue::String(value), -// proto::datapoint::Value::Bool(value) => broker::DataValue::Bool(value), -// proto::datapoint::Value::Int32(value) => broker::DataValue::Int32(value), -// proto::datapoint::Value::Int64(value) => broker::DataValue::Int64(value), -// proto::datapoint::Value::Uint32(value) => broker::DataValue::Uint32(value), -// proto::datapoint::Value::Uint64(value) => broker::DataValue::Uint64(value), -// proto::datapoint::Value::Float(value) => broker::DataValue::Float(value), -// proto::datapoint::Value::Double(value) => broker::DataValue::Double(value), -// proto::datapoint::Value::StringArray(array) => { -// broker::DataValue::StringArray(array.values) -// } -// proto::datapoint::Value::BoolArray(array) => { -// broker::DataValue::BoolArray(array.values) -// } -// proto::datapoint::Value::Int32Array(array) => { -// broker::DataValue::Int32Array(array.values) -// } -// proto::datapoint::Value::Int64Array(array) => { -// broker::DataValue::Int64Array(array.values) -// } -// proto::datapoint::Value::Uint32Array(array) => { -// broker::DataValue::Uint32Array(array.values) -// } -// proto::datapoint::Value::Uint64Array(array) => { -// broker::DataValue::Uint64Array(array.values) -// } -// proto::datapoint::Value::FloatArray(array) => { -// broker::DataValue::FloatArray(array.values) -// } -// proto::datapoint::Value::DoubleArray(array) => { -// broker::DataValue::DoubleArray(array.values) -// } -// }, -// None => broker::DataValue::NotAvailable, -// } -// } -// } - -// impl From<&broker::Field> for proto::Field { -// fn from(from: &broker::Field) -> Self { -// match from { -// broker::Field::Datapoint => proto::Field::Value, -// broker::Field::ActuatorTarget => proto::Field::ActuatorTarget, -// broker::Field::MetadataUnit => proto::Field::MetadataUnit, -// } -// } -// } - -// impl TryFrom<&proto::Field> for broker::Field { -// type Error = &'static str; - -// fn try_from(from: &proto::Field) -> Result { -// match from { -// proto::Field::Value => Ok(broker::Field::Datapoint), -// proto::Field::ActuatorTarget => Ok(broker::Field::ActuatorTarget), -// _ => Err("Unknown field"), -// } -// } -// } - -// impl From for broker::Datapoint { -// fn from(from: proto::Datapoint) -> Self { -// Self { -// ts: SystemTime::now(), -// source_ts: match from.timestamp { -// Some(ts) => match std::convert::TryInto::try_into(ts) { -// Ok(ts) => Some(ts), -// Err(_) => None, -// }, -// None => None, -// }, -// value: broker::DataValue::from(from.value), -// } -// } -// } - -// impl From for proto::DataEntry { -// fn from(from: broker::EntryUpdate) -> Self { -// Self { -// path: from.path.unwrap_or_default(), -// value: match from.datapoint { -// Some(datapoint) => Option::::from(datapoint), -// None => None, -// }, -// actuator_target: match from.actuator_target { -// Some(Some(actuator_target)) => Option::::from(actuator_target), -// Some(None) => None, -// None => None, -// }, -// metadata: { -// let metadata = proto::Metadata { -// unit: from.unit, -// ..Default::default() -// }; -// Some(metadata) -// }, -// } -// } -// } +impl From<&broker::UpdateError> for proto::Error { + fn from(update_error: &broker::UpdateError) -> Self { + match update_error { + broker::UpdateError::NotFound => proto::Error { + code: proto::ErrorCode::NotFound.into(), + message: "Not Found".to_string(), + }, + broker::UpdateError::WrongType => proto::Error { + code: proto::ErrorCode::InvalidArgument.into(), + message: "Wrong Type".to_string(), + }, + broker::UpdateError::OutOfBounds => proto::Error { + code: proto::ErrorCode::InvalidArgument.into(), + message: "Out of Bounds".to_string(), + }, + broker::UpdateError::UnsupportedType => proto::Error { + code: proto::ErrorCode::InvalidArgument.into(), + message: "Unsupported Type".to_string(), + }, + broker::UpdateError::PermissionDenied => proto::Error { + code: proto::ErrorCode::PermissionDenied.into(), + message: "Permission Denied".to_string(), + }, + broker::UpdateError::PermissionExpired => proto::Error { + code: proto::ErrorCode::PermissionDenied.into(), + message: "Permission Expired".to_string(), + }, + } + } +} diff --git a/databroker/src/grpc/kuksa_val_v2/mod.rs b/databroker/src/grpc/kuksa_val_v2/mod.rs index 2da40882..88302b19 100644 --- a/databroker/src/grpc/kuksa_val_v2/mod.rs +++ b/databroker/src/grpc/kuksa_val_v2/mod.rs @@ -1,5 +1,5 @@ /******************************************************************************** -* Copyright (c) 2022 Contributors to the Eclipse Foundation +* Copyright (c) 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index c9db3230..7f549cf5 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -11,37 +11,48 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use std::pin::Pin; +use std::{collections::HashMap, pin::Pin}; -use databroker_proto::kuksa::val::v2 as proto; -use tokio_stream::Stream; -use tonic::Code; +use crate::{ + broker::{self, AuthorizedAccess}, + permissions::Permissions, +}; -use crate::broker; +use databroker_proto::kuksa::val::v2::{ + self as proto, + open_provider_stream_request::Action::{ + BatchActuateStreamResponse, ProvidedActuation, PublishValuesRequest, + }, +}; +use kuksa::proto::v2::{ + open_provider_stream_response, OpenProviderStreamResponse, PublishValuesResponse, +}; + +use tokio::{select, sync::mpsc}; +use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tonic::{Code, Response}; +use tracing::debug; #[tonic::async_trait] impl proto::val_server::Val for broker::DataBroker { async fn get_value( &self, - request: tonic::Request, - ) -> Result, tonic::Status> - { + _request: tonic::Request, + ) -> Result, tonic::Status> { Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) } - + async fn get_values( &self, - request: tonic::Request, - ) -> Result, tonic::Status> - { + _request: tonic::Request, + ) -> Result, tonic::Status> { Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) } - + async fn list_values( &self, - request: tonic::Request, - ) -> Result, tonic::Status> - { + _request: tonic::Request, + ) -> Result, tonic::Status> { Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) } @@ -53,69 +64,307 @@ impl proto::val_server::Val for broker::DataBroker { + 'static, >, >; - + async fn subscribe( &self, - request: tonic::Request, - ) -> Result, tonic::Status> - { + _request: tonic::Request, + ) -> Result, tonic::Status> { Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) } async fn actuate( &self, - request: tonic::Request, - ) -> Result, tonic::Status> - { + _request: tonic::Request, + ) -> Result, tonic::Status> { Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) } async fn batch_actuate( &self, - request: tonic::Request, - ) -> Result, tonic::Status> - { + _request: tonic::Request, + ) -> Result, tonic::Status> { Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) } - + async fn list_metadata( &self, - request: tonic::Request, - ) -> Result, tonic::Status> - { + _request: tonic::Request, + ) -> Result, tonic::Status> { Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) } async fn publish_value( &self, - request: tonic::Request, - ) -> Result, tonic::Status> - { + _request: tonic::Request, + ) -> Result, tonic::Status> { Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) } - type OpenProviderStreamStream = Pin< - Box< - dyn Stream> - + Send - + Sync - + 'static, - >, - >; + // type OpenProviderStreamStream = Pin< + // Box< + // dyn Stream> + // + Send + // + Sync + // + 'static, + // >, + // >; + + type OpenProviderStreamStream = + ReceiverStream>; async fn open_provider_stream( &self, request: tonic::Request>, - ) -> Result, tonic::Status> - { - Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) + ) -> Result, tonic::Status> { + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + + // Should databroker register internally here new opened streams???? + // The provided actuation will take ownership over the actuators but what happens + // if a provider is publishing sensor values and the stream is closed? + // How will the application know that there is no provider and should stop the subscription? + let mut stream = request.into_inner(); + + let mut shutdown_trigger = self.get_shutdown_trigger(); + + // Copy (to move into task below) + let broker = self.clone(); + + // Create stream (to be returned) + let (response_stream_sender, response_stream_receiver) = mpsc::channel(10); + + // Listening on stream + tokio::spawn(async move { + let permissions = permissions; + let broker = broker.authorized_access(&permissions); + loop { + select! { + message = stream.message() => { + match message { + Ok(request) => { + match request { + Some(req) => { + match req.action { + Some(ProvidedActuation(_provided_actuation)) => { + if let Err(err) = response_stream_sender.send(Err(tonic::Status::new(tonic::Code::Unimplemented, "Unimplemented"))).await { + debug!("Failed to send error response: {}", err); + } + break; + }, + Some(PublishValuesRequest(publish_values_request)) => { + let response = publish_values(&broker, &publish_values_request).await; + if let Err(err) = response_stream_sender.send(Ok(response)).await + { + debug!("Failed to send response: {}", err); + } + }, + Some(BatchActuateStreamResponse(_batch_actuate_stream_response)) => { + if let Err(err) = response_stream_sender.send(Err(tonic::Status::new(tonic::Code::Unimplemented, "Unimplemented"))).await { + debug!("Failed to send error response: {}", err); + } + break; + }, + None => { + + }, + } + }, + None => { + debug!("provider: no more messages"); + break; + } + } + }, + Err(err) => { + debug!("provider: connection broken: {:?}", err); + break; + }, + } + }, + _ = shutdown_trigger.recv() => { + debug!("provider: shutdown received"); + break; + } + } + } + }); + + // Return the error stream + Ok(Response::new(ReceiverStream::new(response_stream_receiver))) } async fn get_server_info( &self, - request: tonic::Request, - ) -> Result, tonic::Status> - { + _request: tonic::Request, + ) -> Result, tonic::Status> { Err(tonic::Status::new(Code::Unimplemented, "Unimplemented")) } -} \ No newline at end of file +} + +async fn publish_values( + broker: &AuthorizedAccess<'_, '_>, + request: &databroker_proto::kuksa::val::v2::PublishValuesRequest, +) -> OpenProviderStreamResponse { + let ids: Vec<(i32, broker::EntryUpdate)> = request + .datapoints + .iter() + .map(|(id, datapoint)| { + ( + *id, + broker::EntryUpdate { + path: None, + datapoint: Some(broker::Datapoint::from(datapoint)), + actuator_target: None, + entry_type: None, + data_type: None, + description: None, + allowed: None, + unit: None, + }, + ) + }) + .collect(); + + match broker.update_entries(ids).await { + Ok(_) => OpenProviderStreamResponse { + action: Some( + open_provider_stream_response::Action::PublishValuesResponse( + PublishValuesResponse { + request_id: request.request_id, + status: HashMap::new(), + }, + ), + ), + }, + Err(err) => OpenProviderStreamResponse { + action: Some( + open_provider_stream_response::Action::PublishValuesResponse( + PublishValuesResponse { + request_id: request.request_id, + status: err + .iter() + .map(|(id, error)| (*id, proto::Error::from(error))) + .collect(), + }, + ), + ), + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{broker::DataBroker, permissions}; + use databroker_proto::kuksa::val::v2::val_server::Val; + use proto::open_provider_stream_response::Action::{ + BatchActuateStreamRequest, ProvideActuatorResponse, PublishValuesResponse, + }; + use proto::{open_provider_stream_request, OpenProviderStreamRequest, PublishValuesRequest}; + + /* + Test open_provider_stream service method + */ + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_open_provider_stream() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + let request_id = 1; + + let entry_id = authorized_access + .add_entry( + "test.datapoint1".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Test datapoint 1".to_owned(), + None, + None, + ) + .await + .unwrap(); + + let request = OpenProviderStreamRequest { + action: Some(open_provider_stream_request::Action::PublishValuesRequest( + PublishValuesRequest { + request_id, + datapoints: { + let timestamp = Some(std::time::SystemTime::now().into()); + + let value = proto::Value { + typed_value: Some(proto::value::TypedValue::String( + "example_value".to_string(), + )), + }; + + let datapoint = proto::Datapoint { + timestamp, + value_state: Some(proto::datapoint::ValueState::Value(value)), + }; + + let mut map = HashMap::new(); + map.insert(entry_id, datapoint); + map + }, + }, + )), + }; + + // Manually insert permissions + let mut streaming_request = tonic_mock::streaming_request(vec![request]); + streaming_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.open_provider_stream(streaming_request).await { + Ok(response) => { + std::thread::sleep(std::time::Duration::from_secs(3)); + tokio::spawn(async move { + std::thread::sleep(std::time::Duration::from_secs(3)); + let stream = response.into_inner(); + let mut receiver = stream.into_inner(); + while let Some(value) = receiver.recv().await { + match value { + Ok(value) => match value.action { + Some(ProvideActuatorResponse(_)) => { + panic!("Should not happen") + } + Some(PublishValuesResponse(publish_values_response)) => { + assert_eq!(publish_values_response.request_id, request_id); + assert_eq!(publish_values_response.status.len(), 1); + match publish_values_response.status.get(&entry_id) { + Some(value) => { + assert_eq!(value.code, 1); + assert_eq!(value.message, "Wrong Type"); + } + None => { + panic!("Should not happen") + } + } + } + Some(BatchActuateStreamRequest(_)) => { + panic!("Should not happen") + } + None => { + panic!("Should not happen") + } + }, + Err(_) => { + panic!("Should not happen") + } + } + } + }); + } + Err(_) => { + panic!("Should not happen") + } + } + } +} diff --git a/databroker/src/grpc/sdv_databroker_v1/conversions.rs b/databroker/src/grpc/sdv_databroker_v1/conversions.rs index e028c390..0df1578c 100644 --- a/databroker/src/grpc/sdv_databroker_v1/conversions.rs +++ b/databroker/src/grpc/sdv_databroker_v1/conversions.rs @@ -101,6 +101,9 @@ impl From<&broker::Datapoint> for proto::Datapoint { broker::DataValue::NotAvailable => proto::datapoint::Value::FailureValue( proto::datapoint::Failure::NotAvailable as i32, ), + broker::DataValue::ValueFailure(_) => proto::datapoint::Value::FailureValue( + proto::datapoint::Failure::InternalError as i32, + ), }; proto::Datapoint { @@ -166,6 +169,9 @@ impl From<&broker::QueryField> for proto::Datapoint { broker::DataValue::NotAvailable => proto::datapoint::Value::FailureValue( proto::datapoint::Failure::NotAvailable.into(), ), + broker::DataValue::ValueFailure(_) => proto::datapoint::Value::FailureValue( + proto::datapoint::Failure::InternalError.into(), + ), }; proto::Datapoint { diff --git a/databroker/src/main.rs b/databroker/src/main.rs index c576d566..15bc8e60 100644 --- a/databroker/src/main.rs +++ b/databroker/src/main.rs @@ -445,7 +445,7 @@ async fn main() -> Result<(), Box> { } } - let mut apis = vec![grpc::server::Api::KuksaValV1]; + let mut apis = vec![grpc::server::Api::KuksaValV1, grpc::server::Api::KuksaValV2]; if args.get_flag("enable-databroker-v1") { apis.push(grpc::server::Api::SdvDatabrokerV1); diff --git a/databroker/src/types.rs b/databroker/src/types.rs index 6d9241fd..16000c31 100644 --- a/databroker/src/types.rs +++ b/databroker/src/types.rs @@ -55,6 +55,16 @@ pub enum ChangeType { Continuous, } +#[derive(Debug, Clone, PartialEq)] +pub enum ValueFailure { + Unspecified, + InvalidValue, + NotProvided, + UnknownSignal, + AccessDenied, + InternalError, +} + #[derive(Debug, Clone, PartialEq)] pub enum DataValue { NotAvailable, @@ -74,6 +84,7 @@ pub enum DataValue { Uint64Array(Vec), FloatArray(Vec), DoubleArray(Vec), + ValueFailure(ValueFailure), } #[derive(Debug)] diff --git a/databroker/src/viss/v2/conversions.rs b/databroker/src/viss/v2/conversions.rs index a209cb6a..e6fcf06c 100644 --- a/databroker/src/viss/v2/conversions.rs +++ b/databroker/src/viss/v2/conversions.rs @@ -270,6 +270,7 @@ impl From for Value { broker::DataValue::DoubleArray(array) => { Value::Array(array.iter().map(|value| value.to_string()).collect()) } + broker::DataValue::ValueFailure(_) => Value::None, } } }