Skip to content

Commit

Permalink
feat: replay last 1000 lines of logs
Browse files Browse the repository at this point in the history
  • Loading branch information
jiegec committed Jun 19, 2024
1 parent caeb8d8 commit fb713ff
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 32 deletions.
6 changes: 3 additions & 3 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use server::recycler::recycler_worker;
use server::routes::{
dashboard_status, job_info, job_list, job_restart, ping, pipeline_info, pipeline_list,
pipeline_new_pr, worker_info, worker_job_update, worker_list, worker_poll, ws_viewer_handler,
ws_worker_handler, AppState, ViewerMap,
ws_worker_handler, AppState, WSStateMap,
};
use server::routes::{pipeline_new, worker_heartbeat};
use server::routes::{pipeline_status, worker_status};
use server::{DbPool, RemoteAddr, ARGS};
use std::collections::HashMap;
use std::os::unix::fs::PermissionsExt;
use std::sync::RwLock;
use std::sync::Mutex;
use teloxide::prelude::*;
use tower::Service;
use tower_http::cors::{Any, CorsLayer};
Expand Down Expand Up @@ -99,7 +99,7 @@ async fn main() -> anyhow::Result<()> {
let state = AppState {
pool: pool.clone(),
bot,
ws_viewer_map: ViewerMap::new(RwLock::new(HashMap::new())),
ws_state_map: WSStateMap::new(Mutex::new(HashMap::new())),
};

let mut app = Router::new()
Expand Down
16 changes: 11 additions & 5 deletions server/src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl};
use futures::channel::mpsc::UnboundedSender;
use serde::Serialize;
use std::{
collections::{BTreeMap, HashMap},
sync::{Arc, RwLock},
collections::{BTreeMap, HashMap, VecDeque},
sync::{Arc, Mutex},
};

use teloxide::prelude::*;
Expand All @@ -37,14 +37,20 @@ pub struct Viewer {
sender: UnboundedSender<axum::extract::ws::Message>,
}

// map from hostname to viewer
pub type ViewerMap = Arc<RwLock<HashMap<String, Vec<Arc<Viewer>>>>>;
#[derive(Default)]
pub struct WSState {
last_logs: VecDeque<axum::extract::ws::Message>,
viewers: Vec<Arc<Viewer>>,
}

// map from hostname to ws state
pub type WSStateMap = Arc<Mutex<HashMap<String, WSState>>>;

#[derive(Clone)]
pub struct AppState {
pub pool: DbPool,
pub bot: Option<Bot>,
pub ws_viewer_map: ViewerMap,
pub ws_state_map: WSStateMap,
}

// learned from https://github.com/tokio-rs/axum/blob/main/examples/anyhow-error-response/src/main.rs
Expand Down
58 changes: 34 additions & 24 deletions server/src/routes/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::sync::Arc;

use super::{AppState, ViewerMap};
use super::{AppState, WSStateMap};
use crate::{routes::Viewer, RemoteAddr};
use axum::{
extract::{ws::WebSocket, ConnectInfo, Path, State, WebSocketUpgrade},
response::IntoResponse,
};
use futures::{channel::mpsc::unbounded, future, StreamExt, TryStreamExt};
use futures::{channel::mpsc::unbounded, future, SinkExt, StreamExt, TryStreamExt};
use std::sync::Arc;
use tracing::info;

pub async fn ws_worker_handler(
Expand All @@ -15,14 +14,14 @@ pub async fn ws_worker_handler(
ConnectInfo(addr): ConnectInfo<RemoteAddr>,
State(state): State<AppState>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_worker_socket(socket, addr, hostname, state.ws_viewer_map))
ws.on_upgrade(move |socket| handle_worker_socket(socket, addr, hostname, state.ws_state_map))
}

async fn handle_worker_socket(
socket: WebSocket,
who: RemoteAddr,
hostname: String,
viewer_map: ViewerMap,
state_map: WSStateMap,
) {
info!("{:?} connected as worker with hostname {}", who, hostname);

Expand All @@ -32,10 +31,17 @@ async fn handle_worker_socket(
if let Err(err) = incoming
.try_for_each(|msg| {
// We want to broadcast the message to viewers subscribing to the hostname
if let Some(viewers) = viewer_map.read().unwrap().get(&hostname) {
for recp in viewers {
let mut map = state_map.lock().unwrap();
if let Some(state) = map.get_mut(&hostname) {
for recp in &state.viewers {
recp.sender.unbounded_send(msg.clone()).unwrap();
}

// save last 1000 entries
state.last_logs.push_back(msg.clone());
if state.last_logs.len() > 1000 {
state.last_logs.pop_front();
}
}

future::ok(())
Expand All @@ -60,31 +66,38 @@ pub async fn ws_viewer_handler(
ConnectInfo(addr): ConnectInfo<RemoteAddr>,
State(state): State<AppState>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_viewer_socket(socket, addr, hostname, state.ws_viewer_map))
ws.on_upgrade(move |socket| handle_viewer_socket(socket, addr, hostname, state.ws_state_map))
}

async fn handle_viewer_socket(
socket: WebSocket,
who: RemoteAddr,
hostname: String,
viewer_map: ViewerMap,
state_map: WSStateMap,
) {
let (tx, rx) = unbounded();
info!("{:?} connected as viewer with hostname {}", who, hostname);
let (mut outgoing, _incoming) = socket.split();

// register our tx to ViewerMap
// register our tx to WSStateMap
// and return latest logs
let viewer = Arc::new(Viewer {
remote_addr: who.clone(),
sender: tx,
});
viewer_map
.write()
.unwrap()
.entry(hostname.clone())
.or_default()
.push(viewer.clone());
let msgs = {
let mut map = state_map.lock().unwrap();
let state = map.entry(hostname.clone()).or_default();

state.viewers.push(viewer.clone());

// collect last logs
state.last_logs.clone()
};
for msg in msgs {
outgoing.send(msg).await.ok();
}

let (outgoing, _incoming) = socket.split();
// forward rx to websocket
if let Err(err) = rx.map(Ok).forward(outgoing).await {
info!(
Expand All @@ -99,10 +112,7 @@ async fn handle_viewer_socket(
);

// remove from viewer map
viewer_map
.write()
.unwrap()
.entry(hostname.clone())
.or_default()
.retain(|v| !Arc::ptr_eq(v, &viewer));
let mut map = state_map.lock().unwrap();
let state = map.entry(hostname.clone()).or_default();
state.viewers.retain(|v| !Arc::ptr_eq(v, &viewer));
}

0 comments on commit fb713ff

Please sign in to comment.