Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update: Requests pattern #12

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 33 additions & 16 deletions src/device/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
net::{Ipv4Addr, SocketAddrV4},
ops::Deref,
};
use tokio::sync::{mpsc, oneshot};
use tokio_serial::available_ports;
Expand Down Expand Up @@ -145,19 +146,32 @@ pub struct DeviceAnswer {
}

#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)]
#[serde(tag = "command", content = "payload")]
pub enum Request {
AutoCreate,
Create(CreateStruct),
Delete(Uuid),
Delete(UuidWrapper),
List,
Info(Uuid),
Info(UuidWrapper),
Search,
Ping(DeviceRequestStruct),
GetDeviceHandler(Uuid),
EnableContinuousMode(Uuid),
DisableContinuousMode(Uuid),
GetDeviceHandler(UuidWrapper),
EnableContinuousMode(UuidWrapper),
DisableContinuousMode(UuidWrapper),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UuidWrapper {
pub uuid: Uuid,
}

impl Deref for UuidWrapper {
type Target = Uuid;

fn deref(&self) -> &Self::Target {
&self.uuid
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateStruct {
pub source: SourceSelection,
Expand All @@ -166,8 +180,8 @@ pub struct CreateStruct {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceRequestStruct {
pub target: Uuid,
pub request: crate::device::devices::PingRequest,
pub uuid: Uuid,
pub device_request: crate::device::devices::PingRequest,
}

impl DeviceManager {
Expand All @@ -187,7 +201,7 @@ impl DeviceManager {
}
}
Request::Delete(uuid) => {
let result = self.delete(uuid).await;
let result = self.delete(*uuid).await;
if let Err(e) = actor_request.respond_to.send(result) {
error!("DeviceManager: Failed to return Delete response: {e:?}");
}
Expand All @@ -199,25 +213,25 @@ impl DeviceManager {
}
}
Request::Info(device_id) => {
let result = self.info(device_id).await;
let result = self.info(*device_id).await;
if let Err(e) = actor_request.respond_to.send(result) {
error!("DeviceManager: Failed to return Info response: {:?}", e);
}
}
Request::EnableContinuousMode(uuid) => {
let result = self.continuous_mode(uuid).await;
let result = self.continuous_mode(*uuid).await;
if let Err(e) = actor_request.respond_to.send(result) {
error!("DeviceManager: Failed to return EnableContinuousMode response: {e:?}");
}
}
Request::DisableContinuousMode(uuid) => {
let result = self.continuous_mode_off(uuid).await;
let result = self.continuous_mode_off(*uuid).await;
if let Err(e) = actor_request.respond_to.send(result) {
error!("DeviceManager: Failed to return DisableContinuousMode response: {e:?}");
}
}
Request::GetDeviceHandler(id) => {
let answer = self.get_device_handler(id).await;
let answer = self.get_device_handler(*id).await;
if let Err(e) = actor_request.respond_to.send(answer) {
error!("DeviceManager: Failed to return GetDeviceHandler response: {e:?}");
}
Expand Down Expand Up @@ -515,8 +529,11 @@ impl ManagerActorHandler {
// Devices requests are forwarded directly to device and let manager handle other incoming request.
Request::Ping(request) => {
trace!("Handling Ping request: {request:?}: Forwarding request to device handler");
let get_handler_target = request.target;
let handler_request = Request::GetDeviceHandler(get_handler_target);
let get_handler_target = request.uuid;
let handler_request =
Request::GetDeviceHandler(crate::device::manager::UuidWrapper {
uuid: get_handler_target,
});
let manager_request = ManagerActorRequest {
request: handler_request,
respond_to: result_sender,
Expand All @@ -541,13 +558,13 @@ impl ManagerActorHandler {
trace!(
"Handling Ping request: {request:?}: Successfully received the handler"
);
let result = handler.send(request.request.clone()).await;
let result = handler.send(request.device_request.clone()).await;
match result {
Ok(result) => {
info!("Handling Ping request: {request:?}: Success");
Ok(Answer::DeviceMessage(DeviceAnswer {
answer: result,
device_id: request.target,
device_id: request.uuid,
}))
}
Err(err) => {
Expand Down
7 changes: 7 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use serde::{Deserialize, Serialize};
use tracing::info;

#[macro_use]
Expand All @@ -9,6 +10,12 @@ mod device;
mod logger;
mod server;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "module")]
pub enum ModuleType {
DeviceManager(device::manager::Request),
}

#[tokio::main]
async fn main() {
// CLI should be started before logger to allow control over verbosity
Expand Down
54 changes: 30 additions & 24 deletions src/server/protocols/v1/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::{Arc, Mutex};
use tracing::info;
use uuid::Uuid;

use crate::device::manager::{ManagerActorHandler, Request};
use crate::device::manager::ManagerActorHandler;

pub struct StringMessage(String);

Expand Down Expand Up @@ -134,7 +134,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebsocketActor {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => {
let manager_requests: Vec<Request> = match serde_json::from_str(&text) {
let manager_requests: Vec<crate::ModuleType> = match serde_json::from_str(&text) {
Ok(requests) => requests,
Err(err) => match serde_json::from_str(&text) {
Ok(request) => vec![request],
Expand All @@ -147,27 +147,31 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebsocketActor {
};

for request in manager_requests {
let manager_handler = self.manager_handler.clone();

let future =
async move { manager_handler.send(request).await }.into_actor(self);

future
.then(|res, _, ctx| {
match &res {
Ok(result) => {
crate::server::protocols::v1::websocket::send_to_websockets(
json!(result),
None,
);
}
Err(err) => {
ctx.text(serde_json::to_string_pretty(err).unwrap());
}
}
fut::ready(())
})
.wait(ctx);
match request {
crate::ModuleType::DeviceManager(request) => {
let manager_handler = self.manager_handler.clone();

let future =
async move { manager_handler.send(request).await }.into_actor(self);

future
.then(|res, _, ctx| {
match &res {
Ok(result) => {
crate::server::protocols::v1::websocket::send_to_websockets(
json!(result),
None,
);
}
Err(err) => {
ctx.text(serde_json::to_string_pretty(err).unwrap());
}
}
fut::ready(())
})
.wait(ctx);
}
}
}
}
Ok(ws::Message::Close(msg)) => ctx.close(msg),
Expand All @@ -191,7 +195,9 @@ pub async fn websocket(
let device_number = query.into_inner().device_number;

if let Some(device_number) = device_number {
let request = crate::device::manager::Request::Info(device_number);
let request = crate::device::manager::Request::Info(crate::device::manager::UuidWrapper {
uuid: device_number,
});
match manager_handler.send(request).await {
Ok(response) => {
info!(
Expand Down