Skip to content

Commit

Permalink
feat: move websocket to separate worker
Browse files Browse the repository at this point in the history
  • Loading branch information
jiegec committed Jul 1, 2024
1 parent b62f222 commit bf487d6
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 55 deletions.
59 changes: 6 additions & 53 deletions worker/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::{get_memory_bytes, Args};
use chrono::Local;
use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse};
use flume::{unbounded, Receiver, Sender};
use futures_util::{future::try_join3, StreamExt};
use flume::Sender;
use futures_util::future::try_join3;
use log::{error, info, warn};
use reqwest::Url;
use std::{
path::Path,
process::{Output, Stdio},
Expand All @@ -14,10 +13,9 @@ use tokio::{
fs,
io::{AsyncBufReadExt, AsyncRead, BufReader},
process::Command,
select,
time::sleep,
};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tokio_tungstenite::tungstenite::Message;

async fn get_output_logged(
cmd: &str,
Expand Down Expand Up @@ -380,7 +378,7 @@ async fn build(
Ok(result)
}

async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
async fn build_worker_inner(args: &Args, tx: Sender<Message>) -> anyhow::Result<()> {
let mut tree_path = args.ciel_path.clone();
tree_path.push("TREE");

Expand All @@ -401,31 +399,6 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
logical_cores: num_cpus::get() as i32,
};

// wss://hostname/api/ws/worker/:hostname
let ws = Url::parse(&args.server.replace("http", "ws"))?
.join("api/")?
.join("ws/")?
.join("worker/")?
.join(&hostname)?;

let (tx, rx) = unbounded();

select! { res = poll_server(client, args, req, tree_path, tx) => {
warn!("{res:?}");
res
}, res = websocket_connect(rx, ws) => {
warn!("{res:?}");
res
} }
}

async fn poll_server(
client: reqwest::Client,
args: &Args,
req: WorkerPollRequest,
tree_path: std::path::PathBuf,
tx: Sender<Message>,
) -> Result<(), anyhow::Error> {
loop {
if let Some(job) = client
.post(format!("{}/api/worker/poll", args.server))
Expand Down Expand Up @@ -467,32 +440,12 @@ async fn poll_server(
}
}

pub async fn build_worker(args: Args) -> ! {
pub async fn build_worker(args: Args, tx: Sender<Message>) -> ! {
loop {
info!("Starting build worker");
if let Err(err) = build_worker_inner(&args).await {
if let Err(err) = build_worker_inner(&args, tx.clone()).await {
warn!("Got error running heartbeat worker: {}", err);
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}

pub async fn websocket_connect(rx: Receiver<Message>, ws: Url) -> anyhow::Result<()> {
loop {
info!("Starting websocket connect to {:?}", ws);
match connect_async(ws.as_str()).await {
Ok((ws_stream, _)) => {
let (write, _) = ws_stream.split();
let rx = rx.clone().into_stream();
if let Err(e) = rx.map(Ok).forward(write).await {
warn!("{e}");
}
}
Err(err) => {
warn!("Got error connecting to websocket: {}", err);
}
}

tokio::time::sleep(Duration::from_secs(5)).await;
}
}
1 change: 1 addition & 0 deletions worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use sysinfo::System;

pub mod build;
pub mod heartbeat;
pub mod websocket;

#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
Expand Down
7 changes: 5 additions & 2 deletions worker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use clap::Parser;
use log::info;
use sysinfo::System;
use worker::{build::build_worker, heartbeat::heartbeat_worker, Args};
use worker::{build::build_worker, heartbeat::heartbeat_worker, websocket::websocket_worker, Args};
use flume::{unbounded};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -14,7 +15,9 @@ async fn main() -> anyhow::Result<()> {
let mut s = System::new();
s.refresh_memory();

let (tx, rx) = unbounded();
tokio::spawn(websocket_worker(args.clone(), rx));
tokio::spawn(heartbeat_worker(args.clone()));
build_worker(args.clone()).await;
build_worker(args.clone(), tx).await;
Ok(())
}
35 changes: 35 additions & 0 deletions worker/src/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::Args;
use flume::Receiver;
use futures_util::StreamExt;
use log::{info, warn};
use reqwest::Url;
use std::time::Duration;
use tokio_tungstenite::{connect_async, tungstenite::Message};

pub async fn websocket_worker(args: Args, rx: Receiver<Message>) -> anyhow::Result<()> {
// wss://hostname/api/ws/worker/:hostname
let hostname = gethostname::gethostname().to_string_lossy().to_string();
let ws = Url::parse(&args.server.replace("http", "ws"))?
.join("api/")?
.join("ws/")?
.join("worker/")?
.join(&hostname)?;

loop {
info!("Starting websocket connect to {:?}", ws);
match connect_async(ws.as_str()).await {
Ok((ws_stream, _)) => {
let (write, _) = ws_stream.split();
let rx = rx.clone().into_stream();
if let Err(e) = rx.map(Ok).forward(write).await {
warn!("Failed to forward message to websocket: {e}");
}
}
Err(err) => {
warn!("Got error connecting to websocket: {}", err);
}
}

tokio::time::sleep(Duration::from_secs(5)).await;
}
}

0 comments on commit bf487d6

Please sign in to comment.